Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion spp_programs/__manifest__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"name": "OpenSPP Programs",
"summary": "Manage programs, cycles, beneficiary enrollment, entitlements (cash and in-kind), payments, and fund tracking for social protection.",
"category": "OpenSPP/Core",
"version": "19.0.2.0.6",
"version": "19.0.2.0.7",
"sequence": 1,
"author": "OpenSPP.org",
"website": "https://github.com/OpenSPP/OpenSPP2",
Expand Down
16 changes: 13 additions & 3 deletions spp_programs/models/cycle.py
Original file line number Diff line number Diff line change
Expand Up @@ -614,7 +614,9 @@ def _get_beneficiaries_domain(self, states=None):
return domain

@api.model
def get_beneficiaries(self, state, offset=0, limit=None, order=None, count=False, last_id=None):
def get_beneficiaries(
self, state, offset=0, limit=None, order=None, count=False, last_id=None, min_id=None, max_id=None
):
"""
Get beneficiaries by state with pagination support.

Expand All @@ -624,9 +626,12 @@ def get_beneficiaries(self, state, offset=0, limit=None, order=None, count=False
:param order: Sort order
:param count: If True, return count instead of records
:param last_id: For cursor-based pagination - ID of last record from previous batch (more efficient)
:param min_id: For ID-range pagination - minimum record ID (inclusive)
:param max_id: For ID-range pagination - maximum record ID (inclusive)
:return: Recordset or count

Note: For large datasets, use cursor-based pagination with last_id parameter instead of offset.
Note: For large datasets, prefer min_id/max_id (ID-range) or last_id (cursor)
pagination over offset-based pagination.
"""
if isinstance(state, str):
state = [state]
Expand All @@ -635,7 +640,12 @@ def get_beneficiaries(self, state, offset=0, limit=None, order=None, count=False
if count:
return self.env["spp.cycle.membership"].search_count(domain, limit=limit)

# Use cursor-based pagination if last_id is provided (more efficient)
# ID-range pagination (best for parallel job dispatch)
if min_id is not None and max_id is not None:
domain = domain + [("id", ">=", min_id), ("id", "<=", max_id)]
return self.env["spp.cycle.membership"].search(domain, order=order or "id")

# Cursor-based pagination (good for sequential iteration)
if last_id is not None:
domain = domain + [("id", ">", last_id)]
return self.env["spp.cycle.membership"].search(domain, limit=limit, order=order or "id")
Expand Down
10 changes: 6 additions & 4 deletions spp_programs/models/managers/cycle_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,19 @@ def mark_prepare_entitlement_as_done(self, cycle, msg):
cycle._compute_total_entitlements_count()
return

def _prepare_entitlements(self, cycle, offset=0, limit=None, do_count=False):
def _prepare_entitlements(self, cycle, offset=0, limit=None, min_id=None, max_id=None, do_count=False):
"""Prepare Entitlements
Get the beneficiaries and generate their entitlements.

:param cycle: The cycle
:param offset: Optional integer value for the ORM search offset
:param limit: Optional integer value for the ORM search limit
:param offset: Optional integer value for the ORM search offset (deprecated, use min_id/max_id)
:param limit: Optional integer value for the ORM search limit (deprecated, use min_id/max_id)
:param min_id: Minimum record ID for ID-range pagination (inclusive)
:param max_id: Maximum record ID for ID-range pagination (inclusive)
:param do_count: Boolean - set to False to not run compute function
:return:
"""
super()._prepare_entitlements(cycle, offset, limit, do_count)
super()._prepare_entitlements(cycle, offset, limit, min_id=min_id, max_id=max_id, do_count=do_count)
if do_count:
# Update Statistics
cycle._compute_inkind_entitlements_count()
Expand Down
46 changes: 35 additions & 11 deletions spp_programs/models/managers/cycle_manager_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from odoo.addons.job_worker.delay import group

from .. import constants
from .pagination_utils import compute_id_ranges

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -515,21 +516,32 @@ def _check_eligibility_async(self, cycle, beneficiaries_count):
cycle.message_post(body=_("Eligibility check of %s beneficiaries started.", beneficiaries_count))
cycle.write({"is_locked": True, "locked_reason": "Eligibility check of beneficiaries"})

states = ("draft", "enrolled", "not_eligible")
id_ranges = compute_id_ranges(
self.env.cr,
"spp_cycle_membership",
"cycle_id = %s AND state IN %s",
(cycle.id, states),
self.MAX_ROW_JOB_QUEUE,
)

jobs = []
for i in range(0, beneficiaries_count, self.MAX_ROW_JOB_QUEUE):
jobs.append(
self.delayable(channel="cycle")._check_eligibility(cycle, offset=i, limit=self.MAX_ROW_JOB_QUEUE)
)
for min_id, max_id in id_ranges:
jobs.append(self.delayable(channel="cycle")._check_eligibility(cycle, min_id=min_id, max_id=max_id))
main_job = group(*jobs)
main_job.on_done(self.delayable(channel="cycle").mark_check_eligibility_as_done(cycle))
main_job.delay()

def _check_eligibility(self, cycle, beneficiaries=None, offset=0, limit=None, do_count=False):
def _check_eligibility(
self, cycle, beneficiaries=None, offset=0, limit=None, min_id=None, max_id=None, do_count=False
):
if beneficiaries is None:
beneficiaries = cycle.get_beneficiaries(
["draft", "enrolled", "not_eligible"],
offset=offset,
limit=limit,
min_id=min_id,
max_id=max_id,
order="id",
)

Expand Down Expand Up @@ -585,26 +597,38 @@ def _prepare_entitlements_async(self, cycle, beneficiaries_count):
}
)

id_ranges = compute_id_ranges(
self.env.cr,
"spp_cycle_membership",
"cycle_id = %s AND state IN %s",
(cycle.id, ("enrolled",)),
self.MAX_ROW_JOB_QUEUE,
)

jobs = []
for i in range(0, beneficiaries_count, self.MAX_ROW_JOB_QUEUE):
jobs.append(self.delayable(channel="cycle")._prepare_entitlements(cycle, i, self.MAX_ROW_JOB_QUEUE))
for min_id, max_id in id_ranges:
jobs.append(self.delayable(channel="cycle")._prepare_entitlements(cycle, min_id=min_id, max_id=max_id))
main_job = group(*jobs)
main_job.on_done(
self.delayable(channel="cycle").mark_prepare_entitlement_as_done(cycle, _("Entitlement Ready."))
)
main_job.delay()

def _prepare_entitlements(self, cycle, offset=0, limit=None, do_count=False):
def _prepare_entitlements(self, cycle, offset=0, limit=None, min_id=None, max_id=None, do_count=False):
"""Prepare Entitlements
Get the beneficiaries and generate their entitlements.

:param cycle: The cycle
:param offset: Optional integer value for the ORM search offset
:param limit: Optional integer value for the ORM search limit
:param offset: Optional integer value for the ORM search offset (deprecated, use min_id/max_id)
:param limit: Optional integer value for the ORM search limit (deprecated, use min_id/max_id)
:param min_id: Minimum record ID for ID-range pagination (inclusive)
:param max_id: Maximum record ID for ID-range pagination (inclusive)
:param do_count: Boolean - set to False to not run compute function
:return:
"""
beneficiaries = cycle.get_beneficiaries(["enrolled"], offset=offset, limit=limit, order="id")
beneficiaries = cycle.get_beneficiaries(
["enrolled"], offset=offset, limit=limit, min_id=min_id, max_id=max_id, order="id"
)
ent_manager = self.program_id.get_manager(constants.MANAGER_ENTITLEMENT)
if not ent_manager:
raise UserError(_("No Entitlement Manager defined."))
Expand Down
63 changes: 63 additions & 0 deletions spp_programs/models/managers/pagination_utils.py
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kneckinator is this really not intended to be added to the `__init__.py`?

Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# Part of OpenSPP. See LICENSE file for full copyright and licensing details.
"""Keyset pagination utilities for async job dispatch.

OFFSET-based pagination causes PostgreSQL to scan and discard N rows for
OFFSET N, making later batches progressively slower (O(N) per batch).

This module provides ID-range batching using the NTILE window function,
which pre-computes (min_id, max_id) boundaries in a single query. Each
job then uses WHERE id BETWEEN min_id AND max_id, which is O(1) via the
primary key index regardless of batch position.
"""

import math


def compute_id_ranges(cr, table, where_clause, params, batch_size):
    """Compute ID-range boundaries for parallel job dispatch.

    Uses PostgreSQL's NTILE window function to split matching rows into
    roughly equal-sized buckets, then returns the (min_id, max_id) of each.

    ``table`` and ``where_clause`` are interpolated into the SQL text as-is,
    so they must be trusted, developer-supplied fragments; only ``params``
    is passed through the driver's parameter binding.

    :param cr: Database cursor
    :param table: Table name (e.g. 'spp_program_membership')
    :param where_clause: SQL WHERE clause without 'WHERE' keyword
        (e.g. 'program_id = %s AND state IN %s')
    :param params: Tuple of parameters for the WHERE clause
    :param batch_size: Target number of rows per batch (must be positive)
    :return: List of (min_id, max_id) tuples, ordered by min_id
    :raises ValueError: If batch_size is not a positive number
    """
    # Fail with a clear message instead of a ZeroDivisionError (batch_size == 0)
    # or a silent single oversized batch (batch_size < 0).
    if batch_size <= 0:
        raise ValueError(f"batch_size must be a positive integer, got {batch_size!r}")

    # Fetch the row count together with the overall ID bounds so that the
    # common "everything fits in one batch" case needs a single query.
    cr.execute(
        f"SELECT COUNT(*), MIN(id), MAX(id) FROM {table} WHERE {where_clause}",  # noqa: S608  # nosec B608
        params,
    )
    total, lowest_id, highest_id = cr.fetchone()
    if not total:
        return []

    num_batches = math.ceil(total / batch_size)
    if num_batches <= 1:
        return [(lowest_id, highest_id)]

    # Use NTILE to split rows into equal-sized buckets, then get
    # the min/max ID of each bucket as the range boundaries.
    cr.execute(
        f"""
        SELECT MIN(id) AS min_id, MAX(id) AS max_id
        FROM (
            SELECT id, NTILE(%s) OVER (ORDER BY id) AS tile
            FROM {table}
            WHERE {where_clause}
        ) sub
        GROUP BY tile
        ORDER BY min_id
        """,  # noqa: S608  # nosec B608
        # The NTILE placeholder appears before the WHERE placeholders in the
        # statement, so the bucket count must come first in the parameters.
        (num_batches, *params),
    )
    # Normalise driver rows to plain tuples to match the documented return type.
    return [(min_id, max_id) for min_id, max_id in cr.fetchall()]
28 changes: 22 additions & 6 deletions spp_programs/models/managers/program_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from odoo.addons.job_worker.delay import group

from ..programs import SPPProgram
from .pagination_utils import compute_id_ranges

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -184,28 +185,43 @@ def _enroll_eligible_registrants_async(self, states, members_count):
program.message_post(body=_("Eligibility check of %s beneficiaries started.", members_count))
program.write({"is_locked": True, "locked_reason": "Eligibility check of beneficiaries"})

if isinstance(states, str):
states = [states]

id_ranges = compute_id_ranges(
self.env.cr,
"spp_program_membership",
"program_id = %s AND state IN %s",
(program.id, tuple(states)),
self.MAX_ROW_JOB_QUEUE,
)

jobs = []
for i in range(0, members_count, self.MAX_ROW_JOB_QUEUE):
for min_id, max_id in id_ranges:
jobs.append(
self.delayable(channel="program_manager")._enroll_eligible_registrants(
states, i, self.MAX_ROW_JOB_QUEUE
states, min_id=min_id, max_id=max_id
)
)
main_job = group(*jobs)
main_job.on_done(self.delayable(channel="program_manager").mark_enroll_eligible_as_done())
main_job.delay()

def _enroll_eligible_registrants(self, states, offset=0, limit=None, do_count=False):
def _enroll_eligible_registrants(self, states, offset=0, limit=None, min_id=None, max_id=None, do_count=False):
"""Enroll Eligible Registrants

:param states: List of states to be used in domain filter
:param offset: Optional integer value for the ORM search offset
:param limit: Optional integer value for the ORM search limit
:param offset: Optional integer value for the ORM search offset (deprecated, use min_id/max_id)
:param limit: Optional integer value for the ORM search limit (deprecated, use min_id/max_id)
:param min_id: Minimum record ID for ID-range pagination (inclusive)
:param max_id: Maximum record ID for ID-range pagination (inclusive)
:param do_count: Boolean - set to False to not run compute functions
:return: Integer - count of not enrolled members
"""
program = self.program_id
members = program.get_beneficiaries(state=states, offset=offset, limit=limit, order="id")
members = program.get_beneficiaries(
state=states, offset=offset, limit=limit, min_id=min_id, max_id=max_id, order="id"
)

member_before = members

Expand Down
16 changes: 13 additions & 3 deletions spp_programs/models/programs.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,9 @@ def get_managers(self, kind):
return [el.manager_ref_id for el in managers]

@api.model
def get_beneficiaries(self, state=None, offset=0, limit=None, order=None, count=False, last_id=None):
def get_beneficiaries(
self, state=None, offset=0, limit=None, order=None, count=False, last_id=None, min_id=None, max_id=None
):
"""
Get program beneficiaries with pagination support.

Expand All @@ -324,9 +326,12 @@ def get_beneficiaries(self, state=None, offset=0, limit=None, order=None, count=
:param order: Sort order
:param count: If True, return count instead of records
:param last_id: For cursor-based pagination - ID of last record from previous batch (more efficient)
:param min_id: For ID-range pagination - minimum record ID (inclusive)
:param max_id: For ID-range pagination - maximum record ID (inclusive)
:return: Recordset or count

Note: For large datasets, use cursor-based pagination with last_id parameter instead of offset.
Note: For large datasets, prefer min_id/max_id (ID-range) or last_id (cursor)
pagination over offset-based pagination.
"""
self.ensure_one()
if isinstance(state, str):
Expand All @@ -337,7 +342,12 @@ def get_beneficiaries(self, state=None, offset=0, limit=None, order=None, count=
if count:
return self.env["spp.program.membership"].search_count(domain, limit=limit)

# Use cursor-based pagination if last_id is provided (more efficient)
# ID-range pagination (best for parallel job dispatch)
if min_id is not None and max_id is not None:
domain = domain + [("id", ">=", min_id), ("id", "<=", max_id)]
return self.env["spp.program.membership"].search(domain, order=order or "id")

# Cursor-based pagination (good for sequential iteration)
if last_id is not None:
domain = domain + [("id", ">", last_id)]
return self.env["spp.program.membership"].search(domain, limit=limit, order=order or "id")
Expand Down
1 change: 1 addition & 0 deletions spp_programs/tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,4 @@
from . import test_payment_and_accounting
from . import test_managers
from . import test_cycle_auto_approve_fund_check
from . import test_keyset_pagination
Loading
Loading