bigquery: add schema resolver, evolution, and table name sanitization#4385
squiidz wants to merge 6 commits into
Conversation
Schema resolution + evolution
- Add schemaResolver that fetches table metadata, builds a proto descriptor via the adapt pipeline, caches per table, and deduplicates concurrent cold-cache loads via singleflight.
- Add schemaEvolver that diffs the proto descriptor against the table schema and adds missing columns. Uses ETag with CAS-on-412 retry (max 5 attempts) so concurrent additive evolutions don't clobber each other; on retry exhaustion, or the benign "another writer added the columns" outcome, signal the caller to retry the write.
- Add grpcSchemaMismatch classification via StorageError details. In handleWriteError: on schema mismatch, evolve and retry; on evolution failure, return a permanent BatchError so messages go to the DLQ instead of being retried forever.
- Add bigquery_write_api_schema_evolutions_total and bigquery_write_api_schema_evolution_failures_total counters.

Table name handling
- Sanitize interpolated table names (dots, hyphens, slashes, and whitespace become underscores; non-ASCII-alphanumerics are stripped; leading digits are prefixed with _; names are capped at 1024 chars).
- Reject names that are empty after sanitization with a permanent BatchError.

Error classification
- Add isPermanentBQError, which recognises both gRPC permanent codes and *googleapi.Error 4xx (excluding 408/429), so REST 4xx errors from the resolver path no longer retry forever.

Lifecycle and concurrency
- Drop the bqClient/datasetID fields from the resolver and evolver; pass the client and dataset as method arguments so they can't outlive the client. Close drains the resolver cache.
- Track previously fire-and-forget stream closes (evictStream and the idle sweeper) on a closeWg so Close waits for them before tearing down the underlying clients.
- Snapshot client and resolvedProjectID together under one connMu RLock in WriteBatch; pass projectID through to tableCacheKey.
- Run client closes unconditionally in Close — the bigquery and managedwriter clients are non-blocking gRPC teardowns; gating them on ctx.Err() leaks connections.
Config validation and tuning
- Reject delegates without target_principal at parse time.
- Lower the max_in_flight default from 64 to 4 to match the snowflake_streaming and iceberg sibling outputs.
- Detach the impersonate token source from Connect's ctx with context.WithoutCancel so a cancelled connect doesn't break later token refreshes.
- Warn explicitly that endpoint overrides disable authentication.

Cleanup
- Drop dead fieldNameMapping plumbing (the resolvedSchema field, the jsonToProtoBytes parameter, the streamWithDescriptor field) — never wired in production.
- Drop the unused *service.Message argument from Resolve.
- Reject non-positive proto Kinds in protoKindToBQFieldType instead of silently coercing to STRING; cap RECORD nesting at 15 (BigQuery's own limit) to fail fast on self-referential descriptors.
- Set Repeated on field schemas for repeated proto fields.

Tests
- Unit tests for the resolver (cache, evict, no-client fallback), evolver helpers (kind mapping, schema diff, repeated fields), buildAuthOpts (endpoint override, credentials_json, no-auth), error classification (gRPC permanent, REST 4xx, wrapped), table-name sanitization, the sweep goroutine, and config validation (durations, delegates).
- Integration tests for schema evolution and table-name sanitization against the goccy bigquery emulator.
Review: Schema resolver/evolver design is well thought out (singleflight on cold-cache loads, ETag-based CAS retry on 412, separate gRPC classification). LGTM
createStream caches the ManagedStream for reuse across every batch routing to the same table. Bind it to the per-batch ctx and the first batch's cancellation (per-message deadline, source shutdown, ack timeout) will sever the cached stream, blocking every later AppendRows against it until the idle sweeper evicts the entry. Wrap with context.WithoutCancel so cancellation propagates only to the in-flight NewManagedStream dial, not to the long-lived stream itself.
Review: LGTM
if err != nil {
	return false, fmt.Errorf("computing missing columns: %w", err)
}
if len(missing) == 0 {
Concurrent schema evolution permanently drops messages
schema_evolution.go:155–161 — when missing == 0 on the first attempt, Evolve returns (false, nil). output.go:546–550 turns that into permanentBatchError. Under concurrent writes, the second batch to hit SCHEMA_MISMATCH sees columns already added by the first and is permanently dropped rather than retried.
fields := md.Fields()
for i := range fields.Len() {
	fd := fields.Get(i)
	if _, ok := existingNames[string(fd.Name())]; ok {
schema_evolution.go:107–116 — existingNames map uses exact string keys. BQ column names are case-insensitive and stored lowercase; a proto field named UserID against a BQ column userid is treated as missing and an add-column request is sent. BQ rejects it as a duplicate, evolution fails, batch is dropped.
	return
}
- if conf.EndpointGRPC, err = epConf.FieldString(bqwaFieldEndpointGRPC); err != nil {
+ if conf.EndpointGRPC, err = epConf.FieldString(bqwaepFieldGRPC); err != nil {
question (non-blocker): Why the rename here, out of interest? The config variable (EndpointGRPC) still includes "Endpoint"?
Same question applies to bqwaFieldEndpointHTTP above.
Old constant bqwaFieldEndpointHTTP = "http" was a bit off — the name said "EndpointHTTP" but the actual value was just "http". That's because Namespace().FieldString() takes the leaf key inside the namespace, not the full path. bqwaep is the prefix for fields nested under endpoint, so the constant name now matches its value.
The struct field stays EndpointGRPC because the config struct is flat: no nested sub-struct, so each field needs a unique name within it.
- diffMissingColumns: case-insensitive name match. BQ treats column
names case-insensitively, so a proto field "UserID" against an
existing BQ column "userid" was incorrectly flagged as missing.
The add-column request was then rejected as duplicate, evolution
failed, and the batch dropped to DLQ.
- Evolve: return (true, nil) whenever no columns are missing, not
just on retries. Under concurrent writes the second batch to hit
SCHEMA_MISMATCH sees columns already added by the first; signal
the caller to evict and retry rather than treating it as a
permanent mismatch.
- Detach Resolve and Evolve from the caller ctx. Both run inside a
singleflight (Resolve) or after a batch has already failed
(Evolve), so reusing the first caller's per-batch ctx propagates
cancellation to every concurrent waiter or aborts evolution
exactly when it is needed. Each fetch now uses
context.WithoutCancel plus a bounded timeout.
- Drop the unreachable evolved==false branch in handleWriteError.
After the Evolve contract change it can never fire.
Review: LGTM
batchLatency:            m.NewTimer("bigquery_write_api_batch_latency_ns"),
retries:                 m.NewCounter("bigquery_write_api_retries_total"),
schemaEvolutions:        m.NewCounter("bigquery_write_api_schema_evolutions_total"),
schemaEvolutionFailures: m.NewCounter("bigquery_write_api_schema_evolution_failures_total"),
Would it be worth changing this metric to bigquery_write_api_schema_evolutions_failures_total so it's consistent with the one above (bigquery_write_api_schema_evolutions_total)?
// Write APIs should be treated as permanent (no retry). 4xx HTTP status codes
// are permanent except for 408 (timeout) and 429 (throttling); all gRPC errors
// classified as permanent by classifyGRPCError are also permanent.
func isPermanentBQError(err error) bool {
thought (non-blocking): Feels like this kind of behaviour would be good to move into a custom BigQuery error type, so we could do err.IsRetryable() for instance. It could then absorb other functionality like classifyGRPCError, cleaning up the output.
// Drop the resolver's schema cache so a subsequent Connect doesn't reuse
// state tied to the now-closed client.
o.resolver.cache.Range(func(k, _ any) bool {
Can we use o.resolver.cache.Clear() here instead?
Rename bigquery_write_api_schema_evolution_failures_total to bigquery_write_api_schema_evolutions_failures_total so it shares the schema_evolutions_ prefix with its sibling counter. Use sync.Map.Clear in Close instead of Range+Delete: equivalent, atomic, cheaper.
Review: LGTM
- Collapse classifyGRPCError, isPermanentBQError, and
extractGRPCStatus into a single bqError type with IsRetryable /
IsPermanent / IsSchemaMismatch methods. The classifier now
handles gRPC permanent codes, REST 4xx (excluding 408/429), and
SCHEMA_MISMATCH_EXTRA_FIELDS in one place; callers query intent
via methods. Drops the manual unwrap loop since
grpcstatus.FromError already uses errors.As.
- Rename bigquery_write_api_schema_evolution_failures_total to
bigquery_write_api_schema_evolutions_failures_total so it shares
the schema_evolutions_ prefix with its sibling counter.
- Replace the resolver-cache Range+Delete loop in Close with
sync.Map.Clear (Go 1.23+): atomic, cheaper, less ceremony.
// maxEvolveAttempts caps the CAS-on-412 retry loop so a pathologically busy
// table can't keep us spinning forever.
const maxEvolveAttempts = 5

// evolveTimeout bounds the total time Evolve will spend in BQ calls (Metadata
// + Update across all CAS attempts). Set high enough to cover several round
// trips under contention, but low enough that a wedged backend cannot starve
// the WriteBatch retry loop indefinitely.
const evolveTimeout = 30 * time.Second
evolveTimeout = 30 * time.Second (and resolveTimeout = 5 * time.Second in schema_resolver.go#L36-L40) are hardcoded durations. Per the godev "Configurable Time Parameters" rule, every time-related value (timeouts, backoffs, intervals, retry delays) must be exposed as a YAML-configurable field. The sibling stream_idle_timeout / stream_sweep_interval fields already follow this pattern. Please expose these as ConfigSpec fields (e.g. schema_evolution_timeout, schema_resolve_timeout).
Review: one issue (the hardcoded timeouts noted above).
Expose the previously hardcoded resolveTimeout (5s) and evolveTimeout (30s) as advanced YAML fields schema_resolve_timeout and schema_evolution_timeout, matching the pattern used by stream_idle_timeout / stream_sweep_interval and the godev "Configurable Time Parameters" convention. The durations now live on schemaResolver / schemaEvolver and are wired in by bigQueryWriteAPIOutputFromConfig. Non-positive values are rejected at parse time.
Review: LGTM