Skip to content

Commit

Permalink
Create interface RemoteEntitiesManager
Browse files Browse the repository at this point in the history
 - Add javadocs
 - fix after rebase

Signed-off-by: Shivansh Arora <hishiv@amazon.com>
  • Loading branch information
shiv0408 committed Jul 9, 2024
1 parent 51af2e2 commit 9c6a732
Show file tree
Hide file tree
Showing 10 changed files with 423 additions and 225 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.remote;

import org.opensearch.action.LatchedActionListener;
import org.opensearch.common.CheckedRunnable;
import org.opensearch.core.action.ActionListener;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.gateway.remote.model.RemoteReadResult;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
* An abstract class that provides a base implementation for managing remote entities in the remote store.
*/
public abstract class AbstractRemoteEntitiesManager implements RemoteEntitiesManager {
/**
* A map that stores the remote writable entity stores, keyed by the entity type.
*/
protected final Map<String, RemoteWritableEntityStore> remoteWritableEntityStores = new HashMap<>();

/**
* Retrieves the remote writable entity store for the given entity.
*
* @param entity the entity for which the store is requested
* @return the remote writable entity store for the given entity
* @throws IllegalArgumentException if the entity type is unknown
*/
protected RemoteWritableEntityStore getStore(AbstractRemoteWritableBlobEntity entity) {
RemoteWritableEntityStore remoteStore = remoteWritableEntityStores.get(entity.getType());
if (remoteStore == null) {
throw new IllegalArgumentException("Unknown entity type [" + entity.getType() + "]");
}
return remoteStore;
}

/**
* Returns an ActionListener for handling the write operation for the specified component, remote object, and latched action listener.
*
* @param component the component for which the write operation is performed
* @param remoteObject the remote object to be written
* @param latchedActionListener the latched action listener to be notified when the write operation completes
* @return an ActionListener for handling the write operation
*/
protected abstract ActionListener<Void> getWriteActionListener(
String component,
AbstractRemoteWritableBlobEntity remoteObject,
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener
);

/**
* Returns an ActionListener for handling the read operation for the specified component,
* remote object, and latched action listener.
*
* @param component the component for which the read operation is performed
* @param remoteObject the remote object to be read
* @param latchedActionListener the latched action listener to be notified when the read operation completes
* @return an ActionListener for handling the read operation
*/
protected abstract ActionListener<Object> getReadActionListener(
String component,
AbstractRemoteWritableBlobEntity remoteObject,
LatchedActionListener<RemoteReadResult> latchedActionListener
);

@Override
public CheckedRunnable<IOException> getAsyncWriteRunnable(
String component,
AbstractRemoteWritableBlobEntity entity,
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener
) {
return () -> getStore(entity).writeAsync(entity, getWriteActionListener(component, entity, latchedActionListener));
}

@Override
public CheckedRunnable<IOException> getAsyncReadRunnable(
String component,
AbstractRemoteWritableBlobEntity entity,
LatchedActionListener<RemoteReadResult> latchedActionListener
) {
return () -> getStore(entity).readAsync(entity, getReadActionListener(component, entity, latchedActionListener));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.remote;

import org.opensearch.action.LatchedActionListener;
import org.opensearch.common.CheckedRunnable;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata;
import org.opensearch.gateway.remote.model.RemoteReadResult;

import java.io.IOException;

/**
* The RemoteEntitiesManager interface provides async read and write methods for managing remote entities in the remote store
*/
public interface RemoteEntitiesManager {

/**
* Returns a CheckedRunnable that performs an asynchronous read operation for the specified component and entity.
*
* @param component the component for which the read operation is performed
* @param entity the entity to be read
* @param latchedActionListener the listener to be notified when the read operation completes
* @return a CheckedRunnable that performs the asynchronous read operation
*/
CheckedRunnable<IOException> getAsyncReadRunnable(
String component,
AbstractRemoteWritableBlobEntity entity,
LatchedActionListener<RemoteReadResult> latchedActionListener
);

/**
* Returns a CheckedRunnable that performs an asynchronous write operation for the specified component and entity.
*
* @param component the component for which the write operation is performed
* @param entity the entity to be written
* @param latchedActionListener the listener to be notified when the write operation completes
* @return a CheckedRunnable that performs the asynchronous write operation
*/
CheckedRunnable<IOException> getAsyncWriteRunnable(
String component,
AbstractRemoteWritableBlobEntity entity,
LatchedActionListener<UploadedMetadata> latchedActionListener
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.DiffableUtils;
import org.opensearch.cluster.DiffableUtils.NonDiffableValueSerializer;
import org.opensearch.common.CheckedRunnable;
import org.opensearch.common.remote.AbstractRemoteEntitiesManager;
import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity;
import org.opensearch.common.remote.RemoteWritableEntityStore;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.gateway.remote.model.RemoteClusterBlocks;
Expand All @@ -26,22 +25,19 @@
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
* A Manager which provides APIs to upload and download attributes of ClusterState to the {@link RemoteClusterStateBlobStore}
*
* @opensearch.internal
*/
public class RemoteClusterStateAttributesManager {
public class RemoteClusterStateAttributesManager extends AbstractRemoteEntitiesManager {
public static final String CLUSTER_STATE_ATTRIBUTE = "cluster_state_attribute";
public static final String DISCOVERY_NODES = "nodes";
public static final String CLUSTER_BLOCKS = "blocks";
public static final int CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION = 1;
private final Map<String, RemoteWritableEntityStore> remoteWritableEntityStores;
private final NamedWriteableRegistry namedWriteableRegistry;

RemoteClusterStateAttributesManager(
Expand All @@ -52,7 +48,6 @@ public class RemoteClusterStateAttributesManager {
ThreadPool threadpool
) {
this.namedWriteableRegistry = namedWriteableRegistry;
this.remoteWritableEntityStores = new HashMap<>();
this.remoteWritableEntityStores.put(
RemoteDiscoveryNodes.DISCOVERY_NODES,
new RemoteClusterStateBlobStore<>(
Expand Down Expand Up @@ -85,46 +80,28 @@ public class RemoteClusterStateAttributesManager {
);
}

/**
* Allows async upload of Cluster State Attribute components to remote
*/
CheckedRunnable<IOException> getAsyncMetadataWriteAction(
String component,
AbstractRemoteWritableBlobEntity blobEntity,
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener
) {
return () -> getStore(blobEntity).writeAsync(blobEntity, getActionListener(component, blobEntity, latchedActionListener));
}

private ActionListener<Void> getActionListener(
@Override
protected ActionListener<Void> getWriteActionListener(
String component,
AbstractRemoteWritableBlobEntity remoteObject,
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener
) {
return ActionListener.wrap(
resp -> latchedActionListener.onResponse(remoteObject.getUploadedMetadata()),
ex -> latchedActionListener.onFailure(new RemoteStateTransferException(component, remoteObject, ex))
ex -> latchedActionListener.onFailure(new RemoteStateTransferException("Upload failed for " + component, remoteObject, ex))
);
}

private RemoteWritableEntityStore getStore(AbstractRemoteWritableBlobEntity entity) {
RemoteWritableEntityStore remoteStore = remoteWritableEntityStores.get(entity.getType());
if (remoteStore == null) {
throw new IllegalArgumentException("Unknown entity type [" + entity.getType() + "]");
}
return remoteStore;
}

public CheckedRunnable<IOException> getAsyncMetadataReadAction(
@Override
protected ActionListener<Object> getReadActionListener(
String component,
AbstractRemoteWritableBlobEntity blobEntity,
LatchedActionListener<RemoteReadResult> listener
AbstractRemoteWritableBlobEntity remoteObject,
LatchedActionListener<RemoteReadResult> latchedActionListener
) {
final ActionListener actionListener = ActionListener.wrap(
response -> listener.onResponse(new RemoteReadResult(response, CLUSTER_STATE_ATTRIBUTE, component)),
listener::onFailure
return ActionListener.wrap(
response -> latchedActionListener.onResponse(new RemoteReadResult(response, CLUSTER_STATE_ATTRIBUTE, component)),
ex -> latchedActionListener.onFailure(new RemoteStateTransferException("Download failed for " + component, remoteObject, ex))
);
return () -> getStore(blobEntity).readAsync(blobEntity, actionListener);
}

public DiffableUtils.MapDiff<String, ClusterState.Custom, Map<String, ClusterState.Custom>> getUpdatedCustoms(
Expand Down Expand Up @@ -158,4 +135,5 @@ public DiffableUtils.MapDiff<String, ClusterState.Custom, Map<String, ClusterSta
NonDiffableValueSerializer.getAbstractInstance()
);
}

}
Loading

0 comments on commit 9c6a732

Please sign in to comment.