diff --git a/src/mqtt_sessions_process.erl b/src/mqtt_sessions_process.erl index 3cd0f77..5020749 100644 --- a/src/mqtt_sessions_process.erl +++ b/src/mqtt_sessions_process.erl @@ -1150,10 +1150,11 @@ queue(#{ type := Type } = Msg, MsgNr, #state{ buffer = Buffer } = State) -> maybe_purge(#state{ buffer = Buffer, awaiting_ack = WaitAcks } = State) -> State#state{ - buffer = maybe_purge_buffer(maps:size(Buffer), Buffer), + buffer = maybe_purge_buffer(Buffer), awaiting_ack = maybe_purge_ack(WaitAcks) }. +%% @doc Drop expired acks and expired waiting QoS 1/2 messages. maybe_purge_ack(WaitAcks) -> Now = mqtt_sessions_timestamp:timestamp(), maps:filter( @@ -1167,39 +1168,18 @@ maybe_purge_ack(WaitAcks) -> end, WaitAcks). -maybe_purge_buffer(Size, Buffer) when Size > ?MAX_BUFFERED-> - purge_buffer(Buffer); -maybe_purge_buffer(_Size, Buffer) -> - Buffer. - -purge_buffer(Buffer) -> +%% @doc Purge expired and oldest message from the QoS 0 buffer. +%% In case of overflow, drop the 20% oldest messages +maybe_purge_buffer(Buffer) -> Now = mqtt_sessions_timestamp:timestamp(), - {Oldest, Newest} = maps:fold( - fun(_MsgNr, #queued{ queued = Time }, {OAcc, NAcc}) -> - OAcc1 = min(OAcc, Time), - NAcc1 = max(NAcc, Time), - {OAcc1, NAcc1} - end, - {Now, 0}, - Buffer), - Buffer1 = if - Oldest =< Newest -> - QoS0PurgeAge = (Newest - Oldest) div 2, - maps:filter( - fun - (_MsgNr, #queued{ qos = 0, queued = Queued, expiry = Expiry }) -> - Now < Expiry andalso Now < (Queued + QoS0PurgeAge); - (_MsgNr, #queued{ expiry = Expiry }) -> - Now < Expiry - end, - Buffer); - true -> - Buffer - end, + Buffer1 = maps:filter( + fun(_MsgNr, #queued{ expiry = Expiry }) -> Now < Expiry end, + Buffer), case maps:size(Buffer1) > ?MAX_BUFFERED of true -> - % Drop all QoS 0 messages - maps:filter(fun(_, #queued{ qos = QoS }) -> QoS > 0 end, Buffer1); + Qs = lists:sort(maps:to_list(Buffer)), + {_, Qs1} = lists:split(maps:size(Buffer) div 5, Qs), + maps:from_list(Qs1); false -> Buffer1 end.