-
Notifications
You must be signed in to change notification settings - Fork 2.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(ds): move session data from mnesia
to emqx_ds
#12675
base: master
Are you sure you want to change the base?
Conversation
cd344c4
to
3daab06
Compare
3daab06
to
4928776
Compare
ref(builtin) | ||
| emqx_schema_hooks:injection_point('durable_storage.backends', []) | ||
MainRef | ||
| emqx_schema_hooks:union_option_injections(InjectionPoint, []) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This addresses this comment: https://github.com/emqx/emqx-platform/pull/135#discussion_r1514765105
to_topic(Domain, SessionId0, BinKey) when is_binary(BinKey) -> | ||
SessionId = emqx_http_lib:uri_encode(SessionId0), | ||
emqx_topic:join([ | ||
<<"session">>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see the reason to have a common prefix for the topics.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's a good idea to have namespaces, in case we might want to add other data to this database, such as some kind of metadata not tied to a particular clientid.
4928776
to
236c1aa
Compare
%% ensure modeule is loaded | ||
_ = Mod:module_info(), | ||
case erlang:function_exported(Mod, Fun, length(Args)) of |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can be rewritten as
lists:member({Fun, length(Args)}, Mod:module_info(exports)).
3f6948e
to
e18bd65
Compare
3cefa09
to
9d452e5
Compare
|
@@ -18,12 +18,18 @@ | |||
|
|||
-define(PERSISTENT_MESSAGE_DB, emqx_persistent_message). | |||
|
|||
-ifdef(STORE_STATE_IN_DS). | |||
-define(PERSISTENT_SESSION_DB, emqx_persistent_session). | |||
%% -ifdef(STORE_STATE_IN_DS). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: this is a bit weird formatting for the comment, it looks like some forgotten code.
39470a5
to
b0d291a
Compare
4673fde
to
f0be544
Compare
24e31b1
to
77110ab
Compare
77110ab
to
ab20cae
Compare
%% When storing session data in DS, failing here could lead to a session being | ||
%% considered new and not loaded, since we read non-atomically from the DB. | ||
%% If getting the streams fail due to rpc problems, it leniently returns an | ||
%% empty list of streams, and that results in a "new" session. | ||
case is_restoring_session() of | ||
true -> | ||
passthrough; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This sounds weird, though I have to admit I haven't yet finished reviewing.
I think that read failure should not be equivalent to having no session record. Is there any good reason to do that?
On the test code: wouldn't it be simpler to match on DB
and simulate failure only for the message DB? Current test code looks both super-hacky and extemely fragile.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that read failure should not be equivalent to having no session record. Is there any good reason to do that?
Just that I'm not sure how we should handle (recoverable) failures in this case. 🤔
Sleep and retry?
On the test code: wouldn't it be simpler to match on DB and simulate failure only for the message DB? Current test code looks both super-hacky and extemely fragile.
Sounds like a much better idea, I'll fix it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sleep and retry?
Good question. This warrants some discussion but as a safe bet I'd propagate this failure to the client, and let them decide if they want to retry.
Also wondering what would happen now if locking is unavailable in a mria transaction, probably something effectively similar?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good question. This warrants some discussion but as a safe bet I'd propagate this failure to the client, and let them decide if they want to retry.
This comes from get_streams
being lenient with failures, so it could affect other portions of the code as well.
Also wondering what would happen now if locking is unavailable in a mria transaction, probably something effectively similar?
Not sure I understand the question, but indeed not having transactions will lead to potential inconsistencies (at least in the builtin backend; FDB could avoid them, at least if deletions are not involved).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comes from
get_streams
being lenient with failures
Oh I see now. TBH this is (again) feels like a hack: we probably don't want to hide failures on that level.
Not sure I understand the question
I mean, I'm trying to understand what would happen to the client which is trying to open an existing session when current storage (mria) is unavailable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean, I'm trying to understand what would happen to the client which is trying to open an existing session when current storage (mria) is unavailable.
Good question, not sure on the exact answer. Since it uses dirty reads and writes, my guess is that it'll work with the local copy of the data (maybe hang in replicants while they can't RPC a core). 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed the test code to match on the DB.
%% (which allows upserting keys, depending on the properties of the layout | ||
%% keymapper). | ||
%% Default: `true'. | ||
auto_assign_timestamps => boolean() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm, I have to say I 100% thought that we agreed on the approach where this is part of batch entry rather than whole batch. Having it per-batch seemingly brings too much of unnecessary complexity.
I.e. something close to:
-spec store_batch(db(), [emqx_types:message() | {_TsID :: emqx_ds:time(), emqx_types:message()}], message_store_opts()) ->
store_batch_result().
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm, I have to say I 100% thought that we agreed on the approach where this is part of batch entry rather than whole batch.
Yes, and I thought that it implies having this option per-batch, so that we know whether to auto assign or not.
With the proposed spec change, wouldn't that move the auto-assignment to the caller instead of replication layer? Or are you suggesting that, if TsID
is defined by the caller, it is not subject to the auto-assign?
I didn't understand that suggestion when we talked before. I'll adapt to use this then.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or are you suggesting that, if
TsID
is defined by the caller, it is not subject to the auto-assign?
Yes, that's basically the idea.
I didn't understand that suggestion when we talked before. I'll adapt to use this then.
Great! Sorry for not being clear enough (as usual 🫠).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds like a cleaner approach indeed, thanks for the suggestion. 😸
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
✔️
), | ||
ok. | ||
|
||
delete_all_sessions(N) -> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd definitely vote for not reaching that deep into the implementation here. Should be enough to emqtt:disconnect(C, ?RC_SUCCESS, #{'Session-Expiry-Interval' => 0})
each client? After all, we strive to respect the spec in this regard, thereby these sessions should disappear from API responses anyway.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
✔️
052d34e
to
ecb25a9
Compare
ecb25a9
to
f3d54c8
Compare
persistent_term:get(?UNION_OPTION_PT_KEY(PointName), undefined) =/= undefined. | ||
|
||
call_if_implemented(Mod, Fun, Args, Default) -> | ||
%% ensure modeule is loaded |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
%% ensure modeule is loaded | |
%% ensure module is loaded |
Partially fixes https://emqx.atlassian.net/browse/EMQX-11841
Release version: v/e5.7
Summary
This implements storing session data into DS itself, instead of relying on mnesia.
By default, things will stay as before, using mnesia to store session data.
To enable this new feature, one must compile the project setting the environment variable
STORE_STATE_IN_DS=1
.PR Checklist
Please convert it to a draft if any of the following conditions are not met. Reviewers may skip over until all the items are checked:
changes/(ce|ee)/(feat|perf|fix|breaking)-<PR-id>.en.md
filesChecklist for CI (.github/workflows) changes
changes/
dir for user-facing artifacts update