Skip to content

Commit 8208549

Browse files
committed
rabbit_quorum_queue: Shrink batches of QQs in parallel
Shrinking a member node off of a QQ can be parallelized. The operation involves * removing the node from the QQ's cluster membership (appending a command to the log and committing it) with `ra:remove_member/3` * updating the metadata store to remove the member from the QQ type state with `rabbit_amqqueue:update/2` * deleting the queue data from the node with `ra:force_delete_server/2` if the node can be reached All of these operations are I/O bound. Updating the cluster membership and metadata store involves appending commands to those logs and replicating them. Writing commands to Ra synchronously in serial is fairly slow - sending many commands in parallel is much more efficient. By parallelizing these steps we can write larger chunks of commands to WAL(s). `ra:force_delete_server/2` benefits from parallelizing if the node being shrunk off is no longer reachable, for example in some hardware failures. The underlying `rpc:call/4` will attempt to auto-connect to the node and this can take some time to time out. By parallelizing this, each `rpc:call/4` reuses the same underlying distribution entry and all calls fail together once the connection fails to establish.
1 parent 6abeaf4 commit 8208549

File tree

1 file changed

+47
-34
lines changed

1 file changed

+47
-34
lines changed

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 47 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1513,40 +1513,53 @@ delete_member(Q, Node) when ?amqqueue_is_quorum(Q) ->
15131513
{ok, pos_integer()} | {error, pos_integer(), term()}}].
15141514
shrink_all(Node) ->
15151515
?LOG_INFO("Asked to remove all quorum queue replicas from node ~ts", [Node]),
1516-
[begin
1517-
QName = amqqueue:get_name(Q),
1518-
?LOG_INFO("~ts: removing member (replica) on node ~w",
1519-
[rabbit_misc:rs(QName), Node]),
1520-
Size = length(get_nodes(Q)),
1521-
case delete_member(Q, Node) of
1522-
ok ->
1523-
{QName, {ok, Size-1}};
1524-
{error, cluster_change_not_permitted} ->
1525-
%% this could be timing related and due to a new leader just being
1526-
%% elected but it's noop command not been committed yet.
1527-
%% lets sleep and retry once
1528-
?LOG_INFO("~ts: failed to remove member (replica) on node ~w "
1529-
"as cluster change is not permitted. "
1530-
"retrying once in 500ms",
1531-
[rabbit_misc:rs(QName), Node]),
1532-
timer:sleep(500),
1533-
case delete_member(Q, Node) of
1534-
ok ->
1535-
{QName, {ok, Size-1}};
1536-
{error, Err} ->
1537-
?LOG_WARNING("~ts: failed to remove member (replica) on node ~w, error: ~w",
1538-
[rabbit_misc:rs(QName), Node, Err]),
1539-
{QName, {error, Size, Err}}
1540-
end;
1541-
{error, Err} ->
1542-
?LOG_WARNING("~ts: failed to remove member (replica) on node ~w, error: ~w",
1543-
[rabbit_misc:rs(QName), Node, Err]),
1544-
{QName, {error, Size, Err}}
1545-
end
1546-
end || Q <- rabbit_amqqueue:list(),
1547-
amqqueue:get_type(Q) == ?MODULE,
1548-
lists:member(Node, get_nodes(Q))].
1549-
1516+
Parent = self(),
1517+
%% This operation is mostly bound by I/O so this default is set high:
1518+
Size = application:get_env(rabbit, quorum_queue_shrink_batch_size, 64),
1519+
Chunks = ra_lib:lists_chunk(Size, [Q || Q <- rabbit_amqqueue:list(),
1520+
amqqueue:get_type(Q) == ?MODULE,
1521+
lists:member(Node, get_nodes(Q))]),
1522+
lists:append([begin
1523+
Running = [spawn(fun() ->
1524+
Res = shrink(Node, Q),
1525+
Parent ! {self(), Res}
1526+
end) || Q <- Chunk],
1527+
[receive
1528+
{Pid, Res} ->
1529+
Res
1530+
end || Pid <- Running]
1531+
end || Chunk <- Chunks]).
1532+
1533+
shrink(Node, Q) ->
1534+
QName = amqqueue:get_name(Q),
1535+
?LOG_INFO("~ts: removing member (replica) on node ~w",
1536+
[rabbit_misc:rs(QName), Node]),
1537+
Size = length(get_nodes(Q)),
1538+
case delete_member(Q, Node) of
1539+
ok ->
1540+
{QName, {ok, Size-1}};
1541+
{error, cluster_change_not_permitted} ->
1542+
%% this could be timing related and due to a new leader just being
1543+
%% elected but it's noop command not been committed yet.
1544+
%% lets sleep and retry once
1545+
?LOG_INFO("~ts: failed to remove member (replica) on node ~w "
1546+
"as cluster change is not permitted. "
1547+
"retrying once in 500ms",
1548+
[rabbit_misc:rs(QName), Node]),
1549+
timer:sleep(500),
1550+
case delete_member(Q, Node) of
1551+
ok ->
1552+
{QName, {ok, Size-1}};
1553+
{error, Err} ->
1554+
?LOG_WARNING("~ts: failed to remove member (replica) on node ~w, error: ~w",
1555+
[rabbit_misc:rs(QName), Node, Err]),
1556+
{QName, {error, Size, Err}}
1557+
end;
1558+
{error, Err} ->
1559+
?LOG_WARNING("~ts: failed to remove member (replica) on node ~w, error: ~w",
1560+
[rabbit_misc:rs(QName), Node, Err]),
1561+
{QName, {error, Size, Err}}
1562+
end.
15501563

15511564
grow(Node, VhostSpec, QueueSpec, Strategy) ->
15521565
grow(Node, VhostSpec, QueueSpec, Strategy, promotable).

0 commit comments

Comments
 (0)