Skip to content

Commit 3efc072

Browse files
committed
Lock ingest processing during session transfer
1 parent 702704b commit 3efc072

File tree

2 files changed

+27
-3
lines changed

2 files changed

+27
-3
lines changed

lib/plausible/session/transfer.ex

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@ defmodule Plausible.Session.Transfer do
1818
@cmd_dump_cache :get
1919
@cmd_takeover_done :done
2020

21+
# NOTE: Timeout must be higher than respective WriteBuffer's
22+
# flush interval + some margin for flush alone.
23+
@lock_acquire_timeout :timer.seconds(8000)
24+
2125
def telemetry_event, do: [:plausible, :sessions, :takeover]
2226

2327
@doc """
@@ -109,7 +113,15 @@ defmodule Plausible.Session.Transfer do
109113
case request do
110114
{@cmd_list_cache_names, session_version} ->
111115
if session_version == session_version() and attempted?(parent) do
112-
# NOTE: block ingest here
116+
# Blocking ingest before transfer
117+
Task.await_many(
118+
[
119+
Task.async(fn -> Plausible.Event.WriteBuffer.lock(@lock_acquire_timeout) end),
120+
Task.async(fn -> Plausible.Session.WriteBuffer.lock(@lock_acquire_timeout) end)
121+
],
122+
@lock_acquire_timeout + :timer.seconds(1)
123+
)
124+
113125
Cache.Adapter.get_names(:sessions)
114126
else
115127
[]
@@ -119,9 +131,11 @@ defmodule Plausible.Session.Transfer do
119131
Cache.Adapter.cache2list(cache)
120132

121133
@cmd_takeover_done ->
122-
# NOTE: unblock ingest here
123134
# Wipe the cache after transfer is complete to avoid making the transferred cache stale
124135
Cache.Adapter.wipe(:sessions)
136+
# Unblock ingest after trasnsfer is done
137+
Plausible.Event.WriteBuffer.unlock()
138+
Plausible.Session.WriteBuffer.unlock()
125139
:counters.add(given_counter, 1, 1)
126140
end
127141
end

test/plausible/session/transfer_test.exs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,20 @@ defmodule Plausible.Session.TransferTest do
1212

1313
Enum.each(1..250, fn _ -> process_event(old, build(:event, name: "pageview")) end)
1414

15+
:ok = :peer.call(old, Plausible.Session.WriteBuffer, :flush, [])
16+
:ok = :peer.call(old, Plausible.Event.WriteBuffer, :flush, [])
17+
18+
old_sessions_sorted = all_sessions_sorted(old)
19+
1520
new = start_another_plausible(tmp_dir)
21+
22+
:ok = :peer.call(old, Plausible.Session.WriteBuffer, :flush, [])
23+
:ok = :peer.call(old, Plausible.Event.WriteBuffer, :flush, [])
24+
1625
await_transfer(new)
1726

18-
assert all_sessions_sorted(new) == all_sessions_sorted(old)
27+
assert all_sessions_sorted(old) == []
28+
assert all_sessions_sorted(new) == old_sessions_sorted
1929
end
2030

2131
defp start_another_plausible(tmp_dir) do

0 commit comments

Comments
 (0)