[plan][runtime][java] Add CEL Action condition filtering#821
[plan][runtime][java] Add CEL Action condition filtering#821rosemarYuan wants to merge 13 commits into
Conversation
4d01e78 to
b4d2ebb
Compare
weiqingy
left a comment
There was a problem hiding this comment.
Thanks for taking this on — the serialization discipline stands out: keeping the compiled Programs off the Java-serialization path (router built on the TaskManager) and rebuilding the transient actionsWithCel in both readObject and the constructors are the easy-to-miss details that quietly break on failover. The macro-whitelist-via-CALL-node approach is a neat, well-tested call too. A few questions inline, plus one CI issue worth a look before merge.
- The dist uber-jar now shades
dev.celand its transitive graph (antlr4-runtime, re2j, protobuf-java). Is the LICENSE/NOTICE update for those tracked somewhere (release cut, or PR4), or expected in this PR? Just so it doesn't fall through the Apache release gate.
| python3 << 'PY' | ||
| import re, sys | ||
|
|
||
| java_file = 'plan/src/main/java/org/apache/flink/agents/plan/condition/CelReserved.java' |
There was a problem hiding this comment.
The reserved-keyword parity step reads symbols that don't exist at head, so it verifies nothing today and will mis-fire once PR3 lands. java_file points at CelReserved.java, but the reserved set lives in CelMacroPolicy.RESERVED_IDENTIFIERS; the set\.add("...") regex matches nothing (the set is built with Set.of(...) + set.addAll(CEL_STANDARD_MACROS)), so java_names comes out empty; and the DISALLOWED_MACROS branch keys off a constant that isn't there (CEL_STANDARD_MACROS / ALLOWED_MACROS are). Line 111 also lists AstRewriterTest in -Dtest, which has no class at head. These read like leftovers from the pre-refactor design (a CelReserved class, a DISALLOWED_MACROS set, an AST rewriter) that got folded into CelMacroPolicy without the workflow following along.
It's harmless right now because the step exit 0s while cel_reserved.py is absent — but the moment PR3 adds the Python side, this compares an empty Java set against the Python frozenset: either a spurious failure (Java only: []) or, if both happen to be empty, a false OK: 0 reserved identifiers aligned that silently passes a check guaranteeing nothing. Since the cross-language parity guard is the whole point of the workflow, would it be worth fixing it here — point the parser at CelMacroPolicy.java, extract RESERVED_IDENTIFIERS from the Set.of(...) block plus CEL_STANDARD_MACROS, drop AstRewriterTest — rather than carrying it forward broken? A guard that fails when java_names is empty would also keep a future refactor from silently re-breaking it.
There was a problem hiding this comment.
Thanks for flagging this — I hadn't realized the deps shaded into the uber-jar needed LICENSE entries. You're right that this PR is where dev.cel enters the uber-jar, so this is the right place to handle it. Pushed a commit adding a bundled-dependency section to LICENSE listing the four transitives (dev.cel, antlr4-runtime, re2j, protobuf-java; Apache 2.0 / BSD-3-Clause) and appending the upstream copyright lines to NOTICE.
On cel-conformance.yml — my miss, I forgot to update the workflow after the refactor, and the issues you called out all hold. I fixed them locally first, then moved the fix commit over to the Python implementation PR. Rationale: the cross-language parity check only becomes meaningful once cel_reserved.py lands, so landing the fix together with the Python side makes for a cleaner story and lets us verify both ends in one go. The Python PR will go out after this Java PR is reviewed — looking forward to your eyes on that one too.
There was a problem hiding this comment.
Thanks — the LICENSE/NOTICE additions cover it: dev.cel:cel/dev.cel:protobuf under Apache 2.0, re2j/antlr4-runtime under BSD-3-Clause, plus the guava/checker-qual/tree-sitter transitives, with the individual license files alongside. The definitive check is the shaded dist jar at release, but the set lines up with dev.cel's closure.
And agreed on removing the workflow until the Python side lands — that's cleaner than carrying a parity check that can't run yet, and it sidesteps the empty-set false-pass. I'll keep an eye out for the rebuilt check on the Python PR: that the regex pulls a non-empty RESERVED_IDENTIFIERS out of CelMacroPolicy, and that an empty java_names fails loudly rather than aligning two empty sets. Quick turnaround, appreciated.
| /** Original user-written entry string. */ | ||
| String source(); | ||
|
|
||
| /** Parser with the custom {@code has()} macro; same dialect as the runtime facade parser. */ |
There was a problem hiding this comment.
This parser is built without the resource-limit CelOptions that CelExpressionFacade.CEL_PARSER uses (maxParseRecursionDepth(32), maxExpressionCodePointSize(8192)), so the "same dialect as the runtime facade parser" note isn't quite accurate — the parse limits differ. The practical effect is small since classify() runs at plan-build time on the author's own annotation strings, but it does mean a deep expression parses fine here and only trips the cap later at toProgram. Could we apply the same maxParseRecursionDepth(32) / maxExpressionCodePointSize(8192) caps here too? ParsedCondition is in plan and the facade's CEL_OPTIONS is private to runtime, so a shared parser would cross the module boundary — but duplicating the two caps would line the limits up with the "same dialect" doc and surface a too-deep expression once, early.
There was a problem hiding this comment.
You're right — the doc claim and the code had drifted, and I did miss this during the implementation.
I adopted the local fix you suggested: duplicating the two numeric caps in the plan-side parser is lighter than crossing the plan/runtime module boundary for a shared CelOptions singleton. With this change, a too-deep or too-long expression now fails at classify() during job submission, which matches the runtime facade behavior claimed in the documentation.
There was a problem hiding this comment.
Confirmed — the duplicated caps line up with CelExpressionFacade.CEL_OPTIONS (maxParseRecursionDepth(32) / maxExpressionCodePointSize(8192)), so a too-deep or too-long expression now trips at classify() as the doc claims.
| if (celExpressions.isEmpty()) { | ||
| return; | ||
| } | ||
| conditionEvaluator = new CelConditionEvaluator(); |
There was a problem hiding this comment.
open() always constructs the evaluator with the default WARN_AND_SKIP policy, and nothing else (config, plan field, ctor arg) can select FAIL, so production always silently drops an action on a CEL runtime error or non-boolean result, with only a WARN log as signal. FAIL is fully implemented and tested but reachable only from tests. Two things I wanted to check rather than assume: is silent-skip the intended only production behavior here (fail-open vs. fail-closed is a real choice — a compilable expression that throws on every event would just quietly stop firing), and is FAIL meant to become a config knob, or is it speculative for now? If it's not going to be selectable, dropping it would avoid the test-only code path; if it is, the wiring looks like it's missing.
There was a problem hiding this comment.
Good catch — this was intended to be configurable, but the final wiring step was missing, so FAIL was only reachable from tests.
The intended default is still WARN_AND_SKIP, since that is the safer streaming behavior: a single bad event or unexpected CEL evaluation result should not fail the whole job. That said, strict-semantics pipelines do need a fail-closed option, so FAIL should be reachable in production rather than remaining a test-only path.
I added a follow-up commit to wire this through properly:
- promoted
EvaluationFailurePolicyto the API module asCelEvaluationFailurePolicy, following the existingLoggerTypepattern; - exposed
CEL_EVALUATION_FAILURE_POLICYinAgentConfigOptions, withWARN_AND_SKIPas the default; - updated
ActionRouter.open()to read the policy from the plan config when constructing the evaluator.
With this change, both WARN_AND_SKIP and FAIL are now selectable in production. Default behavior is unchanged for existing users — anyone not setting celEvaluationFailurePolicy in plan config still gets WARN_AND_SKIP.
There was a problem hiding this comment.
The wiring looks right — getConfig() is always initialized (every AgentPlan constructor sets a default AgentConfiguration), so open() won't NPE, and the enum round-trips through config the same way EVENT_LOGGER_TYPE/LoggerType does.
One gap on the path you just added, though: nothing tests it end-to-end. The only FAIL test (CelConditionEvaluatorTest.testFailPolicyThrowsOnEvaluationError) constructs the evaluator directly, so it never exercises CEL_EVALUATION_FAILURE_POLICY → ActionRouter.open() → evaluator — and all the ActionRouterTest cases run on the default policy. A regression in the wiring (wrong option read, default flipped, open() dropping the value) would stay green. Would it be worth a small ActionRouterTest case — build a plan with a CEL action, getConfig().set(AgentConfigOptions.CEL_EVALUATION_FAILURE_POLICY, FAIL), then assert route() throws IllegalStateException on an event whose condition errors? That would lock down the path this commit is here to enable.
There was a problem hiding this comment.
Thanks for the review suggestion. Good catch — that error path was not covered before.
I added two paired cases in ActionRouterTest, both using the same erroring expression (attributes.nonexistent > 3) and the same event:
route_failPolicyFromConfig_throwsOnConditionEvaluationError: setsCEL_EVALUATION_FAILURE_POLICY=FAILviagetConfig().set(...)and verifies thatroute()throwsIllegalStateException.route_defaultWarnAndSkip_swallowsConditionEvaluationError: leaves the policy unset and verifies thatroute()returns an empty result.
The pairing is intentional. A standalone FAIL -> throw test could still pass if the default policy were accidentally changed, the option key were misread, or the configured value were dropped. By keeping the expression and event identical and varying only the policy source, the tests make those regression modes visible.
There was a problem hiding this comment.
Confirmed — the paired cases close the gap. route_failPolicyFromConfig_throwsOnConditionEvaluationError walks exactly the path this commit enables (getConfig().set(FAIL) → open() reading CEL_EVALUATION_FAILURE_POLICY → evaluator), and pinning hasMessageContaining("CEL condition evaluation failed") keys on the eval-error branch specifically rather than the non-boolean message, so it won't silently match the wrong throw site. Holding the expression and event fixed while varying only the policy source is what makes the default-flip / misread-key / dropped-value regressions each surface in one case.
Nothing further from me — the changes look good. I'll leave the final call to the maintainers.
b4d2ebb to
0a42523
Compare
…ed ActionRouterTest cases
Linked issue: #754
Purpose of change
This is the second PR in the four-PR stack tracked by #754:
It adds the Java-side CEL runtime for
@Actiontrigger conditions, enabling actions to filter events by field-level CEL expressions (e.g.event.tokenCount > 100).Key components
ParsedCondition.classify: parses each trigger condition via CEL parser; bare identifier →TypeMatch, anything else →CelExpression.CelMacroPolicy: customhas()macro (has(x)→has(attributes.x)); whitelist rejectsexists/filter/mapetc.CelExpressionFacade: Parse → Validate → Check → Compile pipeline with process-wide LRU program cache and AST resource limits.CelConditionEvaluator: pre-compiles programs at open time; builds activation map from event; evaluates with configurable failure policy.ActionRouter: two-phase routing —actionsByEventHashMap fast path forTypeMatch, CEL slow path with lazy activation andLinkedHashSetdeduplication.ActionExecutionOperator/EventRouterdelegate toActionRouter.Tests
ParsedConditionTest,ActionParsedConditionsTest,ActionConstructionFailureTestCelExpressionFacadeTest,CelConditionEvaluatorTest,ActionRouterTest,CelResourceLimitsTestActionJsonSerializerTest,ActionJsonDeserializerTest(including legacylisten_event_typesfallback)cel_conformance_cases.yaml,disallowed_macros.yaml+.github/workflows/cel-conformance.ymlAPI
No new public API beyond what was introduced in #756. This PR adds runtime internals only.
Documentation
doc-neededdoc-not-needed(documentation will be covered in PR4)doc-included