PoC Couch Stats Resource Tracker for tracking process local resource usage #4812
Conversation
This reverts commit 4f00910.
    add_delta(T, get_delta())
end.

add_delta({A}, Delta) -> {A, Delta};
The same could be expressed with the help of `erlang:insert_element/3` and `erlang:size/1`:
add_delta(T, Delta) -> erlang:insert_element(erlang:size(T) + 1, T, Delta).
If maximum tuple size matters (semantically equivalent):
add_delta(T, Delta) when 0 < erlang:size(T), erlang:size(T) < 8 ->
erlang:insert_element(erlang:size(T) + 1, T, Delta);
add_delta(T, _Delta) -> T.
Wouldn't it be better to make it the inverse of `rexi_utils:get_delta/1`? (And place this function in `rexi_utils` as well?) That is, the `Delta` argument would not be expected to be of the form `{delta, D}`, but only `D`, and the `delta` tag would be applied automatically:
add_delta(T, Delta) when 0 < erlang:size(T), erlang:size(T) < 8 ->
erlang:insert_element(erlang:size(T) + 1, T, {delta, Delta});
add_delta(T, _Delta) -> T.
This way the following property would hold:
?FORALL(T, tuple(),
?FORALL(D, any(),
?IMPLIES(0 < erlang:size(T) andalso erlang:size(T) < 8,
extract_delta(add_delta(T, D)) == {T, D}))).
There is also https://www.erlang.org/doc/apps/erts/erlang#append_element/2. However, in the way it is written there might be an opportunity for structural sharing, since in this case the new tuple is constructed from existing pointers. I am not sure whether the VM would eliminate copying in this case.
I did a quick experiment just to see what gets emitted:
% erlc +time +to_core add_delta.erl
-module(add_delta).
-export([test/0]).
add_delta0({A}, Delta) -> {A, Delta};
add_delta0({A, B}, Delta) -> {A, B, Delta}.
add_delta1(T, Delta) when 0 < erlang:size(T), erlang:size(T) < 8 ->
erlang:insert_element(erlang:size(T) + 1, T, Delta);
add_delta1(T, _Delta) -> T.
add_delta2(T, Delta) -> erlang:append_element(T, Delta).
test() ->
{1,2,3, {delta, 4}} = add_delta0({1,2,3}, {delta, 4}),
{1,2,3, {delta, 4}} = add_delta1({1,2,3}, {delta, 4}),
{1,2,3, {delta, 4}} = add_delta2({1,2,3}, {delta, 4}),
ok.
The resulting Core Erlang is:
module 'add_delta' ['module_info'/0,
'module_info'/1,
'test'/0]
attributes [%% Line 1
'file' =
%% Line 1
[{[97|[100|[100|[95|[100|[101|[108|[116|[97|[46|[101|[114|[108]]]]]]]]]]]]],1}]]
'add_delta0'/2 =
%% Line 6
( fun (_0,_1) ->
( case <_0,_1> of
<{A},Delta> when 'true' ->
{A,Delta}
%% Line 7
<{A,B},Delta> when 'true' ->
{A,B,Delta}
( <_3,_2> when 'true' ->
( primop 'match_fail'
(( {'function_clause',_3,_2}
-| [{'function',{'add_delta0',2}}] ))
-| [{'function',{'add_delta0',2}}] )
-| ['compiler_generated'] )
end
-| [{'function',{'add_delta0',2}}] )
-| [{'function',{'add_delta0',2}}] )
'add_delta1'/2 =
%% Line 9
( fun (_0,_1) ->
( case <_0,_1> of
<T,Delta>
when try
let <_2> =
call 'erlang':'size'
(T)
in let <_3> =
call 'erlang':'<'
(0, _2)
in let <_4> =
call 'erlang':'size'
(T)
in let <_5> =
call 'erlang':'<'
(_4, 8)
in call 'erlang':'and'
(_3, _5)
of <Try> ->
Try
catch <T,R> ->
'false' ->
let <_6> =
call %% Line 10
'erlang':%% Line 10
'size'
(%% Line 10
T)
in let <_7> =
call %% Line 10
'erlang':%% Line 10
'+'
(%% Line 10
_6, %% Line 10
1)
in %% Line 10
call 'erlang':'insert_element'
(_7, T, Delta)
%% Line 11
<T,_X_Delta> when 'true' ->
T
end
-| [{'function',{'add_delta1',2}}] )
-| [{'function',{'add_delta1',2}}] )
'add_delta2'/2 =
%% Line 13
( fun (_0,_1) ->
call 'erlang':'append_element'
(_0, _1)
-| [{'function',{'add_delta2',2}}] )
'test'/0 =
%% Line 15
( fun () ->
%% Line 16
case apply 'add_delta0'/2
({1,2,3}, {'delta',4}) of
<{1,2,3,{'delta',4}}> when 'true' ->
%% Line 17
case apply 'add_delta1'/2
({1,2,3}, {'delta',4}) of
<{1,2,3,{'delta',4}}> when 'true' ->
%% Line 18
case apply 'add_delta2'/2
({1,2,3}, {'delta',4}) of
<{1,2,3,{'delta',4}}> when 'true' ->
%% Line 19
'ok'
( <_2> when 'true' ->
primop 'match_fail'
({'badmatch',_2})
-| ['compiler_generated'] )
end
( <_1> when 'true' ->
primop 'match_fail'
({'badmatch',_1})
-| ['compiler_generated'] )
end
( <_0> when 'true' ->
primop 'match_fail'
({'badmatch',_0})
-| ['compiler_generated'] )
end
-| [{'function',{'test',0}}] )
'module_info'/0 =
( fun () ->
call 'erlang':'get_module_info'
('add_delta')
-| [{'function',{'module_info',0}}] )
'module_info'/1 =
( fun (_0) ->
call 'erlang':'get_module_info'
('add_delta', ( _0
-| [{'function',{'module_info',1}}] ))
-| [{'function',{'module_info',1}}] )
end
In the first case there is a direct reuse of tuple elements, so most likely no data copying.
In the second case there are calls to `size`, construction of a new frame in try/catch, and a call to `erlang:insert_element`. AFAIK `erlang:insert_element` (a BIF written in C) copies the values: https://github.com/erlang/otp/blob/1afaa459eaae8d51585acc2a363dd2714b98c9c2/erts/emulator/beam/bif.c#L2866.
In the third case, `erlang:append_element` is a call to a BIF written in C; this BIF copies the values: https://github.com/erlang/otp/blob/1afaa459eaae8d51585acc2a363dd2714b98c9c2/erts/emulator/beam/bif.c#L2833
I think the way it is written would be the fastest. I think Russell wanted to optimize this code path. If I am right, Russell, could you add a comment about it? Something like: "The tuple match is used to enable subterm sharing and avoid data copying."
Also, there might be a middle ground:
add_delta({A, B}, Delta) -> {A, B, Delta};
add_delta({A, B, C}, Delta) -> {A, B, C, Delta};
add_delta({A, B, C, D}, Delta) -> {A, B, C, D, Delta};
add_delta({A, B, C, D, E}, Delta) -> {A, B, C, D, E, Delta};
add_delta({A, B, C, D, E, F}, Delta) -> {A, B, C, D, E, F, Delta};
add_delta({A, B, C, D, E, F, G}, Delta) -> {A, B, C, D, E, F, G, Delta};
add_delta(T, Delta) when is_tuple(T) -> erlang:append_element(T, Delta).
end.

extract_delta({A, {delta, Delta}}) -> {{A}, Delta};
Please consider this shorter (and somewhat more flexible) version:
extract_delta(T) ->
Size = erlang:size(T),
case erlang:element(Size, T) of
{delta, Delta} when 0 < Size, Size < 9 -> {erlang:delete_element(Size, T), Delta};
_ -> {T, undefined}
end.
Why not modify add_delta?
add_delta(Tuple, Delta) -> {Delta, Tuple};
This ensures that the coordinator process flushes on demonitoring of the attachment refs in chttpd_db. The problem here is that it's possible to receive a 'DOWN' message for a monitor ref that is never receive'd, causing it to stick around in the coordinator message queue while the next http request is handled. The pending message will not become apparent until the next fabric call is invoked, as fabric expects to have full access to all messages in the calling process, an expectation which is violated by the pending message and causes a case clause crash in the fabric receive message callbacks. I noticed this during eunit runs with stubbed attachment handles that generate an immediate noproc message on the monitor call. Normal operations should not result in an immediate noproc result on monitoring the attachment process; however, any failure that causes the attachment process to die between acquisition of the refs and the demonitor calls will induce this bug, causing the next http request handled by the particular chttpd coordinator pool process to fail on whatever fabric call is made next.
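The core of the fix, roughly sketched (the helper name here is illustrative, not the actual chttpd_db code): demonitoring with the flush option removes any 'DOWN' message that already raced into the mailbox.

%% Sketch: demonitor an attachment ref and drop any pending 'DOWN' message.
%% erlang:demonitor/2 with [flush] removes a {'DOWN', Ref, _, _, _} from the
%% mailbox if one has already arrived, so it cannot leak into the next request.
demonitor_attachment_ref(Ref) when is_reference(Ref) ->
    true = erlang:demonitor(Ref, [flush]),
    ok.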
We need to potentially extract the usage delta from the incoming RPC message. Rather than pattern match on all possible message formats that could potentially include usage deltas, we instead utilize rexi_utils:extract_delta, which matches against tuples ending in `{delta, Delta}` and separates that out from the underlying message. The subtlety here is that receiving the message to extract the delta changes the behavior: this was previously doing a selective receive keyed off of the Ref, and then ignoring any other messages that arrived. I don't know if the selective receive was intended, but I don't think it's appropriate to leave unexpected messages floating around, especially given things like issue #4909. Instead of utilizing a selective receive, this switches to extracting the message and delta like we need to do, and then in the event it finds unexpected messages they're logged and skipped. This selective receive was masking the lack of unlink on the linked rexi_mon pid in fix #4906. I've also noticed some rpc responses arriving late as well, but I haven't tracked that down, so let's log when that happens.
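The receive pattern described above looks roughly like this (a sketch under assumed message shapes; the function name and the {Ref, Msg} envelope are illustrative, not the PR's actual code):

%% Sketch: receive the reply for Ref, split out any usage delta, and log and
%% skip unexpected messages instead of leaving them in the mailbox.
recv_with_delta(Ref, Timeout) ->
    receive
        {Ref, Msg0} ->
            {Msg, Delta} = rexi_utils:extract_delta(Msg0),
            {ok, Msg, Delta};
        Other ->
            couch_log:warning("unexpected message in coordinator: ~p", [Other]),
            recv_with_delta(Ref, Timeout)
    after Timeout ->
        {error, timeout}
    end.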
Most tests should not persist config changes, and this is no exception. While the test deletes the change afterwards, a system halt between setup and teardown results in a persisted max doc setting that causes unexpected failures in the next test suite that writes a document larger than 50 bytes.
@@ -323,6 +324,9 @@ handle_request_int(MochiReq) ->
    % Save client socket so that it can be monitored for disconnects
    chttpd_util:mochiweb_client_req_set(MochiReq),

    %% This is probably better in before_request, but having Path is nice
Here is fine IMO
@@ -25,7 +25,7 @@
 setup() ->
     Hashed = couch_passwords:hash_admin_password(?PASS),
     ok = config:set("admins", ?USER, ?b2l(Hashed), _Persist = false),
-    ok = config:set("couchdb", "max_document_size", "50"),
+    ok = config:set("couchdb", "max_document_size", "50", false),
`_Persist = false` for consistency.
@@ -1982,6 +1983,7 @@ increment_stat(#db{options = Options}, Stat, Count) when
->
    case lists:member(sys_db, Options) of
        true ->
            %% TODO: we shouldn't leak resource usage just because it's a sys_db
Hrmm. How can we do it? Are you planning to store an IS_SYSTEM_DB key in the process dictionary or something?

The final solution to this question is out of scope for the first PR. The TODO is good enough for the first version of the feature.
row_read() -> inc(rows_read).
btree_fold() -> inc(?COUCH_BT_FOLDS).
%% TODO: do we need ioq_called and this access pattern?
ioq_called() -> is_enabled() andalso inc(ioq_calls).
We do check `is_enabled` in `update_counter/3`; are you doing a shortcut for performance reasons? If so, let's add a comment.
update_counter(Field, Count) ->
    is_enabled() andalso update_counter(get_pid_ref(), Field, Count).
`update_counter/2` is implemented in terms of `update_counter/3`, which does call `is_enabled()`.
    ets:select(couch_stats_resource_tracker,
        [{#rctx{type = {coordinator,'_','_'}, _ = '_'}, [], ['$_']}]);
select_by_type(workers) ->
    ets:select(couch_stats_resource_tracker,
(node1@127.0.0.1)17> ets:fun2ms(fun(#rctx{type = {worker, _, _}} = R) -> R end).
[{#rctx{started_at = '_',updated_at = '_',exited_at = '_',
pid_ref = '_',mon_ref = '_',mfa = '_',nonce = '_',
from = '_',
type = {worker,'_','_'},
state = '_',dbname = '_',username = '_',db_open = '_',
docs_read = '_',rows_read = '_',btree_folds = '_',
changes_processed = '_',changes_returned = '_',
ioq_calls = '_',io_bytes_read = '_',io_bytes_written = '_',
js_evals = '_',js_filter = '_',js_filter_error = '_',
js_filtered_docs = '_',mango_eval_match = '_',...},
[],
['$_']}]
This would only work with `-include_lib("stdlib/include/ms_transform.hrl")`.
I did an experiment:
% erlc +time +to_core ets_select.erl
-module(ets_select).
-include_lib("stdlib/include/ms_transform.hrl").
-export([test/0]).
%% TODO: switch to:
%% -record(?RCTX, {
-record(rctx, {
%% Metadata
started_at,
updated_at,
exited_at, %% TODO: do we need a final exit time and additional update times afterwards?
pid_ref,
mon_ref,
mfa,
nonce,
from,
type = unknown, %% unknown/background/system/rpc/coordinator/fabric_rpc/etc_rpc/etc
state = alive,
dbname,
username,
%% Stats counters
db_open = 0,
docs_read = 0,
rows_read = 0,
btree_folds = 0,
changes_processed = 0,
changes_returned = 0,
ioq_calls = 0,
io_bytes_read = 0,
io_bytes_written = 0,
js_evals = 0,
js_filter = 0,
js_filter_error = 0,
js_filtered_docs = 0,
mango_eval_match = 0,
%% TODO: switch record definitions to be macro based, eg:
%% ?COUCH_BT_GET_KP_NODE = 0,
get_kv_node = 0,
get_kp_node = 0
}).
select() ->
ets:select(?MODULE, ets:fun2ms(fun(#rctx{type = {worker, _, _}} = R) -> R end)).
test() ->
(catch ets:new(?MODULE, [named_table, {keypos, #rctx.started_at}])),
true = ets:insert(?MODULE, [#rctx{started_at = 1, type = {worker, 1, 2}}]),
true = ets:insert(?MODULE, [#rctx{started_at = 2, type = {other, 1, 2}}]),
io:format("selected = ~p~n", [select()]),
io:format("all = ~p~n", [ets:tab2list(?MODULE)]).
The output:
27> c(ets_select).
{ok,ets_select}
28> ets_select:test().
selected = [{rctx,1,undefined,undefined,undefined,undefined,undefined,
undefined,undefined,
{worker,1,2},
alive,undefined,undefined,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0}]
all = [{rctx,1,undefined,undefined,undefined,undefined,undefined,undefined,
undefined,
{worker,1,2},
alive,undefined,undefined,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0},
{rctx,2,undefined,undefined,undefined,undefined,undefined,undefined,
undefined,
{other,1,2},
alive,undefined,undefined,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0}]
ok
The Core Erlang generated by the compiler:
module 'ets_select' ['module_info'/0,
'module_info'/1,
'test'/0]
attributes ['compile' =
[{'nowarn_unused_record',[['rctx']]}],
%% Line 1
'file' =
%% Line 1
[{[101|[116|[115|[95|[115|[101|[108|[101|[99|[116|[46|[101|[114|[108]]]]]]]]]]]]]],1}],
%% Line 1
'file' =
%% Line 1
[{[47|[85|[115|[101|[114|[115|[47|[105|[105|[108|[121|[97|[107|[47|[46|[97|[115|[100|[102|[47|[105|[110|[115|[116|[97|[108|[108|[115|[47|[101|[114|[108|[97|[110|[103|[47|[50|[53|[46|[51|[46|[50|[46|[57|[47|[108|[105|[98|[47|[115|[116|[100|[108|[105|[98|[45|[52|[46|[51|[46|[49|[46|[51|[47|[105|[110|[99|[108|[117|[100|[101|[47|[109|[115|[95|[116|[114|[97|[110|[115|[102|[111|[114|[109|[46|[104|[114|[108]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]],1}],
%% Line 4
'file' =
%% Line 4
[{[101|[116|[115|[95|[115|[101|[108|[101|[99|[116|[46|[101|[114|[108]]]]]]]]]]]]]],4}],
%% Line 26
'record' =
%% Line 26
[{'rctx',[{'record_field',{28,5},{'atom',{28,5},'started_at'}}|[{'record_field',{29,5},{'atom',{29,5},'updated_at'}}|[{'record_field',{30,5},{'atom',{30,5},'exited_at'}}|[{'record_field',{31,5},{'atom',{31,5},'pid_ref'}}|[{'record_field',{32,5},{'atom',{32,5},'mon_ref'}}|[{'record_field',{33,5},{'atom',{33,5},'mfa'}}|[{'record_field',{34,5},{'atom',{34,5},'nonce'}}|[{'record_field',{35,5},{'atom',{35,5},'from'}}|[{'record_field',{36,5},{'atom',{36,5},'type'},{'atom',{36,12},'unknown'}}|[{'record_field',{37,5},{'atom',{37,5},'state'},{'atom',{37,13},'alive'}}|[{'record_field',{38,5},{'atom',{38,5},'dbname'}}|[{'record_field',{39,5},{'atom',{39,5},'username'}}|[{'record_field',{42,5},{'atom',{42,5},'db_open'},{'integer',{42,15},0}}|[{'record_field',{43,5},{'atom',{43,5},'docs_read'},{'integer',{43,17},0}}|[{'record_field',{44,5},{'atom',{44,5},'rows_read'},{'integer',{44,17},0}}|[{'record_field',{45,5},{'atom',{45,5},'btree_folds'},{'integer',{45,19},0}}|[{'record_field',{46,5},{'atom',{46,5},'changes_processed'},{'integer',{46,25},0}}|[{'record_field',{47,5},{'atom',{47,5},'changes_returned'},{'integer',{47,24},0}}|[{'record_field',{48,5},{'atom',{48,5},'ioq_calls'},{'integer',{48,17},0}}|[{'record_field',{49,5},{'atom',{49,5},'io_bytes_read'},{'integer',{49,21},0}}|[{'record_field',{50,5},{'atom',{50,5},'io_bytes_written'},{'integer',{50,24},0}}|[{'record_field',{51,5},{'atom',{51,5},'js_evals'},{'integer',{51,16},0}}|[{'record_field',{52,5},{'atom',{52,5},'js_filter'},{'integer',{52,17},0}}|[{'record_field',{53,5},{'atom',{53,5},'js_filter_error'},{'integer',{53,23},0}}|[{'record_field',{54,5},{'atom',{54,5},'js_filtered_docs'},{'integer',{54,24},0}}|[{'record_field',{55,5},{'atom',{55,5},'mango_eval_match'},{'integer',{55,24},0}}|[{'record_field',{58,5},{'atom',{58,5},'get_kv_node'},{'integer',{58,19},0}}|[{'record_field',{59,5},{'atom',{59,5},'get_kp_node'},{'integer',{59,19},0}}]]]]]]]]]]]]]]]]]]]]]]]]]]]]}]]
'select'/0 =
%% Line 62
( fun () ->
%% Line 63
call 'ets':'select'
('ets_select', [{{'rctx','_','_','_','_','_','_','_','_',{'worker','_','_'},'_','_','_','_','_','_','_','_','_','_','_','_','_','_','_','_','_','_','_'},[],['$_']}])
-| [{'function',{'select',0}}] )
'test'/0 =
%% Line 65
( fun () ->
do try
%% Line 66
call 'ets':'new'
('ets_select', ['named_table'|[{'keypos',2}]])
of <_catch_value> ->
_catch_value
catch <Class,Reason,Stk> ->
'ok'
%% Line 67
case call 'ets':'insert'
('ets_select', [{'rctx',1,'undefined','undefined','undefined','undefined','undefined','undefined','undefined',{'worker',1,2},'alive','undefined','undefined',0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0}]) of
<'true'> when 'true' ->
%% Line 68
case call 'ets':'insert'
('ets_select', [{'rctx',2,'undefined','undefined','undefined','undefined','undefined','undefined','undefined',{'other',1,2},'alive','undefined','undefined',0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0}]) of
<'true'> when 'true' ->
let <_2> =
apply %% Line 69
'select'/0
()
in do %% Line 69
call 'io':'format'
([115|[101|[108|[101|[99|[116|[101|[100|[32|[61|[32|[126|[112|[126|[110]]]]]]]]]]]]]]], [_2|[]])
let <_3> =
call %% Line 70
'ets':%% Line 70
'tab2list'
(%% Line 70
'ets_select')
in %% Line 70
call 'io':'format'
([97|[108|[108|[32|[61|[32|[126|[112|[126|[110]]]]]]]]]], [_3|[]])
( <_1> when 'true' ->
primop 'match_fail'
({'badmatch',_1})
-| ['compiler_generated'] )
end
( <_0> when 'true' ->
primop 'match_fail'
({'badmatch',_0})
-| ['compiler_generated'] )
end
-| [{'function',{'test',0}}] )
'module_info'/0 =
( fun () ->
call 'erlang':'get_module_info'
('ets_select')
-| [{'function',{'module_info',0}}] )
'module_info'/1 =
( fun (_0) ->
call 'erlang':'get_module_info'
('ets_select', ( _0
-| [{'function',{'module_info',1}}] ))
-| [{'function',{'module_info',1}}] )
end
As you can see, the magic syntax gets expanded:
call 'ets':'select'
('ets_select', [{{'rctx','_','_','_','_','_','_','_','_',{'worker','_','_'},'_','_','_','_','_','_','_','_','_','_','_','_','_','_','_','_','_','_','_'},[],['$_']}])
    F = atom_to_binary(F0),
    A = integer_to_binary(A0),
    <<M/binary, ":", F/binary, "/", A/binary>>;
convert_mfa(undefined) ->
Should we handle the case when it is something else?
convert_mfa(Else) ->
list_to_binary(lists:flatten(io_lib:format("{unexpected, ~p}", [Else]))).
Maybe not. Let it crash.
    Fold(FoldFun, #{}, ?MODULE).

%% Sorts largest first
👍
term_to_flat_json({shutdown, Reason0}) when is_atom(Reason0) ->
    Reason = atom_to_binary(Reason0),
    <<"shutdown: ", Reason/binary>>;
term_to_flat_json({type, Atom}) when is_atom(Atom) ->
Why not collapse these three cases into one?
term_to_flat_json({type, Type}) -> convert_type(Type);
    } = Rctx,
    PidRef = {term_to_flat_json(Pid), term_to_flat_json(Ref)},
    MFA = case MFA0 of
        {M0, F0, A0} ->
{_, _, _} -> convert_mfa(MFA0)
Also, do we want guards `is_atom(M)`, `is_atom(F)`, `is_integer(A)`, `A > 0`?
create_context() ->
    is_enabled() andalso create_context(self()).
`is_enabled()` is already called in `create_context/1`.
end.

create_coordinator_context(#httpd{path_parts=Parts} = Req) ->
    is_enabled() andalso create_coordinator_context(Req, io_lib:format("~p", [Parts])).
`is_enabled/0` is already called in `create_coordinator_context/2`.
        ok;
    true ->
        catch case ets:update_element(?MODULE, get_pid_ref(), [{#rctx.dbname, DbName}]) of
            false ->
([email protected])25> element(2, erlang:process_info(self(), current_stacktrace)).
[{erl_eval,do_apply,7,[{file,"erl_eval.erl"},{line,748}]},
{erl_eval,expr_list,7,[{file,"erl_eval.erl"},{line,961}]},
{erl_eval,expr,6,[{file,"erl_eval.erl"},{line,465}]},
{shell,exprs,7,[{file,"shell.erl"},{line,693}]},
{shell,eval_exprs,7,[{file,"shell.erl"},{line,649}]},
{shell,eval_loop,3,[{file,"shell.erl"},{line,634}]}]
([email protected])26> Stack = try 12 / 0 catch Error:Reason:Stacktrace -> Stacktrace end.
[{erlang,'/',
[12,0],
[{error_info,#{module => erl_erts_errors}}]},
{erl_eval,do_apply,7,[{file,"erl_eval.erl"},{line,748}]},
{erl_eval,try_clauses,10,[{file,"erl_eval.erl"},{line,987}]},
{erl_eval,expr,6,[{file,"erl_eval.erl"},{line,492}]},
{shell,exprs,7,[{file,"shell.erl"},{line,693}]},
{shell,eval_exprs,7,[{file,"shell.erl"},{line,649}]},
{shell,eval_loop,3,[{file,"shell.erl"},{line,634}]}]
I think this is a debugging leftover. The stacktrace produced by `process_info` should be almost the same as the one from `try/catch`.
        false ->
            ok;
        true ->
            catch case ets:update_element(?MODULE, get_pid_ref(), [{#rctx.dbname, DbName}]) of
These blocks look very similar in all of the functions. Maybe worth extracting them out as a function `update_element(Idx, Value)`. Although in this case you would need to remove one line from the stacktrace.
Couch Stats Resource Tracker Overview
We currently lack visibility into the types of operations that are inducing
system resource usage in CouchDB. We can see in the IOQ stats the raw
quantities of IO operations being induced, in essentially the categories of "normal
database reads", "database writes", "view index writes", "internal
replication", and "compaction". The problem is connecting the dots between the
data in IOQ and the corresponding database operation types inducing the work,
and going a step further, connecting that with individual requests to identify
the actual operations inducing all of the work.
In particular, I believe this is especially pronounced in aggregate
database read operations that perform a fold over the `id_tree` or `seq_tree`
of the underlying `.couch` file. We do not currently instrument
`couch_btree.erl` with `couch_stats` to track operations, so we do not have
any data to correlate `couch_btree:fold*` operations with the volume of IOQ
traffic flowing through the system. A basic first step would be to add
counters for the amount of btree fold reads performed; however, that would
still be insufficient, as it would not allow us to distinguish between
`_all_docs`, `_view`, `_find`, and `_changes` unless we track which type of
fold operation is being induced.
Ideally, we should be able to correlate the overall IOQ throughput with the
different types of API operations being induced. Beyond that, it would be
fantastic if we can extend CouchDB's introspection capabilities such that we
can track the types of operations being performed at the user/db/request level,
allowing users to understand what operations and requests are utilizing system
resources.
This writeup proposes an approach to alleviate these introspection blindspots by
collecting additional cluster level metrics at the RPC operation level providing
context about the overall volume of requests, the magnitude of data processed,
and the response payload magnitude. By extending the global stats collection
logic we can allow for process local tracking of stats and build a system for
tracking and exposing that data at the RPC level, the node local level, and at
an HTTP API level. I think it's essential we target cluster level workloads in
addition to request level so we can correlate resource usage with individual
requests. Given how variable API operations are in CouchDB (eg a single doc
read reads one doc from 3 shard replicas, whereas a `_find` query failing to
find an index will do a full database scan every time), I think it's important
to be able to easily see what types of operations are consuming cluster
resources, and then readily identify the heavy hitting requests.
This data can be tracked in records in ETS tables, allowing for realtime
introspection of active resource consumption across all operations. We can
focus on monotonically increasing counters for tracking core operations and
avoid an assortment of complexity from negative values and dealing with
histograms. We can leave histograms to other stacks by tracking start/stop
timestamps. By only focusing on monotonically increasing values as a function
of resource consumption, we can take a delta between any two timestamps and
get a precise rate of consumption for this process/coordinator during the
given time window. This also allows us to iteratively track stats between any
start and stop point. The simple case is from process start to process finish,
but anywhere in between is a valid delta, and we can distinguish between live
snapshots and final full process workloads. By having RPC worker processes
store data to the node local ETS tables in addition to sending resource usage
deltas in rexi replies, we can track node local RPC worker resource usage in
addition to node local coordinator processes and how much work they've induced
across the cluster. This provides insight into the active heavy operations in
the cluster and also gives users easy access to identify heavy hitter requests
for further optimization.
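To make the delta mechanics concrete, here is a hedged sketch using a few of the #rctx{} counters shown elsewhere in this PR (field names assumed; not the actual implementation):

%% Sketch: with monotonically increasing counters, the usage between any two
%% snapshots T0 and T1 is a field-wise subtraction; dt is the time window.
rctx_delta(#rctx{} = T0, #rctx{} = T1) ->
    #{
        docs_read => T1#rctx.docs_read - T0#rctx.docs_read,
        rows_read => T1#rctx.rows_read - T0#rctx.rows_read,
        ioq_calls => T1#rctx.ioq_calls - T0#rctx.ioq_calls,
        dt => T1#rctx.updated_at - T0#rctx.updated_at %% assumes numeric timestamps
    }.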
By focusing on only tracking monotonically increasing counters about core
resource usage, we can utilize a delta based system allowing for resource
tracking on the desired stat. Essentially this allows us to create
`recon:proc_window` but as `couchdb:proc_window`, and identify the most active
processes by account, docs read, IOQ calls, rows read, JS filter invocations,
data volume, or whatever else we choose to track.
The delta based approach allows us to decouple from any particular time
interval based collection approach and instead focus on what provides the most
insight in any particular context. At the base case this allows us to collect
stats from time start to time end, but it also provides intermediate values at
whatever intervals/rates we desire. This also allows us to use the same stats
collection logic for both streaming and non streaming API endpoints, as we can
handle full deltas or partials. It's key we support delta operations, as we
need to see long running processes live, not when they're completed
potentially hours later, so delta updates are essential to live tracking of
long running operations. (In practice, long running `_find` query RPC workers
can induce work for potentially hours, so it's inappropriate to wait until the
worker completes to stream the resource usage, as otherwise it'll be hours
behind when it actually happened.)
With the idea of using the existing `couch_stats:increment_counter` logic to
handle resource tracking, we can extend that `increment_counter` logic so
that, in addition to the normal global tracking of the desired metric, we also
begin tracking a process local set of statistics, allowing us to isolate
resources utilized to individual requests and even rpc workers.
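The extension point would look something like this inside couch_stats (a sketch; notify_existing_metrics/1 stands in for whatever the current global update path is, and maybe_track_local_stat/1 is the hypothetical hook named in the Core Tasks below):

%% Sketch: keep the normal global metric update, then additionally bump the
%% same stat in this process's local tracking for the subset we care about.
increment_counter(Name) ->
    notify_existing_metrics(Name), %% existing global path (stand-in name)
    maybe_track_local_stat(Name),  %% new: process local ETS bump
    ok.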
This is straightforward to accomplish from within
`couch_stats:increment_counter`, as we have the exact stats being recorded and
we're still in the process that invoked those stats, so this draft PR also has
the local process performing operations update an ets table entry for its own
usage levels. This results in all worker processes concurrently updating ETS
tables in realtime as a function of resource usage. Unlike our normal stats
collection, where we have many processes writing to a small number of keys and
creating lock contention, for process local stats each process only writes to
a singular key corresponding to the process ID, so there is no overlapping
stats collection across keys, allowing us to take full advantage of ETS
`write_concurrency`.
Each process updates its process usage stats in realtime as the normal
`couch_stats:increment_counter` code paths are exercised. We utilize a
timestamp based approach to track when each update occurs, and given we're
only interacting with monotonically increasing counters, we can take the delta
between any two time points and that will give us a reasonably accurate rate
of change. This allows us to create the equivalent of `recon:proc_window` but
as `couchdb:proc_window`, to find the busiest
processes/requests/databases/shards/users/etc.
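As a sketch of what that could look like (assuming the #rctx{} record and table from the experiments earlier in this thread; ets:tab2list/1 is fine for a sketch, but a real version would use ets:select/2):

%% Sketch: diff a counter field across two snapshots WindowMs apart and
%% return the TopN busiest pid_refs, eg proc_window(#rctx.ioq_calls, 5000, 10).
proc_window(FieldIdx, WindowMs, TopN) ->
    Snap = fun() ->
        maps:from_list([{R#rctx.pid_ref, element(FieldIdx, R)}
                        || R <- ets:tab2list(couch_stats_resource_tracker)])
    end,
    S0 = Snap(),
    timer:sleep(WindowMs),
    S1 = Snap(),
    Deltas = [{PidRef, V - maps:get(PidRef, S0, 0)} || {PidRef, V} <- maps:to_list(S1)],
    lists:sublist(lists:reverse(lists:keysort(2, Deltas)), TopN).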
The timestamp delta approach also allows us to send deltas from the rpc worker
back to the coordinator during the normally desired `rexi:reply` occurrences
by embedding the resource delta into the rexi reply, allowing us to
incrementally send back worker usage stats to be aggregated at the coordinator
level so we can find runaway processes prior to seeing them in the report
logs. This approach also allows us to extend `rexi:ping` with the same logic,
so we can keep resource usage stats streaming independently of when rows are
streamed back to the coordinator (a major issue we've encountered with long
running bad `_find` queries).
This also allows us to create watchdog processes that kill a cluster process
chain in the event it has surpassed certain thresholds. This would be easily
achievable at the node local level or coordinator cluster level. We'll also
likely want some type of watchdog process to clear out old entries in the ETS
cache. The process being tracked shouldn't itself be accountable for its
longevity in the stats table, as we want to leave a small buffer of time so we
don't lose info about processes the second they exit. This is a common problem
when using `recon:proc_window`: if you try to `process_info` a process you
found with `proc_window`, it could be dead by the end of the proc window,
losing any potential insights. By having, say, a 10 second buffer during which
completed processes remain in the stats table, we can more easily track and
interact with live work.
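A sweep along those lines might look like this (a sketch; assumes the table name from earlier examples and an exited_at field holding a millisecond timestamp, undefined while the process is alive):

%% Sketch: evict rows whose owner exited more than BufferMs ago, keeping the
%% grace window for post-mortem inspection. Returns the number of rows removed.
sweep(BufferMs) ->
    Threshold = erlang:monotonic_time(millisecond) - BufferMs,
    ets:select_delete(couch_stats_resource_tracker,
        [{#rctx{exited_at = '$1', _ = '_'},
          [{is_integer, '$1'}, {'<', '$1', Threshold}],
          [true]}]).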
This system also allows us to generate a per process report of resources
consumed, both for local processes invoked by way of `fabric_rpc` and also as
cluster wide aggregations at the coordinator level, allowing for realtime
usage reporting of active requests utilizing the new report engine. For
example:
Core Tasks
- Extend stats collection into fabric_rpc (or mango_httpd, dreyfus_rpc, etc):
  - couch_stats:increment_counter([fabric_rpc, changes_feed])
  - couch_stats:increment_counter([fabric_rpc, changes_feed, row_processed])
  - couch_stats:increment_counter([fabric_rpc, changes_feed, rows_returned])
- In general we need direct visibility into core operations induced and what
  is performing them; cluster level metrics give us the former, and
  reports/realtime reporting give us the latter.
- Extend couch_stats to do local process stat tracking:
  - In couch_stats:increment_counter, do something like
    maybe_track_local_stat(StatOp), where we can track local process stats for
    the subset of stats we're interested in having reports and realtime stats
    available for.
  - We start tracking the core data we want so we can use that for cluster
    status introspection and usage levels, then we gather the local stats for
    identifying which requests are heaviest, without having to immediately
    introduce new metrics for all RPC operations and while skipping undesired
    operations.
- Store local stats into an ETS backend:
  - A shared set of ETS tables (like we do with couch_server(N) and others)
    that utilizes a combination of read_concurrency, write_concurrency, and
    decentralized_counters will provide us with a performant system that
    allows for isolated tracking of per process stats, with concurrent writers
    but no overlapping writers, in addition to read concurrency and
    decentralized counters to minimize the impact of writing to the shared
    table across processes. Each process will be the only writer to its row,
    minimizing concurrency issues, yet we can still do a more expensive read
    operation over the ETS table(s) to allow for aggregate realtime tooling to
    expose at the remsh and HTTP API level (eg couch_debug functions for
    finding the heaviest requests, or similar exposing of data over http to
    end up in a Fauxton interface providing realtime stats).
  - Use {self(), make_ref()} as the key to ensure uniqueness in stats table
    ets:update_counter operations.
  - Use ets:match and ets:select to perform aggregate queries about overall
    system load (see the sketch below). I think this is only possible with
    records in ETS; I don't think we could use counters and select with maps?
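For example (a sketch against the #rctx{} record shown earlier):

%% Sketch: aggregate over the table with a match spec, eg total rows_read
%% across all coordinator contexts.
total_coordinator_rows_read() ->
    Rows = ets:select(couch_stats_resource_tracker,
        [{#rctx{type = {coordinator, '_', '_'}, rows_read = '$1', _ = '_'},
          [], ['$1']}]),
    lists:sum(Rows).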
- Extend rexi:{reply,stream,etc} to send usage deltas:
  - Track changes since a T0 stats record and provide a delta. When we make
    that delta, we store the stats at the time of the delta, TDelta1; then
    afterwards we perform our deltas against the last record snapshot, so we
    can continue to send deltas over time and still return a full workload
    usage if desired.
  - We can't wait until the RPC worker is done, because sometimes that takes
    hours. We need to be able to get iterative stats so we can find
    problematic processes live.
  - Some responses are not accounted for in the usage, as those responses are
    dropped, eg when the limit is reached in Mango find at the coordinator
    level before the shard level; this is better served by always sending
    deltas with every rpc line.
- Introduce a watchdog process around stats collection. This has two core jobs:
  - we want to have data about recent processes that just exited, to
    facilitate human interaction with busy yet short lived processes
  - I think this is an important initial feature; at the very least we should
    be able to tell a cluster to kill find requests that are still going after
    an hour, for example.
  - can easily find long running processes or heavy processes using ets:select
  - can also key on user to find the most active users
  - I think with a bit of thought and a few configuration options we can make
    this usable for helping out with problematic specific workflows on a per
    user/request basis
  - for example: "... yet returned none"
- Provide couchdb:proc_window and some other tools for filtering and sorting
  the stats collection tables. This tooling should be readily available by way
  of remsh, but we should also expose the same introspection APIs over http
  from the get go, so that this can be utilized directly by users with curl
  and end up in a nice Fauxton dashboard showing realtime usage levels.
- Aggregate across the local rpc node, all cluster rpc nodes, and all cluster
  coordinators. It should be easy to get a cluster level view of overall
  usage, grouped by the desired set of stats (see the sketch below).
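A cluster level view could be as simple as a multicall over a node local accessor (a sketch; active/0 is a hypothetical function returning the node's tracked entries):

%% Sketch: gather every node's local stats table for a cluster wide view.
cluster_active_resources() ->
    {Replies, BadNodes} = rpc:multicall([node() | nodes()],
        couch_stats_resource_tracker, active, [], 5000),
    {lists:flatten(Replies), BadNodes}.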
- Process teardown: once we make it easy to find the heavy hitters, we need to
  make it easy to take action against them. Obviously being able to kill the
  processes is essential, but I could see providing additional functionality,
  like perhaps allowing for lowering the IOQ priority of the given request if
  it's a request that genuinely needs to complete but is impacting the system.
I've got a hacked together http API to demonstrate being able to query the stats
collection across the cluster directly over http:
Overall Approach
The key task here is to build out the mechanisms for collecting/tracking stats
locally, shipping them back over the wire to the coordinator for aggregation,
handling the deltas, exposing the realtime stats, and then generating reports
with the data. I believe this to be the core work at hand; afterwards,
extending out the API operations to collect the data is fairly
straightforward. For instance, once the functionality is in place, adding
changes feed reports is simply a matter of adding the new stats to collect to
the relevant `fabric_rpc` functions and then ensuring we've flagged those new
stats as desired fields to expose in our resource usage tracking. We could
easily extend compactions or indexing with similar stats and reporting logic.
Once we have a system in place for tracking process local metrics, sending and
aggregating the deltas, and exposing the data over querying interfaces, we can
easily extend this level of introspection to any operation in the system, and
in my opinion we should extend this coverage to 100% of database operations
such that all resource usage is accounted for and tracked.
This is still a bit rough in a few areas, and not everything is fully
operational, but essentially the core framework is operational and demonstrates
a working version of what I've detailed out. I believe this to be a viable
approach and that the proof of concept provides a clear path forward on how to
deliver this.