
avro: resolve named-type references in nullable unions#4429

Open
Jeffail wants to merge 16 commits into main from fix/avro-nullable-record-name-resolution

Conversation


@Jeffail Jeffail commented May 14, 2026

Summary

Closes CON-468. The Avro JSON schema parser at internal/impl/confluent/ecs_avro.go was collapsing named-type references in nullable unions — i.e. the ["null", "Fee"] idiom where Fee is a previously-defined record reused across more than one field — to schema.Any, so downstream sinks (notably iceberg) saw a VARCHAR column where the customer's Avro schema asked for a nested struct.

#4380 fixed the inline-record form of this shape (["null", {"type":"record","name":"Fee",...}]). It did not address the more common name-reference form, which any non-trivial Avro schema with a reused record will produce — the Avro spec requires named types to be defined once and referenced by name thereafter.

What changed

ecsAvroConfig now carries a names map[string]schema.Common populated as the parser walks the schema. Every record, enum, and fixed definition is registered under both its simple name and its fully-qualified namespace.name form, matching Avro's lexical-scope resolution rules. String-form type references consult the map before falling back to the primitive-name lookup, so "Fee" resolves to the registered record's full structure instead of collapsing to schema.Any.

The two existing optional-union helpers (ecsAvroIsUnionJustOptional for primitives, ecsAvroIsUnionJustOptionalObject for inline objects) are replaced by a unified ecsAvroResolveOptionalUnion that handles all three branch forms (primitive, named reference, inline object) and accepts either ordering — ["null", X] and [X, null] are now equivalent, per CON-468 acceptance criterion 2.

Both the raw-union and lame-union (default raw_unions: false) hydrators route through the same ecsAvroResolveTypeRef helper, so the fix covers every customer regardless of their raw_unions setting.

Commit narrative

Commit — Scope
4531c5171 — Names map, unified resolver, named-type registration, raw-union path. Five tests covering CON-468 criterion 2 verbatim.
10000e8a4 — Same fix routed through the lame-union path (default config). One additional test.

Scope notes

  • Self-referential records (a record whose own children reference it by name before its definition completes) remain unsupported. Closing that would require pre-registering a placeholder before walking children. Avro recursive types are rare; we can address as a follow-up if a customer hits it.
  • JSON Schema and Protobuf decode paths in the Schema Registry processor never produced schema.Common metadata to begin with — store_schema_metadata is wired only for the Avro arm. That is a pre-existing limitation, not a regression, and a feature request worth its own ticket if a customer asks for it.

Test plan

  • go test ./internal/impl/confluent/... — all green, including the six new tests below.
  • task lint — 0 issues.
  • Verified the existing TestEcsAvroRawUnionNestedRecord (the #4380 regression) still passes — the inline-record path is unchanged.

New tests:

  • TestEcsAvroRawUnionNullableRecordByName — ["null", "Fee"] name reference.
  • TestEcsAvroRawUnionNullableOrderIndependence (3 sub-tests) — [X, "null"] across inline records, primitives, and name references.
  • TestEcsAvroRawUnionNullableRecordNamespaced — short and fully-qualified namespace references.
  • TestEcsAvroRawUnionNullableRecordNested — record-containing-record where the inner is a name-reference union.
  • TestEcsAvroLameUnionNameResolution — same fix verified in the default-config (lame-union) path.

twmb and others added 11 commits May 14, 2026 11:01
The schema-registry Avro decoder now honours every Avro logical type the
spec defines (previously only `decimal`), and the iceberg connector maps
the resulting schema metadata to the right column type and interprets
numeric values using the schema's declared unit.

Decoder (internal/impl/confluent/ecs_avro.go):
  - Replace the single decimal-only branch with a dispatcher covering
    timestamp-{millis,micros,nanos}, local-timestamp-{millis,micros,nanos},
    date, time-{millis,micros}, and uuid. Per Avro 1.10 spec, unrecognised
    logicalType annotations and primitive/logical-type mismatches fall
    back silently to the base primitive.

Encoder (internal/impl/confluent/common_to_avro.go):
  - Symmetric encode for the new common types. Timestamp without explicit
    Logical params keeps emitting `timestamp-millis` via EffectiveTimestamp,
    preserving pre-PR output for legacy schemas. Date and TimeOfDay
    paths reject Avro-inexpressible shapes (e.g. nanos for time-of-day,
    AdjustToUTC for time-of-day) with field-naming errors rather than
    silently downcasting.

Iceberg type resolver (internal/impl/iceberg/type_resolver.go):
  - Map schema.Timestamp through EffectiveTimestamp so legacy schemas
    keep landing on TimestampTzType. Honour Logical.Timestamp.Unit and
    AdjustToUTC to pick TimestampType / TimestampTzType / TimestampNsType
    / TimestampTzNsType. Add Date, TimeOfDay, UUID arms; reject TimeOfDay
    shapes Iceberg can't faithfully represent (AdjustToUTC=true, nanos)
    with field-naming errors.

Iceberg shredder (internal/impl/iceberg/shredder/{shredder,temporal}.go):
  - Plumb a fieldID -> *schema.Common map onto RecordShredder via
    SetFieldSchemaMetadata. The leaf-value converter looks up the
    metadata for time-typed columns and uses the declared unit to scale
    numeric inputs into the column's internal representation. Without
    metadata, the converter accepts time.Time / time.Duration directly
    and falls back to bloblang.ValueAsTimestamp's seconds-default for
    bare numerics — preserving existing behaviour for callers that
    genuinely store unix-seconds.
  - This closes the silent-corruption case where a numeric millisecond
    value declared by the schema as timestamp-millis would land
    ~50,000 years in the future when the column type flipped from BIGINT
    to TIMESTAMPTZ. The schema's declared unit is now the source of
    truth for unit interpretation.

Iceberg writer (internal/impl/iceberg/writer.go, router.go):
  - NewWriter accepts the *typeResolver. The writer parses
    schema_metadata from the first message of a batch and builds the
    field-ID lookup the shredder consults. Internal API change only —
    the only call site is the router.

Breaking surface is documented in CHANGELOG.md under Unreleased.
Pipeline values flow through unchanged in both preserve_logical_types
modes; bloblang behaviour and JSON output bytes are unaffected.
The breakage is concentrated in (a) iceberg tables that already exist
with BIGINT/INT/STRING columns from this bug, which hit Iceberg's
schema-evolution wall, and (b) custom code that pattern-matches the
historical schema_metadata shape via meta() lookups.

Companion to redpanda-data/benthos#429 which adds the new schema.Common
types and parameter blocks. The go.mod replace directive is a
development crutch and must flip to a tagged release before merge.

Closes #4399

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
When a record field declares a union as its type, ecsAvroFromAnyMap
previously returned without consulting applyAvroLogicalType on the
field-level object. This silently dropped sibling-of-type annotations,
which is the Java/JDBC Avro idiom -- a nullable timestamp written as
{"type": ["null", "long"], "logicalType": "timestamp-millis"} rather
than nesting the annotation inside the union element.

The value-side decoder honours both idioms, so the schema-metadata side
must agree; otherwise the resulting Common reports the base primitive
and downstream consumers (e.g. the iceberg output) pick column types
that mismatch the decoded values, producing the "field <name>: expected
number value, got timestamp" shredder error against a BIGINT column.

The unit-test file covers every logical type (timestamp-{millis,micros},
local-timestamp-{millis,micros}, date, time-{millis,micros}, uuid,
decimal) under both nested-in-type and sibling-of-type idioms. An
encode/decode round-trip exercises the symmetric path through
commonToAvroNode to confirm the encoder remains consistent.

The iceberg integration test asserts that, given the @Schema shape
that schema_registry_decode now produces, the iceberg router
auto-creates a timestamp column rather than a BIGINT, and that both
time.Time and raw int64 values round-trip to the correct calendar
date via the shredder's metadata-driven unit scaling.
Picks up two upstream landings from the rolling-fix work:

  1. twmb/avro PR #38 (Jeffail) — the field-level logicalType lift our
     own metadata parser already handles. Pulling it in means the
     value-side decoder now produces time.Time for sibling-form
     timestamp-millis (and the rest of the matrix) natively, instead
     of returning int64 and relying on the iceberg shredder's
     metadata-driven numeric scaling bridge to reconcile.

  2. twmb/avro PR #39 (twmb) — a cumulative perf, parity, and
     spec-compliance pass. Includes "decimal precision/scale, spec
     form" which changes how decimal-typed values serialise under
     EncodeJSON: a scale-2 value 0.33 (wire bytes 0x21) now emits as
     the codepoint-mapped string "!" rather than the numeric 0.33,
     matching Java's JsonEncoder output.

The shredder coerce bridge in iceberg/shredder/temporal.go stays —
it's now a safety net rather than load-bearing infrastructure. The
metadata-side fix in confluent/ecs_avro.go also stays because it
parses schemas into schema.Common independently of twmb (the iceberg
output's schema_metadata path uses Common, not twmb's schemaNode).

Coverage:
  - TestUpstreamTwmbHonoursSiblingFormLogicalType (new): pins the
    upstream PR #38 behaviour by asserting that sibling-form schemas
    decode to time.Time end-to-end. If twmb ever regresses on this,
    the test surfaces it in the package that depends on the contract.

  - TestSchemaRegistryDecodeAvro / TestSchemaRegistryDecodeAvroRawJson:
    pos_0_33333333 default-mode expectation updated from `0.33` to
    `"!"` per the spec form. Preserved-mode expectation unchanged —
    our preserveLogicalTypeOpts decimal CustomType still produces
    json.Number, which the SetStructuredMut path preserves through
    Go's json.Marshal.

CHANGELOG: a "Changed (potentially breaking)" entry documents the
decimal serialisation shape change for default-mode users and points
at preserve_logical_types: true as the migration knob.
Addresses review comment #9 on PR #4402: a downstream mapping that drops
schema_metadata between the schema-registry decoder and the iceberg sink
would silently reintroduce the year-50000 corruption that the rest of
the PR closes — the type-resolver picks TIMESTAMPTZ for the column based
on metadata seen at table-creation time, but per-message metadata is
what the shredder needs to interpret each numeric value's unit.

When schema_evolution.require_schema_metadata is true (default false),
the shredder rejects numeric inputs into time-typed columns when no
schema.Common has been registered for that field. time.Time /
time.Duration native inputs are unaffected — they carry their own unit
unambiguously. Non-time columns are unaffected.

The flag is gated to require schema_metadata also be set; setting strict
mode without configuring metadata at all is a config error caught at
startup.

Plumbing:
- config.go: new field with operator-facing description.
- output_iceberg.go: parse and validate the require/has-metadata pair.
- router.go: add to SchemaEvolutionConfig and pass to writer.
- writer.go: extend NewWriter signature; flip shredder strict mode when
  the flag is set.
- shredder/shredder.go: new SetStrictTemporalMode(bool) and a
  strictTemporal field; thread through convertLeafValue.
- shredder/temporal.go: convertDate / convertTime / convertTimestamp
  return field-naming errors instead of falling through when strict
  mode is on and metadata is absent.

Tests cover (a) numeric without metadata under strict mode is rejected
with a require_schema_metadata=true error message, (b) native time.Time
and time.Duration inputs are unaffected by strict mode, (c) numeric with
metadata under strict mode succeeds.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
When a pipeline previously produced int64-millisecond values into a BIGINT
column (the pre-fix-#4399 metadata bug), an operator who upgrades
schema_registry_decode to faithfully report Timestamp metadata now finds
time.Time values reaching the shredder against the same BIGINT column. The
shredder's Int64Type / Int32Type arms hand the temporal value to
bloblang.ValueAsInt64 which fails with "expected number value, got
timestamp" -- breaking every rolling upgrade against a table that pre-dates
the fix.

The symmetric numeric->temporal bridge already exists in temporal.go:208
for forward-compatibility; this change adds the reverse temporal->numeric
direction so a rolling upgrade can simply work without dropping any
affected table. The wire-equivalent integer is computed per the schema's
declared Unit (UnixMilli/Micro/Nano for Timestamp, days-since-epoch for
Date, micros/millis/etc for TimeOfDay), preserving exactly what the
operator's pre-fix pipeline had been storing.

The writer emits one INFO-level log per (writer, fieldID) the first time
it observes a metadata-vs-column type divergence, so operators can see
which columns are silently coercing on write and choose to rebuild the
affected tables when they want native temporal columns.

Coverage:
  - shredder/temporal.go: new coerceTemporalToNumeric helper.
  - shredder/shredder.go: Int32Type / Int64Type arms call it before the
    bloblang fallback. Non-temporal values continue to flow unchanged.
  - shredder/temporal_test.go: unit tests for each temporal/Unit/column
    combination, plus no-op cases (plain int, no metadata, mismatch).
  - writer.go: logCoerceDecisions detects divergences at shredder setup
    time and logs once per (writer, field).
  - integration/schema_metadata_timestamp_test.go: end-to-end test
    pre-creates a table with id:STRING, ts:BIGINT, sends a message with a
    time.Time value plus Timestamp(Millis) metadata, asserts the column
    remains BIGINT and the row stores the millisecond integer.

Future work (Phase 1d): require_schema_metadata=true should turn a coerce
situation into a hard error rather than coercing silently. Not in scope
here; the INFO log is the interim signal.
…_types

The original #4399 fix promoted every Avro logical type (timestamp-*,
local-timestamp-*, date, time-*, uuid) to its semantic schema.Common type
in the metadata produced by the schema-registry decoder. Defaulting that
behaviour on shipped a real breaking change: pipelines whose downstream
iceberg tables had BIGINT / INT / STRING columns from the pre-fix bug
suddenly wanted to evolve those columns to TIMESTAMP / DATE / UUID, which
iceberg refuses (BIGINT->TIMESTAMP is not an allowed alter).

This commit makes the metadata-side preservation contingent on
preserve_logical_types: the same flag that controls whether values
surface as time.Time / uuid.UUID on the message body. The two sides now
move in lock-step:

  - preserve_logical_types: false (default): value side emits raw
    long/int/string from twmb/avro; metadata side keeps the base
    primitive. Identical to pre-#4399 behaviour byte-for-byte.
  - preserve_logical_types: true: value side emits the rich Go types;
    metadata side surfaces TIMESTAMP / DATE / TIME_OF_DAY / UUID with
    the correct unit / adjust-to-utc parameters. This is the shape the
    iceberg output needs to auto-create correctly-typed columns.

Decimal stays unconditional. It pre-dates the preserve_logical_types
contract and is load-bearing for normaliseAvroDecimals on the value
side, which keys on Common.Type == Decimal regardless of any flag.
Gating it would silently regress decimal handling for pipelines that
have never opted into preserve_logical_types. The field comment on
ecsAvroConfig.preserveLogicalTypes documents the carve-out.

Strict mode (require_schema_metadata=true) is also tightened in the
same commit: the temporal-to-numeric coerce bridge in shredder/
shredder.go's Int32Type / Int64Type arms now returns a hard error in
strict mode instead of silently converting, and the writer's
logCoerceDecisions emits a stronger notice ("writes will be rejected")
when strict mode is enabled. Together with the coerce path itself,
operators have three rolling-upgrade options:

  - leave preserve_logical_types alone: zero change.
  - flip preserve_logical_types: true on a fresh table: native timestamp
    columns from auto-create.
  - flip preserve_logical_types: true against pre-existing BIGINT
    tables: the coerce-on-write path stores wire-equivalent millis in
    the existing columns until you choose to rebuild, with an INFO log
    naming each divergent column.
  - additionally flip require_schema_metadata: true: same as above but
    type disagreements between metadata and existing columns become a
    hard write error, so operators who want loud failure on schema
    drift get it.

Coverage:
  - ecs_avro_test.go / common_to_avro_test.go: existing logical-type
    tests pass preserveLogicalTypes: true explicitly.
  - ecs_avro_field_level_logical_type_test.go: new
    TestEcsAvroFieldLevelLogicalType_LegacyShapeWithoutFlag matrix
    asserts the pre-#4399 shape (Int64+optional with nil Logical) is
    preserved exactly under preserveLogicalTypes: false, for every
    logical type the gate covers. Decimal stays preserved in both
    modes.
  - shredder/temporal_test.go: new TestCoerceTemporalRejectedInStrictMode
    confirms strict mode disables the coerce path and returns an error
    naming the require_schema_metadata flag.
  - iceberg/integration/schema_metadata_timestamp_test.go: new
    TestIntegrationStrictModeRejectsCoerceOnExistingBigintColumn drives
    a full Route() against a pre-existing BIGINT table under strict
    mode and asserts the write errors with require_schema_metadata=true
    in the message.

CHANGELOG: the BREAKING entry has been reframed. The default path is
unchanged for every user who has not opted into preserve_logical_types:
true. The Added section enumerates the new shredder behaviours (
schema-aware numeric scaling, temporal->numeric coerce bridge with
INFO log, require_schema_metadata strict mode).
…data

Direct analogue of the field-level logicalType fix. The schema_registry_
decode processor's `translate_kafka_connect_types: true` flag has long
caused the value side to recognise Debezium temporal annotations
(io.debezium.time.{Date, Year, Timestamp, Time, MicroTimestamp,
MicroTime, NanoTimestamp, NanoTime, ZonedTimestamp}) via the
kafkaConnectTypeOpt CustomType registration in avro_walker.go and emit
time.Time values. The metadata side however ignored the connect.name
annotation entirely, so @Schema continued to claim INT64/INT32/STRING.

The result for a Debezium-sourced iceberg pipeline running with both
`translate_kafka_connect_types: true` and `schema_metadata: schema`:
twmb emits time.Time for the field, ecsAvroParseFromBytes builds
@Schema with the base primitive, iceberg auto-creates a BIGINT column,
and the shredder rejects the typed value with the same "expected
number value, got timestamp" symptom that surfaced this whole work
stream for the customer's sibling-form Avro case.

Changes:

  - Add `translateKafkaConnectTypes` to ecsAvroConfig and plumb it
    through serde_avro.go's metadata-storage path. Mirrors the
    `preserveLogicalTypes` plumbing pattern from Phase 1a.

  - Add applyKafkaConnectType(cfg, c, as) next to applyAvroLogicalType.
    Reads `connect.name` from the parsed object's properties and maps
    to the matching schema.Common type:
      - Date → schema.Date
      - Year → schema.Date (value side returns time.Time at Jan 1;
        days-since-epoch round-trips faithfully through DATE)
      - Timestamp, Time → schema.Timestamp(Millis, AdjustToUTC)
      - MicroTimestamp, MicroTime → schema.Timestamp(Micros, AdjustToUTC)
      - NanoTimestamp, NanoTime → schema.Timestamp(Nanos, AdjustToUTC)
      - ZonedTimestamp → schema.Timestamp(Millis, AdjustToUTC)

  - Call applyKafkaConnectType from the same two sites
    applyAvroLogicalType is already called from — the bottom of
    ecsAvroFromAnyMap (primitive case) and the union case (after
    ecsAvroHydrateRawUnion). Both flat and Debezium-nested forms
    therefore get the annotation lifted into the parsed schema.Common.

Time and Timestamp are intentionally both mapped to schema.Timestamp
despite Time being semantically a time-of-day. The value side returns
time.Time for both (kafkaConnectTypeOpt's case "Timestamp", "Time"),
so the metadata side matches that shape to keep the value/metadata
contract symmetric. Operators who need a distinct TIME column can
override via the iceberg output's new_column_type_mapping.

Also closes the audit-found duration gap. Avro `duration` (fixed[12]
with logicalType: duration) is decoded by the value side to an ISO
8601 string under preserve_logical_types. The metadata side now maps
it to schema.String, matching that output and giving iceberg a
VARCHAR column instead of BINARY. Same gate as the rest of
applyAvroLogicalType: preserve_logical_types must be true.

Coverage:

  - TestEcsAvroKafkaConnectTypes: matrix of all 9 Debezium types
    under both translate-on (asserts the lift fires) and translate-off
    (asserts the parser stays agnostic — same as the value side
    under that mode).

  - TestEcsAvroDurationLogicalType: preserve-on / preserve-off
    matrix for the duration logical type.

Audit notes:

  Cross-referenced every CustomType registration in
  preserveLogicalTypeOpts() and kafkaConnectTypeOpt() against the
  metadata side. After this commit, every annotation that the value
  side translates has a matching metadata-side mapping. The only
  remaining theoretical gap — annotations the value side doesn't
  translate either (e.g. io.debezium.data.{Uuid, Json, Enum}) — is
  internally consistent (both sides surface the base primitive) and
  doesn't trigger the value/metadata-mismatch bug class.
…code

parquet_encode was the largest remaining hole in the schema.Common
consumer audit. A pipeline using the same shape that took GF down (CDC
source → schema_registry_decode with preserve_logical_types: true →
@Schema metadata → parquet sink) hit two distinct walls today:

  1. Type coverage. parquetNodeFromCommonField rejected Date, TimeOfDay,
     UUID, Map, Union, and Null with a hostile "unsupported by this
     processor" error.

  2. Temporal value coercion. encodingCoercionVisitor required
     RFC3339-formatted strings for any TIMESTAMP column, with the
     error message literally telling the operator to upstream-convert.
     time.Time values flowing from twmb/avro went unrecognised.

This commit ports the same approach the iceberg shredder uses (Phase
1b/1c, coerceTemporalToNumeric in iceberg/shredder/temporal.go) into
the parquet encoder:

  - parquetNodeFromCommonField now handles Date → parquet.Date(),
    TimeOfDay → parquet.Time(unit) with millis/micros/nanos
    interpretation from the Common's Logical.TimeOfDay.Unit, UUID →
    parquet.UUID(), and Map → parquet.Map(String(), value-node).

  - Null and Union remain errors because parquet has no faithful
    representation. The error messages now name the constraint and
    point at the upstream-coercion remedy, rather than producing the
    generic "not supported by this processor" wording.

  - encodingCoercionVisitor grows three new bridges:
      - coerceTimestampForEncode: accepts time.Time (scaled to the
        column's declared unit), numeric (pre-scaled passthrough), and
        the historical RFC3339-string path.
      - coerceDateForEncode: accepts time.Time (UTC-floored to days
        since epoch), date string (RFC3339 or bare YYYY-MM-DD), and
        numeric.
      - coerceTimeForEncode: accepts time.Duration, time.Time
        (wall-clock portion only), and numeric. Returns int32 for the
        millis unit, int64 otherwise — matching parquet's physical
        representation for TIME columns.

Coverage:

  - TestParquetNodeFromCommonField_NewTypes: every new variant
    (Date, TimeOfDay millis/micros, UUID, Map) plus the loud-error
    cases (Union, Null).

  - TestEncodingCoercionVisitor_Temporal: round-trip-style assertions
    for time.Time/time.Duration/numeric/string inputs into
    TIMESTAMP(millis), TIMESTAMP(micros), DATE, TIME(millis),
    TIME(micros). Includes the "unsupported type errors clearly"
    case so the diagnostic stays useful.

No existing tests required changes.
Running the common-schema-audit skill against the existing iceberg
output surfaced a gap I'd missed: commonTypeToIcebergType handled 16
of 18 schema.CommonType variants but fell through to the generic
"unsupported common schema type" error for Map and Union.

A pipeline whose @Schema declares a record field as `Map<string, V>`
or as a Union — both natural in Avro — would error at table-create /
column-add time with a message that doesn't tell the operator what to
do. parquet_encode and JSON Schema both got matching coverage in
earlier commits on this branch; iceberg was the actual originator of
the consumer pattern but had the same hole.

Map maps to iceberg.MapType<String, ValueType>, mirroring the Avro /
parquet conventions where map keys are always strings. The single
schema.Common child describes the value type; we recurse through
commonTypeToIcebergTypeRec to resolve it, allocating a fresh
KeyID/ValueID via the shared type inferrer to keep field IDs unique
across the table schema.

Union is left as an explicit loud error because iceberg has no native
union type — the same decision parquet_encode landed on. The message
names the constraint and points at the upstream remediation
("flatten to a single branch, typically by projecting to the
non-null variant") rather than the previous generic "unsupported".

The shredder already had MapType handling at internal/impl/iceberg/
shredder/shredder.go:262, and the writer's buildShredderFieldCommons
already descended into MapType to register value-leaf metadata at
internal/impl/iceberg/writer.go (the recordOrRecurseIcebergField
helper). So no value-side changes needed; the schema/value paths
were just waiting for the type_resolver to start producing MapType
columns.

Coverage:

  - TestCommonTypeToIcebergType_MapAndUnion pins the new behaviour:
    Map of String->Int64, Map of String->Timestamp (validates the
    value-type recursion handles logically-typed values), the wrong-
    child-arity error path, and the Union loud-error case asserting
    the error message includes the "flatten" remediation pointer.

No production behaviour change for pipelines that did not exercise
Map or Union in their schema metadata; the previous error path is
replaced by a successful conversion for Map and a more helpful error
for Union.
The smaller half of the schema.Common consumer-coverage audit:
common_to_json_schema.go rejected schema.Date, schema.TimeOfDay, and
schema.UUID with the generic "unsupported schema type" error, even
though JSON Schema draft 2019-09 has well-defined `format` keywords
for each.

Maps:
  - Date      → {"type":"string","format":"date"}
  - TimeOfDay → {"type":"string","format":"time"}
  - UUID      → {"type":"string","format":"uuid"}

Symmetric with the iceberg / parquet_encode fixes earlier on this
branch: every consumer of schema.Common now handles every variant
the type system defines, so a producer schema declaring any of these
three types can be encoded as JSON Schema without an upstream
projection step.

Coverage:

  - TestCommonToJSONSchemaDateTimeUUID asserts each format keyword
    end-to-end.

No production behaviour change for pipelines that did not exercise
these types; the previous error path is replaced by a successful
encoding.
The schema.Common metadata format flows between source-side producers
(schema_registry_decode, parquet_decode, CDC sources) and downstream
sinks (iceberg, parquet_encode, schema_registry_encode). Every consumer
of this metadata must (a) handle every variant of schema.CommonType and
(b) coerce values when the message body's Go type doesn't match the
schema-declared type — the same bug class that caused the GF iceberg
issue and its cascade across confluent / parquet / JSON Schema fixes
earlier on this branch.

A new consumer can re-introduce the gap without anyone noticing until
a customer pipeline breaks. The new audit skill catches the drift
mechanically:

  - Enumerates the schema.CommonType variant universe from the live
    benthos source, so new variants surface as needed-but-missing rows
    rather than silent omissions.

  - Finds every consumer via `schema.ParseFromAny` and direct type
    switches, filtering out the CDC schema producers (intentionally
    out of scope).

  - Per-consumer Explore-agent brief covers both the type-coverage
    matrix and the temporal-coercion bridges that distinguish a
    rigorous consumer (iceberg post-Phase-1b/1c) from a fragile one.

  - Produces a Markdown matrix plus a JSON variant for CI consumption.

  - References the prior fix commits on this branch as templates:
    bfcad32 (parquet type-coverage + coercion), e2b2c86
    (coerceTemporalToNumeric), 8120621 (metadata-aware scaling),
    8e80cb4 (JSON Schema format keywords).

Read-only skill — it produces the report so a human can prioritise
fixes, never commits changes itself.

Lives at .claude/skills/common-schema-audit/SKILL.md alongside the
existing review skill, with matching frontmatter shape
(disable-model-invocation: true; tool allowlist scoped to the
read-only subset Bash/Read/Glob/Grep/Task).

Also adds an "adding new consumers" section listing the three
contract obligations a new consumer must meet, with citations to
the reference implementations.
Comment on lines +156 to +161
case map[string]any:
	inner, err := ecsAvroFromAnyMap(cfg, b)
	if err != nil {
		return schema.Common{}, false
	}
	return inner, true

Minor regression in error reporting: the err from ecsAvroFromAnyMap is dropped here, so the union hydrators downstream report "union ... child '...': could not resolve type map[string]interface {}" instead of the previous wrapped form "union ... child '...': decimal precision: not an integer: foo" (see the old map branch in ecsAvroHydrateRawUnion that previously called ecsAvroFromAnyMap directly with %w wrapping).

This violates the godev %w-wrapping pattern. Consider returning (Common, error) from ecsAvroResolveTypeRef and wrapping at the call site so malformed inline-record errors surface their root cause.


claude Bot commented May 14, 2026

Commits
LGTM

Review
Focused, well-tested fix for CON-468. The names map threaded through ecsAvroConfig, the unified ecsAvroResolveOptionalUnion/ecsAvroResolveTypeRef helpers, and the parallel raw/lame routing all hang together cleanly. Aliasing of the registered schema.Common (and its Children slice) is correctly noted in the registration helper's comment and never mutated post-registration in the new code paths. Tests cover all four shapes called out in the PR description plus the lame-union path.

  1. Minor: ecsAvroResolveTypeRef drops the underlying err from ecsAvroFromAnyMap on malformed inline objects, replacing a previously-wrapped error chain with a generic "could not resolve type %T" message — see inline comment for details.

Jeffail added 5 commits May 14, 2026 17:21
…se attempts

Mirrors the iceberg shredder defense for the same bug class: the
float64 arms of coerceTimestampForEncode / coerceDateForEncode /
coerceTimeForEncode now reject NaN and +/-Inf rather than int-casting
them to implementation-defined garbage that would silently land as
year 1970 (or worse) in the column.

coerceDateForEncode string-parse error path now reports both the
RFC3339 and YYYY-MM-DD attempts instead of only the RFC3339 one. A
malformed bare date like "2024-13-99" was previously surfaced with an
RFC3339-only error that misleadingly suggested adding a time
component.

Adds TestEncodingCoercionVisitor_RejectsNaNInf (parallel to the
iceberg shredder TestTemporalRejectsNaNInf) and
TestCoerceDateForEncode_StringErrorSurfacesBothAttempts.
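The float64-arm defense described above can be sketched as follows; the function name and days-since-epoch interpretation are assumptions based on the commit message, not the actual implementation.

```go
// Sketch: reject NaN and +/-Inf before int-casting a float64 into a
// temporal column value, instead of letting the cast produce an
// implementation-defined result that silently lands as year 1970.
package main

import (
	"fmt"
	"math"
)

// coerceDateFloat mirrors the float64 arm of a coerceDateForEncode-style
// helper: the value is interpreted as days since the Unix epoch.
func coerceDateFloat(v float64) (int32, error) {
	if math.IsNaN(v) || math.IsInf(v, 0) {
		return 0, fmt.Errorf("cannot encode %v as a DATE", v)
	}
	return int32(v), nil
}

func main() {
	if _, err := coerceDateFloat(math.NaN()); err != nil {
		fmt.Println("rejected:", err)
	}
	d, _ := coerceDateFloat(19851)
	fmt.Println("days:", d)
}
```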
When applyAvroLogicalType promotes a `fixed`-backed Avro decimal to
schema.Decimal, the fact that the source was fixed[N] rather than
bytes is discarded — schema.Common has no field to retain it. This is
lossless for every downstream consumer of schema.Common (iceberg,
parquet, JSON Schema, value-side normaliseAvroDecimals), all of which
pick their own physical encoding from precision/scale. The one place
it leaks is common_to_avro.go re-emit, which will always produce a
bytes-backed decimal even when the source schema was fixed-backed:
semantically equivalent but a different Avro schema shape, which can
matter for Schema Registry compatibility checks that compare by
equality.

Comment-only; the trade-off was already implicit but unwritten, so
future readers tracing the path do not have to rederive it.
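For context, the two Avro schema shapes being conflated on re-emit look like this (field names hypothetical); per the Avro spec a decimal logical type may annotate either `bytes` or `fixed`, and the two are semantically equivalent for a given precision/scale but compare unequal as schemas:

```json
[
  {"type": "bytes", "logicalType": "decimal", "precision": 10, "scale": 2},
  {"type": "fixed", "name": "Amount", "size": 8,
   "logicalType": "decimal", "precision": 10, "scale": 2}
]
```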
The default check is an O(snapshot) scan of every manifest entry, which
dominated the commit hot path on busy tables and produced a multiplicative
slowdown over time (T6692). Each parquet file path already carries a fresh
uuid, so the check is redundant for this writer.
Avro JSON schemas may reference a previously-defined record/enum/fixed by
name rather than inlining the full definition — the Java/JDBC idiom for
any record reused across more than one field, e.g.

    {"name": "secondary_fee", "type": ["null", "Fee"]}

where "Fee" was defined inline by an earlier field. The metadata parser
in ecsAvroParseFromBytes was treating the string branch as an unknown
type and falling through to schema.Any, so the resulting common-schema
metadata reported the field as VARCHAR rather than the registered record
structure. Downstream sinks (notably iceberg) then created a string
column where the customer expected a nested struct.

Thread a names map through ecsAvroConfig, register every record/enum/
fixed by both its simple name and its fully-qualified namespace.name
form, and have the string-form type resolver consult the map before
falling back to the primitive-name lookup. Also generalise the optional-
union helper to accept either ordering -- [null, X] and [X, null] -- since
the Avro spec doesn't constrain branch order.

The lexical-scope assumption -- a name must be defined before it is
referenced -- is the Avro spec's, so a single forward-only pass suffices.
Self-referential records remain unsupported and would need pre-
registration with a placeholder; flagged in the registration helper's
comment.

Tests cover the four shapes CON-468's acceptance criteria call out:
nullable inline record (already green via #4380), nullable record by
name reference, both branch orderings, fully-qualified vs short-name
references, and record-with-nested-record where the inner level is
itself a name-reference union.
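The registration-and-lookup scheme described above can be condensed to the following sketch; helper names are assumed from the commit message, and a bare map stands in for the state threaded through ecsAvroConfig.

```go
// Sketch: forward-only registration of named types under both simple and
// fully-qualified names, with string-reference lookup consulting the map
// before the primitive fallback, plus an ordering-agnostic optional-union
// resolver.
package main

import "fmt"

type Common struct {
	Name     string
	Children []Common
}

// register records a named type under both its simple name and its
// namespace.name form, matching Avro's name-resolution rules.
func register(names map[string]Common, namespace string, c Common) {
	names[c.Name] = c
	if namespace != "" {
		names[namespace+"."+c.Name] = c
	}
}

// resolveString consults the names map first, so "Fee" resolves to the
// registered record instead of collapsing to an Any/VARCHAR placeholder.
func resolveString(names map[string]Common, ref string) Common {
	if c, ok := names[ref]; ok {
		return c
	}
	return Common{Name: ref} // primitive fallback (string, long, ...)
}

// resolveOptionalUnion accepts either branch ordering: ["null", X] and
// [X, "null"] resolve to the same optional X.
func resolveOptionalUnion(names map[string]Common, branches []any) (Common, bool) {
	if len(branches) != 2 {
		return Common{}, false
	}
	a, b := branches[0], branches[1]
	if b == "null" {
		a, b = b, a
	}
	if a != "null" {
		return Common{}, false
	}
	if s, ok := b.(string); ok && s != "null" {
		return resolveString(names, s), true
	}
	return Common{}, false
}

func main() {
	names := map[string]Common{}
	register(names, "com.example", Common{Name: "Fee", Children: []Common{{Name: "amount"}}})
	fee, ok := resolveOptionalUnion(names, []any{"Fee", "null"})
	fmt.Println(ok, fee.Name, len(fee.Children))
}
```

Because registration happens as the parser walks the schema, the single forward pass relies on the spec's define-before-reference rule, which is why self-referential records would need placeholder pre-registration.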
The lame-union path -- raw_unions: false on schema_registry_decode, which
is the documented default -- carried the same bug as the raw path: string
branches like "Fee" in ["null", "Fee"] went through ecsAvroTypeToCommon
directly and collapsed to schema.Any, even when "Fee" was a previously-
defined record. The tagged-JSON envelope around each branch then wrapped
an Any inner, producing a structureless metadata tree.

Reroute the lame hydrator through the same ecsAvroResolveTypeRef helper
the raw path now uses, then re-apply the lame-specific wrapping
(tagged-Object envelope, type-name preserved as Common.Name to match the
wire-form tag). The non-Avro behavior of the lame envelope is unchanged;
only the inner Common is now correctly populated for name references.

This closes the same CON-468 bug class for the default-config path that
commit 4531c51 closed for the raw-unions path.
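For context on the tagged-JSON envelope: Avro's JSON encoding wraps a non-null union branch in a single-key object keyed by the branch's type name (the record's full name when a namespace is present; the simple name is used in this hypothetical, namespace-free value). A `["null", "Fee"]` field would appear on the wire as:

```json
{"Fee": {"amount": 12.5}}
```

or as a bare `null` for the null branch, which is why the inner Common must carry real structure for the envelope to be useful.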
@Jeffail Jeffail force-pushed the fix/avro-nullable-record-name-resolution branch from 10000e8 to bbd78f9 Compare May 14, 2026 17:13
}
t = t2
}
return int32(t.UTC().Unix() / 86400), nil

The RFC3339 string path computes t.UTC().Unix() / 86400 without the floor-vs-truncate adjustment that the time.Time arm explicitly added at line 175-179. For a pre-epoch RFC3339 string with a fractional-day time component (e.g. "1969-12-31T12:00:00Z"), Unix() is -43200 and Go's integer division truncates toward zero to 0 (Jan 1 1970) instead of the correct -1 (Dec 31 1969). The bare-date "2006-01-02" shape lands exactly on midnight so it's unaffected, but RFC3339 inputs land here. Apply the same secs < 0 && secs%86400 != 0 adjustment used in the time.Time arm.


claude Bot commented May 14, 2026

Commits
LGTM

Review
Large multi-commit PR closing the schema-metadata / value-vs-metadata mismatch class across confluent, iceberg, and parquet. Test coverage is thorough and well-targeted. One narrow but real bug in the new parquet DATE string-coercion path.

  1. internal/impl/parquet/schema_coercion.go#L194 — RFC3339 string path for coerceDateForEncode misses the floor-vs-truncate adjustment that the sibling time.Time arm applies. Pre-epoch RFC3339 inputs with a fractional-day component land off-by-one (Go integer division truncates toward zero for negative Unix() values). Bare YYYY-MM-DD strings are unaffected since they land exactly on midnight.
