diff --git a/integration-tests/cli/CHANGELOG.md b/integration-tests/cli/CHANGELOG.md index 28ececc6f..8a3d71d07 100644 --- a/integration-tests/cli/CHANGELOG.md +++ b/integration-tests/cli/CHANGELOG.md @@ -1,5 +1,13 @@ # @openfn/integration-tests-cli +## 1.0.19 + +### Patch Changes + +- Updated dependencies [b2e7b98] + - @openfn/project@0.14.5 + - @openfn/lightning-mock@2.4.12 + ## 1.0.18 ### Patch Changes diff --git a/integration-tests/cli/package.json b/integration-tests/cli/package.json index 1e485754c..4fc120824 100644 --- a/integration-tests/cli/package.json +++ b/integration-tests/cli/package.json @@ -1,7 +1,7 @@ { "name": "@openfn/integration-tests-cli", "private": true, - "version": "1.0.18", + "version": "1.0.19", "description": "CLI integration tests", "author": "Open Function Group ", "license": "ISC", diff --git a/packages/cli/CHANGELOG.md b/packages/cli/CHANGELOG.md index c041ed4dc..64731b30b 100644 --- a/packages/cli/CHANGELOG.md +++ b/packages/cli/CHANGELOG.md @@ -1,5 +1,13 @@ # @openfn/cli +## 1.33.1 + +### Patch Changes + +- b2e7b98: Include `webhook_reply` and `cron_cursor_job_id` in workflow version hashes. +- Updated dependencies [b2e7b98] + - @openfn/project@0.14.5 + ## 1.33.0 ### Minor Changes diff --git a/packages/cli/package.json b/packages/cli/package.json index bb305b4dc..bffa927a4 100644 --- a/packages/cli/package.json +++ b/packages/cli/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/cli", - "version": "1.33.0", + "version": "1.33.1", "description": "CLI devtools for the OpenFn toolchain", "engines": { "node": ">=18", diff --git a/packages/engine-multi/CHANGELOG.md b/packages/engine-multi/CHANGELOG.md index d585ce2d3..e6fdbe3eb 100644 --- a/packages/engine-multi/CHANGELOG.md +++ b/packages/engine-multi/CHANGELOG.md @@ -1,5 +1,11 @@ # engine-multi +## 1.11.1 + +### Patch Changes + +- aa833d6: Enforce run memory limits at the child_process level + ## 1.11.0 ### Minor Changes diff --git a/packages/engine-multi/package.json b/packages/engine-multi/package.json index ae8fc363c..ebe6b4b1e 100644 --- a/packages/engine-multi/package.json +++ b/packages/engine-multi/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/engine-multi", - "version": "1.11.0", + "version": "1.11.1", "description": "Multi-process runtime engine", "main": "dist/index.js", "type": "module", diff --git a/packages/engine-multi/src/api/call-worker.ts b/packages/engine-multi/src/api/call-worker.ts index a89832f41..c3a062407 100644 --- a/packages/engine-multi/src/api/call-worker.ts +++ b/packages/engine-multi/src/api/call-worker.ts @@ -12,6 +12,7 @@ type WorkerOptions = { maxWorkers?: number; env?: any; timeout?: number; // ms + memoryLimitMb?: number; proxyStdout?: boolean; // print internal stdout to console }; @@ -22,13 +23,19 @@ export default function initWorkers( options: WorkerOptions = {}, logger: Logger ) { - const { env = {}, maxWorkers = 5, proxyStdout = false } = options; + const { + env = {}, + maxWorkers = 5, + memoryLimitMb, + proxyStdout = false, + } = options; const workers = createPool( workerPath, { maxWorkers, env, + memoryLimitMb, proxyStdout, }, logger diff --git a/packages/engine-multi/src/engine.ts b/packages/engine-multi/src/engine.ts index 630e334ab..31ecc0774 100644 --- a/packages/engine-multi/src/engine.ts +++ b/packages/engine-multi/src/engine.ts @@ -139,6 +139,7 @@ const createEngine = async ( resolvedWorkerPath, { maxWorkers: options.maxWorkers, + memoryLimitMb: defaultMemoryLimit, proxyStdout: options.proxyStdout, }, options.logger diff --git a/packages/engine-multi/src/test/worker-functions.ts b/packages/engine-multi/src/test/worker-functions.ts index 7c73af6d1..dca335cea 100644 --- a/packages/engine-multi/src/test/worker-functions.ts +++ b/packages/engine-multi/src/test/worker-functions.ts @@ -24,6 +24,7 @@ const tasks = { }, threadId: async () => threadId, processId: async () => process.pid, + getExecArgv: async () => process.execArgv, // very very simple intepretation of a run function // Most tests should use the mock-worker instead run: async (plan: ExecutionPlan, _input: any, _adaptorPaths: any) => { diff --git a/packages/engine-multi/src/worker/pool.ts b/packages/engine-multi/src/worker/pool.ts index 7a306b974..48a268803 100644 --- a/packages/engine-multi/src/worker/pool.ts +++ b/packages/engine-multi/src/worker/pool.ts @@ -18,6 +18,7 @@ export type PoolOptions = { capacity?: number; // defaults to 5 maxWorkers?: number; // alias for capacity. Which is best? env?: Record; // default environment for workers + memoryLimitMb?: number; // --max-old-space-size for child processes proxyStdout?: boolean; // print internal stdout to console }; @@ -83,8 +84,13 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) { let child: ChildProcess; if (!maybeChild) { // create a new child process and load the module script into it + const execArgv = ['--experimental-vm-modules', '--no-warnings']; + if (options.memoryLimitMb) { + execArgv.push(`--max-old-space-size=${options.memoryLimitMb}`); + } + child = fork(envPath, [script], { - execArgv: ['--experimental-vm-modules', '--no-warnings'], + execArgv, env: options.env || {}, diff --git a/packages/engine-multi/test/worker/pool.test.ts b/packages/engine-multi/test/worker/pool.test.ts index fe162d249..5a3b8d9dc 100644 --- a/packages/engine-multi/test/worker/pool.test.ts +++ b/packages/engine-multi/test/worker/pool.test.ts @@ -185,6 +185,30 @@ test('throw if memory limit is exceeded', async (t) => { } }); +test('child process should have --max-old-space-size when memoryLimitMb is set', async (t) => { + const pool = createPool(workerPath, { memoryLimitMb: 200 }, logger); + const execArgv = await pool.exec('getExecArgv', []); + t.true(execArgv.some((a: string) => a === '--max-old-space-size=200')); +}); + +test('child process should not have --max-old-space-size when memoryLimitMb is not set', async (t) => { + const pool = createPool(workerPath, {}, logger); + const execArgv = await pool.exec('getExecArgv', []); + t.false(execArgv.some((a: string) => a.includes('--max-old-space-size'))); +}); + +test('pool recovers after process-level OOM', async (t) => { + const pool = createPool(workerPath, { memoryLimitMb: 50 }, logger); + + await t.throwsAsync(() => pool.exec('blowMemory', [], { memoryLimitMb: 20 }), { + name: 'OOMError', + }); + + // Pool should still be functional after the OOM + const result = await pool.exec('test', [42]); + t.is(result, 42); +}); + test('handle weird exit', async (t) => { const pool = createPool(workerPath, {}, logger); diff --git a/packages/lightning-mock/CHANGELOG.md b/packages/lightning-mock/CHANGELOG.md index f05f0b161..50ca254ab 100644 --- a/packages/lightning-mock/CHANGELOG.md +++ b/packages/lightning-mock/CHANGELOG.md @@ -1,5 +1,14 @@ # @openfn/lightning-mock +## 2.4.12 + +### Patch Changes + +- Updated dependencies [aa833d6] +- Updated dependencies [b2e7b98] + - @openfn/engine-multi@1.11.1 + - @openfn/project@0.14.5 + ## 2.4.11 ### Patch Changes diff --git a/packages/lightning-mock/package.json b/packages/lightning-mock/package.json index 6f7e846ce..b4090f822 100644 --- a/packages/lightning-mock/package.json +++ b/packages/lightning-mock/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/lightning-mock", - "version": "2.4.11", + "version": "2.4.12", "private": true, "description": "A mock Lightning server", "main": "dist/index.js", diff --git a/packages/project/CHANGELOG.md b/packages/project/CHANGELOG.md index d11aa3f2b..6ccb64031 100644 --- a/packages/project/CHANGELOG.md +++ b/packages/project/CHANGELOG.md @@ -1,5 +1,11 @@ # @openfn/project +## 0.14.5 + +### Patch Changes + +- b2e7b98: Include `webhook_reply` and `cron_cursor_job_id` in workflow version hashes. + ## 0.14.4 ### Patch Changes diff --git a/packages/project/package.json b/packages/project/package.json index 7e53315e5..c5608ca1f 100644 --- a/packages/project/package.json +++ b/packages/project/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/project", - "version": "0.14.4", + "version": "0.14.5", "description": "Read, serialize, replicate and sync OpenFn projects", "scripts": { "test": "pnpm ava", diff --git a/packages/project/src/util/version.ts b/packages/project/src/util/version.ts index df42b8bb1..d815d794c 100644 --- a/packages/project/src/util/version.ts +++ b/packages/project/src/util/version.ts @@ -54,7 +54,13 @@ export const generateHash = ( 'body', ].sort(); - const triggerKeys = ['type', 'cron_expression', 'enabled'].sort(); + const triggerKeys = [ + 'type', + 'cron_expression', + 'enabled', + 'webhook_reply', + 'cron_cursor_job_id', + ].sort(); const edgeKeys = [ 'name', // generated diff --git a/packages/project/test/util/version-workflow.test.ts b/packages/project/test/util/version-workflow.test.ts index 4f34e8843..6b0dd0543 100644 --- a/packages/project/test/util/version-workflow.test.ts +++ b/packages/project/test/util/version-workflow.test.ts @@ -384,6 +384,40 @@ test('hash a trigger', (t) => { t.not(generateHash(webhook), generateHash(cronExpression)); }); +test('hash changes when webhook_reply changes', (t) => { + const wf1 = generateWorkflow( + `@name wf-1 + @id workflow-id + t(type=webhook,webhook_reply=x)-x + ` + ); + const wf2 = generateWorkflow( + `@name wf-1 + @id workflow-id + t(type=webhook,webhook_reply=y)-x + ` + ); + + t.not(generateHash(wf1), generateHash(wf2)); +}); + +test('hash changes when cron_cursor_job_id changes', (t) => { + const wf1 = generateWorkflow( + `@name wf-1 + @id workflow-id + t(type=cron,cron_expression="1",cron_cursor_job_id=x)-x + ` + ); + const wf2 = generateWorkflow( + `@name wf-1 + @id workflow-id + t(type=cron,cron_expression="1",cron_cursor_job_id=y)-x + ` + ); + + t.not(generateHash(wf1), generateHash(wf2)); +}); + test('hash changes across an edge', (t) => { const basicEdge = generateWorkflow( ` diff --git a/packages/ws-worker/CHANGELOG.md b/packages/ws-worker/CHANGELOG.md index a90514188..2544db8ce 100644 --- a/packages/ws-worker/CHANGELOG.md +++ b/packages/ws-worker/CHANGELOG.md @@ -1,5 +1,14 @@ # ws-worker +## 1.23.5 + +### Patch Changes + +- aa833d6: Enforce run memory limits at the child_process level +- 69a7e0e: When processing final state for a run with multiple leaf nodes, don't send empty leaf results. This prevents state recursively growing in cron tasks +- Updated dependencies [aa833d6] + - @openfn/engine-multi@1.11.1 + ## 1.23.4 ### Patch Changes diff --git a/packages/ws-worker/package.json b/packages/ws-worker/package.json index 05feee5ae..c07b9c477 100644 --- a/packages/ws-worker/package.json +++ b/packages/ws-worker/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/ws-worker", - "version": "1.23.4", + "version": "1.23.5", "description": "A Websocket Worker to connect Lightning to a Runtime Engine", "main": "dist/index.js", "type": "module", diff --git a/packages/ws-worker/src/events/run-complete.ts b/packages/ws-worker/src/events/run-complete.ts index 8840fd2c1..657045f9e 100644 --- a/packages/ws-worker/src/events/run-complete.ts +++ b/packages/ws-worker/src/events/run-complete.ts @@ -8,21 +8,45 @@ import logFinalReason from '../util/log-final-reason'; import { timeInMicroseconds } from '../util'; import { sendEvent } from '../util/send-event'; +const isEmptyState = (obj: any) => { + if (Object.keys(obj).length == 0) { + return true; + } + if ( + Object.keys(obj).length == 1 && + 'data' in obj && + !Object.keys(obj.data).length + ) { + return true; + } + return false; +}; + export default async function onWorkflowComplete( context: Context, event: WorkflowCompletePayload ) { const { state, onFinish, logger } = context; + const isSingleLeaf = + state.leafDataclipIds.length === 1 && + !state.withheldDataclips[state.leafDataclipIds[0]]; + const result = event.state; + // remove any empty leaf nodes from state + // This fixes recursive state growth in cron jobs https://github.com/OpenFn/kit/issues/1367 + if (!isSingleLeaf) { + for (const key in result) { + if (isEmptyState(result[key])) { + delete result[key]; + } + } + } + const reason = calculateRunExitReason(state); await logFinalReason(context, reason); - const isSingleLeaf = - state.leafDataclipIds.length === 1 && - !state.withheldDataclips[state.leafDataclipIds[0]]; - const payload: RunCompletePayload = { timestamp: timeInMicroseconds(event.time), final_state: result, diff --git a/packages/ws-worker/test/events/run-complete.test.ts b/packages/ws-worker/test/events/run-complete.test.ts index 23224dffc..c18a07870 100644 --- a/packages/ws-worker/test/events/run-complete.test.ts +++ b/packages/ws-worker/test/events/run-complete.test.ts @@ -38,7 +38,7 @@ test('should send final_state when there are multiple leaves', async (t) => { const plan = createPlan(); const state = createRunState(plan); - state.leafDataclipIds = ['clip-1', 'clip-2']; + state.leafDataclipIds = ['a', 'b']; const channel = mockChannel({ [RUN_LOG]: () => true, @@ -54,6 +54,32 @@ test('should send final_state when there are multiple leaves', async (t) => { await handleRunComplete(context, event); }); +test('should ignore empty leaf state in final_state', async (t) => { + const result = {}; + const plan = createPlan(); + + const state = createRunState(plan); + state.leafDataclipIds = ['clip-1', 'clip-2']; + state.dataclips = { + // two different structures of empty + a: {}, + b: { data: {} }, + }; + + const channel = mockChannel({ + [RUN_LOG]: () => true, + [RUN_COMPLETE]: (evt) => { + t.deepEqual(evt.final_state, {}); + t.falsy(evt.final_dataclip_id); + }, + }); + + const event: any = { state: result }; + + const context: any = { channel, state, onFinish: () => {} }; + await handleRunComplete(context, event); +}); + test('should include a timestamp', async (t) => { const plan = createPlan(); @@ -438,7 +464,10 @@ test('should properly serialize final_state as JSON', async (t) => { await handleRunComplete(context, event); t.deepEqual(completeEvent.final_state, complexState); - t.deepEqual(completeEvent.final_state.data.users[0], { id: 1, name: 'Alice' }); + t.deepEqual(completeEvent.final_state.data.users[0], { + id: 1, + name: 'Alice', + }); t.is(completeEvent.final_state.data.metadata.nested.deeply.value, 42); const jsonString = JSON.stringify(completeEvent.final_state); @@ -473,5 +502,8 @@ test('should handle Uint8Array in final_state', async (t) => { await handleRunComplete(context, event); - t.deepEqual(completeEvent.final_state.data.buffer, new Uint8Array([1, 2, 3, 4, 5])); + t.deepEqual( + completeEvent.final_state.data.buffer, + new Uint8Array([1, 2, 3, 4, 5]) + ); });