This repository was archived by the owner on Apr 1, 2024. It is now read-only.

ISSUE-13761: [PIP-143] Support split partitions belonging to specified topics in a bundle #3569

Closed
sijie opened this issue Jan 14, 2022 · 0 comments


@sijie
Member

sijie commented Jan 14, 2022

Original Issue: apache#13761


Motivation

As is well known, a namespace bundle may contain many partitions belonging to different topics.
The throughput of these topics can vary greatly: some topics have a very high rate/throughput, while others have a very low rate/throughput.

Partitions with a high rate/throughput can overload a broker and trigger bundle unloading.
When that happens, splitting the bundle manually with the range_equally_divide or topic_count_equally_divide algorithm may take many splits before these high-throughput partitions end up in different new bundles.

For convenience, this PIP calls these high-throughput topics outstanding topics and their partitions outstanding partitions.

Goal

Our goal is to make it easier to split outstanding partitions into new bundles.

There are two alternative ways to achieve this. Both add a new bundle split algorithm. They differ in how that algorithm is implemented.

The first algorithm splits the bundle by outstanding topic. Each split divides the bundle into two new bundles that contain an equal number of the outstanding topic's partitions.
E.g., suppose a bundle contains many topic partitions but only one outstanding topic (T) with 2 outstanding partitions (T-partition-n and T-partition-n+1). This algorithm splits the bundle at the midpoint between the hash codes of these two partitions. Its disadvantage is that it can only handle one outstanding topic.

We therefore propose a second algorithm.

The second algorithm splits the bundle at the hash code of each outstanding partition, producing three bundles around each outstanding partition in a single split. The middle bundle contains only the hash code of the outstanding partition, the left one covers hash codes below it, and the right one covers hash codes above it. E.g., if bundle 0x00_0x10 contains two outstanding partitions (partition-x and partition-y) whose hash codes are 0x03 and 0x07, this algorithm splits the bundle into five new bundles: 0x00_0x03, 0x03_0x04 (for partition-x), 0x04_0x07, 0x07_0x08 (for partition-y), and 0x08_0x10.
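The boundary computation described above can be sketched in a few lines. The sketch reproduces the 0x00_0x10 example. It is a hypothetical illustration, not the PIP's implementation: the names are invented, hash codes are assumed to be precomputed, and bundles are treated as half-open ranges [lower, upper), so 0x03_0x04 holds exactly hash code 0x03.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical sketch of the second algorithm: isolate every outstanding partition in a
// bundle that is one hash code wide. Assumes half-open ranges [lower, upper).
public class HashPointSplit {

    /** Returns the sorted split boundaries strictly inside (lower, upper). */
    static List<Long> splitBoundaries(long lower, long upper, List<Long> partitionHashes) {
        TreeSet<Long> boundaries = new TreeSet<>(); // keeps boundaries sorted and unique
        for (long h : partitionHashes) {
            if (h < lower || h >= upper) {
                continue; // this partition is not in the bundle
            }
            if (h > lower) {
                boundaries.add(h);     // start of the partition's own bundle
            }
            if (h + 1 < upper) {
                boundaries.add(h + 1); // end of the partition's own bundle
            }
        }
        return new ArrayList<>(boundaries);
    }

    /** Turns boundaries into bundle ranges written like "0x00_0x03". */
    static List<String> toRanges(long lower, long upper, List<Long> boundaries) {
        List<String> ranges = new ArrayList<>();
        long start = lower;
        for (long b : boundaries) {
            ranges.add(String.format("0x%02x_0x%02x", start, b));
            start = b;
        }
        ranges.add(String.format("0x%02x_0x%02x", start, upper));
        return ranges;
    }

    public static void main(String[] args) {
        List<Long> boundaries = splitBoundaries(0x00, 0x10, List.of(0x03L, 0x07L));
        System.out.println(boundaries); // [3, 4, 7, 8]
        System.out.println(toRanges(0x00, 0x10, boundaries));
        // [0x00_0x03, 0x03_0x04, 0x04_0x07, 0x07_0x08, 0x08_0x10]
    }
}
```

Unlike the midpoint approach, this handles any number of outstanding partitions in a single split. Each one contributes at most two boundaries.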

API Changes

The admin CLI command `bin/pulsar-admin namespaces split-bundle -b ${bundle_range}` gets a new parameter, `--topic` (short form `-t`), for the outstanding topic name.

The split interface is changed from

```java
void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles,
                          String splitAlgorithmName) throws PulsarAdminException;
```

to

```java
void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles,
                          String splitAlgorithmName, String topic) throws PulsarAdminException;
```

Implementation

Changes are needed on both the admin CLI side and the broker side.

First, the admin API used to split bundles must accept the outstanding topic:

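```java
/**
 * Split namespace bundle.
 *
 * @param namespace namespace that owns the bundle
 * @param bundle range of bundle to split
 * @param unloadSplitBundles whether to unload the new bundles after the split
 * @param splitAlgorithmName name of the split algorithm
 * @param topic outstanding topic whose partitions should be split out
 * @throws PulsarAdminException
 */
void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles,
                          String splitAlgorithmName, String topic) throws PulsarAdminException;

/**
 * Split namespace bundle asynchronously.
 *
 * @param namespace namespace that owns the bundle
 * @param bundle range of bundle to split
 * @param unloadSplitBundles whether to unload the new bundles after the split
 * @param splitAlgorithmName name of the split algorithm
 * @param topic outstanding topic whose partitions should be split out
 */
CompletableFuture<Void> splitNamespaceBundleAsync(
        String namespace, String bundle, boolean unloadSplitBundles, String splitAlgorithmName, String topic);
```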

On the broker side, the parameters for a bundle split are first encapsulated in a new class, BundleSplitOption:

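```java
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.common.naming.NamespaceBundle;

public class BundleSplitOption {
    private NamespaceService service; // broker service the algorithm can query, e.g. for the bundle's topics
    private NamespaceBundle bundle;   // the bundle being split
    private String topic;             // outstanding topic name (the REST parameter defaults to "")
}
```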

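Add a new split algorithm:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class SpecifiedTopicCountEquallyDivideBundleSplitAlgorithm implements NamespaceBundleSplitAlgorithm {
    @Override
    public CompletableFuture<List<Long>> getSplitBoundary(BundleSplitOption bundleSplitOption) {
        // Body elided in the proposal: returns the split boundaries computed for the
        // partitions of bundleSplitOption's topic that fall in the bundle.
    }
}
```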

Register the new algorithm in NamespaceBundleSplitAlgorithm:

```java
// Fields declared in an interface are implicitly public static final.
String SPECIFIED_TOPIC_COUNT_EQUALLY_DIVIDE = "specified_topic_count_equally_divide";

List<String> AVAILABLE_ALGORITHMS = Lists.newArrayList(RANGE_EQUALLY_DIVIDE_NAME,
        TOPIC_COUNT_EQUALLY_DIVIDE, SPECIFIED_TOPIC_COUNT_EQUALLY_DIVIDE);

NamespaceBundleSplitAlgorithm SPECIFIED_TOPIC_COUNT_EQUALLY_DIVIDE_ALGO =
        new SpecifiedTopicCountEquallyDivideBundleSplitAlgorithm();
```

Modify splitAndOwnBundle and splitAndOwnBundleOnceAndRetry in [NamespaceService.java](https://github.com/apache/pulsar/pull/13796/files#diff-99867fd2e328e0a689daa8f60d174aa96d87dd0d0fd29b9e68fe2e1f377b55a1) to accept the topic:

```java
public CompletableFuture<Void> splitAndOwnBundle(NamespaceBundle bundle, boolean unload,
                                                 NamespaceBundleSplitAlgorithm splitAlgorithm, String topic) {
    final CompletableFuture<Void> unloadFuture = new CompletableFuture<>();
    // Retry budget for this split.
    final AtomicInteger counter = new AtomicInteger(BUNDLE_SPLIT_RETRY_LIMIT);
    splitAndOwnBundleOnceAndRetry(bundle, unload, counter, unloadFuture, splitAlgorithm, topic);
    return unloadFuture;
}

void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle,
                                   boolean unload,
                                   AtomicInteger counter,
                                   CompletableFuture<Void> completionFuture,
                                   NamespaceBundleSplitAlgorithm splitAlgorithm,
                                   String topic) {
    // ... (body not shown in the proposal)
}
```
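The signatures above suggest a "try once, retry on failure" pattern: an AtomicInteger retry budget plus a completion future that the caller waits on. The sketch below shows that pattern in isolation. It is an assumption about the general shape, not the body of splitAndOwnBundleOnceAndRetry. The class name is hypothetical, and a Supplier stands in for one real split attempt.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical sketch of a retry loop driven by an AtomicInteger budget. With an initial
// counter of N, the first attempt is followed by at most N retries; completionFuture
// is completed exactly once, either normally or with the last failure.
public class SplitRetrySketch {

    static void splitOnceAndRetry(Supplier<CompletableFuture<Void>> attemptSplit,
                                  AtomicInteger counter,
                                  CompletableFuture<Void> completionFuture) {
        attemptSplit.get().whenComplete((ignored, ex) -> {
            if (ex == null) {
                completionFuture.complete(null);                            // split succeeded
            } else if (counter.decrementAndGet() >= 0) {
                splitOnceAndRetry(attemptSplit, counter, completionFuture); // budget left: retry
            } else {
                completionFuture.completeExceptionally(ex);                 // budget exhausted
            }
        });
    }

    public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();
        // Fails twice (e.g. a concurrent metadata update), then succeeds.
        Supplier<CompletableFuture<Void>> flaky = () -> attempts.incrementAndGet() < 3
                ? CompletableFuture.<Void>failedFuture(new IllegalStateException("bundle busy"))
                : CompletableFuture.<Void>completedFuture(null);
        CompletableFuture<Void> done = new CompletableFuture<>();
        splitOnceAndRetry(flaky, new AtomicInteger(5), done);
        System.out.println(done.isDone() + " after " + attempts.get() + " attempts"); // true after 3 attempts
    }
}
```

Passing the same counter and future into each recursive call means the caller only ever sees one future, regardless of how many attempts were made.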

Finally, the REST API gets a new `topic` query parameter:

```java
public void splitNamespaceBundle(
        @Suspended final AsyncResponse asyncResponse,
        @PathParam("property") String property,
        @PathParam("cluster") String cluster,
        @PathParam("namespace") String namespace,
        @PathParam("bundle") String bundleRange,
        @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
        @QueryParam("unload") @DefaultValue("false") boolean unload,
        @QueryParam("topic") @DefaultValue("") String topic) {}
```

and the new algorithm is added to the supported algorithms in broker.conf:

```properties
supportedNamespaceBundleSplitAlgorithms=range_equally_divide,topic_count_equally_divide,specified_topic_count_equally_divide
```
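A broker has to parse the comma-separated `supportedNamespaceBundleSplitAlgorithms` value and reject split requests it cannot serve. The sketch below shows one way to do that. It is hypothetical: the class and method names are invented. The rule that `specified_topic_count_equally_divide` requires a non-empty topic is an assumption of this sketch, motivated by the REST parameter defaulting to "".

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch: parse supportedNamespaceBundleSplitAlgorithms and validate a split
// request. Requiring a topic for the new algorithm is an assumption, not part of the PIP text.
public class SplitRequestCheck {

    static List<String> parseSupported(String confValue) {
        return Arrays.stream(confValue.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toList());
    }

    static void validate(List<String> supported, String algorithm, String topic) {
        if (!supported.contains(algorithm)) {
            throw new IllegalArgumentException("Unsupported split algorithm: " + algorithm);
        }
        if ("specified_topic_count_equally_divide".equals(algorithm)
                && (topic == null || topic.isEmpty())) {
            throw new IllegalArgumentException("A topic is required for " + algorithm);
        }
    }

    public static void main(String[] args) {
        List<String> supported = parseSupported(
                "range_equally_divide,topic_count_equally_divide,specified_topic_count_equally_divide");
        validate(supported, "specified_topic_count_equally_divide", "persistent://public/default/T");
        System.out.println(supported.size()); // 3
    }
}
```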
sijie changed the title from "ISSUE-13761: Support split partitions belonging to specified topic in a bundle" to "ISSUE-13761: [PIP-143] Support split partitions belonging to specified topics in a bundle" on Feb 17, 2022
sijie added the PIP label on Feb 17, 2022
sijie closed this as completed on Mar 11, 2022