Skip to content

Commit

Permalink
Simplify purge of QoS 0 messages (#22)
Browse files Browse the repository at this point in the history
  • Loading branch information
mworrell authored May 31, 2024
1 parent e2ea345 commit 28e5d45
Showing 1 changed file with 11 additions and 31 deletions.
42 changes: 11 additions & 31 deletions src/mqtt_sessions_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1150,10 +1150,11 @@ queue(#{ type := Type } = Msg, MsgNr, #state{ buffer = Buffer } = State) ->

maybe_purge(#state{ buffer = Buffer, awaiting_ack = WaitAcks } = State) ->
State#state{
buffer = maybe_purge_buffer(maps:size(Buffer), Buffer),
buffer = maybe_purge_buffer(Buffer),
awaiting_ack = maybe_purge_ack(WaitAcks)
}.

%% @doc Drop expired acks and expired waiting QoS 1/2 messages.
maybe_purge_ack(WaitAcks) ->
Now = mqtt_sessions_timestamp:timestamp(),
maps:filter(
Expand All @@ -1167,39 +1168,18 @@ maybe_purge_ack(WaitAcks) ->
end,
WaitAcks).

maybe_purge_buffer(Size, Buffer) when Size > ?MAX_BUFFERED->
purge_buffer(Buffer);
maybe_purge_buffer(_Size, Buffer) ->
Buffer.

purge_buffer(Buffer) ->
%% @doc Purge expired and oldest message from the QoS 0 buffer.
%% In case of overflow, drop the 20% oldest messages
maybe_purge_buffer(Buffer) ->
Now = mqtt_sessions_timestamp:timestamp(),
{Oldest, Newest} = maps:fold(
fun(_MsgNr, #queued{ queued = Time }, {OAcc, NAcc}) ->
OAcc1 = min(OAcc, Time),
NAcc1 = max(NAcc, Time),
{OAcc1, NAcc1}
end,
{Now, 0},
Buffer),
Buffer1 = if
Oldest =< Newest ->
QoS0PurgeAge = (Newest - Oldest) div 2,
maps:filter(
fun
(_MsgNr, #queued{ qos = 0, queued = Queued, expiry = Expiry }) ->
Now < Expiry andalso Now < (Queued + QoS0PurgeAge);
(_MsgNr, #queued{ expiry = Expiry }) ->
Now < Expiry
end,
Buffer);
true ->
Buffer
end,
Buffer1 = maps:filter(
fun(_MsgNr, #queued{ expiry = Expiry }) -> Now < Expiry end,
Buffer),
case maps:size(Buffer1) > ?MAX_BUFFERED of
true ->
% Drop all QoS 0 messages
maps:filter(fun(_, #queued{ qos = QoS }) -> QoS > 0 end, Buffer1);
Qs = lists:sort(maps:to_list(Buffer)),
{_, Qs1} = lists:split(maps:size(Buffer) div 5, Qs),
maps:from_list(Qs1);
false ->
Buffer1
end.
Expand Down

0 comments on commit 28e5d45

Please sign in to comment.