Skip to content

Commit

Permalink
Refactor buffering of messages in the mqtt session process
Browse files Browse the repository at this point in the history
  • Loading branch information
mworrell committed May 31, 2024
1 parent b6332bb commit a46b70b
Show file tree
Hide file tree
Showing 2 changed files with 253 additions and 170 deletions.
19 changes: 4 additions & 15 deletions src/mqtt_sessions.erl
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
%% @author Marc Worrell <[email protected]>
%% @copyright 2018 Marc Worrell
%% @copyright 2018-2024 Marc Worrell
%% @doc Session management for a MQTT server.
%% @end

%% Copyright 2018 Marc Worrell
%% Copyright 2018-2024 Marc Worrell
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -31,8 +33,6 @@

find_session/1,
find_session/2,
fetch_queue/1,
fetch_queue/2,

session_count/1,
router_info/1,
Expand Down Expand Up @@ -147,17 +147,6 @@ session_count(Pool) ->
router_info(Pool) ->
mqtt_sessions_router:info(Pool).

-spec fetch_queue( session_ref() ) -> {ok, list( mqtt_packet_map:mqtt_packet() | binary() )} | {error, notfound}.
fetch_queue(ClientId) ->
fetch_queue(?MQTT_SESSIONS_DEFAULT, ClientId).

-spec fetch_queue( atom(), session_ref() ) -> {ok, list( mqtt_packet_map:mqtt_packet() | binary() )} | {error, notfound}.
fetch_queue(Pool, ClientId) ->
case find_session(Pool, ClientId) of
{ok, Pid} -> mqtt_sessions_process:fetch_queue(Pid);
{error, _} = Error -> Error
end.

-spec get_user_context( session_ref() ) -> {ok, term()} | {error, notfound | noproc}.
get_user_context(ClientId) ->
get_user_context(?MQTT_SESSIONS_DEFAULT, ClientId).
Expand Down
Loading

0 comments on commit a46b70b

Please sign in to comment.