Skip to content

fix(cdk): fix SubstreamPartitionRouter updating cursor value when no records were read in partition#889

Open
darynaishchenko wants to merge 8 commits intomainfrom
daryna/fix-substream-partition-router
Open

fix(cdk): fix SubstreamPartitionRouter updating cursor value when no records were read in partition#889
darynaishchenko wants to merge 8 commits intomainfrom
daryna/fix-substream-partition-router

Conversation

@darynaishchenko
Copy link
Contributor

@darynaishchenko darynaishchenko commented Feb 2, 2026

Summary

This PR fixes an issue where SubstreamPartitionRouter would incorrectly update cursor values when no records were read in a partition. The fix adds defensive null-checking throughout stream_slices() to gracefully handle:

  1. Empty partition generators - iterate_with_last_flag now yields (None, True) sentinel when the input generator is empty, and stream_slices() breaks out of the partition loop when partition is None (preserving multi-parent behavior)
  2. Empty record iterators - When a partition has no records, cursor observation and slice emission are skipped (guarded by if parent_record is not None)

Updates since last revision

  • Changed if partition is None: return to if partition is None: break per review feedback. Using break ensures that if one parent stream has no partitions, we continue processing subsequent parent_stream_configs rather than exiting the entire method.
  • Added test_substream_partition_router_closes_all_partitions_even_when_no_records to verify that cursor.close_partition() is called for ALL partitions, including those with no records. This addresses the review feedback requesting validation of partition lifecycle management.
  • Fixed MyPy type error: Updated iterate_with_last_flag return type from tuple[T, bool] to tuple[T | None, bool] to properly reflect that the function yields (None, True) as a sentinel for empty generators.

Review & Testing Checklist for Human

  • ⚠️ INVESTIGATE TEST FAILURE: test_substream_slicer_parent_state_update_with_cursor is failing on this branch but passes on main. The test expects lookback_window: 0 but gets lookback_window: 1, and there's an extra state key in the output. This appears to be a pre-existing issue in the PR's original changes that needs investigation.
  • Verify multi-parent behavior: The if partition is None: break exits only the current parent's partition loop. Test with a connector that has multiple parent_stream_configs where one parent has no partitions to confirm subsequent parents are still processed. (Note: current tests only cover single-parent scenarios)
  • Test with real connector: Test with a connector using SubstreamPartitionRouter where a parent stream returns empty partitions or partitions with no records
  • Run full test suite: poetry run pytest unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py -v

Notes

Summary by CodeRabbit

  • New Features

    • Added an iterator utility that flags the final element and emits a None marker when the source is exhausted.
  • Bug Fixes

    • Prevented emitting slices for empty/None partitions and added early exit when parent partitions yield no records.
    • Guarded partition extraction and cursor/state updates so they run only when a valid parent record exists.
    • Ensured partition cursors are closed correctly even when partitions have no records.
  • Tests

    • Added unit tests covering the new iterator and edge-case partition behaviors.

@darynaishchenko darynaishchenko self-assigned this Feb 2, 2026
@github-actions
Copy link

github-actions bot commented Feb 2, 2026

👋 Greetings, Airbyte Team Member!

Here are some helpful tips and reminders for your convenience.

💡 Show Tips and Tricks

Testing This CDK Version

You can test this version of the CDK using the following:

# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@daryna/fix-substream-partition-router#egg=airbyte-python-cdk[dev]' --help

# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch daryna/fix-substream-partition-router

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:

  • /autofix - Fixes most formatting and linting issues
  • /poetry-lock - Updates poetry.lock file
  • /test - Runs connector tests with the updated CDK
  • /prerelease - Triggers a prerelease publish with default arguments
  • /poe build - Regenerate git-committed build artifacts, such as the pydantic models which are generated from the manifest JSON schema in YAML.
  • /poe <command> - Runs any poe command in the CDK environment
📚 Show Repo Guidance

Helpful Resources

📝 Edit this welcome message.

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Feb 2, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review
📝 Walkthrough

Walkthrough

Changed SubstreamPartitionRouter to add iterate_with_last_flag (now yields (T | None, bool)), handle exhausted generators by yielding (None, True), and guard all parent_record-dependent processing (cursor observation, partition tracking, partition_value/extra_fields/lazy pointer extraction, and slice emission) behind non-None checks; outer loop now breaks early on None partitions.

Changes

Cohort / File(s) Summary
Substream partition router
airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py
Added iterate_with_last_flag(generator: Iterable[T]) -> Iterable[tuple[T | None, bool]]; when input is exhausted it yields (None, True); guarded processing that depends on parent_record (cursor/state observation, partition association, partition_value extraction via dpath, extra_fields, lazy_read_pointer), preserved KeyError handling inside guarded path, emit StreamSlice only for non-None parent_record, and break outer loop early if a partition is None.
Unit tests for edge cases
unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py
Added tests importing iterate_with_last_flag; new tests cover (item, is_last) semantics for empty/single/multiple inputs and SubstreamPartitionRouter behaviors: no slice/no cursor update for partitions with no records, handling empty parent partitions (early exit), and ensuring close_partition is called for all partitions even when no records are emitted.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Would you like me to generate a small focused patch-level checklist for reviewers (key lines/behaviors to verify), wdyt?

🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 71.43% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately describes the main fix: preventing cursor updates when no records are read in a partition, which aligns with the core behavioral changes in SubstreamPartitionRouter.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch daryna/fix-substream-partition-router

No actionable comments were generated in the recent review. 🎉

🧹 Recent nitpick comments
unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py (2)

1101-1149: Test name promises cursor behavior verification but only checks slice output.

The test asserts that no slices are yielded for the empty partition, which is valuable. However, neither cursor.observe nor cursor.close_partition are actually verified here since MockStream uses a real FinalStateCursor (not a mock). The "no cursor update" claim in the name/docstring is not directly validated. Would it be worth either renaming to something like test_substream_partition_router_no_slices_for_empty_partition or adding a mock cursor to assert observe was not called for the empty partition, wdyt?


1237-1242: Consider also asserting ensure_at_least_one_state_emitted and observe call counts?

The close_partition assertions are solid. Since you have a mock cursor here, it'd be easy to also verify:

  • mock_cursor.observe.call_count == 2 (only called for partitions with records)
  • mock_cursor.ensure_at_least_one_state_emitted.call_count == 1 (called after the last partition)

This would make the test a more complete contract for the partition lifecycle, wdyt?


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py (1)

42-49: ⚠️ Potential issue | 🔴 Critical

Type annotation mismatch causing pipeline failure.

The pipeline is flagging a type error here. Yielding (None, True) produces tuple[None, bool], but the return type expects tuple[T, bool]. Since the sentinel introduces None as a possible value, would updating the return type to Optional[T] work for you, wdyt?

🔧 Proposed fix
-def iterate_with_last_flag(generator: Iterable[T]) -> Iterable[tuple[T, bool]]:
+def iterate_with_last_flag(generator: Iterable[T]) -> Iterable[tuple[Optional[T], bool]]:
🤖 Fix all issues with AI agents
In
`@airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py`:
- Around line 210-211: In stream_slices() in substream_partition_router.py the
check "if partition is None: return" prematurely exits the entire method when a
parent_stream.generate_partitions() yields the (None, True) sentinel; change
this to "break" (or otherwise exit the inner partition loop) so we stop
iterating that parent stream's partitions and continue to the next
parent_stream_config instead of returning from stream_slices(), leaving the rest
of the parent streams to be processed; locate the partition handling inside the
loop that consumes parent_stream.generate_partitions() and replace the return
with a break (or equivalent loop-control) so behavior matches the original
semantics.

@github-actions
Copy link

github-actions bot commented Feb 2, 2026

PyTest Results (Fast)

3 864 tests  +9   3 852 ✅ +9   6m 33s ⏱️ +6s
    1 suites ±0      12 💤 ±0 
    1 files   ±0       0 ❌ ±0 

Results for commit d6c9701. ± Comparison against base commit 15542de.

♻️ This comment has been updated with latest results.

@github-actions
Copy link

github-actions bot commented Feb 2, 2026

PyTest Results (Full)

3 867 tests  +9   3 855 ✅ +9   10m 57s ⏱️ -4s
    1 suites ±0      12 💤 ±0 
    1 files   ±0       0 ❌ ±0 

Results for commit d6c9701. ± Comparison against base commit 15542de.

♻️ This comment has been updated with latest results.

@darynaishchenko
Copy link
Contributor Author

darynaishchenko commented Feb 2, 2026

/prerelease

Prerelease Job Info

This job triggers the publish workflow with default arguments to create a prerelease.

Prerelease job started... Check job output.

✅ Prerelease workflow triggered successfully.

View the publish workflow run: https://github.com/airbytehq/airbyte-python-cdk/actions/runs/21601466980

devin-ai-integration bot and others added 2 commits February 3, 2026 15:38
…titionRouter null handling

Co-Authored-By: Daryna Ishchenko <darina.ishchenko17@gmail.com>
…ulti-parent behavior

Co-Authored-By: Daryna Ishchenko <darina.ishchenko17@gmail.com>
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py (2)

42-49: ⚠️ Potential issue | 🟠 Major

Fix mypy type mismatch in iterate_with_last_flag.
Line 48 yields (None, True), but the signature advertises tuple[T, bool], which is tripping mypy. Could we widen the return type to Optional[T] to match the sentinel, wdyt?

🔧 Proposed fix
-def iterate_with_last_flag(generator: Iterable[T]) -> Iterable[tuple[T, bool]]:
+def iterate_with_last_flag(generator: Iterable[T]) -> Iterable[tuple[Optional[T], bool]]:

186-187: ⚠️ Potential issue | 🟡 Minor

Update docstring to match empty-slice behavior.
Lines 186–187 still say an empty parent slice emits parent_record=None, but the code now skips emission. Could we adjust the docstring to reflect the new behavior, wdyt?

📝 Suggested doc tweak
-        If a parent slice contains no record, emit a slice with parent_record=None.
+        If a parent slice contains no record, emit no slice.

assert result == expected_output


def test_substream_partition_router_no_cursor_update_when_partition_has_no_records():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please also add a test that validates that all parent stream partitions are properly closed, even if no parent records are produced for a given parent partition.

devin-ai-integration bot and others added 3 commits February 5, 2026 10:36
… when no records

Co-Authored-By: Daryna Ishchenko <darina.ishchenko17@gmail.com>
… empty generators

Co-Authored-By: Daryna Ishchenko <darina.ishchenko17@gmail.com>
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In
`@airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py`:
- Around line 231-267: The try/except around dpath.get in
substream_partition_router should not use an early `continue` because that skips
the later partition closure logic; instead, introduce a boolean like `skip_slice
= True` when KeyError occurs (and only set `skip_slice = False` when extraction
succeeds), keep `extracted_extra_fields` in a safe default state, and let
execution fall through so the existing `if is_last_record_in_slice:
parent_stream.cursor.close_partition(partition)` / `if is_last_slice:
parent_stream.cursor.ensure_at_least_one_state_emitted()` blocks always run;
finally, guard the `yield StreamSlice(...)` (the emission that uses
`parent_record`, `partition_value`, `extracted_extra_fields`,
`parent_partition`, etc.) so it only occurs when `skip_slice` is False. This
ensures partition closure always executes while still skipping emission for
failed extractions.

Comment on lines +231 to +267
try:
partition_value = dpath.get(
record_data, # type: ignore [arg-type]
parent_field,
)
except KeyError:
# FIXME a log here would go a long way for debugging
continue

# Add extra fields
extracted_extra_fields = self._extract_extra_fields(
record_data, extra_fields
)

if parent_stream_config.lazy_read_pointer:
extracted_extra_fields = {
"child_response": self._extract_child_response(
record_data,
parent_stream_config.lazy_read_pointer, # type: ignore[arg-type] # lazy_read_pointer type handeled in __post_init__ of parent_stream_config
),
**extracted_extra_fields,
}

if is_last_record_in_slice:
parent_stream.cursor.close_partition(partition)
if is_last_slice:
parent_stream.cursor.ensure_at_least_one_state_emitted()

yield StreamSlice(
partition={
partition_field: partition_value,
"parent_slice": parent_partition or {},
},
cursor_slice={},
extra_fields=extracted_extra_fields,
)
if parent_record is not None:
yield StreamSlice(
partition={
partition_field: partition_value,
"parent_slice": parent_partition or {},
},
cursor_slice={},
extra_fields=extracted_extra_fields,
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Avoid skipping partition closure when partition-value extraction fails.

The early continue in the extraction-failure path skips close_partition() / ensure_at_least_one_state_emitted() when that record is last, leaving the cursor partition open. Could we switch to a skip_slice flag so closure always runs and only slice emission is skipped, wdyt?

🔧 Possible fix
                     for parent_record, is_last_record_in_slice in iterate_with_last_flag(
                         partition.read()
                     ):
-                        if parent_record is not None:
+                        skip_slice = True
+                        if parent_record is not None:
                             # In the previous CDK implementation, state management was done internally by the stream.
                             # However, this could cause issues when doing availability check for example as the availability
                             # check would progress the state so state management was moved outside of the read method.
                             # Hence, we need to call the cursor here.
                             # Note that we call observe and close_partition before emitting the associated record as the
                             # ConcurrentPerPartitionCursor will associate a record with the state of the stream after the
                             # record was consumed.
                             parent_stream.cursor.observe(parent_record)
                             parent_partition = (
                                 parent_record.associated_slice.partition
                                 if parent_record.associated_slice
                                 else {}
                             )
                             record_data = parent_record.data
 
                             try:
                                 partition_value = dpath.get(
                                     record_data,  # type: ignore [arg-type]
                                     parent_field,
                                 )
                             except KeyError:
                                 # FIXME a log here would go a long way for debugging
-                                continue
-
-                            # Add extra fields
-                            extracted_extra_fields = self._extract_extra_fields(
-                                record_data, extra_fields
-                            )
-
-                            if parent_stream_config.lazy_read_pointer:
-                                extracted_extra_fields = {
-                                    "child_response": self._extract_child_response(
-                                        record_data,
-                                        parent_stream_config.lazy_read_pointer,  # type: ignore[arg-type]  # lazy_read_pointer type handeled in __post_init__ of parent_stream_config
-                                    ),
-                                    **extracted_extra_fields,
-                                }
+                                skip_slice = True
+                            else:
+                                skip_slice = False
+
+                                # Add extra fields
+                                extracted_extra_fields = self._extract_extra_fields(
+                                    record_data, extra_fields
+                                )
+
+                                if parent_stream_config.lazy_read_pointer:
+                                    extracted_extra_fields = {
+                                        "child_response": self._extract_child_response(
+                                            record_data,
+                                            parent_stream_config.lazy_read_pointer,  # type: ignore[arg-type]  # lazy_read_pointer type handeled in __post_init__ of parent_stream_config
+                                        ),
+                                        **extracted_extra_fields,
+                                    }
 
                         if is_last_record_in_slice:
                             parent_stream.cursor.close_partition(partition)
                             if is_last_slice:
                                 parent_stream.cursor.ensure_at_least_one_state_emitted()
 
-                        if parent_record is not None:
+                        if not skip_slice:
                             yield StreamSlice(
                                 partition={
                                     partition_field: partition_value,
                                     "parent_slice": parent_partition or {},
                                 },
                                 cursor_slice={},
                                 extra_fields=extracted_extra_fields,
                             )
🤖 Prompt for AI Agents
In
`@airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py`
around lines 231 - 267, The try/except around dpath.get in
substream_partition_router should not use an early `continue` because that skips
the later partition closure logic; instead, introduce a boolean like `skip_slice
= True` when KeyError occurs (and only set `skip_slice = False` when extraction
succeeds), keep `extracted_extra_fields` in a safe default state, and let
execution fall through so the existing `if is_last_record_in_slice:
parent_stream.cursor.close_partition(partition)` / `if is_last_slice:
parent_stream.cursor.ensure_at_least_one_state_emitted()` blocks always run;
finally, guard the `yield StreamSlice(...)` (the emission that uses
`parent_record`, `partition_value`, `extracted_extra_fields`,
`parent_partition`, etc.) so it only occurs when `skip_slice` is False. This
ensures partition closure always executes while still skipping emission for
failed extractions.

…_update_with_cursor

Co-Authored-By: Daryna Ishchenko <darina.ishchenko17@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants