Exploring Schema optimizations #3457
|
Hey @ruslandoga — heads up that #3458 (currently in draft) rewrites Wanted to coordinate before either lands. A few options I can see:
Happy to defer to your preference — let me know what works. |
|
👋 @djwhitt Please feel free to ignore this PR, it was just a one shot from Codex based on Ziinc's task description. I think we should proceed with landing yours first. And if it's clear that a direct walk is an improvement, there is probably little reason to include the current implementation in history :) |
Force-pushed from 8ef52f0 to 7877749.
|
Added a small standalone benchmark for On my local run with So the current |
ruslandoga
left a comment
Added inline notes on the schema pressure-control and helper hot-path changes.
| bigquery_dataset_id: dataset_id, | ||
| name: Backends.via_source(source, Schema, backend.id) | ||
| name: | ||
| Backends.via_source_with_value( |
The Schema process is now registered with a Registry value containing its :atomics counter. That keeps the counter tied to the process registration lifetime and lets callers check outstanding schema work before enqueueing another cast.
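A minimal sketch of the registration idea, assuming a unique-key Registry and the optional value slot of the `:via` tuple. `DemoRegistry`, the `{:schema, 1}` key, and the Agent standing in for the Schema process are hypothetical; only the `Registry`, `Agent`, and `:atomics` calls are real.

```elixir
# Hypothetical registry name and key, used only for this illustration.
{:ok, _} = Registry.start_link(keys: :unique, name: DemoRegistry)

# One unsigned 64-bit slot that will hold the outstanding-work count.
counter = :atomics.new(1, signed: false)

# The third element of the via tuple is stored as the Registry value,
# so the counter lives exactly as long as the registration does.
{:ok, agent} =
  Agent.start_link(fn -> :ok end,
    name: {:via, Registry, {DemoRegistry, {:schema, 1}, counter}}
  )

# A caller gets the pid and the counter from a single lookup.
[{^agent, found_counter}] = Registry.lookup(DemoRegistry, {:schema, 1})

# Both variables point at the same atomics array.
:atomics.add(found_counter, 1, 1)
outstanding = :atomics.get(counter, 1)
```

When the registered process exits, the Registry entry and its value are removed together, so callers cannot pick up a stale counter for a dead process.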
| end | ||
|
|
||
| defp checkout_counter(counter) do | ||
| queued = :atomics.add_get(counter, 1, 1) |
This counter tracks queued plus in-flight schema update work, not just mailbox length. The increment happens before GenServer.cast/2; if the source is already over the configured limit, the increment is rolled back and the cast is skipped.
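The increment-then-rollback pattern described here can be sketched as follows. `CounterGuardSketch`, the `limit` argument, and the strict `>` comparison are assumptions for illustration; the PR's actual limit handling may differ.

```elixir
defmodule CounterGuardSketch do
  # Hypothetical helper: reserve one slot before casting.
  # Returns :ok if the caller may cast, :over_limit if the cast should be skipped.
  def checkout(counter, limit) do
    queued = :atomics.add_get(counter, 1, 1)

    if queued > limit do
      # Roll the reservation back so skipped casts do not inflate the count.
      :atomics.sub(counter, 1, 1)
      :over_limit
    else
      :ok
    end
  end

  # Release one slot after the work has finished.
  def checkin(counter), do: :atomics.sub(counter, 1, 1)
end

counter = :atomics.new(1, signed: true)

# With a limit of 2, the third reservation is rejected and rolled back.
results = for _ <- 1..3, do: CounterGuardSketch.checkout(counter, 2)
after_checkout = :atomics.get(counter, 1)
```

Because `add_get/3` is atomic, concurrent callers each see a distinct value, so at most `limit` reservations are held at once without any locking.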
| try do | ||
| handle_update(log_event, source, state) | ||
| after | ||
| checkin_update(counter) |
The decrement is intentionally in after so the counter is released only after schema diffing/patch handling finishes, including early returns and errors. This is the piece that makes the value represent actual outstanding work instead of cast latency.
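To illustrate why `after` is the right place for the decrement, here is a small standalone sketch. `AfterCheckinSketch.run/2` is a hypothetical stand-in for the cast handler; the point is only that the counter is released on both the normal and the raising path.

```elixir
defmodule AfterCheckinSketch do
  # Hypothetical wrapper: run the work, then always release one slot.
  def run(counter, work) do
    try do
      work.()
    after
      # Runs on normal return and when work.() raises, throws, or exits.
      :atomics.sub(counter, 1, 1)
    end
  end
end

counter = :atomics.new(1, signed: true)
# Pretend two updates were checked out before being cast.
:atomics.add(counter, 1, 2)

normal = AfterCheckinSketch.run(counter, fn -> :ok end)

crashed =
  try do
    AfterCheckinSketch.run(counter, fn -> raise "boom" end)
  rescue
    RuntimeError -> :raised
  end

remaining = :atomics.get(counter, 1)
```

Both slots are released even though the second call raised, which is what keeps the counter from drifting upward after errors.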
| protected_keys = MapSet.new(initial_schema.fields, & &1.name) | ||
| param_keys = Map.keys(params) | ||
| param_key_set = MapSet.new(param_keys) | ||
| old_fields_by_name = Map.new(old_fields, &{&1.name, &1}) |
build_table_schema/2 now indexes old fields and param keys once, avoiding repeated linear scans while preserving the existing field-retention behavior for keys not present in the sampled event.
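The index-once idea can be shown in isolation. This sketch uses plain maps with a `:name` key instead of `TFS` structs, and `IndexOnceSketch` with its two functions is hypothetical; it is not the PR's `build_table_schema/2`.

```elixir
defmodule IndexOnceSketch do
  # Old fields whose names do not occur in the sampled params are retained.
  # The key set is built once, so each membership test avoids a list scan.
  def retained_old_fields(old_fields, params) do
    param_key_set = MapSet.new(Map.keys(params))
    Enum.reject(old_fields, &MapSet.member?(param_key_set, &1.name))
  end

  # Pair each param key with its previous field schema (or nil), using a
  # name index built once instead of Enum.find/2 per key.
  def previous_by_key(old_fields, params) do
    old_fields_by_name = Map.new(old_fields, &{&1.name, &1})
    Map.new(params, fn {key, _value} -> {key, Map.get(old_fields_by_name, key)} end)
  end
end

old_fields = [%{name: "a", type: "STRING"}, %{name: "b", type: "INTEGER"}]
params = %{"b" => 1, "c" => "x"}

retained = IndexOnceSketch.retained_old_fields(old_fields, params)
previous = IndexOnceSketch.previous_by_key(old_fields, params)
```

For `n` old fields and `m` param keys this replaces roughly `n * m` comparisons with `n + m` index operations, which matters mostly for wide top-level schemas.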
| Implements merge for schema key conflicts. | ||
| Overwrites fields schemas that are present BOTH in old and new TFS structs and keeps fields schemas present ONLY in old. | ||
| """ | ||
| defp merge_payload_maps(map, acc) when is_map(map) do |
This replaces DeepMerge in the array-of-maps path with a narrow recursive map merge. It avoids protocol dispatch in the hot path while keeping the previous behavior of merging nested map samples from repeated records.
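A rough sketch of what a narrow recursive map merge looks like, assuming that the right-hand value wins whenever the two sides are not both maps. `NarrowMergeSketch` is hypothetical and deliberately ignores struct handling, which a general library such as DeepMerge covers through protocols.

```elixir
defmodule NarrowMergeSketch do
  # Merge two maps, recursing only where both sides hold a map.
  def merge(left, right) when is_map(left) and is_map(right) do
    Map.merge(left, right, fn _key, l, r ->
      if is_map(l) and is_map(r), do: merge(l, r), else: r
    end)
  end

  # Fold a list of sampled records into one merged map.
  def merge_all(maps), do: Enum.reduce(maps, %{}, fn map, acc -> merge(acc, map) end)
end

merged =
  NarrowMergeSketch.merge_all([
    %{"a" => %{"b" => 1}},
    %{"a" => %{"c" => 2}, "d" => 3}
  ])
```

Everything here is plain function calls on maps, so there is no protocol dispatch per value; the trade-off is that any special cases the generic merge handled have to be reintroduced explicitly if they are needed.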
| end | ||
|
|
||
| {k, v} | ||
| {normalize_typemap_key(k), v} |
Typemap creation no longer atomizes incoming field names. The flattened schema maps already stringify paths, so keeping binary keys here avoids atom-table growth and removes String.to_atom/1 from the schema hot path.
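For illustration only, one way to keep typemap keys as binaries is sketched below. `TypemapKeySketch.normalize_key/1` is a hypothetical helper and is not the PR's `normalize_typemap_key/1`, whose body is not shown here; the assumption is that keys arrive either as atoms or as binaries.

```elixir
defmodule TypemapKeySketch do
  # Assumption: atom keys are stringified and binary keys pass through.
  # No String.to_atom/1 call, so untrusted field names never create atoms.
  def normalize_key(key) when is_atom(key), do: Atom.to_string(key)
  def normalize_key(key) when is_binary(key), do: key

  def to_typemap(map), do: Map.new(map, fn {k, v} -> {normalize_key(k), v} end)
end

typemap = TypemapKeySketch.to_typemap(%{:user => :map, "id" => :integer})
```

Atoms are never garbage collected on the BEAM, so any path that converts arbitrary incoming field names to atoms can grow the atom table without bound; keeping binaries sidesteps that.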
|
Follow-up profiling pass pushed in What changed:
Quick wide-top-level local run after the change:
Profiler notes:
Commands used:
|
ruslandoga
left a comment
Added a profiling-focused review pass. The short version: before the latest change, profiling showed redundant flatmap generation in the schema equality check; after the change, the remaining local CPU is mostly schema construction/sorting/merge traversal.
| def run do | ||
| scenarios = selected_scenarios() | ||
|
|
||
| Benchee.run( |
This is the e2e-ish benchmark I used for profiling. It avoids app startup and external BigQuery/Postgres calls, but exercises the local Schema GenServer hot path shape: build a candidate schema, decide whether it changed, and optionally build the flatmap that persist would need.
| "one new field: build + diff" => fn scenario -> | ||
| new_field_update(scenario) | ||
| end, | ||
| "one new field: build + diff + flatmap" => fn scenario -> |
The + flatmap case is intentionally separated because eprof/tprof showed flatmap generation is a distinct cost. On the wide-top-level run, build + diff was ~53-60us, while build + diff + flatmap was ~80us, so flatmap construction adds roughly 20-30us for that shape.
| end | ||
| end | ||
|
|
||
| defp same_schemas?(old_schema, new_schema) do |
This mirrors the production same_schemas?/2 simplification. eprof/tprof showed the old check spending time in SchemaUtils.bq_schema_to_flat_typemap/1, flatten_node/3, to_typemap/2, and map/list traversal even though the final predicate still required structural equality. That flatmap work was redundant for the equality decision.
| warmup: benchmark_seconds("BENCH_WARMUP", 2.0), | ||
| memory_time: benchmark_seconds("BENCH_MEMORY_TIME", 3.0), | ||
| reduction_time: benchmark_seconds("BENCH_REDUCTION_TIME", 3.0), | ||
| profile_after: profile_after(), |
Benchee 1.5 supports profile_after, including :tprof on this OTP/Elixir combo. Use e.g. PROFILE_AFTER=eprof ... mix run --no-start bench/schema_update_bench.exs or PROFILE_AFTER=tprof ...; tprof output broadly matched eprof in pointing at SchemaBuilder traversal/sort/merge and flatmap generation.
| end | ||
|
|
||
| defp same_schemas?(old_schema, new_schema) do | ||
| old_schema == new_schema |
Profiling found this was the most obvious avoidable work: the previous implementation built old and new flatmaps and then still required old_schema == new_schema. This line preserves the actual behavior while removing two complete typemap/flatten traversals from every schema update decision.
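The reasoning can be checked with a toy model. Assuming the old predicate had the shape "flatmaps equal AND structs equal" and that flattening is a pure function, structural equality already implies flatmap equality, so the conjunction collapses to `old == new`. `SameSchemasSketch` and its `flatten/2` are hypothetical stand-ins working on plain nested maps, not the production helpers.

```elixir
defmodule SameSchemasSketch do
  # Toy flattener: nested string-keyed maps become a map of dotted paths.
  def flatten(map, prefix \\ nil) when is_map(map) do
    map
    |> Enum.flat_map(fn {key, value} ->
      path = if prefix, do: prefix <> "." <> key, else: key
      if is_map(value), do: flatten(value, path), else: [{path, value}]
    end)
    |> Map.new()
  end

  # Assumed shape of the previous check: two extra traversals per call.
  def old_check(a, b), do: flatten(a) == flatten(b) and a == b

  # Simplified check: same truth table, no flattening.
  def new_check(a, b), do: a == b
end

a = %{"meta" => %{"user" => "STRING"}}
b = %{"meta" => %{"user" => "STRING"}}
c = %{"meta" => %{"user" => "INTEGER"}}

agree? =
  Enum.all?([{a, b}, {a, c}, {c, c}], fn {x, y} ->
    SameSchemasSketch.old_check(x, y) == SameSchemasSketch.new_check(x, y)
  end)
```

The two predicates agree on every pair because the flatmap comparison can only ever reject inputs that `==` would reject as well.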
| @@ -126,60 +152,44 @@ defmodule Logflare.Sources.Source.BigQuery.SchemaBuilder do | |||
| @spec build_table_schema([map()] | map(), TFS.t()) :: TFS.t() | |||
|
|
|||
| def build_table_schema(params, %{fields: old_fields}) do | |||
After removing the redundant flatmap equality work, eprof/tprof show the remaining local CPU is concentrated here: building field schemas, merging with the old schema, sorting via deep_sort_by_fields_name/1, and Enum.uniq_by/2. For wide top-level samples, this is now the main CPU area rather than typemap flattening.
| alias Logflare.BigQuery.SchemaTypes | ||
| alias Model.TableFieldSchema, as: TFS | ||
|
|
||
| @initial_table_schema %Model.TableSchema{ |
This avoids reconstructing the initial schema fields and protected-key set machinery on each call. It is a small win compared with the flatmap short-circuit, but it removes repeated setup work visible around initial_table_schema/0, MapSet.new/2, and MapSet.member?/2 in the profile.
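As a small illustration of the module-attribute approach, the sketch below computes a protected-key set once at compile time. `InitialSchemaSketch` and the three field names are placeholders chosen for the example, not a copy of the real initial schema.

```elixir
defmodule InitialSchemaSketch do
  # Evaluated once when the module is compiled; the results are embedded
  # in the compiled module as literals.
  @initial_fields [%{name: "timestamp"}, %{name: "id"}, %{name: "event_message"}]
  @protected_keys MapSet.new(@initial_fields, & &1.name)

  def initial_fields, do: @initial_fields

  # No MapSet.new/2 call at runtime, only the membership test.
  def protected?(key), do: MapSet.member?(@protected_keys, key)
end

protected = InitialSchemaSketch.protected?("timestamp")
unprotected = InitialSchemaSketch.protected?("metadata")
```

The per-call cost drops to a single `MapSet.member?/2`, which is consistent with this being a small win relative to removing whole traversals.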
|
|
||
| # filter field schemas that are present only in old table field schema | ||
| uniq_old_fs = for fs <- old_fields, fs.name not in new_fields_names, do: fs | ||
| defp merge_field_schema(%TFS{fields: old_fields}, %TFS{fields: new_fields} = new) |
The merge path still appears in the profiles (merge_field_schema/2, nested Enum.map/2, Map.get/3, MapSet.member?/2). Next possible optimization would be more structural: return {schema, changed?} from SchemaBuilder or avoid global sorting/uniq when no schema shape changed.
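One possible shape for the `{schema, changed?}` idea, sketched on a flat name-to-type map rather than nested `TFS` structs. `ChangedFlagSketch.merge/2` is hypothetical; it only shows how a change flag can be threaded through the merge so that no separate comparison pass is needed afterwards.

```elixir
defmodule ChangedFlagSketch do
  # Merge new field types into the old ones and report whether anything
  # was added or altered, in a single pass over the new fields.
  def merge(old, new) do
    Enum.reduce(new, {old, false}, fn {name, type}, {acc, changed?} ->
      case acc do
        # Same name with the same type: nothing to do.
        %{^name => ^type} -> {acc, changed?}
        # New name or different type: record it and flag the change.
        _ -> {Map.put(acc, name, type), true}
      end
    end)
  end
end

old = %{"id" => "STRING", "count" => "INTEGER"}

no_op = ChangedFlagSketch.merge(old, %{"id" => "STRING"})
changed = ChangedFlagSketch.merge(old, %{"id" => "STRING", "host" => "STRING"})
```

In the no-op case the original map is returned untouched, so a caller could skip sorting, deduplication, and persistence whenever the flag is `false`.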
|
Profiling summary from the latest What showed up before the latest simplification:
Current wide-top-level eprof run after the simplification:
What remains hot:
Possible next optimizations, if we want to keep going:
I added inline comments on the PR around the benchmark scenarios, profiler config, |
|
Follow-up pushed: This makes Short benchmarks against the previous PR commit ( The nested no-op wall-time result is mixed/noisy, but it still reduces scheduler work. The more important ingest cases here are no-op wide schemas and changed schemas, where this removes the old sort+compare path and improves ips materially. Verification: |
Schema optimizations exploration
Schema optimizations explorationSchema optimizations
Force-pushed from b5e0aa4 to a5f46bf.
|
Closing in favor of smaller, self-contained patches, starting with #3534 |
Please ignore this PR for now :)
Adds a Benchee harness for BigQuery schema helper hot paths under bench/.
Adds a pre-cast mailbox guard to Logflare.Sources.Source.BigQuery.Schema.update/3 so sampled schema updates are dropped when the Schema process already has a backlog.
This targets observed queue spikes where in-process schema rate limiting still allowed ingest pipelines to keep filling the Schema inbox.
Validation: syntax-parsed changed files with elixir; mix test test/logflare/source/bigquery/schema_test.exs and mix format are blocked in this workspace by dependency lock mismatches that require mix deps.get.