Skip to content

perf: improve load shedding from QueueJanitor #3518

Open
Ziinc wants to merge 6 commits into
mainfrom
claude/clever-cerf-fFPXk
Open

perf: improve load shedding from QueueJanitor #3518
Ziinc wants to merge 6 commits into
mainfrom
claude/clever-cerf-fFPXk

Conversation

@Ziinc
Copy link
Copy Markdown
Contributor

@Ziinc Ziinc commented May 22, 2026

This PR improves load shedding from QueueJanitor, by increasing the purge ratio based on available system memory, and triggering immediate purges from downstream consumers (debounced) so that there is immediate adherence to the max queue size, in the event of sudden event bursts.

This PR also bumps up the consolidated queue size multiplier to be based on system scheduler

Closes O11Y-1868

claude added 5 commits May 22, 2026 01:53
…r polling

- Register QueueJanitor in SourceRegistry/BackendRegistry so it is
  discoverable by sibling processes; anonymous (no name) when backend_id
  is absent (default-backend case)
- Add handle_cast(:check) with a 2.5s debounce so an external caller can
  trigger an immediate cleanup run without disrupting the polling timer
- Add notify_overflow/2 and notify_overflow_consolidated/1 public helpers
  that cast :check via the registry (safe no-op if the process is not up)
- In BufferProducer.resolve_demand/2, check queue size after each fetch and
  debounce-signal the sibling QueueJanitor when size exceeds max_queue_size(),
  providing a fast path for overflow detection without waiting for the poll
- Halve the low-RPS polling multipliers (10x→5x, 5x→3x, 2.5x→2x) to
  reduce ingested-event retention latency on quiet sources

https://claude.ai/code/session_013WSYWJxqFMrbWqCpsDcaei
- QueueJanitor: simplify registration to a single if/else (consolidated
  vs source-keyed via tuple); always register
- QueueJanitor: drop the internal :check debounce — debounce now lives
  solely in the caller (BufferProducer)
- BufferProducer: collapse the two maybe_signal_janitor clauses into one,
  expressing the consolidated/standard split inline; use with/do for the
  debounce + size + threshold guard chain

https://claude.ai/code/session_013WSYWJxqFMrbWqCpsDcaei
… signaling

QueueJanitor tests (ingest_event_queue_test.exs):
- :check cast triggers cleanup faster than 60s polling interval
- rapid :check casts each run cleanup (no janitor-side debounce)
- notify_overflow/2 is a safe no-op when no janitor is registered
- janitor is discoverable via Backends.via_source/3 after start
- consolidated janitor is discoverable via Backends.via_backend/2

BufferProducer tests (buffer_producer_test.exs):
- signals notify_overflow/2 when queue exceeds IngestEventQueue.max_queue_size()
- does not signal when queue is under threshold
- producer-side debounce suppresses second signal within 2.5s
- signals again after the 2.5s debounce window expires (@tag :slow)
- consolidated path calls notify_overflow_consolidated/1
- does not signal when backend_id is nil

Also adds Mimic.copy for QueueJanitor in test_helper.exs.

https://claude.ai/code/session_013WSYWJxqFMrbWqCpsDcaei
Multiply the configured base purge_ratio by a memory-pressure factor read
from Logflare.System.memory_utilization/0:

  < 60% memory   →  1x   (unchanged)
  >= 60%         →  2x
  >= 75%         →  5x
  >= 85%         → 20x   (effectively purges the entire pending queue
                          once the result is capped at 1.0)

This makes overflow recovery aggressive as the BEAM approaches the
health-check threshold (default 80%) so the queue does not keep growing
into OOM territory while the health endpoint is already returning 503.

Tests cover all four memory bands by stubbing
Logflare.System.memory_utilization/0 via Mimic.

https://claude.ai/code/session_013WSYWJxqFMrbWqCpsDcaei
Drop the 60% band — the janitor's base purge_ratio is already adequate
at typical utilization. Start scaling at 75% (2x) and cap critical
pressure at 5x (>= 85%), which with the default purge_ratio of 0.05
yields 25% at critical — aggressive but not a total wipe.

Update tests to match the new bands and rename them accordingly.

https://claude.ai/code/session_013WSYWJxqFMrbWqCpsDcaei
@Ziinc Ziinc requested review from Baishan and amokan May 22, 2026 08:18
@Ziinc Ziinc marked this pull request as ready for review May 22, 2026 08:18
Copy link
Copy Markdown
Contributor

@Baishan Baishan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like it works as intended but I'd question dropping pending here rather than refusing to accept them at ingest


factor =
cond do
util >= 0.85 -> 5.0
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe put these vals as consts at the top or in the config so its easy to tweak later

@djwhitt
Copy link
Copy Markdown
Contributor

djwhitt commented May 25, 2026

Some notes from discussion with Claude below. I don't think any of it is blocking, but it's worth a read.

notify_overflow_consolidated/1 has no listener in production

The only QueueJanitor started in lib/ is the standard one in AdaptorSupervisor (per source+backend), with consolidated: false. The ClickHouse consolidated path (lib/logflare/backends/adaptor/clickhouse_adaptor.ex:467) does not include a QueueJanitor child under Backends.via_backend(backend, __MODULE__). As a result, casts from BufferProducer.maybe_signal_janitor/1 in the consolidated branch resolve through the registry to no PID and silently no-op. The new test "notify_overflow/2 is a no-op when no janitor is registered" actually confirms this behavior.

Either wire a consolidated QueueJanitor into ClickhouseAdaptor.init/1, or drop the consolidated notify path until there's a listener for it. As written, that branch is dead code in production.

A related latent issue: QueueJanitor.init/1 accesses source.id and source.token unconditionally (lines 50–51), so a consolidated janitor cannot actually be started without also passing a source: it doesn't logically need. Worth resolving when the consolidated path is wired up.

No telemetry on the drop paths

There are three distinct reasons a log event can be dropped today; only one currently emits telemetry. The PR's new load-shedding behavior is invisible to dashboards and alerts.

Reason Emission site Metric Status
Validation reject (invalid event, out-of-range timestamp) Backends.split_valid_events/2 logflare.logs.ingest_logs.drop Wired
GenStage buffer overflow (downstream can't keep up) BufferProducer.format_discarded/2 logflare.logs.ingest_logs.buffer_full Declared in lib/telemetry.ex:183 but never emitted
ETS queue shed (janitor exceeded threshold) QueueJanitor.drop_queue/5 (new metric) Not wired

Two changes would close the gap:

1. Wire format_discarded/2 to the already-declared buffer_full counter. This is the uncontrolled drop — GenStage evicting events from its internal buffer because demand can't drain it. Natural fit for the existing metric name:

def format_discarded(discarded, state) do
  :telemetry.execute(
    [:logflare, :logs, :ingest_logs],
    %{buffer_full: discarded},
    %{source_id: state.source_id, backend_id: state.backend_id,
      consolidated?: state.consolidated}
  )
  # ...existing Logger.warning...
end

Note: buffer_full is currently declared as counter(...), which increments by 1 per :telemetry.execute call rather than summing the discarded value. Either change it to sum("logflare.logs.ingest_logs.buffer_full", measurement: :buffer_full) to track total events lost, or declare both — counter for event frequency, sum for event magnitude.

2. Add a new metric for the janitor's controlled shedding. The PR's queue-overflow shed is operationally distinct from a GenStage buffer overflow — one means "we capped queue size to protect memory," the other means "consumers are too slow." Worth keeping them separately graphable. Suggest:

:telemetry.execute(
  [:logflare, :backends, :ingest_event_queue, :drop],
  %{count: to_drop, size_before: size},
  %{
    backend_id: state.backend_id,
    source_id: state.source_id,
    consolidated?: state.consolidated?,
    memory_factor: factor  # which tier fired
  }
)

…plus a matching sum registration in lib/telemetry.ex. Without this, operators have no way to measure drop rate, dashboard the new memory-pressure tiers, or know whether the producer-side overflow signaling is actually triggering purges in production.

Memory-tier scaling can't fire in the case it's most needed

effective_purge_ratio/1 only runs after the size > state.max gate in drop_queue/5. So the memory-tier multipliers (×2 at ≥75%, ×5 at ≥85%) only adjust shedding when row count has already breached the threshold.

The scenario that most needs memory-aware shedding is the inverse: a queue holding a relatively small number of very large events (e.g. OTel traces with multi-KB payloads). Row count stays well under max, the gate never opens, and effective_purge_ratio is never consulted — even as the binary heap grows toward OOM. The PR's memory awareness can't reach this case.

Also worth noting: Logflare.System.memory_utilization/0 measures :erlang.memory(:total) / system_total_memory, which is the whole BEAM footprint as a fraction of host RAM. It rises with anything in the BEAM (process heaps, in-flight HTTP buffers, ETS code cache), not specifically with queue memory. So even when it does scale the drop ratio, the signal isn't queue-attributable.

GC lag amplifies the memory tiers

Refc binaries dereferenced by IngestEventQueue.drop/3 aren't released until the holding process (the BufferProducer) does a heap GC, which happens lazily on the next collection. So :erlang.memory(:total) — and by extension memory_utilization() — stays elevated for seconds after a successful shed. The next janitor cycle sees the same pressure tier and applies the same ×5 factor, even though the previous shed should have already addressed the problem. Without an explicit :erlang.garbage_collect(pid) on the producer or a per-janitor cooldown, the design can over-correct when binary memory is the actual driver.

Threshold vs. drop-max mismatch

BufferProducer.maybe_signal_janitor/1 signals overflow at IngestEventQueue.max_queue_size() (30 000 rows, or × schedulers_online() for consolidated). The janitor's drop_queue/5 only drops :pending events when size > state.max, where state.max = max_buffer_queue_len() * 1.2 (≈36 000). Between 30 001 and 36 000, the producer fires overflow signals that result in only an :ingested truncation — no :pending drop. Functionally fine, but the relationship is non-obvious and worth aligning or documenting.

Tunable surface area

The PR introduces roughly six new constants that compose multiplicatively: @default_max (max_buffer_queue_len() * 1.2), @default_purge_ratio (0.05), memory tier thresholds at 0.75 and 0.85, factor multipliers of 2.0 and 5.0, consolidated_max_multiplier (System.schedulers_online()), and @janitor_signal_debounce_ms (2 500).

The resulting drop fraction for a given queue depends on purge_ratio × factor, clamped at 1.0, gated by whether size > max × consolidated_multiplier and whether memory_utilization() crossed a tier. Reasoning about steady-state behavior under sustained load requires holding all of these in mind simultaneously, and tuning any one of them changes the others' effective range. Worth considering whether the protection envelope can be expressed with fewer, more independent signals — but at minimum, the interactions deserve a comment or test that pins down expected behavior at the boundaries.

Other notes

  • QueueJanitor.init/1 requires source: in consolidated mode. Lines 50–51 access source.id and source.token unconditionally. A true backend-scoped janitor shouldn't need a source.
  • Mimic.copy(Logflare.System) is global. Added in test/test_helper.exs. Any other test that incidentally invokes Logflare.System.memory_utilization/0 now goes through Mimic. Probably fine but worth being aware of.
  • @janitor_signal_debounce_ms = 2_500 is a free-floating constant. Could be derived from interval (e.g. 2 * interval) so it stays tied to janitor cadence — easier to reason about, scales naturally if the polling interval is ever tuned.
  • The debounce expiry test in test/logflare/backends/buffer_producer_test.exs uses :timer.sleep(2_600). If the debounce becomes derivable from interval, the test can pass a small interval and avoid the wall-clock wait.
  • schedule/3 polling-interval tiers (× 5 / × 3 / × 2 based on metrics.avg) are an optimization rather than a correctness mechanism. Idle janitor cycles are cheap; could be simplified to a single tier or removed without functional impact.

Copy link
Copy Markdown
Contributor

@amokan amokan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The points in @djwhitt comments all seem valid. Particularly the no-op on notify_overflow_consolidated/1 and the threshold mismatch between 36k and 30k.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants