Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions deps/rabbitmq_mqtt/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ define PROJECT_ENV
[
{ssl_cert_login,false},
{allow_anonymous, true},
{maintain_connection_on_authorization_failures, false},
{vhost, <<"/">>},
{exchange, <<"amq.topic">>},
{max_session_expiry_interval_seconds, 86400}, %% 1 day
Expand Down
7 changes: 7 additions & 0 deletions deps/rabbitmq_mqtt/priv/schema/rabbitmq_mqtt.schema
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@
{mapping, "mqtt.allow_anonymous", "rabbitmq_mqtt.allow_anonymous",
[{datatype, {enum, [true, false]}}]}.

%% Whether maintain connection when publish or subscribe non-authorized topic.
%%
%% {maintain_connection_on_authorization_failures, false},

{mapping, "mqtt.maintain_connection_on_authorization_failures", "rabbitmq_mqtt.maintain_connection_on_authorization_failures",
[{datatype, {enum, [true, false]}}]}.

%% If you have multiple chosts, specify the one to which the
%% adapter connects.
%%
Expand Down
41 changes: 38 additions & 3 deletions deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -451,10 +451,12 @@ process_request(?SUBSCRIBE,
State0 = #state{cfg = #cfg{proto_ver = ProtoVer,
binding_args_v2 = BindingArgsV2}}) ->
?LOG_DEBUG("Received a SUBSCRIBE with subscription(s) ~p", [Subscriptions]),
KeepConnOnAuthFail = application:get_env(rabbitmq_mqtt, maintain_connection_on_authorization_failures, false),
{ResultRev, RetainedRev, State1} =
lists:foldl(
fun(_Subscription, {[{error, _} = E | _] = L, R, S}) ->
%% Once a subscription failed, mark all following subscriptions
fun(_Subscription, {[{error, _} = E | _] = L, R, S}) when KeepConnOnAuthFail =:= false ->
%% If maintain_connection_on_authorization_failures is false,
%% once a subscription failed, mark all following subscriptions
%% as failed instead of creating bindings because we are going
%% to close the client connection anyway.
{[E | L], R, S};
Expand Down Expand Up @@ -509,6 +511,17 @@ process_request(?SUBSCRIBE,
reason_codes = lists:reverse(ReasonCodesRev)}},
_ = send(Reply, State1),
case hd(ResultRev) of
{error, access_refused} ->
%% If maintain_connection_on_authorization_failures is true, do not disconnect the client,
%% send retained messages for the topics to which the client could successfully subscribe.
%% Otherwise, disconnect the client, treat the subscription failure.
case application:get_env(rabbitmq_mqtt, maintain_connection_on_authorization_failures, false) of
true ->
State = send_retained_messages(lists:reverse(RetainedRev), State1),
{ok, State};
false ->
{error, subscribe_error, State1}
end;
{error, _} ->
{error, subscribe_error, State1};
_ ->
Expand Down Expand Up @@ -2266,7 +2279,29 @@ publish_to_queues_with_checks(
Error
end;
{error, access_refused} ->
{error, access_refused, State}
%% If maintain_connection_on_authorization_failures is true, MQTT v5 and QoS1
%% reply with PUBACK including an error reason code and keep connection,
%% MQTT v3 or QoS0 drop silently and keep connection.
%% Otherwise, disconnect.
case application:get_env(rabbitmq_mqtt, maintain_connection_on_authorization_failures, false) of
true ->
case {State#state.cfg#cfg.proto_ver, Msg#mqtt_msg.qos} of
{?MQTT_PROTO_V5, ?QOS_1} ->
Reply = #mqtt_packet{
fixed = #mqtt_packet_fixed{type = ?PUBACK},
variable = #mqtt_packet_puback{
packet_id = Msg#mqtt_msg.packet_id,
reason_code = ?RC_NOT_AUTHORIZED
}
},
_ = send(Reply, State);
_ ->
ok
end,
{ok, State};
false ->
{error, access_refused, State}
end
end.

-spec check_publish_permitted(rabbit_exchange:name(), topic(), state()) ->
Expand Down
Loading