Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PoC Couch Stats Resource Tracker for tracking process local resource usage #4812

Draft
wants to merge 29 commits into
base: main
Choose a base branch
from

Conversation

chewbranca
Copy link
Contributor

Couch Stats Resource Tracker Overview

We currently lack visibility into the types of operations that are inducing
system resource usage in CouchDB. We can see in the IOQ stats the raw
quantities of IO operations being induced, at categories of essentially "normal
database reads", "database writes", "view index writes", "internal
replication", and "compaction". The problem is connecting the dots between the
data in IOQ and the corresponding database operation types inducing the work,
and going a step further, connecting that with individual requests to identify
the actual operations inducing all of the work.

In particular, I believe this is especially pronounced in aggregate
database read operations that perform a fold over the id_tree or seq_tree of
the underlying .couch file. We do not currently instrument couch_btree.erl
with couch_stats to track operations, so we do not have any data to correlate
couch_btree:fold* operations with the volume of IOQ traffic flowing through
the system. A basic first step would be to add counters for the amount of btree
fold reads performed, however, that would still be insufficient as it would not
allow us to distinguish between _all_docs, _view, _find, and _changes
unless we track which type of fold operation is being induced.

Ideally, we should be able to correlate the overall IOQ throughput with the
different types of API operations being induced. Beyond that, it would be
fantastic if we can extend CouchDB's introspection capabilities such that we
can track the types of operations being performed at the user/db/request level,
allowing users to understand what operations and requests are utilizing system
resources.

This writeup proposes an approach to alleviate these introspection blindspots by
collecting additional cluster level metrics at the RPC operation level providing
context about the overall volume of requests, the magnitude of data processed,
and the response payload magnitude. By extending the global stats collection
logic we can allow for process local tracking of stats and build a system for
tracking and exposing that data at the RPC level, the node local level, and at
an HTTP API level. I think it's essential we target cluster level workloads in
addition to request level so we can correlate resource usage with individual
requests. Given how variable API operations are in CouchDB (eg single doc read
reads one doc from 3 shard replicas, whereas a _find query failing to find an
index will do a full database scan every time), I think it's important to be
able to easily see what types of operations are consuming cluster resources, and
then readily identify the heavy hitting requests.

This data can be tracked in records in ETS tables allowing for realtime
introspection of active resource consumption across all operations. We can focus
on monotonically increasing counters for tracking core operations and avoid an
assortment of complexity from negative values and dealing with histograms. We
can leave histograms to other stacks by tracking start/etc/stop timestamps. By
only focusing on monotonically increasing values as a function of resource
consumption we can take a delta between any two timestamps and get a precise
rate of consumption for this process/coordinator during the given time window.
This also allows us to iteratively track stats between any start and stop point.
The simple case is from process start to process finish, but anywhere any
between is a valid delta and we can distinguish between live snapshots and final
full process workload. By having RPC worker processes store data to the node
local ETS tables in addition to sending resouce usage deltas in rexi
replies we can track node local RPC worker resource usage in addition to node
local coordinator processes and how much work they've induced across the
cluster. This provides both insight into the active heavy operations in the
cluster but also provides users with easy access to identify heavy hitter
requests for further optimization.

By focusing on only tracking monotonically increasing counters about core
resource usage we can utilize a delta based system allowing for resource
tracking on the desired stat. Essentially this allows us to create
recon:proc_window but couchdb:proc_window and identify the most active
processes by account or docs reads or IOQ calls or rows read or JS filter
invocations or by data volume or whatever else we choose to track.

The delta based approach allows us to decouple from any particular time
interval based collection approach and instead focus on what provides the most
insight in any particular context. This allows at the base case to collect
stats time start to time end, but also provides intermediate values at whtaever
intervals/rates we desire. This also allows us to use the same stats collection
logic for both streaming and non streaming API endpoints as we can handle full
deltas or partials. It's key we support delta operations as we need to see long
running processes live, not when they're completed potentially hours later, so
delta updates are essential to live tracking on long running operations. (In
practice, long running _find query RPC workers can induce work for
potentially hours, so it's inappropriate to wait until the worker completes to
stream the resource usage, as otherwise it'll be hours behind when it actually
happened).

With the idea of using the existing couch_stats:increment_counter logic to
handle resource tracking, we can extend that increment_counter logic so that
in addition to the normal global tracking of the desired metric, we also begin
tracking a process local set of statistics allowing us so isolate resources
utilized to individual requests and even rpc workers.

This is straightforward to accomplish from within
couch_stats:increment_counter as we have the exact stats being recorded but
also we're still in the process that invoked those stats, so this draft PR also
has the local process performing operations update an ets table entry for its
own usage levels. This results in all worker processes concurrently updating
ETS tables in realtime as a function of resource usage. Unlike our normal stats
collection where we have many processes writing to a small number of keys
creating lock contention, for process local stats each process only writes to a
singular key corresponding to the process ID, so there is no overlapping stats
collection across keys, allowing us to take full advantage of ETS
write_concurrency.

Each process updates in realtime its process usage stats as the normal
couch_stats:increment_counter code paths are exercised. We utilize a
timestamp based approach to track when each update occurs, and given we're only
interacting with monotonically increasing counters, we can take the delta
between any two time points and that will give us a reasonably accurate rate of
change. This allows us to create the equivalent of recon:proc_window but for
couch_db:proc_window to find busiest
processes/requests/databases/shards/users/etc.

The timestamp delta approach also allows us to send deltas from rpc worker back
to coordinator during the normally desired rexi:reply occurances by embedding
the resource delta into the rexi reply, allowing us to incrementally send back
worker usage stats to be aggregated at the coordinator level so we can find
runaway processes prior to seeing them in the report logs. This approach also
allows us extend rexi:ping with the same logic so we can keep resource usage
stats streaming independently of when rows are streamed back to the coodinator
(a major issue we've encountered with long running bad _find queries).

This also allows us to create watchdog processes that kill a cluster process
chain in the event it has surpassed certain thresholds. This would be easily
achievable at the node local level or coordinator cluster level. We'll also
likely want some type of watchdog process to clear our old entries in the ETS
cache. The process being tracked itself shouldn't be accountable for the
longevity in the stats table, as we want to leave a small buffer of time so we
don't lose info about the processes the second they exit. This is a common
problem when using recon:proc_window where if you try and process_info a
process you found with proc_window it could be dead by the end of the proc
window, losing any potential insights. By having say a 10 second buffer on
letting processes remained completed in the stats table we can see more easily
track and interact with live work.

This system also allows us to generate a per process report of resources
consumed both for local processes invoked by way of fabric_rpc and also
cluster wide aggregations at the coordinator level allowing for realtime usage
reporting of active requests utilizing the new report engine. For example:

[report] 2023-10-06T23:24:35.974642Z [email protected] <0.296.0> -------- [csrt-pid-usage-lifetime changes_processed=0 docs_read=0 mfa="{fabric_rpc,open_doc,3}" nonce="null" pid_ref="<0.1558.0>#Ref<0.1796228611.1272446977.134363>" rows_read=0 state="{down,noproc}" updated_at="{1696,634675,974598}"]

Core Tasks

  • extend stats collection into fabric_rpc (or mango_httpd dreyfus_rpc, etc)

    • target core streaming API operations; changes feed as first step
    • each operation should include at least metrics for:
      1. number of rpc invocations
      2. number of documents processed (request magnitude)
      3. number of documents streamed out (response magnitude)
    • for example, changes feed should do:
      1. couch_stats:increment_counter([fabric_rpc, changes_feed])
      • on rpc invocation
      1. couch_stats:increment_counter([fabric_rpc, changes_feed, row_processed])
      • on rows read; perhaps also for docs read
      1. couch_stats:increment_counter([fabric_rpc, changes_feed, rows_returned])
      • when streaming rows back to the client
      1. we should also include JS filter invocations for relevant API operations
    • somewhat awkward with rows vs docs and needing js filters
    • we'll likely need to play around with what all to collect here, but in
      general we need direct visibility into core operations induced and what is
      performing them; Cluster level metrics give us the former, and reports/real
      time reporting gives us the latter
  • Extend couch_stats to do local process stat tracking

    • when we do couch_stats:increment_counter, do something like
      maybe_track_local_stat(StatOp) where we can track local process stats for
      the subset of stats we're interested in having reports and real time stats
      available for
    • this is the key second step from tracking the additional RPC data. Basically
      we start tracking the core data we want so we can use that for cluster
      status introspection and usage levels, then we gather the local stats for
      identifying which requests are heaviest
    • this provides a mechanism to track any additional stats on RPC we want
      without having to immediately introduce new metrics for all RPC operations we
      want, while skipping undesired operations
  • Store local stats into ETS backend

    • several counters backend options available. I believe an ETS table (or
      shared set of ETS tables like we do with couch_server(N) and others) that
      utilizies a combination of read_concurrency, write_concurrency, and
      distributed_counters will provide us with a performant system that allows
      for isolated tracking of per process stats in a way that allows for
      concurrent writers with no overlapping writers in addition to read
      concurrency and distributed counters to minimize impact of writing to the
      shared table across processes. Each process will be its only writer,
      minimizing concurrency issues, yet we can still do a more expensive read
      operation over the ETS table(s) to allow for aggregate real time tooling to
      expose at the remsh and HTTP API (eg couch_debug functions for finding
      heaviest requests or similar exposing of data over http to end up in a
      Fauxton interface providing real time stats
    • key off of {self(), make_ref()} to ensure uniqueness in stats table
    • also can store:
      • user
      • db/shard
      • request type
      • coordinator vs rpc vs background task
      • these are specifically for introspection and grouping on with ets queries
    • use records for tracking data
      • this allows for use of atomic ets:update_counter operations
      • easily interact with particular fields
      • also allows for ets:match and ets:select to perform aggregate queries
        about overal system load. I think this is only possible with records in
        ETS, I don't think we could use counters and select with maps?
  • Extend rexi:{reply,stream,etc} to send usage deltas

    • we can take the current stats record for the process and delta with that
      since a T0 stats record and provide a delta. We we make that delta we
      store the stats at the time of the delta TDelta1, then afterwards we
      perform our deltas against the last record snapshot and then we can continue
      to send deltas over time and still return a full workload usage if desired.
    • key thing here is we need to stream the stats back, we can't wait until the
      RPC worker is done because sometimes that takes hours. Need to be able to get
      iterative stats so we can find problematic processes live
    • need to address related issue where responses coming in from other shards
      are not accounted for in the usage as those responses are dropped, eg when
      limit is reached in Mango find at the coordinator level before shard level
    • alternative is specifically sending stats lines, but I think we'll be far
      better served by always sending deltas with every rpc line
  • Introduce watchdog process around stats collection

This has two core jobs:

  1. clear out old process stats after desired delay
    - we want to have data about recent processes that just exited to faciliate
    human interaction with busy yet fast lived processes
  2. provide an automated mechanism for killing processes
    - I think this is an important initial feature, at the very least we should
    be able to tell a cluster to kill find requests that are still going after
    an hour, for example.
    - can easily find long running processes or heavy processes using ets:select
    - can also key on user to find most active users
    - I think with a bit of thought and a few configuration options we can make
    this usable for helping out with problematic specific workflows on a per
    user/request basis
    - for example:
    • "automatically kill all find requests that have processed a million docs
      yet returned none"
  • Introduce remsh/http level introspection tooling
    • eg make couchdb:proc_window and some other tools for filtering and sorting
      the stats collection tables. This tooling should be readily available by way
      of remsh but we should also expose the same introspection APIs over http
      from the get go so that this can be utilized directly by users with curl and
      end up in a nice Fauxton dashboard showing realtime usage levels.
    • this doesn't have to be complicated, but should easily be able to query the
      local rpc node, all cluster rpc nodes, and all cluster coordinators. It
      should be easy to get a cluster level view of overall usage, grouped by the
      desired set of stats.
    • provide tooling to teardown RPC workers and coordinators with full worker
      teardown. Once we make it easy to find the heavy hitters, we need to make it
      easy to take action against them. Obviously being able to kill the processes
      is essential, but I could see providing additional functionality like
      perhaps allowing for lowering the IOQ priority for the given request if it's
      a request that genuinely needs to complete but is impacting the system.

I've got a hacked together http API to demonstrate being able to query the stats
collection across the cluster directly over http:

Screenshot 2023-10-06 at 3 29 50 PM

Overall Approach

The key task here is to build out the mechanisms for collecting/tracking stats
locally, shipping them back over the wire to the coordinator for aggregation,
handling the deltas, exposing them realtime stats, and then generating reports
with the data. I believe this to be the core work at hand, where afterwards
extending out the API operations to collect the data is fairly straightforward;
for instance, once the functionality is in place, adding changes feed reports
is simply adding the new stats to collect to the relevant fabric_rpc
functions and then ensuring we've flagged those new stats as desired fields to
expose in our resource usage tracking. We could easily extend compactions or
indexing with similar stats and reporting logic. Once we have a system in place
for tracking process local metrics, sending and aggregating the deltas, then
exposing the data over querying interfaces, we can easily extend this level of
introspection to any operations in the system, and in my opinion, we should
extend this coverage to 100% of database operations such that all resource
usage is accounted for and tracked.

This is still a bit rough in a few areas, and not everything is fully
operational, but essentially the core framework is operational and demonstrates
a working version of what I've detailed out. I believe this to be a viable
approach and that the proof of concept provides a clear path forward on how to
deliver this.

add_delta(T, get_delta())
end.

add_delta({A}, Delta) -> {A, Delta};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The same could be expressed with the help of erlang:insert_element/3 and erlang:size/1:

add_delta(T, Delta) -> erlang:insert_element(erlang:size(T) + 1, T, Delta).

If maximum tuple size matters (semantically equivalent):

add_delta(T, Delta) when 0 < erlang:size(T), erlang:size(T) < 8 ->
    erlang:insert_element(erlang:size(T) + 1, T, Delta);
add_delta(T, _Delta) -> T.

Copy link
Contributor

@pgj pgj Dec 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would not it be better to make it the inverse of rexi_utils:get_delta/1? (And place this function in rexi_utils as well?) That is, the Delta argument would not be expected to be like {delta, D}, but only D and the delta tag would be automatically applied on that:

add_delta(T, Delta) when 0 < erlang:size(T), erlang:size(T) < 8 ->
    erlang:insert_element(erlang:size(T) + 1, T, {delta, Delta});
add_delta(T, _Delta) -> T.

This way the following property would hold:

?FORALL(T, tuple(),
    ?FORALL(D, any(),
        ?IMPLIES(0 < erlang:size(T) andalso erlang:size(T) < 8,
            extract_delta(add_delta(T, D)) == {T, D}))).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is also https://www.erlang.org/doc/apps/erts/erlang#append_element/2. However in the way it is written there might be an opportunity for structural sharing. Since in this case the new tuple is constructed from existing pointers. I am not sure whether VM would eliminate copying in this case.

I did a quick experiment just to see what gets emited.

% erlc +time +to_core add_delta.erl
-module(add_delta).

-export([test/0]).

add_delta0({A}, Delta) -> {A, Delta};
add_delta0({A, B}, Delta) -> {A, B, Delta}.

add_delta1(T, Delta) when 0 < erlang:size(T), erlang:size(T) < 8 ->
    erlang:insert_element(erlang:size(T) + 1, T, Delta);
add_delta1(T, _Delta) -> T.

add_delta2(T, Delta) -> erlang:append_element(T, Delta).

test() ->
    {1,2,3, {delta, 4}} = add_delta0({1,2,3}, {delta, 4}),
    {1,2,3, {delta, 4}} = add_delta1({1,2,3}, {delta, 4}),
    {1,2,3, {delta, 4}} = add_delta2({1,2,3}, {delta, 4}),
    ok.

The resulting core erlang is

module 'add_delta' ['module_info'/0,
		    'module_info'/1,
		    'test'/0]
    attributes [%% Line 1
		'file' =
		    %% Line 1
		    [{[97|[100|[100|[95|[100|[101|[108|[116|[97|[46|[101|[114|[108]]]]]]]]]]]]],1}]]
'add_delta0'/2 =
    %% Line 6
    ( fun (_0,_1) ->
	  ( case <_0,_1> of
	      <{A},Delta> when 'true' ->
		  {A,Delta}
	      %% Line 7
	      <{A,B},Delta> when 'true' ->
		  {A,B,Delta}
	      ( <_3,_2> when 'true' ->
		    ( primop 'match_fail'
			  (( {'function_clause',_3,_2}
			     -| [{'function',{'add_delta0',2}}] ))
		      -| [{'function',{'add_delta0',2}}] )
		-| ['compiler_generated'] )
	    end
	    -| [{'function',{'add_delta0',2}}] )
      -| [{'function',{'add_delta0',2}}] )
'add_delta1'/2 =
    %% Line 9
    ( fun (_0,_1) ->
	  ( case <_0,_1> of
	      <T,Delta>
		  when try
			let <_2> =
			    call 'erlang':'size'
				(T)
			in  let <_3> =
				call 'erlang':'<'
				    (0, _2)
			    in  let <_4> =
				    call 'erlang':'size'
					(T)
				in  let <_5> =
					call 'erlang':'<'
					    (_4, 8)
				    in  call 'erlang':'and'
					    (_3, _5)
		    of <Try> ->
			Try
		    catch <T,R> ->
			'false' ->
		  let <_6> =
		      call %% Line 10
			   'erlang':%% Line 10
				    'size'
			  (%% Line 10
			   T)
		  in  let <_7> =
			  call %% Line 10
			       'erlang':%% Line 10
					'+'
			      (%% Line 10
			       _6, %% Line 10
				   1)
		      in  %% Line 10
			  call 'erlang':'insert_element'
			      (_7, T, Delta)
	      %% Line 11
	      <T,_X_Delta> when 'true' ->
		  T
	    end
	    -| [{'function',{'add_delta1',2}}] )
      -| [{'function',{'add_delta1',2}}] )
'add_delta2'/2 =
    %% Line 13
    ( fun (_0,_1) ->
	  call 'erlang':'append_element'
	      (_0, _1)
      -| [{'function',{'add_delta2',2}}] )
'test'/0 =
    %% Line 15
    ( fun () ->
	  %% Line 16
	  case apply 'add_delta0'/2
		   ({1,2,3}, {'delta',4}) of
	    <{1,2,3,{'delta',4}}> when 'true' ->
		%% Line 17
		case apply 'add_delta1'/2
			 ({1,2,3}, {'delta',4}) of
		  <{1,2,3,{'delta',4}}> when 'true' ->
		      %% Line 18
		      case apply 'add_delta2'/2
			       ({1,2,3}, {'delta',4}) of
			<{1,2,3,{'delta',4}}> when 'true' ->
			    %% Line 19
			    'ok'
			( <_2> when 'true' ->
			      primop 'match_fail'
				  ({'badmatch',_2})
			  -| ['compiler_generated'] )
		      end
		  ( <_1> when 'true' ->
			primop 'match_fail'
			    ({'badmatch',_1})
		    -| ['compiler_generated'] )
		end
	    ( <_0> when 'true' ->
		  primop 'match_fail'
		      ({'badmatch',_0})
	      -| ['compiler_generated'] )
	  end
      -| [{'function',{'test',0}}] )
'module_info'/0 =
    ( fun () ->
	  call 'erlang':'get_module_info'
	      ('add_delta')
      -| [{'function',{'module_info',0}}] )
'module_info'/1 =
    ( fun (_0) ->
	  call 'erlang':'get_module_info'
	      ('add_delta', ( _0
			      -| [{'function',{'module_info',1}}] ))
      -| [{'function',{'module_info',1}}] )
end

In the first case there is a direct reuse of tuple elements. Most likely no data copying.

In the second case there are calls to size, construction of a new frame in try/catch and 'erlang':'insert_element'. AFAIK the 'erlang':'insert_element'(BIF written in C) copies the values https://github.com/erlang/otp/blob/1afaa459eaae8d51585acc2a363dd2714b98c9c2/erts/emulator/beam/bif.c#L2866.

In the third case 'erlang':'append_element' which is a call to BIF written in C. This BIF copies the values https://github.com/erlang/otp/blob/1afaa459eaae8d51585acc2a363dd2714b98c9c2/erts/emulator/beam/bif.c#L2833

I think the way it is written would be the fastest. I think Russell wanted to optimize this code path. If I am right Russell could you add a comment about it. Something like "The tuple match is used to enable subterm sharing and avoid data copying.".

Also there might be a middle ground.

add_delta({A, B}, Delta) -> {A, B, Delta};
add_delta({A, B, C}, Delta) -> {A, B, C, Delta};
add_delta({A, B, C, D}, Delta) -> {A, B, C, D, Delta};
add_delta({A, B, C, D, E}, Delta) -> {A, B, C, D, E, Delta};
add_delta({A, B, C, D, E, F}, Delta) -> {A, B, C, D, E, F, Delta};
add_delta({A, B, C, D, E, F, G}, Delta) -> {A, B, C, D, E, F, G, Delta};
add_delta(T, Delta) when is_tuple(T) -> erlang:append_element(T, Delta).

end.

extract_delta({A, {delta, Delta}}) -> {{A}, Delta};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please consider this shorter (and somewhat more flexible) version:

extract_delta(T) ->
    Size = erlang:size(T),
    case erlang:element(Size, T) of
        {delta, Delta} when 0 < Size, Size < 9 -> {erlang:delete_element(Size, T), Delta};
        _ -> {T, undefined}
    end.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not modify add_delta?

add_delta(Tuple, Delta) -> {Delta, Tuple};

This ensures that the coordinator process flushes on demonitoring of the
attachment refs in chttpd_db. The problem here is that it's possible to
receive a 'DOWN' message for the monitor ref that is not receive'ed,
causing it to stick around in the coordinator message queue while the
next http request is handled. The pending message will not become
apparent until the next fabric call is invoked, as fabric expects to
have full access to all messages in the calling process, an expectation
which is violated by the pending message and causes a case clause crash
in the fabric receive message callbacks.

I noticed this during eunit runs with stubbed attachment handles that
generate an immediate noproc message on the monitor call. Normal
operations should not result in an immediate noproc result on monitoring
the attachment process, however, any failure that causes the attachment
process to fail between acquisition of the refs and the demonitor calls
will induce this bug, causing the next http request handled by the
particular chttpd coordinator pool processs to fail on whatever next
fabric call is made.
We need to potentially extract the usage delta from the incoming RPC
message. Rather than pattern match on all possible message formats that
could potentially include usage deltas, we instead utilize
rexi_util:extract_delta which matches against tuples ending in
`{delta, Delta}`, and separates that out from the underlying message.

The subtlety here is that receiving the message to extract the delta
changes the behavior as this was previously doing a selective receive
keyed off of the Ref, and then ignoring any other messages that arrived.
I don't know if the selective receive was intended, but I don't think
it's appropriate to leave unexpected messages floating around,
especially given things like issue #4909.

Instead of utilizing a selective receive, this switches to extracting
the message and delta like we need to do, and then in the event it finds
unexpected messages they're logged and skipped.

This selective receive was masking the lack of unlink on the linked
rexi_mon pid in fix #4906. I've also noticed some rpc responses arriving
late as well, but I haven't tracked that down, so let's log when it does
happen.
Most tests should not persist config changes. This is no exception.
While the test deletes the change afterwards, system halts between setup
and teardown result in a persisted max doc setting that causes
unexpected failures on the next test suite that writes a document larger
than 50 bytes.
@@ -323,6 +324,9 @@ handle_request_int(MochiReq) ->
% Save client socket so that it can be monitored for disconnects
chttpd_util:mochiweb_client_req_set(MochiReq),

%% This is probably better in before_request, but having Path is nice
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is fine IMO

@@ -25,7 +25,7 @@
setup() ->
Hashed = couch_passwords:hash_admin_password(?PASS),
ok = config:set("admins", ?USER, ?b2l(Hashed), _Persist = false),
ok = config:set("couchdb", "max_document_size", "50"),
ok = config:set("couchdb", "max_document_size", "50", false),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_Persist = false for consistency.

@@ -1982,6 +1983,7 @@ increment_stat(#db{options = Options}, Stat, Count) when
->
case lists:member(sys_db, Options) of
true ->
%% TODO: we shouldn't leak resource usage just because it's a sys_db
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hrmm. How we can do it? Are you planing to store IS_SYSTEM_DB key on process dictionary or something?

Final solution to this question is out of scope for the first PR. TODO is good enough for first version of the feature.

row_read() -> inc(rows_read).
btree_fold() -> inc(?COUCH_BT_FOLDS).
%% TODO: do we need ioq_called and this access pattern?
ioq_called() -> is_enabled() andalso inc(ioq_calls).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we do check is_enabled in updated_counter/3 are you doing a shortcut for performance reasons? If so let's add a comment.



update_counter(Field, Count) ->
is_enabled() andalso update_counter(get_pid_ref(), Field, Count).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

update_counter/2 implemented in terms of update_counter/3 which does call is_enabled().

ets:select(couch_stats_resource_tracker,
[{#rctx{type = {coordinator,'_','_'}, _ = '_'}, [], ['$_']}]);
select_by_type(workers) ->
ets:select(couch_stats_resource_tracker,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(node1@127.0.0.1)17> ets:fun2ms(fun(#rctx{type = {worker, _, _}} = R) -> R end).
[{#rctx{started_at = '_',updated_at = '_',exited_at = '_',
        pid_ref = '_',mon_ref = '_',mfa = '_',nonce = '_',
        from = '_',
        type = {worker,'_','_'},
        state = '_',dbname = '_',username = '_',db_open = '_',
        docs_read = '_',rows_read = '_',btree_folds = '_',
        changes_processed = '_',changes_returned = '_',
        ioq_calls = '_',io_bytes_read = '_',io_bytes_written = '_',
        js_evals = '_',js_filter = '_',js_filter_error = '_',
        js_filtered_docs = '_',mango_eval_match = '_',...},
  [],
  ['$_']}]

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would only work with -include_lib("stdlib/include/ms_transform.hrl").

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did an experiment

% erlc +time +to_core add_delta.erl
-module(ets_select).
-include_lib("stdlib/include/ms_transform.hrl").

-export([test/0]).


%% TODO: switch to:
%% -record(?RCTX, {
-record(rctx, {
    %% Metadata
    started_at,
    updated_at,
    exited_at, %% TODO: do we need a final exit time and additional update times afterwards?
    pid_ref,
    mon_ref,
    mfa,
    nonce,
    from,
    type = unknown, %% unknown/background/system/rpc/coordinator/fabric_rpc/etc_rpc/etc
    state = alive,
    dbname,
    username,

    %% Stats counters
    db_open = 0,
    docs_read = 0,
    rows_read = 0,
    btree_folds = 0,
    changes_processed = 0,
    changes_returned = 0,
    ioq_calls = 0,
    io_bytes_read = 0,
    io_bytes_written = 0,
    js_evals = 0,
    js_filter = 0,
    js_filter_error = 0,
    js_filtered_docs = 0,
    mango_eval_match = 0,
    %% TODO: switch record definitions to be macro based, eg:
    %% ?COUCH_BT_GET_KP_NODE = 0,
    get_kv_node = 0,
    get_kp_node = 0
}).

select() ->
    ets:select(?MODULE, ets:fun2ms(fun(#rctx{type = {worker, _, _}} = R) -> R end)).

test() ->
    (catch ets:new(?MODULE, [named_table, {keypos, #rctx.started_at}])),
    true = ets:insert(?MODULE, [#rctx{started_at = 1, type = {worker, 1, 2}}]),
    true = ets:insert(?MODULE, [#rctx{started_at = 2, type = {other, 1, 2}}]),
    io:format("selected = ~p~n", [select()]),
    io:format("all = ~p~n", [ets:tab2list(?MODULE)]).

The output.

27> c(ets_select).
{ok,ets_select}
28> ets_select:test().
selected = [{rctx,1,undefined,undefined,undefined,undefined,undefined,
                  undefined,undefined,
                  {worker,1,2},
                  alive,undefined,undefined,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0}]
all = [{rctx,1,undefined,undefined,undefined,undefined,undefined,undefined,
             undefined,
             {worker,1,2},
             alive,undefined,undefined,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0},
       {rctx,2,undefined,undefined,undefined,undefined,undefined,undefined,
             undefined,
             {other,1,2},
             alive,undefined,undefined,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0}]
ok
29>
BREAK: (a)bort (A)bort with dump (c)ontinue (p)roc info (i)nfo
       (l)oaded (v)ersion (k)ill (D)b-tables (d)istribution

The core erlang generated by compiler

module 'ets_select' ['module_info'/0,
		     'module_info'/1,
		     'test'/0]
    attributes ['compile' =
		    [{'nowarn_unused_record',[['rctx']]}],
		%% Line 1
		'file' =
		    %% Line 1
		    [{[101|[116|[115|[95|[115|[101|[108|[101|[99|[116|[46|[101|[114|[108]]]]]]]]]]]]]],1}],
		%% Line 1
		'file' =
		    %% Line 1
		    [{[47|[85|[115|[101|[114|[115|[47|[105|[105|[108|[121|[97|[107|[47|[46|[97|[115|[100|[102|[47|[105|[110|[115|[116|[97|[108|[108|[115|[47|[101|[114|[108|[97|[110|[103|[47|[50|[53|[46|[51|[46|[50|[46|[57|[47|[108|[105|[98|[47|[115|[116|[100|[108|[105|[98|[45|[52|[46|[51|[46|[49|[46|[51|[47|[105|[110|[99|[108|[117|[100|[101|[47|[109|[115|[95|[116|[114|[97|[110|[115|[102|[111|[114|[109|[46|[104|[114|[108]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]],1}],
		%% Line 4
		'file' =
		    %% Line 4
		    [{[101|[116|[115|[95|[115|[101|[108|[101|[99|[116|[46|[101|[114|[108]]]]]]]]]]]]]],4}],
		%% Line 26
		'record' =
		    %% Line 26
		    [{'rctx',[{'record_field',{28,5},{'atom',{28,5},'started_at'}}|[{'record_field',{29,5},{'atom',{29,5},'updated_at'}}|[{'record_field',{30,5},{'atom',{30,5},'exited_at'}}|[{'record_field',{31,5},{'atom',{31,5},'pid_ref'}}|[{'record_field',{32,5},{'atom',{32,5},'mon_ref'}}|[{'record_field',{33,5},{'atom',{33,5},'mfa'}}|[{'record_field',{34,5},{'atom',{34,5},'nonce'}}|[{'record_field',{35,5},{'atom',{35,5},'from'}}|[{'record_field',{36,5},{'atom',{36,5},'type'},{'atom',{36,12},'unknown'}}|[{'record_field',{37,5},{'atom',{37,5},'state'},{'atom',{37,13},'alive'}}|[{'record_field',{38,5},{'atom',{38,5},'dbname'}}|[{'record_field',{39,5},{'atom',{39,5},'username'}}|[{'record_field',{42,5},{'atom',{42,5},'db_open'},{'integer',{42,15},0}}|[{'record_field',{43,5},{'atom',{43,5},'docs_read'},{'integer',{43,17},0}}|[{'record_field',{44,5},{'atom',{44,5},'rows_read'},{'integer',{44,17},0}}|[{'record_field',{45,5},{'atom',{45,5},'btree_folds'},{'integer',{45,19},0}}|[{'record_field',{46,5},{'atom',{46,5},'changes_processed'},{'integer',{46,25},0}}|[{'record_field',{47,5},{'atom',{47,5},'changes_returned'},{'integer',{47,24},0}}|[{'record_field',{48,5},{'atom',{48,5},'ioq_calls'},{'integer',{48,17},0}}|[{'record_field',{49,5},{'atom',{49,5},'io_bytes_read'},{'integer',{49,21},0}}|[{'record_field',{50,5},{'atom',{50,5},'io_bytes_written'},{'integer',{50,24},0}}|[{'record_field',{51,5},{'atom',{51,5},'js_evals'},{'integer',{51,16},0}}|[{'record_field',{52,5},{'atom',{52,5},'js_filter'},{'integer',{52,17},0}}|[{'record_field',{53,5},{'atom',{53,5},'js_filter_error'},{'integer',{53,23},0}}|[{'record_field',{54,5},{'atom',{54,5},'js_filtered_docs'},{'integer',{54,24},0}}|[{'record_field',{55,5},{'atom',{55,5},'mango_eval_match'},{'integer',{55,24},0}}|[{'record_field',{58,5},{'atom',{58,5},'get_kv_node'},{'integer',{58,19},0}}|[{'record_field',{59,5},{'atom',{59,5},'get_kp_node'},{'integer',{59,19},0}}]]]]]]]]]]]]]]]]]]]]]]]]]]]]}]]
'select'/0 =
    %% Line 62
    ( fun () ->
	  %% Line 63
	  call 'ets':'select'
	      ('ets_select', [{{'rctx','_','_','_','_','_','_','_','_',{'worker','_','_'},'_','_','_','_','_','_','_','_','_','_','_','_','_','_','_','_','_','_','_'},[],['$_']}])
      -| [{'function',{'select',0}}] )
'test'/0 =
    %% Line 65
    ( fun () ->
	  do  try
		  %% Line 66
		  call 'ets':'new'
		      ('ets_select', ['named_table'|[{'keypos',2}]])
	      of <_catch_value> ->
		  _catch_value
	      catch <Class,Reason,Stk> ->
		  'ok'
	      %% Line 67
	      case call 'ets':'insert'
		       ('ets_select', [{'rctx',1,'undefined','undefined','undefined','undefined','undefined','undefined','undefined',{'worker',1,2},'alive','undefined','undefined',0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0}]) of
		<'true'> when 'true' ->
		    %% Line 68
		    case call 'ets':'insert'
			     ('ets_select', [{'rctx',2,'undefined','undefined','undefined','undefined','undefined','undefined','undefined',{'other',1,2},'alive','undefined','undefined',0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0}]) of
		      <'true'> when 'true' ->
			  let <_2> =
			      apply %% Line 69
				    'select'/0
				  ()
			  in  do  %% Line 69
				  call 'io':'format'
				      ([115|[101|[108|[101|[99|[116|[101|[100|[32|[61|[32|[126|[112|[126|[110]]]]]]]]]]]]]]], [_2|[]])
				  let <_3> =
				      call %% Line 70
					   'ets':%% Line 70
						 'tab2list'
					  (%% Line 70
					   'ets_select')
				  in  %% Line 70
				      call 'io':'format'
					  ([97|[108|[108|[32|[61|[32|[126|[112|[126|[110]]]]]]]]]], [_3|[]])
		      ( <_1> when 'true' ->
			    primop 'match_fail'
				({'badmatch',_1})
			-| ['compiler_generated'] )
		    end
		( <_0> when 'true' ->
		      primop 'match_fail'
			  ({'badmatch',_0})
		  -| ['compiler_generated'] )
	      end
      -| [{'function',{'test',0}}] )
'module_info'/0 =
    ( fun () ->
	  call 'erlang':'get_module_info'
	      ('ets_select')
      -| [{'function',{'module_info',0}}] )
'module_info'/1 =
    ( fun (_0) ->
	  call 'erlang':'get_module_info'
	      ('ets_select', ( _0
			       -| [{'function',{'module_info',1}}] ))
      -| [{'function',{'module_info',1}}] )
end

As you can see the magic syntax get expanded

call 'ets':'select'
	      ('ets_select', [{{'rctx','_','_','_','_','_','_','_','_',{'worker','_','_'},'_','_','_','_','_','_','_','_','_','_','_','_','_','_','_','_','_','_','_'},[],['$_']}])

F = atom_to_binary(F0),
A = integer_to_binary(A0),
<<M/binary, ":", F/binary, "/", A/binary>>;
convert_mfa(undefined) ->
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we handle a case when it is something else.

convert_mfa(Else) -> 
    list_to_binary(lists:flatten(io_lib:format("{unexpected, ~p}", [Else]))).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe not. Let it crash.

Fold(FoldFun, #{}, ?MODULE).


%% Sorts largest first
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

term_to_flat_json({shutdown, Reason0}) when is_atom(Reason0) ->
Reason = atom_to_binary(Reason0),
<<"shutdown: ", Reason/binary>>;
term_to_flat_json({type, Atom}) when is_atom(Atom) ->
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not collapse these three cases into one

term_to_flat_json({type, Type}=_Type) -> convert_type(Atom);

} = Rctx,
PidRef = {term_to_flat_json(Pid), term_to_flat_json(Ref)},
MFA = case MFA0 of
{M0, F0, A0} ->
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

{_, _, _} -> convert_mfa(MFA1)

also do we want guards is_atom(M), is_atom(F), is_integer(A), A > 0?



create_context() ->
is_enabled() andalso create_context(self()).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is_enabled() is called in create_context/1

end.

create_coordinator_context(#httpd{path_parts=Parts} = Req) ->
is_enabled() andalso create_coordinator_context(Req, io_lib:format("~p", [Parts])).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is_enabled/0 is called in create_coordinator_context/2

ok;
true ->
catch case ets:update_element(?MODULE, get_pid_ref(), [{#rctx.dbname, DbName}]) of
false ->
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

([email protected])25> element(2, erlang:process_info(self(), current_stacktrace)).
[{erl_eval,do_apply,7,[{file,"erl_eval.erl"},{line,748}]},
 {erl_eval,expr_list,7,[{file,"erl_eval.erl"},{line,961}]},
 {erl_eval,expr,6,[{file,"erl_eval.erl"},{line,465}]},
 {shell,exprs,7,[{file,"shell.erl"},{line,693}]},
 {shell,eval_exprs,7,[{file,"shell.erl"},{line,649}]},
 {shell,eval_loop,3,[{file,"shell.erl"},{line,634}]}]
([email protected])26> Stack = try 12 / 0 catch Error:Reason:Stacktrace -> Stacktrace end.
[{erlang,'/',
         [12,0],
         [{error_info,#{module => erl_erts_errors}}]},
 {erl_eval,do_apply,7,[{file,"erl_eval.erl"},{line,748}]},
 {erl_eval,try_clauses,10,[{file,"erl_eval.erl"},{line,987}]},
 {erl_eval,expr,6,[{file,"erl_eval.erl"},{line,492}]},
 {shell,exprs,7,[{file,"shell.erl"},{line,693}]},
 {shell,eval_exprs,7,[{file,"shell.erl"},{line,649}]},
 {shell,eval_loop,3,[{file,"shell.erl"},{line,634}]}]

I think this is a debugging leftover. The stacktrace produced by process_info should be almost the same as from try/catch.

false ->
ok;
true ->
catch case ets:update_element(?MODULE, get_pid_ref(), [{#rctx.dbname, DbName}]) of
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These blocks look very similar in all of the functions. Maybe worth extracting out as a function update_element(Idx, Value). Although in this case you would need to remove one line from stacktrace.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants