Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions integration-tests/cli/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# @openfn/integration-tests-cli

## 1.0.19

### Patch Changes

- Updated dependencies [b2e7b98]
- @openfn/project@0.14.5
- @openfn/lightning-mock@2.4.12

## 1.0.18

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/cli/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@openfn/integration-tests-cli",
"private": true,
"version": "1.0.18",
"version": "1.0.19",
"description": "CLI integration tests",
"author": "Open Function Group <admin@openfn.org>",
"license": "ISC",
Expand Down
8 changes: 8 additions & 0 deletions packages/cli/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# @openfn/cli

## 1.33.1

### Patch Changes

- b2e7b98: Include `webhook_reply` and `cron_cursor_job_id` in workflow version hashes.
- Updated dependencies [b2e7b98]
- @openfn/project@0.14.5

## 1.33.0

### Minor Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/cli",
"version": "1.33.0",
"version": "1.33.1",
"description": "CLI devtools for the OpenFn toolchain",
"engines": {
"node": ">=18",
Expand Down
6 changes: 6 additions & 0 deletions packages/engine-multi/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# engine-multi

## 1.11.1

### Patch Changes

- aa833d6: Enforce run memory limits at the child_process level

## 1.11.0

### Minor Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/engine-multi/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/engine-multi",
"version": "1.11.0",
"version": "1.11.1",
"description": "Multi-process runtime engine",
"main": "dist/index.js",
"type": "module",
Expand Down
9 changes: 8 additions & 1 deletion packages/engine-multi/src/api/call-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ type WorkerOptions = {
maxWorkers?: number;
env?: any;
timeout?: number; // ms
memoryLimitMb?: number;
proxyStdout?: boolean; // print internal stdout to console
};

Expand All @@ -22,13 +23,19 @@ export default function initWorkers(
options: WorkerOptions = {},
logger: Logger
) {
const { env = {}, maxWorkers = 5, proxyStdout = false } = options;
const {
env = {},
maxWorkers = 5,
memoryLimitMb,
proxyStdout = false,
} = options;

const workers = createPool(
workerPath,
{
maxWorkers,
env,
memoryLimitMb,
proxyStdout,
},
logger
Expand Down
1 change: 1 addition & 0 deletions packages/engine-multi/src/engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ const createEngine = async (
resolvedWorkerPath,
{
maxWorkers: options.maxWorkers,
memoryLimitMb: defaultMemoryLimit,
proxyStdout: options.proxyStdout,
},
options.logger
Expand Down
1 change: 1 addition & 0 deletions packages/engine-multi/src/test/worker-functions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const tasks = {
},
threadId: async () => threadId,
processId: async () => process.pid,
getExecArgv: async () => process.execArgv,
// very very simple intepretation of a run function
// Most tests should use the mock-worker instead
run: async (plan: ExecutionPlan, _input: any, _adaptorPaths: any) => {
Expand Down
8 changes: 7 additions & 1 deletion packages/engine-multi/src/worker/pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ export type PoolOptions = {
capacity?: number; // defaults to 5
maxWorkers?: number; // alias for capacity. Which is best?
env?: Record<string, string>; // default environment for workers
memoryLimitMb?: number; // --max-old-space-size for child processes

proxyStdout?: boolean; // print internal stdout to console
};
Expand Down Expand Up @@ -83,8 +84,13 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) {
let child: ChildProcess;
if (!maybeChild) {
// create a new child process and load the module script into it
const execArgv = ['--experimental-vm-modules', '--no-warnings'];
if (options.memoryLimitMb) {
execArgv.push(`--max-old-space-size=${options.memoryLimitMb}`);
}

child = fork(envPath, [script], {
execArgv: ['--experimental-vm-modules', '--no-warnings'],
execArgv,

env: options.env || {},

Expand Down
24 changes: 24 additions & 0 deletions packages/engine-multi/test/worker/pool.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,30 @@ test('throw if memory limit is exceeded', async (t) => {
}
});

test('child process should have --max-old-space-size when memoryLimitMb is set', async (t) => {
const pool = createPool(workerPath, { memoryLimitMb: 200 }, logger);
const execArgv = await pool.exec('getExecArgv', []);
t.true(execArgv.some((a: string) => a === '--max-old-space-size=200'));
});

test('child process should not have --max-old-space-size when memoryLimitMb is not set', async (t) => {
const pool = createPool(workerPath, {}, logger);
const execArgv = await pool.exec('getExecArgv', []);
t.false(execArgv.some((a: string) => a.includes('--max-old-space-size')));
});

test('pool recovers after process-level OOM', async (t) => {
const pool = createPool(workerPath, { memoryLimitMb: 50 }, logger);

await t.throwsAsync(() => pool.exec('blowMemory', [], { memoryLimitMb: 20 }), {
name: 'OOMError',
});

// Pool should still be functional after the OOM
const result = await pool.exec('test', [42]);
t.is(result, 42);
});

test('handle weird exit', async (t) => {
const pool = createPool(workerPath, {}, logger);

Expand Down
9 changes: 9 additions & 0 deletions packages/lightning-mock/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
# @openfn/lightning-mock

## 2.4.12

### Patch Changes

- Updated dependencies [aa833d6]
- Updated dependencies [b2e7b98]
- @openfn/engine-multi@1.11.1
- @openfn/project@0.14.5

## 2.4.11

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/lightning-mock/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/lightning-mock",
"version": "2.4.11",
"version": "2.4.12",
"private": true,
"description": "A mock Lightning server",
"main": "dist/index.js",
Expand Down
6 changes: 6 additions & 0 deletions packages/project/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# @openfn/project

## 0.14.5

### Patch Changes

- b2e7b98: Include `webhook_reply` and `cron_cursor_job_id` in workflow version hashes.

## 0.14.4

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/project/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/project",
"version": "0.14.4",
"version": "0.14.5",
"description": "Read, serialize, replicate and sync OpenFn projects",
"scripts": {
"test": "pnpm ava",
Expand Down
8 changes: 7 additions & 1 deletion packages/project/src/util/version.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,13 @@ export const generateHash = (
'body',
].sort();

const triggerKeys = ['type', 'cron_expression', 'enabled'].sort();
const triggerKeys = [
'type',
'cron_expression',
'enabled',
'webhook_reply',
'cron_cursor_job_id',
].sort();

const edgeKeys = [
'name', // generated
Expand Down
34 changes: 34 additions & 0 deletions packages/project/test/util/version-workflow.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,40 @@ test('hash a trigger', (t) => {
t.not(generateHash(webhook), generateHash(cronExpression));
});

test('hash changes when webhook_reply changes', (t) => {
const wf1 = generateWorkflow(
`@name wf-1
@id workflow-id
t(type=webhook,webhook_reply=x)-x
`
);
const wf2 = generateWorkflow(
`@name wf-1
@id workflow-id
t(type=webhook,webhook_reply=y)-x
`
);

t.not(generateHash(wf1), generateHash(wf2));
});

test('hash changes when cron_cursor_job_id changes', (t) => {
const wf1 = generateWorkflow(
`@name wf-1
@id workflow-id
t(type=cron,cron_expression="1",cron_cursor_job_id=x)-x
`
);
const wf2 = generateWorkflow(
`@name wf-1
@id workflow-id
t(type=cron,cron_expression="1",cron_cursor_job_id=y)-x
`
);

t.not(generateHash(wf1), generateHash(wf2));
});

test('hash changes across an edge', (t) => {
const basicEdge = generateWorkflow(
`
Expand Down
9 changes: 9 additions & 0 deletions packages/ws-worker/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
# ws-worker

## 1.23.5

### Patch Changes

- aa833d6: Enforce run memory limits at the child_process level
- 69a7e0e: When processing final state for a run with multiple leaf nodes, don't send empty leaf results. This prevents state recursively growing in cron tasks
- Updated dependencies [aa833d6]
- @openfn/engine-multi@1.11.1

## 1.23.4

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/ws-worker/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/ws-worker",
"version": "1.23.4",
"version": "1.23.5",
"description": "A Websocket Worker to connect Lightning to a Runtime Engine",
"main": "dist/index.js",
"type": "module",
Expand Down
32 changes: 28 additions & 4 deletions packages/ws-worker/src/events/run-complete.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,45 @@ import logFinalReason from '../util/log-final-reason';
import { timeInMicroseconds } from '../util';
import { sendEvent } from '../util/send-event';

const isEmptyState = (obj: any) => {
if (Object.keys(obj).length == 0) {
return true;
}
if (
Object.keys(obj).length == 1 &&
'data' in obj &&
!Object.keys(obj.data).length
) {
return true;
}
return false;
};

export default async function onWorkflowComplete(
context: Context,
event: WorkflowCompletePayload
) {
const { state, onFinish, logger } = context;

const isSingleLeaf =
state.leafDataclipIds.length === 1 &&
!state.withheldDataclips[state.leafDataclipIds[0]];

const result = event.state;

// remove any empty leaf nodes from state
// This fixes recursive state growth in cron jobs https://github.com/OpenFn/kit/issues/1367
if (!isSingleLeaf) {
for (const key in result) {
if (isEmptyState(result[key])) {
delete result[key];
}
}
}

const reason = calculateRunExitReason(state);
await logFinalReason(context, reason);

const isSingleLeaf =
state.leafDataclipIds.length === 1 &&
!state.withheldDataclips[state.leafDataclipIds[0]];

const payload: RunCompletePayload = {
timestamp: timeInMicroseconds(event.time),
final_state: result,
Expand Down
38 changes: 35 additions & 3 deletions packages/ws-worker/test/events/run-complete.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ test('should send final_state when there are multiple leaves', async (t) => {
const plan = createPlan();

const state = createRunState(plan);
state.leafDataclipIds = ['clip-1', 'clip-2'];
state.leafDataclipIds = ['a', 'b'];

const channel = mockChannel({
[RUN_LOG]: () => true,
Expand All @@ -54,6 +54,32 @@ test('should send final_state when there are multiple leaves', async (t) => {
await handleRunComplete(context, event);
});

test('should ignore empty leaf state in final_state', async (t) => {
const result = {};
const plan = createPlan();

const state = createRunState(plan);
state.leafDataclipIds = ['clip-1', 'clip-2'];
state.dataclips = {
// two different structures of empty
a: {},
b: { data: {} },
};

const channel = mockChannel({
[RUN_LOG]: () => true,
[RUN_COMPLETE]: (evt) => {
t.deepEqual(evt.final_state, {});
t.falsy(evt.final_dataclip_id);
},
});

const event: any = { state: result };

const context: any = { channel, state, onFinish: () => {} };
await handleRunComplete(context, event);
});

test('should include a timestamp', async (t) => {
const plan = createPlan();

Expand Down Expand Up @@ -438,7 +464,10 @@ test('should properly serialize final_state as JSON', async (t) => {
await handleRunComplete(context, event);

t.deepEqual(completeEvent.final_state, complexState);
t.deepEqual(completeEvent.final_state.data.users[0], { id: 1, name: 'Alice' });
t.deepEqual(completeEvent.final_state.data.users[0], {
id: 1,
name: 'Alice',
});
t.is(completeEvent.final_state.data.metadata.nested.deeply.value, 42);

const jsonString = JSON.stringify(completeEvent.final_state);
Expand Down Expand Up @@ -473,5 +502,8 @@ test('should handle Uint8Array in final_state', async (t) => {

await handleRunComplete(context, event);

t.deepEqual(completeEvent.final_state.data.buffer, new Uint8Array([1, 2, 3, 4, 5]));
t.deepEqual(
completeEvent.final_state.data.buffer,
new Uint8Array([1, 2, 3, 4, 5])
);
});