Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 22 additions & 22 deletions lib/elixir/lib/stream.ex
Original file line number Diff line number Diff line change
Expand Up @@ -964,17 +964,13 @@ defmodule Stream do
after_fun.(user_acc)
:erlang.raise(kind, reason, __STACKTRACE__)
else
{:suspended, [val], next} ->
do_transform_user(val, user_acc, :cont, next, inner_acc, funs)
{:suspended, vals, next} ->
do_transform_user(:lists.reverse(vals), user_acc, :cont, next, inner_acc, funs)

{_, result} ->
{_, vals} ->
# Do not attempt to call the resource again, it has either done or halted
next = fn _ -> {:done, []} end

case result do
[val] -> do_transform_user(val, user_acc, :last, next, inner_acc, funs)
[] -> do_transform(user_acc, :last, next, inner_acc, funs)
end
do_transform_user(:lists.reverse(vals), user_acc, :last, next, inner_acc, funs)
end
end

Expand All @@ -989,7 +985,7 @@ defmodule Stream do
after_fun.(user_acc)
:erlang.raise(kind, reason, __STACKTRACE__)
else
result -> do_transform_result(result, :halt, next, inner_acc, funs)
result -> do_transform_result(result, [], :halt, next, inner_acc, funs)
end
else
do_transform(user_acc, :halt, next, inner_acc, funs)
Expand All @@ -1002,7 +998,11 @@ defmodule Stream do
{:halted, elem(inner_acc, 1)}
end

defp do_transform_user(val, user_acc, next_op, next, inner_acc, funs) do
defp do_transform_user([], user_acc, next_op, next, inner_acc, funs) do
do_transform(user_acc, next_op, next, inner_acc, funs)
end

defp do_transform_user([val | vals], user_acc, next_op, next, inner_acc, funs) do
{user, _, _, _, after_fun} = funs

try do
Expand All @@ -1013,20 +1013,20 @@ defmodule Stream do
after_fun.(user_acc)
:erlang.raise(kind, reason, __STACKTRACE__)
else
result -> do_transform_result(result, next_op, next, inner_acc, funs)
result -> do_transform_result(result, vals, next_op, next, inner_acc, funs)
end
end

defp do_transform_result(result, next_op, next, inner_acc, funs) do
defp do_transform_result(result, vals, next_op, next, inner_acc, funs) do
{_, fun, inner, _, after_fun} = funs

case result do
{[], user_acc} ->
do_transform(user_acc, next_op, next, inner_acc, funs)
do_transform_user(vals, user_acc, next_op, next, inner_acc, funs)

{list, user_acc} when is_list(list) ->
reduce = &Enumerable.List.reduce(list, &1, fun)
do_transform_inner_list(user_acc, next_op, next, inner_acc, reduce, funs)
do_transform_inner_list(vals, user_acc, next_op, next, inner_acc, reduce, funs)

{:halt, user_acc} ->
next.({:halt, []})
Expand All @@ -1035,11 +1035,11 @@ defmodule Stream do

{other, user_acc} ->
reduce = &Enumerable.reduce(other, &1, inner)
do_transform_inner_enum(user_acc, next_op, next, inner_acc, reduce, funs)
do_transform_inner_enum(vals, user_acc, next_op, next, inner_acc, reduce, funs)
end
end

defp do_transform_inner_list(user_acc, next_op, next, inner_acc, reduce, funs) do
defp do_transform_inner_list(vals, user_acc, next_op, next, inner_acc, reduce, funs) do
{_, _, _, _, after_fun} = funs

try do
Expand All @@ -1051,20 +1051,20 @@ defmodule Stream do
:erlang.raise(kind, reason, __STACKTRACE__)
else
{:done, acc} ->
do_transform(user_acc, next_op, next, {:cont, acc}, funs)
do_transform_user(vals, user_acc, next_op, next, {:cont, acc}, funs)

{:halted, acc} ->
next.({:halt, []})
after_fun.(user_acc)
{:halted, acc}

{:suspended, acc, continuation} ->
resume = &do_transform_inner_list(user_acc, next_op, next, &1, continuation, funs)
resume = &do_transform_inner_list(vals, user_acc, next_op, next, &1, continuation, funs)
{:suspended, acc, resume}
end
end

defp do_transform_inner_enum(user_acc, next_op, next, {op, inner_acc}, reduce, funs) do
defp do_transform_inner_enum(vals, user_acc, next_op, next, {op, inner_acc}, reduce, funs) do
{_, _, _, _, after_fun} = funs

try do
Expand All @@ -1078,18 +1078,18 @@ defmodule Stream do
# The user wanted to cont/suspend but the stream halted,
# so we continue with the user intention.
{:halted, [inner_op | acc]} when op != :halt and inner_op != :halt ->
do_transform(user_acc, next_op, next, {inner_op, acc}, funs)
do_transform_user(vals, user_acc, next_op, next, {inner_op, acc}, funs)

{:halted, [_ | acc]} ->
next.({:halt, []})
after_fun.(user_acc)
{:halted, acc}

{:done, [_ | acc]} ->
do_transform(user_acc, next_op, next, {:cont, acc}, funs)
do_transform_user(vals, user_acc, next_op, next, {:cont, acc}, funs)

{:suspended, [_ | acc], continuation} ->
resume = &do_transform_inner_enum(user_acc, next_op, next, &1, continuation, funs)
resume = &do_transform_inner_enum(vals, user_acc, next_op, next, &1, continuation, funs)
{:suspended, acc, resume}
end
end
Expand Down
18 changes: 18 additions & 0 deletions lib/elixir/test/elixir/stream_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,24 @@ defmodule StreamTest do
assert Stream.chunk_while([1, 2, 3, 4, 5], [], chunk_fun, after_fun) |> Enum.at(0) == [1]
end

test "chunk_while/4 regression case with concat" do
result =
["WrongHeader\nJohn Doe", "skipped"]
|> Stream.take(1)
|> Stream.chunk_while(
"",
fn element, acc ->
{acc, elements} = String.split(acc <> element, "\n") |> List.pop_at(-1)
{:cont, elements, acc}
end,
&{:cont, [&1], []}
)
|> Stream.concat()
|> Enum.to_list()

assert result == ["WrongHeader", "John Doe"]
end

test "concat/1" do
stream = Stream.concat([1..3, [], [4, 5, 6], [], 7..9])
assert is_function(stream)
Expand Down
Loading