From de4bcc608555f35f0a76660362ba3a17d7ac7394 Mon Sep 17 00:00:00 2001 From: Taylor Downs Date: Wed, 15 Apr 2026 17:56:46 +0200 Subject: [PATCH 1/4] non-map state creates lost --- CHANGELOG.md | 3 +++ lib/lightning/runs.ex | 8 ++++++++ lib/lightning/runs/handlers.ex | 5 ++++- 3 files changed, 15 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a31cac9030..631fb6a1ee 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -36,6 +36,9 @@ and this project adheres to ### Fixed +- Non-map state coming back from the worker would cause a lost run, every time. + Rather than losing these runs that return non-map x's, we now wrap them like + so `{"value": x}` - Since OTP26, if `SMTP_PROVIDER` is set to `smtp` and `SMTP_TLS` is set to `true` or `if_available` this would result in TLS-related failures when trying to send emails. This is now fixed for a limited number of use cases (see diff --git a/lib/lightning/runs.ex b/lib/lightning/runs.ex index 330695c435..6eeb1b9b4b 100644 --- a/lib/lightning/runs.ex +++ b/lib/lightning/runs.ex @@ -201,9 +201,17 @@ defmodule Lightning.Runs do @spec complete_run(Run.t(), %{optional(any()) => any()}) :: {:ok, Run.t()} | {:error, Ecto.Changeset.t(Run.t())} def complete_run(run, params) do + params = wrap_final_state(params) Handlers.CompleteRun.call(run, params) end + defp wrap_final_state(%{"final_state" => state} = params) + when not is_map(state) and not is_nil(state) do + Map.put(params, "final_state", %{"value" => state}) + end + + defp wrap_final_state(params), do: params + @spec update_run(Ecto.Changeset.t(Run.t())) :: {:ok, Run.t()} | {:error, Ecto.Changeset.t(Run.t())} def update_run(%Ecto.Changeset{data: %Run{}} = changeset) do diff --git a/lib/lightning/runs/handlers.ex b/lib/lightning/runs/handlers.ex index 164c8c19c0..93763a097a 100644 --- a/lib/lightning/runs/handlers.ex +++ b/lib/lightning/runs/handlers.ex @@ -473,10 +473,13 @@ defmodule Lightning.Runs.Handlers do Dataclip.new(%{ id: dataclip_id, project_id: project_id, - body: output_dataclip |> Jason.decode!(), + body: output_dataclip |> Jason.decode!() |> ensure_map(), type: :step_result }) |> Repo.insert() end + + defp ensure_map(%{} = map), do: map + defp ensure_map(value), do: %{"value" => value} end end From 88c286fce5854d5ff43fb84087f788acdbb17c28 Mon Sep 17 00:00:00 2001 From: Taylor Downs Date: Wed, 15 Apr 2026 18:16:17 +0200 Subject: [PATCH 2/4] red team test --- test/integration/red_team_test.exs | 178 +++++++++++++++++++++++++++++ 1 file changed, 178 insertions(+) create mode 100644 test/integration/red_team_test.exs diff --git a/test/integration/red_team_test.exs b/test/integration/red_team_test.exs new file mode 100644 index 0000000000..0e3e9c77d9 --- /dev/null +++ b/test/integration/red_team_test.exs @@ -0,0 +1,178 @@ +defmodule Lightning.RedTeamTest do + @moduledoc """ + Integration tests for "lost run" scenarios — single-step workflows where + the job code exercises edge cases around run completion and state handling. + + To add a new test case, define a function that returns the job body string, + then call `run_single_step_workflow/2` with your webhook input and job body. + Assert against the returned `%{run: run, step: step, work_order: work_order}`. + """ + use LightningWeb.ConnCase, async: false + + import Ecto.Query + import Lightning.Factories + import Mox + + alias Lightning.Runs + alias Lightning.Runs.Events + alias Lightning.Invocation + alias Lightning.Repo + alias Lightning.Runtime.RuntimeManager + alias Lightning.WorkOrders + alias Lightning.Workflows.Snapshot + + setup :set_mox_from_context + setup :verify_on_exit! + + setup_all do + Mox.stub_with(Lightning.MockConfig, Lightning.Config.API) + Mox.stub_with(LightningMock, Lightning.API) + + Mox.stub_with( + Lightning.Extensions.MockUsageLimiter, + Lightning.Extensions.UsageLimiter + ) + + start_runtime_manager() + + uri = LightningWeb.Endpoint.url() + + %{uri: uri} + end + + setup [:register_and_log_in_superuser, :stub_rate_limiter_ok] + + # --------------------------------------------------------------------------- + # Test cases — each one only needs to supply the job body and assertions. + # --------------------------------------------------------------------------- + + @tag :integration + @tag timeout: 30_000 + test "job that returns 42 completes successfully", %{uri: uri} do + job_body = """ + fn(state => { + // if you return a non-map, it used to cause lost + // now it gets wrapped in a map ¯\_(ツ)_/¯ + return 42; + }); + """ + + result = run_single_step_workflow(uri, %{"x" => 1}, job_body) + + assert result.run.state == :success + assert result.step.exit_reason == "success" + + assert %{"value" => 42} = + select_dataclip_body(result.step.output_dataclip_id) + end + + # --------------------------------------------------------------------------- + # Helpers + # --------------------------------------------------------------------------- + + @doc """ + Creates a single-step webhook workflow, fires it, waits for completion, + and returns a map with the fully-loaded run, step, and work_order. + + ## Parameters + - `uri` — the endpoint base URL (from setup_all) + - `webhook_body` — the JSON map to POST to the webhook + - `job_body` — the JavaScript expression string for the job + - `opts` — keyword list of options: + - `:adaptor` — adaptor string (default `"@openfn/language-common@latest"`) + - `:timeout` — ms to wait for run completion (default `15_000`) + + ## Returns + %{run: run, step: step, work_order: work_order} + """ + def run_single_step_workflow(uri, webhook_body, job_body, opts \\ []) do + adaptor = Keyword.get(opts, :adaptor, "@openfn/language-common@latest") + timeout = Keyword.get(opts, :timeout, 15_000) + + project = insert(:project) + + webhook_trigger = build(:trigger, type: :webhook, enabled: true) + + job = + build(:job, + adaptor: adaptor, + body: job_body, + name: "test-job" + ) + + workflow = + build(:workflow, project: project) + |> with_trigger(webhook_trigger) + |> with_job(job) + |> with_edge({webhook_trigger, job}, condition_type: :always) + |> insert() + + Snapshot.create(workflow) + + # Fire the webhook + response = + Tesla.client( + [ + {Tesla.Middleware.BaseUrl, uri}, + Tesla.Middleware.JSON + ], + {Tesla.Adapter.Finch, name: Lightning.Finch} + ) + |> Tesla.post!("/i/#{webhook_trigger.id}", webhook_body) + + assert response.status == 200 + assert %{"work_order_id" => workorder_id} = response.body + + assert %{runs: [run]} = + WorkOrders.get(workorder_id, include: [:runs]) + + # Subscribe and wait for completion + Events.subscribe(run) + run_id = run.id + + assert_receive %Events.RunUpdated{run: %{id: ^run_id, state: final_state}} + when final_state in [ + :success, + :failed, + :crashed, + :killed, + :lost + ], + timeout + + # Reload with associations + run = + Runs.get(run.id, + include: [steps: [:job, :log_lines], work_order: :workflow] + ) + + [step] = run.steps + + work_order = WorkOrders.get(workorder_id) + + %{run: run, step: step, work_order: work_order} + end + + defp start_runtime_manager(_context \\ nil) do + opts = + Application.get_env(:lightning, RuntimeManager) + |> Keyword.merge( + name: LostRunRedTeamRuntimeManager, + start: true, + endpoint: LightningWeb.Endpoint, + log: :debug, + port: Enum.random(2223..3333), + worker_secret: Lightning.Config.worker_secret() + ) + + start_supervised!({RuntimeManager, opts}, restart: :transient) + end + + defp select_dataclip_body(uuid) do + from(d in Invocation.Dataclip, + where: d.id == ^uuid, + select: d.body + ) + |> Repo.one!() + end +end From 9cbfd4332e1d63b02b50b879f763d9001bf6f6f4 Mon Sep 17 00:00:00 2001 From: Taylor Downs Date: Thu, 16 Apr 2026 07:47:58 +0200 Subject: [PATCH 3/4] rename --- .../{red_team_test.exs => workflow_edge_cases_test.exs} | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) rename test/integration/{red_team_test.exs => workflow_edge_cases_test.exs} (99%) diff --git a/test/integration/red_team_test.exs b/test/integration/workflow_edge_cases_test.exs similarity index 99% rename from test/integration/red_team_test.exs rename to test/integration/workflow_edge_cases_test.exs index 0e3e9c77d9..2cf3c4d059 100644 --- a/test/integration/red_team_test.exs +++ b/test/integration/workflow_edge_cases_test.exs @@ -1,4 +1,4 @@ -defmodule Lightning.RedTeamTest do +defmodule Lightning.WorkflowEdgeCasesTest do @moduledoc """ Integration tests for "lost run" scenarios — single-step workflows where the job code exercises edge cases around run completion and state handling. From 037dcda536550b19b019643fcf77409dc6a22f2f Mon Sep 17 00:00:00 2001 From: Taylor Downs Date: Thu, 16 Apr 2026 07:54:48 +0200 Subject: [PATCH 4/4] add oom kill test --- test/integration/workflow_edge_cases_test.exs | 21 ++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/test/integration/workflow_edge_cases_test.exs b/test/integration/workflow_edge_cases_test.exs index 2cf3c4d059..0f16de7f0f 100644 --- a/test/integration/workflow_edge_cases_test.exs +++ b/test/integration/workflow_edge_cases_test.exs @@ -47,7 +47,7 @@ defmodule Lightning.WorkflowEdgeCasesTest do # --------------------------------------------------------------------------- @tag :integration - @tag timeout: 30_000 + @tag timeout: 10_000 test "job that returns 42 completes successfully", %{uri: uri} do job_body = """ fn(state => { @@ -66,6 +66,25 @@ defmodule Lightning.WorkflowEdgeCasesTest do select_dataclip_body(result.step.output_dataclip_id) end + @tag :integration + @tag timeout: 10_000 + test "job that uses too much memory very quickly is properly killed", %{ + uri: uri + } do + job_body = """ + fn(state => { + const arr = []; + while (true) { arr.push(new Array(1e6).fill('x')); } + return state; + }); + """ + + result = run_single_step_workflow(uri, %{"x" => 1}, job_body) + + assert result.run.state == :killed + assert result.run.error_type == "OOMError" + end + # --------------------------------------------------------------------------- # Helpers # ---------------------------------------------------------------------------