Skip to content

[PLUGIN-1957] Validate PK Chunking for incremental loads#354

Open
harishhk107 wants to merge 1 commit into
data-integrations:developfrom
cloudsufi:feature/prevent-empty-chunks-pk-chunking
Open

[PLUGIN-1957] Validate PK Chunking for incremental loads#354
harishhk107 wants to merge 1 commit into
data-integrations:developfrom
cloudsufi:feature/prevent-empty-chunks-pk-chunking

Conversation

@harishhk107
Copy link
Copy Markdown
Contributor

@harishhk107 harishhk107 commented Apr 20, 2026

PLUGIN-1957 Autodetect PK Chunking for incremental loads

What

Adds a record count check before enabling PK chunking in SalesforceBatchSource.getSplits().
If the record count is below PK_CHUNK_RECORD_COUNT_THRESHOLD (100,000), PK chunking is skipped
even if enabled in config, to avoid unnecessary overhead on small datasets. Just for DTS

Why

PK chunking is designed for very large datasets. Enabling it on small datasets causes empty
chunk overhead and increased pipeline execution time without any benefit. This change ensures
chunking is only applied when it is operationally justified by the actual record count.

Changes

SalesforceBatchSource.java —> fixed getSplits() to call hasRequiredCountForPkChunking() only when
config.getEnablePKChunk() && pkChunkCountCheck is true, added pkChunkCountCheck parameter
SalesforceSplitUtil.java —> added hasRequiredCountForPkChunking() which runs a COUNT() query
to check record count against the threshold before enabling chunking
SalesforceSourceConstants.java — added PK_CHUNK_RECORD_COUNT_THRESHOLD = 1_00_000

Note on SOQL Specific Behavior

In Salesforce SOQL, LIMIT actually stops the counting process once the limit is reached. This makes it extremely efficient for threshold checks on massive tables without causing server timeouts.

Manual Testing

Verified pipeline runs correctly with enablePKChunk=true and small dataset → chunking skipped
Verified pipeline runs correctly with enablePKChunk=true and large dataset → chunking applied
Verified pipeline runs correctly with enablePKChunk=false → chunking skipped, no count query

@google-cla
Copy link
Copy Markdown

google-cla Bot commented Apr 20, 2026

Thanks for your pull request! It looks like this may be your first contribution to a Google open source project. Before we can look at your pull request, you'll need to sign a Contributor License Agreement (CLA).

View this failed invocation of the CLA check for more information.

For the most up to date status, view the checks section at the bottom of the pull request.

@harishhk107 harishhk107 force-pushed the feature/prevent-empty-chunks-pk-chunking branch from faac75b to 5b4cf9e Compare April 20, 2026 05:53
Comment thread src/test/java/io/cdap/plugin/salesforce/etl/SalesforceBatchSourceETLTest.java Outdated
Comment thread src/test/java/io/cdap/plugin/salesforce/etl/SalesforceBatchSourceETLTest.java Outdated
@vikasrathee-cs vikasrathee-cs changed the title feat: validate PK Chunking for large queries [PLUGIN-1957] Autodetect PK Chunking for incremental loads Apr 21, 2026
@harishhk107 harishhk107 force-pushed the feature/prevent-empty-chunks-pk-chunking branch from db562e2 to 3f613ab Compare April 22, 2026 11:19
@vikasrathee-cs vikasrathee-cs force-pushed the feature/prevent-empty-chunks-pk-chunking branch from 3f613ab to d6c2a04 Compare April 22, 2026 11:31
@harishhk107 harishhk107 force-pushed the feature/prevent-empty-chunks-pk-chunking branch 2 times, most recently from d88d48d to fba1f3f Compare April 22, 2026 12:54
public static final int MAX_PK_CHUNK_SIZE = 250000;
public static final int DEFAULT_PK_CHUNK_SIZE = 100000;
public static final int MIN_PK_CHUNK_SIZE = 1;
public static final long AUTO_PK_CHUNK_THRESHOLD = 1_000_000;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1M is too high, we should use the chunk size(that we generally use to create chunks) as a threshold here

Copy link
Copy Markdown
Contributor Author

@harishhk107 harishhk107 Apr 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While we could lower the threshold to the chunk size, we have tested the 1M threshold and found that it doesn't make a significant difference in execution time

Before ~13m
After ~13m

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1M is too high from readstream perspective, for some cases we may not be able to read that much data in a single stream.

Also how are you measuring execution time? in DTS it would definitely change as with chunking we will process 1M record in parallel and without chunking it would happen in a sequential manner

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I’m currently testing this locally on a CDAP pipeline.

* @param threshold the record count threshold above which PK chunking is enabled
* @return true if PK chunking should be auto-enabled, false otherwise
*/
public static boolean shouldAutoDetectPKChunk(String query, AuthenticatorCredentials credentials, long threshold) {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are not auto detecting chunking, instead it's a validation check for PK chunking.
We can use more intuitive names like hasRequiredCountForPkChunking

return false;
}
if (SalesforceQueryParser.isRestrictedPKQuery(query)) {
LOG.debug("PK Chunking auto-decision: query contains restricted clauses, skipping PK chunking");
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

record count criteria not auto decision, apply everywhere

}

@Test
public void testCreateCountQuery() {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we please follow same naming convention across all tests?

Test names should summarize the behavior being tested and its expected outcome.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure

@Test
public void testCreateCountQuery() {
String query = "SELECT Id,Name,SomeField FROM sObjectName WHERE LastModifiedDate>=2019-04-12T23:23:23Z";
String countQuery = SalesforceQueryUtil.createCountQuery(query);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Structure test in arrange, act, assert blocks , you can use empty line as separatot between these blocks, Comment applies to all

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

applied testMethodName_stateUnderTest_expectedBehavior pattern
Added blank lines for Arrange/Act/Assert

* for __c suffix), but threshold check still applies.
*/
@Test
public void pkChunking_customObject_returnsAll() throws Exception {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please follow naming and structure guideline as described in below comments

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure

* @param query the original SOQL query
* @return a COUNT SOQL query string
*/
public static String createCountQuery(String query) {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How long does this query takes when we use filters?

Can you add tests cases with before and after time with high record count

  1. Test table having 30-40M records and with a filter query
  2. Table with 10M records with a filter query.

Basically the things that we want to tests is how does the query cost scales with record count.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reason for Limit: The primary reason for adding the LIMIT clause is Salesforce’s hard-enforced 120-second synchronous query execution timeout. Without it, standard COUNT() queries on large tables often exceed this limit and raise a QUERY_TIMEOUT exception, causing pipeline failures.

Behavior After Code Changes: Because the query stops scanning once it hits the threshold, running this on a table with 10M records versus a table with 40M records will yield similar performance (assuming both have at least 100k records). The database will not perform a full scan of the 10M or 40M records. Therefore, performance tests will only show scaling behavior up to the threshold limit, regardless of the total table volume or the filters used.

Observations: In our observations, these count queries with the LIMIT clause typically take between 700ms and 2 seconds to complete depending on the table size, safely avoiding the 120-second synchronous timeout.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LIMIT caps the maximum number of matching results returned, not the number of rows scanned to find those matches.
So I am not able to understand how adding limit will optimize the query.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we were dealing with standard ANSI SQL databases (like PostgreSQL or MySQL). In standard SQL engines, SELECT COUNT(*) FROM table LIMIT X computes the full aggregation first by scanning all matching rows to produce a single scalar value, and then applies the LIMIT clause to that final single row. In that context, LIMIT does not optimize or reduce the number of rows scanned at all.

However, Salesforce’s SOQL query engine handles COUNT() natively in a different way:
When a LIMIT clause is appended to a COUNT() query in SOQL (e.g., SELECT COUNT() FROM Object WHERE ... LIMIT 100000), it acts as a short-circuit instruction for the Salesforce query optimizer. The Salesforce database engine stops scanning and counting records as soon as it reaches 100,000 matches that satisfy the filter criteria. It then immediately returns the QueryResult with size capped at 100,000.

Conclusion: If LIMIT operated like standard SQL (evaluating the full count first and capping the returned result set afterward), the database engine would still have to perform the full table scan of all 30+ million records. If it performed a full scan, the query would still take >120 seconds and trigger the exact same QUERY_TIMEOUT exception.

The fact that execution drops from >120 seconds (Timeout) down to <2 seconds is definitive proof that the Salesforce query engine stops scanning rows the moment the LIMIT threshold is reached.

Official Documentation References:

Copy link
Copy Markdown

@Sunish-Dahiya Sunish-Dahiya May 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It says the same thing as i mentioned above - as soon as it reaches 100,000 matches that satisfy the filter criteria

If a table has 1 million records and only 1 lakh are matching the filter criteria, to find out those 1lakh it may need to scan the whole table. So the query complexity will depend on the nature of filter and data, adding limit doesn't result in any optimisation in such case.
It can help in cases where lets say filter criteria return 1 million records but since we want to only check if it has atleast 1 lakh records then in those cases it would return early, as soon as it find 1 lakh record but finding 1 lakh record doesn't mean it won't do table scan.

Also the official refs attached in your comment do not mention anything related to query optimisation.

@harishhk107 harishhk107 force-pushed the feature/prevent-empty-chunks-pk-chunking branch from ca92e80 to 73dacc9 Compare April 27, 2026 11:23
@harishhk107 harishhk107 changed the title [PLUGIN-1957] Autodetect PK Chunking for incremental loads [PLUGIN-1957] Validate PK Chunking for incremental loads Apr 27, 2026
@harishhk107 harishhk107 force-pushed the feature/prevent-empty-chunks-pk-chunking branch 4 times, most recently from dfd2083 to 62deabb Compare May 11, 2026 17:47
@vikasrathee-cs vikasrathee-cs force-pushed the feature/prevent-empty-chunks-pk-chunking branch from 62deabb to 303f515 Compare May 21, 2026 04:44
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants