Neli
Not-only Exclusive Leader Induction in Highly Available Systems
Not-only Exclusive Leader Induction in Highly Available Distributed Systems
Published in https://github.com/obsidiandynamics/NELI under a BSD (3-clause) license.
Abstract
The Not-only Exclusive Leader Induction (NELI) protocol provides a mapping from a set of work roles to zero or more assignees responsible for fulfilling each role. The term 'not-only' in relation to the exclusivity property implies the ability to vary the degree of overlap between successive leader assignments. Overlap is permitted under non-exclusive mode, and forbidden otherwise. NELI can be employed in scenarios requiring exclusive leaders — where it is essential that at most one leader exists at any given time. It can equally be used where a finite number of leaders may safely coexist; for example, where it is imperative to maintain elevated levels of availability at the expense of occasional work duplication, and where the duplication of work, while generally undesirable, does not lead to an incorrect outcome.
NELI is a simple, unimodal protocol that is relatively straightforward to implement, building on a shared ledger service that is capable of atomic partition assignment, such as Kafka, Pulsar or Kinesis. This makes NELI particularly attractive for use in Cloud-based computing environments, where the middleware used by applications for routine operation is also employed in a secondary role for deriving leader state.
Overview
This text describes the Not-only Exclusive Leader Induction (NELI) protocol built on top of a shared, partitioned ledger capable of atomic partition balancing (such as Apache Kafka, Apache Pulsar or Amazon Kinesis). The protocol yields a leader in a group of contending processes for one of a number of notional roles, such that each role has a finite number of leaders assigned to it. The number of roles is dynamic, as is the number of contending processes.
In non-exclusive mode, this protocol is useful in scenarios where —
- There are several roles that need fulfilling, and it's desirable to share this load among several processes (likely deployed on different hosts);
- While it is undesirable that a role is simultaneously filled by two processes at any point in time, the system will continue to function correctly. In other words, this may create duplication of work but cannot cause harm (the safety property);
- Availability of the system is imperative; it is required that at least one leader is assigned to a role at all times so that the system as a whole remains operational and progress is made on every role (the liveness property);
- The number of processes and roles is fully dynamic, and may vary on an ad hoc basis. Processes and roles may be added and removed without reconfiguration of the group or downtime. This accommodates rolling deployments, auto-scaling groups, and so forth;
- The use of a dedicated Group Membership Service (GMS) is, for whatever reason, deemed intractable, and where an alternate primitive is sought. Perhaps the system is deployed in an environment where a robust GMS is not natively available, but other capabilities that may internally utilise a GMS may exist. Kinesis in AWS is one such example.
In exclusive mode, NELI is used like any conventional leader election protocol, where there is exactly one role that requires fulfilling, and it is imperative that no more than one process assumes this role at any given time.
Note: The term induction is used in favour of election to convey the notion of partial autonomy in the decision-making process. Under NELI, leaders aren't chosen directly, but induced through other phenomena that are observable by the affected group members, allowing them to independently infer that new leadership is in force. The protocol is also eventually consistent, in that, although members of the group may possess different views at discrete moments in time, these views will invariably converge. This contrasts with a conventional GMS, where leadership election is the direct responsibility of the GMS, and is directly imparted upon the affected parties through view changes or replication, akin to the mechanisms employed in Zab [3], Raft [4] and the like.
Protocol definition
A centrally-arbitrated topic C is established with a set of partitions M. (Assuming C is realised by a broker capable of atomic partition assignment, such as Kafka, Pulsar or Kinesis.) A set of discrete roles R is available for assignment, and a set of processes P contend for assignment of roles in R. The number of elements in the set M may vary from that of R which, in turn, may vary from P.
Each process in P continually publishes a message on all partitions in M (each successive message is broadcast a few seconds apart). The message has no key or value; the producing process explicitly specifies the partition number for each published message. Because every process in P publishes to every partition in M, each partition continually receives messages from each process. As a corollary, for as long as at least one process in P remains operational, at least one message will be continually published to each partition in M. Crucial to the protocol is that no partition may 'dry up'.
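A single publishing round can be sketched as follows. This is a minimal illustration, not the reference implementation: `publish_heartbeats` and the `send(partition, key, value)` hook are hypothetical names standing in for whatever produce call the broker client offers, and repeating the round every few seconds is left to the caller.

```python
from typing import Callable, List, Optional, Tuple

# Hypothetical transport hook: publishes one record to an explicitly chosen
# partition of topic C. A real deployment would wrap a broker client here.
Send = Callable[[int, Optional[bytes], Optional[bytes]], None]


def publish_heartbeats(send: Send, num_partitions: int) -> None:
    """Publish one empty message to every partition in M (a single round)."""
    for partition in range(num_partitions):
        # No key and no value: the message only needs to arrive on the partition.
        send(partition, None, None)


# Usage: record what two processes would publish in one round over 3 partitions.
published: List[Tuple[str, int]] = []  # (process, partition), standing in for the broker
for process_id in ("p0", "p1"):
    def send(partition, key, value, pid=process_id):
        published.append((pid, partition))
    publish_heartbeats(send, num_partitions=3)
```

Every partition receives a message from every live process, so a partition only goes quiet if all processes in P stop publishing.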
Each process p in P subscribes to C within a common, predefined consumer group. As per the broker's partition assignment rules, a partition will be assigned to at most one consumer. Multiple partitions may be assigned to a single consumer, and this number may vary slightly from consumer to consumer. Note — this is a fundamental assumption of NELI, requiring a broker that is capable of arbitrating partition assignments. This dynamic set P fits the broad definition of dynamic membership as described in Birman [1]. The term NELI group is used to refer to a set P operating under a distinct consumer group. (An alternate consumer group for P implies a completely different NELI group, as partition assignment within the broker is distinctly bound to a consumer group.)
The relationship between P and M is depicted in Figure 1.
<img src="https://cdn.jsdelivr.net/gh/obsidiandynamics/neli@master/figures/Figure%201.png" width="70%" alt="Figure 1"/>Figure 1: Partition mapping from M to P
Each process p in P, now being a consumer of C, will maintain a vector V of size identical to that of M, with each vector element corresponding to a partition in M, and initialised to zero. V is sized during initialisation of p, by querying the brokers of M to determine the number of partitions in M, which will remain a constant. (As opposed to elements in P and R which may vary dynamically.) This implies that M may not be expanded while a group is in operation.
Upon receipt of a message m from C, p will assign the current machine time as observed by p to the vector element at the index corresponding to m's partition index.
Assuming no subsequent partition reassignments have occurred, each p's vector comprises a combination of zero and non-zero values, where zero values denote partitions that haven't been assigned to p, and non-zero values correspond to partitions that have been assigned to p at least once in the lifetime of p. If the timestamp at any vector element i is current (in other words, it is more recent than some predefined constant threshold T that lags the current time), then p is a leader for M<sub>i</sub>. If partition assignment for M<sub>i</sub> is altered (for example, if p is partitioned from the brokers of M, or a timeout occurs), then V<sub>i</sub> will cease advancing and will eventually be lapsed by T. At this point, p must no longer assume that it is the leader for M<sub>i</sub>.
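The vector V and the leadership check might look like the sketch below. The class and method names (`LeaseVector`, `on_message`, `is_leader_for_partition`) are illustrative assumptions, and the injectable `clock` parameter exists only so the example can be run deterministically.

```python
import time
from typing import Callable, List


class LeaseVector:
    """Per-process vector V: the last receive time for each partition in M."""

    def __init__(self, num_partitions: int, threshold_s: float,
                 clock: Callable[[], float] = time.time) -> None:
        # Zero means "no message ever received on this partition".
        self._last_seen: List[float] = [0.0] * num_partitions
        self._threshold_s = threshold_s  # the constant T, in seconds
        self._clock = clock

    def on_message(self, partition: int) -> None:
        """Stamp the element for the partition the message arrived on."""
        self._last_seen[partition] = self._clock()

    def is_leader_for_partition(self, partition: int) -> bool:
        """True while the last message is more recent than (now - T)."""
        seen = self._last_seen[partition]
        return seen != 0.0 and self._clock() - seen < self._threshold_s


# Usage with a fake clock and T = 30 seconds.
now = [100.0]
v = LeaseVector(num_partitions=4, threshold_s=30.0, clock=lambda: now[0])
v.on_message(2)
now[0] = 120.0  # 20 s after the last message: still current
leader_at_120 = v.is_leader_for_partition(2)
now[0] = 131.0  # 31 s after the last message: lapsed by T
leader_at_131 = v.is_leader_for_partition(2)
never_assigned = v.is_leader_for_partition(0)
```

Leadership is never revoked explicitly in this sketch; it simply expires once messages stop arriving on the partition.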
Ownership of M<sub>i</sub> still requires a translation to a role assignment, as the number of roles in R may vary from the number of partitions in M, and in fact, may do so dynamically without prior notice. To determine whether p is a leader for role R<sub>j</sub>, p will compute k = j mod size(M) and check whether p is a leader for M<sub>k</sub> through inspection of its local V<sub>k</sub> value.
Where size(M) > size(R), ownership of a higher numbered partition in M does not necessarily correspond to a role in R — the mapping from R to M is injective. If size(M) < size(R), ownership of a partition in M corresponds to (potentially) multiple roles in R, i.e. R → M is surjective. And finally, if size(M) = size(R), the relationship is purely bijective. Hence the use of the modulo operation to remap the dynamic extent of R for alignment with M, guaranteeing totality of R → M.
Note: Without the modulo reduction, R → M will be partial when size(M) < size(R), resulting in an indefinitely dormant role and violating the liveness property of the protocol.
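The role-to-partition translation is small enough to show directly. In this sketch, `partition_for_role` and `roles_led` are hypothetical helper names, and the set of partitions a process currently leads is passed in explicitly rather than derived from V.

```python
from typing import List, Set


def partition_for_role(role_index: int, num_partitions: int) -> int:
    """Map role R_j to partition M_k, where k = j mod size(M)."""
    return role_index % num_partitions


def roles_led(led_partitions: Set[int], num_roles: int, num_partitions: int) -> List[int]:
    """Roles whose mapped partition is currently led by this process."""
    return [j for j in range(num_roles)
            if partition_for_role(j, num_partitions) in led_partitions]


# size(M) = 3 < size(R) = 5: one partition may carry several roles.
more_roles = roles_led({0, 2}, num_roles=5, num_partitions=3)

# size(M) = 4 > size(R) = 2: leading higher-numbered partitions yields no role.
more_partitions = roles_led({2, 3}, num_roles=2, num_partitions=4)
```

In the first case roles 0 and 3 both map to partition 0, so the process leads roles 0, 2 and 3. In the second case partitions 2 and 3 carry no role at all.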
Under non-exclusive mode, the value of T is chosen such that T is greater than the partition reassignment threshold of the broker. In Kafka, this is given by the property session.timeout.ms on the consumer, which is 10 seconds by default in Kafka versions before 3.0 (the default was raised to 45 seconds in 3.0). With a 10-second session timeout, T could be 30 seconds, allowing for up to 20 seconds of overlap between successive leadership transitions. In other words, if the partition assignment is withdrawn from an existing leader, it may presume for a further 20 seconds that it is still leading, allowing for the emerging leader to take over. In that time frame, one or more roles may be fulfilled concurrently by both leaders, which is acceptable a priori. (In practice, a 10-second session timeout may be too long for some HA systems; smaller values may be more appropriate.)
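The arithmetic behind the overlap window can be checked with a short helper. `max_overlap_s` is a hypothetical name, and the sketch assumes that the overlap is simply T minus the broker's reassignment threshold, as in the example figures above.

```python
def max_overlap_s(threshold_s: float, session_timeout_s: float) -> float:
    """Upper bound on leader overlap in non-exclusive mode, in seconds."""
    if threshold_s <= session_timeout_s:
        # Non-exclusive mode expects T to exceed the reassignment threshold.
        raise ValueError("T must be greater than the broker's reassignment threshold")
    return threshold_s - session_timeout_s


# T = 30 s with a 10 s session timeout leaves up to 20 s of overlap.
overlap = max_overlap_s(threshold_s=30.0, session_timeout_s=10.0)
```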
There is no hard relationship between the sizing of M, R and P
