sbroker

sbroker is a library that provides the building blocks for creating a pool and/or a load regulator. The main goals of the library are to minimise upper percentile latency with smart queuing, to allow the feature set to be changed live with minimal changes, to make a live system easy to inspect, and to provide confidence through property based testing.

Example

Add a broker to the sbroker application env brokers and it will be started when the application starts. Below we use a CoDel queue for the ask side, a timeout queue for the ask_r side and no meters. Processes then call sbroker:ask/1 and sbroker:ask_r/1 to find a match. A process calling sbroker:ask/1 will only match with a process that calls sbroker:ask_r/1 and vice versa.

ok = application:load(sbroker),
Broker = broker,
Brokers = [{{local, Broker},
            {{sbroker_codel_queue, #{}}, {sbroker_timeout_queue, #{}}, []}}],
ok = application:set_env(sbroker, brokers, Brokers),
{ok, _} = application:ensure_all_started(sbroker).

Pid = spawn_link(fun() -> {go, Ref, _, _, _} = sbroker:ask_r(Broker) end),
{go, Ref, Pid, _, _} = sbroker:ask(Broker).

Matches can also be requested without queuing, asynchronously or using a dynamic approach that is synchronous but becomes asynchronous if a match isn't immediately available.
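For example, a sketch of the asynchronous style (the {await, Tag, _} return shape of sbroker:async_ask/1 and the sbroker:await/2 helper are assumed from the sbroker documentation; sbroker:nb_ask/1 and sbroker:dynamic_ask/1 cover the non-queuing and dynamic styles):

% Request a match without blocking, then wait up to 5 seconds for the
% {go, ...} or {drop, ...} reply delivered as a message.
{await, Tag, _} = sbroker:async_ask(Broker),
{go, _Ref, _Match, _, _} = sbroker:await(Tag, 5000).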

Requirements

The minimum OTP version supported is 18.0.

The sasl application is required to start the sbroker application. The sasl error_logger handler can be disabled by setting the sasl application env sasl_error_logger to false.
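For example, in a release's sys.config:

% Disable the sasl error_logger handler.
[{sasl, [{sasl_error_logger, false}]}].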

Installing

For rebar3 add sbroker as a dependency in rebar.config:

{deps, [sbroker]}.

Other build tools may work if they support rebar3 dependencies but are not directly supported.

Testing

$ rebar3 ct
...

Documentation

Documentation is hosted on hex: http://hexdocs.pm/sbroker/.

From overview.edoc (previous version)

Sojourn Broker is an alternative to traditional pooling approaches that focuses on the queues involved. Both clients and workers are explicitly enqueued into the relevant queues and a match occurs once both a client and a worker are queued. Both queues have many configuration options (see "Smart Queues" below) that can be reconfigured live as part of a code update or config change in a release upgrade, or from the shell, without dropping requests.

There is a simple interface to match processes. One party calls sbroker:ask/1 and the counterparty sbroker:ask_r/1. If a match is found both return {go, Ref, Pid, RelativeTime, SojournTime}. Ref is the transaction reference. Pid is the other process in the match, which can be changed to any term using sbroker:ask/2 and sbroker:ask_r/2. SojournTime is the time spent waiting for a match and RelativeTime is the difference between when the counterparty and the caller sent their requests. If no match is found, both return {drop, SojournTime}.

SojournTime and RelativeTime are in native time units to allow conversion to the unit of choice without loss of accuracy. However the time may not be read for every request so these values are not as accurate as the precision suggests. The time is read using erlang:monotonic_time/0 and so the minimum OTP version is 18.0. For more information on RelativeTime see "Relative Time and Regulation" below.
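For example, a sojourn time can be converted to milliseconds with the standard library:

% Native time units convert without loss of accuracy.
% (On OTP 18 the unit is spelled milli_seconds rather than millisecond.)
Millis = erlang:convert_time_unit(SojournTime, native, millisecond).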

Processes calling sbroker:ask/1 are matched with a process calling sbroker:ask_r/1 and vice versa. See sbroker for alternative functions to request a match, such as the asynchronous sbroker:async_ask/1.
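For example, a worker can advertise a descriptive term instead of its pid using sbroker:ask_r/2, as described above (a sketch assuming Broker is a started broker; {worker, ...} is an arbitrary tag):

Worker = spawn_link(fun() ->
                        % match as {worker, Pid} rather than the raw pid
                        {go, _, _, _, _} = sbroker:ask_r(Broker, {worker, self()})
                    end),
{go, _Ref, {worker, Worker}, _, _} = sbroker:ask(Broker).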

=== Usage ===

sbroker provides configurable queues defined by sbroker:handler_spec()s. A handler_spec() takes the form:

{Module, Args}

Module is an sbroker_queue callback module that implements the queue. The following callback modules are provided: sbroker_drop_queue, sbroker_timeout_queue, sbroker_codel_queue and sbroker_fair_queue.

Args is the argument passed to the callback module. Information about the different queue modules and their arguments is available in the documentation.
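For example, a timeout queue that drops requests after they have waited 5 seconds:

{sbroker_timeout_queue, #{timeout => 5000}}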

An sbroker is started using sbroker:start_link/3,4:

sbroker:start_link(Module, Args, Opts).
sbroker:start_link(Name, Module, Args, Opts).

The sbroker will call Module:init(Args), which should return the specification for the sbroker:

init(_) ->
    {ok, {AskQueueSpec, AskRQueueSpec, [MeterSpec]}}.

AskQueueSpec is the handler_spec() for the queue containing processes calling ask/1. The queue is referred to as the ask queue. Similarly AskRQueueSpec is the handler_spec() for the queue containing processes calling ask_r/1, and the queue is referred to as the ask_r queue.

MeterSpec is a handler_spec() for a meter running on the broker. Meters are given metric information and are called when the time is updated, see sbroker_meter. The following sbroker_meter modules are provided: sbroker_overload_meter, sbetter_meter, sprotector_pie_meter, sregulator_update_meter and sregulator_underload_meter.

For example:

-module(sbroker_example).

-behaviour(sbroker).

-export([start_link/0]).
-export([init/1]).

start_link() ->
    sbroker:start_link(?MODULE, undefined, []).

init(_) ->
    QueueSpec = {sbroker_timeout_queue, #{timeout => 200}},
    {ok, {QueueSpec, QueueSpec, []}}.

sbroker_example:start_link/0 will start an sbroker with queues configured by QueueSpec. This configuration uses the sbroker_timeout_queue callback module which drops requests when they have been in the queue for longer than a time limit (200 milliseconds). To use this sbroker:

{ok, Broker} = sbroker_example:start_link(),
Self = self(),
Pid = spawn_link(fun() -> {go, _Ref, Self, _, _} = sbroker:ask_r(Broker) end),
{go, _Ref, Pid, _, _} = sbroker:ask(Broker).

=== Error Isolation ===

When a worker process holds a resource, especially a socket, it can be difficult or impossible to guarantee to client processes that the resource will always be available when the worker is called. The guarantee could be provided by creating the resource inside the worker's init/1 callback and terminating as soon as the resource is no longer available. Often this is not practical because the supervision tree would not withstand a resource worker constantly failing, yet the failure still needs to be handled - for example, a remote server going down.

Therefore the worker must handle the error. Normally this involves backing off and retrying until the resource is created. Usually this means that the worker must handle requests when the resource isn't available and the client must handle the resulting errors.

If the worker does back off, it is possible that a client gets unlucky and is matched with a worker that doesn't have a resource while other workers with a resource are available. It is also possible that the client calls a worker that is busy creating its resource and has to wait for the worker to either create the resource or fail/time out, receiving an error.

Sojourn Broker tries to alleviate this issue by requiring the worker to choose when it is enqueued, so only available workers or resources should be in the queue waiting for a client. This isolates the errors and allows the client to wait for a resource to become available. If a resource becomes unavailable while in the queue, the worker can remove itself from the queue with sbroker:cancel/3.
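For example, a worker that queued asynchronously could leave the queue when its resource dies. A sketch: the {await, Tag, _} return shape of sbroker:async_ask_r/2 and the argument order of sbroker:cancel/3 are assumed from the documentation, and resource_down is a hypothetical message:

{await, Tag, _} = sbroker:async_ask_r(Broker, {Resource, self()}),
receive
    {resource_down, Resource} ->
        % resource gone: leave the queue so no client is matched with us
        sbroker:cancel(Broker, Tag, 5000)
end.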

A common pattern is as follows:

client() ->
    case sbroker:ask(sbroker_example) of
        {go, Ref, {Resource, Pid}, _, _} ->
            % stuff happens here using Resource, 3rd element set by counterparty
            Pid ! {Ref, done},
            ok;
        {drop, _} ->
            {error, drop}
    end.

init() ->
    % create any resources or backoff and retry, e.g. open a gen_tcp
    % socket (connect/0 is a placeholder for that setup)
    Resource = connect(),
    loop(Resource).

loop(Resource) ->
    % use reverse ask call to offer the resource to a counterparty
    case sbroker:ask_r(sbroker_example, {Resource, self()}) of
        {go, GRef, Pid, _, _} ->
            MRef = monitor(process, Pid),
            receive
                {'DOWN', MRef, _, _, _} ->
                    % counterparty crashed, handle error before enqueuing again
                    stop(Resource);
                {GRef, done} ->
                    % counterparty finished with resource
                    demonitor(MRef, [flush]),
                    loop(Resource)
            after
                5000 ->
                    % timeout if we don't hear from the counterparty
                    demonitor(MRef, [flush]),
                    stop(Resource)
            end;
        {drop, _} ->
            % dropped by queue
            stop(Resource)
    end.

stop(Resource) ->
    % clean up any resources (close/1 is a placeholder for teardown)
    close(Resource),
    init().

Usually the looping worker/resource process will be an OTP process, e.g. a gen_server, and will queue asynchronously using sbroker:async_ask_r/2,3.

However if a resource becomes unavailable and workers are backing off, client requests won't match and will wait in the queue until they are dropped. In a simple timeout queue, like a gen_server:call/3, this means waiting for the timeout and then failing. An end user client waiting in a queue cannot distinguish between complete unavailability of the resource and a slow/congested queue in front of the resource. The queue congestion avoidance techniques in "Smart Queues" below can be used to avoid the problem. This may mean that the client waits only a very short period before failing, possibly allowing time for the resource to become available.

=== Smart Queues ===

The primary goal of Sojourn Broker is to reduce upper tail latency. The first step to achieving this is to queue a request only when the requester is ready to handle a match. Once in the queue the request has to wait for a match, which may not occur in a reasonable time if a resource is unavailable or there are a lot of other requests causing the queue to move slowly.

The simplest approach is to limit the length of the queue and either prevent new requests from joining the queue or drop the oldest request in the queue when the maximum is reached. The latter can prevent head of the line blocking, which is especially useful when newer requests overwrite the data from older requests. Unfortunately choosing a suitable limit can be difficult because some queues are short and slow while others are long and fast. This can mean that requests stay in the queue for a long time, or indefinitely if the resource is not available.

The most common technique to avoid this is to add a timeout to the queue, see sbroker_timeout_queue, so requests have an upper bound on the time they can spend waiting for a match. When a timeout queue gets heavily congested, the length of the queue in time, SojournTime, can approach this upper bound: every request either times out or gets a match just before the timeout would occur, because the request immediately ahead of it timed out.

A similar issue occurs when setting a maximum queue size and dropping the oldest request at the maximum. Requests are either dropped for reaching the maximum size or get a match because the request immediately ahead of them was dropped. This leads to every request taking approximately the time it takes to fill the queue.

In both cases, if requests are going to be dropped it is better to drop them sooner rather than later, so successful requests do not have to wait as long. Sojourn Broker provides two modern active queue management algorithms that drop requests sooner than the simpler strategies would. Both are configured using a Target sojourn time and an Interval, where requests are only dropped once the queue has been continuously above the target for the initial interval. After this point the active queue management tries to drop a suitable number of requests so that the queue remains near the target.

The Interval value should usually be around the 95th-99th percentile of the time it takes an end user client to discover its request was dropped and to get a new request into the queue. The Target would then be 5%-10% of that value for the application to feel responsive to a client.

One approach is to use the sbroker_codel_queue, which uses the Controlled Delay (CoDel) active queue management algorithm. The second approach is to use the sprotector_pie_meter, which uses the basic Proportional Integral controller Enhanced (PIE) algorithm.
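For example, following the interval guidance above: if end user clients typically discover a drop and requeue within roughly 2 seconds at the upper percentiles, a CoDel queue could use a 2 second interval and a 100-200 millisecond target. A sketch, with the target and interval map keys assumed from the sbroker_codel_queue documentation:

% Interval ~ p95-p99 of client retry time; Target ~ 5-10% of the Interval.
{sbroker_codel_queue, #{target => 200, interval => 2000}}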

The CoDel queue drops the oldest request and tries to space drops out evenly but unpredictably, so that dropped clients do not all retry at the same time. During big bursts this can lead to higher SojournTime values if a suitable maximum queue size is not set, because the dropping algorithm is trying to spread out the drops. CoDel reduces the interval between drops while the queue stays above target. This means that if the counterparties are not trying to match, e.g. resources can't be created, the drop interval becomes very small and requests are dropped frequently.

The PIE meter tries to let only requests that will find a match reach the broker, and short circuits other requests based on a self tuning probability. If the queue is slow because no counterparties are trying to match, the drop probability quickly reaches 100% and all requests short circuit. This means the queue management also acts as a circuit breaker when all requests would be dropped/fail. The PIE meter does not combine well with a last in, first out queue because newer requests are dequeued first and can make the queue appear fast.

If a (misbehaving) client or application sends significantly more requests than others, it will gain a similarly outsized proportion of matches. To avoid this situation sbroker_fair_queue can be used to load balance clients (or workers) on the broker itself. The fair queue creates one queue per application, client or other value and dequeues using a round robin strategy. Queues are created on demand and removed when no longer used.

The CoDel algorithm works better with the fair queue than without because it can differentiate between flows. However the PIE meter does not combine well with it, because the meter cannot differentiate between the different flows and will see heavily fluctuating SojournTime values.

It is also possible to load balance requests between multiple brokers with srand and sscheduler. These choose a random broker or a broker based on scheduler id. One of these can be combined with the sbetter load balancer that tries to even out the random load balancing by occasionally comparing the sojourn times on two random brokers and choosing the broker with the shortest sojourn time. All three of these load balancing modules can be used to load balance any OTP process.

=== Relative Time and Regulation ===

A negative RelativeTime means the counterparty sent their request before the caller and so the counterparty waited for the caller. A positive value means the counterparty sent their request after the caller and so the caller waited for the counterparty. A value of 0 means the match occurred "immediately" for both requests.

Therefore RelativeTime is a measure of the difference in queue speeds, as if the broker were so fast that it used zero processing time. This value can be used to measure queue congestion, e.g. more ask than ask_r requests would mean ask requests are waiting and get positive RelativeTime values on matches, while ask_r requests are matched immediately and get negative RelativeTime values.

Unfortunately the broker is not infinitely fast and processing adds delay. The total processing time while waiting for a match can be calculated as SojournTime - max(RelativeTime, 0).
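As a small sketch of the formula above:

% Time the broker spent processing while this request waited for a match.
processing_time(SojournTime, RelativeTime) ->
    SojournTime - max(RelativeTime, 0).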

If the broker is handling a lot of requests the SojournTime can grow even if RelativeTime is negative, giving the false impression that the queue is congested (though it may also be congested). The contribution of load and queue congestion are indistinguishable and likely irrelevant to an end user client. Therefore the sbroker_queue callback modules use SojournTime when deciding whether to drop requests.

A group of "worker" processes whose role is to serve end user clients as quickly as possible should try to minimise the SojournTime of the clients. The only thing the workers can do to keep the client queue fast and uncongested is to keep worker requests in the worker queue, so clients match immediately and aren't queued, or are queued for as short a time as possible. Therefore the goal of the workers is to keep their own queue slow so the client queue is fast. The simplest method to minimise client sojourn time is to queue workers as much as possible.

Unfortunately if workers are used then it is safe to assume they hold a resource that is expensive to create and/or maintain - otherwise they would not be required. Therefore a second goal of the workers is to keep their numbers minimised, which conflicts with queuing as many workers as possible on the broker.

A simple approach is to maintain a minimal group of workers and to increase their number when a client request arrives while no worker is queued. In the most naive method the client is matched with the new worker but then has to wait for the worker to create its resource - for example, for a TCP socket to connect, possibly followed by more handshaking such as TLS or other authorisation and setup. Alternatively the client waits for the next worker to be ready, which could be the newly created worker.

However with RelativeTime the group of workers can continuously monitor queue congestion and pre-emptively create a worker before the empty worker queue situation occurs. One approach is to try to maintain a RelativeTime above a certain value by creating a worker whenever it falls below that value, effectively keeping the worker queue slightly slow so the client queue is always fast. This is an extension of the previous approach, where a worker was created when RelativeTime was less than or equal to 0. For example, create a worker when workers send their requests less than 100 milliseconds before clients send theirs, i.e. RelativeTime < 100 milliseconds. See sregulator and sregulator_relative_valve for more information.

If SojournTime is used to gauge queue congestion in the worker queue, the same issue occurs for the workers as for the clients: load and queue congestion cannot be distinguished. A load spike could therefore be misinterpreted as an increase in worker queue congestion, i.e. a decrease in client queue congestion or a fast client queue. The opposite could be true if the increase in load is due to a burst of client requests.

The RelativeTime can fluctuate due to short bursts, causing new workers to be created that are no longer required once ready. To differentiate between a short burst and a longer term change in request rate, the maximum RelativeTime over an interval can be used instead: if that local maximum is below the target, create a worker. This can be repeated with decreasing intervals until no more workers are needed. Decreasing consecutive intervals ensure that long term changes are adapted to without reacting too quickly to an initial burst. See sregulator_codel_valve for more information.

An sregulator is similar to an sbroker, except that the regulator acts as the ask_r side and controls matching with ask requests using an sregulator_valve.

-module(sregulator_example).

-behaviour(sregulator).

-export([start_link/0]).
-export([init/1]).

start_link() ->
    sregulator:start_link(?MODULE, undefined, []).

init(_) ->
    QueueSpec = {sbroker_timeout_queue, #{timeout => 200}},
    ValveSpec = {sregulator_relative_valve,
                 #{target => 100, min => 4, max => 16}},
    {ok, {QueueSpec, ValveSpec, []}}.

The sregulator_relative_valve will increase the size above the minimum of 4 when updated with a relative time below 100 milliseconds, up to a maximum of 16.

Then to use the regulator:

{go, Ref, Pid, _, _} = sregulator:ask(sregulator_example).

The sregulator is updated using the sregulator_update_meter in either an sbroker or an sregulator, or explicitly using sregulator:update/3 and sregulator:cast/2:

{sregulator_update_meter, [{sregulator_example, ask_r, #{update => 200}}]}

This meter will update sregulator_example with the RelativeTime of the ask_r queue roughly every 200 milliseconds.
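For example, the meter can be placed in the init/1 of sbroker_example from "Usage" above:

init(_) ->
    QueueSpec = {sbroker_timeout_queue, #{timeout => 200}},
    MeterSpec = {sregulator_update_meter,
                 [{sregulator_example, ask_r, #{update => 200}}]},
    {ok, {QueueSpec, QueueSpec, [MeterSpec]}}.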

A common pattern is as follows:

init() ->
    {go, Ref, Pid, _, _} = sregulator:ask(sregulator_example),
    % create any resources after go from sregulator
    loop(Ref, Pid).

loop(Ref, Pid) ->
    % The regulated queue is `ask_r` so sregulator_update_meter uses `ask_r` too
    case sbroker:ask_r(sbroker_example) of
        {go, _, _, _, _} ->
            % stuff happens
            loop(Ref, Pid);
        {drop, _} ->
            % dropped by queue, maybe shrink
            drop(Ref, Pid)
    end.

drop(Ref, Pid) ->
    case sregulator:continue(Pid, Ref) of
        {go, Ref, Pid, _, _} ->
            % continue loop with same Ref as before
            loop(Ref, Pid);
        {stop, _} ->
            % process should stop its loop and Ref is removed from sregulator
            stop()
    end.

stop() ->
    % clean up any resources
    init().

Motivation

The main roles of a pool are: dispatching, back pressure, load shedding, worker supervision and resizing.

Existing pooling solutions assume that if a worker is alive it is ready to handle work. If a worker isn't ready a client must either wait for it to be ready or error immediately, even when another worker might be ready to successfully handle the request. If workers explicitly control when they are available then the pool can always dispatch to workers that are ready.

Therefore in an ideal situation clients are requesting workers and workers are requesting clients. This is the broker pattern, where both parties request a match with the counterparty. For simplicity the same API can be used for both sides, so to the broker both parties are clients.

Existing pooling solutions that support back pressure use a timeout mechanism where clients are queued for a length of time and then give up. Once clients start timing out, the next client in the queue is likely to have waited close to the timeout. This leads to the situation where clients are all queued for approximately the timeout, either giving up or getting a worker. If the clients that give up could give up sooner, all clients would spend less time waiting but the same number would be served.

Therefore in an ideal situation a target queue time would be chosen that keeps the system feeling responsive, and clients would give up at a rate such that in the long term clients spend up to the target time in the queue. This is sojourn (queue waiting) time active queue management. CoDel and PIE are two state of the art active queue management algorithms with a target sojourn time, so those are used, with defaults that keep systems feeling responsive to a user.

Existing pooling solutions that support load shedding do not support back pressure. These use ETS as a lock system and choose a worker to try. However other workers might be available but are not tried, or a busy wait is used to retry multiple times to gain a lock. If clients could use ETS to determine whether a worker is likely to be available, the existing dispatch and back pressure mechanisms could be reused.

Therefore we want to limit access to the dispatching process by implementing a sojourn time active queue management algorithm using ETS in front of the dispatching process. Fortunately this is possible with the basic version of PIE.

Existing pooling solutions either don't support resizing or only grow the pool when no workers are immediately available. However the new worker may need to set up an expensive resource and is unlikely to be ready immediately. If workers are started earlier then the pool is less likely to find itself with no workers available.

The same pools that start workers "too late" also start a new worker for every client that tries to check out when no workers are available, even though old workers will become available again, perhaps before the new workers are ready. This often leads to too many workers being started, wasting resources until they are reaped for being idle. If workers are started at intervals then temporary bursts do not start too many workers, while persistent increases still cause adequate growth.

Therefore we want workers to be started when worker availability is running low but with intervals between starting workers. This can be achieved by sampling the worker queue at intervals and starting a worker based on the reading. This is the load regulator pattern, where the concurrency limit of tasks changes based on sampling. For simplicity the same API as the broker could be used, where the regulator is also the counterparty to the workers.

Existing pooling solutions that also support resizing use a temporary supervisor and keep restarting workers if they crash, equivalent to using a max restarts of infinity. Unfortunately these pools can't recover from faults due to bad state, because the error does not bubble up the supervision tree and trigger restarts. They are "too fault tolerant": the error does not spread far enough to trigger recovery. A pool whose workers crash every time is not useful.

Therefore we want workers to be supervised using supervisors with any configuration so the user can decide exactly how to handle failures. Fortunately using both the broker and regulator patterns allows workers to be started under user defined supervisors.

License

Copyright 2014 James Fish

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Roadmap

  • 1.1 - Add circuit breaker sregulator valves
  • 1.2+ - Add improved queue management algorithms when possible, if at all
