Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions deps/rabbitmq_mqtt/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ define PROJECT_ENV
[
{ssl_cert_login,false},
{allow_anonymous, true},
{ignore_unauthorized, false},
{vhost, <<"/">>},
{exchange, <<"amq.topic">>},
{max_session_expiry_interval_seconds, 86400}, %% 1 day
Expand Down
8 changes: 8 additions & 0 deletions deps/rabbitmq_mqtt/priv/schema/rabbitmq_mqtt.schema
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@
{mapping, "mqtt.allow_anonymous", "rabbitmq_mqtt.allow_anonymous",
[{datatype, {enum, [true, false]}}]}.

% {rabbitmq_mqtt,
% [%% Setting whether disconnect when client publish,subscribe to unauthorized topic.
%%
%% {ignore_unauthorized, false},

{mapping, "mqtt.ignore_unauthorized", "rabbitmq_mqtt.ignore_unauthorized",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a problem with this name. Ignore authorized what specifically?

This could be mqtt.ignore_authorization_failures but that would not explain when exactly and the name can be misleading.

mqtt.maintain_connection_on_authorization_failures is slightly more specific and is the best suggestion I have.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find maintain_connection_on_authorization_failures is too long.

Let's name it disconnect_on_unauthorized (defaulting to true since that's today's behaviour).

[{datatype, {enum, [true, false]}}]}.

%% If you have multiple chosts, specify the one to which the
%% adapter connects.
%%
Expand Down
52 changes: 49 additions & 3 deletions deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,17 @@

-opaque state() :: #state{}.

%% -------------------------
%% Config helper
%% -------------------------
%% Read configuration from application env (rabbitmq.conf -> mqtt.ignore_unauthorized)
ignore_unauthorized() ->
case application:get_env(rabbitmq_mqtt, ignore_unauthorized) of
{ok, Val} -> Val;
_ -> false
end.
%% -------------------------

%% NB: If init/4 returns an error, it must clean up itself because terminate/3 will not be called.
-spec init(ConnectPacket :: mqtt_packet(),
RawSocket :: rabbit_net:socket(),
Expand Down Expand Up @@ -451,10 +462,11 @@ process_request(?SUBSCRIBE,
State0 = #state{cfg = #cfg{proto_ver = ProtoVer,
binding_args_v2 = BindingArgsV2}}) ->
?LOG_DEBUG("Received a SUBSCRIBE with subscription(s) ~p", [Subscriptions]),
IgnoreUnauth = ignore_unauthorized(),
{ResultRev, RetainedRev, State1} =
lists:foldl(
fun(_Subscription, {[{error, _} = E | _] = L, R, S}) ->
%% Once a subscription failed, mark all following subscriptions
fun(_Subscription, {[{error, _} = E | _] = L, R, S}) when IgnoreUnauth =:= false ->
%% If ignore_unauthorized false, once a subscription failed, mark all following subscriptions
%% as failed instead of creating bindings because we are going
%% to close the client connection anyway.
{[E | L], R, S};
Expand Down Expand Up @@ -509,6 +521,17 @@ process_request(?SUBSCRIBE,
reason_codes = lists:reverse(ReasonCodesRev)}},
_ = send(Reply, State1),
case hd(ResultRev) of
{error, access_refused} ->
%% ignore_unauthorized setting:
%% - true: not disconnect client,send retained messages for the successfully subscribed topics.
%% - false: disconnect client,treat subscribe failure as fatal and disconnect.
case ignore_unauthorized() of
true ->
State = send_retained_messages(lists:reverse(RetainedRev), State1),
{ok, State};
false ->
{error, subscribe_error, State1}
end;
{error, _} ->
{error, subscribe_error, State1};
_ ->
Expand Down Expand Up @@ -2266,7 +2289,30 @@ publish_to_queues_with_checks(
Error
end;
{error, access_refused} ->
{error, access_refused, State}
%% ignore_unauthorized setting:
%% - true:
%% - MQTT v5 + QoS1: reply with PUBACK including an error reason code and keep connection.
%% - MQTT v3 or QoS0: drop silently and keep connection.
%% - false: disconnect.
case ignore_unauthorized() of
true ->
case {State#state.cfg#cfg.proto_ver, Msg#mqtt_msg.qos} of
{?MQTT_PROTO_V5, ?QOS_1} ->
Reply = #mqtt_packet{
fixed = #mqtt_packet_fixed{type = ?PUBACK},
variable = #mqtt_packet_puback{
packet_id = Msg#mqtt_msg.packet_id,
reason_code = ?RC_NOT_AUTHORIZED
}
},
_ = send(Reply, State);
_ ->
ok
end,
{ok, State};
false ->
{error, access_refused, State}
end
end.

-spec check_publish_permitted(rabbit_exchange:name(), topic(), state()) ->
Expand Down
Loading