1818
1919-define(TIMEOUT, 30_000).
2020
21+ %% This is the pseudo queue that is specially interpreted by RabbitMQ.
22+ -define(REPLY_QUEUE, <<"amq.rabbitmq.reply-to">>).
23+
2124all() ->
2225 [
2326 {group, cluster_size_1},
@@ -28,7 +31,11 @@ groups() ->
2831 [
2932 {cluster_size_1, [shuffle],
3033 [
31- trace
34+ trace,
35+ failure_ack_mode,
36+ failure_multiple_consumers,
37+ failure_reuse_consumer_tag,
38+ failure_publish
3239 ]},
3340 {cluster_size_3, [shuffle],
3441 [
@@ -82,8 +89,6 @@ trace(Config) ->
8289 Node = atom_to_binary(rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename)),
8390 TraceQueue = <<"tests.amqpl_direct_reply_to.trace.tracing">>,
8491 RequestQueue = <<"tests.amqpl_direct_reply_to.trace.requests">>,
85- %% This is the pseudo queue that is specially interpreted by RabbitMQ.
86- ReplyQueue = <<"amq.rabbitmq.reply-to">>,
8792 RequestPayload = <<"my request">>,
8893 ReplyPayload = <<"my reply">>,
8994 CorrelationId = <<"my correlation ID">>,
@@ -102,7 +107,7 @@ trace(Config) ->
102107
103108 %% There is no need to declare this pseudo queue first.
104109 amqp_channel:subscribe(RequesterCh,
105- #'basic.consume'{queue = ReplyQueue ,
110+ #'basic.consume'{queue = ?REPLY_QUEUE ,
106111 no_ack = true},
107112 self()),
108113 CTag = receive #'basic.consume_ok'{consumer_tag = CTag0} -> CTag0
@@ -114,7 +119,7 @@ trace(Config) ->
114119 amqp_channel:cast(
115120 RequesterCh,
116121 #'basic.publish'{routing_key = RequestQueue},
117- #amqp_msg{props = #'P_basic'{reply_to = ReplyQueue ,
122+ #amqp_msg{props = #'P_basic'{reply_to = ?REPLY_QUEUE ,
118123 correlation_id = CorrelationId},
119124 payload = RequestPayload}),
120125 receive #'basic.ack'{} -> ok
@@ -182,6 +187,85 @@ trace(Config) ->
182187 [#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = Q0}) || Q0 <- Qs],
183188 {ok, _} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["trace_off"]).
184189
190+ %% A consumer must consume in no-ack mode.
191+ failure_ack_mode(Config) ->
192+ {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
193+ Consume = #'basic.consume'{queue = ?REPLY_QUEUE,
194+ no_ack = false},
195+ try amqp_channel:subscribe(Ch, Consume, self()) of
196+ _ ->
197+ ct:fail("expected subscribe in ack mode to fail")
198+ catch exit:Reason ->
199+ ?assertMatch(
200+ {{_, {_, _, <<"PRECONDITION_FAILED - reply consumer cannot acknowledge">>}}, _},
201+ Reason)
202+ end,
203+ ok = rabbit_ct_client_helpers:close_connection(Conn).
204+
205+ %% In AMQP 0.9.1 there can be at most one reply consumer per channel.
206+ failure_multiple_consumers(Config) ->
207+ {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
208+ Consume = #'basic.consume'{queue = ?REPLY_QUEUE,
209+ no_ack = true},
210+ amqp_channel:subscribe(Ch, Consume, self()),
211+ receive #'basic.consume_ok'{} -> ok
212+ end,
213+
214+ try amqp_channel:subscribe(Ch, Consume, self()) of
215+ _ ->
216+ ct:fail("expected second subscribe to fail")
217+ catch exit:Reason ->
218+ ?assertMatch(
219+ {{_, {_, _, <<"PRECONDITION_FAILED - reply consumer already set">>}}, _},
220+ Reason)
221+ end,
222+ ok = rabbit_ct_client_helpers:close_connection(Conn).
223+
224+ %% Reusing the same consumer tag should fail.
225+ failure_reuse_consumer_tag(Config) ->
226+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
227+ Ctag = <<"my-tag">>,
228+
229+ #'queue.declare_ok'{queue = Q} = amqp_channel:call(Ch, #'queue.declare'{exclusive = true}),
230+ amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q,
231+ consumer_tag = Ctag}, self()),
232+ receive #'basic.consume_ok'{} -> ok
233+ end,
234+
235+ try amqp_channel:subscribe(Ch, #'basic.consume'{queue = ?REPLY_QUEUE,
236+ consumer_tag = Ctag,
237+ no_ack = true}, self()) of
238+ _ ->
239+ ct:fail("expected reusing consumer tag to fail")
240+ catch exit:Reason ->
241+ ?assertMatch(
242+ {{_, {connection_closing,
243+ {_, _, <<"NOT_ALLOWED - attempt to reuse consumer tag 'my-tag'">>}
244+ }}, _},
245+ Reason)
246+ end.
247+
248+ %% Publishing with reply_to header set but without consuming from the pseudo queue should fail.
249+ failure_publish(Config) ->
250+ {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
251+
252+ Ref = monitor(process, Ch),
253+ amqp_channel:cast(
254+ Ch,
255+ #'basic.publish'{routing_key = <<"some request queue">>},
256+ #amqp_msg{props = #'P_basic'{reply_to = ?REPLY_QUEUE,
257+ correlation_id = <<"some correlation ID">>},
258+ payload = <<"some payload">>}),
259+
260+ receive {'DOWN', Ref, process, Ch, Reason} ->
261+ ?assertMatch(
262+ {_, {_, _, <<"PRECONDITION_FAILED - fast reply consumer does not exist">>}},
263+ Reason)
264+ after ?TIMEOUT ->
265+ ct:fail("expected channel error")
266+ end,
267+ ok = rabbit_ct_client_helpers:close_connection(Conn).
268+
185269%% "new" and "old" refers to new and old RabbitMQ versions in mixed version tests.
186270rpc_new_to_old_node(Config) ->
187271 rpc(0, 1, Config).
@@ -190,36 +274,40 @@ rpc_old_to_new_node(Config) ->
190274 rpc(1, 0, Config).
191275
192276rpc(RequesterNode, ResponderNode, Config) ->
193- RequestQueue = <<"tests.amqpl_direct_reply_to.rpc.requests">>,
194- %% This is the pseudo queue that is specially interpreted by RabbitMQ.
195- ReplyQueue = <<"amq.rabbitmq.reply-to">>,
277+ RequestQueue = <<"request queue">>,
196278 RequestPayload = <<"my request">>,
197- ReplyPayload = <<"my reply">>,
198279 CorrelationId = <<"my correlation ID">>,
199280 RequesterCh = rabbit_ct_client_helpers:open_channel(Config, RequesterNode),
200281 ResponderCh = rabbit_ct_client_helpers:open_channel(Config, ResponderNode),
201282
202283 %% There is no need to declare this pseudo queue first.
203284 amqp_channel:subscribe(RequesterCh,
204- #'basic.consume'{queue = ReplyQueue ,
285+ #'basic.consume'{queue = ?REPLY_QUEUE ,
205286 no_ack = true},
206287 self()),
207288 CTag = receive #'basic.consume_ok'{consumer_tag = CTag0} -> CTag0
208289 end,
290+
291+ ?assertEqual(#'queue.declare_ok'{queue = ?REPLY_QUEUE,
292+ message_count = 0,
293+ consumer_count = 1},
294+ amqp_channel:call(RequesterCh,
295+ #'queue.declare'{queue = ?REPLY_QUEUE})),
296+
209297 #'queue.declare_ok'{} = amqp_channel:call(
210298 RequesterCh,
211299 #'queue.declare'{queue = RequestQueue}),
212300 #'confirm.select_ok'{} = amqp_channel:call(RequesterCh, #'confirm.select'{}),
213301 amqp_channel:register_confirm_handler(RequesterCh, self()),
302+
214303 %% Send the request.
215304 amqp_channel:cast(
216305 RequesterCh,
217306 #'basic.publish'{routing_key = RequestQueue},
218- #amqp_msg{props = #'P_basic'{reply_to = ReplyQueue ,
307+ #amqp_msg{props = #'P_basic'{reply_to = ?REPLY_QUEUE ,
219308 correlation_id = CorrelationId},
220309 payload = RequestPayload}),
221310 receive #'basic.ack'{} -> ok
222- after ?TIMEOUT -> ct:fail(confirm_timeout)
223311 end,
224312
225313 ok = wait_for_queue_declared(RequestQueue, ResponderNode, Config),
@@ -229,20 +317,101 @@ rpc(RequesterNode, ResponderNode, Config) ->
229317 correlation_id = CorrelationId},
230318 payload = RequestPayload}
231319 } = amqp_channel:call(ResponderCh, #'basic.get'{queue = RequestQueue}),
320+
321+ %% Test what the docs state:
322+ %% "If the RPC server is going to perform some expensive computation it might wish
323+ %% to check if the client has gone away. To do this the server can declare the
324+ %% generated reply name first on a disposable channel in order to determine whether
325+ %% it still exists."
326+ ?assertEqual(#'queue.declare_ok'{queue = ReplyTo,
327+ message_count = 0,
328+ consumer_count = 1},
329+ amqp_channel:call(ResponderCh,
330+ #'queue.declare'{queue = ReplyTo})),
331+
232332 %% Send the reply.
233333 amqp_channel:cast(
234334 ResponderCh,
235335 #'basic.publish'{routing_key = ReplyTo},
236336 #amqp_msg{props = #'P_basic'{correlation_id = CorrelationId},
237- payload = ReplyPayload }),
337+ payload = <<"reply 1">> }),
238338
239- %% Receive the reply.
339+ %% Let's assume the RPC server sends multiple replies for a single request.
340+ %% (This is a bit unusual but should work.)
341+ amqp_channel:cast(
342+ ResponderCh,
343+ #'basic.publish'{routing_key = ReplyTo},
344+ #amqp_msg{props = #'P_basic'{correlation_id = CorrelationId},
345+ payload = <<"reply 2">>}),
346+
347+ %% Receive the frst reply.
348+ receive {#'basic.deliver'{consumer_tag = CTag,
349+ redelivered = false,
350+ exchange = <<>>,
351+ routing_key = ReplyTo},
352+ #amqp_msg{payload = P1,
353+ props = #'P_basic'{correlation_id = CorrelationId}}} ->
354+ ?assertEqual(<<"reply 1">>, P1)
355+ after ?TIMEOUT -> ct:fail({missing_reply, ?LINE})
356+ end,
357+
358+ %% Receive the second reply.
240359 receive {#'basic.deliver'{consumer_tag = CTag},
241- #amqp_msg{payload = ReplyPayload ,
360+ #amqp_msg{payload = P2 ,
242361 props = #'P_basic'{correlation_id = CorrelationId}}} ->
243- ok
244- after ?TIMEOUT -> ct:fail(missing_reply)
245- end.
362+ ?assertEqual(<<"reply 2">>, P2)
363+ after ?TIMEOUT -> ct:fail({missing_reply, ?LINE})
364+ end,
365+
366+ %% The requester sends a reply to itself.
367+ %% (Really odd, but should work.)
368+ amqp_channel:cast(
369+ RequesterCh,
370+ #'basic.publish'{routing_key = ReplyTo},
371+ #amqp_msg{props = #'P_basic'{correlation_id = CorrelationId},
372+ payload = <<"reply 3">>}),
373+
374+ receive {#'basic.deliver'{consumer_tag = CTag},
375+ #amqp_msg{payload = P3,
376+ props = #'P_basic'{correlation_id = CorrelationId}}} ->
377+ ?assertEqual(<<"reply 3">>, P3)
378+ after ?TIMEOUT -> ct:fail({missing_reply, ?LINE})
379+ end,
380+
381+ %% Requester cancels consumption.
382+ ?assertMatch(#'basic.cancel_ok'{consumer_tag = CTag},
383+ amqp_channel:call(RequesterCh, #'basic.cancel'{consumer_tag = CTag})),
384+
385+ %% Send a final reply.
386+ amqp_channel:cast(
387+ ResponderCh,
388+ #'basic.publish'{routing_key = ReplyTo},
389+ #amqp_msg{props = #'P_basic'{correlation_id = CorrelationId},
390+ payload = <<"reply 4">>}),
391+
392+ %% The final reply shouldn't be delivered since the requester cancelled consumption.
393+ receive {#'basic.deliver'{}, #amqp_msg{}} ->
394+ ct:fail("did not expect delivery after cancellation")
395+ after 100 -> ok
396+ end,
397+
398+ %% Responder checks again if the requester is still there.
399+ %% This time, the requester and its queue should be gone.
400+ try amqp_channel:call(ResponderCh, #'queue.declare'{queue = ReplyTo}) of
401+ _ ->
402+ ct:fail("expected queue.declare to fail")
403+ catch exit:Reason ->
404+ ?assertMatch(
405+ {{_, {_, _, <<"NOT_FOUND - no queue '",
406+ ReplyTo:(byte_size(ReplyTo))/binary,
407+ "' in vhost '/'">>}}, _},
408+ Reason)
409+ end,
410+
411+ %% Clean up.
412+ #'queue.delete_ok'{} = amqp_channel:call(RequesterCh,
413+ #'queue.delete'{queue = RequestQueue}),
414+ ok = rabbit_ct_client_helpers:close_channel(RequesterCh).
246415
247416wait_for_queue_declared(Queue, Node, Config) ->
248417 eventually(
0 commit comments