Add initial amqtt_client implementation #1

Open
bettio wants to merge 1 commit into atomvm:main from bettio:add-initial-mqtt-client-impl

bettio (Contributor) commented Apr 14, 2026

Add amqtt_client implementation. It lives in amqtt_client/ so that a simple MQTT server implementation can be added alongside it later.

Signed-off-by: Davide Bettio <davide@uninstall.it>
bettio force-pushed the add-initial-mqtt-client-impl branch from b38fab7 to 10f8678 on April 14, 2026 11:04

petermm commented Apr 14, 2026

per AMP:

PR Review: Add initial amqtt_client implementation

Commit: 10f8678 Add initial amqtt_client implementation
Author: Davide Bettio
Files: mqtt_client.erl (471 lines), mqtt_proto.erl (567 lines), rebar config/lock
Reviewer: AI-assisted (Amp + Oracle)
Date: 2026-04-14


Summary

Initial MQTT 3.1.1 client built on gen_server with a clean protocol codec module. The architecture is sound — separating transport (mqtt_client) from wire format (mqtt_proto) is the right call. Good type specs and clear module structure.

However, there are several issues that should be addressed before merge, ranging from protocol correctness to security defaults.


Critical

1. active => false is broken for gen_tcp

File: mqtt_client.erl:323-337

needs_reader/1 returns true for #{active := false}, but open_connection/1 always opens gen_tcp with {active, true} — ignoring the user's option. This creates a race: both active-mode messages and a recv/2 reader process compete for the same data.

 open_connection(Opts) ->
     Host = maps:get(host, Opts),
     Port = maps:get(port, Opts, 1883),
-    case gen_tcp:connect(Host, Port, [{active, true}, binary]) of
+    Active = maps:get(active, Opts, true),
+    case gen_tcp:connect(Host, Port, [{active, Active}, binary]) of
         {ok, Socket} -> {ok, gen_tcp, Socket};
         {error, _} = Err -> Err
     end.

2. TLS defaults to verify_none — insecure by default

File: mqtt_client.erl:315

Default SSL opts are [{verify, verify_none}], silently allowing MITM. Should default to peer verification; make insecure mode opt-in.

-    SslOpts = maps:get(ssl_opts, Opts, [{verify, verify_none}]),
+    DefaultSslOpts = [
+        {verify, verify_peer},
+        {cacerts, public_key:cacerts_get()},
+        {server_name_indication, Host}
+    ],
+    SslOpts = maps:get(ssl_opts, Opts, DefaultSslOpts),

3. Missing type exports from mqtt_proto

File: mqtt_proto.erl:21-30

mqtt_client.erl references mqtt_proto:connack_data() and mqtt_proto:publish_data() in its -type mqtt_event() spec, but those types are not exported. The compiler does not check remote type references, so the problem surfaces only in static analysis, where tools such as Dialyzer can flag the references.

 -export_type([
     qos/0,
     packet_id/0,
     packet_type/0,
     connect_opts/0,
     publish_opts/0,
     subscribe_opts/0,
     unsubscribe_opts/0,
+    connack_data/0,
+    publish_data/0,
+    suback_data/0,
+    packet_id_data/0,
     decoded_packet/0
 ]).

Major

4. Transport:send/2 return values ignored everywhere

Files: mqtt_client.erl:221, 236, 246, 250, 259, 262, 297

Almost every T:send(C, Packet) call discards the result. If a send fails, callers using gen_server:call/3 hang until the 30s timeout, and the pending map accumulates dead entries.

Fix: Centralize all sends through a helper that checks the return value, so each caller can reply with the error and tear down the connection:

send_packet(Packet, #state{transport = T, connection = C} = State) ->
    case T:send(C, Packet) of
        ok -> {ok, State};
        {error, Reason} -> {error, Reason, State}
    end.

5. Protocol errors silently retained in buffer

File: mqtt_client.erl:365-374

process_buffer(#state{buffer = Buffer} = State) ->
    case mqtt_proto:decode(Buffer) of
        ...
        {error, {protocol_error, _Reason}} ->
            State   %% <-- bad data stays in buffer forever
    end.

On protocol error, the malformed bytes are never consumed. Every future append re-triggers the same error, and the buffer grows unboundedly. Should close the connection on protocol error.

6. No gating on CONNACK — packets sent before session is established

File: mqtt_client.erl:201-263

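connect/1 returns {ok, Pid} as soon as the MQTT CONNECT packet is sent, before CONNACK is received, and the client immediately accepts publish/subscribe/unsubscribe calls. MQTT 3.1.1 does allow a client to send further packets before CONNACK arrives, but the server must not process them if it rejects the CONNECT, so callers cannot tell whether their requests took effect. Waiting for a successful CONNACK before accepting operations is the safer behavior.

Also, a CONNACK with a non-zero return code sets connected = false but doesn't close the connection.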

7. Packet ID reuse after wraparound

File: mqtt_client.erl:455-458

alloc_packet_id(#state{next_packet_id = 65535} = State) ->
    {65535, State#state{next_packet_id = 1}};

After wrapping, the next allocated ID may collide with an in-flight entry in pending. Should skip IDs that are already in use.
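
One possible shape for a collision-free allocator is sketched below. It is not taken from the PR: the module name packet_id_sketch and the alloc/2 signature are hypothetical, and it assumes every key of the pending map is an in-flight packet ID in 1..65535.

```erlang
%% Hypothetical sketch, not code from the PR.
-module(packet_id_sketch).
-export([alloc/2]).

%% NextId is the next candidate ID; Pending is a map keyed by in-flight IDs.
%% Assumption: all keys of Pending are packet IDs in 1..65535.
-spec alloc(1..65535, map()) ->
    {ok, 1..65535, 1..65535} | {error, no_free_packet_id}.
alloc(_NextId, Pending) when map_size(Pending) >= 65535 ->
    %% Every ID is in flight, so searching would never terminate.
    {error, no_free_packet_id};
alloc(NextId, Pending) ->
    case maps:is_key(NextId, Pending) of
        true -> alloc(bump(NextId), Pending);      % in use, try the next one
        false -> {ok, NextId, bump(NextId)}        % free: return it and the new candidate
    end.

%% Packet ID 0 is invalid, so 65535 wraps to 1.
bump(65535) -> 1;
bump(Id) -> Id + 1.
```

The size guard makes the exhausted case explicit instead of looping forever when all 65535 IDs are pending.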

8. Pending requests never expire

File: mqtt_client.erl:227, 237, 247

If a gen_server:call/3 times out on the caller side (30s), the server still holds the From in pending forever. This leaks memory and exhausts packet IDs over time.
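
A minimal sketch of one way to expire entries, assuming each pending entry stores a deadline next to the caller's From and that the server sweeps the map periodically (for example from a timer message). The module name, the entry shape, and the 30000 ms constant are illustrative assumptions, not code from the PR.

```erlang
%% Hypothetical sketch, not code from the PR.
-module(pending_expiry_sketch).
-export([add/4, sweep/2]).

%% Assumed to mirror the 30 s call timeout mentioned above.
-define(TIMEOUT_MS, 30000).

%% Record a pending request together with its deadline.
%% NowMs would typically come from erlang:monotonic_time(millisecond).
add(PacketId, From, NowMs, Pending) ->
    Pending#{PacketId => {From, NowMs + ?TIMEOUT_MS}}.

%% Keep only the entries whose deadline is still in the future.
sweep(NowMs, Pending) ->
    maps:filter(fun(_Id, {_From, Deadline}) -> Deadline > NowMs end, Pending).
```

Dropping an entry also frees its packet ID for reuse, which addresses the exhaustion concern.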

9. Pre-opened socket handoff race

File: mqtt_client.erl:131-145

controlling_process/2 is called after start_link, so early socket messages (or CONNACK) may go to the old owner. The return value of controlling_process/2 is also ignored.

10. Transport tags wrong for pre-opened {ssl, Socket}

File: mqtt_client.erl:122-126

case maps:get(transport, Opts, gen_tcp) of
    ssl -> {ssl, ssl_closed, ssl_error};
    _ -> {tcp, tcp_closed, tcp_error}     %% <-- {ssl, Socket} falls here
end,

A pre-opened {ssl, Socket} gets TCP tags, so active-mode SSL messages will never match handle_info.

     DefaultTags =
         case maps:get(transport, Opts, gen_tcp) of
             ssl -> {ssl, ssl_closed, ssl_error};
+            {ssl, _} -> {ssl, ssl_closed, ssl_error};
             _ -> {tcp, tcp_closed, tcp_error}
         end,

11. Socket leak on init send failure

File: mqtt_client.erl:186-204

If Transport:send(Connection, ConnectPacket) fails, init/1 returns {stop, ...} without closing the socket. The connection is leaked.

                {error, Reason} ->
+                   Transport:close(Connection),
                    {stop, {send_error, Reason}}

12. Inbound QoS 2 delivers duplicates

File: mqtt_client.erl:390-398

On inbound QoS 2 PUBLISH, the message is delivered immediately and PUBREC sent, but no receive state is stored. If the broker retransmits (before PUBREL), the message is delivered again — breaking exactly-once semantics.
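
The missing receive state could be as small as a set of packet IDs for which PUBREC was sent but PUBREL has not yet arrived. The sketch below is an illustration under that assumption; the module and function names are hypothetical and the actual sending of PUBREC and PUBCOMP is left to the caller.

```erlang
%% Hypothetical sketch, not code from the PR.
-module(qos2_receive_sketch).
-export([on_publish/2, on_pubrel/2]).

%% AwaitingRel is a map used as a set of packet IDs awaiting PUBREL.
%% Returns whether the message should be delivered to the owner;
%% in both cases the caller is expected to (re)send PUBREC.
on_publish(PacketId, AwaitingRel) ->
    case maps:is_key(PacketId, AwaitingRel) of
        true -> {duplicate, AwaitingRel};                 % retransmission: do not deliver again
        false -> {deliver, AwaitingRel#{PacketId => true}}
    end.

%% PUBREL ends the exchange; the caller then sends PUBCOMP.
on_pubrel(PacketId, AwaitingRel) ->
    maps:remove(PacketId, AwaitingRel).
```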

13. Decoder doesn't validate fixed-header flags

File: mqtt_proto.erl:383-458

PUBREL, SUBSCRIBE, and UNSUBSCRIBE must have flags 0x2; every other type except PUBLISH, whose flags carry DUP, QoS, and RETAIN, must have flags 0x0. Currently any flag value is accepted.
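
As an illustration, the flag check could be a small function over the numeric packet type and the low nibble of the first byte. The module name and the invalid_flags reason are hypothetical; the type numbers are the MQTT 3.1.1 control packet types (PUBLISH = 3, PUBREL = 6, SUBSCRIBE = 8, UNSUBSCRIBE = 10).

```erlang
%% Hypothetical sketch, not code from the PR.
-module(fixed_header_flags_sketch).
-export([check/2]).

%% Type: control packet type (upper nibble of byte 1), 1..14.
%% Flags: lower nibble of byte 1.
-spec check(1..14, 0..15) -> ok | {error, {protocol_error, invalid_flags}}.
check(3, Flags) when Flags >= 0, Flags =< 15 ->
    ok;                                   % PUBLISH: DUP/QoS/RETAIN are validated separately
check(Type, 2) when Type =:= 6; Type =:= 8; Type =:= 10 ->
    ok;                                   % PUBREL, SUBSCRIBE, UNSUBSCRIBE require 0x2
check(Type, 0) when Type >= 1, Type =< 14,
                    Type =/= 6, Type =/= 8, Type =/= 10 ->
    ok;                                   % all remaining types require 0x0
check(_Type, _Flags) ->
    {error, {protocol_error, invalid_flags}}.
```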

14. Decoder accepts QoS 3 and Packet ID 0

File: mqtt_proto.erl:386-430

  • PUBLISH with QoS 3 (reserved, invalid) is not rejected.
  • Packet ID 0 is accepted but is invalid per MQTT 3.1.1.
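
Both checks are cheap to express. The following is a sketch only, with hypothetical module, function, and error names; it assumes the PUBLISH flags nibble is available as an integer in which bits 2 and 1 hold the QoS.

```erlang
%% Hypothetical sketch, not code from the PR.
-module(publish_checks_sketch).
-export([qos_from_flags/1, check_packet_id/1]).

%% Extract QoS from the PUBLISH flags nibble (bit 3 DUP, bits 2-1 QoS, bit 0 RETAIN).
qos_from_flags(Flags) when is_integer(Flags), Flags >= 0, Flags =< 15 ->
    case (Flags bsr 1) band 3 of
        3 -> {error, {protocol_error, invalid_qos}};   % QoS 3 is reserved
        QoS -> {ok, QoS}
    end.

%% Packet identifiers must be non-zero 16-bit values.
check_packet_id(0) ->
    {error, {protocol_error, invalid_packet_id}};
check_packet_id(Id) when is_integer(Id), Id >= 1, Id =< 65535 ->
    ok.
```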

15. Missing .app.src

No OTP application descriptor exists. The shell config in rebar.config references amqtt_client but there's no src/amqtt_client.app.src. Runtime deps (ssl, crypto, public_key) are undeclared.

%% src/amqtt_client.app.src
{application, amqtt_client,
 [{description, "MQTT 3.1.1 client for AtomVM/OTP"},
  {vsn, "0.1.0"},
  {modules, []},
  {registered, []},
  {applications, [kernel, stdlib, crypto, public_key, ssl]},
  {env, []}
 ]}.

16. Unbounded buffer growth

File: mqtt_client.erl:273

No limit on buffer size. A malicious or buggy broker can exhaust memory by sending partial frames.
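
A bounded append could look like the sketch below. The module name, the append/3 signature, and the buffer_overflow reason are assumptions for illustration; the cap itself is a policy choice for the caller (MQTT 3.1.1 allows a remaining length of up to 268,435,455 bytes, which is likely far more than a small device wants to buffer).

```erlang
%% Hypothetical sketch, not code from the PR.
-module(buffer_limit_sketch).
-export([append/3]).

%% Append Data to Buffer unless the result would exceed MaxSize bytes.
%% On overflow the caller is expected to close the connection.
append(Buffer, Data, MaxSize)
  when is_binary(Buffer), is_binary(Data), is_integer(MaxSize), MaxSize >= 0 ->
    case byte_size(Buffer) + byte_size(Data) of
        Size when Size > MaxSize -> {error, buffer_overflow};
        _ -> {ok, <<Buffer/binary, Data/binary>>}
    end.
```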


Minor

17. ssl:start() in library code is non-idiomatic

File: mqtt_client.erl:316

Should use application:ensure_all_started(ssl) or declare the dep in .app.src.

18. Credentials retained in process state

File: mqtt_client.erl:102, 197

opts (containing username/password) is stored in #state{} but appears unused after init. Should strip sensitive fields.
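
Stripping can be a one-liner with maps:without/2. The sketch assumes the credentials are stored under the atom keys username and password, which is inferred from the description above rather than confirmed against the code; the module name is hypothetical.

```erlang
%% Hypothetical sketch, not code from the PR.
-module(strip_credentials_sketch).
-export([strip/1]).

%% Drop sensitive keys before the options are kept in the server state.
%% Assumption: credentials live under the atom keys username and password.
strip(Opts) when is_map(Opts) ->
    maps:without([username, password], Opts).
```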

19. Owner process not monitored

If the owner process dies without calling disconnect/1, the client keeps running with open socket and timers. Should monitor(process, Owner) in init/1.
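
A minimal gen_server sketch of the idea follows. It is not the PR's module: the name owner_monitor_sketch, the map-based state, and the choice to stop with reason normal are illustrative assumptions, and socket or timer cleanup is omitted.

```erlang
%% Hypothetical sketch, not code from the PR.
-module(owner_monitor_sketch).
-behaviour(gen_server).
-export([start/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

start(Owner) when is_pid(Owner) ->
    gen_server:start(?MODULE, Owner, []).

init(Owner) ->
    %% Monitor the owner so its death arrives as a 'DOWN' message.
    MRef = erlang:monitor(process, Owner),
    {ok, #{owner => Owner, owner_ref => MRef}}.

handle_call(_Request, _From, State) ->
    {reply, ok, State}.

handle_cast(_Msg, State) ->
    {noreply, State}.

%% Owner is gone: stop the client (a real client would close its socket here).
handle_info({'DOWN', MRef, process, _Pid, _Reason}, #{owner_ref := MRef} = State) ->
    {stop, normal, State};
handle_info(_Info, State) ->
    {noreply, State}.
```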

20. transport() type doesn't include bare gen_tcp

File: mqtt_client.erl:53

The type spec is ssl | {transport_module(), connection()} but the default transport is bare gen_tcp.

21. Keepalive timer not tied to actual I/O activity

File: mqtt_client.erl:460-470

The ping timer fires on a fixed interval from startup rather than tracking time since the last outbound packet. Also, no PINGRESP timeout — the client can't detect a dead broker.
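
One way to tie the timer to outbound activity is to re-arm it after every successful send. The helper below is a sketch under that assumption; the module name and the keepalive message tag are hypothetical, and a PINGRESP timeout would need a second timer armed when PINGREQ is sent.

```erlang
%% Hypothetical sketch, not code from the PR.
-module(keepalive_sketch).
-export([restart/2]).

%% Cancel the previous timer (if any) and arm a new one KeepAliveMs from now.
%% The calling process later receives {timeout, TimerRef, keepalive}.
restart(undefined, KeepAliveMs) ->
    erlang:start_timer(KeepAliveMs, self(), keepalive);
restart(OldRef, KeepAliveMs) when is_reference(OldRef) ->
    _ = erlang:cancel_timer(OldRef),
    erlang:start_timer(KeepAliveMs, self(), keepalive).
```

The handler should match on the timer reference held in the state, so that a timeout message from an already replaced timer is ignored.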

22. connected field written but never read for gating

File: mqtt_client.erl:109, 378

Suggests a state machine that isn't enforced. Either use it to gate operations or remove it.


Overall Assessment

The code is a solid first draft with clean separation of concerns. The main areas needing attention before merge:

  1. Security: TLS defaults, credential retention
  2. Protocol correctness: CONNACK gating, QoS 2 receive state, flag validation
  3. Robustness: Send error handling, buffer limits, protocol error recovery
  4. Packaging: Missing .app.src

Recommend addressing Critical (#1-#3) and Major (#4, #5, #6, #11, #15) as minimum for merge.
