Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][pip] PIP-378 Add ServiceUnitStateTableView abstraction (ExtensibleLoadMangerImpl only) #23300

Merged
merged 9 commits into from
Sep 17, 2024
221 changes: 221 additions & 0 deletions pip/pip-378.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
# PIP-378: Add ServiceUnitStateTableView abstraction (ExtensibleLoadMangerImpl only)

## Background

### ExtensibleLoadMangerImpl uses system topics to event-source bundle ownerships

PIP-192 introduces a new broker load balancer using a persistent system topic to event-source bundle ownerships among brokers.

PIP-307 introduces graceful ownership change protocol over the system topic (from PIP-192).

However, using system topics to manage bundle ownerships may not always be the best choice. Users might need an alternative approach to event-source bundle ownerships.


## Motivation

Add `ServiceUnitStateTableView` abstraction and make it pluggable, so users can customize `ServiceUnitStateTableView` implementations and event-source bundles ownerships using other stores.

## Goals

### In Scope

- Add `ServiceUnitStateTableView` interface
- Add `ServiceUnitStateTableViewImpl` implementation that uses Pulsar System topic (compatible with existing behavior)
- Add `ServiceUnitStateMetadataStoreTableViewImpl` implementation that uses Pulsar Metadata Store (new behavior)
- Refactor `ExtensibleLoadMangerImpl` and `ServiceUnitStateChannelImpl` to accept `ServiceUnitStateTableView`.
- Refactor related tests code

## High-Level Design

- Refactor `ServiceUnitStateChannelImpl` to accept `ServiceUnitStateTableView` interface and `ServiceUnitStateTableViewImpl` system topic implementation.
- Introduce `MetadataStoreTableView` interface to support `ServiceUnitStateMetadataStoreTableViewImpl` implementation.
- `MetadataStoreTableViewImpl` will use shadow hashmap to maintain the metadata tableview. It will initially fill the local tableview by scanning all existing items in the metadata store path. Also, new items will be updated to the tableview via metadata watch notifications.

## Detailed Design

### Design & Implementation Details
```java
public interface ServiceUnitStateTableView extends Closeable {

/**
* Starts the tableview.
* @param pulsar pulsar service reference
* @param tailItemListener listener to listen tail(newly updated) items
* @param existingItemListener listener to listen existing items
* @param skipItemListener listener to listen skipped(update conflict) items
* @throws IOException if it fails to init the tableview.
*/
void start(PulsarService pulsar,
BiConsumer<String, ServiceUnitStateData> tailItemListener,
BiConsumer<String, ServiceUnitStateData> existingItemListener,
BiConsumer<String, ServiceUnitStateData> skipItemListener) throws IOException;


/**
* Closes the tableview.
* @throws IOException if it fails to close the tableview.
*/
void close() throws IOException;

/**
* Gets one item from the local tableview.
* @param key the key to get
* @return value if exists. Otherwise, null.
*/
ServiceUnitStateData get(String key);

/**
* Tries to put the item in the persistent store.
* All peer tableviews (including the local one) will be notified and be eventually consistent with this put value.
* <p>
* It ignores conflicting updates, which can be listened by skipItemListener.
* @param key the key to put
* @param value the value to put
* @return a future to track the completion of the operation
*/
CompletableFuture<Void> put(String key, ServiceUnitStateData value);

/**
* Tries to delete the item from the persistent store.
* All peer tableviews (including the local one) will be notified and be eventually consistent with this deletion.
* @param key the key to delete
* @return a future to track the completion of the operation
*/
CompletableFuture<Void> delete(String key);

/**
* Returns the entry set of the completed items in the tableview.
* @return entry set
*/
Set<Map.Entry<String, ServiceUnitStateData>> entrySet();

/**
* Returns service units owned by this broker.
* @return a set of owned service units
*/
Set<NamespaceBundle> ownedServiceUnits();

/**
* Tries to flush any batched or buffered updates.
* @param waitDurationInMillis time to wait until completed
* @throws ExecutionException
* @throws InterruptedException
* @throws TimeoutException
*/
void flush(long waitDurationInMillis) throws ExecutionException, InterruptedException, TimeoutException;
}
```

```java
public interface MetadataStoreTableView<T> {

class ConflictException extends RuntimeException {
public ConflictException(String msg) {
super(msg);
}
}

/**
* Starts the tableview.
*/
void start() throws MetadataStoreException;

/**
* Reads whether a specific key exists.
*
* @param key the key to check on the tableview
* @return a future to track the async request
*/
boolean exists(String key);

/**
* Gets one item from the local tableview.
* <p>
* If the key is not found, return null.
*
* @param key the key to check on the tableview
* @return a future to track the completion of the operation
*/
T get(String key);

/**
* Tries to put the item in the persistent store.
* All peer tableviews (including the local one) will be notified and be eventually consistent with this put value.
* <p>
* This can fail by the registered conflict resolver when putting conflicting items concurrently.
*
* @param key the key to check on the tableview
* @return a future to track the completion of the operation
* @throws MetadataStoreTableView.ConflictException
* if the put fails due to conflicting value.
*/
CompletableFuture<Void> put(String key, T value);

/**
* Tries to delete the item from the persistent store.
* All peer tableviews (including the local one) will be notified and be eventually consistent with this deletion.
* <p>
* This can fail if the item is not present in the metadata store.
*
* @param key the key to check on the tableview
* @return a future to track the completion of the operation
* @throws MetadataStoreException.NotFoundException
* if the item is not present in the metadata store.
*/
CompletableFuture<Void> delete(String key);

/**
* Returns the size of the items in the tableview.
* @return size
*/
int size();

/**
* Reads whether the tableview is empty or not.
* @return true if empty. Otherwise, false
*/
boolean isEmpty();

/**
* Returns the entry set of the completed items in the tableview.
* @return entry set
*/
Set<Map.Entry<String, T>> entrySet();

/**
* Returns the key set of the completed items in the tableview.
* @return key set
*/
Set<String> keySet();

/**
* Returns the values of the completed items in the tableview.
* @return values
*/
Collection<T> values();

/**
* Runs the action for each completed item in the tableview.
*/
void forEach(BiConsumer<String, T> action);

}
```


### Configuration

Add a `loadManagerServiceUnitStateTableViewClassName` configuration to specify `ServiceUnitStateTableView` implementation class name.

## Backward & Forward Compatibility

It will ba Backward & Forward compatible as `loadManagerServiceUnitStateTableViewClassName` will be `ServiceUnitStateTableViewImpl`(system topic implementation) by default.

## Alternatives

## General Notes

## Links

* Mailing List discussion thread:
* Mailing List voting thread: