From 4b00a71a28a76fb35586c5619b8bbb23de443256 Mon Sep 17 00:00:00 2001 From: Marc Worrell Date: Fri, 31 May 2024 17:30:57 +0200 Subject: [PATCH] Add test --- src/mqtt_sessions_process.erl | 2 +- test/mqtt_sessions_protocol_SUITE.erl | 202 +++++++++++++++++++++++++- 2 files changed, 202 insertions(+), 2 deletions(-) diff --git a/src/mqtt_sessions_process.erl b/src/mqtt_sessions_process.erl index 1eefb07..786d542 100644 --- a/src/mqtt_sessions_process.erl +++ b/src/mqtt_sessions_process.erl @@ -990,7 +990,7 @@ resend_buffered_and_unacknowledged(#state{ awaiting_ack = AwaitAck, buffer = Buf mark_packet_sent(PacketId, #state{ awaiting_ack = AwaitAck } = State) -> WaitFor = maps:get(PacketId, AwaitAck), State#state{ - awaiting_ack = WaitFor#wait_for{ is_sent = true } + awaiting_ack = AwaitAck#{ PacketId => WaitFor#wait_for{ is_sent = true } } }. % --------------------------------------------------------------------------------------- diff --git a/test/mqtt_sessions_protocol_SUITE.erl b/test/mqtt_sessions_protocol_SUITE.erl index ce351ae..8629a00 100644 --- a/test/mqtt_sessions_protocol_SUITE.erl +++ b/test/mqtt_sessions_protocol_SUITE.erl @@ -32,7 +32,8 @@ all() -> [ connect_disconnect_v5_test, connect_reconnect_v5_test, - connect_reconnect_clean_v5_test + connect_reconnect_clean_v5_test, + connect_reconnect_buffered_v5_test ]. %%-------------------------------------------------------------------- @@ -330,3 +331,202 @@ connect_reconnect_clean_v5_test(_Config) -> ok end, ok. + +connect_reconnect_buffered_v5_test(_Config) -> + Connect = #{ + type => connect, + protocol_name => <<"MQTT">>, + protocol_version => 5, + clean_start => true, + client_id => <<"test4">>, + will_flag => false, + username => <<>>, + password => <<>>, + properties => #{ + } + }, + {ok, ConnectMsg} = mqtt_packet_map:encode(5, Connect), + Options = #{ + transport => self() + }, + {ok, {SessionPid, <<>>}} = mqtt_sessions:incoming_connect(ConnectMsg, Options), + true = is_pid(SessionPid), + receive + {mqtt_transport, SessionPid, MsgBin} when is_binary(MsgBin) -> + {ok, {ConnAck, <<>>}} = mqtt_packet_map:decode(MsgBin), + #{ + type := connack, + session_present := false, + reason_code := 0 + } = ConnAck, + ok + end, + % Subscribe to a topic + Subscribe = #{ + type => subscribe, + topics => [ #{ topic => <<"reconnect_v5_test">>, qos => 2 } ] + }, + {ok, SubMsg} = mqtt_packet_map:encode(5, Subscribe), + mqtt_sessions:incoming_data(SessionPid, SubMsg), + receive + {mqtt_transport, SessionPid, SubAckMsgBin} when is_binary(SubAckMsgBin) -> + {ok, {SubAck, <<>>}} = mqtt_packet_map:decode(SubAckMsgBin), + #{ + type := suback, + acks := [ {ok, 2} ] + } = SubAck, + ok + end, + Disconnect = #{ + type => disconnect, + reason_code => 0, + properties => #{ + session_expiry_interval => 3600 + } + }, + {ok, DisconnectMsg} = mqtt_packet_map:encode(5, Disconnect), + ok = mqtt_sessions:incoming_data(SessionPid, DisconnectMsg), + % + % The connection should be signaled to disconnect + % + receive + {mqtt_transport, SessionPid, disconnect} -> + ok + end, + % + % And the session process should still be running + % + timer:sleep(100), + true = erlang:is_process_alive(SessionPid), + % + % Message should be queued + % + PubMsg1 = #{ + type => publish, + topic => <<"reconnect_v5_test">>, + payload => <<"hello-offline-qos-0">>, + qos => 0 + }, + ok = mqtt_sessions:publish(PubMsg1, undefined), + + PubMsg2 = #{ + type => publish, + topic => <<"reconnect_v5_test">>, + payload => <<"hello-offline-qos-1">>, + qos => 1 + }, + ok = mqtt_sessions:publish(PubMsg2, undefined), + + PubMsg3 = #{ + type => publish, + topic => <<"reconnect_v5_test">>, + payload => <<"hello-offline-qos-2">>, + qos => 2 + }, + ok = mqtt_sessions:publish(PubMsg3, undefined), + % + % Reconnect without clean_start + % + Reconnect = #{ + type => connect, + protocol_name => <<"MQTT">>, + protocol_version => 5, + clean_start => false, + client_id => <<"test4">>, + will_flag => false, + username => <<>>, + password => <<>>, + properties => #{ + } + }, + {ok, ReconnectMsg} = mqtt_packet_map:encode(5, Reconnect), + {ok, {SessionPid, <<>>}} = mqtt_sessions:incoming_connect(ReconnectMsg, Options), + % + % We have reconnected to the existing session, check the connack + % for the 'session_present' flag. + % + receive + {mqtt_transport, SessionPid, MsgBin2} when is_binary(MsgBin2) -> + {ok, {ConnAck2, <<>>}} = mqtt_packet_map:decode(MsgBin2), + #{ + type := connack, + session_present := true, + reason_code := 0 + } = ConnAck2, + ok + end, + % + % Should receive queued QoS 0 message + % + receive + {mqtt_transport, SessionPid, PubMsg1Bin} when is_binary(PubMsg1Bin) -> + {ok, {PubMsg1Received, <<>>}} = mqtt_packet_map:decode(PubMsg1Bin), + #{ + type := publish, + payload := <<"hello-offline-qos-0">>, + qos := 0, + packet_id := undefined + } = PubMsg1Received, + ok + after 10 -> + ct:fail(unsubscribed) + end, + % + % Should receive queued QoS 1 message + % + PacketId2 = receive + {mqtt_transport, SessionPid, PubMsg2Bin} when is_binary(PubMsg2Bin) -> + {ok, {PubMsg2Received, <<>>}} = mqtt_packet_map:decode(PubMsg2Bin), + #{ + type := publish, + qos := 1, + payload := <<"hello-offline-qos-1">>, + packet_id := PId2 + } = PubMsg2Received, + PId2 + after 10 -> + ct:fail(unsubscribed) + end, + Ack2 = #{ + type => puback, + packet_id => PacketId2 + }, + {ok, Ack2Data} = mqtt_packet_map:encode(5, Ack2), + mqtt_sessions_process:incoming_data(SessionPid, Ack2Data), + % + % Should receive queued QoS 2 message + % + PacketId3 = receive + {mqtt_transport, SessionPid, PubMsg3Bin} when is_binary(PubMsg3Bin) -> + {ok, {PubMsg3Received, <<>>}} = mqtt_packet_map:decode(PubMsg3Bin), + #{ + type := publish, + qos := 2, + payload := <<"hello-offline-qos-2">>, + packet_id := PId3 + } = PubMsg3Received, + PId3 + after 10 -> + ct:fail(unsubscribed) + end, + % Acknowledge with pubrel + Rel3 = #{ + type => pubrel, + packet_id => PacketId3 + }, + {ok, Rel3Data} = mqtt_packet_map:encode(5, Rel3), + mqtt_sessions_process:incoming_data(SessionPid, Rel3Data), + % Should receive a pubcomp back + receive + {mqtt_transport, SessionPid, PubMsg4Bin} when is_binary(PubMsg4Bin) -> + {ok, {PubMsg4Received, <<>>}} = mqtt_packet_map:decode(PubMsg4Bin), + #{ + type := pubcomp, + packet_id := PacketId3 + } = PubMsg4Received, + ok + after 10 -> + ct:fail(pubcomp) + end, + + ok.