feat(cdk): Add RecordExpander component for nested array extraction #859
devin-ai-integration[bot] wants to merge 16 commits into main
…thExtractor
- Add optional expand_records_from_field parameter to extract items from nested arrays
- Add optional remain_original_record parameter to preserve parent record context
- Implement _expand_record method to handle array expansion logic
- Add comprehensive unit tests covering all edge cases
- Maintain backward compatibility with existing functionality

Co-Authored-By: unknown <>
👋 Greetings, Airbyte Team Member!
Here are some helpful tips and reminders for your convenience.

Testing This CDK Version
You can test this version of the CDK using the following:

# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@devin/1764690419-dpath-extractor-expansion#egg=airbyte-python-cdk[dev]' --help
# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch devin/1764690419-dpath-extractor-expansion
- Create new RecordExpander class in airbyte_cdk/sources/declarative/expanders/
- Move expand_records_from_field and remain_original_record parameters from DpathExtractor to RecordExpander
- Update DpathExtractor to accept optional record_expander attribute
- Register RecordExpander in manifest component transformer
- Update unit tests to use new RecordExpander class structure
- All 24 tests passing, MyPy and Ruff checks passing

This refactoring improves separation of concerns by isolating record expansion logic into a dedicated component.

Co-Authored-By: unknown <>

- Add RecordExpander definition to declarative_component_schema.yaml
- Add record_expander property to DpathExtractor schema
- Update create_dpath_extractor in model_to_component_factory.py to handle record_expander
- Auto-generate models from schema using poetry run poe build
- All 24 tests passing

This completes the schema registration for the RecordExpander component, enabling YAML manifests to properly instantiate RecordExpander when used with DpathExtractor.

Co-Authored-By: unknown <>

Apply cleaner logic using 'yield from' consistently:
- When extracted is a list without record_expander, use 'yield from extracted'
- Check 'if not self.record_expander' instead of nested if/else
- Remove unnecessary 'yield from []' for empty case

All 24 tests passing. Suggested by @DanyloGL.

Co-Authored-By: unknown <>

Changes:
- Add back 'else: yield from []' in DpathExtractor for explicit empty case
- Update RecordExpander to return nothing when expand_records_from_field path doesn't exist or isn't a list
- Update unit tests to expect no records instead of original record when expansion fails

This makes RecordExpander stricter: it only emits records when successfully expanding a list. For Stripe invoice_line_items, this ensures we only emit line items, not invoice objects.

All 24 tests passing. Requested by @DanyloGL.

Co-Authored-By: unknown <>

Changes:
1. Remove TypeError from exception handler (only catch KeyError per dpath.get docs)
2. Add wildcard (*) support to RecordExpander for matching multiple arrays
3. Update docstring and schema to document wildcard support
4. Add 5 new unit tests for wildcard expansion scenarios
5. Regenerate models from updated schema

When wildcards are used, RecordExpander:
- Uses dpath.values() to find all matches
- Filters for list-valued matches only
- Expands items from all matched lists
- Returns nothing if no list matches found

All 29 tests passing. Requested by @DanyloGL.

Co-Authored-By: unknown <>
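
The expansion rules described in this commit can be illustrated with a minimal sketch. It is not the PR's code: the commit says the real component relies on dpath.get() and dpath.values(), whereas this sketch swaps in a small pure-Python path resolver so it runs without dependencies. The names find_matches and expand_record and the sample records are hypothetical.

```python
from typing import Any, Iterator, List, Mapping


def find_matches(node: Any, path: List[str]) -> Iterator[Any]:
    """Yield every value reachable via `path`; "*" matches any key or list index."""
    if not path:
        yield node
        return
    head, rest = path[0], path[1:]
    if head == "*":
        if isinstance(node, Mapping):
            children = list(node.values())
        elif isinstance(node, list):
            children = node
        else:
            children = []
        for child in children:
            yield from find_matches(child, rest)
    elif isinstance(node, Mapping) and head in node:
        yield from find_matches(node[head], rest)


def expand_record(record: Mapping[str, Any], path: List[str]) -> Iterator[Any]:
    """Emit each item of every list found at `path`; emit nothing otherwise."""
    for extracted in find_matches(record, path):
        if isinstance(extracted, list):  # non-list matches are skipped
            yield from extracted


# Made-up sample data, for illustration only.
invoice = {"id": "in_1", "lines": {"data": [{"id": "li_1"}, {"id": "li_2"}]}}
report = {"sections": [{"items": [1, 2]}, {"items": "not-a-list"}, {"items": [3]}]}

line_items = list(expand_record(invoice, ["lines", "data"]))
wildcard_items = list(expand_record(report, ["sections", "*", "items"]))
missing = list(expand_record(invoice, ["lines", "missing"]))
```

Here line_items holds the two nested items, wildcard_items collects items from every list-valued match, and missing is empty because the path does not exist.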
MyPy was complaining that dpath.values() and dpath.get() return 'object' type. Added cast(Iterable[Any], ...) for dpath.values() and cast(Any, ...) for dpath.get() to satisfy MyPy type checking while maintaining runtime behavior. All 29 tests passing. MyPy check now passes. Co-Authored-By: unknown <>
Unified the wildcard and non-wildcard branches by collecting all arrays to process into a single list, then using one common loop for expansion. This eliminates the duplicated item iteration and record expansion logic. All 29 tests passing. MyPy check passes. Co-Authored-By: unknown <>
Changes per Daryna's feedback:
1. Removed isinstance(m, list) filter - now checking inside loop
2. Renamed 'matches' to 'extracted'
3. Removed type casts - using 'extracted: Any' instead
4. Renamed 'nested_array' to 'record' (loop var), using 'parent_record' for original
5. Removed 'if not nested_array:' check (redundant with for loop)

All 29 tests passing. MyPy check passes.

Co-Authored-By: unknown <>
- Add on_no_records parameter with 'skip' (default) and 'emit_parent' options
- Add parent_fields_to_copy parameter to copy specific parent fields to child records
- Add ParentFieldMapping class to define source/target field mappings
- Update schema YAML with new properties and ParentFieldMapping definition
- Regenerate models from schema
- Add comprehensive unit tests for new features

Co-Authored-By: unknown <>
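
A rough sketch of how these two options could interact, under stated assumptions: FieldMapping and Expander are hypothetical stand-ins for ParentFieldMapping and RecordExpander, the target_field attribute name is invented for illustration, and silently skipping a missing source path is an assumption rather than confirmed behavior. The sketch also assumes a single-key expansion path and dict-like child items.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Iterator, List, Mapping


@dataclass
class FieldMapping:  # hypothetical stand-in for ParentFieldMapping
    source_field_path: List[str]
    target_field: str  # assumed name, not taken from the PR

    def copy_field(self, parent: Mapping[str, Any], child: Dict[str, Any]) -> None:
        value: Any = parent
        for key in self.source_field_path:
            if not isinstance(value, Mapping) or key not in value:
                return  # assumption: a missing source path leaves the child untouched
            value = value[key]
        child[self.target_field] = value


@dataclass
class Expander:  # hypothetical stand-in for RecordExpander
    expand_field: str
    on_no_records: str = "skip"
    parent_fields_to_copy: List[FieldMapping] = field(default_factory=list)

    def expand(self, parent: Mapping[str, Any]) -> Iterator[Mapping[str, Any]]:
        items = parent.get(self.expand_field)
        if not isinstance(items, list) or not items:
            # Missing, empty, or non-array: emit the parent only if asked to.
            if self.on_no_records == "emit_parent":
                yield parent
            return
        for item in items:
            child = dict(item)
            for mapping in self.parent_fields_to_copy:
                mapping.copy_field(parent, child)
            yield child


mapping = FieldMapping(["customer", "id"], "customer_id")
expander = Expander("items", on_no_records="emit_parent", parent_fields_to_copy=[mapping])
sub = {"id": "sub_1", "customer": {"id": "cus_9"}, "items": [{"id": "si_1"}]}
empty = {"id": "sub_2", "items": []}

expanded = list(expander.expand(sub))
fallback = list(expander.expand(empty))
skipped = list(Expander("items").expand(empty))
```

With the default "skip" setting the empty parent produces no records, while "emit_parent" passes the parent through unchanged.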
Pull request overview
This PR introduces a new RecordExpander component to the CDK that enables extracting items from nested array fields and emitting each item as a separate record. This addresses the Stripe connector's need to handle invoice_line_items and subscription_items streams where parent objects contain nested arrays that need to be flattened.
Changes:
- Added RecordExpander class with support for wildcard paths, optional parent record embedding, configurable empty-array behavior, and selective parent field copying
- Integrated RecordExpander with DpathExtractor via optional record_expander parameter
- Updated schema definitions and component factory to support the new component
- Added 40 comprehensive unit tests covering expansion scenarios, wildcard paths, and edge cases
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| airbyte_cdk/sources/declarative/expanders/record_expander.py | New core implementation of RecordExpander and ParentFieldMapping classes |
| airbyte_cdk/sources/declarative/expanders/__init__.py | Module initialization exporting new expander components |
| airbyte_cdk/sources/declarative/extractors/dpath_extractor.py | Integration of RecordExpander into DpathExtractor with optional expansion logic |
| airbyte_cdk/sources/declarative/models/declarative_component_schema.py | Auto-generated Pydantic models for new components and parameters |
| airbyte_cdk/sources/declarative/declarative_component_schema.yaml | Schema definitions for RecordExpander and ParentFieldMapping |
| airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py | Factory method updates to instantiate RecordExpander from models |
| airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py | Component transformer mapping for DpathExtractor.record_expander |
| unit_tests/sources/declarative/extractors/test_dpath_extractor.py | 40 new unit tests for expansion, on_no_records, parent_fields_to_copy, and combined features |
    class ParentFieldMapping:
        """Defines a mapping from a parent record field to a child record field."""

        source_field_path: list[str | InterpolatedString]
Use List from the typing module instead of the built-in list type for compatibility with Python versions prior to 3.9. The codebase uses List and Union from typing elsewhere (e.g., in dpath_extractor.py).
            config: The user-provided configuration as specified by the source's spec.
        """

        expand_records_from_field: list[str | InterpolatedString]
Use List from the typing module instead of the built-in list type for compatibility with Python versions prior to 3.9. The codebase uses List and Union from typing elsewhere (e.g., in dpath_extractor.py).
        parameters: InitVar[Mapping[str, Any]]
        remain_original_record: bool = False
        on_no_records: str = "skip"
        parent_fields_to_copy: list[ParentFieldMapping] = field(default_factory=list)
Use List from the typing module instead of the built-in list type for compatibility with Python versions prior to 3.9. The codebase uses List and Union from typing elsewhere (e.g., in dpath_extractor.py).
        parent_fields_to_copy: list[ParentFieldMapping] = field(default_factory=list)

        def __post_init__(self, parameters: Mapping[str, Any]) -> None:
            self._expand_path: list[InterpolatedString] | None = [
Use Optional[List[InterpolatedString]] from the typing module instead of list[InterpolatedString] | None for compatibility with Python versions prior to 3.10. The codebase uses Optional from typing elsewhere (e.g., in dpath_extractor.py).
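
For reference, the annotation style these review comments ask for might look like the sketch below. ExpanderConfig and Template are hypothetical placeholders (Template stands in for InterpolatedString); only the use of List, Union, and Optional from typing is the point. Dataclass field annotations are evaluated when the class is defined, so list[...] fails before Python 3.9 and the X | Y form fails before 3.10 unless annotations are deferred.

```python
from dataclasses import InitVar, dataclass, field
from typing import Any, List, Mapping, Optional, Union


class Template(str):
    """Hypothetical placeholder for InterpolatedString."""


@dataclass
class ExpanderConfig:  # hypothetical class; only the annotation style matters
    expand_records_from_field: List[Union[str, Template]]
    parameters: InitVar[Mapping[str, Any]]
    remain_original_record: bool = False
    parent_fields_to_copy: List[str] = field(default_factory=list)

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        # Optional[List[...]] instead of `list[...] | None`.
        path = [Template(part) for part in self.expand_records_from_field]
        self._expand_path: Optional[List[Template]] = path or None


config = ExpanderConfig(["lines", "data"], parameters={})
empty_config = ExpanderConfig([], parameters={})
```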
/prerelease
❌ Cannot revive Devin session - the session is too old. Please start a new session instead.
/autofix
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
/autofix
…ecordExpander Co-Authored-By: alfredo.garcia@airbyte.io <freddy.garcia7.fg@gmail.com>
feat(cdk): Add RecordExpander component for nested array extraction
Summary
This PR adds a new RecordExpander component to the CDK that enables extracting items from nested array fields and emitting each item as a separate record. This is needed to fix the Stripe invoice_line_items and subscription_items stream issues where the events endpoint returns parent objects with nested arrays, but we need to emit each child item as a separate record.

Key changes:
- RecordExpander class in airbyte_cdk/sources/declarative/expanders/
- Integration with DpathExtractor via optional record_expander parameter
- Wildcard path support (e.g., ["sections", "*", "items"])
- remain_original_record flag to embed parent record context
- on_no_records parameter: "skip" (default) or "emit_parent" behavior
- parent_fields_to_copy parameter with ParentFieldMapping class to copy specific parent fields to child records

Example usage:
Updates since last revision
- on_no_records parameter with "skip" (default) and "emit_parent" options for configurable behavior when expansion yields no records
- parent_fields_to_copy parameter with ParentFieldMapping class to selectively copy parent fields to child records (instead of embedding entire parent)
- copy_field method

Review & Testing Checklist for Human
This is a YELLOW risk PR (medium confidence). Please verify:
- on_no_records behavior: Verify the emit_parent option correctly emits the parent record when the expansion path is missing, empty, or non-array. The logic at lines 140-141 of record_expander.py handles this.
- parent_fields_to_copy field copying: The copy_field method uses dict(parent_record) to create a shallow copy for dpath compatibility. Verify this doesn't cause issues with nested mutable objects.
- End-to-end testing: This PR only includes unit tests. The real-world behavior needs to be verified with the Stripe connector in the companion PR (fix(source-stripe): Fix invoice_line_items and subscription_items incremental streams (do not merge) airbyte#70294).
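
The shallow-copy concern in the second checklist item can be reproduced in isolation. This sketch uses a made-up record and is independent of the PR's code; it only shows what dict(...) does and does not copy.

```python
import copy

# Made-up parent record with a nested mutable value.
parent_record = {"id": "in_1", "metadata": {"tags": ["a"]}}

shallow = dict(parent_record)
shallow["id"] = "changed"                 # rebinding a top-level key leaves the parent alone
shallow["metadata"]["tags"].append("b")   # nested objects are shared with the parent

deep = copy.deepcopy(parent_record)
deep["metadata"]["tags"].append("c")      # a deep copy is fully independent
```

After this runs, parent_record still has its original id, but its nested tags list has gained "b" through the shallow copy. Whether that sharing matters depends on whether downstream code mutates nested values.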
Recommended test plan:
- poetry run pytest unit_tests/sources/declarative/extractors/test_dpath_extractor.py -v
- subscription_items and invoice_line_items streams
- remain_original_record + parent_fields_to_copy

Notes