Skip to content

Exploring Schema optimizations#3457

Closed
ruslandoga wants to merge 6 commits into
Logflare:mainfrom
ruslandoga:ruslandoga+conductor/schema-ips-reduce
Closed

Exploring Schema optimizations#3457
ruslandoga wants to merge 6 commits into
Logflare:mainfrom
ruslandoga:ruslandoga+conductor/schema-ips-reduce

Conversation

@ruslandoga
Copy link
Copy Markdown
Contributor

@ruslandoga ruslandoga commented May 12, 2026

Please ignore this PR for now :)


Adds a Benchee harness for BigQuery schema helper hot paths under bench/.

Adds a pre-cast mailbox guard to Logflare.Sources.Source.BigQuery.Schema.update/3 so sampled schema updates are dropped when the Schema process already has a backlog.

This targets observed queue spikes where in-process schema rate limiting still allowed ingest pipelines to keep filling the Schema inbox.

Validation: syntax-parsed changed files with elixir; mix test test/logflare/source/bigquery/schema_test.exs and mix format are blocked in this workspace by dependency lock mismatches that require mix deps.get.

@djwhitt
Copy link
Copy Markdown
Contributor

djwhitt commented May 12, 2026

Hey @ruslandoga — heads up that #3458 (currently in draft) rewrites SchemaUtils.flatten_typemap/1 to replace the Iteraptor.to_flatmap call with a direct walk, getting ~2-4× faster and ~5-13× less memory on realistic edge_log / OTel fixtures. Looks like we're targeting overlapping hot paths with different harnesses (Benchee here vs. :tprof + a snapshot/history file under test/profiling/ in mine).

Wanted to coordinate before either lands. A few options I can see:

  • Land yours first; I rebase perf(schema): rewrite SchemaUtils.flatten_typemap as direct walk #3458 on top. Your baseline numbers measure the slow Iteraptor impl, and you re-baseline after mine.
  • Land mine first; your benchmark measures the new direct-walk impl from the start. Probably the more useful ordering for ongoing trend tracking.
  • Consolidate harnesses — Benchee for ips/comparison + allocation/reduction snapshots in one place, since two parallel benchmark setups for the same code path is a smell.

Happy to defer to your preference — let me know what works.

@ruslandoga
Copy link
Copy Markdown
Contributor Author

ruslandoga commented May 12, 2026

👋 @djwhitt

Please feel free to ignore this PR, it was just a one shot from Codex based on Ziinc's task description. I think we should proceed with landing yours first. And if it's clear that a direct walk is an improvement, there is probably little reason to include the current implementation in history :)

@ruslandoga ruslandoga force-pushed the ruslandoga+conductor/schema-ips-reduce branch from 8ef52f0 to 7877749 Compare May 13, 2026 16:57
@ruslandoga ruslandoga changed the title Add schema helper benchmarks Limit schema update mailbox growth May 13, 2026
@ruslandoga
Copy link
Copy Markdown
Contributor Author

Added a small standalone benchmark for Process.info(pid, :message_queue_len) in bench/process_info_message_queue_len_bench.exs.

On my local run with ITERATIONS=1000000 SENDERS=1:

sleeping process, empty mailbox:       ~29 ns/call
sleeping process, 100k queued messages: ~29 ns/call
draining process under sender pressure: ~3.1 us/call

So the current Process.info/2 guard is cheap for idle/backlogged targets, but it gets meaningfully more expensive when the target process is actively receiving/draining messages. That still looks acceptable for sampled schema updates, but Ziinc's atomics/counters suggestion is probably the better next shape: keep a per-Schema-process queued/in-flight counter, increment before cast, decrement after handling, and skip casts when the counter is over a small threshold. That would avoid VM mailbox introspection on the hot caller side and track only Schema update work.

Copy link
Copy Markdown
Contributor Author

@ruslandoga ruslandoga left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added inline notes on the schema pressure-control and helper hot-path changes.

bigquery_dataset_id: dataset_id,
name: Backends.via_source(source, Schema, backend.id)
name:
Backends.via_source_with_value(
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Schema process is now registered with a Registry value containing its :atomics counter. That keeps the counter tied to the process registration lifetime and lets callers check outstanding schema work before enqueueing another cast.

end

defp checkout_counter(counter) do
queued = :atomics.add_get(counter, 1, 1)
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This counter tracks queued plus in-flight schema update work, not just mailbox length. The increment happens before GenServer.cast/2; if the source is already over the configured limit, the increment is rolled back and the cast is skipped.

try do
handle_update(log_event, source, state)
after
checkin_update(counter)
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The decrement is intentionally in after so the counter is released only after schema diffing/patch handling finishes, including early returns and errors. This is the piece that makes the value represent actual outstanding work instead of cast latency.

protected_keys = MapSet.new(initial_schema.fields, & &1.name)
param_keys = Map.keys(params)
param_key_set = MapSet.new(param_keys)
old_fields_by_name = Map.new(old_fields, &{&1.name, &1})
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

build_table_schema/2 now indexes old fields and param keys once, avoiding repeated linear scans while preserving the existing field-retention behavior for keys not present in the sampled event.

Implements merge for schema key conflicts.
Overwrites fields schemas that are present BOTH in old and new TFS structs and keeps fields schemas present ONLY in old.
"""
defp merge_payload_maps(map, acc) when is_map(map) do
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This replaces DeepMerge in the array-of-maps path with a narrow recursive map merge. It avoids protocol dispatch in the hot path while keeping the previous behavior of merging nested map samples from repeated records.

end

{k, v}
{normalize_typemap_key(k), v}
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typemap creation no longer atomizes incoming field names. The flattened schema maps already stringify paths, so keeping binary keys here avoids atom-table growth and removes String.to_atom/1 from the schema hot path.

Copy link
Copy Markdown
Contributor Author

Follow-up profiling pass pushed in 07c6d38f8.

What changed:

  • Added bench/schema_update_bench.exs, an e2e-ish local Schema update benchmark that does build_table_schema/2 plus the schema equality/update decision, optionally including flatmap generation for the post-patch/persist path.
  • Added PROFILE_AFTER=eprof|tprof|cprof|fprof support to the schema helper benches.
  • Simplified Schema.same_schemas?/2 to old_schema == new_schema. The previous implementation computed two flatmaps and then still required old_schema == new_schema, so the flatmap work was redundant on both matching and changed schemas.
  • Avoided rebuilding the initial table schema/protected-key set on every build_table_schema/2 call.

Quick wide-top-level local run after the change:

  • existing payload: build + diff: ~16.8K ips, avg ~59.6us in the eprof run.
  • one new field: build + diff: ~18.7K ips, avg ~53.5us in the eprof run.
  • one new field: build + diff + flatmap: ~12.4K ips, avg ~80.7us in the eprof run.

Profiler notes:

  • Before the same_schemas?/2 simplification, the wide e2e path spent visible time flattening both old and new schemas even when only deciding whether a patch is needed.
  • After the simplification, the remaining cost is mostly SchemaBuilder traversal/sorting/struct construction, with the flatmap path only showing up when explicitly benchmarking the persist-style + flatmap case.

Commands used:

  • mix test test/logflare/source/bigquery/schema_test.exs test/logflare/google/bigquery/schema_builder_test.exs
  • SCHEMA_UPDATE_BENCH_SCENARIO=wide_top_level BENCH_TIME=1 BENCH_WARMUP=0.2 BENCH_MEMORY_TIME=0 BENCH_REDUCTION_TIME=0 mix run --no-start bench/schema_update_bench.exs
  • PROFILE_AFTER=eprof SCHEMA_UPDATE_BENCH_SCENARIO=wide_top_level BENCH_TIME=0.2 BENCH_WARMUP=0 BENCH_MEMORY_TIME=0 BENCH_REDUCTION_TIME=0 mix run --no-start bench/schema_update_bench.exs

Copy link
Copy Markdown
Contributor Author

@ruslandoga ruslandoga left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a profiling-focused review pass. The short version: before the latest change, profiling showed redundant flatmap generation in the schema equality check; after the change, the remaining local CPU is mostly schema construction/sorting/merge traversal.

def run do
scenarios = selected_scenarios()

Benchee.run(
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the e2e-ish benchmark I used for profiling. It avoids app startup and external BigQuery/Postgres calls, but exercises the local Schema GenServer hot path shape: build a candidate schema, decide whether it changed, and optionally build the flatmap that persist would need.

"one new field: build + diff" => fn scenario ->
new_field_update(scenario)
end,
"one new field: build + diff + flatmap" => fn scenario ->
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The + flatmap case is intentionally separated because eprof/tprof showed flatmap generation is a distinct cost. On the wide-top-level run, build + diff was ~53-60us, while build + diff + flatmap was ~80us, so flatmap construction adds roughly 20-30us for that shape.

Comment thread bench/schema_update_bench.exs Outdated
end
end

defp same_schemas?(old_schema, new_schema) do
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This mirrors the production same_schemas?/2 simplification. eprof/tprof showed the old check spending time in SchemaUtils.bq_schema_to_flat_typemap/1, flatten_node/3, to_typemap/2, and map/list traversal even though the final predicate still required structural equality. That flatmap work was redundant for the equality decision.

warmup: benchmark_seconds("BENCH_WARMUP", 2.0),
memory_time: benchmark_seconds("BENCH_MEMORY_TIME", 3.0),
reduction_time: benchmark_seconds("BENCH_REDUCTION_TIME", 3.0),
profile_after: profile_after(),
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Benchee 1.5 supports profile_after, including :tprof on this OTP/Elixir combo. Use e.g. PROFILE_AFTER=eprof ... mix run --no-start bench/schema_update_bench.exs or PROFILE_AFTER=tprof ...; tprof output broadly matched eprof in pointing at SchemaBuilder traversal/sort/merge and flatmap generation.

end

defp same_schemas?(old_schema, new_schema) do
old_schema == new_schema
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Profiling found this was the most obvious avoidable work: the previous implementation built old and new flatmaps and then still required old_schema == new_schema. This line preserves the actual behavior while removing two complete typemap/flatten traversals from every schema update decision.

@@ -126,60 +152,44 @@ defmodule Logflare.Sources.Source.BigQuery.SchemaBuilder do
@spec build_table_schema([map()] | map(), TFS.t()) :: TFS.t()

def build_table_schema(params, %{fields: old_fields}) do
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After removing the redundant flatmap equality work, eprof/tprof show the remaining local CPU is concentrated here: building field schemas, merging with the old schema, sorting via deep_sort_by_fields_name/1, and Enum.uniq_by/2. For wide top-level samples, this is now the main CPU area rather than typemap flattening.

alias Logflare.BigQuery.SchemaTypes
alias Model.TableFieldSchema, as: TFS

@initial_table_schema %Model.TableSchema{
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This avoids reconstructing the initial schema fields and protected-key set machinery on each call. It is a small win compared with the flatmap short-circuit, but it removes repeated setup work visible around initial_table_schema/0, MapSet.new/2, and MapSet.member?/2 in the profile.


# filter field schemas that are present only in old table field schema
uniq_old_fs = for fs <- old_fields, fs.name not in new_fields_names, do: fs
defp merge_field_schema(%TFS{fields: old_fields}, %TFS{fields: new_fields} = new)
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The merge path still appears in the profiles (merge_field_schema/2, nested Enum.map/2, Map.get/3, MapSet.member?/2). Next possible optimization would be more structural: return {schema, changed?} from SchemaBuilder or avoid global sorting/uniq when no schema shape changed.

Copy link
Copy Markdown
Contributor Author

Profiling summary from the latest eprof/tprof pass:

What showed up before the latest simplification:

  • The e2e-ish build + diff path spent visible time in SchemaUtils.bq_schema_to_flat_typemap/1, SchemaUtils.to_typemap/2, SchemaUtils.flatten_node/3, Enum.map/2, and map/list traversal.
  • That came from same_schemas?/2 building flatmaps for both old and new schemas before still requiring old_schema == new_schema.
  • Since the final predicate already required structural equality, the flatmap diff work was redundant. Removing it preserved behavior and cut the local wide-top-level decision path from roughly ~130-170us in earlier smoke runs to ~53-60us in the focused eprof run.

Current wide-top-level eprof run after the simplification:

  • one new field: build + diff: ~18.7K ips, avg ~53.5us.
  • existing payload: build + diff: ~16.8K ips, avg ~59.6us.
  • one new field: build + diff + flatmap: ~12.4K ips, avg ~80.7us.

What remains hot:

  • SchemaBuilder.build_table_schema/2 and helpers dominate the local CPU now.
  • The recurring functions in eprof/tprof are build_fields_schemas/2, merge_field_schema/2, determine_type/3, deep_sort_by_fields_name/1, Enum.uniq_by/2, Enum.map/2, Map.get/3, and lower-level map/list traversal.
  • Flatmap generation is still a real cost, but only in the explicit + flatmap path that models persist-time work, not in the ordinary update decision anymore.

Possible next optimizations, if we want to keep going:

  • Have SchemaBuilder return {schema, changed?} so the caller does not need a second full structural comparison.
  • Avoid deep_sort_by_fields_name/1 when no new field was introduced, or sort only changed subtrees.
  • Avoid the final Enum.uniq_by/2 by constructing top-level fields in a way that guarantees uniqueness.
  • Consider a builder representation that tracks old fields by name through recursion, so nested merges avoid rebuilding lookup maps repeatedly.

I added inline comments on the PR around the benchmark scenarios, profiler config, same_schemas?/2, and the remaining SchemaBuilder hot paths.

@ruslandoga ruslandoga changed the title Limit schema update mailbox growth Scheme optimization exploration May 13, 2026
Copy link
Copy Markdown
Contributor Author

Follow-up pushed: b5e0aa40a (Skip schema diff work for unchanged payloads).

This makes SchemaBuilder return {schema, changed?} so Schema can skip the separate full-schema equality diff for unchanged payloads. When no field changed, the builder returns the existing schema directly and avoids the final deep_sort_by_fields_name/1 pass. For changed payloads, it also avoids the previous final Enum.uniq_by/2 by tracking existing field names while constructing the updated field list.

Short branchmarks against the previous PR commit (BENCH_TIME=2 BENCH_WARMUP=1 BENCH_REDUCTION_TIME=1 mix run --no-start bench/schema_update_bench.exs):

wide top-level existing payload: 7.61K ips -> 34.63K ips, reductions 5.94K -> 3.65K
wide top-level one new field: 12.81K ips -> 22.11K ips, reductions 5.99K -> 6.02K
otel trace existing payload: 58.26K ips -> 73.86K ips, reductions 2.09K -> 1.41K
otel trace one new field: 21.67K ips -> 65.68K ips, reductions 2.24K -> 2.32K
cloudflare nested one new field: 8.92K ips -> 26.11K ips, reductions 3.99K -> 4.21K
cloudflare nested existing payload: 26.10K ips -> 18.61K ips, reductions 3.95K -> 3.00K

The nested no-op wall-time result is mixed/noisy, but it still reduces scheduler work. The more important ingest cases here are no-op wide schemas and changed schemas, where this removes the old sort+compare path and improves ips materially.

Verification: mix test test/logflare/google/bigquery/schema_builder_test.exs.

@ruslandoga ruslandoga changed the title Scheme optimization exploration Schema optimization exploration May 13, 2026
@ruslandoga ruslandoga changed the title Schema optimization exploration Schema optimizations exploration May 13, 2026
@ruslandoga ruslandoga changed the title Schema optimizations exploration Schema optimizations exploration May 13, 2026
@ruslandoga ruslandoga changed the title Schema optimizations exploration Exploring Schema optimizations May 13, 2026
@ruslandoga ruslandoga force-pushed the ruslandoga+conductor/schema-ips-reduce branch from b5e0aa4 to a5f46bf Compare May 28, 2026 08:54
@ruslandoga
Copy link
Copy Markdown
Contributor Author

ruslandoga commented May 28, 2026

Closing in favor of smaller, self-contained patches, starting with #3534

@ruslandoga ruslandoga closed this May 28, 2026
@ruslandoga ruslandoga deleted the ruslandoga+conductor/schema-ips-reduce branch May 28, 2026 09:09
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants