diff --git a/ambry-api/src/main/java/com/github/ambry/clustermap/FileStoreException.java b/ambry-api/src/main/java/com/github/ambry/clustermap/FileStoreException.java new file mode 100644 index 0000000000..6c5cab2cc7 --- /dev/null +++ b/ambry-api/src/main/java/com/github/ambry/clustermap/FileStoreException.java @@ -0,0 +1,7 @@ +package com.github.ambry.clustermap; + +public class FileStoreException extends RuntimeException{ + public enum FileStoreErrorCode{ + FileStore, + } +} diff --git a/ambry-api/src/main/java/com/github/ambry/clustermap/ReplicaSyncUpManager.java b/ambry-api/src/main/java/com/github/ambry/clustermap/ReplicaSyncUpManager.java index dc9531ccd7..c0ae6ae445 100644 --- a/ambry-api/src/main/java/com/github/ambry/clustermap/ReplicaSyncUpManager.java +++ b/ambry-api/src/main/java/com/github/ambry/clustermap/ReplicaSyncUpManager.java @@ -35,6 +35,8 @@ public interface ReplicaSyncUpManager { * @throws InterruptedException */ void waitBootstrapCompleted(String partitionName) throws InterruptedException; + void initiateFileCopy(ReplicaId replicaId); + void waitForFileCopyCompleted(String partitionName) throws InterruptedException; /** * Update replica lag (in byte) between two replicas (local and peer replica) and check sync-up status. @@ -64,6 +66,8 @@ boolean updateReplicaLagAndCheckSyncStatus(ReplicaId localReplica, ReplicaId pee */ void onBootstrapComplete(ReplicaId replicaId); + void onFileCopyComplete(ReplicaId replicaId); + /** * Deactivation on given replica is complete. * @param replicaId the replica which completes deactivation. diff --git a/ambry-api/src/main/java/com/github/ambry/clustermap/StateModelListenerType.java b/ambry-api/src/main/java/com/github/ambry/clustermap/StateModelListenerType.java index 83c5f43618..c43bccd5e1 100644 --- a/ambry-api/src/main/java/com/github/ambry/clustermap/StateModelListenerType.java +++ b/ambry-api/src/main/java/com/github/ambry/clustermap/StateModelListenerType.java @@ -41,5 +41,13 @@ public enum StateModelListenerType { * leadership hand-off occurs. For example, if any replica becomes LEADER from STANDBY, it is supposed to replicate * data from VCR nodes. This is part of two-way replication between Ambry and cloud. */ - CloudToStoreReplicationManagerListener + CloudToStoreReplicationManagerListener, + + /** + * The partition state change listener owned by Helix participant. It takes actions when partition state transition + * occurs. + */ + FileCopyManagerListener + + } diff --git a/ambry-api/src/main/java/com/github/ambry/clustermap/StateTransitionException.java b/ambry-api/src/main/java/com/github/ambry/clustermap/StateTransitionException.java index 7df1ee7a94..6f89632e40 100644 --- a/ambry-api/src/main/java/com/github/ambry/clustermap/StateTransitionException.java +++ b/ambry-api/src/main/java/com/github/ambry/clustermap/StateTransitionException.java @@ -73,6 +73,8 @@ public enum TransitionErrorCode { /** * If the resource name is not a numeric number. */ - InvalidResourceName + InvalidResourceName, + + FileCopyFailure } } diff --git a/ambry-api/src/main/java/com/github/ambry/config/FileCopyConfig.java b/ambry-api/src/main/java/com/github/ambry/config/FileCopyConfig.java new file mode 100644 index 0000000000..3a7f41fcb6 --- /dev/null +++ b/ambry-api/src/main/java/com/github/ambry/config/FileCopyConfig.java @@ -0,0 +1,31 @@ +package com.github.ambry.config; + +public class FileCopyConfig { + + public static final String PARALLEL_PARTITION_HYDRATION_COUNT_PER_DISK = "parallel.partition.hydration.count.per.disk"; + @Config(PARALLEL_PARTITION_HYDRATION_COUNT_PER_DISK) + public final int parallelPartitionHydrationCountPerDisk; + + public static final String NUMBER_OF_FILE_COPY_THREADS = "number.of.file.copy.threads"; + @Config(NUMBER_OF_FILE_COPY_THREADS) + public final int numberOfFileCopyThreads; + + public static final String FILE_CHUNK_TIMEOUT_IN_MINUTES = "file.chunk.timeout.in.minutes"; + @Config(FILE_CHUNK_TIMEOUT_IN_MINUTES) + public final long fileChunkTimeoutInMins; + + /** + * The frequency at which the data gets flushed to disk + */ + public static final String STORE_DATA_FLUSH_INTERVAL_IN_MBS = "store.data.flush.size.In"; + @Config("store.data.flush.size.In") + @Default("60") + public final long storeDataFlushIntervalSeconds; + + public FileCopyConfig(VerifiableProperties verifiableProperties) { + parallelPartitionHydrationCountPerDisk = verifiableProperties.getInt(PARALLEL_PARTITION_HYDRATION_COUNT_PER_DISK, 1); + numberOfFileCopyThreads = verifiableProperties.getInt(NUMBER_OF_FILE_COPY_THREADS, 4); + fileChunkTimeoutInMins = verifiableProperties.getInt(FILE_CHUNK_TIMEOUT_IN_MINUTES, 5); + storeDataFlushIntervalSeconds = verifiableProperties.getLong("store.data.flush.size", 60); + } +} diff --git a/ambry-api/src/main/java/com/github/ambry/config/ServerReplicationMode.java b/ambry-api/src/main/java/com/github/ambry/config/ServerReplicationMode.java new file mode 100644 index 0000000000..5d3b6a2ae1 --- /dev/null +++ b/ambry-api/src/main/java/com/github/ambry/config/ServerReplicationMode.java @@ -0,0 +1,6 @@ +package com.github.ambry.config; + +public enum ServerReplicationMode { + BLOB_BASED, + FILE_BASED; +} diff --git a/ambry-api/src/main/java/com/github/ambry/config/StoreConfig.java b/ambry-api/src/main/java/com/github/ambry/config/StoreConfig.java index 32b48e5dab..f022d0f172 100644 --- a/ambry-api/src/main/java/com/github/ambry/config/StoreConfig.java +++ b/ambry-api/src/main/java/com/github/ambry/config/StoreConfig.java @@ -674,6 +674,14 @@ public class StoreConfig { public final boolean storeBlockStaleBlobStoreToStart; public final static String storeBlockStaleBlobStoreToStartName = "store.block.stale.blob.store.to.start"; + /** + * Config to Decide Replication Protocol For Hydration Of Newly Added Replicas + */ + public static final String SERVER_REPLICATION_PROTOCOL_FOR_HYDRATION = "server.replication.protocol.for.hydration"; + @Config(SERVER_REPLICATION_PROTOCOL_FOR_HYDRATION) + public final ServerReplicationMode serverReplicationProtocolForHydration; + + /** * Whether to attempt reshuffling of reordered disks and subsequent process termination. */ @@ -683,6 +691,8 @@ public class StoreConfig { public final static String storeReshuffleDisksOnReorderName = "store.reshuffle.disks.on.reorder"; public StoreConfig(VerifiableProperties verifiableProperties) { + serverReplicationProtocolForHydration = verifiableProperties.getEnum(SERVER_REPLICATION_PROTOCOL_FOR_HYDRATION, + ServerReplicationMode.class, ServerReplicationMode.BLOB_BASED); storeKeyFactory = verifiableProperties.getString("store.key.factory", "com.github.ambry.commons.BlobIdFactory"); storeDataFlushIntervalSeconds = verifiableProperties.getLong("store.data.flush.interval.seconds", 60); storeIndexMaxMemorySizeBytes = verifiableProperties.getInt("store.index.max.memory.size.bytes", 20 * 1024 * 1024); diff --git a/ambry-api/src/main/java/com/github/ambry/protocol/RequestAPI.java b/ambry-api/src/main/java/com/github/ambry/protocol/RequestAPI.java index 1c268fd9eb..b393a53f9f 100644 --- a/ambry-api/src/main/java/com/github/ambry/protocol/RequestAPI.java +++ b/ambry-api/src/main/java/com/github/ambry/protocol/RequestAPI.java @@ -89,6 +89,13 @@ public interface RequestAPI { */ void handleReplicaMetadataRequest(NetworkRequest request) throws IOException, InterruptedException; + /** + * + * @param request + * @throws IOException + * @throws InterruptedException + */ + /** * Replicate one specific Blob from a remote host to the local store. * @param request The request that contains the remote host information and the blob id to be replicated. @@ -116,4 +123,12 @@ default void handleAdminRequest(NetworkRequest request) throws InterruptedExcept default void handleUndeleteRequest(NetworkRequest request) throws InterruptedException, IOException { throw new UnsupportedOperationException("Undelete request not supported on this node"); } + + default void handleFileMetaDataRequest(NetworkRequest request) throws InterruptedException, IOException{ + throw new UnsupportedOperationException("File Meta Data request not supported on this node"); + } + + default void handleFileChunkRequest(NetworkRequest request) throws InterruptedException, IOException{ + throw new UnsupportedOperationException("File Chunk request not supported on this node"); + } } diff --git a/ambry-api/src/main/java/com/github/ambry/server/StoreManager.java b/ambry-api/src/main/java/com/github/ambry/server/StoreManager.java index c25fb82092..6fa212b3e5 100644 --- a/ambry-api/src/main/java/com/github/ambry/server/StoreManager.java +++ b/ambry-api/src/main/java/com/github/ambry/server/StoreManager.java @@ -34,6 +34,12 @@ public interface StoreManager { */ boolean addBlobStore(ReplicaId replica); + /** + * Build state after filecopy is completed + * @param partitionName the partition id for which state is to be built.. + */ + void buildStateForFileCopy(String partitionName); + /** * Remove store from storage manager. * @param id the {@link PartitionId} associated with store diff --git a/ambry-cloud/src/main/java/com/github/ambry/cloud/CloudStorageManager.java b/ambry-cloud/src/main/java/com/github/ambry/cloud/CloudStorageManager.java index a33cf80c16..bcc9fa87af 100644 --- a/ambry-cloud/src/main/java/com/github/ambry/cloud/CloudStorageManager.java +++ b/ambry-cloud/src/main/java/com/github/ambry/cloud/CloudStorageManager.java @@ -56,6 +56,10 @@ public CloudStorageManager(VerifiableProperties properties, VcrMetrics vcrMetric public boolean addBlobStore(ReplicaId replica) { return createAndStartBlobStoreIfAbsent(replica.getPartitionId()) != null; } + @Override + public void buildStateForFileCopy(String partitionName){ + // no-op + } @Override public boolean shutdownBlobStore(PartitionId id) { diff --git a/ambry-clustermap/src/main/java/com/github/ambry/clustermap/AmbryReplicaSyncUpManager.java b/ambry-clustermap/src/main/java/com/github/ambry/clustermap/AmbryReplicaSyncUpManager.java index bda9b7af0b..6e6e7cedb1 100644 --- a/ambry-clustermap/src/main/java/com/github/ambry/clustermap/AmbryReplicaSyncUpManager.java +++ b/ambry-clustermap/src/main/java/com/github/ambry/clustermap/AmbryReplicaSyncUpManager.java @@ -41,6 +41,9 @@ public class AmbryReplicaSyncUpManager implements ReplicaSyncUpManager { private static final Logger logger = LoggerFactory.getLogger(AmbryReplicaSyncUpManager.class); private final ConcurrentHashMap partitionToBootstrapLatch = new ConcurrentHashMap<>(); + + private final ConcurrentHashMap partitionToFileCopyLatch = new ConcurrentHashMap<>(); + private final ConcurrentHashMap partitionToFileCopySuccessLatch = new ConcurrentHashMap<>(); private final ConcurrentHashMap partitionToDeactivationLatch = new ConcurrentHashMap<>(); private final ConcurrentHashMap partitionToDisconnectionLatch = new ConcurrentHashMap<>(); private final ConcurrentHashMap partitionToBootstrapSuccess = new ConcurrentHashMap<>(); @@ -63,6 +66,12 @@ public void initiateBootstrap(ReplicaId replicaId) { ReplicaState.BOOTSTRAP)); } + @Override + public void initiateFileCopy(ReplicaId replicaId){ + partitionToFileCopyLatch.put(replicaId.getPartitionId().toPathString(), new CountDownLatch(1)); + partitionToFileCopySuccessLatch.put(replicaId.getPartitionId().toPathString(), false); + } + @Override public void initiateDeactivation(ReplicaId replicaId) { partitionToDeactivationLatch.put(replicaId.getPartitionId().toPathString(), new CountDownLatch(1)); @@ -101,6 +110,22 @@ public void waitBootstrapCompleted(String partitionName) throws InterruptedExcep } } + @Override + public void waitForFileCopyCompleted(String partitionName) throws InterruptedException { + CountDownLatch latch = partitionToFileCopyLatch.get(partitionName); + if(latch == null) { + logger.info("Skipping file copy for existing partition {}", partitionName); + } else{ + logger.info("Waiting for new partition to {} to comeplete FileCopy", partitionName); + latch.await(); + partitionToFileCopyLatch.remove(partitionName); + if(!partitionToFileCopySuccessLatch.remove(partitionName)){ + throw new StateTransitionException("Partition " + partitionName + " failed to copy files.", FileCopyFailure); + } + logger.info("File Copy is complete on partition {}", partitionName); + } + } + @Override public void waitDeactivationCompleted(String partitionName) throws InterruptedException { CountDownLatch latch = partitionToDeactivationLatch.get(partitionName); @@ -192,6 +217,12 @@ public void onBootstrapComplete(ReplicaId replicaId) { countDownLatch(partitionToBootstrapLatch, replicaId.getPartitionId().toPathString()); } + @Override + public void onFileCopyComplete(ReplicaId replicaId){ + partitionToFileCopySuccessLatch.put(replicaId.getPartitionId().toPathString(), true); + countDownLatch(partitionToFileCopyLatch, replicaId.getPartitionId().toPathString()); + } + @Override public void onDeactivationComplete(ReplicaId replicaId) { partitionToDeactivationSuccess.put(replicaId.getPartitionId().toPathString(), true); diff --git a/ambry-clustermap/src/main/java/com/github/ambry/clustermap/HelixParticipant.java b/ambry-clustermap/src/main/java/com/github/ambry/clustermap/HelixParticipant.java index e94b5b9260..c43d702f67 100644 --- a/ambry-clustermap/src/main/java/com/github/ambry/clustermap/HelixParticipant.java +++ b/ambry-clustermap/src/main/java/com/github/ambry/clustermap/HelixParticipant.java @@ -870,6 +870,18 @@ public void onPartitionBecomeBootstrapFromOffline(String partitionName) { if (storageManagerListener != null) { storageManagerListener.onPartitionBecomeBootstrapFromOffline(partitionName); } + + /** + * Should be invoked after storage manager listener to ensure that the replica is added to the store. + * Conditional execution based on requirement for File Copy. + */ + PartitionStateChangeListener fileCopyManagerListener = + partitionStateChangeListeners.get(StateModelListenerType.FileCopyManagerListener); + if(fileCopyManagerListener != null){ + fileCopyManagerListener.onPartitionBecomeBootstrapFromOffline(partitionName); + replicaSyncUpManager.waitForFileCopyCompleted(partitionName); + } + // 2. take actions in replication manager (add new replica if necessary) PartitionStateChangeListener replicationManagerListener = partitionStateChangeListeners.get(StateModelListenerType.ReplicationManagerListener); diff --git a/ambry-file-transfer/build.gradle b/ambry-file-transfer/build.gradle new file mode 100644 index 0000000000..3f5b11c5d3 --- /dev/null +++ b/ambry-file-transfer/build.gradle @@ -0,0 +1,19 @@ +plugins { + id 'java' +} + +group = 'com.github.ambry' +version = '0.4.512' + +repositories { + mavenCentral() +} + +dependencies { + testImplementation platform('org.junit:junit-bom:5.9.1') + testImplementation 'org.junit.jupiter:junit-jupiter' +} + +test { + useJUnitPlatform() +} \ No newline at end of file diff --git a/ambry-file-transfer/src/main/java/com/github/ambry/FileCopyManager.java b/ambry-file-transfer/src/main/java/com/github/ambry/FileCopyManager.java new file mode 100644 index 0000000000..68afc08da0 --- /dev/null +++ b/ambry-file-transfer/src/main/java/com/github/ambry/FileCopyManager.java @@ -0,0 +1,79 @@ +package com.github.ambry; + +import com.codahale.metrics.MetricRegistry; +import com.github.ambry.clustermap.ClusterMap; +import com.github.ambry.clustermap.ClusterParticipant; +import com.github.ambry.clustermap.DataNodeId; +import com.github.ambry.clustermap.PartitionStateChangeListener; +import com.github.ambry.clustermap.StateModelListenerType; +import com.github.ambry.config.ClusterMapConfig; +import com.github.ambry.config.FileCopyConfig; +import com.github.ambry.config.StoreConfig; +import com.github.ambry.network.NetworkClientFactory; +import com.github.ambry.server.StoreManager; +import com.github.ambry.store.StoreKeyFactory; +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +public class FileCopyManager { + + protected final Logger logger = LoggerFactory.getLogger(getClass()); + + public FileCopyManager(PrioritisationManager prioritisationManager, FileCopyConfig fileCopyConfig, ClusterMapConfig clusterMapConfig, + StoreConfig storeConfig, StoreManager storeManager, StoreKeyFactory storeKeyFactory, ClusterMap clusterMap, + ScheduledExecutorService scheduler, DataNodeId dataNode, NetworkClientFactory networkClientFactory, + MetricRegistry metricRegistry, ClusterParticipant clusterParticipant) { + if (clusterParticipant != null) { + clusterParticipant.registerPartitionStateChangeListener(StateModelListenerType.FileCopyManagerListener, + new PartitionStateChangeListenerImpl()); + logger.info("File Copy Manager's state change listener registered!"); + } + if(!prioritisationManager.isRunning()) { + prioritisationManager.start(); + } + } + public void start() throws InterruptedException, IOException { + + } + class PartitionStateChangeListenerImpl implements PartitionStateChangeListener { + + @Override + public void onPartitionBecomeBootstrapFromOffline(String partitionName) { + // StateBuilding (storeManager.buildStateForFileCopy()) will be triggered at the end of FCM's async handler. + } + + @Override + public void onPartitionBecomeStandbyFromBootstrap(String partitionName) { + + } + + @Override + public void onPartitionBecomeLeaderFromStandby(String partitionName) { + + } + + @Override + public void onPartitionBecomeStandbyFromLeader(String partitionName) { + + } + + @Override + public void onPartitionBecomeInactiveFromStandby(String partitionName) { + + } + + @Override + public void onPartitionBecomeOfflineFromInactive(String partitionName) { + + } + + @Override + public void onPartitionBecomeDroppedFromOffline(String partitionName) { + + } + } +} + +} diff --git a/ambry-prioritisation/build.gradle b/ambry-prioritisation/build.gradle new file mode 100644 index 0000000000..48a562dddd --- /dev/null +++ b/ambry-prioritisation/build.gradle @@ -0,0 +1,19 @@ +plugins { + id 'java' +} + +group = 'com.github.ambry' +version = '0.4.514' + +repositories { + mavenCentral() +} + +dependencies { + testImplementation platform('org.junit:junit-bom:5.9.1') + testImplementation 'org.junit.jupiter:junit-jupiter' +} + +test { + useJUnitPlatform() +} \ No newline at end of file diff --git a/ambry-prioritisation/src/main/java/com/github/ambry/PrioritisationManager.java b/ambry-prioritisation/src/main/java/com/github/ambry/PrioritisationManager.java new file mode 100644 index 0000000000..c77e7acbc1 --- /dev/null +++ b/ambry-prioritisation/src/main/java/com/github/ambry/PrioritisationManager.java @@ -0,0 +1,62 @@ +package com.github.ambry; + +import com.github.ambry.clustermap.AmbryPartition; +import com.github.ambry.clustermap.DiskId; +import com.github.ambry.clustermap.ReplicaId; +import java.util.HashMap; +import java.util.Map; + + +public class PrioritisationManager { + private Map diskToReplicaQueue; + + private boolean running; + public PrioritisationManager() { + diskToReplicaQueue = new HashMap<>(); + running = false; + // Constructor + } + + public void start() { + running = true; + } + + public boolean isRunning(){ + return running; + // Start the PrioritisationManager + } + + public void shutdown() { + // Shutdown the PrioritisationManager + } + + public void addReplica(String partitionName) { + // Add a replica to the PrioritisationManager + } + + public void removeReplica(String partitionName) { + // Remove a task from the PrioritisationManager + } + + public void updatePartitionState(String partitionName) { + // Update the state of a task in the PrioritisationManager + } + + public void updatePartitionProgress(String partitionName) { + // Update the progress of a task in the PrioritisationManager + } + + public void updatePartitionResult() { + // Update the result of a task in the PrioritisationManager + } + + public String getPartitionForDisk(DiskId diskId){ + // Get a partition from the PrioritisationManager + return null; + } + + public String getReplica(String partitionName) { + // Get a replica from the PrioritisationManager + return null; + } +} diff --git a/ambry-protocol/src/main/java/com/github/ambry/protocol/FileChunkRequest.java b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileChunkRequest.java new file mode 100644 index 0000000000..a0483174a5 --- /dev/null +++ b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileChunkRequest.java @@ -0,0 +1,93 @@ +package com.github.ambry.protocol; + +import com.github.ambry.clustermap.ClusterMap; +import com.github.ambry.clustermap.PartitionId; +import com.github.ambry.utils.Utils; +import java.io.DataInputStream; +import java.io.IOException; +import java.nio.charset.Charset; + + +public class FileChunkRequest extends RequestOrResponse{ + + private String hostName; + private PartitionId partitionId; + private String fileName; + private long startOffset; + private long size; + + private static final short File_Chunk_Request_Version_V1 = 1; + + private static final int HostName_Field_Size_In_Bytes = 4; + private static final int FileName_Field_Size_In_Bytes = 4; + + public FileChunkRequest(short versionId, int correlationId, String clientId,PartitionId partitionId, String hostName, String fileName, long startOffset, long size) { + super(RequestOrResponseType.FileChunkRequest, versionId, correlationId, clientId); + this.hostName = hostName; + this.partitionId = partitionId; + this.fileName = fileName; + this.startOffset = startOffset; + this.size = size; + } + + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("FileChunkRequest[").append(", HostName=").append(hostName).append("PartitionId=").append(partitionId) + .append(", FileName=").append(fileName).append(", StartOffset=").append(startOffset).append(", Size=").append(size).append("]"); + return sb.toString(); + } + + public String getHostName() { + return hostName; + } + + public PartitionId getPartitionId() { + return partitionId; + } + + public String getFileName() { + return fileName; + } + + public long getStartOffset() { + return startOffset; + } + + public long getSize() { + return size; + } + + public long sizeInBytes() { + return super.sizeInBytes() + HostName_Field_Size_In_Bytes + hostName.length() + partitionId.getBytes().length + + FileName_Field_Size_In_Bytes + fileName.length() + Long.BYTES + Long.BYTES; + } + + protected void prepareBuffer() { + super.prepareBuffer(); + + Utils.serializeString(bufferToSend, hostName, Charset.defaultCharset()); + Utils.serializeString(bufferToSend, fileName, Charset.defaultCharset()); + bufferToSend.writeBytes(partitionId.getBytes()); + bufferToSend.writeLong(startOffset); + bufferToSend.writeLong(size); + } + + protected static FileChunkRequest readFrom(DataInputStream stream, ClusterMap clusterMap) throws IOException { + Short versionId = stream.readShort(); + validateVersion(versionId); + int correlationId = stream.readInt(); + String clientId = Utils.readIntString(stream); + String hostName = Utils.readIntString(stream); + PartitionId partitionId = clusterMap.getPartitionIdFromStream(stream); + String fileName = Utils.readIntString(stream); + long startOffset = stream.readLong(); + long size = stream.readLong(); + return new FileChunkRequest(versionId, correlationId, clientId, partitionId, hostName, fileName, startOffset, size); + } + + static void validateVersion(short version) { + if (version != File_Chunk_Request_Version_V1) { + throw new IllegalArgumentException("Unknown version for FileChunkRequest: " + version); + } + } +} diff --git a/ambry-protocol/src/main/java/com/github/ambry/protocol/FileChunkResponse.java b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileChunkResponse.java new file mode 100644 index 0000000000..ff9ec85ee7 --- /dev/null +++ b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileChunkResponse.java @@ -0,0 +1,5 @@ +package com.github.ambry.protocol; + +public class FileChunkResponse { + //to be Filled After confirmation +} diff --git a/ambry-protocol/src/main/java/com/github/ambry/protocol/FileInfo.java b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileInfo.java new file mode 100644 index 0000000000..6574b18d68 --- /dev/null +++ b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileInfo.java @@ -0,0 +1,51 @@ +package com.github.ambry.protocol; + +import com.github.ambry.utils.Utils; +import io.netty.buffer.ByteBuf; +import java.io.DataInputStream; +import java.io.IOException; +import java.nio.charset.Charset; + + +public class FileInfo{ + private String fileName; + private long fileSizeInBytes; + + private static final int FileName_Field_Size_In_Bytes = 4; + + private static final int FileSize_Field_Size_In_Bytes = 8; + + + public FileInfo(String fileName, long fileSize) { + this.fileName = fileName; + this.fileSizeInBytes = fileSize; + } + + public long sizeInBytes() { + return FileName_Field_Size_In_Bytes + fileName.length() + FileSize_Field_Size_In_Bytes; + } + public static FileInfo readFrom(DataInputStream stream) throws IOException { + String fileName = Utils.readIntString(stream); + long fileSize = stream.readLong(); + return new FileInfo(fileName, fileSize); + } + public void writeTo(ByteBuf buf) { + Utils.serializeString(buf, fileName, Charset.defaultCharset()); + buf.writeLong(fileSizeInBytes); + } + + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("FileInfo[").append("FileName=").append(fileName).append(", FileSizeInBytes=").append(fileSizeInBytes) + .append("]"); + return sb.toString(); + } + + public String getFileName() { + return fileName; + } + + public long getFileSizeInBytes() { + return fileSizeInBytes; + } +} \ No newline at end of file diff --git a/ambry-protocol/src/main/java/com/github/ambry/protocol/FileMetaDataRequest.java b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileMetaDataRequest.java new file mode 100644 index 0000000000..703ebbd1a4 --- /dev/null +++ b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileMetaDataRequest.java @@ -0,0 +1,68 @@ +package com.github.ambry.protocol; + +import com.github.ambry.clustermap.ClusterMap; +import com.github.ambry.clustermap.PartitionId; +import com.github.ambry.utils.Utils; +import java.io.DataInputStream; +import java.io.IOException; +import java.nio.charset.Charset; + + +public class FileMetaDataRequest extends RequestOrResponse{ + private PartitionId partitionId; + private String hostName; + + private static final short File_Metadata_Request_Version_V1 = 1; + private static final int HostName_Field_Size_In_Bytes = 4; + + public FileMetaDataRequest(short versionId, int correlationId, String clientId, + PartitionId partitionId, String hostName) { + super(RequestOrResponseType.FileMetaDataRequest, versionId, correlationId, clientId); + if (partitionId == null || hostName.isEmpty()) { + throw new IllegalArgumentException("Partition and Host Name cannot be null"); + } + this.partitionId = partitionId; + this.hostName = hostName; + } + + public String getHostName() { + return hostName; + } + + public PartitionId getPartitionId() { + return partitionId; + } + + protected static FileMetaDataRequest readFrom(DataInputStream stream, ClusterMap clusterMap) throws IOException { + Short versionId = stream.readShort(); + validateVersion(versionId); + int correlationId = stream.readInt(); + String clientId = Utils.readIntString(stream); + String hostName = Utils.readIntString(stream); + PartitionId partitionId = clusterMap.getPartitionIdFromStream(stream); + return new FileMetaDataRequest(versionId, correlationId, clientId, partitionId, hostName); + } + + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("FileMetaDataRequest[").append("PartitionId=").append(partitionId).append(", HostName=").append(hostName) + .append("]"); + return sb.toString(); + } + + public long sizeInBytes() { + return super.sizeInBytes() + HostName_Field_Size_In_Bytes + hostName.length() + partitionId.getBytes().length; + } + + protected void prepareBuffer() { + super.prepareBuffer(); + Utils.serializeString(bufferToSend, hostName, Charset.defaultCharset()); + bufferToSend.writeBytes(partitionId.getBytes()); + } + + static void validateVersion(short version) { + if (version != File_Metadata_Request_Version_V1) { + throw new IllegalArgumentException("Unknown version for FileMetadataRequest: " + version); + } + } +} \ No newline at end of file diff --git a/ambry-protocol/src/main/java/com/github/ambry/protocol/FileMetaDataResponse.java b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileMetaDataResponse.java new file mode 100644 index 0000000000..c9d8deb791 --- /dev/null +++ b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileMetaDataResponse.java @@ -0,0 +1,64 @@ +package com.github.ambry.protocol; +import com.github.ambry.server.ServerErrorCode; +import com.github.ambry.utils.Utils; +import java.io.DataInputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + + +public class FileMetaDataResponse extends Response { + private int numberOfLogfiles; + private List logInfoList; + + public FileMetaDataResponse(short versionId, int correlationId, String clientId, int numberOfLogfiles, + List logInfoList, ServerErrorCode errorCode) { + super(RequestOrResponseType.FileMetaDataResponse, versionId, correlationId, clientId, errorCode); + this.numberOfLogfiles = numberOfLogfiles; + this.logInfoList = logInfoList; + } + + public static FileMetaDataResponse readFrom(DataInputStream stream) throws IOException { + RequestOrResponseType type = RequestOrResponseType.values()[stream.readShort()]; + if (type != RequestOrResponseType.FileMetaDataResponse) { + throw new IllegalArgumentException("The type of request response is not compatible"); + } + short versionId = stream.readShort(); + int correlationId = stream.readInt(); + String clientId = Utils.readIntString(stream); + ServerErrorCode errorCode = ServerErrorCode.values()[stream.readShort()]; + int numberOfLogfiles = stream.readInt(); + int logInfoListSize = stream.readInt(); + List logInfoList = new ArrayList<>(); + for (int i = 0; i < logInfoListSize; i++) { + logInfoList.add(LogInfo.readFrom(stream)); + } + return new FileMetaDataResponse(versionId, correlationId, clientId, numberOfLogfiles, logInfoList, errorCode); + } + protected void prepareBuffer() { + super.prepareBuffer(); + bufferToSend.writeInt(numberOfLogfiles); + bufferToSend.writeInt(logInfoList.size()); + for (LogInfo logInfo : logInfoList) { + logInfo.writeTo(bufferToSend); + } + } + + public long sizeInBytes() { + return super.sizeInBytes() + Integer.BYTES + Integer.BYTES + logInfoList.stream().mapToLong(LogInfo::sizeInBytes).sum(); + } + + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("FileMetaDataResponse[NumberOfLogfiles=").append(numberOfLogfiles).append(", logInfoList").append(logInfoList.toString()).append("]"); + return sb.toString(); + } + + public int getNumberOfLogfiles() { + return numberOfLogfiles; + } + + public List getLogInfoList() { + return logInfoList; + } +} diff --git a/ambry-protocol/src/main/java/com/github/ambry/protocol/LogInfo.java b/ambry-protocol/src/main/java/com/github/ambry/protocol/LogInfo.java new file mode 100644 index 0000000000..54a53045a5 --- /dev/null +++ b/ambry-protocol/src/main/java/com/github/ambry/protocol/LogInfo.java @@ -0,0 +1,103 @@ +package com.github.ambry.protocol; + +import com.github.ambry.utils.Utils; +import io.netty.buffer.ByteBuf; +import java.io.DataInputStream; +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.List; + + +public class LogInfo { + private String fileName; + private long fileSizeInBytes; + List listOfIndexFiles; + List listOfBloomFilters; + + private static final int FileName_Field_Size_In_Bytes = 4; + private static final int FileSize_Field_Size_In_Bytes = 8; + + private static final int ListSize_In_Bytes = 4; + public LogInfo(String fileName, long fileSizeInBytes, List listOfIndexFiles, List listOfBloomFilters) { + this.fileName = fileName; + this.fileSizeInBytes = fileSizeInBytes; + this.listOfIndexFiles = listOfIndexFiles; + this.listOfBloomFilters = listOfBloomFilters; + } + + public String getFileName() { + return fileName; + } + + public long getFileSizeInBytes() { + return fileSizeInBytes; + } + + public List getListOfBloomFilters() { + return listOfBloomFilters; + } + + public List getListOfIndexFiles() { + return listOfIndexFiles; + } + + public long sizeInBytes() { + long size = FileName_Field_Size_In_Bytes + fileName.length() + FileSize_Field_Size_In_Bytes + ListSize_In_Bytes; + for (FileInfo fileInfo : listOfIndexFiles) { + size += fileInfo.sizeInBytes(); + } + size += ListSize_In_Bytes; + for (FileInfo fileInfo : listOfBloomFilters) { + size += fileInfo.sizeInBytes(); + } + return size; + } + + public static LogInfo readFrom(DataInputStream stream) throws IOException { + String fileName = Utils.readIntString(stream ); + long fileSize = stream.readLong(); + List listOfIndexFiles = new ArrayList<>(); + List listOfBloomFilters = new ArrayList<>(); + + int indexFilesCount = stream.readInt(); + for (int i = 0; i < indexFilesCount; i++) { + listOfIndexFiles.add(FileInfo.readFrom(stream)); + } + + int bloomFiltersCount = stream.readInt(); + for(int i= 0;i< bloomFiltersCount; i++){ + listOfBloomFilters.add(FileInfo.readFrom(stream)); + } + return new LogInfo(fileName, fileSize, listOfIndexFiles, listOfBloomFilters); + } + + public void writeTo(ByteBuf buf){ + Utils.serializeString(buf, fileName, Charset.defaultCharset()); + buf.writeLong(fileSizeInBytes); + buf.writeInt(listOfIndexFiles.size()); + for(FileInfo fileInfo : listOfIndexFiles){ + fileInfo.writeTo(buf); + } + buf.writeInt(listOfBloomFilters.size()); + for(FileInfo fileInfo: listOfBloomFilters){ + fileInfo.writeTo(buf); + } + } + + public String toString(){ + StringBuilder sb = new StringBuilder(); + sb.append("LogInfo["); + sb.append("FileName=").append(fileName).append(", FileSizeInBytes=").append(fileSizeInBytes).append(","); + for(FileInfo fileInfo : listOfIndexFiles) { + sb.append(fileInfo.toString()); + } + for(FileInfo fileInfo: listOfBloomFilters){ + sb.append(fileInfo.toString()); + } + sb.append("]"); + return sb.toString(); + } + + +} \ No newline at end of file diff --git a/ambry-protocol/src/main/java/com/github/ambry/protocol/RequestOrResponseType.java b/ambry-protocol/src/main/java/com/github/ambry/protocol/RequestOrResponseType.java index 115562bd9a..89078977ec 100644 --- a/ambry-protocol/src/main/java/com/github/ambry/protocol/RequestOrResponseType.java +++ b/ambry-protocol/src/main/java/com/github/ambry/protocol/RequestOrResponseType.java @@ -37,5 +37,11 @@ public enum RequestOrResponseType { PurgeRequest, PurgeResponse, BatchDeleteRequest, - BatchDeleteResponse + BatchDeleteResponse, + + FileMetaDataRequest, + FileMetaDataResponse, + FileChunkRequest, + FileChunkResponse, + } diff --git a/ambry-replication/src/test/java/com/github/ambry/replication/InMemoryStore.java b/ambry-replication/src/test/java/com/github/ambry/replication/InMemoryStore.java index bf939b9d15..5521788b3a 100644 --- a/ambry-replication/src/test/java/com/github/ambry/replication/InMemoryStore.java +++ b/ambry-replication/src/test/java/com/github/ambry/replication/InMemoryStore.java @@ -452,6 +452,11 @@ public boolean isBootstrapInProgress() { throw new UnsupportedOperationException("Method not supported"); } + @Override + public boolean isFileCopyInProgress() { + return false; + } + @Override public boolean isDecommissionInProgress() { throw new UnsupportedOperationException("Method not supported"); diff --git a/ambry-server/src/main/java/com/github/ambry/server/AmbryRequests.java b/ambry-server/src/main/java/com/github/ambry/server/AmbryRequests.java index a3d5553abc..0817296658 100644 --- a/ambry-server/src/main/java/com/github/ambry/server/AmbryRequests.java +++ b/ambry-server/src/main/java/com/github/ambry/server/AmbryRequests.java @@ -234,6 +234,16 @@ public void handleRequests(NetworkRequest networkRequest) throws InterruptedExce case ReplicateBlobRequest: handleReplicateBlobRequest(networkRequest); break; + case FileMetaDataRequest: + handleFileMetaDataRequest(networkRequest); + break; + + case FileChunkResponse: + handleFileMetaDataResponse(networkRequest); + break; + case FileChunkRequest: + handleFileChunkRequest(networkRequest); + break; default: throw new UnsupportedOperationException("Request type not supported"); } @@ -1672,6 +1682,15 @@ public void handleUndeleteRequest(NetworkRequest request) throws IOException, In metrics.undeleteBlobTotalTimeInMs, null, null, totalTimeSpent)); } + @Override + public void handleFileMetaDataRequest(NetworkRequest request) throws InterruptedException, IOException { + //geStore, Call Blob Store APIs here. + } + @Override + public void handleFileChunkRequest(NetworkRequest request) throws InterruptedException, IOException { + + } + /** * Get the formatted messages which needs to be written to Store. * @param receivedRequest received Put Request diff --git a/ambry-server/src/main/java/com/github/ambry/server/AmbryServer.java b/ambry-server/src/main/java/com/github/ambry/server/AmbryServer.java index 428bb55af2..d2b68e4d7e 100644 --- a/ambry-server/src/main/java/com/github/ambry/server/AmbryServer.java +++ b/ambry-server/src/main/java/com/github/ambry/server/AmbryServer.java @@ -15,6 +15,8 @@ import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.jmx.JmxReporter; +import com.github.ambry.FileCopyManager; +import com.github.ambry.PrioritisationManager; import com.github.ambry.account.AccountService; import com.github.ambry.account.AccountServiceCallback; import com.github.ambry.account.AccountServiceFactory; @@ -43,6 +45,7 @@ import com.github.ambry.config.ClusterMapConfig; import com.github.ambry.config.ConnectionPoolConfig; import com.github.ambry.config.DiskManagerConfig; +import com.github.ambry.config.FileCopyConfig; import com.github.ambry.config.Http2ClientConfig; import com.github.ambry.config.NettyConfig; import com.github.ambry.config.NetworkConfig; @@ -223,6 +226,8 @@ public void startup() throws InstantiationException { SSLConfig sslConfig = new SSLConfig(properties); ClusterMapConfig clusterMapConfig = new ClusterMapConfig(properties); StatsManagerConfig statsConfig = new StatsManagerConfig(properties); + FileCopyConfig fileCopyConfig = new FileCopyConfig(properties); + // verify the configs properties.verify(); @@ -288,6 +293,7 @@ public void startup() throws InstantiationException { new StorageManager(storeConfig, diskManagerConfig, scheduler, registry, storeKeyFactory, staticClusterManager, nodeId, new BlobStoreHardDelete(), clusterParticipants, time, new BlobStoreRecovery(), accountService); storageManager.start(); + /** * Backup integrity monitor here because vcr does not have code to store to disk. Only server does. * DataNodeId -> AmbryDataNode -> AmbryServerDataNode : for helix @@ -327,6 +333,13 @@ public void startup() throws InstantiationException { new BlobStoreHardDelete(), clusterParticipants, time, new BlobStoreRecovery(), accountService); storageManager.start(); + PrioritisationManager prioritisationManager= new PrioritisationManager(); + FileCopyManager + fileCopyManager = new FileCopyManager(prioritisationManager, fileCopyConfig, clusterMapConfig, storeConfig, storageManager, storeKeyFactory, + clusterMap, scheduler, nodeId, networkClientFactory, registry, clusterParticipant); + fileCopyManager.start(); + + // if there are more than one participant on local node, we create a consistency checker to monitor and alert any // mismatch in sealed/stopped replica lists that maintained by each participant. if (clusterParticipants != null && clusterParticipants.size() > 1 diff --git a/ambry-server/src/main/java/com/github/ambry/server/AmbryServerRequests.java b/ambry-server/src/main/java/com/github/ambry/server/AmbryServerRequests.java index b9b1331ec2..d185e52aa4 100644 --- a/ambry-server/src/main/java/com/github/ambry/server/AmbryServerRequests.java +++ b/ambry-server/src/main/java/com/github/ambry/server/AmbryServerRequests.java @@ -128,7 +128,9 @@ public class AmbryServerRequests extends AmbryRequests { for (RequestOrResponseType requestType : EnumSet.of(RequestOrResponseType.PutRequest, RequestOrResponseType.GetRequest, RequestOrResponseType.DeleteRequest, RequestOrResponseType.BatchDeleteRequest, RequestOrResponseType.UndeleteRequest, RequestOrResponseType.ReplicaMetadataRequest, - RequestOrResponseType.TtlUpdateRequest)) { + RequestOrResponseType.TtlUpdateRequest, RequestOrResponseType.FileMetaDataRequest, + RequestOrResponseType.FileMetaDataResponse, RequestOrResponseType.FileChunkRequest, + RequestOrResponseType.FileChunkResponse)) { requestsDisableInfo.put(requestType, Collections.newSetFromMap(new ConcurrentHashMap<>())); } StoreKeyJacksonConfig.setupObjectMapper(objectMapper, new BlobIdFactory(clusterMap)); diff --git a/ambry-server/src/test/java/com/github/ambry/server/StatsManagerTest.java b/ambry-server/src/test/java/com/github/ambry/server/StatsManagerTest.java index 8b5ffd4925..e1b7e8ef42 100644 --- a/ambry-server/src/test/java/com/github/ambry/server/StatsManagerTest.java +++ b/ambry-server/src/test/java/com/github/ambry/server/StatsManagerTest.java @@ -669,6 +669,11 @@ public boolean isBootstrapInProgress() { throw new IllegalStateException("Not implemented"); } + @Override + public boolean isFileCopyInProgress() { + return false; + } + @Override public boolean isDecommissionInProgress() { throw new IllegalStateException("Not implemented"); diff --git a/ambry-store/src/main/java/com/github/ambry/store/BlobStore.java b/ambry-store/src/main/java/com/github/ambry/store/BlobStore.java index 3a72a26d31..272bc3e6b7 100644 --- a/ambry-store/src/main/java/com/github/ambry/store/BlobStore.java +++ b/ambry-store/src/main/java/com/github/ambry/store/BlobStore.java @@ -28,6 +28,7 @@ import com.github.ambry.messageformat.MessageFormatWriteSet; import com.github.ambry.messageformat.TtlUpdateMessageFormatInputStream; import com.github.ambry.messageformat.UndeleteMessageFormatInputStream; +import com.github.ambry.protocol.FileInfo; import com.github.ambry.replication.FindToken; import com.github.ambry.utils.FileLock; import com.github.ambry.utils.SystemTime; @@ -40,6 +41,7 @@ import java.text.SimpleDateFormat; import java.time.Duration; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Date; @@ -56,6 +58,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1318,6 +1321,24 @@ public void shutdown() throws StoreException { shutdown(false); } + public List getSealedLogsAndMetaDataFiles(){ + List logSegments = log.getAllLogSegmentNames().stream().filter(segment -> log.getActiveSegment().getName() != segment) + .map(segment -> log.getSegment(segment)) + .map(segment -> new FileInfo(segment.getName().toString(), segment.getView().getFirst().length())).collect(Collectors.toList()); + return logSegments; + } + + public List getAllIndexSegmentsForALogSegment(String dataDir, LogSegmentName logSegmentName){ + return Arrays.stream(PersistentIndex.getIndexSegmentFilesForLogSegment(dataDir, logSegmentName)) + .map(file -> new FileInfo(file.getName(), file.length())).collect( + Collectors.toList()); + } + + public List getAllBloomFiltersForALogSegment(String dataDir, LogSegmentName logSegmentName){ + return Arrays.stream(PersistentIndex.getIndexAndBloomFilterFiles(dataDir, logSegmentName)) + .map(file -> new FileInfo(file.getName(), file.length())).collect(Collectors.toList()); + } + /** * Update the sealed status of the replica. */ diff --git a/ambry-store/src/main/java/com/github/ambry/store/FileStore.java b/ambry-store/src/main/java/com/github/ambry/store/FileStore.java new file mode 100644 index 0000000000..845d6b96ea --- /dev/null +++ b/ambry-store/src/main/java/com/github/ambry/store/FileStore.java @@ -0,0 +1,41 @@ +package com.github.ambry.store; + +import com.github.ambry.clustermap.FileStoreException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.util.concurrent.ConcurrentHashMap; + + +public class FileStore { + private static boolean isRunning = false; + + public FileStore() { + } + + public ConcurrentHashMap fileNameToFileChannelMap; + + public void start() throws StoreException { + isRunning = true; + } + public boolean isRunning() { + return isRunning; + } + public void stop() { + isRunning = false; + } + + public void putChunkToFile(String mountPath, String fileName, ByteBuffer byteBuffer, long offset, long size){ + if(!isRunning){ + throw new FileStoreException("FileStore is not running", File); + } + if(byteBuffer == null){ + throw new IllegalArgumentException("ByteBuffer is null"); + } + FileChannel currentFileBuffer = fileNameToFileChannelMap.get(fileName); + if(currentFileBuffer == null){ + throw new IllegalArgumentException("File not found"); + } + + //long currentOffset = + } +} diff --git a/ambry-store/src/main/java/com/github/ambry/store/Log.java b/ambry-store/src/main/java/com/github/ambry/store/Log.java index 4d76814d1d..c5a8fe9c81 100644 --- a/ambry-store/src/main/java/com/github/ambry/store/Log.java +++ b/ambry-store/src/main/java/com/github/ambry/store/Log.java @@ -298,6 +298,10 @@ LogSegment getSegment(LogSegmentName name) { return segmentsByName.get(name); } + + LogSegment getActiveSegment() { + return activeSegment; + } /** * @return the end offset of the log abstraction. */ diff --git a/ambry-store/src/main/java/com/github/ambry/store/PersistentIndex.java b/ambry-store/src/main/java/com/github/ambry/store/PersistentIndex.java index 0290013cf1..445420f71f 100644 --- a/ambry-store/src/main/java/com/github/ambry/store/PersistentIndex.java +++ b/ambry-store/src/main/java/com/github/ambry/store/PersistentIndex.java @@ -2677,6 +2677,18 @@ public boolean accept(File dir, String name) { */ static void cleanupIndexSegmentFilesForLogSegment(String dataDir, final LogSegmentName logSegmentName) throws StoreException { + File[] filesToCleanup = getIndexAndBloomFilterFiles(dataDir, logSegmentName); + if (filesToCleanup == null) { + throw new StoreException("Failed to list index segment files", StoreErrorCodes.IOError); + } + for (File file : filesToCleanup) { + if (!file.delete()) { + throw new StoreException("Could not delete file named " + file, StoreErrorCodes.Unknown_Error); + } + } + } + + static File[] getIndexAndBloomFilterFiles(String dataDir, LogSegmentName logSegmentName){ File[] filesToCleanup = new File(dataDir).listFiles(new FilenameFilter() { @Override public boolean accept(File dir, String name) { @@ -2684,20 +2696,12 @@ public boolean accept(File dir, String name) { return (name.endsWith(IndexSegment.INDEX_SEGMENT_FILE_NAME_SUFFIX) || name.endsWith( IndexSegment.BLOOM_FILE_NAME_SUFFIX)); } else { - return name.startsWith(logSegmentName.toString() + BlobStore.SEPARATOR) && ( - name.endsWith(IndexSegment.INDEX_SEGMENT_FILE_NAME_SUFFIX) || name.endsWith( - IndexSegment.BLOOM_FILE_NAME_SUFFIX)); + return name.startsWith(logSegmentName.toString() + BlobStore.SEPARATOR) && (name.endsWith(IndexSegment.INDEX_SEGMENT_FILE_NAME_SUFFIX) || name.endsWith( + IndexSegment.BLOOM_FILE_NAME_SUFFIX)); } } }); - if (filesToCleanup == null) { - throw new StoreException("Failed to list index segment files", StoreErrorCodes.IOError); - } - for (File file : filesToCleanup) { - if (!file.delete()) { - throw new StoreException("Could not delete file named " + file, StoreErrorCodes.Unknown_Error); - } - } + return filesToCleanup; } class IndexPersistor implements Runnable { diff --git a/ambry-store/src/main/java/com/github/ambry/store/StorageManager.java b/ambry-store/src/main/java/com/github/ambry/store/StorageManager.java index d3b279e3a1..b2ef4fe2c4 100644 --- a/ambry-store/src/main/java/com/github/ambry/store/StorageManager.java +++ b/ambry-store/src/main/java/com/github/ambry/store/StorageManager.java @@ -530,6 +530,10 @@ public boolean addBlobStore(ReplicaId replica) { logger.info("New store is successfully added into StorageManager"); return true; } + @Override + public void buildStateForFileCopy(String partitionName){ + // no-op + } /** * If a bootstrap replica fails, try to remove all the files and directories associated with it. diff --git a/settings.gradle b/settings.gradle index d19582e993..3887cdca08 100644 --- a/settings.gradle +++ b/settings.gradle @@ -31,4 +31,6 @@ include 'ambry-api', 'ambry-mysql', 'ambry-filesystem', 'ambry-vcr' +include 'ambry-file-transfer' +include 'ambry-prioritisation'