Partitioning #93

Merged Aug 31, 2019 (29 commits)

Commits
- 9e710af  Implement partitioning (tylertreat, Jul 25, 2019)
- afa7f49  Change partition subjects (tylertreat, Jul 28, 2019)
- 7338faf  Partitioning WIP (tylertreat, Aug 16, 2019)
- 935f76c  Rename name to stream (tylertreat, Aug 22, 2019)
- 1fb3fd1  Merge branch 'master' of github.com:liftbridge-io/liftbridge into par… (tylertreat, Aug 22, 2019)
- e7945b2  Rename variable name for consistency (tylertreat, Aug 22, 2019)
- 0568492  Remove TotalPartitions field from partition (tylertreat, Aug 22, 2019)
- 49f823e  Fix tests (tylertreat, Aug 23, 2019)
- 77e76b5  Add CreateStream test for partitioned streams (tylertreat, Aug 26, 2019)
- d181277  Reorder logging statement (tylertreat, Aug 26, 2019)
- dab57c4  Fix finishedRecovery count logic (tylertreat, Aug 26, 2019)
- 27c85dc  Change raft snapshot restore logging (tylertreat, Aug 26, 2019)
- 373a03b  Clean up metadata api (tylertreat, Aug 28, 2019)
- 1998e27  Update metadata response (tylertreat, Aug 28, 2019)
- d582a3b  Remove reference to StreamDescriptor (tylertreat, Aug 28, 2019)
- 8c11573  Use liftbridge-grpc repo for proto code (tylertreat, Aug 30, 2019)
- 4aa3031  Move stream struct to stream.go (tylertreat, Aug 30, 2019)
- f3ae727  Fix CommitLog log messages (tylertreat, Aug 30, 2019)
- 2041c01  Update go-liftbridge test dependency (tylertreat, Aug 30, 2019)
- 16bf10c  Update README with partitioning info (tylertreat, Aug 30, 2019)
- 876c2ef  Add note on partitions (tylertreat, Aug 30, 2019)
- a291ee4  Updated README FAQ (tylertreat, Aug 30, 2019)
- b597028  Updated FAQ (tylertreat, Aug 30, 2019)
- c42336a  Update concepts.md (tylertreat, Aug 30, 2019)
- 104b73c  Update streams documentation diagram (tylertreat, Aug 30, 2019)
- 6e3e061  Update diagram in documentations (tylertreat, Aug 30, 2019)
- f0decfa  Fix documentation example (tylertreat, Aug 30, 2019)
- 60deaa7  Update replication protocol documentation (tylertreat, Aug 30, 2019)
- 1cb6740  Fix link in README (tylertreat, Aug 31, 2019)

44 changes: 27 additions & 17 deletions README.md
@@ -46,7 +46,8 @@ Liftbridge is a server that implements a durable, replicated message log for
which is attached to a NATS subject. The stream then records messages on that
subject to a replicated write-ahead log. Multiple consumers can read back
from the same stream, and multiple streams can be attached to the same
subject. Liftbridge provides a Kafka-like API in front of NATS.
subject. Liftbridge provides a Kafka-like API in front of NATS. See the
Liftbridge [overview](./documentation/overview.md) for more information.
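
For a concrete feel of this API, here is a minimal sketch using the Go client, [go-liftbridge](https://github.com/liftbridge-io/go-liftbridge). The handler signature and option names follow later client versions than the one used by this PR, so treat the exact calls as illustrative:

```go
package main

import (
	"context"
	"fmt"

	lift "github.com/liftbridge-io/go-liftbridge"
)

func main() {
	// Connect to a Liftbridge server (which is itself connected to NATS).
	client, err := lift.Connect([]string{"localhost:9292"})
	if err != nil {
		panic(err)
	}
	defer client.Close()

	// Attach a stream named "foo-stream" to the NATS subject "foo".
	ctx := context.Background()
	if err := client.CreateStream(ctx, "foo", "foo-stream"); err != nil && err != lift.ErrStreamExists {
		panic(err)
	}

	// Read the stream back from the beginning of its log.
	err = client.Subscribe(ctx, "foo-stream", func(msg lift.Message, err error) {
		if err != nil {
			panic(err)
		}
		fmt.Println(msg.Offset(), string(msg.Value()))
	}, lift.StartAtEarliestReceived())
	if err != nil {
		panic(err)
	}

	// Block while the subscription delivers messages.
	select {}
}
```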

### Why was it created?

@@ -79,29 +80,37 @@ The key features that differentiate Liftbridge are the shared message namespace,
wildcards, log compaction, and horizontal scalability. NATS Streaming replicates
channels to the entire cluster through a single Raft group. Liftbridge allows
replicating to a subset of the cluster, and each stream is replicated
independently. This allows the cluster to scale horizontally.
independently. This allows the cluster to scale horizontally. NATS Streaming
also does not support channel partitioning, requiring it to be implemented at
the application layer. Liftbridge has built-in support for stream partitioning.

### How does it scale?

Liftbridge scales horizontally by adding more brokers to the cluster and
creating more streams which are distributed among the cluster. In effect, this
splits out message routing from storage and consumption, which allows
Liftbridge to scale independently and eschew subject partitioning.
Alternatively, streams can join a load-balance group, which effectively load
balances a NATS subject among the streams in the group without affecting
delivery to other streams.
Liftbridge has several mechanisms for horizontal scaling of message consumption.
Brokers can be added to the cluster and additional streams can be created which
are then distributed among the cluster. In effect, this splits out message
routing from storage and consumption, which allows Liftbridge to scale
independently and eschew subject partitioning.

Streams can also join a load-balance group, which effectively load balances a
NATS subject among the streams in the group without affecting delivery to
other streams.

Finally, streams can be partitioned, allowing messages to be divided up among
the brokers in a cluster. In fact, all streams are partitioned with the default
case being a single partition.
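
As a sketch of what this looks like from the client (assuming the go-liftbridge options API, where Partitions and ReplicationFactor are stream options whose names may vary by client version, and a connected `client` as in the earlier example), creating a three-partition, three-replica stream would be:

```go
// Attach a stream to the NATS subject "foo" with three partitions, each
// replicated to three brokers.
err := client.CreateStream(context.Background(), "foo", "foo-stream",
	lift.Partitions(3),
	lift.ReplicationFactor(3),
)
if err != nil && err != lift.ErrStreamExists {
	panic(err)
}
```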

### What about HA?

High availability is achieved by replicating the streams. When a stream is
created, the client specifies a `replicationFactor`, which determines the
number of brokers to replicate the stream. Each stream has a leader who is
responsible for handling reads and writes. Followers then replicate the log
from the leader. If the leader fails, one of the followers can set up to
replace it. The replication protocol closely resembles that of Kafka, so there
is much more nuance to avoid data consistency problems. See the
[replication protocol documentation](./documentation/replication_protocol.md) for
more details.
number of brokers to replicate the stream's partitions. Each stream partition
has a leader who is responsible for handling reads and writes. Followers then
replicate the log from the leader. If the leader fails, one of the followers
will step up to replace it. The replication protocol closely resembles that of
Kafka, so there is much more nuance to avoid data consistency problems. See the
[replication protocol documentation](./documentation/replication_protocol.md)
for more details.

### What about performance?

@@ -227,7 +236,7 @@ be generated quite easily using the
- [x] Log retention by message age
- [x] Log retention by number of messages
- [x] Log compaction by key
- [ ] Consumer-offset checkpointing in the log
- [ ] Consumer groups
- [x] Minimum ISR support
- [x] Additional subscribe semantics
- [x] Oldest
@@ -236,6 +245,7 @@ be generated quite easily using the
- [x] By timestamp
- [x] By time delta
- [ ] Single-stream fanout
- [x] Stream partitioning
- [ ] Opt-in ISR replica reads
- [ ] Read-replica support
- [ ] Authentication and authorization
121 changes: 72 additions & 49 deletions documentation/concepts.md
@@ -7,88 +7,108 @@ pub/sub messaging system that centers around the concept of *subjects*. Clients
publish messages to subjects and receive messages from *subscriptions* to
subjects.

## Stream
## Streams and Partitions

Fundamentally, Liftbridge is just a consumer of NATS subjects. It receives
messages from NATS subjects and records them in a durable log which
is then exposed to subscribers. Specifically, Liftbridge centers around the
concept of a *stream*, which is a durable message stream attached to a NATS
subject. A stream is ordered, replicated, and durably stored on disk and serves
as the unit of storage and parallelism in Liftbridge.
subject. A stream consists of one or more *partitions*, which are ordered,
replicated, and durably stored on disk and serve as the unit of storage and
parallelism in Liftbridge.

Streams have a few key properties: a subject, which is the corresponding NATS
subject, a name, which is a human-readable identifier for the stream, and a
replication factor, which is the number of nodes the stream should be
replicated to for redundancy. Optionally, there is a group which is the name of
a load-balance group for the stream to join. When there are multiple streams in
the same group, messages will be balanced among them.
replication factor, which is the number of nodes the stream's partitions should
be replicated to for redundancy. Optionally, there is a group which is the name
of a load-balance group for the stream to join. When there are multiple streams
in the same group, messages will be balanced among them.

There can be multiple streams attached to the same NATS subject, but within a
given subject, the name must be unique. Thus, a stream can be uniquely
identified by the combination of its subject and name.
There can be multiple streams attached to the same NATS subject, but stream
names must be unique within a cluster.

By default, streams have a single partition. This partition maps directly to
the stream's NATS subject. If a stream has multiple partitions, each one maps
to a different NATS subject derived from the stream subject. For example, if a
stream with three partitions is attached to the subject "foo", the partitions
will map to the subjects "foo", "foo.1", and "foo.2", respectively.
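
A small sketch of that mapping rule (illustrative only; the server derives these subjects internally):

```go
// partitionSubject returns the NATS subject a partition maps to: partition 0
// uses the stream subject itself, and partition n > 0 uses "<subject>.<n>".
func partitionSubject(streamSubject string, partition int) string {
	if partition == 0 {
		return streamSubject
	}
	return fmt.Sprintf("%s.%d", streamSubject, partition)
}

// partitionSubject("foo", 0) == "foo"
// partitionSubject("foo", 1) == "foo.1"
// partitionSubject("foo", 2) == "foo.2"
```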

Each partition has its own message log, leader, and set of followers.

### Write-Ahead Log

Each stream is backed by a durable write-ahead log. All reads and writes to the
log go through the stream *leader*, which is selected by the cluster
[controller](#controller). The leader sequences each message in the stream and
sends back an acknowledgement to publishers upon committing a message to the
log. A message is committed to the log once it has been replicated to the
stream's [in-sync replica set (ISR)](#in-sync-replica-set-isr).
Each stream partition is backed by a durable write-ahead log. All reads and
writes to the log go through the partition *leader*, which is selected by the
cluster [controller](#controller). The leader sequences each message in the
partition and sends back an acknowledgement to publishers upon committing a
message to the log. A message is committed to the log once it has been
replicated to the partition's
[in-sync replica set (ISR)](#in-sync-replica-set-isr).
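
From a publisher's point of view, this commit-then-acknowledge flow might look like the following sketch (the Publish signature and the AckPolicyAll option follow later go-liftbridge versions and are assumptions here; a connected `client` and the usual imports are assumed):

```go
// Publish a message and block until the partition leader acknowledges it.
// AckPolicyAll asks for the ack only once the message has been committed,
// i.e. replicated to the full ISR, rather than when the leader stores it.
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if _, err := client.Publish(ctx, "foo-stream", []byte("hello"), lift.AckPolicyAll()); err != nil {
	panic(err)
}
```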

Consumers read committed messages from the log through a subscription on the
stream. They can read back from the log at any arbitrary position, or *offset*.
Additionally, consumers can wait for new messages to be appended to the log.
partition. They can read back from the log at any arbitrary position, or
*offset*. Additionally, consumers can wait for new messages to be appended
to the log.

### Scalability

Liftbridge is designed to be clustered and horizontally scalable. The
[controller](#controller) is responsible for creating streams. When a stream is
created, the controller selects replicas based on the replication factor and
replicates the stream to the cluster. Once this replication completes, the
stream has been created and the leader begins processing messages.
[controller](#controller) is responsible for creating stream partitions.
When a partition is created, the controller selects replicas based on the
stream's replication factor and replicates the partition to the cluster.
Once this replication completes, the partition has been created and the
leader begins processing messages.

As mentioned above, there can exist multiple streams attached to the same NATS
subject or even subjects that are semantically equivalent, e.g. "foo.bar" and
"foo.*". Each of these streams will receive a copy of the message as NATS
handles this fan-out. However, the stream name is unique within a given
subject. For example, creating two streams for the subject "foo.bar" named
"foo" and "bar" respectively will create two streams which will independently
sequence all of the messages on the NATS subject "foo.bar", but attempting to
create two streams for the same subject both named "foo" will result in
creating just a single stream (creation is idempotent).
handles this fan-out.

With this in mind, we can scale linearly by adding more nodes to the Liftbridge
cluster and creating more streams which will be distributed amongst the
cluster members. This has the advantage that we don't need to worry about
partitioning so long as NATS is able to withstand the load.

Alternatively, streams can join a named load-balance group, load balances
partitioning so long as NATS is able to withstand the load. The downside of this
is that it results in redundant processing of messages. Consumers of each stream
are all processing the same set of messages. The other issue is that, because
each stream is independent of the others, they have separate guarantees. Separate
leaders/followers, separate ISRs, and separate acking means the logs for each
stream are not guaranteed to be identical, even though they are bound to the
same NATS subject.

To accommodate this, streams are partitioned. By default, a stream consists of
just a single partition, but multiple partitions can be created for increased
parallelism. Messages can then be delivered to partitions based on their key,
in a round-robin fashion, randomly, or with some other partitioning strategy
on the client.
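
For example, with the Go client this choice is made per message via publish options (the option names follow later go-liftbridge versions and are assumptions relative to this PR; a connected `client` is assumed):

```go
// Route messages with the same key to the same partition by hashing the key.
if _, err := client.Publish(context.Background(), "foo-stream", []byte("order created"),
	lift.Key([]byte("customer-42")),
	lift.PartitionByKey(),
); err != nil {
	panic(err)
}

// Alternatively, spread messages evenly with lift.PartitionByRoundRobin(),
// or target an explicit partition chosen by the application with
// lift.ToPartition(2).
```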

Additionally, streams can join a named load-balance group, which load balances
messages on a NATS subject amongst the streams in the group. Load-balance
groups do not affect message delivery to other streams.
groups do not affect message delivery to other streams not participating in
the group.
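
A sketch of joining a load-balance group (assuming Group is a stream option, as in later go-liftbridge versions):

```go
// Create two streams attached to the same NATS subject "foo" that join the
// same load-balance group, so each message published to "foo" is delivered
// to only one of the two streams rather than to both.
for _, name := range []string{"foo-stream-1", "foo-stream-2"} {
	err := client.CreateStream(context.Background(), "foo", name, lift.Group("foo-group"))
	if err != nil && err != lift.ErrStreamExists {
		panic(err)
	}
}
```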

Currently, replicas in Liftbridge act only as a mechanism for high availability
and not scalability. However, there may be work in the future to allow them to
act as read replicas for further scale out.

The diagram below shows a cluster of three servers with a set of streams.
Streams in yellow indicate the server is the leader for the stream.
Partitions in yellow indicate the server is the leader for the partition.

![streams](./images/streams.png)
![cluster](./images/cluster.png)

### In-Sync Replica Set (ISR)

The In-Sync Replica set (ISR) is a key aspect of the replication protocol in
Liftbridge. The ISR consists of the set of stream replicas that are currently
caught up with the leader. It is equivalent to the [ISR concept in
Liftbridge. The ISR consists of the set of partition replicas that are
currently caught up with the leader. It is equivalent to the [ISR concept in
Kafka](https://kafka.apache.org/documentation/#design_replicatedlog), and the
replication protocol works very similarly.
[replication protocol](./replication_protocol.md) works very similarly.

In order for a message to be committed to a stream's write-ahead log, it must
be acknowledged by all brokers in the ISR. To prevent a single slow broker from
blocking progress, replicas that fall too far behind the leader are removed
from the ISR. The leader does this by making a request to the controller. In
this case, the cluster enters an under-replicated state for the stream.
In order for a message to be committed to a partition's write-ahead log, it
must be acknowledged by all brokers in the ISR. To prevent a single slow
broker from blocking progress, replicas that fall too far behind the leader
are removed from the ISR. The leader does this by making a request to the
controller. In this case, the cluster enters an under-replicated state for
the partition.

Being "too far behind" is controlled by the `replica.max.lag.time`
configuration. This refers to both the maximum amount of time a replica can go
@@ -99,7 +119,7 @@ added back into the ISR and the cluster goes back into its fully replicated
state.

Under normal conditions, only a replica from the ISR can be elected the leader
of a stream. This favors data consistency over availability since if the ISR
of a partition. This favors data consistency over availability since if the ISR
shrinks too far, there is a risk of being unable to elect a new leader.

### Acknowledgement
@@ -130,16 +150,19 @@ you must first create a topic and then you publish to that topic.
### Subscription

Subscriptions are how Liftbridge streams are consumed. A client subscribes to a
stream and specifies a starting offset to begin consuming from. At this point,
the server creates an ephemeral data stream for the client and begins sending
messages to it. Once it consumes up to the end of the log, the server will wait
for more messages to be published until the subscription is closed by the
client.
stream partition and specifies a starting offset to begin consuming from. At
this point, the server creates an ephemeral data stream for the client and
begins sending messages to it. Once it consumes up to the end of the log, the
server will wait for more messages to be published until the subscription is
closed by the client.
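
A sketch of a subscription against a specific partition and offset (the Partition and StartAtOffset subscription options follow later go-liftbridge versions and are assumptions here; a connected `client` is assumed):

```go
// Consume partition 1 of "foo-stream" starting at offset 42 and keep
// receiving new messages as they are appended to the log.
err := client.Subscribe(context.Background(), "foo-stream", func(msg lift.Message, err error) {
	if err != nil {
		panic(err)
	}
	fmt.Println(msg.Offset(), string(msg.Value()))
}, lift.Partition(1), lift.StartAtOffset(42))
if err != nil {
	panic(err)
}
```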

Subscriptions are not stateful objects. When a subscription is created, there
is no bookkeeping done by the server, aside from the in-memory objects tied to
the lifecycle of the subscription. As a result, the server does not track the
position of a client in the log beyond the scope of a subscription.
position of a client in the log beyond the scope of a subscription. Stateful
consumer groups are planned for the near future, which will allow a consumer
to pick up where it left off and provide fault-tolerant consumption of
streams.

### Stream Retention and Compaction

41 changes: 28 additions & 13 deletions documentation/faq.md
@@ -37,26 +37,41 @@ features such as durable subscriptions, queue groups, pluggable storage
backends, and multiple fault-tolerance modes. Liftbridge aims to have a small
API surface area.

The key features that differentiate Liftbridge are the shared message namespace,
wildcards, log compaction, and horizontal scalability. NATS Streaming replicates
channels to the entire cluster through a single Raft group. Liftbridge allows
replicating to a subset of the cluster, and each stream is replicated
independently. This allows the cluster to scale horizontally. NATS Streaming
also does not support channel partitioning, requiring it to be implemented at
the application layer. Liftbridge has built-in support for stream partitioning.

## How does it scale?

Liftbridge scales horizontally by adding more brokers to the cluster and
creating more streams which are distributed among the cluster. In effect, this
splits out message routing from storage and consumption, which allows
Liftbridge to scale independently and eschew subject partitioning.
Alternatively, streams can join a load-balance group, which effectively load
balances a NATS subject among the streams in the group without affecting
delivery to other streams.
Liftbridge has several mechanisms for horizontal scaling of message consumption.
Brokers can be added to the cluster and additional streams can be created which
are then distributed among the cluster. In effect, this splits out message
routing from storage and consumption, which allows Liftbridge to scale
independently and eschew subject partitioning.

Streams can also join a load-balance group, which effectively load balances a
NATS subject among the streams in the group without affecting delivery to
other streams.

Finally, streams can be partitioned, allowing messages to be divided up among
the brokers in a cluster. In fact, all streams are partitioned with the default
case being a single partition.

## What about HA?

High availability is achieved by replicating the streams. When a stream is
created, the client specifies a `replicationFactor`, which determines the
number of brokers to replicate the stream. Each stream has a leader who is
responsible for handling reads and writes. Followers then replicate the log
from the leader. If the leader fails, one of the followers can set up to
replace it. The replication protocol closely resembles that of Kafka, so there
is much more nuance to avoid data consistency problems. This will be documented
in more detail in the near future.
number of brokers to replicate the stream's partitions. Each stream partition
has a leader who is responsible for handling reads and writes. Followers then
replicate the log from the leader. If the leader fails, one of the followers
will step up to replace it. The replication protocol closely resembles that of
Kafka, so there is much more nuance to avoid data consistency problems. See the
[replication protocol documentation](./replication_protocol.md)
for more details.

## What about performance?

15 changes: 5 additions & 10 deletions documentation/ha_and_consistency_configuration.md
@@ -6,22 +6,17 @@ out that these characteristics are often at odds with each other.

## Replication Factor

The replication factor of a stream controls the number of nodes the stream
should be replicated to for redundancy. This value is set by the client when
creating a stream. The default replication factor is 1.
The replication factor of a stream controls the number of nodes the stream's
partitions should be replicated to for redundancy. This value is set by the
client when creating a stream. The default replication factor is 1.

For high availability and durability of data, it is recommended to use a
replication factor of at least 3.

```go
// Create a stream with a replication factor of 3.
stream := liftbridge.StreamInfo{
	Subject:           "foo",
	Name:              "foo-stream",
	ReplicationFactor: 3,
}
if err := client.CreateStream(context.Background(), stream); err != nil {
	if err != liftbridge.ErrStreamExists {
if err := client.CreateStream(context.Background(), "foo", "foo-stream", lift.ReplicationFactor(3)); err != nil {
	if err != lift.ErrStreamExists {
		panic(err)
	}
}
Binary file added documentation/images/cluster.png
Binary file removed documentation/images/streams.png
5 changes: 5 additions & 0 deletions documentation/overview.md
@@ -7,6 +7,11 @@ write-ahead log. Multiple consumers can read back from the same stream, and
multiple streams can be attached to the same subject. Liftbridge provides a
Kafka-like API in front of NATS.

Liftbridge streams are partitioned for horizontal scalability. By default,
streams have a single partition. Each partition maps to a separate NATS
subject derived from the stream subject. Each has a leader and is replicated
to some set of followers for fault-tolerance.

NATS is a lightweight, high-performance pub/sub messaging system. It differs
from traditional messaging middleware in that it does not provide queuing or
message durability. Instead, NATS is a fire-and-forget system with at-most-once