Skip to content

Commit 1cadeff

Browse files
authored
Revert Stream transform simplification causing regression (#14993)
* Revert "Simplify transform as suspend is guaranteed to return one entry" This reverts commit a7c008f. * Add regression test case
1 parent ac44e72 commit 1cadeff

File tree

2 files changed

+40
-22
lines changed

2 files changed

+40
-22
lines changed

lib/elixir/lib/stream.ex

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -964,17 +964,13 @@ defmodule Stream do
964964
after_fun.(user_acc)
965965
:erlang.raise(kind, reason, __STACKTRACE__)
966966
else
967-
{:suspended, [val], next} ->
968-
do_transform_user(val, user_acc, :cont, next, inner_acc, funs)
967+
{:suspended, vals, next} ->
968+
do_transform_user(:lists.reverse(vals), user_acc, :cont, next, inner_acc, funs)
969969

970-
{_, result} ->
970+
{_, vals} ->
971971
# Do not attempt to call the resource again, it has either done or halted
972972
next = fn _ -> {:done, []} end
973-
974-
case result do
975-
[val] -> do_transform_user(val, user_acc, :last, next, inner_acc, funs)
976-
[] -> do_transform(user_acc, :last, next, inner_acc, funs)
977-
end
973+
do_transform_user(:lists.reverse(vals), user_acc, :last, next, inner_acc, funs)
978974
end
979975
end
980976

@@ -989,7 +985,7 @@ defmodule Stream do
989985
after_fun.(user_acc)
990986
:erlang.raise(kind, reason, __STACKTRACE__)
991987
else
992-
result -> do_transform_result(result, :halt, next, inner_acc, funs)
988+
result -> do_transform_result(result, [], :halt, next, inner_acc, funs)
993989
end
994990
else
995991
do_transform(user_acc, :halt, next, inner_acc, funs)
@@ -1002,7 +998,11 @@ defmodule Stream do
1002998
{:halted, elem(inner_acc, 1)}
1003999
end
10041000

1005-
defp do_transform_user(val, user_acc, next_op, next, inner_acc, funs) do
1001+
defp do_transform_user([], user_acc, next_op, next, inner_acc, funs) do
1002+
do_transform(user_acc, next_op, next, inner_acc, funs)
1003+
end
1004+
1005+
defp do_transform_user([val | vals], user_acc, next_op, next, inner_acc, funs) do
10061006
{user, _, _, _, after_fun} = funs
10071007

10081008
try do
@@ -1013,20 +1013,20 @@ defmodule Stream do
10131013
after_fun.(user_acc)
10141014
:erlang.raise(kind, reason, __STACKTRACE__)
10151015
else
1016-
result -> do_transform_result(result, next_op, next, inner_acc, funs)
1016+
result -> do_transform_result(result, vals, next_op, next, inner_acc, funs)
10171017
end
10181018
end
10191019

1020-
defp do_transform_result(result, next_op, next, inner_acc, funs) do
1020+
defp do_transform_result(result, vals, next_op, next, inner_acc, funs) do
10211021
{_, fun, inner, _, after_fun} = funs
10221022

10231023
case result do
10241024
{[], user_acc} ->
1025-
do_transform(user_acc, next_op, next, inner_acc, funs)
1025+
do_transform_user(vals, user_acc, next_op, next, inner_acc, funs)
10261026

10271027
{list, user_acc} when is_list(list) ->
10281028
reduce = &Enumerable.List.reduce(list, &1, fun)
1029-
do_transform_inner_list(user_acc, next_op, next, inner_acc, reduce, funs)
1029+
do_transform_inner_list(vals, user_acc, next_op, next, inner_acc, reduce, funs)
10301030

10311031
{:halt, user_acc} ->
10321032
next.({:halt, []})
@@ -1035,11 +1035,11 @@ defmodule Stream do
10351035

10361036
{other, user_acc} ->
10371037
reduce = &Enumerable.reduce(other, &1, inner)
1038-
do_transform_inner_enum(user_acc, next_op, next, inner_acc, reduce, funs)
1038+
do_transform_inner_enum(vals, user_acc, next_op, next, inner_acc, reduce, funs)
10391039
end
10401040
end
10411041

1042-
defp do_transform_inner_list(user_acc, next_op, next, inner_acc, reduce, funs) do
1042+
defp do_transform_inner_list(vals, user_acc, next_op, next, inner_acc, reduce, funs) do
10431043
{_, _, _, _, after_fun} = funs
10441044

10451045
try do
@@ -1051,20 +1051,20 @@ defmodule Stream do
10511051
:erlang.raise(kind, reason, __STACKTRACE__)
10521052
else
10531053
{:done, acc} ->
1054-
do_transform(user_acc, next_op, next, {:cont, acc}, funs)
1054+
do_transform_user(vals, user_acc, next_op, next, {:cont, acc}, funs)
10551055

10561056
{:halted, acc} ->
10571057
next.({:halt, []})
10581058
after_fun.(user_acc)
10591059
{:halted, acc}
10601060

10611061
{:suspended, acc, continuation} ->
1062-
resume = &do_transform_inner_list(user_acc, next_op, next, &1, continuation, funs)
1062+
resume = &do_transform_inner_list(vals, user_acc, next_op, next, &1, continuation, funs)
10631063
{:suspended, acc, resume}
10641064
end
10651065
end
10661066

1067-
defp do_transform_inner_enum(user_acc, next_op, next, {op, inner_acc}, reduce, funs) do
1067+
defp do_transform_inner_enum(vals, user_acc, next_op, next, {op, inner_acc}, reduce, funs) do
10681068
{_, _, _, _, after_fun} = funs
10691069

10701070
try do
@@ -1078,18 +1078,18 @@ defmodule Stream do
10781078
# The user wanted to cont/suspend but the stream halted,
10791079
# so we continue with the user intention.
10801080
{:halted, [inner_op | acc]} when op != :halt and inner_op != :halt ->
1081-
do_transform(user_acc, next_op, next, {inner_op, acc}, funs)
1081+
do_transform_user(vals, user_acc, next_op, next, {inner_op, acc}, funs)
10821082

10831083
{:halted, [_ | acc]} ->
10841084
next.({:halt, []})
10851085
after_fun.(user_acc)
10861086
{:halted, acc}
10871087

10881088
{:done, [_ | acc]} ->
1089-
do_transform(user_acc, next_op, next, {:cont, acc}, funs)
1089+
do_transform_user(vals, user_acc, next_op, next, {:cont, acc}, funs)
10901090

10911091
{:suspended, [_ | acc], continuation} ->
1092-
resume = &do_transform_inner_enum(user_acc, next_op, next, &1, continuation, funs)
1092+
resume = &do_transform_inner_enum(vals, user_acc, next_op, next, &1, continuation, funs)
10931093
{:suspended, acc, resume}
10941094
end
10951095
end

lib/elixir/test/elixir/stream_test.exs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,24 @@ defmodule StreamTest do
183183
assert Stream.chunk_while([1, 2, 3, 4, 5], [], chunk_fun, after_fun) |> Enum.at(0) == [1]
184184
end
185185

186+
test "chunk_while/4 regression case with concat" do
187+
result =
188+
["WrongHeader\nJohn Doe", "skipped"]
189+
|> Stream.take(1)
190+
|> Stream.chunk_while(
191+
"",
192+
fn element, acc ->
193+
{acc, elements} = String.split(acc <> element, "\n") |> List.pop_at(-1)
194+
{:cont, elements, acc}
195+
end,
196+
&{:cont, [&1], []}
197+
)
198+
|> Stream.concat()
199+
|> Enum.to_list()
200+
201+
assert result == ["WrongHeader", "John Doe"]
202+
end
203+
186204
test "concat/1" do
187205
stream = Stream.concat([1..3, [], [4, 5, 6], [], 7..9])
188206
assert is_function(stream)

0 commit comments

Comments
 (0)