Skip to content
Draft
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ priv/openfn
.dev.env
.dev.override.env
.test.override.env
.envrc

worktrees
.docker-cache
Expand Down
24 changes: 18 additions & 6 deletions benchmarking/channels/mock_destination.exs
Original file line number Diff line number Diff line change
Expand Up @@ -143,21 +143,33 @@ defmodule MockDestination.Body do
"""

def generate(body_size) when body_size <= 1024 do
# Build the envelope once with an empty padding to measure overhead.
envelope =
Jason.encode!(%{
ok: true,
server: "mock_destination",
timestamp: DateTime.to_iso8601(DateTime.utc_now()),
padding: ""
})

overhead = byte_size(envelope)
padding_len = max(body_size - overhead, 0)

json =
Jason.encode!(%{
ok: true,
server: "mock_destination",
timestamp: DateTime.to_iso8601(DateTime.utc_now()),
padding: String.duplicate("x", max(body_size - 80, 0))
padding: String.duplicate("x", padding_len)
})

# Trim or pad to reach the target size exactly.
byte_size = byte_size(json)
# Fine-tune to the exact target size.
actual = byte_size(json)

cond do
byte_size == body_size -> json
byte_size > body_size -> binary_part(json, 0, body_size)
true -> json <> String.duplicate(" ", body_size - byte_size)
actual == body_size -> json
actual > body_size -> binary_part(json, 0, body_size)
true -> json <> String.duplicate(" ", body_size - actual)
end
end

Expand Down
26 changes: 26 additions & 0 deletions lib/lightning/channels.ex
Original file line number Diff line number Diff line change
Expand Up @@ -441,4 +441,30 @@ defmodule Lightning.Channels do

{total, nil}
end

@doc """
Returns a channel request with preloads, scoped to the given project.

Returns `nil` if the request doesn't exist, belongs to a different project,
or the ID is not a valid UUID.

Preloads: `channel_events`, `channel`, `channel_snapshot`.
"""
@spec get_channel_request_for_project(Ecto.UUID.t(), String.t()) ::
ChannelRequest.t() | nil
def get_channel_request_for_project(project_id, request_id) do
case Ecto.UUID.cast(request_id) do
{:ok, uuid} ->
from(cr in ChannelRequest,
join: c in Channel,
on: cr.channel_id == c.id,
where: cr.id == ^uuid and c.project_id == ^project_id,
preload: [:channel_events, :channel, :channel_snapshot]
)
|> Repo.one()

:error ->
nil
end
end
end
76 changes: 52 additions & 24 deletions lib/lightning/channels/channel_event.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,23 @@ defmodule Lightning.Channels.ChannelEvent do
type: :destination_response | :error,
request_method: String.t() | nil,
request_path: String.t() | nil,
request_headers: String.t() | nil,
request_query_string: String.t() | nil,
request_headers: list() | nil,
request_body_preview: String.t() | nil,
request_body_hash: String.t() | nil,
request_body_size: integer() | nil,
response_status: integer() | nil,
response_headers: String.t() | nil,
response_headers: list() | nil,
response_body_preview: String.t() | nil,
response_body_hash: String.t() | nil,
latency_ms: integer() | nil,
ttfb_ms: integer() | nil,
response_body_size: integer() | nil,
latency_us: integer() | nil,
ttfb_us: integer() | nil,
request_send_us: integer() | nil,
response_duration_us: integer() | nil,
queue_us: integer() | nil,
connect_us: integer() | nil,
reused_connection: boolean() | nil,
error_message: String.t() | nil,
inserted_at: DateTime.t()
}
Expand All @@ -41,17 +49,25 @@ defmodule Lightning.Channels.ChannelEvent do

field :request_method, :string
field :request_path, :string
field :request_headers, :string
field :request_query_string, :string
field :request_headers, {:array, {:array, :string}}
field :request_body_preview, :string
field :request_body_hash, :string
field :request_body_size, :integer

field :response_status, :integer
field :response_headers, :string
field :response_headers, {:array, {:array, :string}}
field :response_body_preview, :string
field :response_body_hash, :string
field :response_body_size, :integer

field :latency_ms, :integer
field :ttfb_ms, :integer
field :latency_us, :integer
field :ttfb_us, :integer
field :request_send_us, :integer
field :response_duration_us, :integer
field :queue_us, :integer
field :connect_us, :integer
field :reused_connection, :boolean
field :error_message, :string

belongs_to :channel_request, ChannelRequest
Expand All @@ -61,22 +77,34 @@ defmodule Lightning.Channels.ChannelEvent do

def changeset(event, attrs) do
event
|> cast(attrs, [
:channel_request_id,
:type,
:request_method,
:request_path,
:request_headers,
:request_body_preview,
:request_body_hash,
:response_status,
:response_headers,
:response_body_preview,
:response_body_hash,
:latency_ms,
:ttfb_ms,
:error_message
])
|> cast(
attrs,
[
:channel_request_id,
:type,
:request_method,
:request_path,
:request_query_string,
:request_headers,
:request_body_preview,
:request_body_hash,
:request_body_size,
:response_status,
:response_headers,
:response_body_preview,
:response_body_hash,
:response_body_size,
:latency_us,
:ttfb_us,
:request_send_us,
:response_duration_us,
:queue_us,
:connect_us,
:reused_connection,
:error_message
],
empty_values: []
)
|> validate_required([:channel_request_id, :type])
|> assoc_constraint(:channel_request)
end
Expand Down
7 changes: 7 additions & 0 deletions lib/lightning/channels/channel_request.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@ defmodule Lightning.Channels.ChannelRequest do
alias Lightning.Channels.Channel
alias Lightning.Channels.ChannelEvent
alias Lightning.Channels.ChannelSnapshot
alias Lightning.Workflows.WebhookAuthMethod

@type t :: %__MODULE__{
id: Ecto.UUID.t(),
channel_id: Ecto.UUID.t(),
channel_snapshot_id: Ecto.UUID.t(),
request_id: String.t(),
client_identity: String.t() | nil,
client_webhook_auth_method_id: Ecto.UUID.t() | nil,
client_auth_type: String.t() | nil,
state: :pending | :success | :failed | :timeout | :error,
started_at: DateTime.t(),
completed_at: DateTime.t() | nil
Expand All @@ -23,6 +26,7 @@ defmodule Lightning.Channels.ChannelRequest do
schema "channel_requests" do
field :request_id, :string
field :client_identity, :string
field :client_auth_type, :string

field :state, Ecto.Enum,
values: [:pending, :success, :failed, :timeout, :error]
Expand All @@ -32,6 +36,7 @@ defmodule Lightning.Channels.ChannelRequest do

belongs_to :channel, Channel
belongs_to :channel_snapshot, ChannelSnapshot
belongs_to :client_webhook_auth_method, WebhookAuthMethod

has_many :channel_events, ChannelEvent
end
Expand All @@ -43,6 +48,8 @@ defmodule Lightning.Channels.ChannelRequest do
:channel_snapshot_id,
:request_id,
:client_identity,
:client_webhook_auth_method_id,
:client_auth_type,
:state,
:started_at,
:completed_at
Expand Down
25 changes: 16 additions & 9 deletions lib/lightning/channels/handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ defmodule Lightning.Channels.Handler do
channel_snapshot_id: state.snapshot.id,
request_id: state.request_id,
client_identity: state.client_identity,
client_webhook_auth_method_id:
Map.get(state, :client_webhook_auth_method_id),
client_auth_type: Map.get(state, :client_auth_type),
state: :pending,
started_at: state.started_at
}
Expand Down Expand Up @@ -105,15 +108,23 @@ defmodule Lightning.Channels.Handler do
type: event_type,
request_method: state.request_method,
request_path: state.request_path,
request_query_string: Map.get(state, :query_string),
request_headers: encode_headers(state.request_headers),
request_body_preview: get_in(result, [:request_observation, :preview]),
request_body_hash: get_in(result, [:request_observation, :hash]),
request_body_size: get_in(result, [:request_observation, :size]),
response_status: result.status,
response_headers: encode_headers(Map.get(state, :response_headers)),
response_body_preview: get_in(result, [:response_observation, :preview]),
response_body_hash: get_in(result, [:response_observation, :hash]),
latency_ms: div(result.duration_us, 1000),
ttfb_ms: state |> Map.get(:ttfb_us) |> maybe_div(1000),
response_body_size: get_in(result, [:response_observation, :size]),
latency_us: result.timing.total_us,
ttfb_us: Map.get(state, :ttfb_us),
request_send_us: get_in(result, [:timing, :send_us]),
response_duration_us: get_in(result, [:timing, :recv_us]),
queue_us: get_in(result, [:timing, :queue_us]),
connect_us: get_in(result, [:timing, :connect_us]),
reused_connection: get_in(result, [:timing, :reused_connection]),
error_message: if(result.error, do: classify_error(result.error))
}

Expand Down Expand Up @@ -175,12 +186,11 @@ defmodule Lightning.Channels.Handler do

defp encode_headers(nil), do: nil

# Encodes as array-of-pairs rather than a map because HTTP allows
# Returns as array-of-pairs rather than a map because HTTP allows
# duplicate header keys (e.g. multiple Set-Cookie headers).
# Stored as native jsonb in the database.
defp encode_headers(headers) do
headers
|> Enum.map(fn {k, v} -> [k, v] end)
|> Jason.encode!()
Enum.map(headers, fn {k, v} -> [k, v] end)
end

defp classify_error({:timeout, :connect_timeout}), do: "connect_timeout"
Expand All @@ -192,7 +202,4 @@ defmodule Lightning.Channels.Handler do
do: Atom.to_string(reason)

defp classify_error(error), do: inspect(error)

defp maybe_div(nil, _), do: nil
defp maybe_div(us, divisor), do: div(us, divisor)
end
Loading