ISSUE-16691: PIP-192: New Pulsar Broker Load Balancer #4581

sijie (Member) opened this issue Jul 20, 2022

Original Issue: apache#16691


Proposal: New Pulsar Broker Load Balancer

Motivation

As previously shared with the community, we have observed many areas for improvement in the Pulsar load balancer[1]. Since the improvements require significant changes, we would first like to share the overall goals for this project and the high-level components to design. This doc highlights the architecture of the new broker load balancer.

Goals

We set up the project goals in the following areas.

User-facing goals

Logic

  • Balance cluster utilization as uniformly as possible with minimal delay.

Logs / Metrics

  • Show transparent load balance decisions with logs and metrics.

Admin API / Configurations

  • Provide ways to override system decisions.
  • Reduce the number of configurations to tune.
  • Provide better default configuration values with explanations.
  • (second phase) Provide ways to set a custom load balance strategy.

Internal Implementation goals

Logic

  • Keep the three major load balance operations below but make them more efficient and faster. We will discuss detailed algorithm improvements separately.
    • Topic(Bundle)-broker-assignment: improve the randomization and assignment distribution
    • Bundle-split: revisit the current threshold-based strategy
    • Bundle-unload: revisit the frequency of unloading

Implementation

  • Distribute bundle-broker assignment and bundle-split decisions to local brokers.
  • Synchronize bundle unloading decisions by the leader broker.
  • Reduce load data replication among brokers.
  • Replace load data’s metadata stores with topic table-views.
  • Remove the client dependency in the load balance logic.
    • Remove the client redirection in the assignment logic.
    • Add the bundle transfer option in the unload logic instead of relying on clients’ broker discovery calls.
  • Minimize the topic unavailability from bundle unloading with the bundle transfer option.
  • Introduce the bundle state channel(table-view) to make bundle load balance operations consistent and fault-tolerant among brokers.
  • Isolate the new load balancer code in new classes.
  • Replace the bundle ownership metadata store(ZK znodes) with the bundle state channel.

Logs / Metrics

  • Add meaningful logs and metrics for all major load balance events.
  • Add documentation about how to read load balancer metrics and logs.

Admin API / Configurations

  • Add an admin CLI to transfer a bundle to a specific broker.
  • Add necessary configurations to override the load balance decisions.
  • Dynamically adjust internal configuration thresholds based on the load data.
  • Make the admin APIs fault-tolerant and easy to monitor.

Testing

  • Document the testing plan and coverage status.
  • Add unit tests for happy and unhappy cases.
  • Add global load balance logic tests, and compare the current load manager with the new load manager.

API Changes

We will add a transfer option, --dest, to the unload command to unload the topic(bundle) to a specific destination broker.

pulsar-admin topics unload persistent://tenant/namespace/topic --dest ${destination_broker}

Implementation (High-Level Components)

New Load Manager

  • It refactors the existing load balance logic with better modularity.
  • It isolates the new code in the new classes without breaking the existing logic.
  • This new load manager will be disabled in the first releases until proven stable.

Load Data Models

LocalBrokerData: broker’s factual data

  • e.g.) {webServiceUrl, pulsarServiceUrl, …}
  • Persisted in MetadataStore(ZK)

BrokerLoadData: broker’s load data

  • e.g.) {cpu, memory, io, msgIn/Out, ...}
  • Published in BrokerLoadDataStore(TableView)

BundlesLoadData: bundle’s load data

  • e.g.) { bundleName, msgIn/Out, ...}
  • Cached in the local broker only

TopBundlesLoadData: top-n high-loaded bundle load data from the broker

  • e.g.) {brokerUrl, high_load_bundles :[{bundleName, …}], …}
  • Published in TopBundlesLoadDataStore(TableView)
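
For illustration, the sketch below models these load data records as plain Java records; the field names are assumptions taken from the examples above, not a final schema.

    // Hypothetical sketch of the load data models described above. Field names are
    // illustrative assumptions based on the "e.g." lists, not the final schema.
    import java.util.List;

    public final class LoadDataModels {

        // Broker-level load, published to BrokerLoadDataStore (table-view).
        public record BrokerLoadData(
                String brokerUrl,
                double cpuUsage,        // 0.0 - 1.0
                double memoryUsage,     // 0.0 - 1.0
                double msgRateIn,
                double msgRateOut,
                long updatedAtMillis) {}

        // Per-bundle load, cached only in the owner broker's memory (BundlesLoadData).
        public record BundleLoadData(
                String bundleName,
                double msgRateIn,
                double msgRateOut) {}

        // Top-n high-loaded bundles reported by one broker, published to
        // TopBundlesLoadDataStore (table-view) and consumed only by the leader.
        public record TopBundlesLoadData(
                String brokerUrl,
                List<BundleLoadData> topBundles,
                long updatedAtMillis) {}
    }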

Load Data Write-Read Flow

LocalBrokerData

Write:

  • Upon init, each broker stores LocalBrokerData in its ephemeral znode in MetadataStore(ZK) to monitor live brokers(same as now).
  • The broker-level load data moves to BrokerLoadData.

Read:

  • All brokers check LocalBrokerData to confirm the list of live brokers.

BrokerLoadData

Write:

  • Each broker periodically computes its local load(BrokerLoadData) and publishes it to BrokerLoadDataStore(TableView)(non-persistent).
    • Because a non-persistent TableView can lose data, we will add a TTL policy to tombstone old KVs in BrokerLoadDataStore.

Read:

  • All brokers consume BrokerLoadDataStore
  • With aggregated BrokerLoadData, all brokers perform bundle assignments without going through the leader.
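
A minimal sketch of the periodic publish step, assuming a generic publish callback in place of the actual table-view producer API (the interval and the sampling/publish hooks are placeholders):

    // Illustrative sketch only: "publish" stands in for writing the sampled value to
    // the non-persistent BrokerLoadDataStore table-view topic, keyed by the broker URL.
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    import java.util.function.BiConsumer;
    import java.util.function.Supplier;

    public final class BrokerLoadReporter<T> implements AutoCloseable {
        private final ScheduledExecutorService scheduler =
                Executors.newSingleThreadScheduledExecutor();

        public BrokerLoadReporter(String brokerUrl,
                                  Supplier<T> loadSampler,          // computes BrokerLoadData
                                  BiConsumer<String, T> publish,    // writes key -> value to the store
                                  long intervalSeconds) {
            // Each broker samples its own load and publishes it on a fixed interval.
            scheduler.scheduleAtFixedRate(
                    () -> publish.accept(brokerUrl, loadSampler.get()),
                    0, intervalSeconds, TimeUnit.SECONDS);
        }

        @Override
        public void close() {
            scheduler.shutdownNow();
        }
    }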

BundlesLoadData

Write:

  • Each broker monitors the allocated bundles' load and stores them in the local cache BundlesLoadData(In-memory-HashMap).
  • BundlesLoadData will not be replicated to other brokers’ caches.

Read:

  • Each broker locally reads BundlesLoadData and computes top n high load bundles, TopBundlesLoadData.
  • With the local BundlesLoadData, all brokers perform bundle splits without going through the leader.
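
As a sketch, computing TopBundlesLoadData from the local cache is a simple top-n selection; the load score (msgRateIn + msgRateOut) and field names below are assumptions, not the proposed scoring function.

    // Illustrative top-n selection from the in-memory BundlesLoadData cache.
    import java.util.Comparator;
    import java.util.List;
    import java.util.Map;

    public final class TopBundlesSelector {

        public record BundleLoad(String bundleName, double msgRateIn, double msgRateOut) {
            double score() { return msgRateIn + msgRateOut; }   // placeholder load score
        }

        // Returns the n highest-loaded bundles from the local BundlesLoadData map.
        public static List<BundleLoad> topN(Map<String, BundleLoad> bundlesLoadData, int n) {
            return bundlesLoadData.values().stream()
                    .sorted(Comparator.<BundleLoad>comparingDouble(BundleLoad::score).reversed())
                    .limit(n)
                    .toList();
        }
    }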

TopBundlesLoadData

Write:

  • Each broker periodically computes TopBundlesLoadData and publishes it to TopBundlesLoadDataStore(TableView)(non-persistent)
    • We will add a TTL policy to tombstone old KVs in TopBundlesLoadDataStore.

Read:

  • Only the leader broker consumes TopBundlesLoadDataStore
    • With the aggregated TopBundlesLoadData and BrokerLoadData, the leader initiates bundle unload(transfer) operations.
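
The TTL policy mentioned for both load data stores could look like the following sketch: an entry that has not been refreshed within the TTL is treated as stale and evicted. In the real store the tombstone would be written to the table-view; here a local map stands in, and the names are assumptions.

    // Illustrative TTL-based tombstoning of stale load data entries.
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public final class TtlLoadDataStore<V> {
        private record Timestamped<T>(T value, long writtenAtMillis) {}

        private final Map<String, Timestamped<V>> entries = new ConcurrentHashMap<>();
        private final long ttlMillis;

        public TtlLoadDataStore(long ttlMillis) {
            this.ttlMillis = ttlMillis;
        }

        public void put(String key, V value) {
            entries.put(key, new Timestamped<>(value, System.currentTimeMillis()));
        }

        // Periodically invoked to tombstone entries that were not refreshed within the TTL,
        // e.g., entries from brokers that stopped publishing.
        public void evictStale() {
            long now = System.currentTimeMillis();
            entries.entrySet().removeIf(e -> now - e.getValue().writtenAtMillis() > ttlMillis);
        }

        public V get(String key) {
            Timestamped<V> t = entries.get(key);
            return t == null ? null : t.value();
        }
    }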

Load Data Flow

(Figure: load data flow diagram, loadDataModel_v2)

Major Modifications on Bundle Split, Unload, and Assignment Flow

  • With the local BundlesLoadData, all brokers perform bundle splits without going through the leader. By default, newly split bundles will be the target to unload(transfer).
  • With aggregated BrokerLoadData, all brokers perform bundle assignments without going through the leader.
  • With aggregated TopBundlesLoadData and BrokerLoadData, the leader makes decisions to unload(transfer) bundles.
  • We will add a new bundle unload option, transfer, which transfers bundles from one broker to another.
  • We will introduce a global channel(Bundle State Channel) to share consistent/linearized bundle state changes with brokers.

Bundle State Channel

This bundle state channel is a persistent topic table-view used as a WAL to broadcast the total order of all bundle state changes in the cluster. All brokers asynchronously consume the messages in this channel in the same order and react to bundle state changes(sequential consistency). With table-view compaction, the bundle state channel eventually materializes the current bundle-broker ownership. Read operations against this channel(e.g., clients’ topic lookup requests) can be deferred for a few seconds, depending on the current state of the bundle.
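
As a rough sketch, each broker could materialize the channel into a local ownership view like the one below; the record fields and state names are illustrative assumptions, and an ordered apply() call stands in for consuming the compacted topic.

    // Illustrative bundle state channel record and the per-broker view built from it.
    import java.util.Map;
    import java.util.Optional;
    import java.util.concurrent.ConcurrentHashMap;

    public final class BundleStateView {

        public enum State { ASSIGNING, ASSIGNED, SPLITTING, UNASSIGNED }

        // One message in the channel, keyed by the bundle name.
        public record StateChange(String bundle, State state, String fromBroker, String toBroker) {}

        private final Map<String, StateChange> latest = new ConcurrentHashMap<>();

        // Called for every channel message, in publish order, on every broker.
        public void apply(StateChange change) {
            if (change.state() == State.UNASSIGNED) {
                latest.remove(change.bundle());          // tombstone: bundle has no owner
            } else {
                latest.put(change.bundle(), change);
            }
        }

        // Answers "which broker owns (or is acquiring) this bundle?" for topic lookups.
        // Callers may defer the response while the bundle is still ASSIGNING.
        public Optional<StateChange> lookup(String bundle) {
            return Optional.ofNullable(latest.get(bundle));
        }
    }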

Bundle State Lifecycles

We define the following states and actions and linearize the bundle state changes.
(This is a high-level design to explain the concept here. The final version may differ.)

Bundle States (5)

(Figure: bundle state diagram)

Bundle Actions

  • Own: Own the bundle ownership
    • The owner broker is selected by the local load manager.
  • Transfer: Transfer the bundle ownership to the destination broker.
    • The source broker internally disables the bundle ownership.
    • The destination broker owns the bundle.
  • Return: Return deferred client connections with the destination broker URL
    • Close the connections if already being served
  • Split: Split the target(parent) bundle into child bundles.
  • Create: Create the child bundle entries in the channel, initially assigned to the local broker.
  • Discard: Discard the bundle entry in the channel(tombstone operation)
  • Unload: Unload the bundle ownership from the owner broker
    • Disable the bundle ownership
    • Close client connections under the bundle
    • Run the Discard action

Bundle States

  • Assigned: assigned to a broker
  • Assigning: in the process of assigning the ownership
  • Splitting: in the process of splitting a bundle range.
  • Unassigned: unassigned to any broker (removed from the channel)

*New client connections to the bundle are deferred(with timeouts) in the Assigning state.
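
The sketch below encodes the states and actions above as enums with a simple validity check ("no arrow in the state diagram" means the action is rejected); the exact transition set is an illustrative reading of this section, not the final design.

    // Illustrative state/action validity table for the bundle lifecycle above.
    import java.util.EnumMap;
    import java.util.EnumSet;
    import java.util.Map;
    import java.util.Set;

    public final class BundleLifecycle {

        public enum State { UNASSIGNED, ASSIGNING, ASSIGNED, SPLITTING }

        public enum Action { OWN, TRANSFER, RETURN, SPLIT, CREATE, DISCARD, UNLOAD }

        private static final Map<State, Set<Action>> VALID = new EnumMap<>(State.class);
        static {
            // Assumed transitions, derived from the lifecycle description above.
            VALID.put(State.UNASSIGNED, EnumSet.of(Action.OWN, Action.CREATE));
            VALID.put(State.ASSIGNING,  EnumSet.of(Action.RETURN));
            VALID.put(State.ASSIGNED,   EnumSet.of(Action.TRANSFER, Action.SPLIT, Action.UNLOAD));
            VALID.put(State.SPLITTING,  EnumSet.of(Action.UNLOAD, Action.CREATE, Action.DISCARD));
        }

        // Rejecting an action with no arrow from the current state is the basis of the
        // conflict resolution described later in this proposal.
        public static boolean isValid(State current, Action action) {
            return VALID.getOrDefault(current, EnumSet.noneOf(Action.class)).contains(action);
        }
    }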

Bundle State Change Examples

The bundle state channel can be used as in the following examples.

Bundle Transfer Example

(State, Action) Sequence:
(Assigned, Transfer) => (Assigning, Return) => (Assigned,)

  1. The leader finds target bundles from TopBundlesLoadData and initiates a bundle unload(a transfer) by broadcasting the unload state change to the bundle state channel, keyed by the bundleName.
    e.g. {key:bundleName, value:{flow:transfer, action:transfer, state:assigning, from:A, to:B}}
  2. All brokers will consume the state change message in the channel.
  3. Upon consuming the message from the channel, if any state change involves the local broker, the broker performs its role and updates the state back in the channel to continue the state change. If there are conflicting state changes with the ongoing one, ignore them.
  4. Meanwhile, if other brokers(broker C) receive lookup requests for the bundle, the client's connections will be deferred(with timeouts) until they receive the “Return” action. When the “Return” action is broadcasted, all brokers will return the pending connections with the owner broker’s URL. Also, the existing connections from the source broker will be closed.
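
As a sketch of step 4, deferred lookups could be parked per bundle until the "Return" action names the owner, with a timeout bounding how long a client can wait (the timeout handling and names here are assumptions):

    // Illustrative deferral of lookup responses while a bundle is being assigned/transferred.
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.CopyOnWriteArrayList;
    import java.util.concurrent.TimeUnit;

    public final class DeferredLookups {
        private final Map<String, List<CompletableFuture<String>>> pending = new ConcurrentHashMap<>();
        private final long timeoutSeconds;

        public DeferredLookups(long timeoutSeconds) {
            this.timeoutSeconds = timeoutSeconds;
        }

        // Called when a lookup hits a bundle that is currently in the Assigning state.
        public CompletableFuture<String> defer(String bundle) {
            CompletableFuture<String> f = new CompletableFuture<String>()
                    .orTimeout(timeoutSeconds, TimeUnit.SECONDS);
            pending.computeIfAbsent(bundle, b -> new CopyOnWriteArrayList<>()).add(f);
            return f;
        }

        // Called when the "Return" action for this bundle is consumed from the channel:
        // every pending lookup is answered with the owner broker's URL.
        public void onReturn(String bundle, String ownerBrokerUrl) {
            List<CompletableFuture<String>> waiters = pending.remove(bundle);
            if (waiters != null) {
                waiters.forEach(f -> f.complete(ownerBrokerUrl));
            }
        }
    }
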
Bundle Split Example

(State, Action) Sequence:
(Assigned, Split) => (Splitting, Unload | Create) => {(Unassigned, ) | (Assigned, ), (Assigned, )}

  1. Each owner broker monitors its local BundlesLoadData and initiates a bundle split by broadcasting the split state change to the bundle state channel, keyed by the bundleName.
    e.g. {key:bundleName, value:{flow: split, action:split, state: splitting, from: A, to: B, transfer: true}}
  2. Same as Bundle Transfer Example step 2.
  3. Same as Bundle Transfer Example step 3.
    a. After the “Split,” the owner broadcasts the child bundles’ ownership creation(state=assigned) and the parent bundle’s ownership unload(empty message).
    b. By default, the owner publishes a message to the TopBundlesLoadData store asking the leader to unload(or transfer) the child bundles.
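
For illustration, splitting a parent bundle's hash range at its midpoint into two child bundles could look like the sketch below; the "tenant/ns/0xLOWER_0xUPPER" naming mirrors Pulsar's existing bundle naming, and the midpoint choice is an assumption of this sketch rather than the proposed split algorithm.

    // Illustrative midpoint split of a bundle hash range into two child bundles.
    import java.util.List;

    public final class BundleSplitter {

        public record Bundle(String namespace, long lower, long upper) {
            String name() {
                return String.format("%s/0x%08x_0x%08x", namespace, lower, upper);
            }
        }

        // Splits the parent range [lower, upper) into two child ranges at the midpoint.
        public static List<Bundle> splitAtMidpoint(Bundle parent) {
            long mid = parent.lower() + (parent.upper() - parent.lower()) / 2;
            return List.of(
                    new Bundle(parent.namespace(), parent.lower(), mid),
                    new Bundle(parent.namespace(), mid, parent.upper()));
        }

        public static void main(String[] args) {
            Bundle parent = new Bundle("tenant/ns", 0x00000000L, 0xFFFFFFFFL);
            // Prints the two child bundle names derived from the parent range.
            splitAtMidpoint(parent).forEach(b -> System.out.println(b.name()));
        }
    }
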
Bundle Assignment Example

(State, Action) Sequence:
(Unassigned, Own) => (Assigning, Return) => (Assigned,)

  1. When requested by clients, the first connected broker checks whether any broker in the state channel owns the bundle. If found, it returns the owner broker URL. Otherwise, it initiates a bundle assignment by broadcasting the assignment state change.
    e.g. {key:bundleName, value:{flow: assignment, action:own, state:assigning, to: B}}
  2. Same as Bundle Transfer Example step 2.
  3. Same as Bundle Transfer Example step 3.
  4. Same as Bundle Transfer Example step 4.
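
As a sketch of how "the owner broker is selected by the local load manager", a broker could pick the assignment target from the aggregated BrokerLoadData, for example the least-loaded broker; the max(cpu, memory) score below is a placeholder, not the strategy proposed by this PIP.

    // Illustrative least-loaded broker selection from aggregated BrokerLoadData.
    import java.util.Map;
    import java.util.Optional;

    public final class BrokerSelector {

        public record BrokerLoad(double cpuUsage, double memoryUsage) {
            double score() { return Math.max(cpuUsage, memoryUsage); }   // placeholder score
        }

        // brokerLoadData: brokerUrl -> latest BrokerLoadData consumed from the table-view.
        public static Optional<String> selectLeastLoaded(Map<String, BrokerLoad> brokerLoadData) {
            String best = null;
            double bestScore = Double.MAX_VALUE;
            for (Map.Entry<String, BrokerLoad> e : brokerLoadData.entrySet()) {
                double score = e.getValue().score();
                if (score < bestScore) {
                    bestScore = score;
                    best = e.getKey();
                }
            }
            return Optional.ofNullable(best);
        }
    }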

Bundle-Broker Ownership State

Because the bundle state channel shows the current bundle-broker ownership, we can remove the redundant bundle ownership store(ZK znodes). Each broker will look up the bundle state channel to check which broker currently owns the requested bundles or is in the ownership assignment/unload(transfer) process. In addition, before returning the owner, the broker availability metadata store(LocalBrokerData znode existence) could be checked to further confirm the owner broker's availability.

Bundle State Channel Owner Selection and Discovery

The Bundle State Channel(BSC) is itself a topic, and because of this circular dependency, we can't use the bundle state channel to find the owner broker of the BSC topic. For example, when a cluster starts, each broker needs to initiate a BSC TopicLookUp(to find the owner broker) in order to consume the messages in the BSC. However, initially, no broker knows which broker owns the BSC.

The ZK leader election can be a good option to break this circular dependency, as follows.

Channel Owner Selection

The cluster can use the ZK leader election to select the owner broker. If the owner becomes unavailable, one of the followers will become the new owner. We can elect the owner for each bundle state partition.

Channel Owner Discovery

Then, in brokers’ TopicLookUp logic, we will add a special case to return the current leader(the elected BSC owner) for the BSC topics.
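
A minimal sketch of that special case, assuming a hypothetical BSC topic name and generic lookup hooks (none of these names come from this proposal):

    // Illustrative special case in the broker's topic lookup: the bundle state channel
    // topic is always resolved to the elected channel owner (the leader); every other
    // topic goes through the normal bundle-ownership lookup.
    import java.util.function.Function;
    import java.util.function.Supplier;

    public final class ChannelAwareLookup {
        // Assumed topic name for the bundle state channel, used only for this sketch.
        private static final String BSC_TOPIC = "persistent://pulsar/system/bundle-state-channel";

        // leaderSupplier: returns the currently elected channel owner (via ZK leader election).
        // normalLookup: the regular bundle-ownership based lookup.
        public static String lookup(String topic,
                                    Supplier<String> leaderSupplier,
                                    Function<String, String> normalLookup) {
            if (BSC_TOPIC.equals(topic)) {
                return leaderSupplier.get();   // break the circular dependency
            }
            return normalLookup.apply(topic);
        }
    }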

Conflict State Resolution(Race Conditions)

Without distributed locks, we can resolve conflicting state changes by a conflict state resolution algorithm in an optimistic and eventual manner. Brokers can take the first valid state change in the linearized view as the winner state and ignore the later ones.

One caveat is that because the current table-view compaction takes only the last value as the result, we need to introduce an internal compaction algorithm for this channel that follows the conflict resolution algorithm(the first valid state change as the result value).

Bundle State Conflict Resolution Algorithm Example

For each bundle:

    // A[i] is the linearized bundle state change action at index i, and
    // S is the current bundle state after A[i-1],
    // where the sequence number i monotonically increases.
    for each A[i] and S:

        // no arrow in the state diagram
        if A[i] is invalid from S:
            reject A[i]
        else:
            accept A[i]

For instance, let’s say for bundle x, there are two conflicting assignments initiated. The linearized state change messages will be like the following.
(own, to:B), (own, to:A)
By the conflict resolution algorithm, the second state change (own, to:A) will be ignored by all brokers(and by the compaction algorithm). Eventually, the “return” message will be broadcasted by declaring that the owner is “B.”
(own, to:B), (own, to:A), (return, to:B)

Let’s take another example. Let’s say bundle x is already assigned to broker B, but another broker initiates the “own” action(before consuming the “return” action). This last “own” state change will be ignored since this action “own” is invalid from the previous state “assigned.” (in the above state diagram, there is no “own” action arrow from the “assigned” state.)
(own, to:B), (return, to:B), (own, to:A)
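
A sketch of the first-valid-wins resolution applied to the linearized stream of one bundle is shown below; the validity rules cover only the "own"/"return" actions from the examples above and are illustrative.

    // Illustrative conflict resolution: accept the first state change that is valid from
    // the current state and ignore the rest. A custom compaction would keep the accepted
    // changes as the materialized values instead of simply keeping the last write.
    import java.util.ArrayList;
    import java.util.List;

    public final class ConflictResolver {

        public enum State { UNASSIGNED, ASSIGNING, ASSIGNED }
        public enum Action { OWN, RETURN }

        public record Change(Action action, String broker) {}

        // Returns the accepted (winning) changes, in order, for one bundle.
        public static List<Change> resolve(List<Change> linearizedChanges) {
            List<Change> accepted = new ArrayList<>();
            State state = State.UNASSIGNED;
            for (Change c : linearizedChanges) {
                switch (c.action()) {
                    case OWN -> {
                        if (state == State.UNASSIGNED) {       // valid only if no owner yet
                            state = State.ASSIGNING;
                            accepted.add(c);
                        }                                      // else: conflicting OWN, ignored
                    }
                    case RETURN -> {
                        if (state == State.ASSIGNING) {        // valid only while assigning
                            state = State.ASSIGNED;
                            accepted.add(c);
                        }
                    }
                }
            }
            return accepted;
        }

        public static void main(String[] args) {
            // Example from the text: (own, to:B), (own, to:A), (return, to:B)
            List<Change> log = List.of(
                    new Change(Action.OWN, "B"),
                    new Change(Action.OWN, "A"),               // rejected: already assigning to B
                    new Change(Action.RETURN, "B"));
            // Prints only the accepted changes: OWN to B, then RETURN to B.
            System.out.println(resolve(log));
        }
    }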

Failure Recovery

When a broker is down

When state change participants(brokers) become suddenly unavailable, a state change could become an orphan, as the participants can no longer play their roles. For these orphan state changes, the leader broker will run orphan state clean-up logic. For instance, the leader can add bundle state clean-up logic to the broker unavailability notification handler(znode watcher) in order to clean up the pending bundle state changes and ownerships from unavailable brokers. Also, to make the clean-up logic more fault-tolerant, the leader broker will run the clean-up function when it initializes. Additionally, we could make the leader periodically call the clean-up in a separate monitor thread(we shouldn’t redundantly call this clean-up too often).
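
A minimal sketch of the clean-up pass, assuming an ownership map derived from the bundle state channel and a live-broker set from the metadata store (both names and the release hook are illustrative):

    // Illustrative orphan clean-up run by the leader: release the ownership of any bundle
    // whose recorded broker is no longer alive, so that the bundle can be reassigned.
    import java.util.Map;
    import java.util.Set;
    import java.util.function.Consumer;

    public final class OrphanStateCleaner {

        // ownership: bundleName -> broker currently recorded in the bundle state channel.
        // releaseOwnership: broadcasts an unload/tombstone for the orphaned bundle.
        public static void cleanUp(Map<String, String> ownership,
                                   Set<String> liveBrokers,
                                   Consumer<String> releaseOwnership) {
            ownership.forEach((bundle, owner) -> {
                if (!liveBrokers.contains(owner)) {
                    releaseOwnership.accept(bundle);    // the leader re-triggers assignment later
                }
            });
        }
    }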

When the entire ZK is down and comes back

Every broker will be notified when its ZK session encounters a connection issue. The brokers will then enter a "safe" mode, serving the existing topics as-is but not allowing ZK-related operations. The leader won't run the bundle clean-up, transfer, or unload logic while it knows ZK is down.

When ZK comes back, each broker will know its ZK session is re-established. The brokers will wait 2-3 minutes for all brokers to complete the ZK handshake. Then, they will recover the bundle state table-view and return to the normal mode.

Bundle State and Load Data TableView Scalability

Expected read/write traffic:
Write: relatively few messages on the write path, with occasional spikes.
Read: the fan-out broadcast could become a bottleneck when the cluster is enormous.

This bundle state channel is relatively lightweight on the producer side because bundle state changes are relatively infrequent. Still, message dispatch to consumers could be heavier if the cluster is very large. The same issue can happen to the other table-views(e.g., BrokerLoadDataStore) introduced in this proposal. We could consider the following methods to scale the table views’ produce/consume rates in a large cluster.

Split Broker Cluster to multiple clusters

Simply put, one can split a massive broker cluster into multiple clusters with different endpoints. The BookKeeper and configuration layers can be shared among the broker clusters.

Partitioned Table-View (short-term)

One can make the table views based on partitioned topics. Then, we can distribute message load to multiple partition owner brokers.

Sharding (long-term)

As the conventional scalability method, one could shard the cluster into multiple groups of brokers. Then, we can create a separate channel for each shard of brokers. This means we need an additional discovery layer to map topics to broker shards(which also needs to align with Namespace Isolation Policies).

We need to mention that this metadata sync scalability issue is not new in Pulsar, as the current Pulsar uses n-replication. For instance, all brokers' and all bundles' load metadata are replicated to all brokers via ZK watchers. Currently, distributed ZK servers send znode watch notifications to their clients(brokers). In this proposal, multiple table-view owner brokers(with partitioned table-views) can dispatch metadata change messages to the participants(brokers).

We think this metadata sync scalability is relatively low-priority, as only a few customers run Pulsar clusters on such a large scale. We could ask the customers first to split the cluster into multiple clusters and then enable partitioned table views. It is not practical for a single cluster to have thousands of brokers. However, we still want to ensure this design is seamlessly extensible, as a two-way-door decision.

Rejected Alternatives

  • Why can we not enhance the current load balancer?

As the PIP changes almost everything (data models, event handlers, cache/storage, logs/metrics), creating a new load balancer and isolating the new code is safer and cleaner. Customers can then safely enable/disable the new load balancer via a configuration before the old one is deprecated.

It gives us the flexibility to start fresh, without the baggage of existing choices, and to try a significantly different approach. The current ModularLoadManagerImpl will not go away. Once the new load manager is ready and considered stable enough, there might be a new discussion on whether to change the default implementation. Even then, users will still be able to opt for the old load manager.

Modification Summary

The following summary excludes logic and algorithm modifications, as this PIP does not focus on logic and algorithm improvements.

Goal: Make load balance operations fault-tolerant and consistent among brokers
Before: The leader broker sends load balance commands to the owner brokers via RPC with retries.
After: We introduce a global bundle state channel(a persistent topic table-view), where a total order of bundle commands is reliably persisted and broadcast to all brokers.

Goal: Distribute load balance operations
Before: The leader broker decides on bundle assignment, unload, and splitting. The owner brokers run the unload and split operations, notified via RPC.
After: Each broker decides and runs bundle assignment and split operations. The leader decides bundle unload(transfer), and the owner brokers run the unload operation, notified via the bundle state channel.

Goal: Reduce load data replication among brokers
Before: All brokers’ and all bundles’ load data are stored in ZK and replicated to all brokers via ZK watchers.
After: All brokers’ load data is replicated to all brokers via a non-persistent topic(table-view). Only the top-n bundles’ load data from each broker is replicated to the leader broker via a non-persistent topic(table-view).

Goal: Minimize the topic unavailability from unloading
Before: After topic connections are closed, clients reconnect to a new broker, and the new broker initiates a new topic assignment. The leader broker assigns a new owner, and eventually, the client is redirected to the new owner broker.
After: We introduce a new unload option, “transfer”, where the new owner is pre-assigned before the topic connections are closed. Clients immediately redirect to the new owner broker without client-initiated topic assignments.

Goal: Share bundle-broker ownership metadata among brokers for owner broker discovery
Before: The bundle-broker ownership data is stored in ZK. All brokers read bundle ownership info upon TopicLookUp requests(caching local bundle ownership info).
After: The global ownership data is stored in the bundle state channel(a persistent topic table-view). With compaction, all brokers read the latest global ownership table-view(cached in memory) upon TopicLookUp requests.

Goal: Show transparent load balance decisions with logs and metrics
Before: Logs are emitted on a best-effort basis.
After: We design logging/metrics as separate logical components. We document and share major log messages and metrics for all important load balance events.

Goal: Provide ways to override load balance decisions
Before: There is no way to transfer a topic(bundle) to a particular broker.
After: We introduce the “--dest” option in the unload command, using the unload “transfer” option.