Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,19 @@ SERVICE_CONTROL_SERVICE_NAME=your-service-name.endpoints.your-project.cloud.goog
# Path to service account key file (for local development)
GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account-key.json

# -----------------------------------------------------------------------------
# Data Purge (cancelled/deleted entitlements)
# -----------------------------------------------------------------------------
# Periodic hard-deletion of data for cancelled/deleted entitlements.
# Must be explicitly enabled; disabled by default for safety.
DATA_PURGE_ENABLED=false

# Days to retain cancelled/deleted entitlement data before purging (minimum 1)
DATA_RETENTION_DAYS=90

# Hours between automated purge runs
DATA_PURGE_INTERVAL_HOURS=24

# -----------------------------------------------------------------------------
# Rate Limiting Configuration (Redis-backed)
# -----------------------------------------------------------------------------
Expand Down
12 changes: 12 additions & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,18 @@ Usage tracking is built into the agent via the ADK plugin system. No configurati

See [Usage Tracking and Metering](metering.md) for details on the plugin system and how to extend it.

### Data Purge

Periodic hard-deletion of data for cancelled/deleted entitlements. Disabled by default — must be explicitly enabled after reviewing retention policies.

| Variable | Default | Description |
|----------|---------|-------------|
| `DATA_PURGE_ENABLED` | `false` | Enable automated periodic purging of expired cancelled/deleted entitlements |
| `DATA_RETENTION_DAYS` | `90` | Days to retain cancelled/deleted entitlement data before purge (minimum 1) |
| `DATA_PURGE_INTERVAL_HOURS` | `24` | Hours between automated purge runs |

When an entitlement is cancelled or deleted, its usage records are purged immediately. The entitlement record itself is retained for `DATA_RETENTION_DAYS` and then hard-deleted by the periodic purge scheduler. See [Marketplace > Data Retention and Purge](marketplace.md#data-retention-and-purge) for details.

### Logging

| Variable | Default | Description |
Expand Down
18 changes: 18 additions & 0 deletions docs/marketplace.md
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,24 @@ async def handle_procurement_event(message: dict):
)
```

## Data Retention and Purge

When an entitlement is cancelled or deleted, data is cleaned up in two stages:

1. **Immediate**: Usage records for the order are purged as soon as the cancel/delete Pub/Sub event is processed (best-effort — records actively being reported are skipped and cleaned up by the retention scheduler).

2. **Periodic**: After `DATA_RETENTION_DAYS` (default 90), the entitlement record, DCR client, and any remaining usage records are hard-deleted by a background scheduler task.

**Configuration:**

| Variable | Default | Description |
|----------|---------|-------------|
| `DATA_PURGE_ENABLED` | `false` | Must be explicitly enabled |
| `DATA_RETENTION_DAYS` | `90` | Minimum 1 day |
| `DATA_PURGE_INTERVAL_HOURS` | `24` | How often the purge scheduler runs |

The purge scheduler processes up to 100 expired entitlements per run. Rate-limit keys in Redis are cleaned on a best-effort basis (they auto-expire within 1 hour regardless).

## Usage Metering

### Metrics Tracked
Expand Down
16 changes: 16 additions & 0 deletions src/lightspeed_agent/config/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,22 @@ class Settings(BaseSettings):
description="Max unreported periods to process per backfill run",
)

# Data purge: retention and scheduling for cancelled/deleted entitlements
data_retention_days: int = Field(
default=90,
ge=1,
description="Days to retain data for cancelled/deleted entitlements before hard purge",
)
data_purge_enabled: bool = Field(
default=False,
description="Enable automated periodic purging of expired cancelled/deleted entitlements",
)
data_purge_interval_hours: int = Field(
default=24,
ge=1,
description="Interval in hours between automated data purge runs",
)

# Rate Limiting (Redis-backed)
rate_limit_requests_per_minute: int = Field(
default=60,
Expand Down
9 changes: 9 additions & 0 deletions src/lightspeed_agent/marketplace/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@
ProcurementEvent,
ProcurementEventType,
)
from lightspeed_agent.marketplace.purge import (
DataPurgeService,
PurgeResult,
get_data_purge_service,
)
from lightspeed_agent.marketplace.repository import (
EntitlementRepository,
get_entitlement_repository,
Expand All @@ -35,6 +40,10 @@
"EntitlementState",
"ProcurementEvent",
"ProcurementEventType",
# Purge
"DataPurgeService",
"PurgeResult",
"get_data_purge_service",
# Repository
"EntitlementRepository",
"get_entitlement_repository",
Expand Down
193 changes: 193 additions & 0 deletions src/lightspeed_agent/marketplace/purge.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
"""Data purge service for cancelled/deleted marketplace entitlements."""

from __future__ import annotations

import asyncio
import logging
from dataclasses import dataclass
from datetime import UTC, datetime, timedelta

from lightspeed_agent.dcr.repository import DCRClientRepository, get_dcr_client_repository
from lightspeed_agent.marketplace.repository import (
EntitlementRepository,
get_entitlement_repository,
)
from lightspeed_agent.metering.repository import UsageRepository, get_usage_repository
from lightspeed_agent.ratelimit.middleware import RedisRateLimiter, get_redis_rate_limiter

logger = logging.getLogger(__name__)


@dataclass
class PurgeResult:
"""Result of purging data for a single order."""

order_id: str
usage_records_deleted: int = 0
dcr_client_deleted: bool = False
entitlement_deleted: bool = False
rate_limit_keys_deleted: int = 0
error_count: int = 0


class DataPurgeService:
"""Service for purging data associated with cancelled/deleted orders.

Deletion order (child records first, no FK constraints):
1. Usage records (by order_id)
2. DCR client record (local DB safety net — external GMA deletion is
handled separately by the cancel flow)
3. Entitlement record (by id = order_id)
4. Rate limit keys (best-effort, keys auto-expire within 1hr)
"""

def __init__(
self,
usage_repo: UsageRepository | None = None,
entitlement_repo: EntitlementRepository | None = None,
dcr_repo: DCRClientRepository | None = None,
rate_limiter: RedisRateLimiter | None = None,
) -> None:
self._usage_repo = usage_repo or get_usage_repository()
self._entitlement_repo = entitlement_repo or get_entitlement_repository()
self._dcr_repo = dcr_repo or get_dcr_client_repository()
self._rate_limiter = rate_limiter

def _get_rate_limiter(self) -> RedisRateLimiter | None:
"""Lazy-init rate limiter (may not be available in all contexts)."""
if self._rate_limiter is None:
try:
self._rate_limiter = get_redis_rate_limiter()
except Exception:
logger.debug("Redis rate limiter not available for purge cleanup")
return self._rate_limiter

async def purge_order_data(self, order_id: str) -> PurgeResult:
"""Delete all data associated with a cancelled/deleted order.

Deletion order (child records first):
1. Usage records (by order_id)
2. DCR client (local DB row)
3. Entitlement record (by id = order_id)
4. Rate limit keys (best-effort)
"""
result = PurgeResult(order_id=order_id)

# 1. Usage records
try:
result.usage_records_deleted = await self._usage_repo.delete_by_order_id(
order_id
)
except Exception:
result.error_count += 1
logger.exception("Failed to delete usage records for order %s", order_id)

# 2. DCR client (local DB safety net)
try:
result.dcr_client_deleted = await self._dcr_repo.delete_by_order_id(order_id)
except Exception:
result.error_count += 1
logger.exception("Failed to delete DCR client for order %s", order_id)

# 3. Entitlement record — skip if child record deletion failed,
# so the entitlement remains discoverable for the next purge run.
if result.error_count == 0:
try:
result.entitlement_deleted = await self._entitlement_repo.delete(order_id)
except Exception:
result.error_count += 1
logger.exception("Failed to delete entitlement for order %s", order_id)
else:
logger.warning(
"Skipping entitlement delete for order %s due to %d prior errors",
order_id,
result.error_count,
)

# 4. Rate limit keys (best-effort)
limiter = self._get_rate_limiter()
if limiter:
try:
result.rate_limit_keys_deleted = await limiter.delete_keys_for_order(
order_id
)
except Exception:
logger.warning(
"Failed to delete rate limit keys for order %s", order_id
)

logger.info(
"Purge complete for order %s: usage_records=%d, entitlement=%s, "
"rate_limit_keys=%d, errors=%d",
order_id,
result.usage_records_deleted,
result.entitlement_deleted,
result.rate_limit_keys_deleted,
result.error_count,
)
return result

async def purge_expired_data(
self,
retention_days: int,
*,
batch_size: int = 100,
max_concurrency: int = 10,
) -> list[PurgeResult]:
"""Bulk purge data for entitlements cancelled/deleted longer than retention_days.

Processes in batches until no more expired entitlements remain.
"""
cutoff = datetime.now(UTC) - timedelta(days=retention_days)
results: list[PurgeResult] = []
sem = asyncio.Semaphore(max_concurrency)
max_batches = 1000

async def _limited_purge(order_id: str) -> PurgeResult:
async with sem:
return await self.purge_order_data(order_id)

for _ in range(max_batches):
expired = await self._entitlement_repo.get_expired_cancelled(
cutoff, limit=batch_size
)
if not expired:
break

logger.info(
"Purging batch of %d expired cancelled/deleted entitlements "
"(retention=%d days)",
len(expired),
retention_days,
)

batch_results = await asyncio.gather(
*(_limited_purge(e.id) for e in expired),
return_exceptions=True,
)
for i, r in enumerate(batch_results):
if isinstance(r, PurgeResult):
results.append(r)
else:
order_id = expired[i].id
logger.exception("Purge failed for order %s: %s", order_id, r)
results.append(PurgeResult(order_id=order_id, error_count=1))
else:
logger.warning(
"Purge loop hit max batch limit (%d); remaining entries "
"will be processed on the next run",
max_batches,
)

return results


_purge_service: DataPurgeService | None = None


def get_data_purge_service() -> DataPurgeService:
"""Get the global data purge service instance."""
global _purge_service
if _purge_service is None:
_purge_service = DataPurgeService()
return _purge_service
49 changes: 49 additions & 0 deletions src/lightspeed_agent/marketplace/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
"""

import logging
from datetime import datetime

from sqlalchemy import delete as sa_delete
from sqlalchemy import select

from lightspeed_agent.db import (
Expand Down Expand Up @@ -130,6 +132,53 @@ async def update(self, entitlement: Entitlement) -> Entitlement:
)
return self._model_to_entity(model)

async def delete(self, entitlement_id: str) -> bool:
"""Hard-delete an entitlement record.

Args:
entitlement_id: The Entitlement/Order ID.

Returns:
True if a record was deleted, False if not found.
"""
async with get_session() as session:
result = await session.execute(
sa_delete(MarketplaceEntitlementModel).where(
MarketplaceEntitlementModel.id == entitlement_id
)
)
deleted = bool(result.rowcount) # type: ignore[attr-defined]
if deleted:
logger.info("Deleted entitlement: %s", entitlement_id)
return deleted

async def get_expired_cancelled(
self, cutoff: datetime, *, limit: int = 100
) -> list[Entitlement]:
"""Get entitlements in CANCELLED/DELETED state older than cutoff.

Args:
cutoff: Return entitlements with updated_at before this timestamp.
limit: Maximum number of entitlements to return per batch.

Returns:
List of expired cancelled/deleted entitlements.
"""
async with get_session() as session:
result = await session.execute(
select(MarketplaceEntitlementModel)
.where(
MarketplaceEntitlementModel.state.in_([
EntitlementState.CANCELLED.value,
EntitlementState.DELETED.value,
]),
MarketplaceEntitlementModel.updated_at < cutoff,
)
.order_by(MarketplaceEntitlementModel.updated_at)
.limit(limit)
)
return [self._model_to_entity(m) for m in result.scalars().all()]

async def is_valid(self, entitlement_id: str) -> bool:
"""Check if an entitlement is valid (exists and active).

Expand Down
4 changes: 4 additions & 0 deletions src/lightspeed_agent/marketplace/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,8 @@ async def _handle_entitlement_cancelled(self, event: ProcurementEvent) -> None:

Updates entitlement state and deletes the associated OAuth client
from Red Hat SSO (if created via GMA) and from the local DB.
Usage data is retained until the periodic retention scheduler
purges it after DATA_RETENTION_DAYS.
"""
if not event.entitlement:
return
Expand All @@ -404,6 +406,8 @@ async def _handle_entitlement_deleted(self, event: ProcurementEvent) -> None:

Updates entitlement state and ensures the associated OAuth client
is cleaned up (safety net if not already deleted on cancellation).
Usage data is retained until the periodic retention scheduler
purges it after DATA_RETENTION_DAYS.
"""
if not event.entitlement:
return
Expand Down
Loading
Loading