diff --git a/lib/elixir/lib/stream.ex b/lib/elixir/lib/stream.ex index 81704f1e31..1780738efd 100644 --- a/lib/elixir/lib/stream.ex +++ b/lib/elixir/lib/stream.ex @@ -964,17 +964,13 @@ defmodule Stream do after_fun.(user_acc) :erlang.raise(kind, reason, __STACKTRACE__) else - {:suspended, [val], next} -> - do_transform_user(val, user_acc, :cont, next, inner_acc, funs) + {:suspended, vals, next} -> + do_transform_user(:lists.reverse(vals), user_acc, :cont, next, inner_acc, funs) - {_, result} -> + {_, vals} -> # Do not attempt to call the resource again, it has either done or halted next = fn _ -> {:done, []} end - - case result do - [val] -> do_transform_user(val, user_acc, :last, next, inner_acc, funs) - [] -> do_transform(user_acc, :last, next, inner_acc, funs) - end + do_transform_user(:lists.reverse(vals), user_acc, :last, next, inner_acc, funs) end end @@ -989,7 +985,7 @@ defmodule Stream do after_fun.(user_acc) :erlang.raise(kind, reason, __STACKTRACE__) else - result -> do_transform_result(result, :halt, next, inner_acc, funs) + result -> do_transform_result(result, [], :halt, next, inner_acc, funs) end else do_transform(user_acc, :halt, next, inner_acc, funs) @@ -1002,7 +998,11 @@ defmodule Stream do {:halted, elem(inner_acc, 1)} end - defp do_transform_user(val, user_acc, next_op, next, inner_acc, funs) do + defp do_transform_user([], user_acc, next_op, next, inner_acc, funs) do + do_transform(user_acc, next_op, next, inner_acc, funs) + end + + defp do_transform_user([val | vals], user_acc, next_op, next, inner_acc, funs) do {user, _, _, _, after_fun} = funs try do @@ -1013,20 +1013,20 @@ defmodule Stream do after_fun.(user_acc) :erlang.raise(kind, reason, __STACKTRACE__) else - result -> do_transform_result(result, next_op, next, inner_acc, funs) + result -> do_transform_result(result, vals, next_op, next, inner_acc, funs) end end - defp do_transform_result(result, next_op, next, inner_acc, funs) do + defp do_transform_result(result, vals, next_op, next, inner_acc, funs) do {_, fun, inner, _, after_fun} = funs case result do {[], user_acc} -> - do_transform(user_acc, next_op, next, inner_acc, funs) + do_transform_user(vals, user_acc, next_op, next, inner_acc, funs) {list, user_acc} when is_list(list) -> reduce = &Enumerable.List.reduce(list, &1, fun) - do_transform_inner_list(user_acc, next_op, next, inner_acc, reduce, funs) + do_transform_inner_list(vals, user_acc, next_op, next, inner_acc, reduce, funs) {:halt, user_acc} -> next.({:halt, []}) @@ -1035,11 +1035,11 @@ defmodule Stream do {other, user_acc} -> reduce = &Enumerable.reduce(other, &1, inner) - do_transform_inner_enum(user_acc, next_op, next, inner_acc, reduce, funs) + do_transform_inner_enum(vals, user_acc, next_op, next, inner_acc, reduce, funs) end end - defp do_transform_inner_list(user_acc, next_op, next, inner_acc, reduce, funs) do + defp do_transform_inner_list(vals, user_acc, next_op, next, inner_acc, reduce, funs) do {_, _, _, _, after_fun} = funs try do @@ -1051,7 +1051,7 @@ defmodule Stream do :erlang.raise(kind, reason, __STACKTRACE__) else {:done, acc} -> - do_transform(user_acc, next_op, next, {:cont, acc}, funs) + do_transform_user(vals, user_acc, next_op, next, {:cont, acc}, funs) {:halted, acc} -> next.({:halt, []}) @@ -1059,12 +1059,12 @@ defmodule Stream do {:halted, acc} {:suspended, acc, continuation} -> - resume = &do_transform_inner_list(user_acc, next_op, next, &1, continuation, funs) + resume = &do_transform_inner_list(vals, user_acc, next_op, next, &1, continuation, funs) {:suspended, acc, resume} end end - defp do_transform_inner_enum(user_acc, next_op, next, {op, inner_acc}, reduce, funs) do + defp do_transform_inner_enum(vals, user_acc, next_op, next, {op, inner_acc}, reduce, funs) do {_, _, _, _, after_fun} = funs try do @@ -1078,7 +1078,7 @@ defmodule Stream do # The user wanted to cont/suspend but the stream halted, # so we continue with the user intention. {:halted, [inner_op | acc]} when op != :halt and inner_op != :halt -> - do_transform(user_acc, next_op, next, {inner_op, acc}, funs) + do_transform_user(vals, user_acc, next_op, next, {inner_op, acc}, funs) {:halted, [_ | acc]} -> next.({:halt, []}) @@ -1086,10 +1086,10 @@ defmodule Stream do {:halted, acc} {:done, [_ | acc]} -> - do_transform(user_acc, next_op, next, {:cont, acc}, funs) + do_transform_user(vals, user_acc, next_op, next, {:cont, acc}, funs) {:suspended, [_ | acc], continuation} -> - resume = &do_transform_inner_enum(user_acc, next_op, next, &1, continuation, funs) + resume = &do_transform_inner_enum(vals, user_acc, next_op, next, &1, continuation, funs) {:suspended, acc, resume} end end diff --git a/lib/elixir/test/elixir/stream_test.exs b/lib/elixir/test/elixir/stream_test.exs index ed62a1cfa4..a7444f7f8a 100644 --- a/lib/elixir/test/elixir/stream_test.exs +++ b/lib/elixir/test/elixir/stream_test.exs @@ -183,6 +183,24 @@ defmodule StreamTest do assert Stream.chunk_while([1, 2, 3, 4, 5], [], chunk_fun, after_fun) |> Enum.at(0) == [1] end + test "chunk_while/4 regression case with concat" do + result = + ["WrongHeader\nJohn Doe", "skipped"] + |> Stream.take(1) + |> Stream.chunk_while( + "", + fn element, acc -> + {acc, elements} = String.split(acc <> element, "\n") |> List.pop_at(-1) + {:cont, elements, acc} + end, + &{:cont, [&1], []} + ) + |> Stream.concat() + |> Enum.to_list() + + assert result == ["WrongHeader", "John Doe"] + end + test "concat/1" do stream = Stream.concat([1..3, [], [4, 5, 6], [], 7..9]) assert is_function(stream)