diff --git a/CHANGELOG.md b/CHANGELOG.md index e773671c92..8c638f9c73 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -42,6 +42,9 @@ and this project adheres to ### Fixed +- Non-map state coming back from the worker would cause a lost run, every time. + Rather than losing these runs that return non-map x's, we now wrap them like + so `{"value": x}` - AI-generated workflows can now be saved when the workflow name collides with an existing workflow or when jobs have duplicate names [#4607](https://github.com/OpenFn/lightning/issues/4607) diff --git a/lib/lightning/runs.ex b/lib/lightning/runs.ex index 330695c435..6eeb1b9b4b 100644 --- a/lib/lightning/runs.ex +++ b/lib/lightning/runs.ex @@ -201,9 +201,17 @@ defmodule Lightning.Runs do @spec complete_run(Run.t(), %{optional(any()) => any()}) :: {:ok, Run.t()} | {:error, Ecto.Changeset.t(Run.t())} def complete_run(run, params) do + params = wrap_final_state(params) Handlers.CompleteRun.call(run, params) end + defp wrap_final_state(%{"final_state" => state} = params) + when not is_map(state) and not is_nil(state) do + Map.put(params, "final_state", %{"value" => state}) + end + + defp wrap_final_state(params), do: params + @spec update_run(Ecto.Changeset.t(Run.t())) :: {:ok, Run.t()} | {:error, Ecto.Changeset.t(Run.t())} def update_run(%Ecto.Changeset{data: %Run{}} = changeset) do diff --git a/lib/lightning/runs/handlers.ex b/lib/lightning/runs/handlers.ex index 164c8c19c0..93763a097a 100644 --- a/lib/lightning/runs/handlers.ex +++ b/lib/lightning/runs/handlers.ex @@ -473,10 +473,13 @@ defmodule Lightning.Runs.Handlers do Dataclip.new(%{ id: dataclip_id, project_id: project_id, - body: output_dataclip |> Jason.decode!(), + body: output_dataclip |> Jason.decode!() |> ensure_map(), type: :step_result }) |> Repo.insert() end + + defp ensure_map(%{} = map), do: map + defp ensure_map(value), do: %{"value" => value} end end diff --git a/test/integration/workflow_edge_cases_test.exs b/test/integration/workflow_edge_cases_test.exs new file mode 100644 index 0000000000..0f16de7f0f --- /dev/null +++ b/test/integration/workflow_edge_cases_test.exs @@ -0,0 +1,197 @@ +defmodule Lightning.WorkflowEdgeCasesTest do + @moduledoc """ + Integration tests for "lost run" scenarios — single-step workflows where + the job code exercises edge cases around run completion and state handling. + + To add a new test case, define a function that returns the job body string, + then call `run_single_step_workflow/2` with your webhook input and job body. + Assert against the returned `%{run: run, step: step, work_order: work_order}`. + """ + use LightningWeb.ConnCase, async: false + + import Ecto.Query + import Lightning.Factories + import Mox + + alias Lightning.Runs + alias Lightning.Runs.Events + alias Lightning.Invocation + alias Lightning.Repo + alias Lightning.Runtime.RuntimeManager + alias Lightning.WorkOrders + alias Lightning.Workflows.Snapshot + + setup :set_mox_from_context + setup :verify_on_exit! + + setup_all do + Mox.stub_with(Lightning.MockConfig, Lightning.Config.API) + Mox.stub_with(LightningMock, Lightning.API) + + Mox.stub_with( + Lightning.Extensions.MockUsageLimiter, + Lightning.Extensions.UsageLimiter + ) + + start_runtime_manager() + + uri = LightningWeb.Endpoint.url() + + %{uri: uri} + end + + setup [:register_and_log_in_superuser, :stub_rate_limiter_ok] + + # --------------------------------------------------------------------------- + # Test cases — each one only needs to supply the job body and assertions. + # --------------------------------------------------------------------------- + + @tag :integration + @tag timeout: 10_000 + test "job that returns 42 completes successfully", %{uri: uri} do + job_body = """ + fn(state => { + // if you return a non-map, it used to cause lost + // now it gets wrapped in a map ¯\_(ツ)_/¯ + return 42; + }); + """ + + result = run_single_step_workflow(uri, %{"x" => 1}, job_body) + + assert result.run.state == :success + assert result.step.exit_reason == "success" + + assert %{"value" => 42} = + select_dataclip_body(result.step.output_dataclip_id) + end + + @tag :integration + @tag timeout: 10_000 + test "job that uses too much memory very quickly is properly killed", %{ + uri: uri + } do + job_body = """ + fn(state => { + const arr = []; + while (true) { arr.push(new Array(1e6).fill('x')); } + return state; + }); + """ + + result = run_single_step_workflow(uri, %{"x" => 1}, job_body) + + assert result.run.state == :killed + assert result.run.error_type == "OOMError" + end + + # --------------------------------------------------------------------------- + # Helpers + # --------------------------------------------------------------------------- + + @doc """ + Creates a single-step webhook workflow, fires it, waits for completion, + and returns a map with the fully-loaded run, step, and work_order. + + ## Parameters + - `uri` — the endpoint base URL (from setup_all) + - `webhook_body` — the JSON map to POST to the webhook + - `job_body` — the JavaScript expression string for the job + - `opts` — keyword list of options: + - `:adaptor` — adaptor string (default `"@openfn/language-common@latest"`) + - `:timeout` — ms to wait for run completion (default `15_000`) + + ## Returns + %{run: run, step: step, work_order: work_order} + """ + def run_single_step_workflow(uri, webhook_body, job_body, opts \\ []) do + adaptor = Keyword.get(opts, :adaptor, "@openfn/language-common@latest") + timeout = Keyword.get(opts, :timeout, 15_000) + + project = insert(:project) + + webhook_trigger = build(:trigger, type: :webhook, enabled: true) + + job = + build(:job, + adaptor: adaptor, + body: job_body, + name: "test-job" + ) + + workflow = + build(:workflow, project: project) + |> with_trigger(webhook_trigger) + |> with_job(job) + |> with_edge({webhook_trigger, job}, condition_type: :always) + |> insert() + + Snapshot.create(workflow) + + # Fire the webhook + response = + Tesla.client( + [ + {Tesla.Middleware.BaseUrl, uri}, + Tesla.Middleware.JSON + ], + {Tesla.Adapter.Finch, name: Lightning.Finch} + ) + |> Tesla.post!("/i/#{webhook_trigger.id}", webhook_body) + + assert response.status == 200 + assert %{"work_order_id" => workorder_id} = response.body + + assert %{runs: [run]} = + WorkOrders.get(workorder_id, include: [:runs]) + + # Subscribe and wait for completion + Events.subscribe(run) + run_id = run.id + + assert_receive %Events.RunUpdated{run: %{id: ^run_id, state: final_state}} + when final_state in [ + :success, + :failed, + :crashed, + :killed, + :lost + ], + timeout + + # Reload with associations + run = + Runs.get(run.id, + include: [steps: [:job, :log_lines], work_order: :workflow] + ) + + [step] = run.steps + + work_order = WorkOrders.get(workorder_id) + + %{run: run, step: step, work_order: work_order} + end + + defp start_runtime_manager(_context \\ nil) do + opts = + Application.get_env(:lightning, RuntimeManager) + |> Keyword.merge( + name: LostRunRedTeamRuntimeManager, + start: true, + endpoint: LightningWeb.Endpoint, + log: :debug, + port: Enum.random(2223..3333), + worker_secret: Lightning.Config.worker_secret() + ) + + start_supervised!({RuntimeManager, opts}, restart: :transient) + end + + defp select_dataclip_body(uuid) do + from(d in Invocation.Dataclip, + where: d.id == ^uuid, + select: d.body + ) + |> Repo.one!() + end +end