Ready for release 1.0.0
ekoutanov committed Apr 20, 2020
1 parent b338537 commit 530475a
Showing 2 changed files with 11 additions and 7 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -0,0 +1 @@
.DS_Store
17 changes: 10 additions & 7 deletions README.md
@@ -1,5 +1,6 @@
Not-only Exclusive Leader Induction in Highly Available Distributed Systems
===
![Release](https://img.shields.io/github/v/release/obsidiandynamics/neli?color=white)

Published in [https://github.com/obsidiandynamics/NELI](https://github.com/obsidiandynamics/NELI) under a BSD (3-clause) license.

@@ -14,8 +15,8 @@ This text describes the Not-only Exclusive Leader Induction (NELI) protocol buil

In *non-exclusive mode*, this protocol is useful in scenarios where —

* There are a number of roles that need fulfilling, and it's desirable to share this load among several processes (likely deployed on different hosts);
* While it's undesirable that a role is simultaneously filled by two processes at any point in time, the system will continue to function correctly. In other words, this may create duplication of work but cannot cause harm (the _safety_ property);
* There are several roles that need fulfilling, and it's desirable to share this load among several processes (likely deployed on different hosts);
* While it is undesirable that a role is simultaneously filled by two processes at any point in time, the system will continue to function correctly. In other words, this may create duplication of work but cannot cause harm (the _safety_ property);
* Availability of the system is imperative; it is required that at least one leader is assigned to a role at all times so that the system as a whole remains operational and progress is made on every role (the _liveness_ property);
* The number of processes and roles is fully dynamic, and may vary on an _ad hoc_ basis. Processes and roles may be added and removed without reconfiguration of the group or downtime. This accommodates rolling deployments, auto-scaling groups, and so forth;
* The use of a dedicated Group Membership Service (GMS) is, for whatever reason, deemed intractable, and where an alternate primitive is sought. Perhaps the system is deployed in an environment where a robust GMS is not natively available, but other capabilities that may internally utilise a GMS may exist. Kinesis in AWS is one such example.
@@ -30,7 +31,7 @@ A centrally-arbitrated topic _C_ is established with a set of partitions _M_. (A

Each process in _P_ continually publishes a message on all partitions in _M_ (each successive message is broadcast a few seconds apart). The message has no key or value; the producing process explicitly specifies the partition number for each published message. Since each process in _P_ publishes to every partition, each partition in the set _M_ is continually subjected to messages from each process. As a corollary, for as long as at least one process in _P_ remains operational, there will be at least one message continually published in each partition in _M_. Crucial to the protocol is that no partition may 'dry up'.
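As an illustration, a minimal sketch of this publishing loop using Kafka's Java producer client follows. The topic name `neli.control`, the partition count and the broadcast interval are assumptions for the example; NELI itself does not prescribe them.

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public final class NeliPublisher {
  public static void main(String[] args) throws InterruptedException {
    final var props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
    props.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

    final var topic = "neli.control"; // the centrally-arbitrated topic C (name assumed)
    final var partitions = 16;        // |M|, fixed when the topic is created

    try (var producer = new KafkaProducer<byte[], byte[]>(props)) {
      while (true) {
        // Publish a keyless, valueless record to every partition in M, explicitly addressed.
        for (var partition = 0; partition < partitions; partition++) {
          producer.send(new ProducerRecord<>(topic, partition, null, null));
        }
        Thread.sleep(5_000); // successive broadcasts a few seconds apart
      }
    }
  }
}
```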

Each process _p_ in _P_ subscribes to _C_ within a common, predefined consumer group. As per the broker's partition assignment rules, a partition will be assigned to at most one consumer. Multiple partitions may be assigned to a single consumer, and this number may vary slightly from consumer to consumer. Note — this is a fundamental assumption of NELI, requiring a broker that is capable of arbitrating partition assignments. This dynamic set _P_ fits the broad definition of dynamic membership as described in Birman [1]. The term _NELI group_ is used to refer to a set _P_ operating under a distinct consumer group. (An alternate consumer group for _P_ implies a completely different NELI group, as partition assignment within the broker is distinct to a consumer group.)
Each process _p_ in _P_ subscribes to _C_ within a common, predefined consumer group. As per the broker's partition assignment rules, a partition will be assigned to at most one consumer. Multiple partitions may be assigned to a single consumer, and this number may vary slightly from consumer to consumer. Note — this is a fundamental assumption of NELI, requiring a broker that is capable of arbitrating partition assignments. This dynamic set _P_ fits the broad definition of dynamic membership as described in Birman [1]. The term _NELI group_ is used to refer to a set _P_ operating under a distinct consumer group. (An alternate consumer group for _P_ implies a completely different NELI group, as partition assignment within the broker is distinctly bound to a consumer group.)
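On the consumption side, a sketch of how a process might join the common group and infer its partition ownership from the partitions on which messages arrive; the group and topic names are again assumed.

```java
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;

public final class NeliSubscriber {
  public static void main(String[] args) {
    final var props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("group.id", "neli.group"); // the common, predefined consumer group (name assumed)
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

    try (var consumer = new KafkaConsumer<byte[], byte[]>(props)) {
      consumer.subscribe(List.of("neli.control"));
      while (true) {
        final Set<Integer> receivedOn = new HashSet<>();
        for (var record : consumer.poll(Duration.ofMillis(100))) {
          receivedOn.add(record.partition()); // receiving on a partition implies current ownership of it
        }
        // receivedOn approximates this process's slice of M, from which its roles in R follow.
      }
    }
  }
}
```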

The relationship between _P_ and _M_ is depicted in Figure 1.

@@ -56,9 +57,9 @@ There is no hard relationship between the sizing of _M_, _R_ and _P_; however, t

* _R_ should be at least one in size, as otherwise there are no assignable roles.
* _M_ should not be excessively larger than _R_, so as to avoid processes that have no actual _role_ assignments despite owning one or more (high-numbered) partitions. When using Kafka, this avoids the problem that arises when `partition.assignment.strategy` is set to `range`, which happens to be the default. To that point, it is strongly recommended that the `partition.assignment.strategy` property on the consumer is set to `roundrobin` or `sticky`, so as to avoid injective _R_ → _M_ mappings that are asymmetric and poorly distributed among the processes.
* _M_ should be sized approximately equal to the steady state (anticipated) size of _P_, notwithstanding the fact that _P_ is determined dynamically, through the occasional addition and removal of deployed processes. When the size of _M_ approaches the size of _P_, the assignment load is shared evenly among the constituents of _P_.
* _M_ should be sized approximately equal to the steady-state (anticipated) size of _P_, notwithstanding the fact that _P_ is determined dynamically, through the occasional addition and removal of deployed processes. When the size of _M_ approaches the size of _P_, the assignment load is shared evenly among the constituents of _P_.

It is also recommended that the `session.timeout.ms` property on the consumer is set to a very low value, such as `100` for rapid consumer failure detection and sub-second rebalancing. This requires setting of `group.min.session.timeout.ms` on the broker to `100` or lower, as the default value is `6000`. The `heartbeat.interval.ms` property on the consumer should be set to sufficiently small value, such as `10`.
It is also recommended that the `session.timeout.ms` property on the consumer is set to a very low value, such as `100` for rapid consumer failure detection and sub-second rebalancing. This requires setting `group.min.session.timeout.ms` on the broker to `100` or lower, as the default value is `6000`. The `heartbeat.interval.ms` property on the consumer should be set to a sufficiently small value, such as `10`.
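Collected as code, the consumer-side settings above might look as follows (a sketch; the broker-side `group.min.session.timeout.ms` override belongs in the broker's `server.properties`):

```java
import java.util.Properties;

final class NeliConsumerConfig {
  static Properties recommended() {
    final var props = new Properties();
    // Avoid the 'range' default in favour of an evenly distributing assignor.
    props.setProperty("partition.assignment.strategy",
        "org.apache.kafka.clients.consumer.RoundRobinAssignor");
    props.setProperty("session.timeout.ms", "100");   // rapid failure detection, sub-second rebalancing
    props.setProperty("heartbeat.interval.ms", "10"); // a sufficiently small heartbeat interval
    return props;
  }
}
```

For the 100 ms session timeout to be accepted, the broker must allow `group.min.session.timeout.ms=100` or lower.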

## Exclusivity of role assignment
It can also be shown that the non-exclusivity property can be turned into one of exclusivity through a straightforward adjustment of the protocol. In other words, the at-least-one leader assignment can be turned into an at-most-one. This would be done in systems where non-exclusivity cannot be tolerated.
@@ -83,7 +84,7 @@ Another, more subtle problem stems from the invariant that the check for leaders


# Variations
## NELI 'Lite'
## Fast NELI
Conventional NELI derives a leader state through observing phenomena that appear distinctly for each process. This implies constant publishing of messages and registering their receipt, to infer the present state. This algorithm is sufficiently generic to work with any streaming platform that supports partition exclusivity. In theory, it can be adapted to any broker that has some notion of observable exclusivity; for example, SQS FIFO queues, RabbitMQ, Redis Streams, AMQP-based products supporting *Exclusive Consumers*, and NATS.

Certain streaming platforms, such as Kafka, support rebalance notifications that expressly communicate the state of partition assignments to the consumers. (Support is subject to the client library implementation.)
@@ -94,7 +95,9 @@ The rebalance callback straightforwardly induces leadership through partition as

In addition to observing partition assignment changes, the partition owner periodically publishes a heartbeat message to the monitored topic. It also consumes messages from that topic — effectively observing its own heartbeats, and thereby asserting that it is connected to the cluster *and* still owns the partition in question. If no heartbeat is received within a set deadline, the leader will take the worst-case assumption that the partition will be imminently reassigned, and will proactively relinquish leadership. (The deadline is chosen to be a fraction of `session.timeout.ms`, typically a third or less — giving the outgoing leader sufficient time to fence itself.) If connectivity is later resumed while the leader is still the owner of the partition on the broker, it will again receive a heartbeat, allowing the client to resume the leader role. If the partition has been subsequently reassigned, no heartbeat will be received upon reconnection and the client will be forced to rejoin the group — the act of which will invoke the rebalance callback, effectively resetting the client.
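A condensed sketch of this self-heartbeating loop appears below. The topic name, the monitored partition and the one-third deadline are assumptions, chosen to be consistent with the 100 ms session timeout suggested earlier.

```java
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.time.Duration;

final class HeartbeatMonitor {
  /** Runs until the heartbeat deadline lapses, at which point leadership is relinquished. */
  static void runWhileLeader(KafkaProducer<byte[], byte[]> producer,
                             KafkaConsumer<byte[], byte[]> consumer) {
    final var deadline = Duration.ofMillis(33); // about a third of the 100 ms session timeout
    var lastObserved = System.nanoTime();
    while (true) {
      // Publish a heartbeat to the monitored partition (partition 0 assumed here).
      producer.send(new ProducerRecord<>("neli.control", 0, null, null));
      if (!consumer.poll(Duration.ofMillis(10)).isEmpty()) {
        lastObserved = System.nanoTime(); // saw our own heartbeat: connected and still the owner
      }
      if (System.nanoTime() - lastObserved > deadline.toNanos()) {
        return; // assume imminent reassignment; proactively relinquish leadership
      }
    }
  }
}
```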

The difference between this variation of the protocol and the 'full-fat' version is that the latter requires all members to continuously publish messages, thereby ensuing continuous presence of heartbeats. By comparison, the 'lite' version only requires the owner of the partition to emit heartbeats, whereby the ownership status is communicated via the callback.
One notable difference between this variation of the protocol and the full version is that the latter requires all members to continuously publish messages, thereby ensuring a continuous presence of heartbeats. By comparison, the 'fast' version only requires the assigned owner of the partition to emit heartbeats, as the ownership status is communicated via the callback.

The main difference, however, is in the transition of leader status during a routine partition rebalancing event, where there is no failure _per se_, but the partitions are reassigned as a result of a group membership change. Under exclusive mode, the full version of the protocol requires that the new leader allow for a grace period before assuming the leader role, thereby according the outgoing leader an opportunity to complete its work. The reduced form relies on the blocking nature of the rebalance callback, effectively acting as a synchronization barrier — the rebalance operation is halted on the broker until the outgoing leader confirms that it has completed any in-flight work. The fast version is so named because it is more responsive during routine rebalancing. It also requires fewer configuration parameters.
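A sketch of callback-driven induction with the blocking barrier, using Kafka's `ConsumerRebalanceListener`; treating partition 0 as the leader partition and the `awaitInFlightWork()` hook are assumptions for illustration.

```java
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;
import java.util.Collection;

final class RebalanceBarrier implements ConsumerRebalanceListener {
  private volatile boolean leader;

  @Override
  public void onPartitionsAssigned(Collection<TopicPartition> assigned) {
    // Leadership is induced by ownership of the designated partition.
    leader = assigned.stream().anyMatch(tp -> tp.partition() == 0);
  }

  @Override
  public void onPartitionsRevoked(Collection<TopicPartition> revoked) {
    if (leader && revoked.stream().anyMatch(tp -> tp.partition() == 0)) {
      awaitInFlightWork(); // blocks the rebalance until the outgoing leader has fenced itself
      leader = false;
    }
  }

  private void awaitInFlightWork() { /* complete or abort any outstanding work here */ }

  boolean isLeader() { return leader; }
}
```

The listener would be registered at subscription time, e.g. `consumer.subscribe(List.of("neli.control"), new RebalanceBarrier())`.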


# Further considerations
