diff --git a/agent-service/src/agent/texera-agent.ts b/agent-service/src/agent/texera-agent.ts index 37eb12d8688..0cd69353294 100644 --- a/agent-service/src/agent/texera-agent.ts +++ b/agent-service/src/agent/texera-agent.ts @@ -48,7 +48,8 @@ import { type ExecutionConfig, } from "./tools/workflow-execution-tools"; import { assembleContext } from "./util/context-utils"; -import { compileWorkflowAsync, type WorkflowCompilationResponse } from "../api/compile-api"; +import { compileWorkflowAsync } from "../api/compile-api"; +import type { WorkflowCompilationResponse } from "../types/dto"; import { createLogger } from "../logger"; import type { Logger } from "pino"; @@ -570,15 +571,18 @@ export class TexeraAgent { onStepFinish: async ({ text, toolCalls, toolResults, usage }) => { stepIndex++; + // The AI SDK types tc.input / tr.output as `unknown` for dynamically + // registered tools; narrow to the shapes our tools actually produce + // (object args, string results — see tools/*). const formattedToolCalls = toolCalls?.map(tc => ({ toolName: tc.toolName, toolCallId: tc.toolCallId, - input: tc.input, + input: tc.input as Record, })); const formattedToolResults = toolResults?.map(tr => ({ toolCallId: tr.toolCallId, - output: tr.output, + output: tr.output as string, isError: !!(tr.output as any)?.error, })); @@ -728,7 +732,7 @@ export class TexeraAgent { const result = new Map(); const visible = this.workflowResultState.getAllVisible(); for (const [operatorId, entry] of visible) { - result.set(operatorId, formatOperatorResult(operatorId, entry.operatorInfo, this.workflowState)); + result.set(operatorId, formatOperatorResult(operatorId, entry.operatorInfo)); } return result; } diff --git a/agent-service/src/agent/tools/result-formatting.test.ts b/agent-service/src/agent/tools/result-formatting.spec.ts similarity index 61% rename from agent-service/src/agent/tools/result-formatting.test.ts rename to agent-service/src/agent/tools/result-formatting.spec.ts index e6d1afdf2e3..1d518b43e9f 100644 --- a/agent-service/src/agent/tools/result-formatting.test.ts +++ b/agent-service/src/agent/tools/result-formatting.spec.ts @@ -19,89 +19,89 @@ import { describe, expect, test } from "bun:test"; import { formatOperatorResult } from "./result-formatting"; -import { WorkflowState } from "../workflow-state"; -import type { OperatorInfo } from "../../types/execution"; -import type { OperatorPredicate, OperatorLink, PortDescription } from "../../types/workflow"; - -function makeOpInfo(overrides: Partial = {}): OperatorInfo { - return { - state: "completed", - inputTuples: 0, - outputTuples: 0, - resultMode: "table", - ...overrides, - }; +import type { OperatorExecutionSummary, OperatorState, SampleRow } from "../../types/execution"; + +// Convert flat test rows (with an optional embedded __row_index__) into the +// structured SampleRow[] the summary now carries. +function toSampleRows(rows: Record[]): SampleRow[] { + return rows.map((row, i) => { + const { __row_index__, ...tuple } = row; + return { rowIndex: typeof __row_index__ === "number" ? __row_index__ : i, tuple }; + }); } -function makeOperator(id: string, inputPortIDs: string[] = []): OperatorPredicate { - const inputPorts: PortDescription[] = inputPortIDs.map((portID, i) => ({ - portID, - displayName: `Input ${i}`, - })); - return { - operatorID: id, - operatorType: "TestOp", - operatorVersion: "1.0", - operatorProperties: {}, - inputPorts, - outputPorts: [{ portID: "output-0", displayName: "Output 0" }], - showAdvanced: false, - }; +// Test convenience: accept the (old) flat fields and assemble the structured +// OperatorExecutionSummary, so the cases below stay terse. +interface OpInfoOverrides { + state?: OperatorState; + error?: string; + outputTuples?: number; + totalRowCount?: number; + warnings?: string[]; + result?: Record[]; } -function makeLink(linkID: string, source: [string, string], target: [string, string]): OperatorLink { - return { - linkID, - source: { operatorID: source[0], portID: source[1] }, - target: { operatorID: target[0], portID: target[1] }, +function makeOpInfo(overrides: OpInfoOverrides = {}): OperatorExecutionSummary { + const summary: OperatorExecutionSummary = { + state: overrides.state ?? "Completed", + errorMessages: overrides.error ? [{ type: "EXECUTION_FAILURE", message: overrides.error }] : [], }; + // The result summary is present only when the operator produced a result. + if (overrides.result !== undefined) { + summary.resultSummary = { + resultMode: "table", + // Non-arrays are passed through to exercise the "(no result data)" guard. + sampleTuples: Array.isArray(overrides.result) ? toSampleRows(overrides.result) : (overrides.result as any), + totalRowCount: overrides.totalRowCount ?? overrides.outputTuples ?? 0, + }; + } + if (overrides.warnings) { + // Warnings are derived from console messages whose title is "WARNING: ...". + summary.consoleLogsSummary = { + messages: overrides.warnings.map(w => ({ msgType: "PRINT", title: w, message: "" })), + }; + } + return summary; } -const EMPTY_STATE = new WorkflowState(); - describe("formatOperatorResult - early returns", () => { test("returns [ERROR] prefix when error field is set", () => { - const out = formatOperatorResult("op1", makeOpInfo({ error: "boom" }), EMPTY_STATE); + const out = formatOperatorResult("op1", makeOpInfo({ error: "boom" })); expect(out).toBe("[ERROR] boom"); }); test("treats empty-string error as falsy and continues to result path", () => { - const out = formatOperatorResult("op1", makeOpInfo({ error: "" }), EMPTY_STATE); + const out = formatOperatorResult("op1", makeOpInfo({ error: "" })); expect(out).not.toContain("[ERROR]"); expect(out).toContain("(no result data)"); }); test("returns (no result data) when result is undefined", () => { - const out = formatOperatorResult("op1", makeOpInfo(), EMPTY_STATE); + const out = formatOperatorResult("op1", makeOpInfo()); expect(out).toBe("(no result data)"); }); test("returns (no result data) when result is not an array", () => { - const out = formatOperatorResult( - "op1", - makeOpInfo({ result: { rows: [] } as unknown as Record[] }), - EMPTY_STATE - ); + const out = formatOperatorResult("op1", makeOpInfo({ result: { rows: [] } as unknown as Record[] })); expect(out).toBe("(no result data)"); }); test("empty array result emits brief summary plus zero-column shape only", () => { - const out = formatOperatorResult("op1", makeOpInfo({ result: [], outputTuples: 0 }), EMPTY_STATE); + const out = formatOperatorResult("op1", makeOpInfo({ result: [], outputTuples: 0 })); expect(out.split("\n")).toEqual(["Executed operator op1", "Output table shape: (0, 0)"]); }); }); describe("formatOperatorResult - table shape and metadata", () => { test("uses outputTuples for row count when totalRowCount missing", () => { - const out = formatOperatorResult("op1", makeOpInfo({ outputTuples: 7, result: [{ a: 1, b: 2 }] }), EMPTY_STATE); + const out = formatOperatorResult("op1", makeOpInfo({ outputTuples: 7, result: [{ a: 1, b: 2 }] })); expect(out).toContain("Output table shape: (7, 2)"); }); test("totalRowCount overrides outputTuples in output shape", () => { const out = formatOperatorResult( "op1", - makeOpInfo({ outputTuples: 7, totalRowCount: 999, result: [{ a: 1, b: 2 }] }), - EMPTY_STATE + makeOpInfo({ outputTuples: 7, totalRowCount: 999, result: [{ a: 1, b: 2 }] }) ); expect(out).toContain("Output table shape: (999, 2)"); }); @@ -112,8 +112,7 @@ describe("formatOperatorResult - table shape and metadata", () => { makeOpInfo({ outputTuples: 1, result: [{ __is_visualization__: true, "html-content": "" }], - }), - EMPTY_STATE + }) ); // 1 visible column ("html-content") since __is_visualization__ is filtered. expect(out).toContain("Output table shape: (1, 1)"); @@ -125,85 +124,14 @@ describe("formatOperatorResult - table shape and metadata", () => { makeOpInfo({ outputTuples: 1, result: [{ a: 1 }], - warnings: ["truncated to 1 row", "something else"], - }), - EMPTY_STATE + warnings: ["WARNING: truncated to 1 row", "WARNING: something else"], + }) ); const lines = out.split("\n"); expect(lines[0]).toBe("Executed operator op1"); expect(lines[1]).toBe("Output table shape: (1, 1)"); - expect(lines[2]).toBe("truncated to 1 row"); - expect(lines[3]).toBe("something else"); - }); -}); - -describe("formatOperatorResult - input port metadata", () => { - test("omits input metadata when inputPortShapes is missing", () => { - const out = formatOperatorResult("op1", makeOpInfo({ outputTuples: 1, result: [{ a: 1 }] }), EMPTY_STATE); - expect(out).not.toContain("Input operator"); - }); - - test("omits input metadata when inputPortShapes is empty", () => { - const out = formatOperatorResult( - "op1", - makeOpInfo({ outputTuples: 1, result: [{ a: 1 }], inputPortShapes: [] }), - EMPTY_STATE - ); - expect(out).not.toContain("Input operator"); - }); - - test("falls back to inputN placeholder when no upstream link matches the port", () => { - const out = formatOperatorResult( - "op1", - makeOpInfo({ - outputTuples: 1, - result: [{ a: 1 }], - inputPortShapes: [{ portIndex: 0, rows: 5, columns: 3 }], - }), - EMPTY_STATE - ); - expect(out).toContain("Input operator(table shape): input0(5, 3)"); - }); - - test("uses upstream operator id when an input link matches the port", () => { - const state = new WorkflowState(); - state.addOperator(makeOperator("upstream")); - state.addOperator(makeOperator("op1", ["input-0"])); - state.addLink(makeLink("l1", ["upstream", "output-0"], ["op1", "input-0"])); - - const out = formatOperatorResult( - "op1", - makeOpInfo({ - outputTuples: 4, - result: [{ a: 1, b: 2 }], - inputPortShapes: [{ portIndex: 0, rows: 10, columns: 2 }], - }), - state - ); - expect(out).toContain("Input operator(table shape): upstream(10, 2)"); - }); - - test("sorts multiple input ports by portIndex regardless of input order", () => { - const state = new WorkflowState(); - state.addOperator(makeOperator("up0")); - state.addOperator(makeOperator("up1")); - state.addOperator(makeOperator("op1", ["input-0", "input-1"])); - state.addLink(makeLink("l0", ["up0", "output-0"], ["op1", "input-0"])); - state.addLink(makeLink("l1", ["up1", "output-0"], ["op1", "input-1"])); - - const out = formatOperatorResult( - "op1", - makeOpInfo({ - outputTuples: 1, - result: [{ a: 1 }], - inputPortShapes: [ - { portIndex: 1, rows: 2, columns: 2 }, - { portIndex: 0, rows: 1, columns: 1 }, - ], - }), - state - ); - expect(out).toContain("Input operator(table shape): up0(1, 1), up1(2, 2)"); + expect(lines[2]).toBe("WARNING: truncated to 1 row"); + expect(lines[3]).toBe("WARNING: something else"); }); }); @@ -221,8 +149,7 @@ describe("formatOperatorResult - visualization rows", () => { label: "chart", }, ], - }), - EMPTY_STATE + }) ); expect(out).toContain(""); expect(out).not.toContain("
hidden
"); @@ -236,8 +163,7 @@ describe("formatOperatorResult - visualization rows", () => { makeOpInfo({ outputTuples: 1, result: [{ __is_visualization__: false, "html-content": "" }], - }), - EMPTY_STATE + }) ); expect(out).toContain(""); expect(out).not.toContain(""); @@ -249,8 +175,7 @@ describe("formatOperatorResult - visualization rows", () => { makeOpInfo({ outputTuples: 1, result: [{ __is_visualization__: false, value: 1 }], - }), - EMPTY_STATE + }) ); const lines = out.split("\n"); expect(out).toContain("Output table shape: (1, 1)"); @@ -262,8 +187,8 @@ describe("formatOperatorResult - visualization rows", () => { }); describe("jsonToTableFormat - cell coercion via formatOperatorResult", () => { - function tableLines(opInfo: Partial): string[] { - const out = formatOperatorResult("op1", makeOpInfo({ outputTuples: 1, ...opInfo }), EMPTY_STATE); + function tableLines(opInfo: OpInfoOverrides): string[] { + const out = formatOperatorResult("op1", makeOpInfo({ outputTuples: 1, ...opInfo })); // Skip brief summary + shape line. return out.split("\n").slice(2); } @@ -305,8 +230,7 @@ describe("jsonToTableFormat - row index gaps", () => { { __row_index__: 0, v: "a" }, { __row_index__: 5, v: "b" }, ], - }), - EMPTY_STATE + }) ); const lines = out.split("\n"); // header, row0, gap marker, row5 @@ -325,18 +249,13 @@ describe("jsonToTableFormat - row index gaps", () => { { __row_index__: 0, v: "a" }, { __row_index__: 1, v: "b" }, ], - }), - EMPTY_STATE + }) ); expect(out).not.toContain("...\t..."); }); test("non-zero starting __row_index__ does not emit a leading gap marker", () => { - const out = formatOperatorResult( - "op1", - makeOpInfo({ outputTuples: 1, result: [{ __row_index__: 9, v: "z" }] }), - EMPTY_STATE - ); + const out = formatOperatorResult("op1", makeOpInfo({ outputTuples: 1, result: [{ __row_index__: 9, v: "z" }] })); expect(out).not.toContain("...\t..."); expect(out.endsWith("9\tz")).toBe(true); }); diff --git a/agent-service/src/agent/tools/result-formatting.ts b/agent-service/src/agent/tools/result-formatting.ts index 5ed4aacc5d4..58757c4b098 100644 --- a/agent-service/src/agent/tools/result-formatting.ts +++ b/agent-service/src/agent/tools/result-formatting.ts @@ -17,100 +17,63 @@ * under the License. */ -import type { OperatorInfo } from "../../types/execution"; -import type { WorkflowState } from "../workflow-state"; -import { formatExecuteOperatorResult, getVisibleResultHeaders } from "./tools-utility"; +import type { OperatorExecutionSummary, SampleRow } from "../../types/execution"; +import { formatExecuteOperatorResult, getOperatorWarnings, getVisibleResultHeaders } from "./tools-utility"; -export function formatOperatorResult(operatorId: string, opInfo: OperatorInfo, workflowState: WorkflowState): string { - if (opInfo.error) { - return `[ERROR] ${opInfo.error}`; +export function formatOperatorResult(operatorId: string, opInfo: OperatorExecutionSummary): string { + const errorText = opInfo.errorMessages.map(e => e.message).join("; "); + if (errorText) { + return `[ERROR] ${errorText}`; } - if (!opInfo.result || !Array.isArray(opInfo.result)) { + const sampleTuples = opInfo.resultSummary?.sampleTuples; + if (!sampleTuples || !Array.isArray(sampleTuples)) { return "(no result data)"; } - const jsonArray = opInfo.result as Record[]; - const headers = jsonArray.length > 0 ? getVisibleResultHeaders(jsonArray[0]) : []; + const headers = sampleTuples.length > 0 ? getVisibleResultHeaders(sampleTuples[0].tuple) : []; const columns = headers.length; - const isViz = jsonArray.length > 0 && jsonArray[0]["__is_visualization__"] === true; - const serializableArray = isViz - ? jsonArray.map(row => { + const isViz = sampleTuples.length > 0 && sampleTuples[0].tuple["__is_visualization__"] === true; + const rows: SampleRow[] = isViz + ? sampleTuples.map(({ rowIndex, tuple }) => { const cleaned: Record = {}; - for (const key of Object.keys(row)) { + for (const key of Object.keys(tuple)) { if (key === "__is_visualization__") continue; if (key === "html-content" || key === "json-content") { cleaned[key] = ""; } else { - cleaned[key] = row[key]; + cleaned[key] = tuple[key]; } } - return cleaned; + return { rowIndex, tuple: cleaned }; }) - : jsonArray; + : sampleTuples; - const dataString = jsonToTableFormat(serializableArray); + const dataString = jsonToTableFormat(rows); - const metadataLines = [ - formatInputOutputMetadata(workflowState, operatorId, opInfo, columns), - ...(opInfo.warnings ?? []), - ].filter(Boolean); + // Output shape only; input-port shapes are derivable by the agent from the DAG + // links plus each upstream operator's own output shape shown in context. + const outputRows = opInfo.resultSummary?.totalRowCount ?? 0; + const metadataLines = [`Output table shape: (${outputRows}, ${columns})`, ...getOperatorWarnings(opInfo)].filter( + Boolean + ); const briefSummary = formatExecuteOperatorResult(operatorId); return [briefSummary, ...metadataLines, dataString].filter(Boolean).join("\n"); } -function formatInputOutputMetadata( - workflowState: WorkflowState, - operatorId: string, - opInfo: OperatorInfo, - outputColumns: number -): string { - const outputRows = opInfo.totalRowCount ?? opInfo.outputTuples; - const outputLine = `Output table shape: (${outputRows}, ${outputColumns})`; +function jsonToTableFormat(rows: SampleRow[]): string { + if (!rows || rows.length === 0) return ""; - const inputShapes = opInfo.inputPortShapes; - if (!inputShapes || inputShapes.length === 0) { - return outputLine; - } - - const inputLinks = workflowState.getAllLinks().filter(l => l.target.operatorID === operatorId); - const portIndexToUpstream = new Map(); - const op = workflowState.getOperator(operatorId); - for (const link of inputLinks) { - const portIdx = op?.inputPorts.findIndex(p => p.portID === link.target.portID) ?? -1; - if (portIdx >= 0) { - portIndexToUpstream.set(portIdx, link.source.operatorID); - } - } - - const inputPart = inputShapes - .sort((a, b) => a.portIndex - b.portIndex) - .map(p => { - const name = portIndexToUpstream.get(p.portIndex) ?? `input${p.portIndex}`; - return `${name}(${p.rows}, ${p.columns})`; - }) - .join(", "); - - return `Input operator(table shape): ${inputPart}\n${outputLine}`; -} - -function jsonToTableFormat(jsonResult: Record[]): string { - if (!jsonResult || jsonResult.length === 0) return ""; - - const hasRowIndex = "__row_index__" in jsonResult[0]; - const headers = getVisibleResultHeaders(jsonResult[0]); + const headers = getVisibleResultHeaders(rows[0].tuple); if (headers.length === 0) return ""; const headerLine = "\t" + headers.join("\t"); const formattedRows: string[] = []; let prevIndex = -1; - for (let i = 0; i < jsonResult.length; i++) { - const row = jsonResult[i]; - const rowIndex = hasRowIndex ? (row["__row_index__"] as number) : i; - + for (const { rowIndex, tuple } of rows) { if (prevIndex >= 0 && rowIndex > prevIndex + 1) { const dots = headers.map(() => "...").join("\t"); formattedRows.push(`...\t${dots}`); @@ -118,7 +81,7 @@ function jsonToTableFormat(jsonResult: Record[]): string { prevIndex = rowIndex; const cells = headers.map(h => { - const val = row[h]; + const val = tuple[h]; if (val === null) return "NaN"; if (val === undefined) return ""; if (typeof val === "number" || typeof val === "boolean") return String(val); diff --git a/agent-service/src/agent/tools/tools-utility.test.ts b/agent-service/src/agent/tools/tools-utility.spec.ts similarity index 100% rename from agent-service/src/agent/tools/tools-utility.test.ts rename to agent-service/src/agent/tools/tools-utility.spec.ts diff --git a/agent-service/src/agent/tools/tools-utility.ts b/agent-service/src/agent/tools/tools-utility.ts index 6c9ab004f6e..421d1e0dbde 100644 --- a/agent-service/src/agent/tools/tools-utility.ts +++ b/agent-service/src/agent/tools/tools-utility.ts @@ -17,12 +17,20 @@ * under the License. */ +import type { OperatorExecutionSummary } from "../../types/execution"; + export const INTERNAL_RESULT_KEYS: ReadonlySet = new Set(["__row_index__", "__is_visualization__"]); export function getVisibleResultHeaders(row: Record): string[] { return Object.keys(row).filter(k => !INTERNAL_RESULT_KEYS.has(k)); } +// Warnings are the console messages the engine tags with a "WARNING: " title +// prefix; derive them rather than carrying a separate field on the summary. +export function getOperatorWarnings(opInfo: OperatorExecutionSummary): string[] { + return (opInfo.consoleLogsSummary?.messages ?? []).filter(m => m.title.startsWith("WARNING: ")).map(m => m.title); +} + export function createToolResult(message: string): string { return message; } diff --git a/agent-service/src/agent/tools/workflow-execution-tools.ts b/agent-service/src/agent/tools/workflow-execution-tools.ts index 78c6cfa3d55..c52e92cbb21 100644 --- a/agent-service/src/agent/tools/workflow-execution-tools.ts +++ b/agent-service/src/agent/tools/workflow-execution-tools.ts @@ -19,12 +19,17 @@ import { z } from "zod"; import { tool } from "ai"; -import { createErrorResult, formatExecuteOperatorResult, getVisibleResultHeaders } from "./tools-utility"; +import { + createErrorResult, + formatExecuteOperatorResult, + getOperatorWarnings, + getVisibleResultHeaders, +} from "./tools-utility"; import type { WorkflowState } from "../workflow-state"; import { getBackendConfig } from "../../api/backend-api"; import { env } from "../../config/env"; import type { LogicalPlan, LogicalLink } from "../../api/execution-api"; -import type { OperatorInfo, SyncExecutionResult } from "../../types/execution"; +import type { OperatorExecutionSummary, SampleRow, WorkflowExecutionSummary } from "../../types/execution"; import { WorkflowSystemMetadata } from "../util/workflow-system-metadata"; import { DEFAULT_AGENT_SETTINGS } from "../../types/agent"; import { createLogger } from "../../logger"; @@ -255,7 +260,7 @@ async function executeWorkflowHttp( config: ExecutionConfig, logicalPlan: LogicalPlan, options: { abortSignal?: AbortSignal } = {} -): Promise { +): Promise { const backendConfig = getBackendConfig(); const workflowId = config.workflowId; @@ -312,7 +317,7 @@ async function executeWorkflowHttp( throw new Error(`Execution request failed: ${response.status} ${response.statusText} - ${errorText}`); } - return (await response.json()) as SyncExecutionResult; + return (await response.json()) as WorkflowExecutionSummary; } catch (error) { if (error instanceof Error && error.name === "AbortError") { throw error; @@ -327,55 +332,12 @@ async function executeWorkflowHttp( } } -function formatInputOutput( - workflowState: WorkflowState, - operatorId: string, - opInfo: OperatorInfo, - outputColumns: number -): string { - const outputRows = opInfo.totalRowCount ?? opInfo.outputTuples; - const outputLine = `Output table shape: (${outputRows}, ${outputColumns})`; - - const inputShapes = opInfo.inputPortShapes; - if (!inputShapes || inputShapes.length === 0) { - return outputLine; - } - - const inputLinks = workflowState.getAllLinks().filter(l => l.target.operatorID === operatorId); - const portIndexToUpstream = new Map(); - const op = workflowState.getOperator(operatorId); - for (const link of inputLinks) { - const portIdx = op?.inputPorts.findIndex(p => p.portID === link.target.portID) ?? -1; - if (portIdx >= 0) { - portIndexToUpstream.set(portIdx, link.source.operatorID); - } - } - - const inputPart = inputShapes - .sort((a, b) => a.portIndex - b.portIndex) - .map(p => { - const name = portIndexToUpstream.get(p.portIndex) ?? `input${p.portIndex}`; - return `${name}(${p.rows}, ${p.columns})`; - }) - .join(", "); - - return `Input operator(table shape): ${inputPart}\n${outputLine}`; -} - function formatExecutionError( - compilationErrors?: Record, operatorErrors?: Array<{ operatorId: string; error: string }>, generalErrors?: string[] ): string { const lines: string[] = ["Execution failed due to the following error:"]; - if (compilationErrors && Object.keys(compilationErrors).length > 0) { - lines.push("Compilation error:"); - for (const [key, value] of Object.entries(compilationErrors)) { - lines.push(` ${key}: ${value}`); - } - } - if (operatorErrors && operatorErrors.length > 0) { lines.push("Execution error:"); for (const { operatorId, error } of operatorErrors) { @@ -393,11 +355,10 @@ function formatExecutionError( return lines.join("\n"); } -function jsonToTableFormat(jsonResult: Record[]): string { - if (!jsonResult || jsonResult.length === 0) return ""; +function jsonToTableFormat(rows: SampleRow[]): string { + if (!rows || rows.length === 0) return ""; - const hasRowIndex = jsonResult.length > 0 && "__row_index__" in jsonResult[0]; - const headers = getVisibleResultHeaders(jsonResult[0]); + const headers = getVisibleResultHeaders(rows[0].tuple); if (headers.length === 0) return ""; // Leading tab aligns headers with the index column (pandas __repr__ style). const headerLine = "\t" + headers.join("\t"); @@ -405,10 +366,7 @@ function jsonToTableFormat(jsonResult: Record[]): string { const formattedRows: string[] = []; let prevIndex = -1; - for (let i = 0; i < jsonResult.length; i++) { - const row = jsonResult[i]; - const rowIndex = hasRowIndex ? (row["__row_index__"] as number) : i; - + for (const { rowIndex, tuple } of rows) { if (prevIndex >= 0 && rowIndex > prevIndex + 1) { const dots = headers.map(() => "...").join("\t"); formattedRows.push(`...\t${dots}`); @@ -416,7 +374,7 @@ function jsonToTableFormat(jsonResult: Record[]): string { prevIndex = rowIndex; const cells = headers.map(h => { - const val = row[h]; + const val = tuple[h]; if (val === null) return "NaN"; if (val === undefined) return ""; if (typeof val === "number" || typeof val === "boolean") return String(val); @@ -438,7 +396,7 @@ export async function executeOperatorAndFormat( operatorId: string, options: { abortSignal?: AbortSignal; - onResult?: (operatorId: string, operatorInfo: OperatorInfo) => void; + onResult?: (operatorId: string, operatorInfo: OperatorExecutionSummary) => void; onResultLegacy?: (operatorId: string, backendStats?: Record) => void; } = {} ): Promise { @@ -466,34 +424,26 @@ export async function executeOperatorAndFormat( } } - const result: SyncExecutionResult = await executeWorkflowHttp(config, logicalPlan, { + const result: WorkflowExecutionSummary = await executeWorkflowHttp(config, logicalPlan, { abortSignal: options.abortSignal, }); if (!result.success) { - const compilationErrors = - result.state === "CompilationFailed" || result.state === "ValidationFailed" - ? result.compilationErrors - : undefined; - const operatorErrors = result.state === "Failed" ? Object.entries(result.operators) - .filter(([_, op]) => op.error) - .map(([opId, op]) => ({ operatorId: opId, error: op.error! })) + .filter(([_, op]) => op.errorMessages.length) + .map(([opId, op]) => ({ operatorId: opId, error: op.errorMessages.map(e => e.message).join("; ") })) : undefined; const generalErrors = result.state === "Killed" ? ["Workflow execution was killed (timeout)."] : result.errors; - const errorText = formatExecutionError(compilationErrors, operatorErrors, generalErrors); + const errorText = formatExecutionError(operatorErrors, generalErrors); if (options.onResult) { - const errorInfo: OperatorInfo = { - state: result.state, - inputTuples: 0, - outputTuples: 0, - resultMode: "table", - error: errorText, + const errorInfo: OperatorExecutionSummary = { + state: "Failed", + errorMessages: [{ type: "EXECUTION_FAILURE", message: errorText, operatorId }], }; options.onResult(operatorId, errorInfo); } @@ -503,36 +453,35 @@ export async function executeOperatorAndFormat( const opInfo = result.operators[operatorId]; if (!opInfo) { - return createErrorResult( - formatExecutionError(undefined, undefined, [`No result found for operator: ${operatorId}`]) - ); + return createErrorResult(formatExecutionError(undefined, [`No result found for operator: ${operatorId}`])); } - if (opInfo.error) { + if (opInfo.errorMessages.length) { if (options.onResult) { options.onResult(operatorId, opInfo); } - return createErrorResult(formatExecutionError(undefined, [{ operatorId, error: opInfo.error }])); + const opError = opInfo.errorMessages.map(e => e.message).join("; "); + return createErrorResult(formatExecutionError([{ operatorId, error: opError }])); } - if (!opInfo.result || !Array.isArray(opInfo.result)) { + const sampleTuples = opInfo.resultSummary?.sampleTuples; + if (!sampleTuples || !Array.isArray(sampleTuples)) { return "(no result data)"; } - const jsonArray = opInfo.result as Record[]; - const headers = jsonArray.length > 0 ? getVisibleResultHeaders(jsonArray[0]) : []; + const headers = sampleTuples.length > 0 ? getVisibleResultHeaders(sampleTuples[0].tuple) : []; const columns = headers.length; // Notify for every operator in the execution so upstream stats are also stored. if (options.onResult) { for (const [opId, info] of Object.entries(result.operators)) { - if (info && !info.error) { + if (info && !info.errorMessages.length) { options.onResult(opId, info); } } } - let dataString = jsonToTableFormat(jsonArray); + let dataString = jsonToTableFormat(sampleTuples); // Safety-net: TSV serialization may add padding beyond backend's raw-record budget. const charLimit = config.maxOperatorResultCharLimit ?? DEFAULT_AGENT_SETTINGS.maxOperatorResultCharLimit; @@ -568,9 +517,11 @@ export async function executeOperatorAndFormat( dataString = [headerLine, ...keptRows].join("\n"); } - const shapeLine = formatInputOutput(workflowState, operatorId, opInfo, columns); + // Output shape only; the agent derives input-port shapes from the DAG + the + // upstream operators' own output shapes shown in context. + const shapeLine = `Output table shape: (${opInfo.resultSummary?.totalRowCount ?? 0}, ${columns})`; - const warningLines = opInfo.warnings?.map(w => w) ?? []; + const warningLines = getOperatorWarnings(opInfo); const metadataLines = [shapeLine, ...warningLines].filter(Boolean); @@ -589,7 +540,7 @@ export async function executeOperatorAndFormat( export function createExecuteOperatorTool( workflowState: WorkflowState, getConfig: () => ExecutionConfig, - onResult?: (operatorId: string, operatorInfo: OperatorInfo) => void + onResult?: (operatorId: string, operatorInfo: OperatorExecutionSummary) => void ) { return tool({ description: diff --git a/agent-service/src/agent/util/auto-layout.test.ts b/agent-service/src/agent/util/auto-layout.spec.ts similarity index 100% rename from agent-service/src/agent/util/auto-layout.test.ts rename to agent-service/src/agent/util/auto-layout.spec.ts diff --git a/agent-service/src/agent/util/context-utils.ts b/agent-service/src/agent/util/context-utils.ts index 195692cbf50..04e801e3aa7 100644 --- a/agent-service/src/agent/util/context-utils.ts +++ b/agent-service/src/agent/util/context-utils.ts @@ -25,7 +25,7 @@ import type { ModelMessage } from "ai"; import type { WorkflowState } from "../workflow-state"; import type { OperatorPredicate, OperatorPortSchemaMap, PortSchema } from "../../types/workflow"; import type { ReActStep } from "../../types/agent"; -import type { WorkflowCompilationResponse, WorkflowFatalError } from "../../api/compile-api"; +import type { WorkflowCompilationResponse, WorkflowFatalError } from "../../types/dto"; import { extractOperatorInputPortSchemaMap } from "./workflow-utils"; import { createLogger } from "../../logger"; diff --git a/agent-service/src/agent/util/workflow-system-metadata.spec.ts b/agent-service/src/agent/util/workflow-system-metadata.spec.ts new file mode 100644 index 00000000000..cc2976850f4 --- /dev/null +++ b/agent-service/src/agent/util/workflow-system-metadata.spec.ts @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { describe, expect, test } from "bun:test"; +import { + WorkflowSystemMetadata, + formatValidationErrors, + formatCompactSchemaForError, +} from "./workflow-system-metadata"; +import type { OperatorMetadata, OperatorSchema } from "../../types/metadata"; + +function operator(overrides: Partial & Pick): OperatorSchema { + return { + operatorVersion: "1", + jsonSchema: {}, + additionalMetadata: { + userFriendlyName: overrides.operatorType, + operatorGroupName: "Test", + inputPorts: [], + outputPorts: [], + }, + ...overrides, + }; +} + +function metadataWith(...operators: OperatorSchema[]): OperatorMetadata { + return { operators, groups: [] }; +} + +// Filter exercises ref-inlining + key filtering; Limit is a clean Ajv-validatable schema. +const FILTER = operator({ + operatorType: "Filter", + additionalMetadata: { + userFriendlyName: "Filter", + operatorGroupName: "Filter", + inputPorts: [], + outputPorts: [], + operatorDescription: "Filters rows", + }, + jsonSchema: { + properties: { + attribute: { $ref: "#/definitions/AttributeName" }, + limit: { type: "integer", propertyOrder: 5 }, + dummyPropertyList: { type: "array" }, + }, + definitions: { + AttributeName: { type: "string", title: "Attribute" }, + PortDescription: { type: "object" }, + }, + required: ["attribute"], + }, +}); + +const LIMIT = operator({ + operatorType: "Limit", + jsonSchema: { type: "object", properties: { limit: { type: "integer" } }, required: ["limit"] }, +}); + +function loaded(...operators: OperatorSchema[]): WorkflowSystemMetadata { + const meta = new WorkflowSystemMetadata(); + meta.loadFromMetadata(metadataWith(...operators)); + return meta; +} + +describe("WorkflowSystemMetadata.loadFromMetadata", () => { + test("indexes operators by type", () => { + const meta = loaded(FILTER, LIMIT); + expect(meta.getOperatorCount()).toBe(2); + expect(meta.operatorTypeExists("Filter")).toBe(true); + expect(meta.operatorTypeExists("Nope")).toBe(false); + expect(meta.getSchema("Filter")).toEqual(FILTER.jsonSchema); + expect(meta.getAllOperatorTypes()).toEqual({ Filter: "Filters rows", Limit: "Limit" }); + }); + + test("getDescription falls back to userFriendlyName when no description", () => { + const meta = loaded(FILTER, LIMIT); + expect(meta.getDescription("Filter")).toBe("Filters rows"); + expect(meta.getDescription("Limit")).toBe("Limit"); + expect(meta.getDescription("Unknown")).toBe(""); + }); +}); + +describe("WorkflowSystemMetadata.getCompactSchema", () => { + test("returns null for an unknown operator type", () => { + expect(loaded(FILTER).getCompactSchema("Nope")).toBeNull(); + }); + + test("inlines $refs, strips noise keys, and drops filtered properties", () => { + const compact = loaded(FILTER).getCompactSchema("Filter"); + expect(compact).not.toBeNull(); + // $ref resolved to the AttributeName definition. + expect(compact!.properties.attribute).toEqual({ type: "string", title: "Attribute" }); + // propertyOrder is in COMPACT_SCHEMA_EXCLUDED_KEYS and is stripped. + expect(compact!.properties.limit).toEqual({ type: "integer" }); + // dummyPropertyList is in FILTERED_PROPERTY_KEYS and is removed. + expect(compact!.properties).not.toHaveProperty("dummyPropertyList"); + expect(compact!.required).toEqual(["attribute"]); + }); +}); + +describe("WorkflowSystemMetadata.getAllSchemasAsJson", () => { + test("emits filtered properties and definitions as JSON", () => { + const parsed = JSON.parse(loaded(FILTER).getAllSchemasAsJson()); + expect(Object.keys(parsed.Filter.properties)).toEqual(["attribute", "limit"]); // dummyPropertyList filtered + expect(parsed.Filter.definitions).toHaveProperty("AttributeName"); + expect(parsed.Filter.definitions).not.toHaveProperty("PortDescription"); // filtered definition + expect(parsed.Filter.required).toEqual(["attribute"]); + }); +}); + +describe("WorkflowSystemMetadata.validateOperatorProperties", () => { + test("accepts properties that satisfy the schema", () => { + expect(loaded(LIMIT).validateOperatorProperties("Limit", { limit: 5 })).toEqual({ isValid: true }); + }); + + test("reports the missing required property", () => { + const result = loaded(LIMIT).validateOperatorProperties("Limit", {}); + expect(result.isValid).toBe(false); + expect(result.isValid ? {} : result.messages).toHaveProperty("limit"); + }); + + test("rejects an unknown operator type", () => { + const result = loaded(LIMIT).validateOperatorProperties("Nope", {}); + expect(result.isValid).toBe(false); + expect(result.isValid ? "" : result.messages.error).toContain("Unknown operator type"); + }); +}); + +describe("formatValidationErrors", () => { + test("returns empty string when valid", () => { + expect(formatValidationErrors({ isValid: true })).toBe(""); + }); + + test("joins messages as 'key: msg'", () => { + expect(formatValidationErrors({ isValid: false, messages: { limit: "is required", attribute: "bad" } })).toBe( + "limit: is required; attribute: bad" + ); + }); +}); + +describe("formatCompactSchemaForError", () => { + test("renders only the required properties", () => { + const formatted = formatCompactSchemaForError({ + properties: { a: { type: "string" }, b: { type: "integer" } }, + required: ["a"], + }); + expect(formatted).toBe('required: [a], properties: {"a":{"type":"string"}}'); + }); +}); diff --git a/agent-service/src/agent/util/workflow-system-metadata.ts b/agent-service/src/agent/util/workflow-system-metadata.ts index 9269a0cff7c..da4fcdf3630 100644 --- a/agent-service/src/agent/util/workflow-system-metadata.ts +++ b/agent-service/src/agent/util/workflow-system-metadata.ts @@ -20,23 +20,13 @@ import Ajv from "ajv"; import { fetchOperatorMetadata, type OperatorSchema, type OperatorMetadata } from "../../api/backend-api"; import type { ValidationError, Validation } from "../../types/workflow"; +import type { OperatorSchemaInfo, CompactOperatorSchema } from "../../types/metadata"; import { createLogger } from "../../logger"; const log = createLogger("WorkflowSystemMetadata"); export type { ValidationError, Validation } from "../../types/workflow"; -interface OperatorSchemaInfo { - properties: any; - required: any; - definitions: any; -} - -interface CompactOperatorSchema { - properties: Record; - required: string[]; -} - const FILTERED_PROPERTY_KEYS = ["dummyPropertyList"]; const FILTERED_DEFINITION_KEYS = [ diff --git a/agent-service/src/agent/workflow-result-state.test.ts b/agent-service/src/agent/workflow-result-state.spec.ts similarity index 79% rename from agent-service/src/agent/workflow-result-state.test.ts rename to agent-service/src/agent/workflow-result-state.spec.ts index b2e46fd0d9e..00f501087ba 100644 --- a/agent-service/src/agent/workflow-result-state.test.ts +++ b/agent-service/src/agent/workflow-result-state.spec.ts @@ -19,14 +19,13 @@ import { describe, expect, test } from "bun:test"; import { WorkflowResultState } from "./workflow-result-state"; -import type { OperatorInfo } from "../types/execution"; +import type { OperatorExecutionSummary } from "../types/execution"; -function makeInfo(outputTuples: number): OperatorInfo { +function makeInfo(totalRowCount: number): OperatorExecutionSummary { return { state: "Completed", - inputTuples: 0, - outputTuples, - resultMode: "table", + errorMessages: [], + resultSummary: { resultMode: "table", sampleTuples: [], totalRowCount }, }; } @@ -40,15 +39,15 @@ describe("WorkflowResultState - ancestor walk", () => { state.set("op1", "step-C", makeInfo(3)); path = ["step-A", "step-B", "step-C"]; - expect(state.get("op1")?.operatorInfo.outputTuples).toBe(3); + expect(state.get("op1")?.operatorInfo.resultSummary?.totalRowCount).toBe(3); // Rewind to step-B; step-C is no longer an ancestor. path = ["step-A", "step-B"]; - expect(state.get("op1")?.operatorInfo.outputTuples).toBe(2); + expect(state.get("op1")?.operatorInfo.resultSummary?.totalRowCount).toBe(2); // Rewind further. path = ["step-A"]; - expect(state.get("op1")?.operatorInfo.outputTuples).toBe(1); + expect(state.get("op1")?.operatorInfo.resultSummary?.totalRowCount).toBe(1); }); test("returns undefined when no ancestor has a result", () => { @@ -74,8 +73,8 @@ describe("WorkflowResultState - ancestor walk", () => { path = ["step-A", "step-B"]; const visible = state.getAllVisible(); expect(visible.size).toBe(2); - expect(visible.get("op1")?.operatorInfo.outputTuples).toBe(1); - expect(visible.get("op2")?.operatorInfo.outputTuples).toBe(7); + expect(visible.get("op1")?.operatorInfo.resultSummary?.totalRowCount).toBe(1); + expect(visible.get("op2")?.operatorInfo.resultSummary?.totalRowCount).toBe(7); }); test("clear drops all stored results", () => { @@ -90,6 +89,6 @@ describe("WorkflowResultState - ancestor walk", () => { const state = new WorkflowResultState(() => ["step-A"]); state.set("op1", "step-A", makeInfo(1)); state.set("op1", "step-A", makeInfo(42)); - expect(state.get("op1")?.operatorInfo.outputTuples).toBe(42); + expect(state.get("op1")?.operatorInfo.resultSummary?.totalRowCount).toBe(42); }); }); diff --git a/agent-service/src/agent/workflow-result-state.ts b/agent-service/src/agent/workflow-result-state.ts index e6f13c2301a..34d604657e3 100644 --- a/agent-service/src/agent/workflow-result-state.ts +++ b/agent-service/src/agent/workflow-result-state.ts @@ -17,10 +17,10 @@ * under the License. */ -import type { OperatorInfo } from "../types/execution"; +import type { OperatorExecutionSummary } from "../types/execution"; interface ResultEntry { - operatorInfo: OperatorInfo; + operatorInfo: OperatorExecutionSummary; stepId: string; } @@ -37,7 +37,7 @@ export class WorkflowResultState { constructor(private getAncestorPath: () => string[]) {} - set(operatorId: string, stepId: string, operatorInfo: OperatorInfo): void { + set(operatorId: string, stepId: string, operatorInfo: OperatorExecutionSummary): void { let versions = this.results.get(operatorId); if (!versions) { versions = new Map(); @@ -58,7 +58,7 @@ export class WorkflowResultState { return undefined; } - getOperatorInfo(operatorId: string): OperatorInfo | undefined { + getOperatorInfo(operatorId: string): OperatorExecutionSummary | undefined { return this.get(operatorId)?.operatorInfo; } diff --git a/agent-service/src/agent/workflow-state.test.ts b/agent-service/src/agent/workflow-state.spec.ts similarity index 100% rename from agent-service/src/agent/workflow-state.test.ts rename to agent-service/src/agent/workflow-state.spec.ts diff --git a/agent-service/src/api/backend-api.ts b/agent-service/src/api/backend-api.ts index ffd2c59433f..56e5ada4d64 100644 --- a/agent-service/src/api/backend-api.ts +++ b/agent-service/src/api/backend-api.ts @@ -18,6 +18,16 @@ */ import { env } from "../config/env"; +import type { OperatorMetadata } from "../types/metadata"; + +export type { + InputPortInfo, + OutputPortInfo, + OperatorAdditionalMetadata, + OperatorSchema, + GroupInfo, + OperatorMetadata, +} from "../types/metadata"; interface BackendConfig { apiEndpoint: string; @@ -37,45 +47,6 @@ export function getBackendConfig(): BackendConfig { return { ...currentConfig }; } -export interface InputPortInfo { - displayName?: string; - disallowMultiLinks?: boolean; - dependencies?: { id: number; internal: boolean }[]; -} - -export interface OutputPortInfo { - displayName?: string; -} - -interface OperatorAdditionalMetadata { - userFriendlyName: string; - operatorGroupName: string; - operatorDescription?: string; - inputPorts: InputPortInfo[]; - outputPorts: OutputPortInfo[]; - dynamicInputPorts?: boolean; - dynamicOutputPorts?: boolean; - supportReconfiguration?: boolean; - allowPortCustomization?: boolean; -} - -export interface OperatorSchema { - operatorType: string; - jsonSchema: any; - additionalMetadata: OperatorAdditionalMetadata; - operatorVersion: string; -} - -interface GroupInfo { - groupName: string; - children?: GroupInfo[] | null; -} - -export interface OperatorMetadata { - operators: OperatorSchema[]; - groups: GroupInfo[]; -} - export async function fetchOperatorMetadata(): Promise { const url = `${currentConfig.apiEndpoint}/api/resources/operator-metadata`; const response = await fetch(url); diff --git a/agent-service/src/api/compile-api.spec.ts b/agent-service/src/api/compile-api.spec.ts new file mode 100644 index 00000000000..270463db20a --- /dev/null +++ b/agent-service/src/api/compile-api.spec.ts @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { afterEach, describe, expect, mock, test } from "bun:test"; +import { compileWorkflowAsync } from "./compile-api"; +import type { LogicalPlan } from "../types/workflow"; + +const realFetch = globalThis.fetch; +afterEach(() => { + globalThis.fetch = realFetch; +}); + +const plan: LogicalPlan = { + operators: [{ operatorID: "op1", operatorType: "Filter" }], + links: [], +}; + +describe("compileWorkflowAsync", () => { + test("POSTs to /api/compile and returns the parsed compilation response", async () => { + // operatorErrors uses the proto-accurate WorkflowFatalError shape (type is the enum name string). + const responseBody = { + physicalPlan: { nodes: [] }, + operatorOutputSchemas: {}, + operatorErrors: { + op1: { + type: "COMPILATION_ERROR", + message: "bad attribute", + details: "stack", + operatorId: "op1", + workerId: "", + timestamp: { seconds: 1, nanos: 0 }, + }, + }, + }; + const fn = mock(async () => ({ + ok: true, + status: 200, + statusText: "OK", + json: async () => responseBody, + text: async () => "", + })); + globalThis.fetch = fn as unknown as typeof fetch; + + const result = await compileWorkflowAsync(plan); + + const [url, init] = fn.mock.calls[0] as unknown as [string, RequestInit]; + expect(url).toEndWith("/api/compile"); + expect(init.method).toBe("POST"); + expect(JSON.parse(init.body as string)).toEqual({ + operators: plan.operators, + links: plan.links, + opsToReuseResult: [], + opsToViewResult: [], + }); + expect(result).not.toBeNull(); + expect(result!.operatorErrors.op1.type).toBe("COMPILATION_ERROR"); + expect(result!.operatorErrors.op1.message).toBe("bad attribute"); + }); + + test("returns null on a non-ok response", async () => { + const fn = mock(async () => ({ + ok: false, + status: 400, + statusText: "Bad Request", + json: async () => ({}), + text: async () => "compile error", + })); + globalThis.fetch = fn as unknown as typeof fetch; + + expect(await compileWorkflowAsync(plan)).toBeNull(); + }); + + test("returns null when the request throws", async () => { + const fn = mock(async () => { + throw new Error("network down"); + }); + globalThis.fetch = fn as unknown as typeof fetch; + + expect(await compileWorkflowAsync(plan)).toBeNull(); + }); +}); diff --git a/agent-service/src/api/compile-api.ts b/agent-service/src/api/compile-api.ts index 8ffd27fd52c..defd02344a1 100644 --- a/agent-service/src/api/compile-api.ts +++ b/agent-service/src/api/compile-api.ts @@ -18,7 +18,8 @@ */ import { getBackendConfig } from "./backend-api"; -import type { LogicalPlan, OperatorPortSchemaMap } from "../types/workflow"; +import type { LogicalPlan } from "../types/workflow"; +import type { WorkflowCompilationResponse } from "../types/dto"; import { createLogger } from "../logger"; const log = createLogger("CompileAPI"); @@ -30,18 +31,6 @@ export interface SchemaAttribute { export type PortSchema = ReadonlyArray; -export interface WorkflowFatalError { - type: string; - message: string; - operatorId?: string; -} - -export interface WorkflowCompilationResponse { - physicalPlan?: any; - operatorOutputSchemas: Record; - operatorErrors: Record; -} - export async function compileWorkflowAsync(logicalPlan: LogicalPlan): Promise { const config = getBackendConfig(); const url = `${config.compileEndpoint}/api/compile`; diff --git a/agent-service/src/api/index.ts b/agent-service/src/api/index.ts index eca292d7ffe..1efba63ed67 100644 --- a/agent-service/src/api/index.ts +++ b/agent-service/src/api/index.ts @@ -20,5 +20,5 @@ export * from "./backend-api"; export * from "./execution-api"; export * from "./workflow-api"; -export * from "./auth-api"; +export * from "../auth/jwt"; export * from "./compile-api"; diff --git a/agent-service/src/api/workflow-api.spec.ts b/agent-service/src/api/workflow-api.spec.ts new file mode 100644 index 00000000000..8a8ecb3ed69 --- /dev/null +++ b/agent-service/src/api/workflow-api.spec.ts @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { afterEach, describe, expect, mock, test } from "bun:test"; +import { persistWorkflow, retrieveWorkflow } from "./workflow-api"; +import type { WorkflowContent } from "../types/workflow"; + +const realFetch = globalThis.fetch; +afterEach(() => { + globalThis.fetch = realFetch; +}); + +interface FakeResponseInit { + ok: boolean; + status?: number; + statusText?: string; + json?: unknown; + text?: string; +} + +function mockFetch(init: FakeResponseInit) { + const fn = mock(async () => ({ + ok: init.ok, + status: init.status ?? (init.ok ? 200 : 500), + statusText: init.statusText ?? "", + json: async () => init.json, + text: async () => init.text ?? "", + })); + globalThis.fetch = fn as unknown as typeof fetch; + return fn; +} + +const content: WorkflowContent = { + operators: [], + operatorPositions: {}, + links: [], + commentBoxes: [], + settings: { dataTransferBatchSize: 400 }, +}; + +function lastCall(fn: ReturnType): [string, RequestInit] { + return fn.mock.calls[0] as unknown as [string, RequestInit]; +} + +describe("persistWorkflow", () => { + test("POSTs to /workflow/persist with bearer auth and a stringified content body", async () => { + const fn = mockFetch({ ok: true, json: { wid: 1, name: "wf", content: JSON.stringify(content) } }); + + const result = await persistWorkflow("tok", 1, "wf", content, "desc"); + + const [url, init] = lastCall(fn); + expect(url).toEndWith("/api/workflow/persist"); + expect(init.method).toBe("POST"); + expect((init.headers as Record).Authorization).toBe("Bearer tok"); + expect(JSON.parse(init.body as string)).toEqual({ + wid: 1, + name: "wf", + description: "desc", + content: JSON.stringify(content), + isPublic: false, + }); + // The stringified content in the response is parsed back into an object. + expect(result.content).toEqual(content); + }); + + test("throws with status detail on a non-ok response", () => { + mockFetch({ ok: false, status: 500, statusText: "Server Error", text: "boom" }); + expect(persistWorkflow("tok", 1, "wf", content)).rejects.toThrow("Failed to persist workflow: 500"); + }); +}); + +describe("retrieveWorkflow", () => { + test("GETs /workflow/:wid with bearer auth and parses stringified content", async () => { + const fn = mockFetch({ ok: true, json: { wid: 7, name: "wf", content: JSON.stringify(content) } }); + + const result = await retrieveWorkflow("tok", 7); + + const [url, init] = lastCall(fn); + expect(url).toEndWith("/api/workflow/7"); + expect(init.method).toBe("GET"); + expect((init.headers as Record).Authorization).toBe("Bearer tok"); + expect(result.content).toEqual(content); + }); + + test("leaves an already-parsed content object untouched", async () => { + mockFetch({ ok: true, json: { wid: 7, name: "wf", content } }); + const result = await retrieveWorkflow("tok", 7); + expect(result.content).toEqual(content); + }); + + test("throws with status detail on a non-ok response", () => { + mockFetch({ ok: false, status: 404, statusText: "Not Found", text: "missing" }); + expect(retrieveWorkflow("tok", 7)).rejects.toThrow("Failed to retrieve workflow: 404"); + }); +}); diff --git a/agent-service/src/api/workflow-api.ts b/agent-service/src/api/workflow-api.ts index 7a96f979a1c..39387563b6e 100644 --- a/agent-service/src/api/workflow-api.ts +++ b/agent-service/src/api/workflow-api.ts @@ -18,26 +18,9 @@ */ import { getBackendConfig } from "./backend-api"; -import { createAuthHeaders } from "./auth-api"; +import { createAuthHeaders } from "../auth/jwt"; import type { WorkflowContent } from "../types/workflow"; - -export interface Workflow { - wid: number; - name: string; - description?: string; - content: WorkflowContent; - creationTime?: number; - lastModifiedTime?: number; - isPublished?: boolean; -} - -interface WorkflowPersistRequest { - wid?: number; - name: string; - description?: string; - content: string; - isPublic?: boolean; -} +import type { Workflow, WorkflowPersistRequest } from "../types/dto"; const WORKFLOW_BASE_URL = "workflow"; diff --git a/agent-service/src/auth/jwt.spec.ts b/agent-service/src/auth/jwt.spec.ts new file mode 100644 index 00000000000..4ba403dcb41 --- /dev/null +++ b/agent-service/src/auth/jwt.spec.ts @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { describe, expect, test } from "bun:test"; +import { extractUserFromToken, validateToken, extractBearerToken, createAuthHeaders } from "./jwt"; + +// Encode segments as base64url (no padding, `-`/`_` alphabet) to match real JWTs. +function makeToken(payload: Record): string { + const encode = (o: Record) => Buffer.from(JSON.stringify(o)).toString("base64url"); + return `${encode({ alg: "none", typ: "JWT" })}.${encode(payload)}.signature`; +} + +const nowSeconds = () => Math.floor(Date.now() / 1000); + +describe("extractUserFromToken", () => { + test("maps JWT claims onto a UserInfo", () => { + const token = makeToken({ userId: 7, sub: "alice", email: "alice@example.com", role: "ADMIN" }); + expect(extractUserFromToken(token)).toEqual({ uid: 7, name: "alice", email: "alice@example.com", role: "ADMIN" }); + }); + + test("defaults missing email and role", () => { + const token = makeToken({ userId: 1, sub: "bob" }); + expect(extractUserFromToken(token)).toEqual({ uid: 1, name: "bob", email: "", role: "REGULAR" }); + }); + + test("throws on a malformed token", () => { + expect(() => extractUserFromToken("not-a-jwt")).toThrow("Failed to decode JWT"); + }); + + test("decodes a token whose base64url payload contains -/_ characters", () => { + const token = makeToken({ userId: 9, sub: "a~?>>", email: "x@y.z" }); + // Guard that this case stays meaningful: the payload segment must use the url-safe alphabet. + expect(token.split(".")[1]).toMatch(/[-_]/); + expect(extractUserFromToken(token)).toEqual({ uid: 9, name: "a~?>>", email: "x@y.z", role: "REGULAR" }); + }); +}); + +describe("validateToken", () => { + test("accepts a token expiring in the future", () => { + expect(validateToken(makeToken({ sub: "a", exp: nowSeconds() + 3600 }))).toBe(true); + }); + + test("rejects an expired token", () => { + expect(validateToken(makeToken({ sub: "a", exp: nowSeconds() - 3600 }))).toBe(false); + }); + + test("treats a token without exp as valid", () => { + expect(validateToken(makeToken({ sub: "a" }))).toBe(true); + }); + + test("rejects a malformed token", () => { + expect(validateToken("garbage")).toBe(false); + }); +}); + +describe("extractBearerToken", () => { + test("extracts the token from a Bearer header", () => { + expect(extractBearerToken("Bearer abc.def.ghi")).toBe("abc.def.ghi"); + }); + + test("is case-insensitive on the scheme", () => { + expect(extractBearerToken("bearer xyz")).toBe("xyz"); + }); + + test("returns undefined for a non-Bearer scheme", () => { + expect(extractBearerToken("Basic abc")).toBeUndefined(); + }); + + test("returns undefined when the token is missing", () => { + expect(extractBearerToken("Bearer")).toBeUndefined(); + }); + + test("returns undefined for an absent header", () => { + expect(extractBearerToken(undefined)).toBeUndefined(); + }); +}); + +describe("createAuthHeaders", () => { + test("builds bearer auth headers", () => { + expect(createAuthHeaders("tok")).toEqual({ Authorization: "Bearer tok", "Content-Type": "application/json" }); + }); +}); diff --git a/agent-service/src/api/auth-api.ts b/agent-service/src/auth/jwt.ts similarity index 100% rename from agent-service/src/api/auth-api.ts rename to agent-service/src/auth/jwt.ts diff --git a/agent-service/src/config/endpoints.ts b/agent-service/src/config/endpoints.ts new file mode 100644 index 00000000000..cab8c7ff20c --- /dev/null +++ b/agent-service/src/config/endpoints.ts @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { env } from "./env"; + +/** Base URLs of the backend services this agent service talks to. */ +export interface ServiceEndpoints { + apiEndpoint: string; + modelsEndpoint: string; + compileEndpoint: string; + executionEndpoint: string; +} + +const endpoints: ServiceEndpoints = { + apiEndpoint: env.TEXERA_DASHBOARD_SERVICE_ENDPOINT, + modelsEndpoint: env.LLM_ENDPOINT, + compileEndpoint: env.WORKFLOW_COMPILING_SERVICE_ENDPOINT, + executionEndpoint: env.WORKFLOW_EXECUTION_SERVICE_ENDPOINT, +}; + +export function getServiceEndpoints(): ServiceEndpoints { + return { ...endpoints }; +} diff --git a/agent-service/src/server.test.ts b/agent-service/src/server.spec.ts similarity index 100% rename from agent-service/src/server.test.ts rename to agent-service/src/server.spec.ts diff --git a/agent-service/src/server.ts b/agent-service/src/server.ts index 0da3f693797..f98d06337c6 100644 --- a/agent-service/src/server.ts +++ b/agent-service/src/server.ts @@ -21,9 +21,8 @@ import { Elysia, t } from "elysia"; import { cors } from "@elysiajs/cors"; import { createOpenAI } from "@ai-sdk/openai"; import { TexeraAgent } from "./agent/texera-agent"; -import { getVisibleResultHeaders } from "./agent/tools/tools-utility"; import { getBackendConfig } from "./api/backend-api"; -import { extractBearerToken, extractUserFromToken, validateToken } from "./api/auth-api"; +import { extractBearerToken, extractUserFromToken, validateToken } from "./auth/jwt"; import { retrieveWorkflow } from "./api/workflow-api"; import { WorkflowSystemMetadata } from "./agent/util/workflow-system-metadata"; import { env } from "./config/env"; @@ -40,6 +39,8 @@ import type { ReActStep, } from "./types/agent"; import { OperatorResultSerializationMode } from "./types/agent"; +import type { WsClientRequest, WsServerMessage, WsServerInitMessage } from "./types/ws"; +import type { OperatorExecutionSummary } from "./types/execution"; const agentStore = new Map(); let agentCounter = 0; @@ -410,61 +411,17 @@ const agentsRouter = new Elysia({ prefix: "/agents" }) } ); -interface WsMessage { - type: "message" | "stop"; - content?: string; - messageSource?: "chat" | "feedback"; -} - -interface OperatorResultSummaryWs { - state: string; - inputTuples: number; - outputTuples: number; - inputPortShapes?: { portIndex: number; rows: number; columns: number }[]; - outputColumns?: number; - error?: string; - warnings?: string[]; - consoleLogCount?: number; - totalRowCount?: number; - sampleRecords?: Record[]; - resultStatistics?: Record; -} - -interface WsOutgoingMessage { - type: "step" | "state" | "error" | "complete" | "init" | "headChange"; - step?: ReActStep; - state?: string; - error?: string; - steps?: ReActStep[]; - headId?: string; - operatorResults?: Record; - workflowContent?: any; -} - -function getOperatorResultSummaries(agent: TexeraAgent): Record { +function getOperatorResultSummaries(agent: TexeraAgent): Record { const resultState = agent.getWorkflowResultState(); const visible = resultState.getAllVisible(); - const results: Record = {}; + const results: Record = {}; for (const [opId, entry] of visible) { - const info = entry.operatorInfo; - results[opId] = { - state: info.state, - inputTuples: info.inputTuples, - outputTuples: info.outputTuples, - inputPortShapes: info.inputPortShapes, - outputColumns: info.result && info.result.length > 0 ? getVisibleResultHeaders(info.result[0]).length : undefined, - error: info.error, - warnings: info.warnings, - consoleLogCount: info.consoleLogs?.length, - totalRowCount: info.totalRowCount, - sampleRecords: info.result, - resultStatistics: info.resultStatistics, - }; + results[opId] = entry.operatorInfo; } return results; } -function broadcastToAgent(agentId: string, message: WsOutgoingMessage): void { +function broadcastToAgent(agentId: string, message: WsServerMessage): void { const agent = agentStore.get(agentId); if (!agent) return; @@ -504,7 +461,7 @@ export function buildApp() { agent.addWebsocket(ws); - const initMessage: WsOutgoingMessage = { + const initMessage: WsServerInitMessage = { type: "init", state: agent.getState(), steps: agent.getAllSteps(), @@ -523,21 +480,23 @@ export function buildApp() { return; } - let msg: WsMessage; + let msg: WsClientRequest; try { - msg = typeof messageData === "string" ? JSON.parse(messageData) : (messageData as WsMessage); + msg = typeof messageData === "string" ? JSON.parse(messageData) : (messageData as WsClientRequest); } catch { ws.send(JSON.stringify({ type: "error", error: "Invalid message format" })); return; } - if (msg.type === "stop") { - agent.stop(); - broadcastToAgent(agentId, { type: "state", state: "STOPPING" }); + if (msg.type === "command") { + if (msg.commandType === "stop") { + agent.stop(); + broadcastToAgent(agentId, { type: "state", state: "STOPPING" }); + } return; } - if (msg.type === "message") { + if (msg.type === "prompt") { if (!msg.content || typeof msg.content !== "string") { ws.send(JSON.stringify({ type: "error", error: "Message content is required" })); return; @@ -630,9 +589,9 @@ function printStartupMessage(app: ReturnType) { for (const route of wsRoutes) { console.log(` WS ${route.path}`); } - console.log(" Send: { type: 'message', content: '...' }"); - console.log(" Send: { type: 'stop' }"); - console.log(" Recv: { type: 'step' | 'state' | 'complete' | 'error' | 'init', ... }"); + console.log(" Send: { type: 'prompt', content: '...' }"); + console.log(" Send: { type: 'command', commandType: 'stop' }"); + console.log(" Recv: { type: 'step' | 'state' | 'complete' | 'error' | 'init' | 'headChange', ... }"); } console.log(""); diff --git a/agent-service/src/types/agent.ts b/agent-service/src/types/agent.ts index 694b51785fd..03c3524a837 100644 --- a/agent-service/src/types/agent.ts +++ b/agent-service/src/types/agent.ts @@ -17,6 +17,7 @@ * under the License. */ +import type { ModelMessage } from "ai"; import type { WorkflowContent } from "./workflow"; export enum AgentState { @@ -48,15 +49,15 @@ export interface ReActStep { toolCalls?: Array<{ toolName: string; toolCallId: string; - input: any; + input: Record; }>; toolResults?: Array<{ toolCallId: string; - output: any; + output: string; isError?: boolean; }>; usage?: TokenUsage; - inputMessages?: any[]; + inputMessages?: ModelMessage[]; messageSource?: "chat" | "feedback"; beforeWorkflowContent?: WorkflowContent; afterWorkflowContent?: WorkflowContent; diff --git a/agent-service/src/types/dto.ts b/agent-service/src/types/dto.ts new file mode 100644 index 00000000000..f740d930dc1 --- /dev/null +++ b/agent-service/src/types/dto.ts @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +// DTOs: request/response bodies exchanged with backend services. Distinct from +// domain types (workflow.ts, execution.ts, agent.ts) which model in-memory +// state, and from ws.ts which carries this service's own WebSocket frames. + +import type { WorkflowContent, OperatorPortSchemaMap } from "./workflow"; + +// --- Dashboard Service: workflow persistence --- + +export interface Workflow { + wid: number; + name: string; + description?: string; + content: WorkflowContent; + creationTime?: number; + lastModifiedTime?: number; + isPublished?: boolean; +} + +export interface WorkflowPersistRequest { + wid?: number; + name: string; + description?: string; + content: string; + isPublic?: boolean; +} + +// --- Workflow Compiling Service --- + +export interface WorkflowFatalError { + // FatalErrorType enum name, e.g. "COMPILATION_ERROR" | "EXECUTION_FAILURE". + type: string; + message: string; + details?: string; + operatorId?: string; + workerId?: string; + timestamp?: { seconds: number; nanos: number }; +} + +export interface WorkflowCompilationResponse { + physicalPlan?: unknown; + operatorOutputSchemas: Record; + operatorErrors: Record; +} diff --git a/agent-service/src/types/execution.ts b/agent-service/src/types/execution.ts index f93be5c583e..90b5081acaa 100644 --- a/agent-service/src/types/execution.ts +++ b/agent-service/src/types/execution.ts @@ -17,37 +17,81 @@ * under the License. */ -interface ConsoleMessage { +import type { WorkflowFatalError } from "./dto"; + +// Lifecycle state of a single operator, as reported by the engine +// (mirrors the backend's WorkflowAggregatedState string mapping). +export type OperatorState = + | "Uninitialized" + | "Ready" + | "Running" + | "Pausing" + | "Paused" + | "Resuming" + | "Completed" + | "Failed" + | "Killed" + | "Terminated" + | "Unknown"; + +// Aggregated state of a whole workflow execution: the OperatorState values the +// engine reports, plus the synthetic outcomes the sync-execution endpoint adds. +export type WorkflowExecutionState = OperatorState | "Error" | "CompilationFailed"; + +// A single console message emitted by an operator during execution. +// `title` is the short header (Scala errors put their text here); `message` is +// the body (Python errors / stack traces). +export interface ConsoleMessage { msgType: string; + title: string; message: string; } -interface PortShape { - portIndex: number; - rows: number; - columns: number; +// One sampled output row: its original position plus the row's columns. (A viz +// payload's tuple still carries an `__is_visualization__` marker.) +export interface SampleRow { + rowIndex: number; + tuple: Record; } -export interface OperatorInfo { - state: string; - inputTuples: number; - outputTuples: number; - inputPortShapes?: PortShape[]; +// An operator's output, summarized for the agent. `sampleTuples` are the +// symmetrically-truncated output rows (the middle is dropped, so `rowIndex` +// values may have gaps). `outputSchema` / per-column statistics are intended +// future additions — the engine does not produce them yet. +export interface OperatorResultSummary { + // "table" or "visualization". resultMode: string; - result?: Record[]; - totalRowCount?: number; - displayedRows?: number; - truncated?: boolean; - consoleLogs?: ConsoleMessage[]; - error?: string; - warnings?: string[]; - resultStatistics?: Record; + sampleTuples: SampleRow[]; + // Total output rows before truncation (sampleTuples may hold fewer). + totalRowCount: number; +} + +// An operator's console output. Warnings are not a separate field: they are the +// messages whose `title` the engine prefixes with "WARNING: ", derived on demand. +export interface OperatorConsoleLogsSummary { + messages: ConsoleMessage[]; +} + +// Per-operator execution summary returned by the sync-execution backend. +// Orthogonal sub-summaries replace the previous flat `OperatorInfo`. +export interface OperatorExecutionSummary { + state: OperatorState; + // Empty means the operator did not fail. + errorMessages: ReadonlyArray; + // Absent when the operator produced no materialized result. + resultSummary?: OperatorResultSummary; + // Absent when the operator produced no console output. + consoleLogsSummary?: OperatorConsoleLogsSummary; } -export interface SyncExecutionResult { +// The result of one synchronous workflow execution. +export interface WorkflowExecutionSummary { + // True only on a clean run; can be false even when state is "Completed" + // (e.g. an operator logged a console error without aborting the run). success: boolean; - state: string; - operators: Record; - compilationErrors?: Record; - errors?: string[]; + state: WorkflowExecutionState; + operators: Record; + // Workflow-level errors (timeouts, init/compile failures, fatal errors); + // empty means none. + errors: string[]; } diff --git a/agent-service/src/types/index.ts b/agent-service/src/types/index.ts index c6d7291e51d..64227cc909f 100644 --- a/agent-service/src/types/index.ts +++ b/agent-service/src/types/index.ts @@ -19,4 +19,7 @@ export * from "./workflow"; export * from "./execution"; +export * from "./metadata"; export * from "./agent"; +export * from "./dto"; +export * from "./ws"; diff --git a/agent-service/src/types/metadata.ts b/agent-service/src/types/metadata.ts new file mode 100644 index 00000000000..49a3bdeed98 --- /dev/null +++ b/agent-service/src/types/metadata.ts @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +// Operator metadata shapes served by the Dashboard Service +// (`/api/resources/operator-metadata`) and the compact variants the agent +// derives from them for prompts and validation. + +export interface InputPortInfo { + displayName?: string; + disallowMultiLinks?: boolean; + dependencies?: { id: number; internal: boolean }[]; +} + +export interface OutputPortInfo { + displayName?: string; +} + +export interface OperatorAdditionalMetadata { + userFriendlyName: string; + operatorGroupName: string; + operatorDescription?: string; + inputPorts: InputPortInfo[]; + outputPorts: OutputPortInfo[]; + dynamicInputPorts?: boolean; + dynamicOutputPorts?: boolean; + supportReconfiguration?: boolean; + allowPortCustomization?: boolean; +} + +export interface OperatorSchema { + operatorType: string; + jsonSchema: Record; + additionalMetadata: OperatorAdditionalMetadata; + operatorVersion: string; +} + +export interface GroupInfo { + groupName: string; + children?: GroupInfo[] | null; +} + +export interface OperatorMetadata { + operators: OperatorSchema[]; + groups: GroupInfo[]; +} + +/** Full per-operator schema slice surfaced to debugging/inspection callers. */ +export interface OperatorSchemaInfo { + properties: Record; + required: string[]; + definitions: Record; +} + +/** Reduced operator schema (refs inlined, noise stripped) used in prompts and errors. */ +export interface CompactOperatorSchema { + properties: Record; + required: string[]; +} diff --git a/agent-service/src/types/workflow.ts b/agent-service/src/types/workflow.ts index 52c6493cf5f..241b4d9e83c 100644 --- a/agent-service/src/types/workflow.ts +++ b/agent-service/src/types/workflow.ts @@ -52,7 +52,7 @@ export interface OperatorPredicate { readonly operatorID: string; readonly operatorType: string; readonly operatorVersion: string; - readonly operatorProperties: Record; + readonly operatorProperties: Record; readonly inputPorts: PortDescription[]; readonly outputPorts: PortDescription[]; readonly dynamicInputPorts?: boolean; @@ -67,7 +67,7 @@ export interface OperatorPredicate { export interface LogicalOperator { readonly operatorID: string; readonly operatorType: string; - readonly [key: string]: any; + readonly [key: string]: unknown; } export interface OperatorLink { @@ -131,7 +131,7 @@ export interface OperatorDetail { operatorId: string; operatorType: string; customDisplayName?: string; - operatorProperties: Record; + operatorProperties: Record; inputPorts: PortDescription[]; outputPorts: PortDescription[]; } diff --git a/agent-service/src/types/ws/client.ts b/agent-service/src/types/ws/client.ts new file mode 100644 index 00000000000..edc6115b7d2 --- /dev/null +++ b/agent-service/src/types/ws/client.ts @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +// Client -> server WebSocket frames for this service's protocol +// (/agents/:id/react). Modeled as a discriminated union on `type` so each +// request kind carries only its own fields, rather than one interface with +// everything optional. + +interface WsClientRequestBase { + type: "prompt" | "command"; +} + +// A user prompt to run through the agent. +export interface WsClientRequestPrompt extends WsClientRequestBase { + type: "prompt"; + content: string; + messageSource?: "chat" | "feedback"; +} + +// A control command. Today the only command stops the in-flight run; the +// `commandType` discriminator leaves room for additional commands later. +export interface WsClientRequestStopCommand extends WsClientRequestBase { + type: "command"; + commandType: "stop"; +} + +export type WsClientRequest = WsClientRequestPrompt | WsClientRequestStopCommand; diff --git a/agent-service/src/types/ws/index.ts b/agent-service/src/types/ws/index.ts new file mode 100644 index 00000000000..90f3faac7e0 --- /dev/null +++ b/agent-service/src/types/ws/index.ts @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +// WebSocket frames for this service's own protocol (/agents/:id/react): +// inbound client requests and the outbound server messages it pushes back. + +export * from "./client"; +export * from "./server"; diff --git a/agent-service/src/types/ws/server.ts b/agent-service/src/types/ws/server.ts new file mode 100644 index 00000000000..29a48f24341 --- /dev/null +++ b/agent-service/src/types/ws/server.ts @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +// Server -> client WebSocket frames for this service's protocol +// (/agents/:id/react). Modeled as a discriminated union on `type` so each +// message kind declares exactly the fields it sends. + +import type { ReActStep } from "../agent"; +import type { OperatorExecutionSummary } from "../execution"; +import type { WorkflowContent } from "../workflow"; + +// The server streams the canonical per-operator execution summaries straight to +// the client, keyed by operator id. +type OperatorResults = Record; + +interface WsServerMessageBase { + type: "init" | "step" | "state" | "complete" | "error" | "headChange"; +} + +// Sent once on connect: a snapshot of the agent's current state and steps. +export interface WsServerInitMessage extends WsServerMessageBase { + type: "init"; + state: string; + steps: ReActStep[]; + headId: string; + operatorResults: OperatorResults; +} + +// A single ReAct step streamed as the agent runs. Operator results accompany +// steps that ran tools. +export interface WsServerStepMessage extends WsServerMessageBase { + type: "step"; + step: ReActStep; + operatorResults?: OperatorResults; +} + +// An agent lifecycle transition (e.g. GENERATING, STOPPING). +export interface WsServerStateMessage extends WsServerMessageBase { + type: "state"; + state: string; +} + +// Terminal message for a finished run. +export interface WsServerCompleteMessage extends WsServerMessageBase { + type: "complete"; + state: string; + operatorResults: OperatorResults; +} + +// An error surfaced to the client. +export interface WsServerErrorMessage extends WsServerMessageBase { + type: "error"; + error: string; +} + +// Emitted after a checkout: the head moved, carrying the full step list and the +// workflow snapshot at the new head. +export interface WsServerHeadChangeMessage extends WsServerMessageBase { + type: "headChange"; + headId: string; + steps: ReActStep[]; + workflowContent?: WorkflowContent; + operatorResults: OperatorResults; +} + +export type WsServerMessage = + | WsServerInitMessage + | WsServerStepMessage + | WsServerStateMessage + | WsServerCompleteMessage + | WsServerErrorMessage + | WsServerHeadChangeMessage; diff --git a/amber/src/main/scala/org/apache/texera/web/resource/SyncExecutionResource.scala b/amber/src/main/scala/org/apache/texera/web/resource/SyncExecutionResource.scala index b70bafb4b0b..e7ff87222f5 100644 --- a/amber/src/main/scala/org/apache/texera/web/resource/SyncExecutionResource.scala +++ b/amber/src/main/scala/org/apache/texera/web/resource/SyncExecutionResource.scala @@ -44,6 +44,9 @@ import org.apache.texera.amber.engine.common.executionruntimestate.{ ExecutionMetadataStore, ExecutionStatsStore } +import org.apache.texera.amber.core.workflowruntimestate.WorkflowFatalError +import org.apache.texera.amber.core.workflowruntimestate.FatalErrorType.EXECUTION_FAILURE +import com.google.protobuf.timestamp.Timestamp import io.reactivex.rxjava3.core.Observable import org.apache.texera.auth.SessionUser import org.apache.texera.dao.SqlServer @@ -55,12 +58,12 @@ import org.apache.texera.web.service.{ExecutionResultService, WorkflowService} import org.apache.texera.web.storage.ExecutionStateStore.updateWorkflowState import java.net.URI +import java.time.Instant import java.util.concurrent.TimeUnit import javax.annotation.security.RolesAllowed import javax.ws.rs._ import javax.ws.rs.core.MediaType import scala.collection.mutable -import scala.jdk.CollectionConverters._ import com.fasterxml.jackson.databind.ObjectMapper case class SyncExecutionRequest( @@ -79,32 +82,41 @@ case class ConsoleMessageInfo( message: String ) -case class PortShape( - portIndex: Int, - rows: Long +// One sampled output row: the original row position plus the row's columns as a +// processed/truncated JSON object (not a raw engine Tuple, which would serialize +// as {schema, fields[]} and bypass the type-aware conversion + cell truncation). +// The index is carried explicitly rather than embedded in the tuple. +case class SampleRow( + rowIndex: Int, + tuple: ObjectNode ) -case class OperatorInfo( - state: String, - inputTuples: Long, - outputTuples: Long, - inputPortShapes: Option[List[PortShape]], +case class OperatorResultSummary( resultMode: String, // "table" or "visualization" - result: Option[Any], // JSON array (List[ObjectNode]) - totalRowCount: Option[Int], - displayedRows: Option[Int], - truncated: Option[Boolean], - consoleLogs: Option[List[ConsoleMessageInfo]], - error: Option[String], - warnings: Option[List[String]] + sampleTuples: List[SampleRow], + totalRowCount: Int +) + +case class OperatorConsoleLogsSummary( + messages: List[ConsoleMessageInfo] ) -case class SyncExecutionResult( +// Per-operator execution summary. Orthogonal sub-summaries replace the previous +// flat OperatorInfo; must stay in sync with agent-service's OperatorExecutionSummary. +// `errorMessages` reuses the engine's WorkflowFatalError, the same type the +// compiling service returns for compilation errors, for one consistent wire shape. +case class OperatorExecutionSummary( + state: String, + errorMessages: List[WorkflowFatalError], // empty means the operator did not fail + resultSummary: Option[OperatorResultSummary], + consoleLogsSummary: Option[OperatorConsoleLogsSummary] +) + +case class WorkflowExecutionSummary( success: Boolean, state: String, - operators: Map[String, OperatorInfo], - compilationErrors: Option[Map[String, String]], - errors: Option[List[String]] + operators: Map[String, OperatorExecutionSummary], + errors: List[String] // empty means none ) sealed trait TerminationReason @@ -129,7 +141,7 @@ class SyncExecutionResource extends LazyLogging { @PathParam("cuid") computingUnitId: Int, request: SyncExecutionRequest, @Auth user: SessionUser - ): SyncExecutionResult = { + ): WorkflowExecutionSummary = { val timeoutSeconds = request.timeoutSeconds val maxOperatorResultCharLimit = @@ -176,12 +188,11 @@ class SyncExecutionResource extends LazyLogging { val executionService = workflowService.executionService.getValue if (executionService == null) { - return SyncExecutionResult( + return WorkflowExecutionSummary( success = false, state = "Error", operators = Map.empty, - compilationErrors = None, - errors = Some(List("Failed to initialize execution service")) + errors = List("Failed to initialize execution service") ) } @@ -254,21 +265,19 @@ class SyncExecutionResource extends LazyLogging { } catch { case _: java.util.concurrent.TimeoutException => killExecution(executionService) - return SyncExecutionResult( + return WorkflowExecutionSummary( success = false, state = "Killed", operators = Map.empty, - compilationErrors = None, - errors = Some(List(s"Timeout after $timeoutSeconds seconds")) + errors = List(s"Timeout after $timeoutSeconds seconds") ) case e: Exception => logger.error(s"Error waiting for execution: ${e.getMessage}", e) - return SyncExecutionResult( + return WorkflowExecutionSummary( success = false, state = "Error", operators = Map.empty, - compilationErrors = None, - errors = Some(List(e.getMessage)) + errors = List(e.getMessage) ) } } @@ -318,7 +327,7 @@ class SyncExecutionResource extends LazyLogging { .map(err => s"${err.`type`}: ${err.message}") .toList - val hasOperatorConsoleError = operatorInfos.values.exists(_.error.isDefined) + val hasOperatorConsoleError = operatorInfos.values.exists(_.errorMessages.nonEmpty) val stateString = if (terminatedByConsoleError) "Failed" @@ -328,12 +337,11 @@ class SyncExecutionResource extends LazyLogging { val isSuccess = (finalState.state == COMPLETED || terminatedByTargetResults) && !hasOperatorConsoleError && !terminatedByConsoleError - SyncExecutionResult( + WorkflowExecutionSummary( success = isSuccess, state = stateString, operators = operatorInfos, - compilationErrors = None, - errors = if (fatalErrors.nonEmpty) Some(fatalErrors) else None + errors = fatalErrors ) } catch { @@ -382,8 +390,8 @@ class SyncExecutionResource extends LazyLogging { maxOperatorResultCharLimit: Int, maxOperatorResultCellCharLimit: Int, inMemoryConsoleState: Option[ExecutionConsoleStore] = None - ): Map[String, OperatorInfo] = { - val operatorInfos = mutable.Map[String, OperatorInfo]() + ): Map[String, OperatorExecutionSummary] = { + val operatorInfos = mutable.Map[String, OperatorExecutionSummary]() val statsState = executionService.executionStateStore.statsStore.getState val operatorStats = statsState.operatorInfo @@ -406,23 +414,9 @@ class SyncExecutionResource extends LazyLogging { for (opId <- targetOps) { val stats = operatorStats.get(opId) - val (state, inputTuples, outputTuples): (String, Long, Long) = stats match { - case Some(s) => - val inputCount = s.operatorStatistics.inputMetrics.map(_.tupleMetrics.count).sum - val outputCount = s.operatorStatistics.outputMetrics.map(_.tupleMetrics.count).sum - (stateToString(s.operatorState), inputCount, outputCount) - case None => ("Unknown", 0L, 0L) - } + val state = stats.map(s => stateToString(s.operatorState)).getOrElse("Unknown") - val inputPortShapes: Option[List[PortShape]] = stats - .map { s => - s.operatorStatistics.inputMetrics.map { pm => - PortShape(pm.portId.id, pm.tupleMetrics.count) - }.toList - } - .filter(_.nonEmpty) - - val (resultMode, result, totalRowCount, displayedRows, truncated) = + val (resultMode, result, totalRowCount, _, _) = collectOperatorResult( executionId, opId, @@ -459,31 +453,40 @@ class SyncExecutionResource extends LazyLogging { } ) - // Convention: PRINT messages prefixed with "WARNING: " surface as warnings. - val warningMsgs = consoleLogs - .map(_.filter(_.title.startsWith("WARNING: ")).map(_.title)) - .filter(_.nonEmpty) - operatorInfos(opId) = OperatorInfo( + // Absent when the operator produced no materialized result. `result` and + // `totalRowCount` are populated together, so map over the former. + val resultSummary = result.map { tuples => + OperatorResultSummary( + resultMode = resultMode, + sampleTuples = tuples, + totalRowCount = totalRowCount.getOrElse(0) + ) + } + + val consoleLogsSummary = consoleLogs.map { logs => + OperatorConsoleLogsSummary(messages = logs) + } + + // Per-operator runtime errors come from console ERROR logs; surface them as + // EXECUTION_FAILURE WorkflowFatalErrors (same type the compiler emits for + // COMPILATION_ERRORs). Empty list means the operator did not fail. + val errorMessages = errorMsg + .map(msg => List(WorkflowFatalError(EXECUTION_FAILURE, Timestamp(Instant.now), msg, "", opId))) + .getOrElse(List.empty) + + operatorInfos(opId) = OperatorExecutionSummary( state = state, - inputTuples = inputTuples, - outputTuples = outputTuples, - inputPortShapes = inputPortShapes, - resultMode = resultMode, - result = result, - totalRowCount = totalRowCount, - displayedRows = displayedRows, - truncated = truncated, - consoleLogs = consoleLogs, - error = errorMsg, - warnings = warningMsgs + errorMessages = errorMessages, + resultSummary = resultSummary, + consoleLogsSummary = consoleLogsSummary ) } operatorInfos.toMap } - private def handleExecutionError(e: Exception): SyncExecutionResult = { + private def handleExecutionError(e: Exception): WorkflowExecutionSummary = { val errorMsg = e.getMessage val isCompilationError = errorMsg != null && ( errorMsg.contains("compilation") || @@ -493,20 +496,18 @@ class SyncExecutionResource extends LazyLogging { ) if (isCompilationError) { - SyncExecutionResult( + WorkflowExecutionSummary( success = false, state = "CompilationFailed", operators = Map.empty, - compilationErrors = Some(Map("error" -> errorMsg)), - errors = Some(List(errorMsg)) + errors = List(errorMsg) ) } else { - SyncExecutionResult( + WorkflowExecutionSummary( success = false, state = "Error", operators = Map.empty, - compilationErrors = None, - errors = Some(List(Option(e.getMessage).getOrElse("Unknown error"))) + errors = List(Option(e.getMessage).getOrElse("Unknown error")) ) } } @@ -521,9 +522,7 @@ class SyncExecutionResource extends LazyLogging { opId: String, maxOperatorResultCharLimit: Int, maxOperatorResultCellCharLimit: Int - ): (String, Option[Any], Option[Int], Option[Int], Option[Boolean]) = { - import com.fasterxml.jackson.databind.node.ObjectNode - + ): (String, Option[List[SampleRow]], Option[Int], Option[Int], Option[Boolean]) = { try { val storageUriOption = WorkflowExecutionsResource.getResultUriByLogicalPortId( executionId, @@ -543,13 +542,7 @@ class SyncExecutionResource extends LazyLogging { val tupleIterator = document.get() if (totalCount == 0 || !tupleIterator.hasNext) { - return ( - "table", - Some(List.empty[ObjectNode].asJava), - Some(0), - Some(0), - Some(false) - ) + return ("table", Some(List.empty[SampleRow]), Some(0), Some(0), Some(false)) } // A single tuple with html-content / json-content is a visualization payload — @@ -558,71 +551,53 @@ class SyncExecutionResource extends LazyLogging { if (totalCount == 1 && isVisualizationTuple(firstTuple)) { val jsonResults = ExecutionResultService.convertTuplesToJson(List(firstTuple), isVisualization = true) - jsonResults.foreach( - _.asInstanceOf[ObjectNode].put("__is_visualization__", true) - ) - return ( - "visualization", - Some(jsonResults), - Some(totalCount), - Some(1), - Some(false) - ) + jsonResults.foreach(_.put("__is_visualization__", true)) + val rows = jsonResults.zipWithIndex.map { case (json, idx) => SampleRow(idx, json) } + return ("visualization", Some(rows), Some(totalCount), Some(1), Some(false)) } - // __row_index__ preserves the original position so the frontend can show - // "row N" correctly after symmetric truncation drops the middle. + // rowIndex preserves the original position so the client can show "row N" + // correctly after symmetric truncation drops the middle. var rowIndex = 0 val firstJson = ExecutionResultService.convertTuplesToJson(List(firstTuple)).head val truncatedFirst = truncateSingleTuple(firstJson, maxOperatorResultCellCharLimit) - truncatedFirst.put("__row_index__", rowIndex) val firstSize = estimateTupleSize(truncatedFirst, mapper) if (firstSize >= maxOperatorResultCharLimit) { - return ( - "table", - Some(List(truncatedFirst).asJava), - Some(totalCount), - Some(1), - Some(true) - ) + return ("table", Some(List(SampleRow(rowIndex, truncatedFirst))), Some(totalCount), Some(1), Some(true)) } val halfLimit = maxOperatorResultCharLimit / 2 val truncationNoticeSize = 50 // reserved for the "...skipped..." marker - val frontTuples = mutable.ListBuffer[ObjectNode](truncatedFirst) + val frontRows = mutable.ListBuffer[SampleRow](SampleRow(rowIndex, truncatedFirst)) var frontSize = firstSize - var processedCount = 1 while (tupleIterator.hasNext && frontSize < halfLimit) { val tuple = tupleIterator.next() rowIndex += 1 - processedCount += 1 val jsonTuple = ExecutionResultService.convertTuplesToJson(List(tuple)).head val truncatedTuple = truncateSingleTuple(jsonTuple, maxOperatorResultCellCharLimit) - truncatedTuple.put("__row_index__", rowIndex) val tupleSize = estimateTupleSize(truncatedTuple, mapper) + val row = SampleRow(rowIndex, truncatedTuple) if (frontSize + tupleSize <= halfLimit) { - frontTuples += truncatedTuple + frontRows += row frontSize += tupleSize } else { // Front is full — switch to a sliding window for the back half. - val backBuffer = mutable.ArrayBuffer[(ObjectNode, Int)]() - backBuffer += ((truncatedTuple, tupleSize)) + val backBuffer = mutable.ArrayBuffer[(SampleRow, Int)]() + backBuffer += ((row, tupleSize)) var backSize = tupleSize while (tupleIterator.hasNext) { val t = tupleIterator.next() rowIndex += 1 - processedCount += 1 val jt = ExecutionResultService.convertTuplesToJson(List(t)).head val tt = truncateSingleTuple(jt, maxOperatorResultCellCharLimit) - tt.put("__row_index__", rowIndex) val ts = estimateTupleSize(tt, mapper) - backBuffer += ((tt, ts)) + backBuffer += ((SampleRow(rowIndex, tt), ts)) backSize += ts while (backSize > halfLimit - truncationNoticeSize && backBuffer.size > 1) { @@ -631,34 +606,24 @@ class SyncExecutionResource extends LazyLogging { } } - val backTuples = backBuffer.map(_._1).toList - val allTuples = frontTuples.toList ++ backTuples - val skippedRows = totalCount - allTuples.size - - return ( - "table", - Some(allTuples.asJava), - Some(totalCount), - Some(allTuples.size), - Some(skippedRows > 0) - ) + val allRows = frontRows.toList ++ backBuffer.map(_._1).toList + val skippedRows = totalCount - allRows.size + return ("table", Some(allRows), Some(totalCount), Some(allRows.size), Some(skippedRows > 0)) } } if (tupleIterator.hasNext) { - val backBuffer = mutable.ArrayBuffer[(ObjectNode, Int)]() + val backBuffer = mutable.ArrayBuffer[(SampleRow, Int)]() var backSize = 0 while (tupleIterator.hasNext) { val t = tupleIterator.next() rowIndex += 1 - processedCount += 1 val jt = ExecutionResultService.convertTuplesToJson(List(t)).head val tt = truncateSingleTuple(jt, maxOperatorResultCellCharLimit) - tt.put("__row_index__", rowIndex) val ts = estimateTupleSize(tt, mapper) - backBuffer += ((tt, ts)) + backBuffer += ((SampleRow(rowIndex, tt), ts)) backSize += ts while (backSize > halfLimit - truncationNoticeSize && backBuffer.size > 1) { @@ -667,25 +632,11 @@ class SyncExecutionResource extends LazyLogging { } } - val backTuples = backBuffer.map(_._1).toList - val allTuples = frontTuples.toList ++ backTuples - val skippedRows = totalCount - allTuples.size - - ( - "table", - Some(allTuples.asJava), - Some(totalCount), - Some(allTuples.size), - Some(skippedRows > 0) - ) + val allRows = frontRows.toList ++ backBuffer.map(_._1).toList + val skippedRows = totalCount - allRows.size + ("table", Some(allRows), Some(totalCount), Some(allRows.size), Some(skippedRows > 0)) } else { - ( - "table", - Some(frontTuples.toList.asJava), - Some(totalCount), - Some(frontTuples.size), - Some(false) - ) + ("table", Some(frontRows.toList), Some(totalCount), Some(frontRows.size), Some(false)) } case None => diff --git a/frontend/src/app/workspace/service/agent/agent.service.ts b/frontend/src/app/workspace/service/agent/agent.service.ts index 462e7679ce5..65e435bf4bb 100644 --- a/frontend/src/app/workspace/service/agent/agent.service.ts +++ b/frontend/src/app/workspace/service/agent/agent.service.ts @@ -103,7 +103,7 @@ export interface OperatorResultSummary { state: string; inputTuples: number; outputTuples: number; - inputPortShapes?: { portIndex: number; rows: number; columns: number }[]; + inputPortShapes?: { portIndex: number; rows: number }[]; outputColumns?: number; error?: string; warnings?: string[]; @@ -113,6 +113,24 @@ export interface OperatorResultSummary { resultStatistics?: Record; } +/** + * Per-operator execution summary as sent by the agent-service over the + * WebSocket / operator-results endpoint (mirror of its OperatorExecutionSummary). + * The flat OperatorResultSummary above is derived from this for display. + */ +interface WireOperatorExecutionSummary { + state: string; + errorMessages?: { type: string; message: string }[]; + resultSummary?: { + resultMode: string; + sampleTuples: { rowIndex: number; tuple: Record }[]; + totalRowCount: number; + }; + consoleLogsSummary?: { + messages: { msgType: string; title: string; message: string }[]; + }; +} + interface ApiAgentInfo { id: string; name: string; @@ -909,7 +927,7 @@ export class AgentService { } const wsMessage = { - type: "message", + type: "prompt", content: message, messageSource, }; @@ -967,7 +985,7 @@ export class AgentService { if (tracking?.websocket && tracking.websocket.readyState === WebSocket.OPEN) { // Send stop via WebSocket for immediate effect try { - tracking.websocket.send(JSON.stringify({ type: "stop" })); + tracking.websocket.send(JSON.stringify({ type: "command", commandType: "stop" })); } catch (error) { console.error("Failed to send stop command:", error); } @@ -1302,10 +1320,24 @@ export class AgentService { /** * Update operator result summaries from a WebSocket or API response. */ - private updateOperatorResultSummaries(results: Record): void { + private updateOperatorResultSummaries(results: Record): void { const summaries = new Map(); for (const [opId, data] of Object.entries(results)) { - summaries.set(opId, data); + summaries.set(opId, { + state: data.state, + // Tuple counts are no longer carried per-port; output rows come from the + // result summary, input shapes are derivable from the DAG when needed. + inputTuples: 0, + outputTuples: data.resultSummary?.totalRowCount ?? 0, + error: data.errorMessages?.map(e => e.message).join("; ") || undefined, + warnings: (data.consoleLogsSummary?.messages ?? []) + .filter(m => m.title.startsWith("WARNING: ")) + .map(m => m.title), + consoleLogCount: data.consoleLogsSummary?.messages.length, + totalRowCount: data.resultSummary?.totalRowCount, + // Flatten back to embedded __row_index__ so the display components are unchanged. + sampleRecords: data.resultSummary?.sampleTuples?.map(r => ({ __row_index__: r.rowIndex, ...r.tuple })), + }); } this.operatorResultSummariesSubject.next(summaries); } @@ -1315,11 +1347,11 @@ export class AgentService { */ public fetchOperatorResults(agentId: string): void { this.http - .get<{ results: Record }>( + .get<{ results: Record }>( `${this.AGENT_API_BASE}/agents/${agentId}/operator-results`, this.agentHeaders(agentId) ) - .pipe(catchError(() => of({ results: {} as Record }))) + .pipe(catchError(() => of({ results: {} as Record }))) .subscribe(response => { this.updateOperatorResultSummaries(response.results); this.resultAnnotationsVisibleSubject.next(true);