Skip to content

Commit

Permalink
Add test
Browse files Browse the repository at this point in the history
  • Loading branch information
mworrell committed May 31, 2024
1 parent a46b70b commit 4b00a71
Show file tree
Hide file tree
Showing 2 changed files with 202 additions and 2 deletions.
2 changes: 1 addition & 1 deletion src/mqtt_sessions_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -990,7 +990,7 @@ resend_buffered_and_unacknowledged(#state{ awaiting_ack = AwaitAck, buffer = Buf
mark_packet_sent(PacketId, #state{ awaiting_ack = AwaitAck } = State) ->
WaitFor = maps:get(PacketId, AwaitAck),
State#state{
awaiting_ack = WaitFor#wait_for{ is_sent = true }
awaiting_ack = AwaitAck#{ PacketId => WaitFor#wait_for{ is_sent = true } }
}.

% ---------------------------------------------------------------------------------------
Expand Down
202 changes: 201 additions & 1 deletion test/mqtt_sessions_protocol_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ all() ->
[
connect_disconnect_v5_test,
connect_reconnect_v5_test,
connect_reconnect_clean_v5_test
connect_reconnect_clean_v5_test,
connect_reconnect_buffered_v5_test
].

%%--------------------------------------------------------------------
Expand Down Expand Up @@ -330,3 +331,202 @@ connect_reconnect_clean_v5_test(_Config) ->
ok
end,
ok.

connect_reconnect_buffered_v5_test(_Config) ->
Connect = #{
type => connect,
protocol_name => <<"MQTT">>,
protocol_version => 5,
clean_start => true,
client_id => <<"test4">>,
will_flag => false,
username => <<>>,
password => <<>>,
properties => #{
}
},
{ok, ConnectMsg} = mqtt_packet_map:encode(5, Connect),
Options = #{
transport => self()
},
{ok, {SessionPid, <<>>}} = mqtt_sessions:incoming_connect(ConnectMsg, Options),
true = is_pid(SessionPid),
receive
{mqtt_transport, SessionPid, MsgBin} when is_binary(MsgBin) ->
{ok, {ConnAck, <<>>}} = mqtt_packet_map:decode(MsgBin),
#{
type := connack,
session_present := false,
reason_code := 0
} = ConnAck,
ok
end,
% Subscribe to a topic
Subscribe = #{
type => subscribe,
topics => [ #{ topic => <<"reconnect_v5_test">>, qos => 2 } ]
},
{ok, SubMsg} = mqtt_packet_map:encode(5, Subscribe),
mqtt_sessions:incoming_data(SessionPid, SubMsg),
receive
{mqtt_transport, SessionPid, SubAckMsgBin} when is_binary(SubAckMsgBin) ->
{ok, {SubAck, <<>>}} = mqtt_packet_map:decode(SubAckMsgBin),
#{
type := suback,
acks := [ {ok, 2} ]
} = SubAck,
ok
end,
Disconnect = #{
type => disconnect,
reason_code => 0,
properties => #{
session_expiry_interval => 3600
}
},
{ok, DisconnectMsg} = mqtt_packet_map:encode(5, Disconnect),
ok = mqtt_sessions:incoming_data(SessionPid, DisconnectMsg),
%
% The connection should be signaled to disconnect
%
receive
{mqtt_transport, SessionPid, disconnect} ->
ok
end,
%
% And the session process should still be running
%
timer:sleep(100),
true = erlang:is_process_alive(SessionPid),
%
% Message should be queued
%
PubMsg1 = #{
type => publish,
topic => <<"reconnect_v5_test">>,
payload => <<"hello-offline-qos-0">>,
qos => 0
},
ok = mqtt_sessions:publish(PubMsg1, undefined),

PubMsg2 = #{
type => publish,
topic => <<"reconnect_v5_test">>,
payload => <<"hello-offline-qos-1">>,
qos => 1
},
ok = mqtt_sessions:publish(PubMsg2, undefined),

PubMsg3 = #{
type => publish,
topic => <<"reconnect_v5_test">>,
payload => <<"hello-offline-qos-2">>,
qos => 2
},
ok = mqtt_sessions:publish(PubMsg3, undefined),
%
% Reconnect without clean_start
%
Reconnect = #{
type => connect,
protocol_name => <<"MQTT">>,
protocol_version => 5,
clean_start => false,
client_id => <<"test4">>,
will_flag => false,
username => <<>>,
password => <<>>,
properties => #{
}
},
{ok, ReconnectMsg} = mqtt_packet_map:encode(5, Reconnect),
{ok, {SessionPid, <<>>}} = mqtt_sessions:incoming_connect(ReconnectMsg, Options),
%
% We have reconnected to the existing session, check the connack
% for the 'session_present' flag.
%
receive
{mqtt_transport, SessionPid, MsgBin2} when is_binary(MsgBin2) ->
{ok, {ConnAck2, <<>>}} = mqtt_packet_map:decode(MsgBin2),
#{
type := connack,
session_present := true,
reason_code := 0
} = ConnAck2,
ok
end,
%
% Should receive queued QoS 0 message
%
receive
{mqtt_transport, SessionPid, PubMsg1Bin} when is_binary(PubMsg1Bin) ->
{ok, {PubMsg1Received, <<>>}} = mqtt_packet_map:decode(PubMsg1Bin),
#{
type := publish,
payload := <<"hello-offline-qos-0">>,
qos := 0,
packet_id := undefined
} = PubMsg1Received,
ok
after 10 ->
ct:fail(unsubscribed)
end,
%
% Should receive queued QoS 1 message
%
PacketId2 = receive
{mqtt_transport, SessionPid, PubMsg2Bin} when is_binary(PubMsg2Bin) ->
{ok, {PubMsg2Received, <<>>}} = mqtt_packet_map:decode(PubMsg2Bin),
#{
type := publish,
qos := 1,
payload := <<"hello-offline-qos-1">>,
packet_id := PId2
} = PubMsg2Received,
PId2
after 10 ->
ct:fail(unsubscribed)
end,
Ack2 = #{
type => puback,
packet_id => PacketId2
},
{ok, Ack2Data} = mqtt_packet_map:encode(5, Ack2),
mqtt_sessions_process:incoming_data(SessionPid, Ack2Data),
%
% Should receive queued QoS 2 message
%
PacketId3 = receive
{mqtt_transport, SessionPid, PubMsg3Bin} when is_binary(PubMsg3Bin) ->
{ok, {PubMsg3Received, <<>>}} = mqtt_packet_map:decode(PubMsg3Bin),
#{
type := publish,
qos := 2,
payload := <<"hello-offline-qos-2">>,
packet_id := PId3
} = PubMsg3Received,
PId3
after 10 ->
ct:fail(unsubscribed)
end,
% Acknowledge with pubrel
Rel3 = #{
type => pubrel,
packet_id => PacketId3
},
{ok, Rel3Data} = mqtt_packet_map:encode(5, Rel3),
mqtt_sessions_process:incoming_data(SessionPid, Rel3Data),
% Should receive a pubcomp back
receive
{mqtt_transport, SessionPid, PubMsg4Bin} when is_binary(PubMsg4Bin) ->
{ok, {PubMsg4Received, <<>>}} = mqtt_packet_map:decode(PubMsg4Bin),
#{
type := pubcomp,
packet_id := PacketId3
} = PubMsg4Received,
ok
after 10 ->
ct:fail(pubcomp)
end,

ok.

0 comments on commit 4b00a71

Please sign in to comment.