Skip to content

Commit c8814d9

Browse files
xxdavidDavid Pavlík
andauthored
Do not duplicate demand in BroadcastDispatcher (#319)
Since 5d8193e, the BroadcastDispatcher can send demand upstream even if its consumers did not request that many events. Because `waiting` is set to 0 when a new subscriber subscribes, the demand is repeated when the new subscriber asks for events. I am fixing this by introducing a new counter in state - the number of requested but not yet delivered events. The dispatcher now does not send upstream demand if the demand can be satisfied by the requested events (and sends a lower demand if it can be satisfied partially). Fixes #318. Co-authored-by: David Pavlík <[email protected]>
1 parent 0f0b680 commit c8814d9

File tree

3 files changed

+183
-46
lines changed

3 files changed

+183
-46
lines changed

lib/gen_stage/dispatchers/broadcast_dispatcher.ex

Lines changed: 27 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ defmodule GenStage.BroadcastDispatcher do
5757

5858
@doc false
5959
def init(_opts) do
60-
{:ok, {[], 0, MapSet.new()}}
60+
{:ok, {[], 0, 0, MapSet.new()}}
6161
end
6262

6363
@doc false
@@ -67,7 +67,7 @@ defmodule GenStage.BroadcastDispatcher do
6767
end
6868

6969
@doc false
70-
def subscribe(opts, {pid, ref}, {demands, waiting, subscribed_processes}) do
70+
def subscribe(opts, {pid, ref}, {demands, waiting, requested, subscribed_processes}) do
7171
selector = validate_selector(opts)
7272

7373
if subscribed?(subscribed_processes, pid) do
@@ -80,43 +80,42 @@ defmodule GenStage.BroadcastDispatcher do
8080
else
8181
subscribed_processes = add_subscriber(subscribed_processes, pid)
8282
demands = adjust_demand(-waiting, demands)
83-
{:ok, 0, {add_demand(0, pid, ref, selector, demands), 0, subscribed_processes}}
83+
{:ok, 0, {add_demand(0, pid, ref, selector, demands), 0, requested, subscribed_processes}}
8484
end
8585
end
8686

8787
@doc false
88-
def cancel({pid, ref}, {demands, waiting, subscribed_processes}) do
88+
def cancel({pid, ref}, {demands, waiting, requested, subscribed_processes}) do
8989
subscribed_processes = delete_subscriber(subscribed_processes, pid)
9090

9191
case delete_demand(ref, demands) do
9292
[] ->
93-
{:ok, 0, {[], 0, subscribed_processes}}
93+
{:ok, 0, {[], 0, requested, subscribed_processes}}
9494

9595
demands ->
9696
# Since we may have removed the process we were waiting on,
9797
# cancellation may actually generate demand!
98-
new_min = get_min(demands)
99-
demands = adjust_demand(new_min, demands)
100-
{:ok, new_min, {demands, waiting + new_min, subscribed_processes}}
98+
{demands, upstream_demand, waiting, requested} = sync_demands(demands, waiting, requested)
99+
{:ok, upstream_demand, {demands, waiting, requested, subscribed_processes}}
101100
end
102101
end
103102

104103
@doc false
105-
def ask(counter, {pid, ref}, {demands, waiting, subscribed_processes}) do
104+
def ask(counter, {pid, ref}, {demands, waiting, requested, subscribed_processes}) do
106105
{current, selector, demands} = pop_demand(ref, demands)
107106
demands = add_demand(current + counter, pid, ref, selector, demands)
108-
new_min = get_min(demands)
109-
demands = adjust_demand(new_min, demands)
110-
{:ok, new_min, {demands, waiting + new_min, subscribed_processes}}
107+
{demands, upstream_demand, waiting, requested} = sync_demands(demands, waiting, requested)
108+
{:ok, upstream_demand, {demands, waiting, requested, subscribed_processes}}
111109
end
112110

113111
@doc false
114-
def dispatch(events, _length, {demands, 0, subscribed_processes}) do
115-
{:ok, events, {demands, 0, subscribed_processes}}
112+
def dispatch(events, _length, {demands, 0, requested, subscribed_processes}) do
113+
{:ok, events, {demands, 0, requested, subscribed_processes}}
116114
end
117115

118-
def dispatch(events, length, {demands, waiting, subscribed_processes}) do
119-
{deliver_now, deliver_later, waiting} = split_events(events, length, waiting)
116+
def dispatch(events, length, {demands, waiting, requested, subscribed_processes}) do
117+
{deliver_now, deliver_later, waiting, deliver_now_count} =
118+
split_events(events, length, waiting)
120119

121120
for {_, pid, ref, selector} <- demands do
122121
selected =
@@ -133,7 +132,7 @@ defmodule GenStage.BroadcastDispatcher do
133132
:ok
134133
end
135134

136-
{:ok, deliver_later, {demands, waiting, subscribed_processes}}
135+
{:ok, deliver_later, {demands, waiting, requested - deliver_now_count, subscribed_processes}}
137136
end
138137

139138
defp filter_and_count(messages, nil) do
@@ -176,12 +175,21 @@ defmodule GenStage.BroadcastDispatcher do
176175
do: demands |> Enum.reduce(acc, fn {val, _, _, _}, acc -> min(val, acc) end) |> max(0)
177176

178177
defp split_events(events, length, counter) when length <= counter do
179-
{events, [], counter - length}
178+
{events, [], counter - length, length}
180179
end
181180

182181
defp split_events(events, _length, counter) do
183182
{now, later} = Enum.split(events, counter)
184-
{now, later, 0}
183+
{now, later, 0, counter}
184+
end
185+
186+
defp sync_demands(demands, waiting, requested) do
187+
new_min = get_min(demands)
188+
demands = adjust_demand(new_min, demands)
189+
waiting = waiting + new_min
190+
request = max(0, waiting - requested)
191+
requested = requested + request
192+
{demands, request, waiting, requested}
185193
end
186194

187195
defp adjust_demand(0, demands) do

test/gen_stage/broadcast_dispatcher_test.exs

Lines changed: 117 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ defmodule GenStage.BroadcastDispatcherTest do
44
alias GenStage.BroadcastDispatcher, as: D
55

66
defp dispatcher(opts) do
7-
{:ok, {[], 0, _subscribers} = state} = D.init(opts)
7+
{:ok, {[], 0, 0, _subscribers} = state} = D.init(opts)
88
state
99
end
1010

@@ -15,10 +15,10 @@ defmodule GenStage.BroadcastDispatcherTest do
1515
expected_subscribers = MapSet.new([pid])
1616

1717
{:ok, 0, disp} = D.subscribe([], {pid, ref}, disp)
18-
assert disp == {[{0, pid, ref, nil}], 0, expected_subscribers}
18+
assert disp == {[{0, pid, ref, nil}], 0, 0, expected_subscribers}
1919

2020
{:ok, 0, disp} = D.cancel({pid, ref}, disp)
21-
assert disp == {[], 0, MapSet.new()}
21+
assert disp == {[], 0, 0, MapSet.new()}
2222
end
2323

2424
test "subscribes, asks, and cancels" do
@@ -28,17 +28,17 @@ defmodule GenStage.BroadcastDispatcherTest do
2828
expected_subscribers = MapSet.new([pid])
2929

3030
{:ok, 0, disp} = D.subscribe([], {pid, ref}, disp)
31-
assert disp == {[{0, pid, ref, nil}], 0, expected_subscribers}
31+
assert disp == {[{0, pid, ref, nil}], 0, 0, expected_subscribers}
3232

3333
{:ok, 10, disp} = D.ask(10, {pid, ref}, disp)
34-
assert disp == {[{0, pid, ref, nil}], 10, expected_subscribers}
34+
assert disp == {[{0, pid, ref, nil}], 10, 10, expected_subscribers}
3535

3636
{:ok, 0, disp} = D.cancel({pid, ref}, disp)
37-
assert disp == {[], 0, MapSet.new()}
37+
assert disp == {[], 0, 10, MapSet.new()}
3838

3939
# Now attempt to dispatch with no consumers
4040
{:ok, [1, 2, 3], disp} = D.dispatch([1, 2, 3], 3, disp)
41-
assert disp == {[], 0, MapSet.new()}
41+
assert disp == {[], 0, 10, MapSet.new()}
4242
end
4343

4444
test "multiple subscriptions with early demand" do
@@ -51,23 +51,23 @@ defmodule GenStage.BroadcastDispatcherTest do
5151
expected_subscribers = MapSet.new([pid1])
5252

5353
{:ok, 0, disp} = D.subscribe([], {pid1, ref1}, disp)
54-
assert disp == {[{0, pid1, ref1, nil}], 0, expected_subscribers}
54+
assert disp == {[{0, pid1, ref1, nil}], 0, 0, expected_subscribers}
5555

5656
{:ok, 10, disp} = D.ask(10, {pid1, ref1}, disp)
57-
assert disp == {[{0, pid1, ref1, nil}], 10, expected_subscribers}
57+
assert disp == {[{0, pid1, ref1, nil}], 10, 10, expected_subscribers}
5858

5959
expected_subscribers = MapSet.put(expected_subscribers, pid2)
6060

6161
{:ok, 0, disp} = D.subscribe([], {pid2, ref2}, disp)
62-
assert disp == {[{0, pid2, ref2, nil}, {10, pid1, ref1, nil}], 0, expected_subscribers}
62+
assert disp == {[{0, pid2, ref2, nil}, {10, pid1, ref1, nil}], 0, 10, expected_subscribers}
6363

6464
expected_subscribers = MapSet.delete(expected_subscribers, pid1)
6565

6666
{:ok, 0, disp} = D.cancel({pid1, ref1}, disp)
67-
assert disp == {[{0, pid2, ref2, nil}], 0, expected_subscribers}
67+
assert disp == {[{0, pid2, ref2, nil}], 0, 10, expected_subscribers}
6868

69-
{:ok, 10, disp} = D.ask(10, {pid2, ref2}, disp)
70-
assert disp == {[{0, pid2, ref2, nil}], 10, expected_subscribers}
69+
{:ok, 0, disp} = D.ask(10, {pid2, ref2}, disp)
70+
assert disp == {[{0, pid2, ref2, nil}], 10, 10, expected_subscribers}
7171
end
7272

7373
test "multiple subscriptions with late demand" do
@@ -80,23 +80,23 @@ defmodule GenStage.BroadcastDispatcherTest do
8080
expected_subscribers = MapSet.new([pid1])
8181

8282
{:ok, 0, disp} = D.subscribe([], {pid1, ref1}, disp)
83-
assert disp == {[{0, pid1, ref1, nil}], 0, expected_subscribers}
83+
assert disp == {[{0, pid1, ref1, nil}], 0, 0, expected_subscribers}
8484

8585
expected_subscribers = MapSet.put(expected_subscribers, pid2)
8686

8787
{:ok, 0, disp} = D.subscribe([], {pid2, ref2}, disp)
88-
assert disp == {[{0, pid2, ref2, nil}, {0, pid1, ref1, nil}], 0, expected_subscribers}
88+
assert disp == {[{0, pid2, ref2, nil}, {0, pid1, ref1, nil}], 0, 0, expected_subscribers}
8989

9090
{:ok, 0, disp} = D.ask(10, {pid1, ref1}, disp)
91-
assert disp == {[{10, pid1, ref1, nil}, {0, pid2, ref2, nil}], 0, expected_subscribers}
91+
assert disp == {[{10, pid1, ref1, nil}, {0, pid2, ref2, nil}], 0, 0, expected_subscribers}
9292

9393
expected_subscribers = MapSet.delete(expected_subscribers, pid2)
9494

9595
{:ok, 10, disp} = D.cancel({pid2, ref2}, disp)
96-
assert disp == {[{0, pid1, ref1, nil}], 10, expected_subscribers}
96+
assert disp == {[{0, pid1, ref1, nil}], 10, 10, expected_subscribers}
9797

9898
{:ok, 10, disp} = D.ask(10, {pid1, ref1}, disp)
99-
assert disp == {[{0, pid1, ref1, nil}], 20, expected_subscribers}
99+
assert disp == {[{0, pid1, ref1, nil}], 20, 20, expected_subscribers}
100100
end
101101

102102
test "subscribes, asks and dispatches to multiple consumers" do
@@ -116,11 +116,11 @@ defmodule GenStage.BroadcastDispatcherTest do
116116

117117
expected_subscribers = MapSet.new([pid1, pid2])
118118

119-
assert disp == {[{0, pid2, ref2, nil}, {1, pid1, ref1, nil}], 2, expected_subscribers}
119+
assert disp == {[{0, pid2, ref2, nil}, {1, pid1, ref1, nil}], 2, 2, expected_subscribers}
120120

121121
# One batch fits all
122122
{:ok, [], disp} = D.dispatch([:a, :b], 2, disp)
123-
assert disp == {[{0, pid2, ref2, nil}, {1, pid1, ref1, nil}], 0, expected_subscribers}
123+
assert disp == {[{0, pid2, ref2, nil}, {1, pid1, ref1, nil}], 0, 0, expected_subscribers}
124124

125125
assert_receive {:"$gen_consumer", {_, ^ref1}, [:a, :b]}
126126
assert_receive {:"$gen_consumer", {_, ^ref2}, [:a, :b]}
@@ -129,13 +129,13 @@ defmodule GenStage.BroadcastDispatcherTest do
129129
{:ok, 1, disp} = D.ask(2, {pid2, ref2}, disp)
130130

131131
{:ok, [:d], disp} = D.dispatch([:c, :d], 2, disp)
132-
assert disp == {[{1, pid2, ref2, nil}, {0, pid1, ref1, nil}], 0, expected_subscribers}
132+
assert disp == {[{1, pid2, ref2, nil}, {0, pid1, ref1, nil}], 0, 0, expected_subscribers}
133133
assert_receive {:"$gen_consumer", {_, ^ref1}, [:c]}
134134
assert_receive {:"$gen_consumer", {_, ^ref2}, [:c]}
135135

136136
# A batch with no demand
137137
{:ok, [:d], disp} = D.dispatch([:d], 1, disp)
138-
assert disp == {[{1, pid2, ref2, nil}, {0, pid1, ref1, nil}], 0, expected_subscribers}
138+
assert disp == {[{1, pid2, ref2, nil}, {0, pid1, ref1, nil}], 0, 0, expected_subscribers}
139139
refute_received {:"$gen_consumer", {_, _}, _}
140140

141141
# Add a late subscriber
@@ -146,24 +146,114 @@ defmodule GenStage.BroadcastDispatcherTest do
146146
expected_subscribers = MapSet.put(expected_subscribers, pid3)
147147

148148
assert disp ==
149-
{[{0, pid3, ref3, nil}, {1, pid1, ref1, nil}, {1, pid2, ref2, nil}], 0,
149+
{[{0, pid3, ref3, nil}, {1, pid1, ref1, nil}, {1, pid2, ref2, nil}], 0, 1,
150150
expected_subscribers}
151151

152152
# Even out
153153
{:ok, 0, disp} = D.ask(2, {pid1, ref1}, disp)
154154
{:ok, 0, disp} = D.ask(2, {pid2, ref2}, disp)
155-
{:ok, 3, disp} = D.ask(3, {pid3, ref3}, disp)
155+
{:ok, 2, disp} = D.ask(3, {pid3, ref3}, disp)
156156
{:ok, [], disp} = D.dispatch([:d, :e, :f], 3, disp)
157157

158158
assert disp ==
159-
{[{0, pid3, ref3, nil}, {0, pid2, ref2, nil}, {0, pid1, ref1, nil}], 0,
159+
{[{0, pid3, ref3, nil}, {0, pid2, ref2, nil}, {0, pid1, ref1, nil}], 0, 0,
160160
expected_subscribers}
161161

162162
assert_receive {:"$gen_consumer", {_, ^ref1}, [:d, :e, :f]}
163163
assert_receive {:"$gen_consumer", {_, ^ref2}, [:d, :e, :f]}
164164
assert_receive {:"$gen_consumer", {_, ^ref3}, [:d, :e, :f]}
165165
end
166166

167+
test "subscribes, asks, dispatches, and repeats" do
168+
pid1 = spawn_forwarder()
169+
pid2 = spawn_forwarder()
170+
ref1 = make_ref()
171+
ref2 = make_ref()
172+
disp = dispatcher([])
173+
174+
{:ok, 0, disp} = D.subscribe([], {pid1, ref1}, disp)
175+
expected_subscribers = MapSet.new([pid1])
176+
assert disp == {[{0, pid1, ref1, nil}], 0, 0, expected_subscribers}
177+
178+
{:ok, 10, disp} = D.ask(10, {pid1, ref1}, disp)
179+
assert disp == {[{0, pid1, ref1, nil}], 10, 10, expected_subscribers}
180+
181+
{:ok, [], disp} = D.dispatch([:a, :b, :c], 3, disp)
182+
assert disp == {[{0, pid1, ref1, nil}], 7, 7, expected_subscribers}
183+
184+
assert_receive {:"$gen_consumer", {_, ^ref1}, [:a, :b, :c]}
185+
186+
{:ok, 0, disp} = D.subscribe([], {pid2, ref2}, disp)
187+
expected_subscribers = MapSet.put(expected_subscribers, pid2)
188+
assert disp == {[{0, pid2, ref2, nil}, {7, pid1, ref1, nil}], 0, 7, expected_subscribers}
189+
190+
{:ok, 0, disp} = D.ask(20, {pid2, ref2}, disp)
191+
assert disp == {[{13, pid2, ref2, nil}, {0, pid1, ref1, nil}], 7, 7, expected_subscribers}
192+
193+
{:ok, [], disp} = D.dispatch([:d, :e, :f, :g, :h, :i, :j], 7, disp)
194+
assert disp == {[{13, pid2, ref2, nil}, {0, pid1, ref1, nil}], 0, 0, expected_subscribers}
195+
196+
assert_receive {:"$gen_consumer", {_, ^ref1}, [:d, :e, :f, :g, :h, :i, :j]}
197+
assert_receive {:"$gen_consumer", {_, ^ref2}, [:d, :e, :f, :g, :h, :i, :j]}
198+
end
199+
200+
test "subscribes, asks, cancels, and dispatcher reuses events for another subscriber" do
201+
pid1 = spawn_forwarder()
202+
pid2 = spawn_forwarder()
203+
ref1 = make_ref()
204+
ref2 = make_ref()
205+
disp = dispatcher([])
206+
207+
{:ok, 0, disp} = D.subscribe([], {pid1, ref1}, disp)
208+
expected_subscribers = MapSet.new([pid1])
209+
assert disp == {[{0, pid1, ref1, nil}], 0, 0, expected_subscribers}
210+
211+
{:ok, 10, disp} = D.ask(10, {pid1, ref1}, disp)
212+
assert disp == {[{0, pid1, ref1, nil}], 10, 10, expected_subscribers}
213+
214+
{:ok, 0, disp} = D.cancel({pid1, ref1}, disp)
215+
expected_subscribers = MapSet.delete(expected_subscribers, pid1)
216+
assert disp == {[], 0, 10, expected_subscribers}
217+
218+
{:ok, 0, disp} = D.subscribe([], {pid2, ref2}, disp)
219+
expected_subscribers = MapSet.put(expected_subscribers, pid2)
220+
assert disp == {[{0, pid2, ref2, nil}], 0, 10, expected_subscribers}
221+
222+
{:ok, 0, disp} = D.ask(5, {pid2, ref2}, disp)
223+
assert disp == {[{0, pid2, ref2, nil}], 5, 10, expected_subscribers}
224+
225+
{:ok, [], disp} = D.dispatch([:a, :b, :c], 3, disp)
226+
assert disp == {[{0, pid2, ref2, nil}], 2, 7, expected_subscribers}
227+
228+
assert_receive {:"$gen_consumer", {_, ^ref2}, [:a, :b, :c]}
229+
end
230+
231+
test "cancels blocking subscriber while there is already a requested demand" do
232+
pid1 = spawn_forwarder()
233+
pid2 = spawn_forwarder()
234+
ref1 = make_ref()
235+
ref2 = make_ref()
236+
disp = dispatcher([])
237+
238+
{:ok, 0, disp} = D.subscribe([], {pid1, ref1}, disp)
239+
expected_subscribers = MapSet.new([pid1])
240+
assert disp == {[{0, pid1, ref1, nil}], 0, 0, expected_subscribers}
241+
242+
{:ok, 10, disp} = D.ask(10, {pid1, ref1}, disp)
243+
assert disp == {[{0, pid1, ref1, nil}], 10, 10, expected_subscribers}
244+
245+
{:ok, 0, disp} = D.subscribe([], {pid2, ref2}, disp)
246+
expected_subscribers = MapSet.put(expected_subscribers, pid2)
247+
assert disp == {[{0, pid2, ref2, nil}, {10, pid1, ref1, nil}], 0, 10, expected_subscribers}
248+
249+
{:ok, 0, disp} = D.ask(5, {pid1, ref1}, disp)
250+
assert disp == {[{15, pid1, ref1, nil}, {0, pid2, ref2, nil}], 0, 10, expected_subscribers}
251+
252+
{:ok, 5, disp} = D.cancel({pid2, ref2}, disp)
253+
expected_subscribers = MapSet.delete(expected_subscribers, pid2)
254+
assert disp == {[{0, pid1, ref1, nil}], 15, 15, expected_subscribers}
255+
end
256+
167257
test "subscribing with a selector function" do
168258
pid1 = spawn_forwarder()
169259
pid2 = spawn_forwarder()
@@ -175,7 +265,7 @@ defmodule GenStage.BroadcastDispatcherTest do
175265

176266
{:ok, 0, disp} = D.subscribe([selector: selector1], {pid1, ref1}, disp)
177267
{:ok, 0, disp} = D.subscribe([selector: selector2], {pid2, ref2}, disp)
178-
assert {[{0, ^pid2, ^ref2, _selector2}, {0, ^pid1, ^ref1, _selector1}], 0, _} = disp
268+
assert {[{0, ^pid2, ^ref2, _selector2}, {0, ^pid1, ^ref1, _selector1}], 0, 0, _} = disp
179269

180270
{:ok, 0, disp} = D.ask(4, {pid2, ref2}, disp)
181271
{:ok, 4, disp} = D.ask(4, {pid1, ref1}, disp)
@@ -219,7 +309,7 @@ defmodule GenStage.BroadcastDispatcherTest do
219309

220310
assert ExUnit.CaptureLog.capture_log(fn ->
221311
assert {:error, _} = D.subscribe([], {pid, ref2}, disp)
222-
assert disp == {[{0, pid, ref1, nil}], 0, expected_subscribers}
312+
assert disp == {[{0, pid, ref1, nil}], 0, 0, expected_subscribers}
223313
end) =~ "already registered"
224314
end
225315

0 commit comments

Comments
 (0)