Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions deps/rabbitmq_mqtt/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ define PROJECT_ENV
[
{ssl_cert_login,false},
{allow_anonymous, true},
{maintain_connection_on_authorization_failures, false},
{vhost, <<"/">>},
{exchange, <<"amq.topic">>},
{max_session_expiry_interval_seconds, 86400}, %% 1 day
Expand Down
7 changes: 7 additions & 0 deletions deps/rabbitmq_mqtt/priv/schema/rabbitmq_mqtt.schema
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@
{mapping, "mqtt.allow_anonymous", "rabbitmq_mqtt.allow_anonymous",
[{datatype, {enum, [true, false]}}]}.

%% Whether maintain connection when publish or subscribe non-authorized topic.
%%
%% {maintain_connection_on_authorization_failures, false},

{mapping, "mqtt.maintain_connection_on_authorization_failures", "rabbitmq_mqtt.maintain_connection_on_authorization_failures",
[{datatype, {enum, [true, false]}}]}.

%% If you have multiple chosts, specify the one to which the
%% adapter connects.
%%
Expand Down
41 changes: 38 additions & 3 deletions deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -451,10 +451,12 @@ process_request(?SUBSCRIBE,
State0 = #state{cfg = #cfg{proto_ver = ProtoVer,
binding_args_v2 = BindingArgsV2}}) ->
?LOG_DEBUG("Received a SUBSCRIBE with subscription(s) ~p", [Subscriptions]),
KeepConnOnAuthFail = application:get_env(rabbitmq_mqtt, maintain_connection_on_authorization_failures, false),
{ResultRev, RetainedRev, State1} =
lists:foldl(
fun(_Subscription, {[{error, _} = E | _] = L, R, S}) ->
%% Once a subscription failed, mark all following subscriptions
fun(_Subscription, {[{error, _} = E | _] = L, R, S}) when KeepConnOnAuthFail =:= false ->
%% If maintain_connection_on_authorization_failures is false,
%% once a subscription failed, mark all following subscriptions
%% as failed instead of creating bindings because we are going
%% to close the client connection anyway.
{[E | L], R, S};
Expand Down Expand Up @@ -509,6 +511,17 @@ process_request(?SUBSCRIBE,
reason_codes = lists:reverse(ReasonCodesRev)}},
_ = send(Reply, State1),
case hd(ResultRev) of
{error, access_refused} ->
%% If maintain_connection_on_authorization_failures is true, do not disconnect the client,
%% send retained messages for the topics to which the client could successfully subscribe.
%% Otherwise, disconnect the client, treat the subscription failure.
case application:get_env(rabbitmq_mqtt, maintain_connection_on_authorization_failures, false) of
true ->
State = send_retained_messages(lists:reverse(RetainedRev), State1),
{ok, State};
false ->
{error, subscribe_error, State1}
end;
{error, _} ->
{error, subscribe_error, State1};
_ ->
Expand Down Expand Up @@ -2266,7 +2279,29 @@ publish_to_queues_with_checks(
Error
end;
{error, access_refused} ->
{error, access_refused, State}
%% If maintain_connection_on_authorization_failures is true, MQTT v5 and QoS1
%% reply with PUBACK including an error reason code and keep connection,
%% MQTT v3 or QoS0 drop silently and keep connection.
%% Otherwise, disconnect.
case application:get_env(rabbitmq_mqtt, maintain_connection_on_authorization_failures, false) of
true ->
case {State#state.cfg#cfg.proto_ver, Msg#mqtt_msg.qos} of
{?MQTT_PROTO_V5, ?QOS_1} ->
Reply = #mqtt_packet{
fixed = #mqtt_packet_fixed{type = ?PUBACK},
variable = #mqtt_packet_puback{
packet_id = Msg#mqtt_msg.packet_id,
reason_code = ?RC_NOT_AUTHORIZED
}
},
_ = send(Reply, State);
_ ->
ok
end,
{ok, State};
false ->
{error, access_refused, State}
end
end.

-spec check_publish_permitted(rabbit_exchange:name(), topic(), state()) ->
Expand Down
Loading