avro: resolve named-type references in nullable unions#4429
Conversation
The schema-registry Avro decoder now honours every Avro logical type the
spec defines (previously only `decimal`), and the iceberg connector maps
the resulting schema metadata to the right column type and interprets
numeric values using the schema's declared unit.
Decoder (internal/impl/confluent/ecs_avro.go):
- Replace the single decimal-only branch with a dispatcher covering
timestamp-{millis,micros,nanos}, local-timestamp-{millis,micros,nanos},
date, time-{millis,micros}, and uuid. Per Avro 1.10 spec, unrecognised
logicalType annotations and primitive/logical-type mismatches fall
back silently to the base primitive.
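A minimal sketch of the dispatch-plus-fallback shape described above (the map keys and return strings are illustrative stand-ins, not the real schema.Common API):

```go
// Hypothetical helper sketching the dispatcher: a recognised logicalType on a
// matching base primitive is promoted; anything else keeps the base primitive.
func promoteLogical(base, logical string) string {
	type key struct{ base, logical string }
	promotions := map[key]string{
		{"long", "timestamp-millis"}:       "TIMESTAMP(millis)",
		{"long", "timestamp-micros"}:       "TIMESTAMP(micros)",
		{"long", "timestamp-nanos"}:        "TIMESTAMP(nanos)",
		{"long", "local-timestamp-millis"}: "LOCAL-TIMESTAMP(millis)",
		{"long", "local-timestamp-micros"}: "LOCAL-TIMESTAMP(micros)",
		{"long", "local-timestamp-nanos"}:  "LOCAL-TIMESTAMP(nanos)",
		{"int", "date"}:                    "DATE",
		{"int", "time-millis"}:             "TIME(millis)",
		{"long", "time-micros"}:            "TIME(micros)",
		{"string", "uuid"}:                 "UUID",
	}
	if promoted, ok := promotions[key{base, logical}]; ok {
		return promoted
	}
	// Unrecognised logicalType or base/logical mismatch: fall back silently
	// to the base primitive, per the Avro spec.
	return base
}
```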
Encoder (internal/impl/confluent/common_to_avro.go):
- Symmetric encode for the new common types. Timestamp without explicit
Logical params keeps emitting `timestamp-millis` via EffectiveTimestamp,
preserving pre-PR output for legacy schemas. Date and TimeOfDay
paths reject Avro-inexpressible shapes (e.g. nanos for time-of-day,
AdjustToUTC for time-of-day) with field-naming errors rather than
silently downcasting.
Iceberg type resolver (internal/impl/iceberg/type_resolver.go):
- Map schema.Timestamp through EffectiveTimestamp so legacy schemas
keep landing on TimestampTzType. Honour Logical.Timestamp.Unit and
AdjustToUTC to pick TimestampType / TimestampTzType / TimestampNsType
/ TimestampTzNsType. Add Date, TimeOfDay, UUID arms; reject TimeOfDay
shapes Iceberg can't faithfully represent (AdjustToUTC=true, nanos)
with field-naming errors.
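A hedged sketch of the timestamp-column selection this describes; the string return values stand in for the Iceberg type constants (TimestampType, TimestampTzType, TimestampNsType, TimestampTzNsType) rather than the real iceberg-go API, and fmt is assumed imported:

```go
// icebergTimestampColumn sketches the Unit / AdjustToUTC decision. Millis- and
// micros-unit schemas land on Iceberg's microsecond-precision column types;
// nanos-unit schemas land on the nanosecond types.
func icebergTimestampColumn(unit string, adjustToUTC bool) (string, error) {
	switch unit {
	case "millis", "micros":
		if adjustToUTC {
			return "timestamptz", nil // TimestampTzType
		}
		return "timestamp", nil // TimestampType
	case "nanos":
		if adjustToUTC {
			return "timestamptz_ns", nil // TimestampTzNsType
		}
		return "timestamp_ns", nil // TimestampNsType
	default:
		return "", fmt.Errorf("unsupported timestamp unit %q", unit)
	}
}
```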
Iceberg shredder (internal/impl/iceberg/shredder/{shredder,temporal}.go):
- Plumb a fieldID -> *schema.Common map onto RecordShredder via
SetFieldSchemaMetadata. The leaf-value converter looks up the
metadata for time-typed columns and uses the declared unit to scale
numeric inputs into the column's internal representation. Without
metadata, the converter accepts time.Time / time.Duration directly
and falls back to bloblang.ValueAsTimestamp's seconds-default for
bare numerics — preserving existing behaviour for callers that
genuinely store unix-seconds.
- This closes the silent-corruption case where a numeric millisecond
value declared by the schema as timestamp-millis would land
~50,000 years in the future when the column type flipped from BIGINT
to TIMESTAMPTZ. The schema's declared unit is now the source of
truth for unit interpretation.
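A minimal sketch of the unit-scaling idea (an assumed shape, not the real convertLeafValue): the schema's declared unit, rather than a guess, decides how a bare numeric is interpreted before it is stored in the column's internal representation (microseconds here):

```go
// scaleToMicros interprets a bare numeric per the declared schema unit. The
// "seconds" arm mirrors the historical bloblang.ValueAsTimestamp default that
// still applies when no metadata is registered for the field.
func scaleToMicros(v int64, unit string) (int64, error) {
	switch unit {
	case "millis":
		return v * 1_000, nil
	case "micros":
		return v, nil
	case "nanos":
		return v / 1_000, nil
	case "seconds":
		return v * 1_000_000, nil
	default:
		return 0, fmt.Errorf("unknown timestamp unit %q", unit)
	}
}
```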
Iceberg writer (internal/impl/iceberg/writer.go, router.go):
- NewWriter accepts the *typeResolver. The writer parses
schema_metadata from the first message of a batch and builds the
field-ID lookup the shredder consults. Internal API change only —
the only call site is the router.
Breaking surface is documented in CHANGELOG.md under Unreleased.
Pipeline values flow through unchanged in both preserve_logical_types
modes; bloblang behaviour and JSON output bytes are unaffected.
The breakage is concentrated in (a) iceberg tables that already exist
with BIGINT/INT/STRING columns from this bug, which hit Iceberg's
schema-evolution wall, and (b) custom code that pattern-matches the
historical schema_metadata shape via meta() lookups.
Companion to redpanda-data/benthos#429 which adds the new schema.Common
types and parameter blocks. The go.mod replace directive is a
development crutch and must flip to a tagged release before merge.
Closes #4399
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
When a record field declares a union as its type, ecsAvroFromAnyMap
previously returned without consulting applyAvroLogicalType on the
field-level object. This silently dropped sibling-of-type annotations,
which is the Java/JDBC Avro idiom -- a nullable timestamp written as
{"type": ["null", "long"], "logicalType": "timestamp-millis"} rather
than nesting the annotation inside the union element.
The value-side decoder honours both idioms, so the schema-metadata side
must agree; otherwise the resulting Common reports the base primitive
and downstream consumers (e.g. the iceberg output) pick column types
that mismatch the decoded values, producing the "field <name>: expected
number value, got timestamp" shredder error against a BIGINT column.
The unit-test file covers every logical type (timestamp-{millis,micros},
local-timestamp-{millis,micros}, date, time-{millis,micros}, uuid,
decimal) under both nested-in-type and sibling-of-type idioms. An
encode/decode round-trip exercises the symmetric path through
commonToAvroNode to confirm the encoder remains consistent.
The iceberg integration test asserts that, given the @Schema shape
that schema_registry_decode now produces, the iceberg router
auto-creates a timestamp column rather than a BIGINT, and that both
time.Time and raw int64 values round-trip to the correct calendar
date via the shredder's metadata-driven unit scaling.
Picks up two upstream landings from the rolling-fix work:
1. twmb/avro PR #38 (Jeffail) — the field-level logicalType lift our own
   metadata parser already handles. Pulling it in means the value-side decoder
   now produces time.Time for sibling-form timestamp-millis (and the rest of
   the matrix) natively, instead of returning int64 and relying on the iceberg
   shredder's metadata-driven numeric scaling bridge to reconcile.
2. twmb/avro PR #39 (twmb) — a cumulative perf, parity, and spec-compliance
   pass. Includes "decimal precision/scale, spec form", which changes how
   decimal-typed values serialise under EncodeJSON: a scale-2 value 0.33
   (wire bytes 0x21) now emits as the codepoint-mapped string "!" rather than
   the numeric 0.33, matching Java's JsonEncoder output.
The shredder coerce bridge in iceberg/shredder/temporal.go stays — it's now a
safety net rather than load-bearing infrastructure. The metadata-side fix in
confluent/ecs_avro.go also stays because it parses schemas into schema.Common
independently of twmb (the iceberg output's schema_metadata path uses Common,
not twmb's schemaNode).
Coverage:
- TestUpstreamTwmbHonoursSiblingFormLogicalType (new): pins the upstream
  PR #38 behaviour by asserting that sibling-form schemas decode to time.Time
  end-to-end. If twmb ever regresses on this, the test surfaces it in the
  package that depends on the contract.
- TestSchemaRegistryDecodeAvro / TestSchemaRegistryDecodeAvroRawJson:
  pos_0_33333333 default-mode expectation updated from `0.33` to `"!"` per the
  spec form. Preserved-mode expectation unchanged — our preserveLogicalTypeOpts
  decimal CustomType still produces json.Number, which the SetStructuredMut
  path preserves through Go's json.Marshal.
CHANGELOG: a "Changed (potentially breaking)" entry documents the decimal
serialisation shape change for default-mode users and points at
preserve_logical_types: true as the migration knob.
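A worked example of the spec-form decimal shape from point 2 above (a sketch of the byte-to-codepoint mapping, not the twmb/avro code; math/big is assumed imported, and only positive unscaled values are handled here since negatives would need two's-complement bytes): scale-2 decimal 0.33 has unscaled value 33, whose big-endian encoding is the single byte 0x21, and mapping each byte to its Unicode codepoint yields "!".

```go
// decimalSpecForm maps each wire byte of the unscaled value to its codepoint,
// which is how the spec-form JSON encoder renders bytes-backed decimals.
// decimalSpecForm(big.NewInt(33)) == "!" for the 0.33 / scale-2 example.
func decimalSpecForm(unscaled *big.Int) string {
	var out []rune
	for _, b := range unscaled.Bytes() { // 33 -> []byte{0x21}
		out = append(out, rune(b)) // each byte becomes its codepoint
	}
	return string(out)
}
```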
Addresses review comment #9 on PR #4402: a downstream mapping that drops
schema_metadata between the schema-registry decoder and the iceberg sink would
silently reintroduce the year-50000 corruption that the rest of the PR closes —
the type-resolver picks TIMESTAMPTZ for the column based on metadata seen at
table-creation time, but per-message metadata is what the shredder needs to
interpret each numeric value's unit.
When schema_evolution.require_schema_metadata is true (default false), the
shredder rejects numeric inputs into time-typed columns when no schema.Common
has been registered for that field. time.Time / time.Duration native inputs are
unaffected — they carry their own unit unambiguously. Non-time columns are
unaffected. The flag is gated on schema_metadata also being set; enabling
strict mode without configuring metadata at all is a config error caught at
startup.
Plumbing:
- config.go: new field with operator-facing description.
- output_iceberg.go: parse and validate the require/has-metadata pair.
- router.go: add to SchemaEvolutionConfig and pass to the writer.
- writer.go: extend the NewWriter signature; flip the shredder into strict
  mode when the flag is set.
- shredder/shredder.go: new SetStrictTemporalMode(bool) and a strictTemporal
  field; thread through convertLeafValue.
- shredder/temporal.go: convertDate / convertTime / convertTimestamp return
  field-naming errors instead of falling through when strict mode is on and
  metadata is absent.
Tests cover (a) numeric without metadata under strict mode is rejected with a
require_schema_metadata=true error message, (b) native time.Time and
time.Duration inputs are unaffected by strict mode, and (c) numeric with
metadata under strict mode succeeds.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
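A minimal sketch of the strict-mode gate (illustrative names, not the real shredder API): bare numerics into time-typed columns require registered metadata when the flag is on; native temporal Go values always pass.

```go
// checkStrictTemporal mirrors the rule above: only bare numerics with no
// registered schema.Common for the field are rejected under strict mode.
func checkStrictTemporal(strict, hasFieldMeta, isNativeTemporal bool, field string) error {
	if !strict || isNativeTemporal || hasFieldMeta {
		return nil
	}
	return fmt.Errorf("field %s: numeric value for time-typed column rejected: "+
		"require_schema_metadata=true but no schema metadata is registered", field)
}
```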
When a pipeline previously produced int64-millisecond values into a BIGINT
column (the pre-fix-#4399 metadata bug), an operator who upgrades
schema_registry_decode to faithfully report Timestamp metadata now finds
time.Time values reaching the shredder against the same BIGINT column. The
shredder's Int64Type / Int32Type arms hand the temporal value to
bloblang.ValueAsInt64, which fails with "expected number value, got timestamp"
-- breaking every rolling upgrade against a table that pre-dates the fix.
The symmetric numeric->temporal bridge already exists in temporal.go:208 for
forward-compatibility; this change adds the reverse temporal->numeric direction
so a rolling upgrade can simply work without dropping any affected table. The
wire-equivalent integer is computed per the schema's declared Unit
(UnixMilli/Micro/Nano for Timestamp, days-since-epoch for Date,
micros/millis/etc for TimeOfDay), preserving exactly what the operator's
pre-fix pipeline had been storing.
The writer emits one INFO-level log per (writer, fieldID) the first time it
observes a metadata-vs-column type divergence, so operators can see which
columns are silently coercing on write and choose to rebuild the affected
tables when they want native temporal columns.
Coverage:
- shredder/temporal.go: new coerceTemporalToNumeric helper.
- shredder/shredder.go: Int32Type / Int64Type arms call it before the bloblang
  fallback. Non-temporal values continue to flow unchanged.
- shredder/temporal_test.go: unit tests for each temporal/Unit/column
  combination, plus no-op cases (plain int, no metadata, mismatch).
- writer.go: logCoerceDecisions detects divergences at shredder setup time and
  logs once per (writer, field).
- integration/schema_metadata_timestamp_test.go: end-to-end test pre-creates a
  table with id:STRING, ts:BIGINT, sends a message with a time.Time value plus
  Timestamp(Millis) metadata, asserts the column remains BIGINT and the row
  stores the millisecond integer.
Future work (Phase 1d): require_schema_metadata=true should turn a coerce
situation into a hard error rather than coercing silently. Not in scope here;
the INFO log is the interim signal.
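A hedged sketch of the temporal-to-numeric direction (an assumed shape; the real helper is coerceTemporalToNumeric in iceberg/shredder/temporal.go): a time.Time destined for a legacy integer column is rendered as the same wire integer the pre-fix pipeline stored, per the schema's declared unit.

```go
// timestampToWireInt reproduces the pre-fix wire integer for a temporal value
// landing in an existing BIGINT column, using the declared Timestamp unit.
func timestampToWireInt(t time.Time, unit string) (int64, error) {
	switch unit {
	case "millis":
		return t.UnixMilli(), nil
	case "micros":
		return t.UnixMicro(), nil
	case "nanos":
		return t.UnixNano(), nil
	default:
		return 0, fmt.Errorf("unknown timestamp unit %q", unit)
	}
}
```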
…_types
The original #4399 fix promoted every Avro logical type (timestamp-*,
local-timestamp-*, date, time-*, uuid) to its semantic schema.Common type in
the metadata produced by the schema-registry decoder. Defaulting that behaviour
on shipped a real breaking change: pipelines whose downstream iceberg tables
had BIGINT / INT / STRING columns from the pre-fix bug suddenly wanted to
evolve those columns to TIMESTAMP / DATE / UUID, which iceberg refuses
(BIGINT->TIMESTAMP is not an allowed alter).
This commit makes the metadata-side preservation contingent on
preserve_logical_types: the same flag that controls whether values surface as
time.Time / uuid.UUID on the message body. The two sides now move in lock-step:
- preserve_logical_types: false (default): value side emits raw long/int/string
  from twmb/avro; metadata side keeps the base primitive. Identical to
  pre-#4399 behaviour byte-for-byte.
- preserve_logical_types: true: value side emits the rich Go types; metadata
  side surfaces TIMESTAMP / DATE / TIME_OF_DAY / UUID with the correct
  unit / adjust-to-utc parameters. This is the shape the iceberg output needs
  to auto-create correctly-typed columns.
Decimal stays unconditional. It pre-dates the preserve_logical_types contract
and is load-bearing for normaliseAvroDecimals on the value side, which keys on
Common.Type == Decimal regardless of any flag. Gating it would silently regress
decimal handling for pipelines that have never opted into
preserve_logical_types. The field comment on ecsAvroConfig.preserveLogicalTypes
documents the carve-out.
Strict mode (require_schema_metadata=true) is also tightened in the same
commit: the temporal-to-numeric coerce bridge in shredder/shredder.go's
Int32Type / Int64Type arms now returns a hard error in strict mode instead of
silently converting, and the writer's logCoerceDecisions emits a stronger
notice ("writes will be rejected") when strict mode is enabled.
Together with the coerce path itself, operators have three rolling-upgrade
options:
- leave preserve_logical_types alone: zero change.
- flip preserve_logical_types: true on a fresh table: native timestamp columns
  from auto-create.
- flip preserve_logical_types: true against pre-existing BIGINT tables: the
  coerce-on-write path stores wire-equivalent millis in the existing columns
  until you choose to rebuild, with an INFO log naming each divergent column.
- additionally flip require_schema_metadata: true: same as above, but type
  disagreements between metadata and existing columns become a hard write
  error, so operators who want loud failure on schema drift get it.
Coverage:
- ecs_avro_test.go / common_to_avro_test.go: existing logical-type tests pass
  preserveLogicalTypes: true explicitly.
- ecs_avro_field_level_logical_type_test.go: new
  TestEcsAvroFieldLevelLogicalType_LegacyShapeWithoutFlag matrix asserts the
  pre-#4399 shape (Int64+optional with nil Logical) is preserved exactly under
  preserveLogicalTypes: false, for every logical type the gate covers. Decimal
  stays preserved in both modes.
- shredder/temporal_test.go: new TestCoerceTemporalRejectedInStrictMode
  confirms strict mode disables the coerce path and returns an error naming
  the require_schema_metadata flag.
- iceberg/integration/schema_metadata_timestamp_test.go: new
  TestIntegrationStrictModeRejectsCoerceOnExistingBigintColumn drives a full
  Route() against a pre-existing BIGINT table under strict mode and asserts the
  write errors with require_schema_metadata=true in the message.
CHANGELOG: the BREAKING entry has been reframed. The default path is unchanged
for every user who has not opted into preserve_logical_types: true. The Added
section enumerates the new shredder behaviours (schema-aware numeric scaling,
temporal->numeric coerce bridge with INFO log, require_schema_metadata strict
mode).
…data
Direct analogue of the field-level logicalType fix. The schema_registry_decode
processor's `translate_kafka_connect_types: true` flag has long
caused the value side to recognise Debezium temporal annotations
(io.debezium.time.{Date, Year, Timestamp, Time, MicroTimestamp,
MicroTime, NanoTimestamp, NanoTime, ZonedTimestamp}) via the
kafkaConnectTypeOpt CustomType registration in avro_walker.go and emit
time.Time values. The metadata side however ignored the connect.name
annotation entirely, so @Schema continued to claim INT64/INT32/STRING.
The result for a Debezium-sourced iceberg pipeline running with both
`translate_kafka_connect_types: true` and `schema_metadata: schema`:
twmb emits time.Time for the field, ecsAvroParseFromBytes builds
@Schema with the base primitive, iceberg auto-creates a BIGINT column,
and the shredder rejects the typed value with the same "expected
number value, got timestamp" symptom that surfaced this whole work
stream for the customer's sibling-form Avro case.
Changes:
- Add `translateKafkaConnectTypes` to ecsAvroConfig and plumb it
through serde_avro.go's metadata-storage path. Mirrors the
`preserveLogicalTypes` plumbing pattern from Phase 1a.
- Add applyKafkaConnectType(cfg, c, as) next to applyAvroLogicalType.
Reads `connect.name` from the parsed aobject's properties and maps
to the matching schema.Common type:
  - Date → schema.Date
  - Year → schema.Date (value side returns time.Time at Jan 1;
    days-since-epoch round-trips faithfully through DATE)
  - Timestamp, Time → schema.Timestamp(Millis, AdjustToUTC)
  - MicroTimestamp, MicroTime → schema.Timestamp(Micros, AdjustToUTC)
  - NanoTimestamp, NanoTime → schema.Timestamp(Nanos, AdjustToUTC)
  - ZonedTimestamp → schema.Timestamp(Millis, AdjustToUTC)
- Call applyKafkaConnectType from the same two sites
applyAvroLogicalType is already called from — the bottom of
ecsAvroFromAnyMap (primitive case) and the union case (after
ecsAvroHydrateRawUnion). Both flat and Debezium-nested forms
therefore get the annotation lifted into the parsed schema.Common.
Time and Timestamp are intentionally both mapped to schema.Timestamp
despite Time being semantically a time-of-day. The value side returns
time.Time for both (kafkaConnectTypeOpt's case "Timestamp", "Time"),
so the metadata side matches that shape to keep the value/metadata
contract symmetric. Operators who need a distinct TIME column can
override via the iceberg output's new_column_type_mapping.
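A sketch of the connect.name mapping above (illustrative stand-in return strings, not the real applyKafkaConnectType): only the Debezium temporal annotations the value side already translates are lifted; anything else keeps the base primitive.

```go
// debeziumToCommon returns the schema type a connect.name annotation maps to,
// or false when the annotation is not one the value side translates.
func debeziumToCommon(connectName string) (string, bool) {
	switch connectName {
	case "io.debezium.time.Date", "io.debezium.time.Year":
		return "DATE", true
	case "io.debezium.time.Timestamp", "io.debezium.time.Time",
		"io.debezium.time.ZonedTimestamp":
		return "TIMESTAMP(millis, adjust-to-utc)", true
	case "io.debezium.time.MicroTimestamp", "io.debezium.time.MicroTime":
		return "TIMESTAMP(micros, adjust-to-utc)", true
	case "io.debezium.time.NanoTimestamp", "io.debezium.time.NanoTime":
		return "TIMESTAMP(nanos, adjust-to-utc)", true
	default:
		return "", false // base primitive is kept, matching the value side
	}
}
```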
Also closes the audit-found duration gap. Avro `duration` (fixed[12]
with logicalType: duration) is decoded by the value side to an ISO
8601 string under preserve_logical_types. The metadata side now maps
it to schema.String, matching that output and giving iceberg a
VARCHAR column instead of BINARY. Same gate as the rest of
applyAvroLogicalType: preserve_logical_types must be true.
Coverage:
- TestEcsAvroKafkaConnectTypes: matrix of all 9 Debezium types
under both translate-on (asserts the lift fires) and translate-off
(asserts the parser stays agnostic — same as the value side
under that mode).
- TestEcsAvroDurationLogicalType: preserve-on / preserve-off
matrix for the duration logical type.
Audit notes:
Cross-referenced every CustomType registration in
preserveLogicalTypeOpts() and kafkaConnectTypeOpt() against the
metadata side. After this commit, every annotation that the value
side translates has a matching metadata-side mapping. The only
remaining theoretical gap — annotations the value side doesn't
translate either (e.g. io.debezium.data.{Uuid, Json, Enum}) — is
internally consistent (both sides surface the base primitive) and
doesn't trigger the value/metadata-mismatch bug class.
…code
parquet_encode was the largest remaining hole in the schema.Common consumer
audit. A pipeline using the same shape that took GF down (CDC source →
schema_registry_decode with preserve_logical_types: true → @Schema metadata →
parquet sink) hit two distinct walls today:
1. Type coverage. parquetNodeFromCommonField rejected Date, TimeOfDay, UUID,
   Map, Union, and Null with a hostile "unsupported by this processor" error.
2. Temporal value coercion. encodingCoercionVisitor required RFC3339-formatted
   strings for any TIMESTAMP column, with the error message literally telling
   the operator to upstream-convert. time.Time values flowing from twmb/avro
   went unrecognised.
This commit ports the same approach the iceberg shredder uses (Phase 1b/1c,
coerceTemporalToNumeric in iceberg/shredder/temporal.go) into the parquet
encoder:
- parquetNodeFromCommonField now handles Date → parquet.Date(), TimeOfDay →
  parquet.Time(unit) with millis/micros/nanos interpretation from the Common's
  Logical.TimeOfDay.Unit, UUID → parquet.UUID(), and Map →
  parquet.Map(String(), value-node).
- Null and Union remain errors because parquet has no faithful representation.
  The error messages now name the constraint and point at the upstream-coercion
  remedy, rather than producing the generic "not supported by this processor"
  wording.
- encodingCoercionVisitor grows three new bridges:
  - coerceTimestampForEncode: accepts time.Time (scaled to the column's
    declared unit), numeric (pre-scaled passthrough), and the historical
    RFC3339-string path.
  - coerceDateForEncode: accepts time.Time (UTC-floored to days since epoch),
    date string (RFC3339 or bare YYYY-MM-DD), and numeric.
  - coerceTimeForEncode: accepts time.Duration, time.Time (wall-clock portion
    only), and numeric. Returns int32 for the millis unit, int64 otherwise —
    matching parquet's physical representation for TIME columns.
Coverage:
- TestParquetNodeFromCommonField_NewTypes: every new variant (Date, TimeOfDay
  millis/micros, UUID, Map) plus the loud-error cases (Union, Null).
- TestEncodingCoercionVisitor_Temporal: round-trip-style assertions for
  time.Time/time.Duration/numeric/string inputs into TIMESTAMP(millis),
  TIMESTAMP(micros), DATE, TIME(millis), TIME(micros). Includes the
  "unsupported type errors clearly" case so the diagnostic stays useful.
No existing tests required changes.
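A hedged sketch of the time-of-day bridge (an assumed shape, not the real coerceTimeForEncode): the accepted inputs reduce to elapsed time since midnight in the column's declared unit, returned as int32 for millis and int64 otherwise, matching parquet's physical TIME representation.

```go
// timeOfDayForEncode sketches the three accepted input families. Numeric
// inputs are treated as already scaled to the declared unit.
func timeOfDayForEncode(v any, unit string) (any, error) {
	var d time.Duration
	switch t := v.(type) {
	case time.Duration:
		d = t
	case time.Time:
		// Wall-clock portion only: time elapsed since that day's UTC midnight.
		u := t.UTC()
		d = time.Duration(u.Hour())*time.Hour +
			time.Duration(u.Minute())*time.Minute +
			time.Duration(u.Second())*time.Second +
			time.Duration(u.Nanosecond())
	case int64:
		if unit == "millis" {
			return int32(t), nil
		}
		return t, nil
	default:
		return nil, fmt.Errorf("unsupported time-of-day value of type %T", v)
	}
	switch unit {
	case "millis":
		return int32(d.Milliseconds()), nil
	case "micros":
		return d.Microseconds(), nil
	default: // nanos
		return d.Nanoseconds(), nil
	}
}
```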
Running the common-schema-audit skill against the existing iceberg output
surfaced a gap I'd missed: commonTypeToIcebergType handled 16 of 18
schema.CommonType variants but fell through to the generic "unsupported common
schema type" error for Map and Union. A pipeline whose @Schema declares a
record field as `Map<string, V>` or as a Union — both natural in Avro — would
error at table-create / column-add time with a message that doesn't tell the
operator what to do. parquet_encode and JSON Schema both got matching coverage
in earlier commits on this branch; iceberg was the actual originator of the
consumer pattern but had the same hole.
Map maps to iceberg.MapType<String, ValueType>, mirroring the Avro / parquet
conventions where map keys are always strings. The single schema.Common child
describes the value type; we recurse through commonTypeToIcebergTypeRec to
resolve it, allocating a fresh KeyID/ValueID via the shared type inferrer to
keep field IDs unique across the table schema.
Union is left as an explicit loud error because iceberg has no native union
type — the same decision parquet_encode landed on. The message names the
constraint and points at the upstream remediation ("flatten to a single branch,
typically by projecting to the non-null variant") rather than the previous
generic "unsupported".
The shredder already had MapType handling at
internal/impl/iceberg/shredder/shredder.go:262, and the writer's
buildShredderFieldCommons already descended into MapType to register
value-leaf metadata at internal/impl/iceberg/writer.go (the
recordOrRecurseIcebergField helper). So no value-side changes were needed; the
schema/value paths were just waiting for the type_resolver to start producing
MapType columns.
Coverage:
- TestCommonTypeToIcebergType_MapAndUnion pins the new behaviour: Map of
  String->Int64, Map of String->Timestamp (validates the value-type recursion
  handles logically-typed values), the wrong-child-arity error path, and the
  Union loud-error case asserting the error message includes the "flatten"
  remediation pointer.
No production behaviour change for pipelines that did not exercise Map or
Union in their schema metadata; the previous error path is replaced by a
successful conversion for Map and a more helpful error for Union.
The smaller half of the schema.Common consumer-coverage audit:
common_to_json_schema.go rejected schema.Date, schema.TimeOfDay, and
schema.UUID with the generic "unsupported schema type" error, even
though JSON Schema draft 2019-09 has well-defined `format` keywords
for each.
Maps:
- Date → {"type":"string","format":"date"}
- TimeOfDay → {"type":"string","format":"time"}
- UUID → {"type":"string","format":"uuid"}
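A minimal sketch of the mapping above (illustrative, not the exact common_to_json_schema.go signature): each of the three types becomes a string node carrying the draft 2019-09 format keyword.

```go
// jsonSchemaNodeFor returns the JSON Schema node for the three newly-handled
// variants; anything else still falls through to the existing error path.
func jsonSchemaNodeFor(kind string) (map[string]any, error) {
	switch kind {
	case "date":
		return map[string]any{"type": "string", "format": "date"}, nil
	case "time-of-day":
		return map[string]any{"type": "string", "format": "time"}, nil
	case "uuid":
		return map[string]any{"type": "string", "format": "uuid"}, nil
	default:
		return nil, fmt.Errorf("unsupported schema type %q", kind)
	}
}
```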
Symmetric with the iceberg / parquet_encode fixes earlier on this
branch: every consumer of schema.Common now handles every variant
the type system defines, so a producer schema declaring any of these
three types can be encoded as JSON Schema without an upstream
projection step.
Coverage:
- TestCommonToJSONSchemaDateTimeUUID asserts each format keyword
end-to-end.
No production behaviour change for pipelines that did not exercise
these types; the previous error path is replaced by a successful
encoding.
The schema.Common metadata format flows between source-side producers
(schema_registry_decode, parquet_decode, CDC sources) and downstream
sinks (iceberg, parquet_encode, schema_registry_encode). Every consumer
of this metadata must (a) handle every variant of schema.CommonType and
(b) coerce values when the message body's Go type doesn't match the
schema-declared type — the same bug class that caused the GF iceberg
issue and its cascade across confluent / parquet / JSON Schema fixes
earlier on this branch.
A new consumer can re-introduce the gap without anyone noticing until
a customer pipeline breaks. The new audit skill catches the drift
mechanically:
- Enumerates the schema.CommonType variant universe from the live
benthos source, so new variants surface as needed-but-missing rows
rather than silent omissions.
- Finds every consumer via `schema.ParseFromAny` and direct type
switches, filtering out the CDC schema producers (intentionally
out of scope).
- Per-consumer Explore-agent brief covers both the type-coverage
matrix and the temporal-coercion bridges that distinguish a
rigorous consumer (iceberg post-Phase-1b/1c) from a fragile one.
- Produces a Markdown matrix plus a JSON variant for CI consumption.
- References the prior fix commits on this branch as templates:
bfcad32 (parquet type-coverage + coercion), e2b2c86
(coerceTemporalToNumeric), 8120621 (metadata-aware scaling),
8e80cb4 (JSON Schema format keywords).
Read-only skill — it produces the report so a human can prioritise
fixes, never commits changes itself.
Lives at .claude/skills/common-schema-audit/SKILL.md alongside the
existing review skill, with matching frontmatter shape
(disable-model-invocation: true; tool allowlist scoped to the
read-only subset Bash/Read/Glob/Grep/Task).
Also adds an "adding new consumers" section listing the three
contract obligations a new consumer must meet, with citations to
the reference implementations.
case map[string]any:
	inner, err := ecsAvroFromAnyMap(cfg, b)
	if err != nil {
		return schema.Common{}, false
	}
	return inner, true
Minor regression in error reporting: the err from ecsAvroFromAnyMap is dropped here, so the union hydrators downstream report "union ... child '...': could not resolve type map[string]interface {}" instead of the previous wrapped form "union ... child '...': decimal precision: not an integer: foo" (see the old map branch in ecsAvroHydrateRawUnion that previously called ecsAvroFromAnyMap directly with %w wrapping).
This violates the godev %w-wrapping pattern. Consider returning (Common, error) from ecsAvroResolveTypeRef and wrapping at the call site so malformed inline-record errors surface their root cause.
…se attempts
Mirrors the iceberg shredder defense for the same bug class: the float64 arms
of coerceTimestampForEncode / coerceDateForEncode / coerceTimeForEncode now
reject NaN and +/-Inf rather than int-casting them to implementation-defined
garbage that would silently land as year 1970 (or worse) in the column.
The coerceDateForEncode string-parse error path now reports both the RFC3339
and YYYY-MM-DD attempts instead of only the RFC3339 one. A malformed bare date
like "2024-13-99" was previously surfaced with an RFC3339-only error that
misleadingly suggested adding a time component.
Adds TestEncodingCoercionVisitor_RejectsNaNInf (parallel to the iceberg
shredder's TestTemporalRejectsNaNInf) and
TestCoerceDateForEncode_StringErrorSurfacesBothAttempts.
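A sketch of the non-finite guard (assumed shape, not the exact coercion code): float inputs are rejected before any int cast, rather than silently becoming an implementation-defined epoch.

```go
// rejectNonFinite guards the float64 arms of the temporal coercions.
func rejectNonFinite(f float64, field string) error {
	if math.IsNaN(f) || math.IsInf(f, 0) {
		return fmt.Errorf("field %s: non-finite value %v is not a valid temporal input", field, f)
	}
	return nil
}
```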
When applyAvroLogicalType promotes a `fixed`-backed Avro decimal to
schema.Decimal, the fact that the source was fixed[N] rather than bytes is
discarded — schema.Common has no field to retain it. This is lossless for every
downstream consumer of schema.Common (iceberg, parquet, JSON Schema, value-side
normaliseAvroDecimals), all of which pick their own physical encoding from
precision/scale.
The one place it leaks is the common_to_avro.go re-emit, which will always
produce a bytes-backed decimal even when the source schema was fixed-backed:
semantically equivalent but a different Avro schema shape, which can matter for
Schema Registry compatibility checks that compare by equality.
Comment-only; the trade-off was already implicit but unwritten, so future
readers tracing the path do not have to rederive it.
The default check is an O(snapshot) scan of every manifest entry, which dominated the commit hot path on busy tables and produced a multiplicative slowdown over time (T6692). Each parquet file path already carries a fresh uuid, so the check is redundant for this writer.
Avro JSON schemas may reference a previously-defined record/enum/fixed by
name rather than inlining the full definition — the Java/JDBC idiom for
any record reused across more than one field, e.g.
{"name": "secondary_fee", "type": ["null", "Fee"]}
where "Fee" was defined inline by an earlier field. The metadata parser
in ecsAvroParseFromBytes was treating the string branch as an unknown
type and falling through to schema.Any, so the resulting common-schema
metadata reported the field as VARCHAR rather than the registered record
structure. Downstream sinks (notably iceberg) then created a string
column where the customer expected a nested struct.
Thread a names map through ecsAvroConfig, register every record/enum/
fixed by both its simple name and its fully-qualified namespace.name
form, and have the string-form type resolver consult the map before
falling back to the primitive-name lookup. Also generalise the optional-
union helper to accept either ordering -- [null, X] and [X, null] -- since
the Avro spec doesn't constrain branch order.
The lexical-scope assumption -- a name must be defined before it is
referenced -- is the Avro spec's, so a single forward-only pass suffices.
Self-referential records remain unsupported and would need pre-
registration with a placeholder; flagged in the registration helper's
comment.
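A hedged sketch of the names-map idea (stand-in types; the real map carries schema.Common values and hangs off ecsAvroConfig): definitions are registered under both name forms, and string-form references consult the map before the primitive fallback.

```go
// namesMap registers every record/enum/fixed under its simple and
// fully-qualified names, matching Avro's lexical-scope resolution.
type namesMap map[string]string // stand-in for map[string]schema.Common

func (n namesMap) register(name, namespace, resolved string) {
	n[name] = resolved
	if namespace != "" {
		n[namespace+"."+name] = resolved
	}
}

// resolve returns the previously-defined named type for refs like "Fee";
// the caller falls back to the primitive-name lookup when it misses.
func (n namesMap) resolve(ref string) (string, bool) {
	resolved, ok := n[ref]
	return resolved, ok
}
```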
Tests cover the four shapes CON-468's acceptance criteria call out:
nullable inline record (already green via #4380), nullable record by
name reference, both branch orderings, fully-qualified vs short-name
references, and record-with-nested-record where the inner level is
itself a name-reference union.
The lame-union path -- raw_unions: false on schema_registry_decode, which is
the documented default -- carried the same bug as the raw path: string branches
like "Fee" in ["null", "Fee"] went through ecsAvroTypeToCommon directly and
collapsed to schema.Any, even when "Fee" was a previously-defined record. The
tagged-JSON envelope around each branch then wrapped an Any inner, producing a
structureless metadata tree.
Reroute the lame hydrator through the same ecsAvroResolveTypeRef helper the raw
path now uses, then re-apply the lame-specific wrapping (tagged-Object
envelope, type-name preserved as Common.Name to match the wire-form tag). The
non-Avro behaviour of the lame envelope is unchanged; only the inner Common is
now correctly populated for name references.
This closes the same CON-468 bug class for the default-config path that commit
4531c51 closed for the raw-unions path.
10000e8 to bbd78f9
	}
	t = t2
}
return int32(t.UTC().Unix() / 86400), nil
The RFC3339 string path computes t.UTC().Unix() / 86400 without the floor-vs-truncate adjustment that the time.Time arm explicitly added at line 175-179. For a pre-epoch RFC3339 string with a fractional-day time component (e.g. "1969-12-31T12:00:00Z"), Unix() is -43200 and Go's integer division truncates toward zero to 0 (Jan 1 1970) instead of the correct -1 (Dec 31 1969). The bare-date "2006-01-02" shape lands exactly on midnight so it's unaffected, but RFC3339 inputs land here. Apply the same secs < 0 && secs%86400 != 0 adjustment used in the time.Time arm.
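A sketch of the adjustment the comment asks for, usable by both arms: floor toward negative infinity so pre-epoch instants land on the previous day.

```go
// daysSinceEpoch floors rather than truncates, so "1969-12-31T12:00:00Z"
// (secs = -43200) yields -1 (Dec 31 1969) instead of 0 (Jan 1 1970).
func daysSinceEpoch(t time.Time) int32 {
	secs := t.UTC().Unix()
	days := secs / 86400
	if secs < 0 && secs%86400 != 0 {
		days--
	}
	return int32(days)
}
```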
Summary
Closes CON-468. The Avro JSON schema parser at `internal/impl/confluent/ecs_avro.go` was dropping named-type references in nullable unions — i.e. the `["null", "Fee"]` idiom where `Fee` is a previously-defined record reused across more than one field — to `schema.Any`, so downstream sinks (notably `iceberg`) saw a VARCHAR column where the customer's Avro schema asked for a nested struct.
#4380 closed the inline-record form of this shape (`["null", {"type":"record","name":"Fee",...}]`). It did not address the more common name-reference form, which any non-trivial Avro schema with a reused record will produce — the Avro spec requires named types to be defined once and referenced by name thereafter.
What changed
`ecsAvroConfig` now carries a `names map[string]schema.Common` populated as the parser walks the schema. Every `record`, `enum`, and `fixed` definition is registered under both its simple name and its fully-qualified `namespace.name` form, matching Avro's lexical-scope resolution rules. String-form type references consult the map before falling back to the primitive-name lookup, so `"Fee"` resolves to the registered record's full structure instead of collapsing to `schema.Any`.
The two existing optional-union helpers (`ecsAvroIsUnionJustOptional` for primitives, `ecsAvroIsUnionJustOptionalObject` for inline objects) are replaced by a unified `ecsAvroResolveOptionalUnion` that handles all three branch forms (primitive, named reference, inline object) and accepts either ordering — `["null", X]` and `[X, null]` are now equivalent, per CON-468 acceptance criterion 2.
Both the raw-union and lame-union (default `raw_unions: false`) hydrators route through the same `ecsAvroResolveTypeRef` helper, so the fix covers every customer regardless of their `raw_unions` setting.
Commit narrative
4531c517110000e8a4
Scope notes
`schema.Common` metadata to begin with — `store_schema_metadata` is wired only for the Avro arm. That is a pre-existing limitation, not a regression, and a feature request worth its own ticket if a customer asks for it.
Test plan
- `go test ./internal/impl/confluent/...` — all green, including the six new tests below.
- `task lint` — 0 issues.
- `TestEcsAvroRawUnionNestedRecord` (the #4380 regression) still passes — the inline-record path is unchanged.
New tests:
- `TestEcsAvroRawUnionNullableRecordByName` — `["null", "Fee"]` name reference.
- `TestEcsAvroRawUnionNullableOrderIndependence` (3 sub-tests) — `[X, "null"]` across inline records, primitives, and name references.
- `TestEcsAvroRawUnionNullableRecordNamespaced` — short and fully-qualified namespace references.
- `TestEcsAvroRawUnionNullableRecordNested` — record-containing-record where the inner is a name-reference union.
- `TestEcsAvroLameUnionNameResolution` — same fix verified in the default-config (lame-union) path.