diff --git a/lib/gen_stage/dispatchers/broadcast_dispatcher.ex b/lib/gen_stage/dispatchers/broadcast_dispatcher.ex index 8996d83..5456a0f 100644 --- a/lib/gen_stage/dispatchers/broadcast_dispatcher.ex +++ b/lib/gen_stage/dispatchers/broadcast_dispatcher.ex @@ -57,7 +57,7 @@ defmodule GenStage.BroadcastDispatcher do @doc false def init(_opts) do - {:ok, {[], 0, MapSet.new()}} + {:ok, {[], 0, 0, MapSet.new()}} end @doc false @@ -67,7 +67,7 @@ defmodule GenStage.BroadcastDispatcher do end @doc false - def subscribe(opts, {pid, ref}, {demands, waiting, subscribed_processes}) do + def subscribe(opts, {pid, ref}, {demands, waiting, requested, subscribed_processes}) do selector = validate_selector(opts) if subscribed?(subscribed_processes, pid) do @@ -80,43 +80,42 @@ defmodule GenStage.BroadcastDispatcher do else subscribed_processes = add_subscriber(subscribed_processes, pid) demands = adjust_demand(-waiting, demands) - {:ok, 0, {add_demand(0, pid, ref, selector, demands), 0, subscribed_processes}} + {:ok, 0, {add_demand(0, pid, ref, selector, demands), 0, requested, subscribed_processes}} end end @doc false - def cancel({pid, ref}, {demands, waiting, subscribed_processes}) do + def cancel({pid, ref}, {demands, waiting, requested, subscribed_processes}) do subscribed_processes = delete_subscriber(subscribed_processes, pid) case delete_demand(ref, demands) do [] -> - {:ok, 0, {[], 0, subscribed_processes}} + {:ok, 0, {[], 0, requested, subscribed_processes}} demands -> # Since we may have removed the process we were waiting on, # cancellation may actually generate demand! - new_min = get_min(demands) - demands = adjust_demand(new_min, demands) - {:ok, new_min, {demands, waiting + new_min, subscribed_processes}} + {demands, upstream_demand, waiting, requested} = sync_demands(demands, waiting, requested) + {:ok, upstream_demand, {demands, waiting, requested, subscribed_processes}} end end @doc false - def ask(counter, {pid, ref}, {demands, waiting, subscribed_processes}) do + def ask(counter, {pid, ref}, {demands, waiting, requested, subscribed_processes}) do {current, selector, demands} = pop_demand(ref, demands) demands = add_demand(current + counter, pid, ref, selector, demands) - new_min = get_min(demands) - demands = adjust_demand(new_min, demands) - {:ok, new_min, {demands, waiting + new_min, subscribed_processes}} + {demands, upstream_demand, waiting, requested} = sync_demands(demands, waiting, requested) + {:ok, upstream_demand, {demands, waiting, requested, subscribed_processes}} end @doc false - def dispatch(events, _length, {demands, 0, subscribed_processes}) do - {:ok, events, {demands, 0, subscribed_processes}} + def dispatch(events, _length, {demands, 0, requested, subscribed_processes}) do + {:ok, events, {demands, 0, requested, subscribed_processes}} end - def dispatch(events, length, {demands, waiting, subscribed_processes}) do - {deliver_now, deliver_later, waiting} = split_events(events, length, waiting) + def dispatch(events, length, {demands, waiting, requested, subscribed_processes}) do + {deliver_now, deliver_later, waiting, deliver_now_count} = + split_events(events, length, waiting) for {_, pid, ref, selector} <- demands do selected = @@ -133,7 +132,7 @@ defmodule GenStage.BroadcastDispatcher do :ok end - {:ok, deliver_later, {demands, waiting, subscribed_processes}} + {:ok, deliver_later, {demands, waiting, requested - deliver_now_count, subscribed_processes}} end defp filter_and_count(messages, nil) do @@ -176,12 +175,21 @@ defmodule GenStage.BroadcastDispatcher do do: demands |> Enum.reduce(acc, fn {val, _, _, _}, acc -> min(val, acc) end) |> max(0) defp split_events(events, length, counter) when length <= counter do - {events, [], counter - length} + {events, [], counter - length, length} end defp split_events(events, _length, counter) do {now, later} = Enum.split(events, counter) - {now, later, 0} + {now, later, 0, counter} + end + + defp sync_demands(demands, waiting, requested) do + new_min = get_min(demands) + demands = adjust_demand(new_min, demands) + waiting = waiting + new_min + request = max(0, waiting - requested) + requested = requested + request + {demands, request, waiting, requested} end defp adjust_demand(0, demands) do diff --git a/test/gen_stage/broadcast_dispatcher_test.exs b/test/gen_stage/broadcast_dispatcher_test.exs index 72a5acd..413dd4c 100644 --- a/test/gen_stage/broadcast_dispatcher_test.exs +++ b/test/gen_stage/broadcast_dispatcher_test.exs @@ -4,7 +4,7 @@ defmodule GenStage.BroadcastDispatcherTest do alias GenStage.BroadcastDispatcher, as: D defp dispatcher(opts) do - {:ok, {[], 0, _subscribers} = state} = D.init(opts) + {:ok, {[], 0, 0, _subscribers} = state} = D.init(opts) state end @@ -15,10 +15,10 @@ defmodule GenStage.BroadcastDispatcherTest do expected_subscribers = MapSet.new([pid]) {:ok, 0, disp} = D.subscribe([], {pid, ref}, disp) - assert disp == {[{0, pid, ref, nil}], 0, expected_subscribers} + assert disp == {[{0, pid, ref, nil}], 0, 0, expected_subscribers} {:ok, 0, disp} = D.cancel({pid, ref}, disp) - assert disp == {[], 0, MapSet.new()} + assert disp == {[], 0, 0, MapSet.new()} end test "subscribes, asks, and cancels" do @@ -28,17 +28,17 @@ defmodule GenStage.BroadcastDispatcherTest do expected_subscribers = MapSet.new([pid]) {:ok, 0, disp} = D.subscribe([], {pid, ref}, disp) - assert disp == {[{0, pid, ref, nil}], 0, expected_subscribers} + assert disp == {[{0, pid, ref, nil}], 0, 0, expected_subscribers} {:ok, 10, disp} = D.ask(10, {pid, ref}, disp) - assert disp == {[{0, pid, ref, nil}], 10, expected_subscribers} + assert disp == {[{0, pid, ref, nil}], 10, 10, expected_subscribers} {:ok, 0, disp} = D.cancel({pid, ref}, disp) - assert disp == {[], 0, MapSet.new()} + assert disp == {[], 0, 10, MapSet.new()} # Now attempt to dispatch with no consumers {:ok, [1, 2, 3], disp} = D.dispatch([1, 2, 3], 3, disp) - assert disp == {[], 0, MapSet.new()} + assert disp == {[], 0, 10, MapSet.new()} end test "multiple subscriptions with early demand" do @@ -51,23 +51,23 @@ defmodule GenStage.BroadcastDispatcherTest do expected_subscribers = MapSet.new([pid1]) {:ok, 0, disp} = D.subscribe([], {pid1, ref1}, disp) - assert disp == {[{0, pid1, ref1, nil}], 0, expected_subscribers} + assert disp == {[{0, pid1, ref1, nil}], 0, 0, expected_subscribers} {:ok, 10, disp} = D.ask(10, {pid1, ref1}, disp) - assert disp == {[{0, pid1, ref1, nil}], 10, expected_subscribers} + assert disp == {[{0, pid1, ref1, nil}], 10, 10, expected_subscribers} expected_subscribers = MapSet.put(expected_subscribers, pid2) {:ok, 0, disp} = D.subscribe([], {pid2, ref2}, disp) - assert disp == {[{0, pid2, ref2, nil}, {10, pid1, ref1, nil}], 0, expected_subscribers} + assert disp == {[{0, pid2, ref2, nil}, {10, pid1, ref1, nil}], 0, 10, expected_subscribers} expected_subscribers = MapSet.delete(expected_subscribers, pid1) {:ok, 0, disp} = D.cancel({pid1, ref1}, disp) - assert disp == {[{0, pid2, ref2, nil}], 0, expected_subscribers} + assert disp == {[{0, pid2, ref2, nil}], 0, 10, expected_subscribers} - {:ok, 10, disp} = D.ask(10, {pid2, ref2}, disp) - assert disp == {[{0, pid2, ref2, nil}], 10, expected_subscribers} + {:ok, 0, disp} = D.ask(10, {pid2, ref2}, disp) + assert disp == {[{0, pid2, ref2, nil}], 10, 10, expected_subscribers} end test "multiple subscriptions with late demand" do @@ -80,23 +80,23 @@ defmodule GenStage.BroadcastDispatcherTest do expected_subscribers = MapSet.new([pid1]) {:ok, 0, disp} = D.subscribe([], {pid1, ref1}, disp) - assert disp == {[{0, pid1, ref1, nil}], 0, expected_subscribers} + assert disp == {[{0, pid1, ref1, nil}], 0, 0, expected_subscribers} expected_subscribers = MapSet.put(expected_subscribers, pid2) {:ok, 0, disp} = D.subscribe([], {pid2, ref2}, disp) - assert disp == {[{0, pid2, ref2, nil}, {0, pid1, ref1, nil}], 0, expected_subscribers} + assert disp == {[{0, pid2, ref2, nil}, {0, pid1, ref1, nil}], 0, 0, expected_subscribers} {:ok, 0, disp} = D.ask(10, {pid1, ref1}, disp) - assert disp == {[{10, pid1, ref1, nil}, {0, pid2, ref2, nil}], 0, expected_subscribers} + assert disp == {[{10, pid1, ref1, nil}, {0, pid2, ref2, nil}], 0, 0, expected_subscribers} expected_subscribers = MapSet.delete(expected_subscribers, pid2) {:ok, 10, disp} = D.cancel({pid2, ref2}, disp) - assert disp == {[{0, pid1, ref1, nil}], 10, expected_subscribers} + assert disp == {[{0, pid1, ref1, nil}], 10, 10, expected_subscribers} {:ok, 10, disp} = D.ask(10, {pid1, ref1}, disp) - assert disp == {[{0, pid1, ref1, nil}], 20, expected_subscribers} + assert disp == {[{0, pid1, ref1, nil}], 20, 20, expected_subscribers} end test "subscribes, asks and dispatches to multiple consumers" do @@ -116,11 +116,11 @@ defmodule GenStage.BroadcastDispatcherTest do expected_subscribers = MapSet.new([pid1, pid2]) - assert disp == {[{0, pid2, ref2, nil}, {1, pid1, ref1, nil}], 2, expected_subscribers} + assert disp == {[{0, pid2, ref2, nil}, {1, pid1, ref1, nil}], 2, 2, expected_subscribers} # One batch fits all {:ok, [], disp} = D.dispatch([:a, :b], 2, disp) - assert disp == {[{0, pid2, ref2, nil}, {1, pid1, ref1, nil}], 0, expected_subscribers} + assert disp == {[{0, pid2, ref2, nil}, {1, pid1, ref1, nil}], 0, 0, expected_subscribers} assert_receive {:"$gen_consumer", {_, ^ref1}, [:a, :b]} assert_receive {:"$gen_consumer", {_, ^ref2}, [:a, :b]} @@ -129,13 +129,13 @@ defmodule GenStage.BroadcastDispatcherTest do {:ok, 1, disp} = D.ask(2, {pid2, ref2}, disp) {:ok, [:d], disp} = D.dispatch([:c, :d], 2, disp) - assert disp == {[{1, pid2, ref2, nil}, {0, pid1, ref1, nil}], 0, expected_subscribers} + assert disp == {[{1, pid2, ref2, nil}, {0, pid1, ref1, nil}], 0, 0, expected_subscribers} assert_receive {:"$gen_consumer", {_, ^ref1}, [:c]} assert_receive {:"$gen_consumer", {_, ^ref2}, [:c]} # A batch with no demand {:ok, [:d], disp} = D.dispatch([:d], 1, disp) - assert disp == {[{1, pid2, ref2, nil}, {0, pid1, ref1, nil}], 0, expected_subscribers} + assert disp == {[{1, pid2, ref2, nil}, {0, pid1, ref1, nil}], 0, 0, expected_subscribers} refute_received {:"$gen_consumer", {_, _}, _} # Add a late subscriber @@ -146,17 +146,17 @@ defmodule GenStage.BroadcastDispatcherTest do expected_subscribers = MapSet.put(expected_subscribers, pid3) assert disp == - {[{0, pid3, ref3, nil}, {1, pid1, ref1, nil}, {1, pid2, ref2, nil}], 0, + {[{0, pid3, ref3, nil}, {1, pid1, ref1, nil}, {1, pid2, ref2, nil}], 0, 1, expected_subscribers} # Even out {:ok, 0, disp} = D.ask(2, {pid1, ref1}, disp) {:ok, 0, disp} = D.ask(2, {pid2, ref2}, disp) - {:ok, 3, disp} = D.ask(3, {pid3, ref3}, disp) + {:ok, 2, disp} = D.ask(3, {pid3, ref3}, disp) {:ok, [], disp} = D.dispatch([:d, :e, :f], 3, disp) assert disp == - {[{0, pid3, ref3, nil}, {0, pid2, ref2, nil}, {0, pid1, ref1, nil}], 0, + {[{0, pid3, ref3, nil}, {0, pid2, ref2, nil}, {0, pid1, ref1, nil}], 0, 0, expected_subscribers} assert_receive {:"$gen_consumer", {_, ^ref1}, [:d, :e, :f]} @@ -164,6 +164,96 @@ defmodule GenStage.BroadcastDispatcherTest do assert_receive {:"$gen_consumer", {_, ^ref3}, [:d, :e, :f]} end + test "subscribes, asks, dispatches, and repeats" do + pid1 = spawn_forwarder() + pid2 = spawn_forwarder() + ref1 = make_ref() + ref2 = make_ref() + disp = dispatcher([]) + + {:ok, 0, disp} = D.subscribe([], {pid1, ref1}, disp) + expected_subscribers = MapSet.new([pid1]) + assert disp == {[{0, pid1, ref1, nil}], 0, 0, expected_subscribers} + + {:ok, 10, disp} = D.ask(10, {pid1, ref1}, disp) + assert disp == {[{0, pid1, ref1, nil}], 10, 10, expected_subscribers} + + {:ok, [], disp} = D.dispatch([:a, :b, :c], 3, disp) + assert disp == {[{0, pid1, ref1, nil}], 7, 7, expected_subscribers} + + assert_receive {:"$gen_consumer", {_, ^ref1}, [:a, :b, :c]} + + {:ok, 0, disp} = D.subscribe([], {pid2, ref2}, disp) + expected_subscribers = MapSet.put(expected_subscribers, pid2) + assert disp == {[{0, pid2, ref2, nil}, {7, pid1, ref1, nil}], 0, 7, expected_subscribers} + + {:ok, 0, disp} = D.ask(20, {pid2, ref2}, disp) + assert disp == {[{13, pid2, ref2, nil}, {0, pid1, ref1, nil}], 7, 7, expected_subscribers} + + {:ok, [], disp} = D.dispatch([:d, :e, :f, :g, :h, :i, :j], 7, disp) + assert disp == {[{13, pid2, ref2, nil}, {0, pid1, ref1, nil}], 0, 0, expected_subscribers} + + assert_receive {:"$gen_consumer", {_, ^ref1}, [:d, :e, :f, :g, :h, :i, :j]} + assert_receive {:"$gen_consumer", {_, ^ref2}, [:d, :e, :f, :g, :h, :i, :j]} + end + + test "subscribes, asks, cancels, and dispatcher reuses events for another subscriber" do + pid1 = spawn_forwarder() + pid2 = spawn_forwarder() + ref1 = make_ref() + ref2 = make_ref() + disp = dispatcher([]) + + {:ok, 0, disp} = D.subscribe([], {pid1, ref1}, disp) + expected_subscribers = MapSet.new([pid1]) + assert disp == {[{0, pid1, ref1, nil}], 0, 0, expected_subscribers} + + {:ok, 10, disp} = D.ask(10, {pid1, ref1}, disp) + assert disp == {[{0, pid1, ref1, nil}], 10, 10, expected_subscribers} + + {:ok, 0, disp} = D.cancel({pid1, ref1}, disp) + expected_subscribers = MapSet.delete(expected_subscribers, pid1) + assert disp == {[], 0, 10, expected_subscribers} + + {:ok, 0, disp} = D.subscribe([], {pid2, ref2}, disp) + expected_subscribers = MapSet.put(expected_subscribers, pid2) + assert disp == {[{0, pid2, ref2, nil}], 0, 10, expected_subscribers} + + {:ok, 0, disp} = D.ask(5, {pid2, ref2}, disp) + assert disp == {[{0, pid2, ref2, nil}], 5, 10, expected_subscribers} + + {:ok, [], disp} = D.dispatch([:a, :b, :c], 3, disp) + assert disp == {[{0, pid2, ref2, nil}], 2, 7, expected_subscribers} + + assert_receive {:"$gen_consumer", {_, ^ref2}, [:a, :b, :c]} + end + + test "cancels blocking subscriber while there is already a requested demand" do + pid1 = spawn_forwarder() + pid2 = spawn_forwarder() + ref1 = make_ref() + ref2 = make_ref() + disp = dispatcher([]) + + {:ok, 0, disp} = D.subscribe([], {pid1, ref1}, disp) + expected_subscribers = MapSet.new([pid1]) + assert disp == {[{0, pid1, ref1, nil}], 0, 0, expected_subscribers} + + {:ok, 10, disp} = D.ask(10, {pid1, ref1}, disp) + assert disp == {[{0, pid1, ref1, nil}], 10, 10, expected_subscribers} + + {:ok, 0, disp} = D.subscribe([], {pid2, ref2}, disp) + expected_subscribers = MapSet.put(expected_subscribers, pid2) + assert disp == {[{0, pid2, ref2, nil}, {10, pid1, ref1, nil}], 0, 10, expected_subscribers} + + {:ok, 0, disp} = D.ask(5, {pid1, ref1}, disp) + assert disp == {[{15, pid1, ref1, nil}, {0, pid2, ref2, nil}], 0, 10, expected_subscribers} + + {:ok, 5, disp} = D.cancel({pid2, ref2}, disp) + expected_subscribers = MapSet.delete(expected_subscribers, pid2) + assert disp == {[{0, pid1, ref1, nil}], 15, 15, expected_subscribers} + end + test "subscribing with a selector function" do pid1 = spawn_forwarder() pid2 = spawn_forwarder() @@ -175,7 +265,7 @@ defmodule GenStage.BroadcastDispatcherTest do {:ok, 0, disp} = D.subscribe([selector: selector1], {pid1, ref1}, disp) {:ok, 0, disp} = D.subscribe([selector: selector2], {pid2, ref2}, disp) - assert {[{0, ^pid2, ^ref2, _selector2}, {0, ^pid1, ^ref1, _selector1}], 0, _} = disp + assert {[{0, ^pid2, ^ref2, _selector2}, {0, ^pid1, ^ref1, _selector1}], 0, 0, _} = disp {:ok, 0, disp} = D.ask(4, {pid2, ref2}, disp) {:ok, 4, disp} = D.ask(4, {pid1, ref1}, disp) @@ -219,7 +309,7 @@ defmodule GenStage.BroadcastDispatcherTest do assert ExUnit.CaptureLog.capture_log(fn -> assert {:error, _} = D.subscribe([], {pid, ref2}, disp) - assert disp == {[{0, pid, ref1, nil}], 0, expected_subscribers} + assert disp == {[{0, pid, ref1, nil}], 0, 0, expected_subscribers} end) =~ "already registered" end diff --git a/test/gen_stage_test.exs b/test/gen_stage_test.exs index dce3e4d..8f6c30a 100644 --- a/test/gen_stage_test.exs +++ b/test/gen_stage_test.exs @@ -105,6 +105,29 @@ defmodule GenStageTest do end end + defmodule NothingProducer do + @moduledoc """ + A producer that does not produce any events. + + It sends a message when there is demand to the pid in state. + """ + + use GenStage + + def start_link(init, opts \\ []) do + GenStage.start_link(__MODULE__, init, opts) + end + + def init(init) do + init + end + + def handle_demand(demand, pid) do + send(pid, {:demand, demand}) + {:noreply, [], pid} + end + end + defmodule Doubler do @moduledoc """ Multiples every event by two. @@ -431,6 +454,22 @@ defmodule GenStageTest do assert_receive {:consumed, [1000, 1001, 1002, 1003, 1004, 1005, 1006, 1007, 1008, 1009]} assert_receive {:consumed, [1000, 1001, 1002, 1003, 1004, 1005, 1006, 1007, 1008, 1009]} end + + test "with shared (broadcast) demand does not result in duplicate upstream demand" do + {:ok, producer} = + NothingProducer.start_link({:producer, self(), dispatcher: GenStage.BroadcastDispatcher}) + + {:ok, consumer1} = Sleeper.start_link({:consumer, self()}) + {:ok, consumer2} = Sleeper.start_link({:consumer, self()}) + + {:ok, _} = GenStage.sync_subscribe(consumer1, to: producer, max_demand: 10, min_demand: 0) + {:ok, _} = GenStage.sync_subscribe(consumer2, to: producer, max_demand: 20, min_demand: 0) + + # The demand of 10 (common demand of the two consumers) should be sent upstream only once + # (until the consumers demand another batch, which is never in the case of Sleepers). + assert_receive {:demand, 10} + refute_receive {:demand, _} + end end describe "producer-producer_consumer-consumer demand" do