Couch stats resource tracker v2 #5213

Open. Wants to merge 29 commits into main.
Conversation

chewbranca (Contributor)

Couch Stats Resource Tracker

This is a rework of PR: #4812

Overview and Motivation

Couch Stats Resource Tracker (CSRT) is a new engine for tracking the resource
usage induced by processes within CouchDB's Erlang VM. This PR specifically
targets coordinator processes and the RPC workers induced by said coordinator
processes, but the underlying stats collection framework is designed to be
usable by anything consuming resources in CouchDB, such that we can extend it
out to background jobs like indexing, compaction, and replication. The long
term stretch goal is to be able to account for all system level activity
induced by CouchDB, but the practical goal is to be able to understand where
and why most of the resources in the system are being utilized, at a
request/job level granularity.

This PR is primarily motivated by the current lack of visibility into
identifying what operations are inducing large quantities of work. We have
node/cluster level visibility into the amounts of IO operations being induced,
but we lack the ability to identify what request/operation induced that
workload.

This is especially problematic when dealing with large fold operations that do
local filtering (eg filtered _changes feeds or filtered _find queries),
because these operations lack the natural rate limiting that coordinators
otherwise impose by funneling individual results back to the coordinator node
to sort and process. In the local filtering case, we essentially do a direct
fold over the shard and invoke a filter function on each doc to find matching
results; when most docs fail to match, this becomes a local tight loop over the
shard that, when run in parallel, can easily dominate IO operations. The _find
logic has been extended to generate reports to log and identify the heavy
hitter requests, especially the degenerate _find queries that do a full
database scan and return zero results.

Approach in this PR and differences from mango reports

This PR takes the idea of the mango reports and creates a unified framework for
tracking these statistics in real time, allowing for global querying of node and
cluster level resource usage in the live processes. This PR reflects an
opinionated deviation from the approach in the mango reports: instead of
introducing one-off stats tracking, it proposes the core principle that:

Any stat worth tracking for reporting system usage is worth tracking properly
as a first-class CouchDB metric.

So instead of embedding new stats like the mango find reports do, this system
hooks into the couch_stats:increment_counter logic to piggyback off of the
stats being collected in real time by the process doing the work, and then
funnels those updates into an ets table keyed off of the local process, joined
at the cluster level by the coordinator ref, allowing for live cluster level
aggregation of individual http requests. These tracked stats are forwarded back
to the coordinator process by way of embedding them in the rexi RPC messages,
such that long running find queries and other heavyweight processes can be
identified and tracked.
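As a rough sketch (module, table, and pdict key names here are hypothetical,
not the ones in this PR), the piggyback hook looks something like:

```erlang
%% Illustrative sketch only; real module/table/key names differ.
-module(csrt_hook_sketch).
-export([maybe_track_local/2]).

-define(TRACKING_TABLE, csrt_tracking_sketch).

%% Called from the couch_stats:increment_counter path: mirror the
%% metric delta into this process's own row in the tracking table.
maybe_track_local(Name, Delta) when is_integer(Delta), Delta > 0 ->
    case get(csrt_pid_ref) of
        undefined ->
            %% No CSRT context for this process; nothing to do.
            ok;
        PidRef ->
            %% Only this process ever writes to its own key, so the
            %% update is contention free.
            Key = {PidRef, Name},
            _ = ets:update_counter(?TRACKING_TABLE, Key, Delta, {Key, 0}),
            ok
    end;
maybe_track_local(_Name, _Delta) ->
    ok.
```

The guard mirrors the "only track positive integer increments" behavior quoted
later in the review thread.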

We then log a report detailing the total resources induced by the http request
so we can retroactively identify which requests are consuming the most
resources. The reporting by default is configured to only happen at the
coordinator level, but if you're able to handle the log volume it can be enabled
for all workers too. Down the road a nice feature would be supporting writing
reports directly to ClickHouse, some binary format, or even just a terse text
format to allow for increased report volumes; currently high throughput report
generation for coordinator and all rpc workers on high Q databases is
substantial in data volume, but there's much room for optimization given the
verbose nature of the current reports and how well they gzip up.

New metrics introduced

As a result of the above mentioned philosophy of properly tracking the stats
worth tracking, this PR introduces a handful of new stats, predominantly in one
of two forms, listed below. I've also included sample screenshots of the new
metrics plotted during a 30 minute benchmark run that started with an empty
cluster and aggressively created new databases/indexes/documents while growing
worker count progressively during that run. All http traffic was ceased after 30
minutes, and you can clearly see the phase change in operations when that
happened.

  1. Core stats for previously missing metrics

eg new stats for counting couch_btree reads and writes on kp/kv nodes

Screenshot 2024-08-23 at 5 53 42 PM
  2. RPC work induced

The idea here is that we should be tracking 1-3 things for all induced RPC work:

  1. RPC worker spawned type
  2. RPC worker docs processed
  3. RPC worker docs returned

Item 1) is the primary metric for core RPC work; it allows us to see the volume
of RPC workers spawned over time per node. Items 2) and 3) are specific to
aggregate operations, with 3) specific to aggregate operations that can perform
local filtering.

The idea is that we can a) see the types of work being induced on nodes over
time, b) observe how many documents are being processed by the aggregate worker
operations, and c) directly observe major discrepancies between docs processed
and docs returned, as that's indicative of a missing index or a poorly designed
workflow.

Here's a full cluster view of all nodes rpc traffic:

Screenshot 2024-08-23 at 5 53 23 PM

In the case of our benchmark above, the workload was evenly distributed so all
nodes performed similarly. This is a lot of data, but can easily be aggregated
by node or type to identify non-homogeneous workloads. Here's a simpler view
showing per node RPC workloads:

Screenshot 2024-08-23 at 5 53 32 PM

Tracking table for accumulating and querying metrics

The central tracking table is a global ets table utilizing read_concurrency,
write_concurrency, and decentralized_counters, which results in an
impressively performant global table in which all processes update their local
stats. Writes are isolated to the process doing the work, so there is no
contention from parallel writes to the same key. Aggregations are performed
against the full ets table, but updates to a given key are constrained to the
corresponding worker process.

Previous design that failed to scale

In the previous PR I attempted to utilize a singular gen_server for monitoring
the processes and performing some cleanup operations. This was optimized down to
a dedicated server doing only
handle_call({monitor, Pid}, _From, State) -> monitor_pid(Pid), {reply, ok, State}
plus
handle_info({'DOWN', Ref, _, _, _}, State) -> ets:delete(Tab, maps:get(Ref, RefToPid)),
and even that was insufficient to handle the load. I tried various approaches,
but I was able to melt a singular gen_server easily. It's necessary to have a
process monitor outside of the local process because coordinator/worker
processes can and will get killed mid operation, so after clause/function based
cleanup approaches are insufficient.
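Reconstructed as a sketch (state shape and names assumed), that v1 monitor was
roughly:

```erlang
%% Sketch of the v1 singular-gen_server monitor that failed to scale.
-module(csrt_monitor_v1_sketch).
-behaviour(gen_server).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

init(_) ->
    {ok, #{}}.

handle_call({monitor, Pid}, _From, Refs) ->
    Ref = erlang:monitor(process, Pid),
    {reply, ok, Refs#{Ref => Pid}};
handle_call(_Msg, _From, Refs) ->
    {reply, ok, Refs}.

handle_cast(_Msg, Refs) ->
    {noreply, Refs}.

handle_info({'DOWN', Ref, process, _Pid, _Reason}, Refs) ->
    %% Delete the dead worker's tracking entry, then forget the ref.
    ets:delete(csrt_tracking_sketch, maps:get(Ref, Refs)),
    {noreply, maps:remove(Ref, Refs)};
handle_info(_Msg, Refs) ->
    {noreply, Refs}.
```

Even with every call this cheap, a single such process serializes all monitor
and DOWN traffic for the node, which is the bottleneck described below.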

Even with that minimal of a workload, I was able to melt the gen_server:

Screenshot 2024-04-04 at 3 45 15 PM

and that's with it really doing a minimum workload:

Screenshot 2024-04-09 at 5 19 37 PM

This was my final attempt to make a singular gen_server architecture, but with
80 core nodes I'm now fully convinced it's no longer viable to do singular
gen_server systems in hot code paths and we must take more distributed
approaches, either by way of sharding the servers or fully distributed.

Distributed tracker approach in CSRT v2

In the case of CSRT, I engaged a fully distributed approach that spawns a
dedicated monitor process when a CSRT context is created by a coordinator or
worker. This monitor process handles the lifetime of a given entry in the ets
table so that we delete the worker entry when the worker is done. This dedicated
monitor process also generates the report afterwards. Switching to the dedicated
monitor approach eliminated the scaling issues I encountered, and the current
architecture is able to readily handle max throughput load.

The CSRT context is created in the coordinator process directly in
chttpd:process_request_int, and in the worker process directly in the
spawned process's initialization of rexi_server:init_p. The context is
basically just erlang:put(?CONTEXT_MARKER, {self(), make_ref()}) which is then
the ets key used for tracking the coordinator process while it handles the given
request.

The make_ref() ensures that coordinator processes reused from the Mochiweb
worker pool distinguish between individual http requests. More generally, this
allows a long lived process to isolate subsets of its own work. This is
essential if we want to add the ability to funnel the CSRT context through
IOQ/couch_file/couch_mrview/etc to accumulate resource usage induced beyond the
current process boundaries.
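A sketch of that context creation (?CONTEXT_MARKER usage is from the PR
description; the module and helper names are assumptions):

```erlang
%% Sketch: per-request CSRT context stored in the process dictionary.
-module(csrt_context_sketch).
-export([create_pid_ref/0, get_pid_ref/0, close_pid_ref/0]).

-define(CONTEXT_MARKER, csrt_pid_ref).

create_pid_ref() ->
    %% make_ref() distinguishes successive requests handled by the same
    %% pooled Mochiweb worker process.
    PidRef = {self(), make_ref()},
    erlang:put(?CONTEXT_MARKER, PidRef),
    PidRef.

get_pid_ref() ->
    erlang:get(?CONTEXT_MARKER).

close_pid_ref() ->
    erlang:erase(?CONTEXT_MARKER).
```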

A note on PidRef vs nonce for identification

Currently we don't funnel the coordinator's PidRef across the wire and instead
rely on the nonce as a global aggregator key, and then the coordinator
aggregations happen directly when the RPC responses are received and the deltas
are extracted. We could add this fairly easily in rexi:cast_ref, but I do
wonder if we'd be better off skipping the ref entirely and instead using
{self(), Nonce} as the key given we already funnel it around. That won't work
for background operations, so we'd need a make_ref() fallback for tracking
those jobs, but I do like the idea of consolidating down to using the nonce
given it's already the unique reference to the request inducing the workload,
and we already send it over the wire ubiquitously for all coordinator/worker
operations through rexi:cast_ref.

Context creation and lifecycle

We create the initial context in
chttpd:process_request_int/rexi_server:init_p for the coordinator/workers,
respectively, and then we progressively fill in the details for things like
dbname/username/handler_fun so that we can track those data points naturally as
they arise in the request codepath, for example adding the chttpd_db handler
when entering those functions, or setting the username after
chttpd_auth:authorize returns. Similarly, in fabric_rpc we piggyback off of
fabric_rpc:set_io_priority, called by every callback function, to cleanly set
the dbname involved in the RPC request. We could also extend this to track the
ddoc/index involved, if any.
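As a sketch of that progressive enrichment (the row layout, column position,
and names here are all hypothetical):

```erlang
%% Sketch: fill in context details as they arise in the request codepath,
%% assuming the tracked row is a tuple keyed by PidRef.
-module(csrt_enrich_sketch).
-export([set_context_dbname/1]).

-define(DBNAME_POS, 3). %% hypothetical column for dbname in the row

set_context_dbname(DbName) ->
    case get(csrt_pid_ref) of
        undefined ->
            ok;
        PidRef ->
            _ = ets:update_element(csrt_tracking_sketch, PidRef,
                                   {?DBNAME_POS, DbName}),
            ok
    end.
```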

The idea is to make it easy for the local process to update its global tracked
state at the appropriate points in the codebase so we can iteratively extend out
the tracking throughout the codebase. Background indexing and compaction are
apt targets for wiring in CSRT and we could even extend the /_active_tasks
jobs to include status about resource usage.

When we initiate the context, for workers, coordinators, or any future job
types, we spawn a dedicated tracking monitor process that sits idle until it
gets a stop message from normal lifecycle termination, or a DOWN message from
the process doing the work. In either case, the tracker process cleans up the
corresponding ets table entry (the only case of two processes writing to the
same key in the ets tracker, but handed off with no interleaving) and then
conditionally generates a report to log the work induced.
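The per-context tracker described above can be sketched as (names
illustrative):

```erlang
%% Sketch of the dedicated per-context tracker process.
-module(csrt_tracker_sketch).
-export([start_tracker/1]).

start_tracker(PidRef = {Pid, _Ref}) ->
    spawn(fun() ->
        MonRef = erlang:monitor(process, Pid),
        tracker_loop(PidRef, MonRef)
    end).

tracker_loop(PidRef, MonRef) ->
    receive
        stop ->
            %% Normal lifecycle termination: report, then clean up.
            maybe_report(PidRef),
            ets:delete(csrt_tracking_sketch, PidRef),
            erlang:demonitor(MonRef, [flush]),
            ok;
        {'DOWN', MonRef, process, _Pid, _Reason} ->
            %% Worker died mid operation: same cleanup path.
            maybe_report(PidRef),
            ets:delete(csrt_tracking_sketch, PidRef),
            ok
    end.

maybe_report(_PidRef) ->
    %% Placeholder: conditionally log the lifetime usage report.
    ok.
```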

The default, when CSRT is enabled, is to log a report for the coordinator
process totalling the tracked work induced by the RPC workers to fulfill the
given request. It's configurable to also log workers, and there's some
rudimentary filtering capabilities to allow for logging of only a specific rpc
worker type, but this could be improved upon considerably. In general, the
volume of these logs can be sizable, for example a singular http view request
against a Q=64 database on a healthy cluster induces 196 RPC workers, all
inducing their own volume of work and potentially logging a report. A compact
form or additional filtering capabilities to log interesting reports would be
beneficial.

Status and next steps

Overall I'm happy with the performance of the core tracking system; the modern
ETS improvements with decentralized counters on top of atomic increments are
really impressive! I had the test suite fully passing recently, but I've since
done a decent bit of cleanup and restructuring, so I haven't checked a full CI
run in a minute; I'll see how it looks on this PR and address anything that
comes up. I'd like to start getting a review here and see what folks think; I
think the structure of the code is in a good place to discuss and get feedback
on. A few next steps to do:

  • add more tests
  • add Dialyzer specs couch_stats_resource_tracker.erl at the very least
  • fix time tracking, the tnow() is not a positive_integer()
  • add some standard query functions using ets:fun2ms
    • The parse transform makes these more challenging to handle dynamically, so
      let's add a handful of standard performant functions, eg:
      • sort_by({dbname, shard, user}, ioq_calls)
      • sort_by(user, ioq_calls)
      • sort_by({user, request_type}, docs_processed)
      • sort_by({request_type, user}, docs_processed)
      • sort_by(user, get_kv_nodes)
      • etc
  • think about additional filtering on logs:
    • skip report fields with zero values?
    • set minimum thresholds for reports?
    • allow skipping of some coordinator reports? eg welcome handler
  • design on metrics namespacing from rexi_server:init_p
    • eg should_increment([M, F, spawned]) vs
      • should_increment([rexi_rpc, M, F, spawned]) vs
      • should_increment([couchdb, rpc_worker, M, F, spawned])
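The sort_by queries listed above could be sketched as canned fold/match-spec
functions; for example (row shape and names assumed, purely illustrative):

```erlang
%% Sketch: one canned query, aggregating ioq_calls per user over the
%% tracking table; row shape {PidRef, Username, IoqCalls} is assumed.
-module(csrt_query_sketch).
-export([sort_by_user_ioq_calls/1]).

sort_by_user_ioq_calls(Table) ->
    Totals = ets:foldl(
        fun({_PidRef, User, IoqCalls}, Acc) ->
            maps:update_with(User, fun(N) -> N + IoqCalls end, IoqCalls, Acc)
        end,
        #{},
        Table
    ),
    %% Sort descending by total ioq_calls.
    lists:reverse(lists:keysort(2, maps:to_list(Totals))).
```

Pre-compiled ets:fun2ms match specs would avoid the full fold where the filter
can be pushed into the table scan.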

Sample report

Filtered changes http request with ?include_docs=true and JS filter:

[report] 2024-09-02T23:46:08.175264Z [email protected] <0.1012.0> -------- [csrt-pid-usage-lifetime changes_returned=1528 db_open=7 dbname="foo" docs_read=1528 from="null" get_kp_nodes=14 get_kv_nodes=180 ioq_calls=3254 js_filter=1528 js_filter_error=0 js_filtered_docs=1528 mfa="null" nonce="bce5d2ce6e" path="/foo/_changes" pid_ref="<0.965.0>:#Ref<0.2614010930.743440385.194505>" rows_read=1528 started_at=-576460745696 type="coordinator:GET:fun chttpd_db:handle_changes_req/2" updated_at=-576460745696 username="adm"]

end
end,

{Resp, _Bad} = rpc:multicall(erlang, apply, [
Contributor:

Comment on lines +228 to +234
handle_resource_status_req(#httpd{method = 'POST'} = Req) ->
ok = chttpd:verify_is_server_admin(Req),
chttpd:validate_ctype(Req, "application/json"),
{Props} = chttpd:json_body_obj(Req),
Action = proplists:get_value(<<"action">>, Props),
Key = proplists:get_value(<<"key">>, Props),
Val = proplists:get_value(<<"val">>, Props),
Contributor @nickva (Sep 4, 2024):

Since we're introducing a new API, add some docs or a description (with examples) of how it works and the intent behind it. What would CouchDB users use it for, and how is it different from metrics, active tasks, etc.?

end,
[]
]),
%%io:format("{CSRT}***** GOT RESP: ~p~n", [Resp]),
Contributor:

Left-over debug comment

end,
[]
]),
%% TODO: incorporate Bad responses
Contributor:

consider using erpc, the handling of bad responses is a bit cleaner there, I think

ok = chttpd:verify_is_server_admin(Req),
{Resp, Bad} = rpc:multicall(erlang, apply, [
fun() ->
{node(), couch_stats_resource_tracker:active()}
Contributor:

Instead of having to run in a closure just to get node() in the response, consider using an explicit list of nodes [node() | nodes()] with a plain [M, F, A] and then zipping over the nodes in the response. It's a bit longer but it's less fragile.

Comment on lines 425 to 428
{[couchdb, query_server, js_filter], [
{type, counter},
{desc, <<"number of JS filter invocations">>}
]}.
Contributor:

There is already a .query_server.calls.ddoc_filter counter

Contributor Author:

Ahhh good catch, I didn't see those were hidden in https://github.com/apache/couchdb/blob/main/src/couch/src/couch_os_process.erl#L257-L259

I've reworked this in 4db08c9f2 to use the existing metrics and only add a new metric [couchdb, query_server, volume, ddoc_filter] for counting the number of docs funneled through the filter. I also reworked the logic so that volume metric is captured in the same module and style as the bump_time_stat function above.

Comment on lines +445 to +452
{[couchdb, btree, get_node, kp_node], [
{type, counter},
{desc, <<"number of couch btree kp_nodes read">>}
]}.
{[couchdb, btree, get_node, kv_node], [
{type, counter},
{desc, <<"number of couch btree kv_nodes read">>}
]}.
Contributor:

Wonder if this is a bit too low level? Or rather, btrees are used for view and db files. It's not clear which btrees the metrics refer to.

Contributor Author:

It's not clear which btrees the metrics refer to.

So that's the thing: we don't have current visibility into the volume of couch_btree:get_node calls or what type of operations are inducing them. These could come from normal db doc reads, all_docs folds, compaction folds, or index build folds; all these different systems induce heavy couch_btree operations and we don't know where/when/why.

The idea here is to start tracking the amount of core btree operations by the workers, allowing us to track it at a process level like we're doing with the rest of the stats in this PR, and then we'll at least be able to see the total volume of couch_btree operations induced, and then use the reports to quantify the requests/dbs/etc that performed those operations.

A key aspect of the tracking here is that couch_btree:* are still inside the caller processes, eg the workers themselves, so we can do the process local tracking without having to extend beyond process boundaries. This CSRT engine does not yet handle nested work tracking, so for instance we can't funnel back information from the couch_file pid back through IOQ. Tracking the data here is a solid boundary of per request work being induced that we can track, while also providing much needed insight into overall database operations.

As for tracking kv_node separately from kp_node: I think this is important because we're in a btree and performance is proportional to the log of the size of the btree, and we currently don't have any metrics illuminating the impact of btree size on read throughput. In my experience it's fairly common for users to forget that while logN in a B+tree is really good, it's not a constant value, and once you get into databases with billions of documents it takes longer to get through the btree, on every request. My hope here is that by tracking kp_node reads separately from kv_node reads we can more easily understand and quantify the workloads induced by larger databases, something I personally find challenging to do today.

I thought about trying to funnel down into couch_btree an indicator variable of the datatype, so for instance we could track changes btree operations independently of compaction folds, but that'd either require passing a more informative context into couch_btree:* or some ugly pdict hacks. I think this approach of getting as close to the caller boundary as we can before we funnel off to another process gives us low level insight into couch_btree operations while also allowing us to track them at a request/worker level, so that retroactively we'll be able to go look at the report logs and quantify which requests were responsible for the induced btree operations. It's important to note that this approach will also extend naturally to background worker CSRT tracking, eg we can easily extend compaction, replication, and indexing to create a CSRT context, and then we'll be able to quantify the work induced by those background jobs too.
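To illustrate where these land, a minimal sketch of the bump at the node-read
boundary (the helper and call site are hypothetical; the metric names are the
ones this PR adds):

```erlang
%% Sketch: counter bumps at the couch_btree node-read boundary.
%% count_node_read/1 is an illustrative helper, not code from the PR.
count_node_read(kp_node) ->
    couch_stats:increment_counter([couchdb, btree, get_node, kp_node]);
count_node_read(kv_node) ->
    couch_stats:increment_counter([couchdb, btree, get_node, kv_node]).
```

Because these bumps run in the caller (worker) process, the CSRT hook on
increment_counter picks them up without crossing process boundaries.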

Comment on lines +545 to +546
%% TODO: wire in csrt tracking
couch_stats:increment_counter([couchdb, query_server, js_filter_error]),
Contributor:

We already have a general counter for os process errors, normal exits and error exits. Probably don't need an explicit one just for filters?

Contributor Author:

Fixed in 4db08c9.

%% Only potentially track positive increments to counters
-spec maybe_track_local_counter(any(), any()) -> ok.
maybe_track_local_counter(Name, Val) when is_integer(Val) andalso Val > 0 ->
%%io:format("maybe_track_local[~p]: ~p~n", [Val, Name]),
Contributor:

debug left-over?

%% If after, only currently tracked metrics declared in the app's
%% stats_description.cfg will be trackable locally. Pros/cons.
%io:format("NOTIFY_EXISTING_METRIC: ~p || ~p || ~p~n", [Name, Op, Type]),
ok = maybe_track_local_counter(Name, Value),
Contributor:

If this is enabled for all requests, a bit worried about the performance here as we add an ets operation over a simple integer counter bump. Was there any noticeable performance impact from it, during perf runs?

Contributor Author:

An essential part of the work done in this PR is to ensure that this is as minimally impactful as possible. We add an ets config lookup to check if it's enabled, and then we perform the same type of simple integer counter bump utilizing ets:update_counter with decentralized_counters enabled and no concurrent writes to the same key: all writes to any given key come only from the process tracking its own work, and then finally the tracker process deletes the entry from ets when the process dies and closes the context.

@nickva you did have me change my hard coded atom function to one utilizing maps (although I did manage to use static maps), so I'll make sure to do some more testing with the final version to compare. I plan on doing some comparisons between the start of this PR and the final version, to make sure there weren't any regressions induced by the changes.

%% Should maybe_track_local happen before or after notify?
%% If after, only currently tracked metrics declared in the app's
%% stats_description.cfg will be trackable locally. Pros/cons.
%io:format("NOTIFY_EXISTING_METRIC: ~p || ~p || ~p~n", [Name, Op, Type]),
Contributor:

Left-over debug statement

Comment on lines 28 to 33
-export([
get_pid_ref/0,
set_pid_ref/1,
create_pid_ref/0,
close_pid_ref/0, close_pid_ref/1
]).
Contributor:

Consider just doing one per-line for consistency with most of the code-base.

Contributor Author:

Fixed in 46fba8e.

end.

%% monotonic time now in milliseconds
tnow() ->
Contributor:

Minor nit: consider adding units to the function name. now_msec or something like that.

Comment on lines 215 to 217
active() -> active_int(all).
active_coordinators() -> active_int(coordinators).
active_workers() -> active_int(workers).
Contributor:

Avoid using one-line functions. It's shorter, but it's unusual for our code base. They may work for a larger table of values sometimes but for these cases we put the body on a new line.

Contributor Author:

Fixed in 5ac8a91.

named_table,
public,
{decentralized_counters, true},
{write_concurrency, true},
Contributor:

Consider using auto here

Contributor Author:

CSRT is predicated on ETS write_concurrency and decentralized_counters being essential. I can't think of a scenario where we wouldn't ever want write_concurrency on the table, especially given Erlang no longer supports non SMP builds. I think it's worth being explicit with our intentions here, and probably worth it for me to add a comment here in the code around our expectations.

Contributor @nickva (Oct 18, 2024):

Not sure about non-smp builds, but I guess I don't see why for OTP25+ versions we wouldn't use {write_concurrency, auto}.

This is just from taking the Erlang docs at their word here:

{decentralized_counters,boolean()}(Since OTP 23.0) Performance tuning. Defaults to true for all tables with the write_concurrency option set to auto.

Are you thinking maybe they didn't document it properly and we should set both anyway? We should take a look a their ets source then, to verify.

catch evict(PidRef),
demonitor(MonRef),
ok;
{'DOWN', MonRef, _Type, _0DPid, _Reason0} ->
Contributor:

_0DPid looks a bit odd, just like _Reason0; it can just use Pid to match exactly, or _Pid, or just _.

Contributor Author:

Yeah that was a bit of old code, but if you look below I left a TODO to get some feedback on and I could use some thoughts:

            %% TODO: should we pass reason to log_process_lifetime_report?
            %% Reason = case Reason0 of
            %%     {shutdown, Shutdown0} ->
            %%         Shutdown = atom_to_binary(Shutdown0),
            %%         <<"shutdown: ", Shutdown/binary>>;
            %%     Reason0 ->
            %%         Reason0
            %% end,

I want to record the shutdown reason when the process dies, but there are so many different formats of the potential Reason tuple that I wasn't sure of a good way to address that. I had a few approaches to translate some standard error formats, but it felt clunky and incomplete. Anyone have any good ideas on this front?

%% TODO: do we need cleanup here?
log_process_lifetime_report(PidRef),
catch evict(PidRef),
demonitor(MonRef),
Contributor:

If we're stopping probably not worth bothering demonitoring

%% TODO: decide on which naming scheme:
%% {[fabric_rpc, get_all_security, spawned], [
%% {[fabric_rpc, spawned, get_all_security], [
{[fabric_rpc, get_all_security, spawned], [
Contributor:

For non-streaming requests the spawned metric seems redundant. For instance we don't have get_all_security spawns followed by row sends. Could we just have them all as fabric_rpc.$call?

Contributor Author:

It's spawned because it directly corresponds to rexi_server spawning the RPC worker processes by way of rexi_server:init_p here https://github.com/apache/couchdb/blob/couch-stats-resource-tracker-v2/src/rexi/src/rexi_server.erl#L143 and we handle that here https://github.com/apache/couchdb/blob/couch-stats-resource-tracker-v2/src/couch_stats/src/couch_stats.erl#L126-L131.

The core goal here is to track the volume of RPC work being spawned on the system, the amount of data it's processing, and the amount of data returned, as appropriate. All RPC workers are spawned processes, and tracking the rate of arrival is an important metric we're missing, so we track spawned for the core API operations (the should_track logic keeps it to a targeted subset of API operations to focus on commonly useful metrics, but it's arguable we should extend this tracking to all induced RPC work). Many of the API operations stream out many rows, so we want to be able to track the volume of returned rows/data; similarly, some of the API operations filter rows and return a subset of the data.

Currently it's way too difficult to correlate a CouchDB node's IOQ usage with the underlying database operations being performed. Seeing the rate of incoming work, the amount of data processed, and the amount returned opens up a lot of visibility on that front, especially around things like filtered operations where we haven't had metrics around the skipped work that is tricky to introspect.

If your comment is more about the naming hierarchy: my approach was to have a consistent naming hierarchy with [fabric_rpc, $call, spawned] for all the tracked RPC workers, and similarly with processed or returned. I figured this naming hierarchy works well with various metrics stacks for doing things like graph([fabric_rpc, *, spawned]) to easily see the rate of induced workers, like I did in the Graphite links in the top description of the PR.

Contributor Author:

Another note, spawned is ubiquitous as these processes are all being spawned in the same manner that we track directly, but the processed vs returned vs etc is far more operation specific and we should look at what other types of data are important to track here, for instance, what would be pertinent data to track out of Clouseau/Nouveau requests? Similarly, even singular doc API operations might have relevant magnitudes, eg we should probably track the number of doc revisions the worker had to load to complete the request.

Comment on lines +160 to +162
_ ->
%% Got a message from an old Ref that timed out, try again
await_shard_response(Ref, Name, Rest, Opts, Factor, Timeout)
Contributor

This changes the logic a bit by dropping any non-matching messages. This doesn't seem to be related to the stats tracking, but it's a good change and might be better to do as a separate PR so we can discuss it there.

This goes back to the effort some years back to use a {rexi, ...} prefix for rexi messages to avoid dropping or consuming any 2-item tuple message. I wonder if we should finish that work first and incorporate message cleanup with it. But overall it does seem like a problem for a different PR.

Comment on lines +404 to +406
Other ->
?LOG_UNEXPECTED_MSG(Other),
wait_message(Node, Ref, Mon, Timeout)
Contributor

This changes the logic here by dropping unknown (stale?) messages. See a similar comment for get_shard. It might be good to do that in a separate PR.

@@ -35,6 +35,7 @@ start(Procs) ->
%% messages from our mailbox.
-spec stop(pid()) -> ok.
stop(MonitoringPid) ->
unlink(MonitoringPid),
Contributor

Wonder why we have to do this? Was there a crash that happened with stats tracking? We're exiting the monitor pid anyway. If this is not related to the stats tracking, it might be a good one for a separate PR.

Comment on lines 173 to 174
after
couch_stats_resource_tracker:destroy_context()
Contributor

If we're handling all the exit cases do we need an extra after clause?

@@ -60,6 +62,16 @@ process_mailbox(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) ->

process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) ->
receive
Msg ->
Contributor

This changes the rexi message receive pattern by accepting all messages, whereas previously we left unknown ones in the mailbox. We may crash in the Payload case if we get an unknown message, while previously we would have left it in the mailbox.

It might be better if we explicitly receive only the messages we expect and not all of them. That was actually the impetus for the {rexi, ...} message patterns. There were bugs related to rexi utils receiving 2-item tuples and ignoring them. That effort only went halfway by adding clauses to the receive here but nowhere else. Perhaps we can resurrect it and enhance those {rexi, ...} messages to take an extra map with options where delta or other context can go?
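Something along these lines (a sketch only; the handle/7 helper and the Opts map carrying the delta are hypothetical):

```erlang
%% Sketch: receive only the rexi-prefixed messages we expect, leaving
%% anything else in the mailbox. Opts is a hypothetical map that could
%% carry the stats delta or other per-message context.
process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) ->
    receive
        {rexi, Ref, Payload, Opts} when is_map(Opts) ->
            handle(RefList, Keypos, Fun, Acc0, Ref, Payload, Opts);
        {rexi, Ref, Payload} ->
            handle(RefList, Keypos, Fun, Acc0, Ref, Payload, #{});
        {timeout, TimeoutRef} ->
            {timeout, Acc0}
    after PerMsgTO ->
        {timeout, Acc0}
    end.
```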

Comment on lines +112 to +118
add_delta({A}, Delta) -> {A, Delta};
add_delta({A, B}, Delta) -> {A, B, Delta};
add_delta({A, B, C}, Delta) -> {A, B, C, Delta};
add_delta({A, B, C, D}, Delta) -> {A, B, C, D, Delta};
add_delta({A, B, C, D, E}, Delta) -> {A, B, C, D, E, Delta};
add_delta({A, B, C, D, E, F}, Delta) -> {A, B, C, D, E, F, Delta};
add_delta({A, B, C, D, E, F, G}, Delta) -> {A, B, C, D, E, F, G, Delta};
Contributor

@nickva nickva Sep 4, 2024

At first sight this looks a bit awkward. Do we have any add_delta({A, B, C, D, E, F}, Delta) -> {A, B, C, D, E, F, Delta}; messages sent to rexi?

It might be better to be explicit about which messages rexi will actually get. We may have to do a two-stage PR: one to prep the receivers to accept both old and new patterns, then another to start sending the new messages.
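Stage one could make the receive side tolerant of both shapes, roughly like this (a sketch; the exact message forms are illustrative):

```erlang
%% Sketch of the two-stage approach: receivers first learn to accept
%% both the legacy tuple and a new {rexi, ...} shape carrying Delta;
%% only after that ships do senders switch to the new shape.
wait_reply(Ref) ->
    receive
        {rexi, Ref, Msg, Delta} ->  %% new explicit shape
            {Msg, Delta};
        {Ref, Msg} ->               %% legacy shape, no delta attached
            {Msg, undefined}
    end.
```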



select_by_type(coordinators) ->
ets:select(?MODULE, ets:fun2ms(fun(#rctx{type = {coordinator, _, _}} = R) -> R end));
Contributor

This might be hard to maintain, because we might forget to update the tuple shape when we change it elsewhere. I suggest defining a record. The data representation would be the same; however, record syntax is easier to read and maintain.

-record(worker, {
    module :: atom()  | '_',
    function :: atom()  | '_'
}).

-record(coordinator, {
    module :: atom()  | '_',
    function :: atom()  | '_'
}).

select_by_type(coordinators) ->
    ets:select(?MODULE, ets:fun2ms(fun(#rctx{type = #coordinator{}} = R) -> R end));
select_by_type(workers) ->
    ets:select(?MODULE, ets:fun2ms(fun(#rctx{type = #worker{}} = R) -> R end)).

OR

-record(worker, {
    module :: atom()  | '_',
    function :: atom()  | '_'
}).

-record(coordinator, {
    module :: atom()  | '_',
    function :: atom()  | '_'
}).

coordinators() -> 
    ets:fun2ms(fun(#rctx{type = #coordinator{}} = R) -> R end).

workers() -> 
    ets:fun2ms(fun(#rctx{type = #worker{}} = R) -> R end).

select_by_type(coordinators) ->
    ets:select(?MODULE, coordinators());
select_by_type(workers) ->
    ets:select(?MODULE, workers()).

create_worker_context(From, {M,F,_A} = MFA, Nonce) ->
case is_enabled() of
true ->
create_context(MFA, {worker, M, F}, null, From, Nonce);
Contributor

If the worker record were defined (as suggested in another comment), this could be written as:

create_context(MFA, #worker{module = M, function = F}, null, From, Nonce);

This is more verbose. One option to make it less verbose is to define a constructor.

worker({M, F, _A}) ->  
    #worker{module = M, function = F}.

create_worker_context(From, {M,F,_A} = MFA, Nonce) ->
     ....
           create_context(MFA, worker(MFA), null, From, Nonce);
     ....

Contributor Author

Thanks for the feedback in this comment and the above comment around filtering by types. I was curious how ets would handle nested record types but everything works fine, so I've reworked things to use defined record types in 4c091bc and it cleaned things up nicely.

The only awkward bit there is that it complicates finding all workers and the coordinator in a single query, as I'm not sure match specifications will allow you to match against two different records. I've still got the nonce value in the base #rctx{} record so all related (CSRT tracked) processes to a given http request can be found by way of matching on #rctx{nonce=Nonce}, and then if you know the PidRef or a coordinator, you can find all related workers by way of filtering with: #rctx{type=#rpc_worker{from=PidRef}}.

Contributor

My suggestion was based on the previous code, where the shape of worker and coordinator was the same (a tuple with three elements). You invalidated that assumption by adding extra fields into each record. I hadn't thought about this possibility.

Technically you can match on two different shapes using something like

ets:fun2ms(fun
      (#rctx{type = #coordinator{}} = R) -> R;
      (#rctx{type = #rpc_worker{}} = R) -> R
end).

However, this would not be efficient, since it is translated into this spec:

2> ets:fun2ms(fun
2>       (#rctx{type = #coordinator{}} = R) -> R;
2>       (#rctx{type = #rpc_worker{}} = R) -> R
2>     end).
[{#rctx{started_at = '_',updated_at = '_',exited_at = '_',
        pid_ref = '_',mon_ref = '_',mfa = '_',nonce = '_',
        from = '_',
        type = #coordinator{mod = '_',func = '_',method = '_',
                            path = '_'},
        state = '_',dbname = '_',username = '_',db_open = '_',
        docs_read = '_',rows_read = '_',btree_folds = '_',
        changes_processed = '_',changes_returned = '_',
        ioq_calls = '_',io_bytes_read = '_',io_bytes_written = '_',
        js_evals = '_',js_filter = '_',js_filter_error = '_',
        js_filtered_docs = '_',mango_eval_match = '_',...},
  [],
  ['$_']},
 {#rctx{started_at = '_',updated_at = '_',exited_at = '_',
        pid_ref = '_',mon_ref = '_',mfa = '_',nonce = '_',
        from = '_',
        type = #rpc_worker{mod = '_',func = '_',from = '_'},
        state = '_',dbname = '_',username = '_',db_open = '_',
        docs_read = '_',rows_read = '_',btree_folds = '_',
        changes_processed = '_',changes_returned = '_',
        ioq_calls = '_',io_bytes_read = '_',io_bytes_written = '_',
        js_evals = '_',js_filter = '_',js_filter_error = '_',
        js_filtered_docs = '_',...},
  [],
  ['$_']}]

However, in your specific case the performance hit may be acceptable, since you are matching on nonce first. I think the matching process for a tuple stops at the first non-matching element, so you wouldn't reach the record check for cases where Nonce differs from the value you've provided.
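For example (a sketch, with Nonce bound to the request's nonce; a module would need the ms_transform include for fun2ms):

```erlang
%% Sketch: nonce is a top-level #rctx{} field, so rows with a different
%% nonce fail the match before the nested type record is examined.
%% (requires -include_lib("stdlib/include/ms_transform.hrl"))
find_workers(Nonce) ->
    MS = ets:fun2ms(
        fun(#rctx{nonce = N, type = #rpc_worker{}} = R) when N =:= Nonce -> R end
    ),
    ets:select(couch_stats_resource_tracker, MS).
```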

Contributor Author

previous code where shape of worker and coordinator was the same (a tuple with three elements). You invalidated that assumption by adding extra fields into each records.

Yeah, it kind of fell out naturally from switching the types to records; for instance, path is clearly only relevant for the http coordinator, yet we stored it for all #rctx{} entries, and similarly, the RPC side has a from context tracking the coordinator.

I think there's a balance in keeping common things like nonce, dbname, username, etc in the top level record for easy global querying, while storing context specific information in the type records. For example, this will make it easy to add additional worker types for indexing, compaction, replication, etc.

Contributor Author

That's nice to know that ets:fun2ms can handle multiple function heads; I wasn't sure, as it's a "pseudo function".

is_logging_enabled() ->
logging_enabled() =/= false.

logging_enabled() ->
Contributor

This can cause confusion. The _enabled suffix reads like a boolean; however, this function returns three different values. I am already confused: when I reviewed the function above this one it looked fine, but now I need to go back and think more.

is_logging_enabled() ->
    logging_enabled() =/= false.

What is the intent ^^^? Do we want to enable logging regardless of whether it is coordinator or true?

In such cases named variants are better.

tracker_subject() ->
   case conf_get("log_pid_usage_report", "coordinator") of
        "coordinator" ->
            coordinator;
        "true" ->
            all;
         _ ->
            no_tracking
    end.

Contributor Author

What is the intent ^^^? Do we want to enable logging regardless of it being coordinator or true?

The intent here is twofold:

First, the basic intent is to provide a mechanism to selectively enable the report logging: enabling full logging for all induced RPC workers is useful information but can be sizable given the current verbose output format, so I think we need some approaches for selectively enabling the desired output. I don't think my current approach here is sufficient, and you can see in should_log I've got some similar logic around selectively enabling logging for particular fabric_rpc worker types, but both of these approaches are one dimensional and leave much to be desired. Feedback welcome on approaches here, but I'm currently contemplating an ets:fun2ms based conditional logger, allowing us to utilize match specs as a mechanism for declaring logging filters. This would make it easy to have some default logging filters declared, yet allow custom filters to be declared and registered in a performant manner (although local.ini based definitions might be tricky). Then when the final record is loaded out of ets, we can match it directly against the match funs with https://www.erlang.org/docs/23/man/ets#match_spec_run-2 and use the same logic for querying and log filtering.

Secondly, the idea is to short circuit out of logging as soon as possible to avoid a) loading the record and b) the additional logic checks and ets lookups around worker specific filtering (eg enabling logging for only {fabric_rpc, map_doc} workers on a specific node). That's why the logic here is somewhat convoluted; there's a bit of a chicken and egg situation of determining whether or not to log without loading the record from ets to, for instance, a) check if it's a worker type, and b) check that the worker is of type {worker, fabric_rpc, map_doc}.

Again, this type of filtering would be nicely expressed in terms of match funs, but pragmatically speaking we could do the first level of filtering by way of a local.ini section [couch_stats_resource_tracker.should_log] with entries like fabric_rpc.map_doc = true to construct a map in a gen_server of workers to log. This would be useful, but as before is still fairly one dimensional in that it makes it complicated to express complex filters, like reduce workers on a particular database, let alone more complex queries like "find all rpc workers against $shard for $ddoc with docs loaded greater than 50 and IOQ calls greater than 700".

The ets:match_spec_run idea occurred to me while writing this up, so I'll explore it a bit more, but feedback is welcome here too, as I agree the logging enablement and filtering here is not clean or sufficient.
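Roughly what I'm imagining (a sketch; where the compiled filters are stored is elided here):

```erlang
%% Sketch of a match-spec driven conditional logger: filters are
%% compiled once, then each finalized #rctx{} is tested against them.
compile_filter(MatchSpec) ->
    ets:match_spec_compile(MatchSpec).

should_log(Rctx, CompiledFilters) ->
    %% match_spec_run/2 returns the (possibly empty) list of matches
    lists:any(
        fun(CMS) -> ets:match_spec_run([Rctx], CMS) =/= [] end,
        CompiledFilters
    ).
```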

ok
end.

is_logging_enabled() ->
Contributor

Please add a comment here that is_logging_enabled() is supposed to be true for both true and coordinator.

Contributor Author

Alright, I hacked together a csrt_logger module in 1702115 that provides two extended mechanisms for dynamically enabling logging, in both a simpler and a more complex manner. I just hacked this together; there are no compiler warnings, but I haven't actually run it yet or anything wild like that.

The first dynamic logging approach utilizes @iilyak's idea of dedicated type records, initially focusing on the #worker{} type here 1702115#diff-ae4228aa19a3865ec48888021914ba2db0cf64dca17138497c8db885222f4699R77-R90, eg #rpc_worker{mod=fabric_rpc, func=map_view}. We then translate that to a key string like worker.fabric_rpc.map_view so we can do config:get_boolean("csrt.should_log", Key, true) to have a direct toggle switch for the core default rpc operations we want to log, and we could use a similar mechanism to dynamically enable logging for additional rpc endpoints we don't enable by default, eg dreyfus_rpc or mem3_rpc.

We generate that mapping in a gen_server so that we can do direct atom comparisons on module/function names rather than repeatedly doing string conversions and comparisons; the strings are only touched at configuration initialization/update time and not used in the should-log checks. We can then check whether that module worker type is tracked directly: 1702115#diff-ae4228aa19a3865ec48888021914ba2db0cf64dca17138497c8db885222f4699R220-R221

The next mechanism, and the core idea of this experimental hack in the commit, is to create a way to dynamically register match spec funs to evaluate against the stream of reports. The idea here is to use ets:match_spec_compile to generate a compiled match_spec we can store a reference to in the gen_server; then we can dynamically enable filtered logging by declaring and registering match_specs. For example, here's our matcher registration handling 1702115#diff-ae4228aa19a3865ec48888021914ba2db0cf64dca17138497c8db885222f4699R171-R174 along with the compilation 1702115#diff-ae4228aa19a3865ec48888021914ba2db0cf64dca17138497c8db885222f4699R209-R218, and then we can determine whether we should log the particular #rctx{} by checking if it matches any of the registered filters here 1702115#diff-ae4228aa19a3865ec48888021914ba2db0cf64dca17138497c8db885222f4699R133-R142

For example, here's a function to dynamically generate a matchspec parameterized by username:

(node1@127.0.0.1)4> F = fun(User) -> ets:fun2ms(fun(#rctx{username=User0}=R) when User =:= User0 -> R end) end.
#Fun<erl_eval.42.3316493>
(node1@127.0.0.1)5> F(bar).
[{#rctx{started_at = '_',updated_at = '_',pid_ref = '_',
        mfa = '_',nonce = '_',from = '_',type = '_',dbname = '_',
        username = '$1',path = '_',db_open = '_',docs_read = '_',
        rows_read = '_',changes_processed = '_',
        changes_returned = '_',ioq_calls = '_',io_bytes_read = '_',
        io_bytes_written = '_',js_evals = '_',js_filter = '_',
        js_filtered_docs = '_',mango_eval_match = '_',
        get_kv_node = '_',get_kp_node = '_',write_kv_node = '_',
        write_kp_node = '_'},
  [{'=:=',{const,bar},'$1'}],
  ['$_']}]

Then we can compile that match spec, store the reference in the gen_server, and check whether any of the compiled matchers match the given #rctx{} record. This opens up considerable options for dynamically filtered logging of a subset of the csrt report stream, although if we adopt a model like this I think it would be great to extend it out to the main logging systems as well, so that, for instance, we could dynamically enable debug logs for a particular database shard. I think it might be trickier to dynamically define relations, eg username=Foo and docs_opened>100, but it is easy to predefine a series of those types of functions for common use cases like filtering on user and doc magnitude, or user and db shard plus magnitude, or whatever. Perhaps there are some other clever tricks we can do here too; it's a little tricky having to define the match record within the macro expansion call, but as I demonstrated above we can dynamically generate parameterized match specs that are then compiled and shareable, so I think there's a lot of potential with this approach.

Contributor

The idea here is to ets:match_spec_compile to generate a compiled match_spec we can store a reference to in the gen_server, then we can dynamically enable filtered logging by way of declaring and registering match_specs.

A combination of ets:match_spec_compile and ets:match_spec_run is a very neat idea. Did you consider https://www.erlang.org/doc/apps/erts/persistent_term.html instead of a gen_server? I wish persistent_term had persistent_term:from_map/1.

Contributor

@iilyak iilyak Oct 25, 2024

I looked briefly into the BIF implementation of persistent_term. It is nice:

  • hash map internally
  • the term sharing is preserved for complex containers (tuples, lists)
  • the literals like numbers and strings are copied
  • only one allocation during get (to create reference for GC)

My conclusion is that it should be way faster than an ets or gen_server based approach when updates are rare. Updates cause locking of the data structure, so all concurrent reads are put on hold.

Contributor Author

Ahhhhh that's an interesting idea with using persistent_term, I need to remember that's a thing now.

The purpose of the gen_server was twofold: 1) to have a way of registering and preserving the filters; 2) to have a mechanism for applying the registered filters to the reports as the CSRT contexts are cleaned up.

There are various ways to do 1), but for 2) my experiment above used a gen_server so that it could store the filters and have the logs funneled through it, which likely should use sharded gen_servers like we've done elsewhere.

This isn't strictly necessary though, as the current PR approach uses the dedicated monitor process to generate the report https://github.com/apache/couchdb/blob/couch-stats-resource-tracker-v2/src/couch_stats/src/couch_stats_resource_tracker.erl#L734 and avoids needing dedicated servers to filter the operations. The tricky bit was how to get the registered filters down to the monitor processes so they could determine locally whether to log the report or not.

The use of persistent terms is an interesting approach for declaring the filter matchers, especially since the output of ets:match_spec_compile is a reference to an internal representation, so fetching the set of references across the various workers and coordinators and then applying them locally seems worth exploring, especially since it allows us to stick with the decentralized monitor processes rather than creating new high-throughput gen_servers.

We still have the aspect of discovery; I'm curious what you had in mind on this front @iilyak, but going by the persistent_term API, unless we use some other mechanism for dynamically declaring the set of enabled filters, we'll have to do something with an expected naming scheme and iterate over the entries. Maybe something like:

register_filter(Name0, CompMatchSpec) when is_binary(Name0) ->
    Name = <<"csrt-logging-filter::", Name0/binary>>,
    persistent_term:put(Name, CompMatchSpec).

get_active_filters() ->
    %% persistent_term:get/0 returns a list of {Key, Value} pairs
    lists:filter(
        fun({<<"csrt-logging-filter::", _/binary>>, _}) -> true; (_) -> false end,
        persistent_term:get()
    ).

Basically, using https://www.erlang.org/doc/apps/erts/persistent_term.html#get/0 to iterate over all declared persistent terms and looking for an expected prefix naming scheme. Did you have something else in mind? I suppose we could also have a gen_server for handling the state and declarations, but there still needs to be some mechanism for publishing what is active. That might also be useful if we want the ability to register TTL filtering, but I'm not convinced that's worth complicating the design over. I'm curious what thoughts you had on this front: how would a given CSRT monitor process query the registered filters?

Hmmm... I suppose we could make it even simpler and just do persistent_term:put(all_the_filters, #{...}) and then store and fetch a map of those entries. That might be a decent idea; we need to iterate through all the filters anyway, so we might as well grab them in one query. It also avoids having to do the dynamic naming sequences. Then our earlier functions become:

register_filter(Name, CompMatchSpec) ->
    Filters = persistent_term:get(all_the_filters, #{}),
    persistent_term:put(all_the_filters, maps:put(Name, CompMatchSpec, Filters)).

get_active_filters() ->
    persistent_term:get(all_the_filters, #{}).

Contributor

I think the second approach (persistent_term:put(all_the_filters, maps:put(....))) would be better, since as you've mentioned we would need to retrieve all the filters anyway. My only suggestion would be to follow the recommendation from the docs and name the key {?MODULE, all_the_filters}.
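i.e., something like (a sketch):

```erlang
%% Sketch: namespace the persistent_term key with {?MODULE, ...} per
%% the docs' recommendation, to avoid collisions between applications.
-define(FILTERS_KEY, {?MODULE, all_the_filters}).

register_filter(Name, CompMatchSpec) ->
    Filters = persistent_term:get(?FILTERS_KEY, #{}),
    persistent_term:put(?FILTERS_KEY, Filters#{Name => CompMatchSpec}).

get_active_filters() ->
    persistent_term:get(?FILTERS_KEY, #{}).
```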

@@ -222,7 +222,11 @@ stream_ack(Client) ->
%%
ping() ->
{Caller, _} = get(rexi_from),
erlang:send(Caller, {rexi, '$rexi_ping'}).
%% It is essential ping/0 includes deltas as otherwise long running
Contributor

Thank you for the comment.

@@ -53,6 +53,8 @@
-define(INTERACTIVE_EDIT, interactive_edit).
-define(REPLICATED_CHANGES, replicated_changes).

-define(LOG_UNEXPECTED_MSG(Msg), couch_log:warning("[~p:~p:~p/~p]{~p[~p]} Unexpected message: ~w", [?MODULE, ?LINE, ?FUNCTION_NAME, ?FUNCTION_ARITY, self(), element(2, process_info(self(), message_queue_len)), Msg])).
Contributor

I am surprised the linter doesn't complain about such a long line.
