bigquery: add schema resolver, evolution, and table name sanitization #4385

Open
squiidz wants to merge 6 commits into main from bq-schema-management

Conversation

@squiidz squiidz commented May 1, 2026

Schema resolution + evolution

  • Add schemaResolver that fetches table metadata, builds a proto descriptor via the adapt pipeline, caches per table, and deduplicates concurrent cold-cache loads via singleflight.
  • Add schemaEvolver that diffs proto descriptor vs table schema and adds missing columns. Uses ETag with CAS-on-412 retry (max 5 attempts) so concurrent additive evolutions don't clobber each other; on retry exhaustion or a benign "another writer added the columns" outcome, signal the caller to retry the write.
  • Add grpcSchemaMismatch classification via StorageError details. In handleWriteError: on schema mismatch, evolve and retry; on evolution failure, return a permanent BatchError so messages go to DLQ instead of being retried forever.
  • Add bigquery_write_api_schema_evolutions_total and bigquery_write_api_schema_evolution_failures_total counters.
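
As a rough sketch of the CAS-on-412 loop described in the second bullet (the function name, signature, and simplified diff are illustrative, not the PR's actual code), using the standard cloud.google.com/go/bigquery client:

import (
	"context"
	"errors"
	"fmt"
	"net/http"
	"strings"

	"cloud.google.com/go/bigquery"
	"google.golang.org/api/googleapi"
)

// addMissingColumns adds wanted columns that the table does not yet have,
// retrying on ETag conflicts so concurrent additive evolutions compose.
func addMissingColumns(ctx context.Context, tbl *bigquery.Table, wanted []*bigquery.FieldSchema) error {
	const maxAttempts = 5
	for attempt := 0; attempt < maxAttempts; attempt++ {
		meta, err := tbl.Metadata(ctx)
		if err != nil {
			return fmt.Errorf("fetching table metadata: %w", err)
		}
		// Re-diff on every attempt: a concurrent writer may already have
		// added some or all of the wanted columns.
		existing := make(map[string]bool, len(meta.Schema))
		for _, f := range meta.Schema {
			existing[strings.ToLower(f.Name)] = true
		}
		var missing []*bigquery.FieldSchema
		for _, f := range wanted {
			if !existing[strings.ToLower(f.Name)] {
				missing = append(missing, f)
			}
		}
		if len(missing) == 0 {
			return nil // another writer added the columns; the caller retries the write
		}
		update := bigquery.TableMetadataToUpdate{Schema: append(meta.Schema, missing...)}
		// Passing the ETag makes the update conditional: if the table changed
		// since the read, BigQuery answers 412 and we loop to re-read.
		if _, err := tbl.Update(ctx, update, meta.ETag); err != nil {
			var gerr *googleapi.Error
			if errors.As(err, &gerr) && gerr.Code == http.StatusPreconditionFailed {
				continue
			}
			return fmt.Errorf("adding columns: %w", err)
		}
		return nil
	}
	return errors.New("schema evolution retries exhausted")
}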

Table name handling

  • Sanitize interpolated table names (dots, hyphens, slashes, and whitespace become underscores; non-ASCII-alphanumerics are stripped; leading digits prefixed with _; cap at 1024 chars).
  • Reject empty-after-sanitization names with a permanent BatchError.
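
A minimal sketch of those rules (the helper name and exact character handling are assumptions; the real sanitizer may differ in detail):

import (
	"errors"
	"strings"
	"unicode"
)

func sanitizeTableName(raw string) (string, error) {
	var b strings.Builder
	for _, r := range raw {
		switch {
		case r == '.' || r == '-' || r == '/' || unicode.IsSpace(r):
			b.WriteRune('_') // separators and whitespace become underscores
		case (r >= 'a' && r <= 'z') || (r >= 'A' && r <= 'Z') || (r >= '0' && r <= '9') || r == '_':
			b.WriteRune(r)
		default:
			// everything else (non-ASCII-alphanumeric) is stripped
		}
	}
	s := b.String()
	if s == "" {
		return "", errors.New("table name is empty after sanitization")
	}
	if s[0] >= '0' && s[0] <= '9' {
		s = "_" + s // table names must not start with a digit
	}
	if len(s) > 1024 {
		s = s[:1024]
	}
	return s, nil
}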

Error classification

  • Add isPermanentBQError that recognises both gRPC permanent codes and *googleapi.Error 4xx (excluding 408/429), so REST 4xx errors from the resolver path no longer retry forever.
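
The shape of that classifier, as a sketch (the exact set of permanent gRPC codes is an assumption; the real classifyGRPCError may differ):

import (
	"errors"
	"net/http"

	"google.golang.org/api/googleapi"
	"google.golang.org/grpc/codes"
	grpcstatus "google.golang.org/grpc/status"
)

func isPermanentBQError(err error) bool {
	var gerr *googleapi.Error
	if errors.As(err, &gerr) {
		// REST 4xx is permanent except 408 (timeout) and 429 (throttling).
		return gerr.Code >= 400 && gerr.Code < 500 &&
			gerr.Code != http.StatusRequestTimeout &&
			gerr.Code != http.StatusTooManyRequests
	}
	if st, ok := grpcstatus.FromError(err); ok && st != nil {
		switch st.Code() {
		case codes.InvalidArgument, codes.NotFound, codes.PermissionDenied,
			codes.Unauthenticated, codes.FailedPrecondition:
			return true
		}
	}
	return false
}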

Lifecycle and concurrency

  • Drop bqClient/datasetID fields from resolver and evolver; pass client + dataset as method arguments so they can't outlive the client. Close drains the resolver cache.
  • Track previously fire-and-forget stream closes (evictStream and the idle sweeper) on a closeWg so Close waits for them before tearing down the underlying clients.
  • Snapshot client and resolvedProjectID together under one connMu RLock in WriteBatch; pass projectID through to tableCacheKey.
  • Run client closes unconditionally in Close — the bigquery and managedwriter clients are non-blocking gRPC teardowns; gating them on ctx.Err() leaks connections.
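
The closeWg pattern from the second bullet, sketched (receiver and field names are assumptions; assumes closeWg sync.WaitGroup, mwClient *managedwriter.Client, bqClient *bigquery.Client):

// evictStream used to close the stream in a bare goroutine; tracking it on
// closeWg lets Close wait for that close to finish before tearing down clients.
func (o *bqOutput) evictStream(ms *managedwriter.ManagedStream) {
	o.closeWg.Add(1)
	go func() {
		defer o.closeWg.Done()
		_ = ms.Close()
	}()
}

func (o *bqOutput) Close(ctx context.Context) error {
	o.closeWg.Wait() // drain in-flight stream closes first
	// Close both clients unconditionally; these are non-blocking gRPC teardowns.
	return errors.Join(o.mwClient.Close(), o.bqClient.Close())
}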

Config validation and tuning

  • Reject delegates without target_principal at parse time.
  • Lower max_in_flight default from 64 to 4 to match snowflake_streaming and iceberg sibling outputs.
  • Detach the impersonate token source from Connect's ctx with context.WithoutCancel so a cancelled connect doesn't break later token refreshes.
  • Warn explicitly that endpoint overrides disable authentication.
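
For the context.WithoutCancel point, a sketch using the standard google.golang.org/api/impersonate package (the wrapping function and scope are illustrative):

import (
	"context"

	"golang.org/x/oauth2"
	"google.golang.org/api/impersonate"
)

// Connect's ctx only bounds the initial call; the token source is reused for
// the lifetime of the output, so it must not inherit Connect's cancellation.
func buildImpersonatedTokenSource(ctx context.Context, targetPrincipal string, delegates []string) (oauth2.TokenSource, error) {
	return impersonate.CredentialsTokenSource(context.WithoutCancel(ctx), impersonate.CredentialsConfig{
		TargetPrincipal: targetPrincipal,
		Delegates:       delegates,
		Scopes:          []string{"https://www.googleapis.com/auth/bigquery"},
	})
}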

Cleanup

  • Drop dead fieldNameMapping plumbing (resolvedSchema field, jsonToProtoBytes parameter, streamWithDescriptor field) — never wired in production.
  • Drop unused *service.Message argument from Resolve.
  • Reject non-positive proto Kind in protoKindToBQFieldType instead of silently coercing to STRING; cap RECORD nesting at 15 (BigQuery's own limit) to fail fast on self-referential descriptors.
  • Set Repeated on field schemas for repeated proto fields.

Tests

  • Unit tests for resolver (cache, evict, no-client fallback), evolver helpers (kind mapping, schema diff, repeated fields), buildAuthOpts (endpoint override, credentials_json, no-auth), error classification (gRPC permanent, REST 4xx, wrapped), table-name sanitization, sweep goroutine, config validation (durations, delegates).
  • Integration tests for schema evolution and table-name sanitization against the goccy bigquery emulator.

claude Bot commented May 1, 2026

Commits

Single commit bigquery: add schema resolver, evolution, and table name sanitization follows the system: message format and is descriptive. The change set is large (~1100 LOC across 10 files) and bundles several independent concerns — schema resolution + evolution + table-name sanitization clearly belong together, but the max_in_flight default change (64 → 4), the lifecycle/concurrency cleanup (closeWg, context.WithoutCancel on the impersonate token source), and the dead-field/parameter cleanup (fieldNameMapping, streamWithDescriptor.fieldNameMapping, Resolve's unused *service.Message arg) are arguably unrelated to the headline feature and could have been split per CLAUDE.md commit granularity guidance. Worth noting, not blocking.

Review

Schema resolver/evolver design is well thought out (singleflight on cold-cache loads, ETag-based CAS retry on 412, separate gRPC SCHEMA_MISMATCH_EXTRA_FIELDS classification, permanent-vs-transient split for REST 4xx via googleapi.Error). Lifecycle fixes around closeWg, snapshotting client+projectID together, and unconditional client teardown all read correctly. Tests cover the new behavior thoroughly: table-driven success+error cases for sanitization, kind mapping, schema diff, repeated fields, error classification, and config validation; integration tests use the goccy emulator with the proper service.NewStreamBuilder + license.InjectTestService pattern.

LGTM

createStream caches the ManagedStream for reuse across every
batch routed to the same table. Binding it to the per-batch ctx
means the first batch's cancellation (per-message deadline,
source shutdown, ack timeout) severs the cached stream, blocking
every later AppendRows against it until the idle sweeper evicts
the entry.

Wrap with context.WithoutCancel so cancellation propagates only
to the in-flight NewManagedStream dial, not to the long-lived
stream itself.
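
One way the suggested detach could look inside createStream, assuming the standard managedwriter options (batchCtx, tablePath, and descriptor are placeholder names):

// The cached ManagedStream is reused by later batches, so it must not be tied
// to the cancellation of whichever batch happened to create it.
streamCtx := context.WithoutCancel(batchCtx)
ms, err := o.mwClient.NewManagedStream(streamCtx,
	managedwriter.WithDestinationTable(tablePath),
	managedwriter.WithSchemaDescriptor(descriptor),
)
if err != nil {
	return nil, fmt.Errorf("creating managed stream: %w", err)
}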

claude Bot commented May 4, 2026

Commits
LGTM

Review
Reviewed the bigquery output changes (schema resolver, evolver, table-name sanitization, lifecycle/concurrency tightening, REST 4xx classification, and the cached-stream ctx detach). Patterns match the project: RCL headers, field-name constants with the new bqwaep nested prefix, batch-error construction, mutex/ctx propagation, and tests cover the new helpers and integration paths against the emulator.

LGTM

if err != nil {
return false, fmt.Errorf("computing missing columns: %w", err)
}
if len(missing) == 0 {

Concurrent schema evolution permanently drops messages

schema_evolution.go:155–161 — when missing == 0 on the first attempt, Evolve returns (false, nil). output.go:546–550 turns that into permanentBatchError. Under concurrent writes, the second batch to hit SCHEMA_MISMATCH sees columns already added by the first and is permanently dropped rather than retried.

@josephwoodward added the needs more info and run-integration-tests labels and removed the needs more info label on May 14, 2026
fields := md.Fields()
for i := range fields.Len() {
fd := fields.Get(i)
if _, ok := existingNames[string(fd.Name())]; ok {

In schema_evolution.go:107–116, the existingNames map uses exact string keys. BQ column names are case-insensitive and stored lowercase; a proto field named UserID against a BQ column userid is treated as missing and an add-column request is sent. BQ rejects it as a duplicate, evolution fails, and the batch is dropped.

return
}
if conf.EndpointGRPC, err = epConf.FieldString(bqwaFieldEndpointGRPC); err != nil {
if conf.EndpointGRPC, err = epConf.FieldString(bqwaepFieldGRPC); err != nil {

@josephwoodward josephwoodward May 14, 2026

question (non-blocker): Why the rename here, out of interest? The config variable (EndpointGRPC) still includes endpoint?

Same question applies to bqwaFieldEndpointHTTP above.

Contributor Author

Old constant bqwaFieldEndpointHTTP = "http" was a bit off — the name said "EndpointHTTP" but the actual value was just "http". That's because Namespace().FieldString() takes the leaf key inside the namespace, not the full path. bqwaep is the prefix for fields nested under endpoint, so the constant name now matches its value.

The struct field stays EndpointGRPC because the config struct is flat: no nested sub-struct, so each field needs a unique name within it.
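
For readers unfamiliar with the config namespacing being discussed, the declaration side might look roughly like this (leaf key values beyond "http" and the object-field layout are assumptions):

const (
	bqwaFieldEndpoint = "endpoint"
	// Leaf keys nested under the endpoint object; the constant's value is the
	// leaf, not the full path, which is why the bqwaep prefix reads better.
	bqwaepFieldHTTP = "http"
	bqwaepFieldGRPC = "grpc"
)

spec = spec.Field(service.NewObjectField(bqwaFieldEndpoint,
	service.NewStringField(bqwaepFieldHTTP).Default(""),
	service.NewStringField(bqwaepFieldGRPC).Default(""),
).Advanced())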

  - diffMissingColumns: case-insensitive name match. BQ treats column
    names case-insensitively, so a proto field "UserID" against an
    existing BQ column "userid" was incorrectly flagged as missing.
    The add-column request was then rejected as duplicate, evolution
    failed, and the batch dropped to DLQ.

  - Evolve: return (true, nil) whenever no columns are missing, not
    just on retries. Under concurrent writes the second batch to hit
    SCHEMA_MISMATCH sees columns already added by the first; signal
    the caller to evict and retry rather than treating it as a
    permanent mismatch.

  - Detach Resolve and Evolve from the caller ctx. Both run inside a
    singleflight (Resolve) or after a batch has already failed
    (Evolve), so reusing the first caller's per-batch ctx propagates
    cancellation to every concurrent waiter or aborts evolution
    exactly when it is needed. Each fetch now uses
    context.WithoutCancel plus a bounded timeout.

  - Drop the unreachable evolved==false branch in handleWriteError.
    After the Evolve contract change it can never fire.
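
A sketch of the case-insensitive diff described in the first item above (the return type is simplified to column names; the real helper presumably builds field schemas):

import (
	"strings"

	"cloud.google.com/go/bigquery"
	"google.golang.org/protobuf/reflect/protoreflect"
)

func diffMissingColumns(md protoreflect.MessageDescriptor, existing bigquery.Schema) []string {
	// BigQuery column names are case-insensitive, so key the lookup on the
	// lowercased name rather than the exact string.
	existingNames := make(map[string]struct{}, len(existing))
	for _, f := range existing {
		existingNames[strings.ToLower(f.Name)] = struct{}{}
	}
	var missing []string
	fields := md.Fields()
	for i := range fields.Len() {
		fd := fields.Get(i)
		if _, ok := existingNames[strings.ToLower(string(fd.Name()))]; !ok {
			missing = append(missing, string(fd.Name()))
		}
	}
	return missing
}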

claude Bot commented May 14, 2026

Commits
LGTM

Review
Schema resolver/evolver pipeline, table-name sanitization, error classification, and lifecycle fixes are well-structured. Context detachment (context.WithoutCancel + bounded timeouts) is correctly applied to long-lived state (impersonate token source, cached managed stream) and to shared paths (singleflight resolver, evolver), with explanatory comments. CAS-on-412 retry, case-insensitive column diff, and the (true, nil) "missing==0 means retry" contract correctly handle concurrent writers. Unit and integration test coverage is solid.

LGTM

batchLatency: m.NewTimer("bigquery_write_api_batch_latency_ns"),
retries: m.NewCounter("bigquery_write_api_retries_total"),
schemaEvolutions: m.NewCounter("bigquery_write_api_schema_evolutions_total"),
schemaEvolutionFailures: m.NewCounter("bigquery_write_api_schema_evolution_failures_total"),

@josephwoodward josephwoodward May 15, 2026

Would it be worth changing this metric to bigquery_write_api_schema_evolutions_failures_total so it's consistent with the one above (bigquery_write_api_schema_evolutions_total)?

// Write APIs should be treated as permanent (no retry). 4xx HTTP status codes
// are permanent except for 408 (timeout) and 429 (throttling); all gRPC errors
// classified as permanent by classifyGRPCError are also permanent.
func isPermanentBQError(err error) bool {

@josephwoodward josephwoodward May 15, 2026

thought (non-blocking): Feels like this kind of behaviour would be good to move into a custom BigQuery error type so we could do err.IsRetryable(), for instance. It could then absorb related functionality like classifyGRPCError, cleaning up the output.


// Drop the resolver's schema cache so a subsequent Connect doesn't reuse
// state tied to the now-closed client.
o.resolver.cache.Range(func(k, _ any) bool {

Can we use o.resolver.cache.Clear() here instead?

  Rename bigquery_write_api_schema_evolution_failures_total to
  bigquery_write_api_schema_evolutions_failures_total so it shares
  the schema_evolutions_ prefix with its sibling counter.

  Use sync.Map.Clear in Close instead of Range+Delete: equivalent,
  atomic, cheaper.

claude Bot commented May 15, 2026

Commits
LGTM

Review
Schema-resolver/evolver split-out, CAS-on-412 with case-insensitive diff, error-classification overhaul (REST 4xx + gRPC SCHEMA_MISMATCH_EXTRA_FIELDS), table-name sanitization, closeWg tracking for previously fire-and-forget stream closes, and detached contexts for Resolve/Evolve/the impersonate token source / cached NewManagedStream — all read as targeted, well-justified fixes. Tests cover the helpers (sanitizeTableName, diffMissingColumns case-insensitivity, protoKindToBQFieldType, isPermanentBQError, buildAuthOpts), the no-bq-client resolver path, and the concurrent-evolve scenario via the emulator. Good adherence to project Go and test patterns.

LGTM

  - Collapse classifyGRPCError, isPermanentBQError, and
    extractGRPCStatus into a single bqError type with IsRetryable /
    IsPermanent / IsSchemaMismatch methods. The classifier now
    handles gRPC permanent codes, REST 4xx (excluding 408/429), and
    SCHEMA_MISMATCH_EXTRA_FIELDS in one place; callers query intent
    via methods. Drops the manual unwrap loop since
    grpcstatus.FromError already uses errors.As.

  - Rename bigquery_write_api_schema_evolution_failures_total to
    bigquery_write_api_schema_evolutions_failures_total so it shares
    the schema_evolutions_ prefix with its sibling counter.

  - Replace the resolver-cache Range+Delete loop in Close with
    sync.Map.Clear (Go 1.23+): atomic, cheaper, less ceremony.
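
A skeleton of the consolidated type described in the first item (the method set follows the commit message; internals are illustrative):

import (
	"errors"

	"cloud.google.com/go/bigquery/storage/apiv1/storagepb"
	"google.golang.org/api/googleapi"
	grpcstatus "google.golang.org/grpc/status"
)

type bqError struct {
	err  error
	st   *grpcstatus.Status // non-nil when err carried a gRPC status
	http int                // HTTP code from a wrapped *googleapi.Error, 0 otherwise
}

func newBQError(err error) *bqError {
	be := &bqError{err: err}
	var gerr *googleapi.Error
	if errors.As(err, &gerr) {
		be.http = gerr.Code
	}
	if st, ok := grpcstatus.FromError(err); ok {
		be.st = st
	}
	return be
}

// IsSchemaMismatch reports whether the gRPC status carries a StorageError
// detail with code SCHEMA_MISMATCH_EXTRA_FIELDS.
func (e *bqError) IsSchemaMismatch() bool {
	if e.st == nil {
		return false
	}
	for _, d := range e.st.Details() {
		if se, ok := d.(*storagepb.StorageError); ok &&
			se.GetCode() == storagepb.StorageError_SCHEMA_MISMATCH_EXTRA_FIELDS {
			return true
		}
	}
	return false
}

// IsPermanent and IsRetryable would fold in the REST-4xx-except-408/429 and
// permanent-gRPC-code logic sketched earlier under "Error classification".
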
Comment on lines +137 to +145
// maxEvolveAttempts caps the CAS-on-412 retry loop so a pathologically busy
// table can't keep us spinning forever.
const maxEvolveAttempts = 5

// evolveTimeout bounds the total time Evolve will spend in BQ calls (Metadata
// + Update across all CAS attempts). Set high enough to cover several round
// trips under contention, but low enough that a wedged backend cannot starve
// the WriteBatch retry loop indefinitely.
const evolveTimeout = 30 * time.Second

evolveTimeout = 30 * time.Second (and resolveTimeout = 5 * time.Second in schema_resolver.go#L36-L40) are hardcoded durations. Per the godev "Configurable Time Parameters" rule, every time-related value (timeouts, backoffs, intervals, retry delays) must be exposed as a YAML-configurable field. The sibling stream_idle_timeout / stream_sweep_interval fields already follow this pattern. Please expose these as ConfigSpec fields (e.g. schema_evolution_timeout, schema_resolve_timeout).

claude Bot commented May 15, 2026

Commits

  1. The final two commits (bigquery: rename failures metric and use sync.Map.Clear and bigquery: address PR review feedback) overlap — both bodies describe the same metric rename and sync.Map.Clear change. They should be squashed into a single commit so the history reflects one logical change instead of a duplicated edit.
  2. bigquery: address PR review feedback is vague per the commit policy ("Message quality"); the body is substantive, but the title doesn't describe what changed. Consider retitling to reflect the actual change (e.g. bigquery: collapse error classifiers into bqError type).

Review
Overall a substantive, well-documented refactor. Lifecycle reasoning (closeWg, context.WithoutCancel on cached stream / impersonation token, lock-snapshot of client+projectID, CAS-on-ETag for schema evolution) is solid and the comments explain the why. Test coverage looks thorough — unit tests for the classifier, sanitizer, resolver cache, evolver diff/case-insensitivity, plus integration tests for evolution and table-name sanitization against the goccy emulator.

One issue:

  1. Hardcoded internal timeouts (evolveTimeout = 30s, resolveTimeout = 5s in schema_resolver.go#L36-L40) violate the godev "Configurable Time Parameters" rule. Expose them as ConfigSpec fields, consistent with stream_idle_timeout / stream_sweep_interval.

  Expose the previously hardcoded resolveTimeout (5s) and
  evolveTimeout (30s) as advanced YAML fields schema_resolve_timeout
  and schema_evolution_timeout, matching the pattern used by
  stream_idle_timeout / stream_sweep_interval and the godev
  "Configurable Time Parameters" convention.

  The durations now live on schemaResolver / schemaEvolver and are
  wired in by bigQueryWriteAPIOutputFromConfig. Non-positive values
  are rejected at parse time.
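
The spec side of that change might look like the following fragment, reusing the duration-field pattern of the existing stream_idle_timeout field (defaults and descriptions are illustrative; spec is the output's ConfigSpec and conf the parsed config):

spec = spec.
	Field(service.NewDurationField("schema_resolve_timeout").
		Description("Maximum time spent fetching table metadata while resolving a schema.").
		Default("5s").
		Advanced()).
	Field(service.NewDurationField("schema_evolution_timeout").
		Description("Maximum total time spent adding missing columns during schema evolution.").
		Default("30s").
		Advanced())

// And at parse time, rejecting non-positive values:
resolveTimeout, err := conf.FieldDuration("schema_resolve_timeout")
if err != nil {
	return nil, err
}
if resolveTimeout <= 0 {
	return nil, errors.New("schema_resolve_timeout must be greater than zero")
}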

claude Bot commented May 15, 2026

Commits

  1. Commit 3e803152 (bigquery: address PR review feedback) has a vague title that describes context rather than content. Per the commit policy, messages should describe what changed (e.g. bigquery: collapse error classifiers into bqError type). The body lists three distinct changes, suggesting the headline should follow system: <action> describing the primary change.
  2. Commits 73e65a4f (bigquery: rename failures metric and use sync.Map.Clear) and 3e803152 both list the same schema_evolutions_failures_total rename and sync.Map.Clear switch in their bodies. Consider squashing these into a single coherent commit before merge.

Review
The change extends the gcp_bigquery_write_api output with schema resolution caching (singleflight), schema evolution (CAS-on-412), table-name sanitization, and richer error classification. The lifecycle/concurrency reasoning is well-commented and the unit + integration test coverage is thorough.

LGTM
