[plan][runtime][java] Add CEL Action condition filtering #821

Open
rosemarYuan wants to merge 13 commits into apache:main from rosemarYuan:pr2/java-cel-implementation

@rosemarYuan rosemarYuan commented Jun 9, 2026

Contributor

Linked issue: #754

Stacked PR: this PR is based on #756. Please review the incremental diff, not the full diff against main.

Purpose of change

This is the second PR in the four-PR stack tracked by #754:

  1. [api][java][python] Introduce EventType constants and unify Action trigger entry #756 — API changes (under review)
  2. This PR — Java CEL runtime
  3. Python CEL runtime (planned)
  4. Documentation (planned)

It adds the Java-side CEL runtime for @Action trigger conditions, enabling actions to filter events by field-level CEL expressions (e.g. event.tokenCount > 100).

Key components

  • ParsedCondition.classify: parses each trigger condition via CEL parser; bare identifier → TypeMatch, anything else → CelExpression.
  • CelMacroPolicy: custom has() macro (has(x) → has(attributes.x)); whitelist rejects exists/filter/map etc.
  • CelExpressionFacade: Parse → Validate → Check → Compile pipeline with process-wide LRU program cache and AST resource limits.
  • CelConditionEvaluator: pre-compiles programs at open time; builds activation map from event; evaluates with configurable failure policy.
  • ActionRouter: two-phase routing — actionsByEvent HashMap fast path for TypeMatch, CEL slow path with lazy activation and LinkedHashSet deduplication.
  • Operator integration: ActionExecutionOperator / EventRouter delegate to ActionRouter.
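
A minimal sketch of the two-phase routing idea, for readers who want to see the shape before opening the diff. It is not the PR's ActionRouter: the class name, method names, and the use of a plain Predicate in place of a compiled CEL program are all assumptions made for illustration. It shows a HashMap lookup for type matches, a slow path that builds the activation map only when a CEL condition actually has to be evaluated, and a LinkedHashSet that keeps each action at most once in first-match order.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.function.Supplier;

/** Illustrative stand-in for two-phase routing; names and types are hypothetical. */
class TwoPhaseRouterSketch {
    /** Fast path: event type name -> actions registered with a bare-identifier condition. */
    private final Map<String, List<String>> actionsByEvent = new HashMap<>();

    /** Slow path: action name paired with a predicate standing in for a compiled CEL program. */
    private final List<Map.Entry<String, Predicate<Map<String, Object>>>> actionsWithCel =
            new ArrayList<>();

    void addTypeMatch(String eventType, String action) {
        actionsByEvent.computeIfAbsent(eventType, k -> new ArrayList<>()).add(action);
    }

    void addCelCondition(String action, Predicate<Map<String, Object>> condition) {
        actionsWithCel.add(Map.entry(action, condition));
    }

    /** Returns matched actions in first-match order, each at most once. */
    Set<String> route(String eventType, Supplier<Map<String, Object>> activationSupplier) {
        // Phase 1: O(1) lookup by event type.
        Set<String> matched =
                new LinkedHashSet<>(actionsByEvent.getOrDefault(eventType, List.of()));

        // Phase 2: evaluate conditions, building the activation map lazily and only once.
        Map<String, Object> activation = null;
        for (Map.Entry<String, Predicate<Map<String, Object>>> entry : actionsWithCel) {
            if (matched.contains(entry.getKey())) {
                continue; // already routed via the fast path or an earlier condition
            }
            if (activation == null) {
                activation = activationSupplier.get();
            }
            if (entry.getValue().test(activation)) {
                matched.add(entry.getKey());
            }
        }
        return matched;
    }
}
```

In this sketch an action that already matched by type is never re-evaluated, so a router with no pending CEL conditions never asks the supplier for an activation map at all.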

Tests

  • Plan layer: ParsedConditionTest, ActionParsedConditionsTest, ActionConstructionFailureTest
  • Runtime layer: CelExpressionFacadeTest, CelConditionEvaluatorTest, ActionRouterTest, CelResourceLimitsTest
  • Serialization: ActionJsonSerializerTest, ActionJsonDeserializerTest (including legacy listen_event_types fallback)
  • Cross-language: cel_conformance_cases.yaml, disallowed_macros.yaml + .github/workflows/cel-conformance.yml

API

No new public API beyond what was introduced in #756. This PR adds runtime internals only.

Documentation

  • [ ] doc-needed
  • [x] doc-not-needed (documentation will be covered in PR4)
  • [ ] doc-included

@rosemarYuan rosemarYuan force-pushed the pr2/java-cel-implementation branch from 4d01e78 to b4d2ebb Compare June 10, 2026 02:54
@github-actions github-actions bot added and removed the doc-not-needed label (Your PR changes do not impact docs) Jun 10, 2026

@weiqingy weiqingy left a comment

Collaborator

Thanks for taking this on — the serialization discipline stands out: keeping the compiled Programs off the Java-serialization path (router built on the TaskManager) and rebuilding the transient actionsWithCel in both readObject and the constructors are the easy-to-miss details that quietly break on failover. The macro-whitelist-via-CALL-node approach is a neat, well-tested call too. A few questions inline, plus one CI issue worth a look before merge.

  1. The dist uber-jar now shades dev.cel and its transitive graph (antlr4-runtime, re2j, protobuf-java). Is the LICENSE/NOTICE update for those tracked somewhere (release cut, or PR4), or expected in this PR? Just so it doesn't fall through the Apache release gate.

Comment thread .github/workflows/cel-conformance.yml Outdated
python3 << 'PY'
import re, sys

java_file = 'plan/src/main/java/org/apache/flink/agents/plan/condition/CelReserved.java'

Collaborator

The reserved-keyword parity step reads symbols that don't exist at head, so it verifies nothing today and will mis-fire once PR3 lands. java_file points at CelReserved.java, but the reserved set lives in CelMacroPolicy.RESERVED_IDENTIFIERS; the set\.add("...") regex matches nothing (the set is built with Set.of(...) + set.addAll(CEL_STANDARD_MACROS)), so java_names comes out empty; and the DISALLOWED_MACROS branch keys off a constant that isn't there (CEL_STANDARD_MACROS / ALLOWED_MACROS are). Line 111 also lists AstRewriterTest in -Dtest, which has no class at head. These read like leftovers from the pre-refactor design (a CelReserved class, a DISALLOWED_MACROS set, an AST rewriter) that got folded into CelMacroPolicy without the workflow following along.

It's harmless right now because the step exit 0s while cel_reserved.py is absent — but the moment PR3 adds the Python side, this compares an empty Java set against the Python frozenset: either a spurious failure (Java only: []) or, if both happen to be empty, a false OK: 0 reserved identifiers aligned that silently passes a check guaranteeing nothing. Since the cross-language parity guard is the whole point of the workflow, would it be worth fixing it here — point the parser at CelMacroPolicy.java, extract RESERVED_IDENTIFIERS from the Set.of(...) block plus CEL_STANDARD_MACROS, drop AstRewriterTest — rather than carrying it forward broken? A guard that fails when java_names is empty would also keep a future refactor from silently re-breaking it.
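
A rough sketch of the extraction-plus-guard logic suggested above. The workflow step itself is a Python heredoc; this is written in Java only to stay consistent with the rest of the PR, and it assumes both constants are declared with a `NAME = ... Set.of("...")` initializer inside a single statement. The class and method names are hypothetical, and the real layout of CelMacroPolicy.java may need a different pattern.

```java
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: pulls reserved names out of Java source text and refuses an empty result. */
class ReservedParitySketch {
    private static final Pattern QUOTED = Pattern.compile("\"([^\"]+)\"");

    /** Returns the string literals inside the first Set.of(...) that follows "NAME =". */
    static Set<String> extractSetOf(String javaSource, String constantName) {
        // Assumes the initializer is a single statement: no ';' between '=' and Set.of(.
        Pattern declaration = Pattern.compile(
                "\\b" + Pattern.quote(constantName) + "\\s*=[^;]*?Set\\.of\\(([^)]*)\\)");
        Set<String> names = new LinkedHashSet<>();
        Matcher m = declaration.matcher(javaSource);
        if (m.find()) {
            Matcher literal = QUOTED.matcher(m.group(1));
            while (literal.find()) {
                names.add(literal.group(1));
            }
        }
        return names;
    }

    /** Union of both constants; throws instead of letting an empty set reach the comparison. */
    static Set<String> javaReservedNames(String javaSource) {
        Set<String> names = new LinkedHashSet<>(extractSetOf(javaSource, "RESERVED_IDENTIFIERS"));
        names.addAll(extractSetOf(javaSource, "CEL_STANDARD_MACROS"));
        if (names.isEmpty()) {
            throw new IllegalStateException(
                    "no reserved identifiers extracted; the source layout probably changed");
        }
        return names;
    }
}
```

The point of the guard is that a future refactor which moves or renames the constants makes the check fail loudly, rather than comparing two empty sets and reporting them as aligned.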

Contributor Author

Thanks for flagging this — I hadn't realized the deps shaded into the uber-jar needed LICENSE entries. You're right that this PR is where dev.cel enters the uber-jar, so this is the right place to handle it. Pushed a commit adding a bundled-dependency section to LICENSE listing the four transitives (dev.cel, antlr4-runtime, re2j, protobuf-java; Apache 2.0 / BSD-3-Clause) and appending the upstream copyright lines to NOTICE.

On cel-conformance.yml — my miss, I forgot to update the workflow after the refactor, and the issues you called out all hold. I fixed them locally first, then moved the fix commit over to the Python implementation PR. Rationale: the cross-language parity check only becomes meaningful once cel_reserved.py lands, so landing the fix together with the Python side makes for a cleaner story and lets us verify both ends in one go. The Python PR will go out after this Java PR is reviewed — looking forward to your eyes on that one too.

Collaborator

Thanks — the LICENSE/NOTICE additions cover it: dev.cel:cel/dev.cel:protobuf under Apache 2.0, re2j/antlr4-runtime under BSD-3-Clause, plus the guava/checker-qual/tree-sitter transitives, with the individual license files alongside. The definitive check is the shaded dist jar at release, but the set lines up with dev.cel's closure.

And agreed on removing the workflow until the Python side lands — that's cleaner than carrying a parity check that can't run yet, and it sidesteps the empty-set false-pass. I'll keep an eye out for the rebuilt check on the Python PR: that the regex pulls a non-empty RESERVED_IDENTIFIERS out of CelMacroPolicy, and that an empty java_names fails loudly rather than aligning two empty sets. Quick turnaround, appreciated.

/** Original user-written entry string. */
String source();

/** Parser with the custom {@code has()} macro; same dialect as the runtime facade parser. */

Collaborator

This parser is built without the resource-limit CelOptions that CelExpressionFacade.CEL_PARSER uses (maxParseRecursionDepth(32), maxExpressionCodePointSize(8192)), so the "same dialect as the runtime facade parser" note isn't quite accurate — the parse limits differ. The practical effect is small since classify() runs at plan-build time on the author's own annotation strings, but it does mean a deep expression parses fine here and only trips the cap later at toProgram. Could we apply the same maxParseRecursionDepth(32) / maxExpressionCodePointSize(8192) caps here too? ParsedCondition is in plan and the facade's CEL_OPTIONS is private to runtime, so a shared parser would cross the module boundary — but duplicating the two caps would line the limits up with the "same dialect" doc and surface a too-deep expression once, early.

Contributor Author

You're right — the doc claim and the code had drifted, and I did miss this during the implementation.

I adopted the local fix you suggested: duplicating the two numeric caps in the plan-side parser is lighter than crossing the plan/runtime module boundary for a shared CelOptions singleton. With this change, a too-deep or too-long expression now fails at classify() during job submission, which matches the runtime facade behavior claimed in the documentation.

Collaborator

Confirmed — the duplicated caps line up with CelExpressionFacade.CEL_OPTIONS (maxParseRecursionDepth(32) / maxExpressionCodePointSize(8192)), so a too-deep or too-long expression now trips at classify() as the doc claims.

if (celExpressions.isEmpty()) {
return;
}
conditionEvaluator = new CelConditionEvaluator();

Collaborator

open() always constructs the evaluator with the default WARN_AND_SKIP policy, and nothing else (config, plan field, ctor arg) can select FAIL, so production always silently drops an action on a CEL runtime error or non-boolean result, with only a WARN log as signal. FAIL is fully implemented and tested but reachable only from tests. Two things I wanted to check rather than assume: is silent-skip the intended only production behavior here (fail-open vs. fail-closed is a real choice — a compilable expression that throws on every event would just quietly stop firing), and is FAIL meant to become a config knob, or is it speculative for now? If it's not going to be selectable, dropping it would avoid the test-only code path; if it is, the wiring looks like it's missing.

Contributor Author

Good catch — this was intended to be configurable, but the final wiring step was missing, so FAIL was only reachable from tests.

The intended default is still WARN_AND_SKIP, since that is the safer streaming behavior: a single bad event or unexpected CEL evaluation result should not fail the whole job. That said, strict-semantics pipelines do need a fail-closed option, so FAIL should be reachable in production rather than remaining a test-only path.

I added a follow-up commit to wire this through properly:

  • promoted EvaluationFailurePolicy to the API module as CelEvaluationFailurePolicy, following the existing LoggerType pattern;
  • exposed CEL_EVALUATION_FAILURE_POLICY in AgentConfigOptions, with WARN_AND_SKIP as the default;
  • updated ActionRouter.open() to read the policy from the plan config when constructing the evaluator.

With this change, both WARN_AND_SKIP and FAIL are now selectable in production. Default behavior is unchanged for existing users — anyone not setting celEvaluationFailurePolicy in plan config still gets WARN_AND_SKIP.
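
For illustration, the behavior of the two policies can be sketched roughly as below. This is not the code in the commit: a Map<String, String> stands in for the plan config, a Function stands in for a compiled CEL program, and the non-boolean message text is an assumption. Only the enum values, the celEvaluationFailurePolicy key, the WARN_AND_SKIP default, and the IllegalStateException with "CEL condition evaluation failed" come from this thread.

```java
import java.util.Map;
import java.util.function.Function;

/** Illustrative stand-in for policy selection and evaluation; not the PR's classes. */
class FailurePolicySketch {
    enum CelEvaluationFailurePolicy { WARN_AND_SKIP, FAIL }

    /** Reads the policy from a config map (hypothetical stand-in), defaulting to WARN_AND_SKIP. */
    static CelEvaluationFailurePolicy policyFrom(Map<String, String> config) {
        return CelEvaluationFailurePolicy.valueOf(
                config.getOrDefault("celEvaluationFailurePolicy", "WARN_AND_SKIP"));
    }

    /** Evaluates one condition; "program" stands in for a compiled CEL program. */
    static boolean matches(
            Function<Map<String, Object>, Object> program,
            Map<String, Object> activation,
            CelEvaluationFailurePolicy policy) {
        Object result;
        try {
            result = program.apply(activation);
        } catch (RuntimeException e) {
            return onFailure(policy, "CEL condition evaluation failed", e);
        }
        if (result instanceof Boolean) {
            return (Boolean) result;
        }
        // Message text below is assumed; the thread only says it differs from the eval-error one.
        return onFailure(policy, "CEL condition returned a non-boolean result: " + result, null);
    }

    private static boolean onFailure(
            CelEvaluationFailurePolicy policy, String message, RuntimeException cause) {
        if (policy == CelEvaluationFailurePolicy.FAIL) {
            throw new IllegalStateException(message, cause); // fail-closed: surface the error
        }
        System.err.println("WARN: " + message + "; skipping action"); // fail-open: drop the action
        return false;
    }
}
```

Under these assumptions, a condition that throws on every event keeps the job running but never fires under WARN_AND_SKIP, while FAIL turns the first such event into an IllegalStateException.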

Collaborator

The wiring looks right — getConfig() is always initialized (every AgentPlan constructor sets a default AgentConfiguration), so open() won't NPE, and the enum round-trips through config the same way EVENT_LOGGER_TYPE/LoggerType does.

One gap on the path you just added, though: nothing tests it end-to-end. The only FAIL test (CelConditionEvaluatorTest.testFailPolicyThrowsOnEvaluationError) constructs the evaluator directly, so it never exercises CEL_EVALUATION_FAILURE_POLICY → ActionRouter.open() → evaluator — and all the ActionRouterTest cases run on the default policy. A regression in the wiring (wrong option read, default flipped, open() dropping the value) would stay green. Would it be worth a small ActionRouterTest case — build a plan with a CEL action, getConfig().set(AgentConfigOptions.CEL_EVALUATION_FAILURE_POLICY, FAIL), then assert route() throws IllegalStateException on an event whose condition errors? That would lock down the path this commit is here to enable.

Contributor Author

Thanks for the review suggestion. Good catch — that error path was not covered before.

I added two paired cases in ActionRouterTest, both using the same erroring expression (attributes.nonexistent > 3) and the same event:

  • route_failPolicyFromConfig_throwsOnConditionEvaluationError: sets CEL_EVALUATION_FAILURE_POLICY=FAIL via getConfig().set(...) and verifies that route() throws IllegalStateException.
  • route_defaultWarnAndSkip_swallowsConditionEvaluationError: leaves the policy unset and verifies that route() returns an empty result.

The pairing is intentional. A standalone FAIL -> throw test could still pass if the default policy were accidentally changed, the option key were misread, or the configured value were dropped. By keeping the expression and event identical and varying only the policy source, the tests make those regression modes visible.

Collaborator

Confirmed: the paired cases close the gap. route_failPolicyFromConfig_throwsOnConditionEvaluationError walks exactly the path this commit enables (getConfig().set(FAIL) → open() reading CEL_EVALUATION_FAILURE_POLICY → evaluator), and pinning hasMessageContaining("CEL condition evaluation failed") keys on the eval-error branch specifically rather than the non-boolean message, so it won't silently match the wrong throw site. Holding the expression and event fixed while varying only the policy source is what makes the default-flip / misread-key / dropped-value regressions each surface in one case.

Nothing further from me — the changes look good. I'll leave the final call to the maintainers.

@rosemarYuan rosemarYuan force-pushed the pr2/java-cel-implementation branch from b4d2ebb to 0a42523 Compare June 10, 2026 09:06
@github-actions github-actions bot added and removed the doc-not-needed label (Your PR changes do not impact docs) Jun 11, 2026
Labels

doc-not-needed Your PR changes do not impact docs
