
feat(cdk): Add RecordExpander component for nested array extraction #859

Open
devin-ai-integration[bot] wants to merge 16 commits into main from devin/1764690419-dpath-extractor-expansion

@devin-ai-integration
Contributor

@devin-ai-integration devin-ai-integration bot commented Dec 2, 2025

feat(cdk): Add RecordExpander component for nested array extraction

Summary

This PR adds a new RecordExpander component to the CDK that enables extracting items from nested array fields and emitting each item as a separate record. This is needed to fix the Stripe invoice_line_items and subscription_items stream issues where the events endpoint returns parent objects with nested arrays, but we need to emit each child item as a separate record.

Key changes:

  • New RecordExpander class in airbyte_cdk/sources/declarative/expanders/
  • Integration with DpathExtractor via optional record_expander parameter
  • Support for wildcard paths (e.g., ["sections", "*", "items"])
  • Optional remain_original_record flag to embed parent record context
  • New on_no_records parameter: "skip" (default) or "emit_parent" behavior
  • New parent_fields_to_copy parameter with ParentFieldMapping class to copy specific parent fields to child records
  • Schema updates and auto-generated models
  • 40 unit tests covering all expansion scenarios

Example usage:

extractor:
  type: DpathExtractor
  field_path: ["data", "object"]
  record_expander:
    type: RecordExpander
    expand_records_from_field: ["items", "data"]
    on_no_records: skip
    parent_fields_to_copy:
      - type: ParentFieldMapping
        source_field_path: ["created"]
        target_field: "subscription_updated"
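
To make the intended behavior of the manifest above concrete, here is a minimal pure-Python sketch of the expansion step. It is not the PR's implementation: `get_path` and `expand` are hypothetical helpers, the record values are made up, and the sketch assumes that every item in the nested array is a dict and that a missing parent field is silently skipped. It starts from the object that `field_path` has already extracted.

```python
from typing import Any, Iterator, Mapping, Sequence


def get_path(record: Mapping[str, Any], path: Sequence[str]) -> Any:
    """Follow a list of keys into nested mappings; raise KeyError if a key is missing."""
    current: Any = record
    for key in path:
        if not isinstance(current, Mapping) or key not in current:
            raise KeyError(key)
        current = current[key]
    return current


def expand(
    parent: Mapping[str, Any],
    expand_path: Sequence[str],
    fields_to_copy: Sequence[tuple[Sequence[str], str]] = (),
    on_no_records: str = "skip",
) -> Iterator[dict[str, Any]]:
    """Yield one record per item found at expand_path (hypothetical helper)."""
    try:
        nested = get_path(parent, expand_path)
    except KeyError:
        nested = None  # missing path is treated like "nothing to expand"

    emitted = False
    if isinstance(nested, list):
        for item in nested:
            child = dict(item)  # assumption: every item is a dict
            for source_path, target_field in fields_to_copy:
                try:
                    child[target_field] = get_path(parent, source_path)
                except KeyError:
                    pass  # assumption: a missing parent field is skipped
            emitted = True
            yield child

    # Missing path, empty array, or non-array value: optionally fall back to the parent.
    if not emitted and on_no_records == "emit_parent":
        yield dict(parent)


subscription = {
    "id": "sub_1",
    "created": 1700000000,
    "items": {"data": [{"id": "si_1"}, {"id": "si_2"}]},
}
records = list(
    expand(subscription, ["items", "data"], [(["created"], "subscription_updated")])
)
```

With the sample `subscription` above, `records` holds two child records, each carrying the parent's `created` value under `subscription_updated`.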

Updates since last revision

  • Added on_no_records parameter with "skip" (default) and "emit_parent" options for configurable behavior when expansion yields no records
  • Added parent_fields_to_copy parameter with ParentFieldMapping class to selectively copy parent fields to child records (instead of embedding entire parent)
  • Added 11 new unit tests for the new features (40 total tests)
  • Fixed type error in copy_field method

Review & Testing Checklist for Human

This is a YELLOW risk PR (medium confidence). Please verify:

  • on_no_records behavior: Verify the emit_parent option correctly emits the parent record when the expansion path is missing, empty, or non-array. The logic at lines 140-141 of record_expander.py handles this.

  • parent_fields_to_copy field copying: The copy_field method uses dict(parent_record) to create a shallow copy for dpath compatibility. Verify this doesn't cause issues with nested mutable objects.

  • End-to-end testing: This PR only includes unit tests. The real-world behavior needs to be verified with the Stripe connector in the companion PR (fix(source-stripe): Fix invoice_line_items and subscription_items incremental streams (do not merge) airbyte#70294).
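
On the shallow-copy point in the checklist above, the following sketch shows the general Python behavior a reviewer may want to keep in mind. The record contents and the `parent_metadata` field name are invented for illustration and do not come from the PR; whether this matters in practice depends on whether anything downstream mutates the copied values.

```python
import copy

parent_record = {"id": "sub_1", "metadata": {"plan": "basic"}}
child_record = {"id": "si_1"}

# dict(...) duplicates only the top-level keys; nested objects are shared.
shallow = dict(parent_record)
child_record["parent_metadata"] = shallow["metadata"]

# Mutating the value through the child also changes the parent,
# because both point at the same nested dict.
child_record["parent_metadata"]["plan"] = "pro"
parent_plan_after_shallow = parent_record["metadata"]["plan"]

# A deep copy of the value breaks the link, at the cost of extra copying.
safe_child = {
    "id": "si_2",
    "parent_metadata": copy.deepcopy(parent_record["metadata"]),
}
safe_child["parent_metadata"]["plan"] = "enterprise"
parent_plan_after_deep = parent_record["metadata"]["plan"]
```

In this sketch the parent's `plan` changes after the shallow path and stays unchanged after the deep-copy path.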

Recommended test plan:

  1. Run CDK tests: poetry run pytest unit_tests/sources/declarative/extractors/test_dpath_extractor.py -v
  2. Test with Stripe connector (separate PR) to verify end-to-end behavior for both subscription_items and invoice_line_items streams
  3. Try edge cases: empty arrays, missing paths, combination of remain_original_record + parent_fields_to_copy

Notes

…thExtractor

- Add optional expand_records_from_field parameter to extract items from nested arrays
- Add optional remain_original_record parameter to preserve parent record context
- Implement _expand_record method to handle array expansion logic
- Add comprehensive unit tests covering all edge cases
- Maintain backward compatibility with existing functionality

Co-Authored-By: unknown <>
@devin-ai-integration
Contributor Author

Original prompt from API User
Comment from @DanyloGL: /ai-triage

IMPORTANT: The user will expect a response posted back to the PR. You should post exactly one comment back to the respective issue PR. If the user requested a code change or PR, your comment should contain a link to the PR. Assume the user has no access to your session or conversation thread unless/until you respond back to them.

Issue #8683 by @jnr0790: Python L3: Stripe - Missing data in `invoice_line_items` stream

Issue URL: https://github.com/airbytehq/oncall/issues/8683

Please use playbook macro: !issue_triage

PLAYBOOK_md:
# `/ai-triage` Slash Command Playbook

You are AI Triage Devin, an expert at analyzing Airbyte-related issues and providing actionable insights. You are responding to a GitHub slash command request. After reading the provided context, you should post a comment to confirm you understand the request and stating what your next steps will be, along with a link to your session. Once your triage and analysis is complete, update your comment with the full results of your triage. Collapse all of your comments under expandable sections.

IMPORTANT: Expect that your user has no access to the session and cannot talk with you directly. Do not wait for feedback or confirmation on any action.

## Context

You are analyzing the issue provided to you above. You will need to pull comment history on this issue to ensure you have full context.

## Your Task: Static Analysis and Triage

1. **Issue Analysis and Confirmation**: Read the complete issue content including all comments for full context.
   - **Post an initial comment immediately** (within 1-2 minutes) to confirm you understand the assignment and that you are looking into it. Include your session URL.
   - If you are missing any critical information or context (e.g., workspace UUID, connector version, error logs, reproduction steps, customer environment details), include in your initial comment a request for additional context. (Do not block waiting for a... (9078 chars truncated...)

@devin-ai-integration
Contributor Author

🤖 Devin AI Engineer

I'll be helping with this pull request! Here's what you should know:

✅ I will automatically:

  • Address comments on this PR. Add '(aside)' to your comment to have me ignore it.
  • Look at CI failures and help fix them

Note: I can only respond to comments from users who have write access to this repository.


@github-actions

github-actions bot commented Dec 2, 2025

👋 Greetings, Airbyte Team Member!

Here are some helpful tips and reminders for your convenience.

Testing This CDK Version

You can test this version of the CDK using the following:

# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@devin/1764690419-dpath-extractor-expansion#egg=airbyte-python-cdk[dev]' --help

# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch devin/1764690419-dpath-extractor-expansion

Helpful Resources

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:

  • /autofix - Fixes most formatting and linting issues
  • /poetry-lock - Updates poetry.lock file
  • /test - Runs connector tests with the updated CDK
  • /prerelease - Triggers a prerelease publish with default arguments
  • /poe build - Regenerate git-committed build artifacts, such as the pydantic models which are generated from the manifest JSON schema in YAML.
  • /poe <command> - Runs any poe command in the CDK environment


@github-actions github-actions bot added the enhancement New feature or request label Dec 2, 2025
@github-actions

github-actions bot commented Dec 2, 2025

PyTest Results (Fast)

3 879 tests  +66   3 867 ✅ +66   6m 35s ⏱️ +4s
    1 suites ± 0      12 💤 ± 0 
    1 files   ± 0       0 ❌ ± 0 

Results for commit f6cf99c. ± Comparison against base commit 80b7668.

♻️ This comment has been updated with latest results.

@github-actions

github-actions bot commented Dec 2, 2025

PyTest Results (Full)

3 882 tests  +66   3 870 ✅ +66   11m 0s ⏱️ +4s
    1 suites ± 0      12 💤 ± 0 
    1 files   ± 0       0 ❌ ± 0 

Results for commit f6cf99c. ± Comparison against base commit 80b7668.

♻️ This comment has been updated with latest results.

- Create new RecordExpander class in airbyte_cdk/sources/declarative/expanders/
- Move expand_records_from_field and remain_original_record parameters from DpathExtractor to RecordExpander
- Update DpathExtractor to accept optional record_expander attribute
- Register RecordExpander in manifest component transformer
- Update unit tests to use new RecordExpander class structure
- All 24 tests passing, MyPy and Ruff checks passing

This refactoring improves separation of concerns by isolating record expansion logic into a dedicated component.

Co-Authored-By: unknown <>
- Add RecordExpander definition to declarative_component_schema.yaml
- Add record_expander property to DpathExtractor schema
- Update create_dpath_extractor in model_to_component_factory.py to handle record_expander
- Auto-generate models from schema using poetry run poe build
- All 24 tests passing

This completes the schema registration for RecordExpander component, enabling
YAML manifests to properly instantiate RecordExpander when used with DpathExtractor.

Co-Authored-By: unknown <>
@devin-ai-integration devin-ai-integration bot changed the title from "feat: Add expand_records_from_field and remain_original_record to DpathExtractor (do not merge)" to "feat: Add RecordExpander component for nested array extraction" Dec 2, 2025
Apply cleaner logic using 'yield from' consistently:
- When extracted is a list without record_expander, use 'yield from extracted'
- Check 'if not self.record_expander' instead of nested if/else
- Remove unnecessary 'yield from []' for empty case

All 24 tests passing. Suggested by @DanyloGL.

Co-Authored-By: unknown <>
Changes:
- Add back 'else: yield from []' in DpathExtractor for explicit empty case
- Update RecordExpander to return nothing when expand_records_from_field path doesn't exist or isn't a list
- Update unit tests to expect no records instead of original record when expansion fails

This makes RecordExpander stricter: it only emits records when successfully expanding a list.
For Stripe invoice_line_items, this ensures we only emit line items, not invoice objects.

All 24 tests passing. Requested by @DanyloGL.

Co-Authored-By: unknown <>
Changes:
1. Remove TypeError from exception handler (only catch KeyError per dpath.get docs)
2. Add wildcard (*) support to RecordExpander for matching multiple arrays
3. Update docstring and schema to document wildcard support
4. Add 5 new unit tests for wildcard expansion scenarios
5. Regenerate models from updated schema

When wildcards are used, RecordExpander:
- Uses dpath.values() to find all matches
- Filters for list-valued matches only
- Expands items from all matched lists
- Returns nothing if no list matches found
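
The wildcard steps listed above can be sketched without dpath as follows. This is a simplified stand-in, not dpath's glob implementation and not the PR's code: `iter_matches` and `expand_wildcard` are hypothetical names, and only a bare `"*"` segment is treated as a wildcard (matching any dict key or list index).

```python
from typing import Any, Iterator, Sequence


def iter_matches(node: Any, path: Sequence[str]) -> Iterator[Any]:
    """Yield every value reachable via path; "*" matches any key or list index."""
    if not path:
        yield node
        return
    head, rest = path[0], path[1:]
    if head == "*":
        if isinstance(node, dict):
            children = list(node.values())
        elif isinstance(node, list):
            children = node
        else:
            children = []
        for child in children:
            yield from iter_matches(child, rest)
    elif isinstance(node, dict) and head in node:
        yield from iter_matches(node[head], rest)
    # Any other case (missing key, scalar in the middle of the path) yields nothing.


def expand_wildcard(record: Any, path: Sequence[str]) -> Iterator[Any]:
    """Expand items from every list-valued match; non-list matches are ignored."""
    for match in iter_matches(record, path):
        if isinstance(match, list):
            yield from match


record = {
    "sections": [
        {"items": [{"id": 1}, {"id": 2}]},
        {"items": "not a list"},  # filtered out: match is not a list
        {"items": [{"id": 3}]},
    ]
}
expanded = list(expand_wildcard(record, ["sections", "*", "items"]))
no_match = list(expand_wildcard(record, ["missing", "*", "items"]))
```

Here `expanded` contains the three items from the two list-valued matches, and `no_match` is empty because nothing matches the path.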

All 29 tests passing. Requested by @DanyloGL.

Co-Authored-By: unknown <>
MyPy was complaining that dpath.values() and dpath.get() return 'object' type.
Added cast(Iterable[Any], ...) for dpath.values() and cast(Any, ...) for dpath.get()
to satisfy MyPy type checking while maintaining runtime behavior.

All 29 tests passing. MyPy check now passes.
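
As a small illustration of the cast approach described in this commit message: `typing.cast` only informs the type checker and returns its argument unchanged at runtime. The `lookup` function below is a hypothetical stand-in for a call annotated as returning `object`; it is not dpath's API.

```python
from typing import Any, Iterable, cast


def lookup(data: dict, key: str) -> object:
    """Hypothetical stand-in for a library call annotated as returning object."""
    return data[key]


raw = lookup({"items": [1, 2, 3]}, "items")

# Without the cast, a type checker rejects iterating over a plain `object`.
values = cast(Iterable[Any], raw)
total = sum(values)

# cast() performs no conversion or validation: it hands back the same object.
same_object = values is raw
```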

Co-Authored-By: unknown <>
Unified the wildcard and non-wildcard branches by collecting all arrays
to process into a single list, then using one common loop for expansion.
This eliminates the duplicated item iteration and record expansion logic.

All 29 tests passing. MyPy check passes.

Co-Authored-By: unknown <>
@devin-ai-integration devin-ai-integration bot changed the title from "feat: Add RecordExpander component for nested array extraction" to "feat(cdk): Add RecordExpander component for nested array extraction" Dec 2, 2025
Changes per Daryna's feedback:
1. Removed isinstance(m, list) filter - now checking inside loop
2. Renamed 'matches' to 'extracted'
3. Removed type casts - using 'extracted: Any' instead
4. Renamed 'nested_array' to 'record' (loop var), using 'parent_record' for original
5. Removed 'if not nested_array:' check (redundant with for loop)

All 29 tests passing. MyPy check passes.

Co-Authored-By: unknown <>
- Add on_no_records parameter with 'skip' (default) and 'emit_parent' options
- Add parent_fields_to_copy parameter to copy specific parent fields to child records
- Add ParentFieldMapping class to define source/target field mappings
- Update schema YAML with new properties and ParentFieldMapping definition
- Regenerate models from schema
- Add comprehensive unit tests for new features

Co-Authored-By: unknown <>
try:
    value = dpath.get(dict(parent_record), source_path)
    child_record[self.target_field] = value
except KeyError:
@sophiecuiy sophiecuiy marked this pull request as ready for review February 4, 2026 20:51
Copilot AI review requested due to automatic review settings February 4, 2026 20:51
Contributor

Copilot AI left a comment

Pull request overview

This PR introduces a new RecordExpander component to the CDK that enables extracting items from nested array fields and emitting each item as a separate record. This addresses the Stripe connector's need to handle invoice_line_items and subscription_items streams where parent objects contain nested arrays that need to be flattened.

Changes:

  • Added RecordExpander class with support for wildcard paths, optional parent record embedding, configurable empty-array behavior, and selective parent field copying
  • Integrated RecordExpander with DpathExtractor via optional record_expander parameter
  • Updated schema definitions and component factory to support the new component
  • Added 40 comprehensive unit tests covering expansion scenarios, wildcard paths, and edge cases

Reviewed changes

Copilot reviewed 8 out of 8 changed files in this pull request and generated 4 comments.

| File | Description |
| --- | --- |
| airbyte_cdk/sources/declarative/expanders/record_expander.py | New core implementation of RecordExpander and ParentFieldMapping classes |
| airbyte_cdk/sources/declarative/expanders/__init__.py | Module initialization exporting new expander components |
| airbyte_cdk/sources/declarative/extractors/dpath_extractor.py | Integration of RecordExpander into DpathExtractor with optional expansion logic |
| airbyte_cdk/sources/declarative/models/declarative_component_schema.py | Auto-generated Pydantic models for new components and parameters |
| airbyte_cdk/sources/declarative/declarative_component_schema.yaml | Schema definitions for RecordExpander and ParentFieldMapping |
| airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py | Factory method updates to instantiate RecordExpander from models |
| airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py | Component transformer mapping for DpathExtractor.record_expander |
| unit_tests/sources/declarative/extractors/test_dpath_extractor.py | 40 new unit tests for expansion, on_no_records, parent_fields_to_copy, and combined features |


class ParentFieldMapping:
    """Defines a mapping from a parent record field to a child record field."""

    source_field_path: list[str | InterpolatedString]

Copilot AI Feb 4, 2026

Use List from the typing module instead of the built-in list type for compatibility with Python versions prior to 3.9. The codebase uses List and Union from typing elsewhere (e.g., in dpath_extractor.py).

Copilot uses AI. Check for mistakes.
config: The user-provided configuration as specified by the source's spec.
"""

expand_records_from_field: list[str | InterpolatedString]

Copilot AI Feb 4, 2026

Use List from the typing module instead of the built-in list type for compatibility with Python versions prior to 3.9. The codebase uses List and Union from typing elsewhere (e.g., in dpath_extractor.py).

parameters: InitVar[Mapping[str, Any]]
remain_original_record: bool = False
on_no_records: str = "skip"
parent_fields_to_copy: list[ParentFieldMapping] = field(default_factory=list)

Copilot AI Feb 4, 2026

Use List from the typing module instead of the built-in list type for compatibility with Python versions prior to 3.9. The codebase uses List and Union from typing elsewhere (e.g., in dpath_extractor.py).

parent_fields_to_copy: list[ParentFieldMapping] = field(default_factory=list)

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
    self._expand_path: list[InterpolatedString] | None = [

Copilot AI Feb 4, 2026

Use Optional[List[InterpolatedString]] from the typing module instead of list[InterpolatedString] | None for compatibility with Python versions prior to 3.10. The codebase uses Optional from typing elsewhere (e.g., in dpath_extractor.py).

@sophiecuiy

sophiecuiy commented Feb 5, 2026

/prerelease

Prerelease Job Info

This job triggers the publish workflow with default arguments to create a prerelease.

Prerelease job started... Check job output.

✅ Prerelease workflow triggered successfully.

View the publish workflow run: https://github.com/airbytehq/airbyte-python-cdk/actions/runs/21726348626

@devin-ai-integration
Contributor Author

❌ Cannot revive Devin session - the session is too old. Please start a new session instead.

@agarctfi
Contributor

agarctfi commented Feb 6, 2026

/autofix

Auto-Fix Job Info

This job attempts to auto-fix any linting or formatting issues. If any fixes are made,
those changes will be automatically committed and pushed back to the PR.

Note: This job can only be run by maintainers. On PRs from forks, this command requires
that the PR author has enabled the Allow edits from maintainers option.

PR auto-fix job started... Check job output.

✅ Changes applied successfully.

@agarctfi
Contributor

agarctfi commented Feb 6, 2026

/autofix

Auto-Fix Job Info

This job attempts to auto-fix any linting or formatting issues. If any fixes are made,
those changes will be automatically committed and pushed back to the PR.

Note: This job can only be run by maintainers. On PRs from forks, this command requires
that the PR author has enabled the Allow edits from maintainers option.

PR auto-fix job started... Check job output.

🟦 Job completed successfully (no changes).

…ecordExpander

Co-Authored-By: alfredo.garcia@airbyte.io <freddy.garcia7.fg@gmail.com>