[PLUGIN-1957] Validate PK Chunking for incremental loads#354
[PLUGIN-1957] Validate PK Chunking for incremental loads#354harishhk107 wants to merge 1 commit into
Conversation
|
Thanks for your pull request! It looks like this may be your first contribution to a Google open source project. Before we can look at your pull request, you'll need to sign a Contributor License Agreement (CLA). View this failed invocation of the CLA check for more information. For the most up to date status, view the checks section at the bottom of the pull request. |
faac75b to
5b4cf9e
Compare
db562e2 to
3f613ab
Compare
3f613ab to
d6c2a04
Compare
d88d48d to
fba1f3f
Compare
| public static final int MAX_PK_CHUNK_SIZE = 250000; | ||
| public static final int DEFAULT_PK_CHUNK_SIZE = 100000; | ||
| public static final int MIN_PK_CHUNK_SIZE = 1; | ||
| public static final long AUTO_PK_CHUNK_THRESHOLD = 1_000_000; |
There was a problem hiding this comment.
1M is too high, we should use the chunk size(that we generally use to create chunks) as a threshold here
There was a problem hiding this comment.
While we could lower the threshold to the chunk size, we have tested the 1M threshold and found that it doesn't make a significant difference in execution time
Before ~13m
After ~13m
There was a problem hiding this comment.
1M is too high from readstream perspective, for some cases we may not be able to read that much data in a single stream.
Also how are you measuring execution time? in DTS it would definitely change as with chunking we will process 1M record in parallel and without chunking it would happen in a sequential manner
There was a problem hiding this comment.
I’m currently testing this locally on a CDAP pipeline.
| * @param threshold the record count threshold above which PK chunking is enabled | ||
| * @return true if PK chunking should be auto-enabled, false otherwise | ||
| */ | ||
| public static boolean shouldAutoDetectPKChunk(String query, AuthenticatorCredentials credentials, long threshold) { |
There was a problem hiding this comment.
We are not auto detecting chunking, instead it's a validation check for PK chunking.
We can use more intuitive names like hasRequiredCountForPkChunking
| return false; | ||
| } | ||
| if (SalesforceQueryParser.isRestrictedPKQuery(query)) { | ||
| LOG.debug("PK Chunking auto-decision: query contains restricted clauses, skipping PK chunking"); |
There was a problem hiding this comment.
record count criteria not auto decision, apply everywhere
| } | ||
|
|
||
| @Test | ||
| public void testCreateCountQuery() { |
There was a problem hiding this comment.
can we please follow same naming convention across all tests?
Test names should summarize the behavior being tested and its expected outcome.
| @Test | ||
| public void testCreateCountQuery() { | ||
| String query = "SELECT Id,Name,SomeField FROM sObjectName WHERE LastModifiedDate>=2019-04-12T23:23:23Z"; | ||
| String countQuery = SalesforceQueryUtil.createCountQuery(query); |
There was a problem hiding this comment.
Structure test in arrange, act, assert blocks , you can use empty line as separatot between these blocks, Comment applies to all
There was a problem hiding this comment.
applied testMethodName_stateUnderTest_expectedBehavior pattern
Added blank lines for Arrange/Act/Assert
| * for __c suffix), but threshold check still applies. | ||
| */ | ||
| @Test | ||
| public void pkChunking_customObject_returnsAll() throws Exception { |
There was a problem hiding this comment.
Please follow naming and structure guideline as described in below comments
| * @param query the original SOQL query | ||
| * @return a COUNT SOQL query string | ||
| */ | ||
| public static String createCountQuery(String query) { |
There was a problem hiding this comment.
How long does this query takes when we use filters?
Can you add tests cases with before and after time with high record count
- Test table having 30-40M records and with a filter query
- Table with 10M records with a filter query.
Basically the things that we want to tests is how does the query cost scales with record count.
There was a problem hiding this comment.
Reason for Limit: The primary reason for adding the LIMIT clause is Salesforce’s hard-enforced 120-second synchronous query execution timeout. Without it, standard COUNT() queries on large tables often exceed this limit and raise a QUERY_TIMEOUT exception, causing pipeline failures.
Behavior After Code Changes: Because the query stops scanning once it hits the threshold, running this on a table with 10M records versus a table with 40M records will yield similar performance (assuming both have at least 100k records). The database will not perform a full scan of the 10M or 40M records. Therefore, performance tests will only show scaling behavior up to the threshold limit, regardless of the total table volume or the filters used.
Observations: In our observations, these count queries with the LIMIT clause typically take between 700ms and 2 seconds to complete depending on the table size, safely avoiding the 120-second synchronous timeout.
There was a problem hiding this comment.
LIMIT caps the maximum number of matching results returned, not the number of rows scanned to find those matches.
So I am not able to understand how adding limit will optimize the query.
There was a problem hiding this comment.
if we were dealing with standard ANSI SQL databases (like PostgreSQL or MySQL). In standard SQL engines, SELECT COUNT(*) FROM table LIMIT X computes the full aggregation first by scanning all matching rows to produce a single scalar value, and then applies the LIMIT clause to that final single row. In that context, LIMIT does not optimize or reduce the number of rows scanned at all.
However, Salesforce’s SOQL query engine handles COUNT() natively in a different way:
When a LIMIT clause is appended to a COUNT() query in SOQL (e.g., SELECT COUNT() FROM Object WHERE ... LIMIT 100000), it acts as a short-circuit instruction for the Salesforce query optimizer. The Salesforce database engine stops scanning and counting records as soon as it reaches 100,000 matches that satisfy the filter criteria. It then immediately returns the QueryResult with size capped at 100,000.
Conclusion: If LIMIT operated like standard SQL (evaluating the full count first and capping the returned result set afterward), the database engine would still have to perform the full table scan of all 30+ million records. If it performed a full scan, the query would still take >120 seconds and trigger the exact same QUERY_TIMEOUT exception.
The fact that execution drops from >120 seconds (Timeout) down to <2 seconds is definitive proof that the Salesforce query engine stops scanning rows the moment the LIMIT threshold is reached.
Official Documentation References:
- For how
COUNT()populates thesizeproperty directly: SOQL COUNT() Documentation- For capping execution evaluations: SOQL LIMIT Clause Documentation
There was a problem hiding this comment.
It says the same thing as i mentioned above - as soon as it reaches 100,000 matches that satisfy the filter criteria
If a table has 1 million records and only 1 lakh are matching the filter criteria, to find out those 1lakh it may need to scan the whole table. So the query complexity will depend on the nature of filter and data, adding limit doesn't result in any optimisation in such case.
It can help in cases where lets say filter criteria return 1 million records but since we want to only check if it has atleast 1 lakh records then in those cases it would return early, as soon as it find 1 lakh record but finding 1 lakh record doesn't mean it won't do table scan.
Also the official refs attached in your comment do not mention anything related to query optimisation.
ca92e80 to
73dacc9
Compare
dfd2083 to
62deabb
Compare
62deabb to
303f515
Compare
PLUGIN-1957 Autodetect PK Chunking for incremental loads
What
Adds a record count check before enabling PK chunking in SalesforceBatchSource.getSplits().
If the record count is below PK_CHUNK_RECORD_COUNT_THRESHOLD (100,000), PK chunking is skipped
even if enabled in config, to avoid unnecessary overhead on small datasets. Just for DTS
Why
PK chunking is designed for very large datasets. Enabling it on small datasets causes empty
chunk overhead and increased pipeline execution time without any benefit. This change ensures
chunking is only applied when it is operationally justified by the actual record count.
Changes
SalesforceBatchSource.java —> fixed getSplits() to call hasRequiredCountForPkChunking() only when
config.getEnablePKChunk() && pkChunkCountCheck is true, added pkChunkCountCheck parameter
SalesforceSplitUtil.java —> added hasRequiredCountForPkChunking() which runs a COUNT() query
to check record count against the threshold before enabling chunking
SalesforceSourceConstants.java — added PK_CHUNK_RECORD_COUNT_THRESHOLD = 1_00_000
Note on SOQL Specific Behavior
In Salesforce SOQL,
LIMITactually stops the counting process once the limit is reached. This makes it extremely efficient for threshold checks on massive tables without causing server timeouts.Manual Testing
Verified pipeline runs correctly with enablePKChunk=true and small dataset → chunking skipped
Verified pipeline runs correctly with enablePKChunk=true and large dataset → chunking applied
Verified pipeline runs correctly with enablePKChunk=false → chunking skipped, no count query