Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 27 additions & 19 deletions lib/gen_stage/dispatchers/broadcast_dispatcher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ defmodule GenStage.BroadcastDispatcher do

@doc false
def init(_opts) do
{:ok, {[], 0, MapSet.new()}}
{:ok, {[], 0, 0, MapSet.new()}}
end

@doc false
Expand All @@ -67,7 +67,7 @@ defmodule GenStage.BroadcastDispatcher do
end

@doc false
def subscribe(opts, {pid, ref}, {demands, waiting, subscribed_processes}) do
def subscribe(opts, {pid, ref}, {demands, waiting, requested, subscribed_processes}) do
selector = validate_selector(opts)

if subscribed?(subscribed_processes, pid) do
Expand All @@ -80,43 +80,42 @@ defmodule GenStage.BroadcastDispatcher do
else
subscribed_processes = add_subscriber(subscribed_processes, pid)
demands = adjust_demand(-waiting, demands)
{:ok, 0, {add_demand(0, pid, ref, selector, demands), 0, subscribed_processes}}
{:ok, 0, {add_demand(0, pid, ref, selector, demands), 0, requested, subscribed_processes}}
end
end

@doc false
def cancel({pid, ref}, {demands, waiting, subscribed_processes}) do
def cancel({pid, ref}, {demands, waiting, requested, subscribed_processes}) do
subscribed_processes = delete_subscriber(subscribed_processes, pid)

case delete_demand(ref, demands) do
[] ->
{:ok, 0, {[], 0, subscribed_processes}}
{:ok, 0, {[], 0, requested, subscribed_processes}}

demands ->
# Since we may have removed the process we were waiting on,
# cancellation may actually generate demand!
new_min = get_min(demands)
demands = adjust_demand(new_min, demands)
{:ok, new_min, {demands, waiting + new_min, subscribed_processes}}
{demands, upstream_demand, waiting, requested} = sync_demands(demands, waiting, requested)
{:ok, upstream_demand, {demands, waiting, requested, subscribed_processes}}
end
end

@doc false
def ask(counter, {pid, ref}, {demands, waiting, subscribed_processes}) do
def ask(counter, {pid, ref}, {demands, waiting, requested, subscribed_processes}) do
{current, selector, demands} = pop_demand(ref, demands)
demands = add_demand(current + counter, pid, ref, selector, demands)
new_min = get_min(demands)
demands = adjust_demand(new_min, demands)
{:ok, new_min, {demands, waiting + new_min, subscribed_processes}}
{demands, upstream_demand, waiting, requested} = sync_demands(demands, waiting, requested)
{:ok, upstream_demand, {demands, waiting, requested, subscribed_processes}}
end

@doc false
def dispatch(events, _length, {demands, 0, subscribed_processes}) do
{:ok, events, {demands, 0, subscribed_processes}}
def dispatch(events, _length, {demands, 0, requested, subscribed_processes}) do
{:ok, events, {demands, 0, requested, subscribed_processes}}
end

def dispatch(events, length, {demands, waiting, subscribed_processes}) do
{deliver_now, deliver_later, waiting} = split_events(events, length, waiting)
def dispatch(events, length, {demands, waiting, requested, subscribed_processes}) do
{deliver_now, deliver_later, waiting, deliver_now_count} =
split_events(events, length, waiting)

for {_, pid, ref, selector} <- demands do
selected =
Expand All @@ -133,7 +132,7 @@ defmodule GenStage.BroadcastDispatcher do
:ok
end

{:ok, deliver_later, {demands, waiting, subscribed_processes}}
{:ok, deliver_later, {demands, waiting, requested - deliver_now_count, subscribed_processes}}
end

defp filter_and_count(messages, nil) do
Expand Down Expand Up @@ -176,12 +175,21 @@ defmodule GenStage.BroadcastDispatcher do
do: demands |> Enum.reduce(acc, fn {val, _, _, _}, acc -> min(val, acc) end) |> max(0)

defp split_events(events, length, counter) when length <= counter do
{events, [], counter - length}
{events, [], counter - length, length}
end

defp split_events(events, _length, counter) do
{now, later} = Enum.split(events, counter)
{now, later, 0}
{now, later, 0, counter}
end

defp sync_demands(demands, waiting, requested) do
new_min = get_min(demands)
demands = adjust_demand(new_min, demands)
waiting = waiting + new_min
request = max(0, waiting - requested)
requested = requested + request
{demands, request, waiting, requested}
end

defp adjust_demand(0, demands) do
Expand Down
144 changes: 117 additions & 27 deletions test/gen_stage/broadcast_dispatcher_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule GenStage.BroadcastDispatcherTest do
alias GenStage.BroadcastDispatcher, as: D

defp dispatcher(opts) do
{:ok, {[], 0, _subscribers} = state} = D.init(opts)
{:ok, {[], 0, 0, _subscribers} = state} = D.init(opts)
state
end

Expand All @@ -15,10 +15,10 @@ defmodule GenStage.BroadcastDispatcherTest do
expected_subscribers = MapSet.new([pid])

{:ok, 0, disp} = D.subscribe([], {pid, ref}, disp)
assert disp == {[{0, pid, ref, nil}], 0, expected_subscribers}
assert disp == {[{0, pid, ref, nil}], 0, 0, expected_subscribers}

{:ok, 0, disp} = D.cancel({pid, ref}, disp)
assert disp == {[], 0, MapSet.new()}
assert disp == {[], 0, 0, MapSet.new()}
end

test "subscribes, asks, and cancels" do
Expand All @@ -28,17 +28,17 @@ defmodule GenStage.BroadcastDispatcherTest do
expected_subscribers = MapSet.new([pid])

{:ok, 0, disp} = D.subscribe([], {pid, ref}, disp)
assert disp == {[{0, pid, ref, nil}], 0, expected_subscribers}
assert disp == {[{0, pid, ref, nil}], 0, 0, expected_subscribers}

{:ok, 10, disp} = D.ask(10, {pid, ref}, disp)
assert disp == {[{0, pid, ref, nil}], 10, expected_subscribers}
assert disp == {[{0, pid, ref, nil}], 10, 10, expected_subscribers}

{:ok, 0, disp} = D.cancel({pid, ref}, disp)
assert disp == {[], 0, MapSet.new()}
assert disp == {[], 0, 10, MapSet.new()}

# Now attempt to dispatch with no consumers
{:ok, [1, 2, 3], disp} = D.dispatch([1, 2, 3], 3, disp)
assert disp == {[], 0, MapSet.new()}
assert disp == {[], 0, 10, MapSet.new()}
end

test "multiple subscriptions with early demand" do
Expand All @@ -51,23 +51,23 @@ defmodule GenStage.BroadcastDispatcherTest do
expected_subscribers = MapSet.new([pid1])

{:ok, 0, disp} = D.subscribe([], {pid1, ref1}, disp)
assert disp == {[{0, pid1, ref1, nil}], 0, expected_subscribers}
assert disp == {[{0, pid1, ref1, nil}], 0, 0, expected_subscribers}

{:ok, 10, disp} = D.ask(10, {pid1, ref1}, disp)
assert disp == {[{0, pid1, ref1, nil}], 10, expected_subscribers}
assert disp == {[{0, pid1, ref1, nil}], 10, 10, expected_subscribers}

expected_subscribers = MapSet.put(expected_subscribers, pid2)

{:ok, 0, disp} = D.subscribe([], {pid2, ref2}, disp)
assert disp == {[{0, pid2, ref2, nil}, {10, pid1, ref1, nil}], 0, expected_subscribers}
assert disp == {[{0, pid2, ref2, nil}, {10, pid1, ref1, nil}], 0, 10, expected_subscribers}

expected_subscribers = MapSet.delete(expected_subscribers, pid1)

{:ok, 0, disp} = D.cancel({pid1, ref1}, disp)
assert disp == {[{0, pid2, ref2, nil}], 0, expected_subscribers}
assert disp == {[{0, pid2, ref2, nil}], 0, 10, expected_subscribers}

{:ok, 10, disp} = D.ask(10, {pid2, ref2}, disp)
assert disp == {[{0, pid2, ref2, nil}], 10, expected_subscribers}
{:ok, 0, disp} = D.ask(10, {pid2, ref2}, disp)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that here (and also at line 155) I am reverting the IMO incorrect change in assertions in 5d8193e.

assert disp == {[{0, pid2, ref2, nil}], 10, 10, expected_subscribers}
end

test "multiple subscriptions with late demand" do
Expand All @@ -80,23 +80,23 @@ defmodule GenStage.BroadcastDispatcherTest do
expected_subscribers = MapSet.new([pid1])

{:ok, 0, disp} = D.subscribe([], {pid1, ref1}, disp)
assert disp == {[{0, pid1, ref1, nil}], 0, expected_subscribers}
assert disp == {[{0, pid1, ref1, nil}], 0, 0, expected_subscribers}

expected_subscribers = MapSet.put(expected_subscribers, pid2)

{:ok, 0, disp} = D.subscribe([], {pid2, ref2}, disp)
assert disp == {[{0, pid2, ref2, nil}, {0, pid1, ref1, nil}], 0, expected_subscribers}
assert disp == {[{0, pid2, ref2, nil}, {0, pid1, ref1, nil}], 0, 0, expected_subscribers}

{:ok, 0, disp} = D.ask(10, {pid1, ref1}, disp)
assert disp == {[{10, pid1, ref1, nil}, {0, pid2, ref2, nil}], 0, expected_subscribers}
assert disp == {[{10, pid1, ref1, nil}, {0, pid2, ref2, nil}], 0, 0, expected_subscribers}

expected_subscribers = MapSet.delete(expected_subscribers, pid2)

{:ok, 10, disp} = D.cancel({pid2, ref2}, disp)
assert disp == {[{0, pid1, ref1, nil}], 10, expected_subscribers}
assert disp == {[{0, pid1, ref1, nil}], 10, 10, expected_subscribers}

{:ok, 10, disp} = D.ask(10, {pid1, ref1}, disp)
assert disp == {[{0, pid1, ref1, nil}], 20, expected_subscribers}
assert disp == {[{0, pid1, ref1, nil}], 20, 20, expected_subscribers}
end

test "subscribes, asks and dispatches to multiple consumers" do
Expand All @@ -116,11 +116,11 @@ defmodule GenStage.BroadcastDispatcherTest do

expected_subscribers = MapSet.new([pid1, pid2])

assert disp == {[{0, pid2, ref2, nil}, {1, pid1, ref1, nil}], 2, expected_subscribers}
assert disp == {[{0, pid2, ref2, nil}, {1, pid1, ref1, nil}], 2, 2, expected_subscribers}

# One batch fits all
{:ok, [], disp} = D.dispatch([:a, :b], 2, disp)
assert disp == {[{0, pid2, ref2, nil}, {1, pid1, ref1, nil}], 0, expected_subscribers}
assert disp == {[{0, pid2, ref2, nil}, {1, pid1, ref1, nil}], 0, 0, expected_subscribers}

assert_receive {:"$gen_consumer", {_, ^ref1}, [:a, :b]}
assert_receive {:"$gen_consumer", {_, ^ref2}, [:a, :b]}
Expand All @@ -129,13 +129,13 @@ defmodule GenStage.BroadcastDispatcherTest do
{:ok, 1, disp} = D.ask(2, {pid2, ref2}, disp)

{:ok, [:d], disp} = D.dispatch([:c, :d], 2, disp)
assert disp == {[{1, pid2, ref2, nil}, {0, pid1, ref1, nil}], 0, expected_subscribers}
assert disp == {[{1, pid2, ref2, nil}, {0, pid1, ref1, nil}], 0, 0, expected_subscribers}
assert_receive {:"$gen_consumer", {_, ^ref1}, [:c]}
assert_receive {:"$gen_consumer", {_, ^ref2}, [:c]}

# A batch with no demand
{:ok, [:d], disp} = D.dispatch([:d], 1, disp)
assert disp == {[{1, pid2, ref2, nil}, {0, pid1, ref1, nil}], 0, expected_subscribers}
assert disp == {[{1, pid2, ref2, nil}, {0, pid1, ref1, nil}], 0, 0, expected_subscribers}
refute_received {:"$gen_consumer", {_, _}, _}

# Add a late subscriber
Expand All @@ -146,24 +146,114 @@ defmodule GenStage.BroadcastDispatcherTest do
expected_subscribers = MapSet.put(expected_subscribers, pid3)

assert disp ==
{[{0, pid3, ref3, nil}, {1, pid1, ref1, nil}, {1, pid2, ref2, nil}], 0,
{[{0, pid3, ref3, nil}, {1, pid1, ref1, nil}, {1, pid2, ref2, nil}], 0, 1,
expected_subscribers}

# Even out
{:ok, 0, disp} = D.ask(2, {pid1, ref1}, disp)
{:ok, 0, disp} = D.ask(2, {pid2, ref2}, disp)
{:ok, 3, disp} = D.ask(3, {pid3, ref3}, disp)
{:ok, 2, disp} = D.ask(3, {pid3, ref3}, disp)
{:ok, [], disp} = D.dispatch([:d, :e, :f], 3, disp)

assert disp ==
{[{0, pid3, ref3, nil}, {0, pid2, ref2, nil}, {0, pid1, ref1, nil}], 0,
{[{0, pid3, ref3, nil}, {0, pid2, ref2, nil}, {0, pid1, ref1, nil}], 0, 0,
expected_subscribers}

assert_receive {:"$gen_consumer", {_, ^ref1}, [:d, :e, :f]}
assert_receive {:"$gen_consumer", {_, ^ref2}, [:d, :e, :f]}
assert_receive {:"$gen_consumer", {_, ^ref3}, [:d, :e, :f]}
end

test "subscribes, asks, dispatches, and repeats" do
pid1 = spawn_forwarder()
pid2 = spawn_forwarder()
ref1 = make_ref()
ref2 = make_ref()
disp = dispatcher([])

{:ok, 0, disp} = D.subscribe([], {pid1, ref1}, disp)
expected_subscribers = MapSet.new([pid1])
assert disp == {[{0, pid1, ref1, nil}], 0, 0, expected_subscribers}

{:ok, 10, disp} = D.ask(10, {pid1, ref1}, disp)
assert disp == {[{0, pid1, ref1, nil}], 10, 10, expected_subscribers}

{:ok, [], disp} = D.dispatch([:a, :b, :c], 3, disp)
assert disp == {[{0, pid1, ref1, nil}], 7, 7, expected_subscribers}

assert_receive {:"$gen_consumer", {_, ^ref1}, [:a, :b, :c]}

{:ok, 0, disp} = D.subscribe([], {pid2, ref2}, disp)
expected_subscribers = MapSet.put(expected_subscribers, pid2)
assert disp == {[{0, pid2, ref2, nil}, {7, pid1, ref1, nil}], 0, 7, expected_subscribers}

{:ok, 0, disp} = D.ask(20, {pid2, ref2}, disp)
assert disp == {[{13, pid2, ref2, nil}, {0, pid1, ref1, nil}], 7, 7, expected_subscribers}

{:ok, [], disp} = D.dispatch([:d, :e, :f, :g, :h, :i, :j], 7, disp)
assert disp == {[{13, pid2, ref2, nil}, {0, pid1, ref1, nil}], 0, 0, expected_subscribers}

assert_receive {:"$gen_consumer", {_, ^ref1}, [:d, :e, :f, :g, :h, :i, :j]}
assert_receive {:"$gen_consumer", {_, ^ref2}, [:d, :e, :f, :g, :h, :i, :j]}
end

test "subscribes, asks, cancels, and dispatcher reuses events for another subscriber" do
pid1 = spawn_forwarder()
pid2 = spawn_forwarder()
ref1 = make_ref()
ref2 = make_ref()
disp = dispatcher([])

{:ok, 0, disp} = D.subscribe([], {pid1, ref1}, disp)
expected_subscribers = MapSet.new([pid1])
assert disp == {[{0, pid1, ref1, nil}], 0, 0, expected_subscribers}

{:ok, 10, disp} = D.ask(10, {pid1, ref1}, disp)
assert disp == {[{0, pid1, ref1, nil}], 10, 10, expected_subscribers}

{:ok, 0, disp} = D.cancel({pid1, ref1}, disp)
expected_subscribers = MapSet.delete(expected_subscribers, pid1)
assert disp == {[], 0, 10, expected_subscribers}

{:ok, 0, disp} = D.subscribe([], {pid2, ref2}, disp)
expected_subscribers = MapSet.put(expected_subscribers, pid2)
assert disp == {[{0, pid2, ref2, nil}], 0, 10, expected_subscribers}

{:ok, 0, disp} = D.ask(5, {pid2, ref2}, disp)
assert disp == {[{0, pid2, ref2, nil}], 5, 10, expected_subscribers}

{:ok, [], disp} = D.dispatch([:a, :b, :c], 3, disp)
assert disp == {[{0, pid2, ref2, nil}], 2, 7, expected_subscribers}

assert_receive {:"$gen_consumer", {_, ^ref2}, [:a, :b, :c]}
end

test "cancels blocking subscriber while there is already a requested demand" do
pid1 = spawn_forwarder()
pid2 = spawn_forwarder()
ref1 = make_ref()
ref2 = make_ref()
disp = dispatcher([])

{:ok, 0, disp} = D.subscribe([], {pid1, ref1}, disp)
expected_subscribers = MapSet.new([pid1])
assert disp == {[{0, pid1, ref1, nil}], 0, 0, expected_subscribers}

{:ok, 10, disp} = D.ask(10, {pid1, ref1}, disp)
assert disp == {[{0, pid1, ref1, nil}], 10, 10, expected_subscribers}

{:ok, 0, disp} = D.subscribe([], {pid2, ref2}, disp)
expected_subscribers = MapSet.put(expected_subscribers, pid2)
assert disp == {[{0, pid2, ref2, nil}, {10, pid1, ref1, nil}], 0, 10, expected_subscribers}

{:ok, 0, disp} = D.ask(5, {pid1, ref1}, disp)
assert disp == {[{15, pid1, ref1, nil}, {0, pid2, ref2, nil}], 0, 10, expected_subscribers}

{:ok, 5, disp} = D.cancel({pid2, ref2}, disp)
expected_subscribers = MapSet.delete(expected_subscribers, pid2)
assert disp == {[{0, pid1, ref1, nil}], 15, 15, expected_subscribers}
end

test "subscribing with a selector function" do
pid1 = spawn_forwarder()
pid2 = spawn_forwarder()
Expand All @@ -175,7 +265,7 @@ defmodule GenStage.BroadcastDispatcherTest do

{:ok, 0, disp} = D.subscribe([selector: selector1], {pid1, ref1}, disp)
{:ok, 0, disp} = D.subscribe([selector: selector2], {pid2, ref2}, disp)
assert {[{0, ^pid2, ^ref2, _selector2}, {0, ^pid1, ^ref1, _selector1}], 0, _} = disp
assert {[{0, ^pid2, ^ref2, _selector2}, {0, ^pid1, ^ref1, _selector1}], 0, 0, _} = disp

{:ok, 0, disp} = D.ask(4, {pid2, ref2}, disp)
{:ok, 4, disp} = D.ask(4, {pid1, ref1}, disp)
Expand Down Expand Up @@ -219,7 +309,7 @@ defmodule GenStage.BroadcastDispatcherTest do

assert ExUnit.CaptureLog.capture_log(fn ->
assert {:error, _} = D.subscribe([], {pid, ref2}, disp)
assert disp == {[{0, pid, ref1, nil}], 0, expected_subscribers}
assert disp == {[{0, pid, ref1, nil}], 0, 0, expected_subscribers}
end) =~ "already registered"
end

Expand Down
Loading