Skip to content

Latest commit

 

History

History
280 lines (224 loc) · 10.6 KB

pip-378.md

File metadata and controls

280 lines (224 loc) · 10.6 KB

PIP-378: Add ServiceUnitStateTableView abstraction (ExtensibleLoadMangerImpl only)

Background

ExtensibleLoadMangerImpl uses system topics to event-source bundle ownerships

PIP-192 introduces a new broker load balancer using a persistent system topic to event-source bundle ownerships among brokers.

PIP-307 introduces graceful ownership change protocol over the system topic (from PIP-192).

However, using system topics to manage bundle ownerships may not always be the best choice. Users might need an alternative approach to event-source bundle ownerships.

Motivation

Add ServiceUnitStateTableView abstraction and make it pluggable, so users can customize ServiceUnitStateTableView implementations and event-source bundles ownerships using other stores.

Goals

In Scope

  • Add ServiceUnitStateTableView interface
  • Add ServiceUnitStateTableViewImpl implementation that uses Pulsar System topic (compatible with existing behavior)
  • Add ServiceUnitStateMetadataStoreTableViewImpl implementation that uses Pulsar Metadata Store (new behavior)
  • Refactor related code and test code

High-Level Design

  • Refactor ServiceUnitStateChannelImpl to accept ServiceUnitStateTableView interface and ServiceUnitStateTableViewImpl system topic implementation.
  • Introduce MetadataStoreTableView interface to support ServiceUnitStateMetadataStoreTableViewImpl implementation.
  • MetadataStoreTableViewImpl will use shadow hashmap to maintain the metadata tableview. It will initially fill the local tableview by scanning all existing items in the metadata store path. Also, new items will be updated to the tableview via metadata watch notifications.
  • Add BiConsumer<String, Optional<CacheGetResult<T>>> asyncReloadConsumer in MetadataCacheConfig to listen the automatic cache async reload. This can be useful to re-sync the the shadow hashmap in MetadataStoreTableViewImpl in case it is out-dated in the worst case(e.g. network or metadata issues).
  • Introduce ServiceUnitStateTableViewSyncer to sync system topic and metadata store table views to migrate to one from the other. This syncer can be enabled by a dynamic config, loadBalancerServiceUnitTableViewSyncer.

Detailed Design

Design & Implementation Details

/**
 * Given that the ServiceUnitStateChannel event-sources service unit (bundle) ownership states via a persistent store
 * and reacts to ownership changes, the ServiceUnitStateTableView provides an interface to the
 * ServiceUnitStateChannel's persistent store and its locally replicated ownership view (tableview) with listener
 * registration. It initially populates its local table view by scanning existing items in the remote store. The
 * ServiceUnitStateTableView receives notifications whenever ownership states are updated in the remote store, and
 * upon notification, it applies the updates to its local tableview with the listener logic.
 */
public interface ServiceUnitStateTableView extends Closeable {

    /**
     * Starts the tableview.
     * It initially populates its local table view by scanning existing items in the remote store, and it starts
     * listening to service unit ownership changes from the remote store.
     * @param pulsar pulsar service reference
     * @param tailItemListener listener to listen tail(newly updated) items
     * @param existingItemListener listener to listen existing items
     * @throws IOException if it fails to init the tableview.
     */
    void start(PulsarService pulsar,
               BiConsumer<String, ServiceUnitStateData> tailItemListener,
               BiConsumer<String, ServiceUnitStateData> existingItemListener) throws IOException;


    /**
     * Closes the tableview.
     * @throws IOException if it fails to close the tableview.
     */
    void close() throws IOException;

    /**
     * Gets one item from the local tableview.
     * @param key the key to get
     * @return value if exists. Otherwise, null.
     */
    ServiceUnitStateData get(String key);

    /**
     * Tries to put the item in the persistent store.
     * If it completes, all peer tableviews (including the local one) will be notified and be eventually consistent
     * with this put value.
     *
     * It ignores put operation if the input value conflicts with the existing one in the persistent store.
     *
     * @param key the key to put
     * @param value the value to put
     * @return a future to track the completion of the operation
     */
    CompletableFuture<Void> put(String key, ServiceUnitStateData value);

    /**
     * Tries to delete the item from the persistent store.
     * All peer tableviews (including the local one) will be notified and be eventually consistent with this deletion.
     *
     * It ignores delete operation if the key is not present in the persistent store.
     *
     * @param key the key to delete
     * @return a future to track the completion of the operation
     */
    CompletableFuture<Void> delete(String key);

    /**
     * Returns the entry set of the items in the local tableview.
     * @return entry set
     */
    Set<Map.Entry<String, ServiceUnitStateData>> entrySet();

    /**
     * Returns service units (namespace bundles) owned by this broker.
     * @return a set of owned service units (namespace bundles)
     */
    Set<NamespaceBundle> ownedServiceUnits();

    /**
     * Tries to flush any batched or buffered updates.
     * @param waitDurationInMillis time to wait until complete.
     * @throws ExecutionException
     * @throws InterruptedException
     * @throws TimeoutException
     */
    void flush(long waitDurationInMillis) throws ExecutionException, InterruptedException, TimeoutException;
}
/**
 * Defines metadata store tableview.
 * MetadataStoreTableView initially fills existing items to its local tableview and eventually
 * synchronize remote updates to its local tableview from the remote metadata store.
 * This abstraction can help replicate metadata in memory from metadata store.
 */
public interface MetadataStoreTableView<T> {

    class ConflictException extends RuntimeException {
        public ConflictException(String msg) {
            super(msg);
        }
    }

    /**
     * Starts the tableview by filling existing items to its local tableview from the remote metadata store.
     */
    void start() throws MetadataStoreException;

    /**
     * Reads whether a specific key exists in the local tableview.
     *
     * @param key the key to check
     * @return true if exists. Otherwise, false.
     */
    boolean exists(String key);

    /**
     * Gets one item from the local tableview.
     * <p>
     * If the key is not found, return null.
     *
     * @param key the key to check
     * @return value if exists. Otherwise, null.
     */
    T get(String key);

    /**
     * Tries to put the item in the persistent store.
     * All peer tableviews (including the local one) will be notified and be eventually consistent with this put value.
     * <p>
     * This operation can fail if the input value conflicts with the existing one.
     *
     * @param key the key to check on the tableview
     * @return a future to track the completion of the operation
     * @throws MetadataStoreTableView.ConflictException
     *             if the input value conflicts with the existing one.
     */
    CompletableFuture<Void> put(String key, T value);

    /**
     * Tries to delete the item from the persistent store.
     * All peer tableviews (including the local one) will be notified and be eventually consistent with this deletion.
     * <p>
     * This can fail if the item is not present in the metadata store.
     *
     * @param key the key to check on the tableview
     * @return a future to track the completion of the operation
     * @throws MetadataStoreException.NotFoundException
     *             if the key is not present in the metadata store.
     */
    CompletableFuture<Void> delete(String key);

    /**
     * Returns the size of the items in the local tableview.
     * @return size
     */
    int size();

    /**
     * Reads whether the local tableview is empty or not.
     * @return true if empty. Otherwise, false
     */
    boolean isEmpty();

    /**
     * Returns the entry set of the items in the local tableview.
     * @return entry set
     */
    Set<Map.Entry<String, T>> entrySet();

    /**
     * Returns the key set of the items in the local tableview.
     * @return key set
     */
    Set<String> keySet();

    /**
     * Returns the values of the items in the local tableview.
     * @return values
     */
    Collection<T> values();

    /**
     * Runs the action for each item in the local tableview.
     */
    void forEach(BiConsumer<String, T> action);
}
public class MetadataCacheConfig<T> {
    private static final long DEFAULT_CACHE_REFRESH_TIME_MILLIS = TimeUnit.MINUTES.toMillis(5);

    ...

    /**
     * Specifies cache reload consumer behavior when the cache is refreshed automatically at refreshAfterWriteMillis
     * frequency.
     */
    @Builder.Default
    private final BiConsumer<String, Optional<CacheGetResult<T>>> asyncReloadConsumer = null;
/**
 * ServiceUnitStateTableViewSyncer can be used to sync system topic and metadata store table views to migrate to one
 * from the other.
 */
@Slf4j
public class ServiceUnitStateTableViewSyncer implements Cloneable {
    ...

    public void start(PulsarService pulsar) throws IOException {
        ... // sync  SystemTopicTableView and MetadataStoreTableView
    }


    public void close() throws IOException {
        ... // stop syncer
    }
...
}

Configuration

  • Add a loadManagerServiceUnitStateTableViewClassName static configuration to specify ServiceUnitStateTableView implementation class name.
  • Add a loadBalancerServiceUnitTableViewSyncer dynamic configuration to enable ServiceUnitTableViewSyncer to sync metadata store and system topic ServiceUnitStateTableView during migration.

Backward & Forward Compatibility

It will ba Backward & Forward compatible as loadManagerServiceUnitStateTableViewClassName will be ServiceUnitStateTableViewImpl(system topic implementation) by default.

We will introduce ServiceUnitStateTableViewSyncer dynamic config to sync system topic and metadata store table views when migrating to ServiceUnitStateMetadataStoreTableViewImpl from ServiceUnitStateTableViewImpl and vice versa. The admin could enable this syncer before migration and disable it after it is finished.

Alternatives

General Notes

Links