From 5007aef4fbb347319589ab824af9e96e84c09144 Mon Sep 17 00:00:00 2001 From: simple <991605149@qq.com> Date: Fri, 12 Dec 2025 11:48:59 +0800 Subject: [PATCH 1/3] adjust mqtt disconnect --- deps/rabbitmq_mqtt/Makefile | 1 + .../priv/schema/rabbitmq_mqtt.schema | 8 +++ .../src/rabbit_mqtt_processor.erl | 52 +++++++++++++++++-- 3 files changed, 58 insertions(+), 3 deletions(-) diff --git a/deps/rabbitmq_mqtt/Makefile b/deps/rabbitmq_mqtt/Makefile index 2f42f7d4c453..0e93f9223602 100644 --- a/deps/rabbitmq_mqtt/Makefile +++ b/deps/rabbitmq_mqtt/Makefile @@ -6,6 +6,7 @@ define PROJECT_ENV [ {ssl_cert_login,false}, {allow_anonymous, true}, + {ignore_unauthorized, false}, {vhost, <<"/">>}, {exchange, <<"amq.topic">>}, {max_session_expiry_interval_seconds, 86400}, %% 1 day diff --git a/deps/rabbitmq_mqtt/priv/schema/rabbitmq_mqtt.schema b/deps/rabbitmq_mqtt/priv/schema/rabbitmq_mqtt.schema index 1be98c757edf..267ea08b6d2c 100644 --- a/deps/rabbitmq_mqtt/priv/schema/rabbitmq_mqtt.schema +++ b/deps/rabbitmq_mqtt/priv/schema/rabbitmq_mqtt.schema @@ -15,6 +15,14 @@ {mapping, "mqtt.allow_anonymous", "rabbitmq_mqtt.allow_anonymous", [{datatype, {enum, [true, false]}}]}. +% {rabbitmq_mqtt, +% [%% Setting whether disconnect when client publish,subscribe to unauthorized topic. +%% +%% {ignore_unauthorized, false}, + +{mapping, "mqtt.ignore_unauthorized", "rabbitmq_mqtt.ignore_unauthorized", + [{datatype, {enum, [true, false]}}]}. + %% If you have multiple chosts, specify the one to which the %% adapter connects. %% diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl index 3481366c90a7..9e474c1d4f4e 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl @@ -120,6 +120,17 @@ -opaque state() :: #state{}. +%% ------------------------- +%% Config helper +%% ------------------------- +%% Read configuration from application env (rabbitmq.conf -> mqtt.ignore_unauthorized) +ignore_unauthorized() -> + case application:get_env(rabbitmq_mqtt, ignore_unauthorized) of + {ok, Val} -> Val; + _ -> false + end. +%% ------------------------- + %% NB: If init/4 returns an error, it must clean up itself because terminate/3 will not be called. -spec init(ConnectPacket :: mqtt_packet(), RawSocket :: rabbit_net:socket(), @@ -451,10 +462,11 @@ process_request(?SUBSCRIBE, State0 = #state{cfg = #cfg{proto_ver = ProtoVer, binding_args_v2 = BindingArgsV2}}) -> ?LOG_DEBUG("Received a SUBSCRIBE with subscription(s) ~p", [Subscriptions]), + IgnoreUnauth = ignore_unauthorized(), {ResultRev, RetainedRev, State1} = lists:foldl( - fun(_Subscription, {[{error, _} = E | _] = L, R, S}) -> - %% Once a subscription failed, mark all following subscriptions + fun(_Subscription, {[{error, _} = E | _] = L, R, S}) when IgnoreUnauth =:= false -> + %% If ignore_unauthorized false, once a subscription failed, mark all following subscriptions %% as failed instead of creating bindings because we are going %% to close the client connection anyway. {[E | L], R, S}; @@ -509,6 +521,17 @@ process_request(?SUBSCRIBE, reason_codes = lists:reverse(ReasonCodesRev)}}, _ = send(Reply, State1), case hd(ResultRev) of + {error, access_refused} -> + %% ignore_unauthorized setting: + %% - true: not disconnect client,send retained messages for the successfully subscribed topics. + %% - false: disconnect client,treat subscribe failure as fatal and disconnect. + case ignore_unauthorized() of + true -> + State = send_retained_messages(lists:reverse(RetainedRev), State1), + {ok, State}; + false -> + {error, subscribe_error, State1} + end; {error, _} -> {error, subscribe_error, State1}; _ -> @@ -2266,7 +2289,30 @@ publish_to_queues_with_checks( Error end; {error, access_refused} -> - {error, access_refused, State} + %% ignore_unauthorized setting: + %% - true: + %% - MQTT v5 + QoS1: reply with PUBACK including an error reason code and keep connection. + %% - MQTT v3 or QoS0: drop silently and keep connection. + %% - false: disconnect. + case ignore_unauthorized() of + true -> + case {State#state.cfg#cfg.proto_ver, Msg#mqtt_msg.qos} of + {?MQTT_PROTO_V5, ?QOS_1} -> + Reply = #mqtt_packet{ + fixed = #mqtt_packet_fixed{type = ?PUBACK}, + variable = #mqtt_packet_puback{ + packet_id = Msg#mqtt_msg.packet_id, + reason_code = ?RC_NOT_AUTHORIZED + } + }, + _ = send(Reply, State); + _ -> + ok + end, + {ok, State}; + false -> + {error, access_refused, State} + end end. -spec check_publish_permitted(rabbit_exchange:name(), topic(), state()) -> From 2769755ccb76877d8b9b049e18b922fea00292e6 Mon Sep 17 00:00:00 2001 From: simple <991605149@qq.com> Date: Sat, 13 Dec 2025 15:59:22 +0800 Subject: [PATCH 2/3] Adjust mqtt disconnect comment --- deps/rabbitmq_mqtt/Makefile | 2 +- .../priv/schema/rabbitmq_mqtt.schema | 7 ++-- .../src/rabbit_mqtt_processor.erl | 37 +++++++------------ 3 files changed, 17 insertions(+), 29 deletions(-) diff --git a/deps/rabbitmq_mqtt/Makefile b/deps/rabbitmq_mqtt/Makefile index 0e93f9223602..0fd37c07a1ec 100644 --- a/deps/rabbitmq_mqtt/Makefile +++ b/deps/rabbitmq_mqtt/Makefile @@ -6,7 +6,7 @@ define PROJECT_ENV [ {ssl_cert_login,false}, {allow_anonymous, true}, - {ignore_unauthorized, false}, + {maintain_connection_on_authorization_failures, false}, {vhost, <<"/">>}, {exchange, <<"amq.topic">>}, {max_session_expiry_interval_seconds, 86400}, %% 1 day diff --git a/deps/rabbitmq_mqtt/priv/schema/rabbitmq_mqtt.schema b/deps/rabbitmq_mqtt/priv/schema/rabbitmq_mqtt.schema index 267ea08b6d2c..f184fd4b9cba 100644 --- a/deps/rabbitmq_mqtt/priv/schema/rabbitmq_mqtt.schema +++ b/deps/rabbitmq_mqtt/priv/schema/rabbitmq_mqtt.schema @@ -15,12 +15,11 @@ {mapping, "mqtt.allow_anonymous", "rabbitmq_mqtt.allow_anonymous", [{datatype, {enum, [true, false]}}]}. -% {rabbitmq_mqtt, -% [%% Setting whether disconnect when client publish,subscribe to unauthorized topic. +%% Whether maintain connection when publish or subscribe non-authorized topic. %% -%% {ignore_unauthorized, false}, +%% {maintain_connection_on_authorization_failures, false}, -{mapping, "mqtt.ignore_unauthorized", "rabbitmq_mqtt.ignore_unauthorized", +{mapping, "mqtt.maintain_connection_on_authorization_failures", "rabbitmq_mqtt.maintain_connection_on_authorization_failures", [{datatype, {enum, [true, false]}}]}. %% If you have multiple chosts, specify the one to which the diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl index 9e474c1d4f4e..33b0e3506041 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl @@ -120,17 +120,6 @@ -opaque state() :: #state{}. -%% ------------------------- -%% Config helper -%% ------------------------- -%% Read configuration from application env (rabbitmq.conf -> mqtt.ignore_unauthorized) -ignore_unauthorized() -> - case application:get_env(rabbitmq_mqtt, ignore_unauthorized) of - {ok, Val} -> Val; - _ -> false - end. -%% ------------------------- - %% NB: If init/4 returns an error, it must clean up itself because terminate/3 will not be called. -spec init(ConnectPacket :: mqtt_packet(), RawSocket :: rabbit_net:socket(), @@ -462,11 +451,12 @@ process_request(?SUBSCRIBE, State0 = #state{cfg = #cfg{proto_ver = ProtoVer, binding_args_v2 = BindingArgsV2}}) -> ?LOG_DEBUG("Received a SUBSCRIBE with subscription(s) ~p", [Subscriptions]), - IgnoreUnauth = ignore_unauthorized(), + KeepConnOnAuthFail = application:get_env(rabbitmq_mqtt, maintain_connection_on_authorization_failures, false), {ResultRev, RetainedRev, State1} = lists:foldl( - fun(_Subscription, {[{error, _} = E | _] = L, R, S}) when IgnoreUnauth =:= false -> - %% If ignore_unauthorized false, once a subscription failed, mark all following subscriptions + fun(_Subscription, {[{error, _} = E | _] = L, R, S}) when KeepConnOnAuthFail =:= false -> + %% If maintain_connection_on_authorization_failures is false, + %% once a subscription failed, mark all following subscriptions %% as failed instead of creating bindings because we are going %% to close the client connection anyway. {[E | L], R, S}; @@ -522,10 +512,10 @@ process_request(?SUBSCRIBE, _ = send(Reply, State1), case hd(ResultRev) of {error, access_refused} -> - %% ignore_unauthorized setting: - %% - true: not disconnect client,send retained messages for the successfully subscribed topics. - %% - false: disconnect client,treat subscribe failure as fatal and disconnect. - case ignore_unauthorized() of + %% If maintain_connection_on_authorization_failures is true, do not disconnect the client, + %% send retained messages for the topics to which the client could successfully subscribe. + %% Otherwise, disconnect the client, treat the subscription failure. + case application:get_env(rabbitmq_mqtt, maintain_connection_on_authorization_failures, false) of true -> State = send_retained_messages(lists:reverse(RetainedRev), State1), {ok, State}; @@ -2289,12 +2279,11 @@ publish_to_queues_with_checks( Error end; {error, access_refused} -> - %% ignore_unauthorized setting: - %% - true: - %% - MQTT v5 + QoS1: reply with PUBACK including an error reason code and keep connection. - %% - MQTT v3 or QoS0: drop silently and keep connection. - %% - false: disconnect. - case ignore_unauthorized() of + %% If maintain_connection_on_authorization_failures is true, MQTT v5 and QoS1 + %% reply with PUBACK including an error reason code and keep connection, + %% MQTT v3 or QoS0 drop silently and keep connection. + %% Otherwise, disconnect. + case application:get_env(rabbitmq_mqtt, maintain_connection_on_authorization_failures, false) of true -> case {State#state.cfg#cfg.proto_ver, Msg#mqtt_msg.qos} of {?MQTT_PROTO_V5, ?QOS_1} -> From 22f42f50cbd13558343c7d0e2f6559ec7a83dbcf Mon Sep 17 00:00:00 2001 From: simple <991605149@qq.com> Date: Wed, 17 Dec 2025 22:56:01 +0800 Subject: [PATCH 3/3] Add a config option to toggle mqtt disconnect on unauthorized topic mqtt.disconnect_on_unauthorized=true(defualt) disconnect the clinet when publish or subscribe unauthorized topic. mqtt.disconnect_on_unauthorized=false subscribe unauthorized topic, reply SUBACK(non-authorized) and send retained messages for the successfully subscribed topics and keep connection. publish unauthorized topic, MQTT v5 and QoS1 reply with PUBACK including an error reason code and keep connection, MQTT v3 and QoS1 reply with PUBACK no error reason code and keep connection, QoS0 drop silently and keep connection. --- deps/rabbitmq_mqtt/Makefile | 2 +- .../priv/schema/rabbitmq_mqtt.schema | 6 +- .../src/rabbit_mqtt_processor.erl | 58 ++++--- .../test/disconnect_on_unauthorized_SUITE.erl | 141 ++++++++++++++++++ 4 files changed, 180 insertions(+), 27 deletions(-) create mode 100644 deps/rabbitmq_mqtt/test/disconnect_on_unauthorized_SUITE.erl diff --git a/deps/rabbitmq_mqtt/Makefile b/deps/rabbitmq_mqtt/Makefile index 0fd37c07a1ec..e08f61eb61f6 100644 --- a/deps/rabbitmq_mqtt/Makefile +++ b/deps/rabbitmq_mqtt/Makefile @@ -6,7 +6,7 @@ define PROJECT_ENV [ {ssl_cert_login,false}, {allow_anonymous, true}, - {maintain_connection_on_authorization_failures, false}, + {disconnect_on_unauthorized, true}, {vhost, <<"/">>}, {exchange, <<"amq.topic">>}, {max_session_expiry_interval_seconds, 86400}, %% 1 day diff --git a/deps/rabbitmq_mqtt/priv/schema/rabbitmq_mqtt.schema b/deps/rabbitmq_mqtt/priv/schema/rabbitmq_mqtt.schema index f184fd4b9cba..5de087fc7279 100644 --- a/deps/rabbitmq_mqtt/priv/schema/rabbitmq_mqtt.schema +++ b/deps/rabbitmq_mqtt/priv/schema/rabbitmq_mqtt.schema @@ -15,11 +15,11 @@ {mapping, "mqtt.allow_anonymous", "rabbitmq_mqtt.allow_anonymous", [{datatype, {enum, [true, false]}}]}. -%% Whether maintain connection when publish or subscribe non-authorized topic. +%% Whether disconnect when publish or subscribe non-authorized topic. %% -%% {maintain_connection_on_authorization_failures, false}, +%% {disconnect_on_unauthorized, true}, -{mapping, "mqtt.maintain_connection_on_authorization_failures", "rabbitmq_mqtt.maintain_connection_on_authorization_failures", +{mapping, "mqtt.disconnect_on_unauthorized", "rabbitmq_mqtt.disconnect_on_unauthorized", [{datatype, {enum, [true, false]}}]}. %% If you have multiple chosts, specify the one to which the diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl index 33b0e3506041..842dabf78da8 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl @@ -451,11 +451,11 @@ process_request(?SUBSCRIBE, State0 = #state{cfg = #cfg{proto_ver = ProtoVer, binding_args_v2 = BindingArgsV2}}) -> ?LOG_DEBUG("Received a SUBSCRIBE with subscription(s) ~p", [Subscriptions]), - KeepConnOnAuthFail = application:get_env(rabbitmq_mqtt, maintain_connection_on_authorization_failures, false), + DisconnectOnUnauthorized = application:get_env(rabbitmq_mqtt, disconnect_on_unauthorized, true), {ResultRev, RetainedRev, State1} = lists:foldl( - fun(_Subscription, {[{error, _} = E | _] = L, R, S}) when KeepConnOnAuthFail =:= false -> - %% If maintain_connection_on_authorization_failures is false, + fun(_Subscription, {[{error, _} = E | _] = L, R, S}) when DisconnectOnUnauthorized =:= true -> + %% If disconnect_on_unauthorized is true, %% once a subscription failed, mark all following subscriptions %% as failed instead of creating bindings because we are going %% to close the client connection anyway. @@ -512,14 +512,14 @@ process_request(?SUBSCRIBE, _ = send(Reply, State1), case hd(ResultRev) of {error, access_refused} -> - %% If maintain_connection_on_authorization_failures is true, do not disconnect the client, + %% If disconnect_on_unauthorized is false, do not disconnect the client, %% send retained messages for the topics to which the client could successfully subscribe. %% Otherwise, disconnect the client, treat the subscription failure. - case application:get_env(rabbitmq_mqtt, maintain_connection_on_authorization_failures, false) of - true -> + case application:get_env(rabbitmq_mqtt, disconnect_on_unauthorized, true) of + false -> State = send_retained_messages(lists:reverse(RetainedRev), State1), {ok, State}; - false -> + true -> {error, subscribe_error, State1} end; {error, _} -> @@ -2279,27 +2279,39 @@ publish_to_queues_with_checks( Error end; {error, access_refused} -> - %% If maintain_connection_on_authorization_failures is true, MQTT v5 and QoS1 - %% reply with PUBACK including an error reason code and keep connection, - %% MQTT v3 or QoS0 drop silently and keep connection. + %% If disconnect_on_unauthorized is false, + %% MQTT v5 and QoS1 reply with PUBACK including an error reason code and keep connection, + %% MQTT v3 and QoS1 reply with PUBACK no error reason code and keep connection, + %% QoS0 drop silently and keep connection. %% Otherwise, disconnect. - case application:get_env(rabbitmq_mqtt, maintain_connection_on_authorization_failures, false) of - true -> - case {State#state.cfg#cfg.proto_ver, Msg#mqtt_msg.qos} of - {?MQTT_PROTO_V5, ?QOS_1} -> - Reply = #mqtt_packet{ - fixed = #mqtt_packet_fixed{type = ?PUBACK}, - variable = #mqtt_packet_puback{ - packet_id = Msg#mqtt_msg.packet_id, - reason_code = ?RC_NOT_AUTHORIZED - } - }, - _ = send(Reply, State); + case application:get_env(rabbitmq_mqtt, disconnect_on_unauthorized, true) of + false -> + case Msg#mqtt_msg.qos of + ?QOS_1 -> + case State#state.cfg#cfg.proto_ver of + ?MQTT_PROTO_V5 -> + Reply = #mqtt_packet{ + fixed = #mqtt_packet_fixed{type = ?PUBACK}, + variable = #mqtt_packet_puback{ + packet_id = Msg#mqtt_msg.packet_id, + reason_code = ?RC_NOT_AUTHORIZED + } + }, + _ = send(Reply, State); + _ -> + Reply = #mqtt_packet{ + fixed = #mqtt_packet_fixed{type = ?PUBACK}, + variable = #mqtt_packet_puback{ + packet_id = Msg#mqtt_msg.packet_id + } + }, + _ = send(Reply, State) + end; _ -> ok end, {ok, State}; - false -> + true -> {error, access_refused, State} end end. diff --git a/deps/rabbitmq_mqtt/test/disconnect_on_unauthorized_SUITE.erl b/deps/rabbitmq_mqtt/test/disconnect_on_unauthorized_SUITE.erl new file mode 100644 index 000000000000..dc7cd0ba3528 --- /dev/null +++ b/deps/rabbitmq_mqtt/test/disconnect_on_unauthorized_SUITE.erl @@ -0,0 +1,141 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. + + +-module(disconnect_on_unauthorized_SUITE). + +-compile([export_all, + nowarn_export_all]). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include("rabbit_mqtt.hrl"). + +-define(RC_Failure, 16#80). +-define(RC_NOT_AUTHORIZED, 16#87). +-define(RC_GRANTED_QOS_0, 16#0). + +all() -> + [ + {group, v4}, + {group, v5} + ]. + +groups() -> + [ + {v4, [], test_cases()}, + {v5, [], test_cases()} + ]. + +test_cases() -> + [ + publish_unauthorized_no_disconnect, + subscribe_unauthorized_no_disconnect + ]. + +init_per_suite(Config0) -> + rabbit_ct_helpers:log_environment(), + + User = <<"mqtt-user">>, + Password = <<"mqtt-password">>, + + Env = [{rabbitmq_mqtt, + [{disconnect_on_unauthorized, false}]} + ], + Config = rabbit_ct_helpers:merge_app_env(Config0, Env), + + Config1 = rabbit_ct_helpers:run_setup_steps( + Config, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()), + util:enable_plugin(Config1, rabbitmq_mqtt), + + rabbit_ct_broker_helpers:add_user(Config1, User, Password), + + Config2 = rabbit_ct_helpers:set_config(Config1, [{mqtt_user, User}, {mqtt_pass, Password}]), + + rabbit_ct_broker_helpers:set_permissions(Config2, User, <<"/">>, <<".*">>, <<"">>, <<".*">>), + ok = rabbit_ct_broker_helpers:rpc(Config2, 0, + rabbit_auth_backend_internal, set_topic_permissions, + [?config(mqtt_user, Config2), <<"/">>, + <<"amq.topic">>, <<"^topic1$">>, <<"^topic1$">>, <<"acting-user">>]), + + Config2. + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps( + Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()). + +init_per_group(Group, Config) -> + Config1 = rabbit_ct_helpers:set_config(Config, {mqtt_version, Group}), + Config1. + +end_per_group(_Group, Config) -> + Config. + +%%==================================================================== +%% Test cases +%%==================================================================== + +publish_unauthorized_no_disconnect(Config) -> + C = util:connect( + <<"pub_client">>, + Config, + [{username, ?config(mqtt_user, Config)}, + {password, ?config(mqtt_pass, Config)}]), + + case ?config(mqtt_version, Config) of + v5 -> + {ok, #{reason_code := ?RC_NOT_AUTHORIZED}} = + emqtt:publish( + C, + <<"topic2">>, + <<"payload">>, + [{qos, 1}] + ); + v4 -> + {ok, _} = + emqtt:publish( + C, + <<"topic2">>, + <<"payload">>, + [{qos, 1}] + ) + end, + + timer:sleep(300), + %% Client still connected + ?assert(is_process_alive(C)), + + ok = emqtt:disconnect(C). + +subscribe_unauthorized_no_disconnect(Config) -> + C = util:connect( + <<"sub_client">>, + Config, + [{username, ?config(mqtt_user, Config)}, + {password, ?config(mqtt_pass, Config)}]), + + {ok, _, [ReasonCode]} = + emqtt:subscribe( + C, + {<<"topic2">>, qos0} + ), + + case ?config(mqtt_version, Config) of + v5 -> + ?assertEqual(?RC_NOT_AUTHORIZED, ReasonCode); + v4 -> + ?assertEqual(?RC_Failure, ReasonCode) + end, + + timer:sleep(300), + %% Client still connected + ?assert(is_process_alive(C)), + + ok = emqtt:disconnect(C).