perf: improve load shedding from QueueJanitor #3518
…r polling
- Register QueueJanitor in SourceRegistry/BackendRegistry so it is discoverable by sibling processes; anonymous (no name) when backend_id is absent (default-backend case)
- Add handle_cast(:check) with a 2.5s debounce so an external caller can trigger an immediate cleanup run without disrupting the polling timer
- Add notify_overflow/2 and notify_overflow_consolidated/1 public helpers that cast :check via the registry (safe no-op if the process is not up)
- In BufferProducer.resolve_demand/2, check queue size after each fetch and debounce-signal the sibling QueueJanitor when size exceeds max_queue_size(), providing a fast path for overflow detection without waiting for the poll
- Lower the low-RPS polling multipliers (10x→5x, 5x→3x, 2.5x→2x) to reduce ingested-event retention latency on quiet sources
https://claude.ai/code/session_013WSYWJxqFMrbWqCpsDcaei
- QueueJanitor: simplify registration to a single if/else (consolidated vs source-keyed via tuple); always register
- QueueJanitor: drop the internal :check debounce; debounce now lives solely in the caller (BufferProducer)
- BufferProducer: collapse the two maybe_signal_janitor clauses into one, expressing the consolidated/standard split inline; use with/do for the debounce + size + threshold guard chain
https://claude.ai/code/session_013WSYWJxqFMrbWqCpsDcaei
… signaling
QueueJanitor tests (ingest_event_queue_test.exs):
- :check cast triggers cleanup faster than 60s polling interval
- rapid :check casts each run cleanup (no janitor-side debounce)
- notify_overflow/2 is a safe no-op when no janitor is registered
- janitor is discoverable via Backends.via_source/3 after start
- consolidated janitor is discoverable via Backends.via_backend/2
BufferProducer tests (buffer_producer_test.exs):
- signals notify_overflow/2 when queue exceeds IngestEventQueue.max_queue_size()
- does not signal when queue is under threshold
- producer-side debounce suppresses second signal within 2.5s
- signals again after the 2.5s debounce window expires (@tag :slow)
- consolidated path calls notify_overflow_consolidated/1
- does not signal when backend_id is nil
Also adds Mimic.copy for QueueJanitor in test_helper.exs.
https://claude.ai/code/session_013WSYWJxqFMrbWqCpsDcaei
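The producer-side guard chain described in the commits above (debounce, then size against threshold, with no signal when backend_id is nil) could look roughly like the following. This is a minimal sketch, not the PR's code: the module name, the state shape, and passing the clock in as `now_ms` are assumptions made so the example runs without the application.

```elixir
defmodule JanitorSignalSketch do
  # Hypothetical, simplified stand-in for BufferProducer.maybe_signal_janitor/1.
  # The 2.5s window is the value quoted in the commit messages.
  @debounce_ms 2_500

  # Returns {:signal, new_state} when the janitor should be notified,
  # otherwise {:skip, state}. The state is assumed to be a map with
  # :backend_id and :last_signal_ms keys.
  def maybe_signal(state, queue_size, max_queue_size, now_ms) do
    with false <- is_nil(state.backend_id),
         true <- debounce_elapsed?(state.last_signal_ms, now_ms),
         true <- queue_size > max_queue_size do
      {:signal, %{state | last_signal_ms: now_ms}}
    else
      _ -> {:skip, state}
    end
  end

  # No previous signal recorded: the debounce never blocks the first one.
  defp debounce_elapsed?(nil, _now_ms), do: true
  defp debounce_elapsed?(last_ms, now_ms), do: now_ms - last_ms >= @debounce_ms
end
```

Taking the timestamp as an argument keeps the debounce testable without a wall-clock sleep; real code would more likely read a monotonic clock inside the function.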
Multiply the configured base purge_ratio by a memory-pressure factor read
from Logflare.System.memory_utilization/0:
< 60% memory → 1x (unchanged)
>= 60% → 2x
>= 75% → 5x
>= 85% → 20x (effectively purges the entire pending queue
once the result is capped at 1.0)
This makes overflow recovery aggressive as the BEAM approaches the
health-check threshold (default 80%) so the queue does not keep growing
into OOM territory while the health endpoint is already returning 503.
Tests cover all four memory bands by stubbing
Logflare.System.memory_utilization/0 via Mimic.
https://claude.ai/code/session_013WSYWJxqFMrbWqCpsDcaei
Drop the 60% band — the janitor's base purge_ratio is already adequate at typical utilization. Start scaling at 75% (2x) and cap critical pressure at 5x (>= 85%), which with the default purge_ratio of 0.05 yields 25% at critical — aggressive but not a total wipe. Update tests to match the new bands and rename them accordingly. https://claude.ai/code/session_013WSYWJxqFMrbWqCpsDcaei
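For reference, the revised bands can be sketched as a small pure function. The module name and the `@tiers` attribute are hypothetical; the thresholds, factors, and the 1.0 cap are the ones stated in the commit messages above.

```elixir
defmodule PurgeRatioSketch do
  # Hypothetical module. Tiers are {utilization threshold, factor},
  # ordered from highest threshold to lowest so the first match wins.
  @tiers [{0.85, 5.0}, {0.75, 2.0}]

  # Scales base_ratio by the factor of the highest tier that util reaches
  # (1.0 when no tier is reached) and caps the result at 1.0.
  def effective_purge_ratio(base_ratio, util) do
    factor =
      Enum.find_value(@tiers, 1.0, fn {threshold, tier_factor} ->
        if util >= threshold, do: tier_factor
      end)

    min(base_ratio * factor, 1.0)
  end
end
```

With the default purge_ratio of 0.05, utilization of 0.90 gives a ratio of about 0.25, matching the 25% figure in the commit message. Keeping the tiers in one attribute also makes them easy to move into config later.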
Baishan
left a comment
Looks like it works as intended, but I'd question dropping pending events here rather than refusing to accept them at ingest
    factor =
      cond do
        util >= 0.85 -> 5.0
Maybe put these values as constants at the top or in the config so it's easy to tweak them later
Some notes from discussion with Claude below. I don't think any of it is blocking, but it's worth a read.
| Reason | Emission site | Metric | Status |
|---|---|---|---|
| Validation reject (invalid event, out-of-range timestamp) | Backends.split_valid_events/2 | logflare.logs.ingest_logs.drop | Wired |
| GenStage buffer overflow (downstream can't keep up) | BufferProducer.format_discarded/2 | logflare.logs.ingest_logs.buffer_full | Declared in lib/telemetry.ex:183 but never emitted |
| ETS queue shed (janitor exceeded threshold) | QueueJanitor.drop_queue/5 | (new metric) | Not wired |
Two changes would close the gap:
1. Wire format_discarded/2 to the already-declared buffer_full counter. This is the uncontrolled drop — GenStage evicting events from its internal buffer because demand can't drain it. Natural fit for the existing metric name:
    def format_discarded(discarded, state) do
      :telemetry.execute(
        [:logflare, :logs, :ingest_logs],
        %{buffer_full: discarded},
        %{source_id: state.source_id, backend_id: state.backend_id,
          consolidated?: state.consolidated}
      )
      # ...existing Logger.warning...
    end

Note: buffer_full is currently declared as counter(...), which increments by 1 per :telemetry.execute call rather than summing the discarded value. Either change it to sum("logflare.logs.ingest_logs.buffer_full", measurement: :buffer_full) to track total events lost, or declare both: counter for event frequency, sum for event magnitude.
2. Add a new metric for the janitor's controlled shedding. The PR's queue-overflow shed is operationally distinct from a GenStage buffer overflow — one means "we capped queue size to protect memory," the other means "consumers are too slow." Worth keeping them separately graphable. Suggest:
    :telemetry.execute(
      [:logflare, :backends, :ingest_event_queue, :drop],
      %{count: to_drop, size_before: size},
      %{
        backend_id: state.backend_id,
        source_id: state.source_id,
        consolidated?: state.consolidated?,
        memory_factor: factor # which tier fired
      }
    )

…plus a matching sum registration in lib/telemetry.ex. Without this, operators have no way to measure drop rate, dashboard the new memory-pressure tiers, or know whether the producer-side overflow signaling is actually triggering purges in production.
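The counter-versus-sum distinction raised in the note under item 1 is easy to check with a toy model. The sketch below does not use Telemetry.Metrics; it only mimics the two aggregations over a list in which each element stands for the buffer_full measurement of one :telemetry.execute call. The module name is hypothetical.

```elixir
defmodule MetricSemanticsSketch do
  # Toy model only: one list element per emitted telemetry event.

  # A counter ignores the measurement value and counts events.
  def counter(measurements), do: length(measurements)

  # A sum adds up the measurement values, i.e. total events discarded.
  def sum(measurements), do: Enum.sum(measurements)
end
```

For three overflow events that discarded 10, 250, and 4 events, the counter reports 3 while the sum reports 264, which is why only the sum tracks how many events were lost.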
Memory-tier scaling can't fire in the case it's most needed
effective_purge_ratio/1 only runs after the size > state.max gate in drop_queue/5. So the memory-tier multipliers (×2 at ≥75%, ×5 at ≥85%) only adjust shedding when row count has already breached the threshold.
The scenario that most needs memory-aware shedding is the inverse: a queue holding a relatively small number of very large events (e.g. OTel traces with multi-KB payloads). Row count stays well under max, the gate never opens, and effective_purge_ratio is never consulted — even as the binary heap grows toward OOM. The PR's memory awareness can't reach this case.
Also worth noting: Logflare.System.memory_utilization/0 measures :erlang.memory(:total) / system_total_memory, which is the whole BEAM footprint as a fraction of host RAM. It rises with anything in the BEAM (process heaps, in-flight HTTP buffers, ETS code cache), not specifically with queue memory. So even when it does scale the drop ratio, the signal isn't queue-attributable.
GC lag amplifies the memory tiers
Refc binaries dereferenced by IngestEventQueue.drop/3 aren't released until the holding process (the BufferProducer) does a heap GC, which happens lazily on the next collection. So :erlang.memory(:total) — and by extension memory_utilization() — stays elevated for seconds after a successful shed. The next janitor cycle sees the same pressure tier and applies the same ×5 factor, even though the previous shed should have already addressed the problem. Without an explicit :erlang.garbage_collect(pid) on the producer or a per-janitor cooldown, the design can over-correct when binary memory is the actual driver.
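A minimal sketch of the explicit-collection option, assuming the janitor can obtain the producer's pid (how it would look that pid up is not shown and is not part of the PR). :erlang.garbage_collect/1 is the standard BIF; the wrapper module and function name are hypothetical.

```elixir
defmodule ShedGcSketch do
  # Hypothetical hook to run after a successful shed. Forcing a collection on
  # the process that still holds references to the dropped refc binaries lets
  # the VM release them sooner than the next lazy GC would.
  # Returns true if the collection ran, false if the process is no longer alive.
  def after_shed(producer_pid) when is_pid(producer_pid) do
    :erlang.garbage_collect(producer_pid)
  end
end
```

Forcing a collection has a cost on a busy producer, so a per-janitor cooldown may be the cheaper of the two options; this only illustrates the call itself.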
Threshold vs. drop-max mismatch
BufferProducer.maybe_signal_janitor/1 signals overflow at IngestEventQueue.max_queue_size() (30 000 rows, or × schedulers_online() for consolidated). The janitor's drop_queue/5 only drops :pending events when size > state.max, where state.max = max_buffer_queue_len() * 1.2 (≈36 000). Between 30 001 and 36 000, the producer fires overflow signals that result in only an :ingested truncation — no :pending drop. Functionally fine, but the relationship is non-obvious and worth aligning or documenting.
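The gap can be made explicit with a small classification of queue sizes. This sketch hard-codes the non-consolidated values quoted above (30 000 and 36 000); the module, function, and atom names are hypothetical.

```elixir
defmodule ThresholdGapSketch do
  # Values quoted in the review for the non-consolidated case.
  @signal_threshold 30_000
  @drop_max 36_000

  # What a producer signal leads to for a given queue size.
  def outcome(size) when size > @drop_max, do: :signal_and_drop_pending
  def outcome(size) when size > @signal_threshold, do: :signal_truncate_ingested_only
  def outcome(_size), do: :no_signal
end
```

Every size from 30 001 through 36 000 falls in the middle clause: the janitor is signaled, but no :pending events are dropped.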
Tunable surface area
The PR introduces roughly six new constants that compose multiplicatively: @default_max (max_buffer_queue_len() * 1.2), @default_purge_ratio (0.05), memory tier thresholds at 0.75 and 0.85, factor multipliers of 2.0 and 5.0, consolidated_max_multiplier (System.schedulers_online()), and @janitor_signal_debounce_ms (2 500).
The resulting drop fraction for a given queue depends on purge_ratio × factor, clamped at 1.0, gated by whether size > max × consolidated_multiplier and whether memory_utilization() crossed a tier. Reasoning about steady-state behavior under sustained load requires holding all of these in mind simultaneously, and tuning any one of them changes the others' effective range. Worth considering whether the protection envelope can be expressed with fewer, more independent signals — but at minimum, the interactions deserve a comment or test that pins down expected behavior at the boundaries.
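One way to pin down the boundaries is a pure helper that combines the gate and the clamp, which a test can then exercise at each edge. This is a sketch under assumptions: the module name is hypothetical, and it assumes the drop count is the queue size times the effective ratio, truncated to an integer, which may not match the PR's rounding.

```elixir
defmodule DropEnvelopeSketch do
  # Number of pending events one janitor pass would drop.
  # Gate: nothing is dropped unless size is strictly greater than max.
  def to_drop(size, max, _purge_ratio, _factor) when size <= max, do: 0

  # Past the gate: purge_ratio * factor, clamped at 1.0, applied to size.
  def to_drop(size, _max, purge_ratio, factor) do
    trunc(size * min(purge_ratio * factor, 1.0))
  end
end
```

At size 36 000 with max 36 000 the result is 0 regardless of the factor, which is the row-count gate described above; at size 40 000 the default 0.05 ratio drops 2 000 events at factor 1.0 and 10 000 at factor 5.0.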
Other notes
- QueueJanitor.init/1 requires source: in consolidated mode. Lines 50–51 access source.id and source.token unconditionally. A true backend-scoped janitor shouldn't need a source.
- Mimic.copy(Logflare.System) is global. Added in test/test_helper.exs. Any other test that incidentally invokes Logflare.System.memory_utilization/0 now goes through Mimic. Probably fine but worth being aware of.
- @janitor_signal_debounce_ms = 2_500 is a free-floating constant. Could be derived from interval (e.g. 2 * interval) so it stays tied to janitor cadence: easier to reason about, and it scales naturally if the polling interval is ever tuned.
- The debounce expiry test in test/logflare/backends/buffer_producer_test.exs uses :timer.sleep(2_600). If the debounce becomes derivable from interval, the test can pass a small interval and avoid the wall-clock wait.
- schedule/3 polling-interval tiers (× 5 / × 3 / × 2 based on metrics.avg) are an optimization rather than a correctness mechanism. Idle janitor cycles are cheap; could be simplified to a single tier or removed without functional impact.
This PR improves load shedding in QueueJanitor in two ways: it scales the purge ratio with system memory utilization, and it lets downstream consumers trigger an immediate (debounced) purge so that the max queue size is enforced promptly during sudden event bursts.
This PR also changes the consolidated queue size multiplier to be based on the number of online schedulers (System.schedulers_online()).
Closes O11Y-1868