Support dynamically removing store from storage manager #1232

Merged (7 commits, Aug 20, 2019)

Changes from 5 commits
35 changes: 32 additions & 3 deletions ambry-store/src/main/java/com.github.ambry.store/BlobStore.java
@@ -682,6 +682,34 @@ CompactionDetails getCompactionDetails(CompactionPolicy compactionPolicy) throws
LogSegment.HEADER_SIZE, index.getLogSegmentsNotInJournal(), blobStoreStats);
}

/**
* Delete all files of this store.
* This is the last step in removing the store from this node: return swap segments (if any) to the reserve pool
* and delete all files/dirs associated with this store. This method is invoked by the (OFFLINE -> DROPPED)
* transition in AmbryStateModel.
*/
public void deleteStoreFiles() throws StoreException, IOException {
// step0: ensure the store has been shut down
if (started) {
throw new IllegalStateException("Store is still started. Deleting store files is not allowed.");
}
// step1: return occupied swap segments (if any) to reserve pool
String[] swapSegmentsInUse = compactor.getSwapSegmentsInUse();
for (String fileName : swapSegmentsInUse) {
logger.info("Returning swap segment {} to reserve pool", fileName);
File swapSegmentTempFile = new File(dataDir, fileName);
diskSpaceAllocator.free(swapSegmentTempFile, config.storeSegmentSizeInBytes, storeId, true);
}
// step2: delete all files
logger.info("Deleting store {} directory", storeId);
File storeDir = new File(dataDir);
try {
Utils.deleteFileOrDirectory(storeDir);
Contributor: This method already throws IOException, so you probably don't have to catch the exception, unless you want dataDir to be in the message. Also, I would recommend setting the cause exception to e if you keep the catch.

Contributor Author: Thanks for reminding me of this. I prefer to keep this catch in case there is any non-IOException; I will also take your advice and set e as the cause exception.
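For reference, the suggested form would look roughly like this (a sketch; whether the final commit keeps the catch at all is up to the author):

try {
Utils.deleteFileOrDirectory(storeDir);
} catch (Exception e) {
// Preserve the original failure as the cause instead of swallowing it.
throw new IOException("Couldn't delete store directory " + dataDir, e);
}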

} catch (Exception e) {
throw new IOException("Couldn't delete store directory " + dataDir);
}
logger.info("All files of store {} deleted", storeId);
}
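For context, a minimal sketch of the call sequence the javadoc above describes, assuming the state-model handler still holds a reference to the store (that handler is not part of this diff):

// OFFLINE -> DROPPED transition (hypothetical caller):
store.shutdown();         // deleteStoreFiles() refuses to run on a started store
store.deleteStoreFiles(); // returns swap segments to the pool, then deletes the dir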

@Override
public void shutdown() throws StoreException {
shutdown(false);
@@ -751,9 +779,10 @@ AtomicInteger getErrorCount() {
*/
DiskSpaceRequirements getDiskSpaceRequirements() throws StoreException {
checkStarted();
DiskSpaceRequirements requirements = log.isLogSegmented() ? new DiskSpaceRequirements(
replicaId.getPartitionId().toPathString(), log.getSegmentCapacity(),
log.getRemainingUnallocatedSegments(), compactor.getSwapSegmentsInUse()) : null;
DiskSpaceRequirements requirements =
log.isLogSegmented() ? new DiskSpaceRequirements(replicaId.getPartitionId().toPathString(),
log.getSegmentCapacity(), log.getRemainingUnallocatedSegments(), compactor.getSwapSegmentsInUse().length)
: null;
logger.info("Store {} has disk space requirements: {}", storeId, requirements);
return requirements;
}
@@ -253,16 +253,15 @@ A single compaction job could be performed across multiple compaction cycles (if
}

/**
* @return the number of temporary log segment files this compactor is currently using.
* @return an array of temporary log segment files this compactor is currently using.
*/
int getSwapSegmentsInUse() throws StoreException {
// TODO: use this method to return swap segment to pool when removing store
String[] getSwapSegmentsInUse() throws StoreException {
String[] tempSegments = dataDir.list(TEMP_LOG_SEGMENTS_FILTER);
if (tempSegments == null) {
throw new StoreException("Error occurred while listing files in data dir:" + dataDir.getAbsolutePath(),
StoreErrorCodes.IOError);
}
return tempSegments.length;
return tempSegments;
}

@@ -154,6 +154,28 @@ void controlCompactionForBlobStore(BlobStore store, boolean enable) {
}
}

/**
* Remove store from compaction manager.
* @param store the {@link BlobStore} to remove
* @return {@code true} if store is removed successfully. {@code false} if not.
*/
boolean removeBlobStore(BlobStore store) {
boolean result;
if (compactionExecutor == null) {
stores.remove(store);
Contributor Author: Once #1226 is merged, stores will become a concurrent set, which should be OK here.
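For reference, a concurrent set in Java is typically built from ConcurrentHashMap; a sketch of what #1226 presumably introduces (assumed, not part of this diff):

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// A thread-safe Set view backed by ConcurrentHashMap; remove() no longer
// races with the compaction executor's iteration over the set.
Set<BlobStore> stores = ConcurrentHashMap.newKeySet();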

result = true;
} else if (!compactionExecutor.getStoresDisabledCompaction().contains(store)) {
logger.error("Fail to remove store ({}) from compaction manager because compaction of it is still enabled",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like it should be an IllegalStateException. Is there a valid scenario where compaction is still enabled when removeStore is called? If we return false and compaction is turned off later, will the store ever be removed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with your point, however, I am trying to use same way in scheduleNextForCompaction method (line 360). Returning false may happen when disabling compaction on the store hasn't completed or hasn't been executed. The Helix state model will retry entire workflow (disable compaction, remove store) to guarantee the store is correctly removed.

store);
result = false;
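A hypothetical sketch of that Helix-driven retry (the state-model code is not part of this diff; names are assumed):

// OFFLINE -> DROPPED transition handler (hypothetical):
storageManager.controlCompactionForBlobStore(partitionId, false); // may not take effect immediately
if (!storageManager.removeBlobStore(partitionId)) {
// Helix treats the exception as a failed transition and retries the whole
// disable-compaction + remove-store sequence until it succeeds.
throw new IllegalStateException("Removing store " + partitionId + " failed; transition will be retried");
}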
} else {
Contributor: I think you can remove this nested else and use else if (!compact...

// stores.remove(store) is invoked within compactionExecutor.removeBlobStore() because it requires lock
compactionExecutor.removeBlobStore(store);
result = true;
}
return result;
}

/**
* Get compaction details for a given {@link BlobStore} if any
* @param blobStore the {@link BlobStore} for which compaction details are requested
@@ -365,6 +387,31 @@ void controlCompactionForBlobStore(BlobStore store, boolean enable) {
storesDisabledCompaction.add(store);
}
}

/**
* Remove store from compaction executor.
* @param store the {@link BlobStore} to remove
*/
void removeBlobStore(BlobStore store) {
lock.lock();
try {
stores.remove(store);
// It's ok to remove store from "storesDisabledCompaction" and "storesToSkip" list while executor thread is
// going through each store to check compaction eligibility. Note that the executor will first check if store
// is started, which is guaranteed to be false before removeBlobStore() is invoked.
storesDisabledCompaction.remove(store);
storesToSkip.remove(store);
} finally {
lock.unlock();
}
}

/**
* @return the set of stores on which compaction is disabled.
*/
Set<BlobStore> getStoresDisabledCompaction() {
return storesDisabledCompaction;
}
}
}

@@ -141,7 +141,7 @@ void start() throws InterruptedException {
partitionAndStore.getValue().start();
} catch (Exception e) {
numStoreFailures.incrementAndGet();
logger.error("Exception while starting store for the partition" + partitionAndStore.getKey(), e);
logger.error("Exception while starting store for the " + partitionAndStore.getKey(), e);
}
}, false);
thread.start();
@@ -359,6 +359,32 @@ boolean shutdownBlobStore(PartitionId id) {
}
}

/**
* Given a partition id, remove the corresponding blob store from the disk manager.
* @param id the {@link PartitionId} of the {@link BlobStore} which should be removed.
* @return {@code true} if store removal was successful. {@code false} if not.
*/
boolean removeBlobStore(PartitionId id) {
BlobStore store = stores.get(id);
if (store == null) {
logger.info("Store {} is not found in disk manager", id);
return true;
}
if (!running || store.isStarted()) {
logger.error("Removing store {} failed. Disk running = {}, store running = {}", id, running, store.isStarted());
return false;
}
if (!compactionManager.removeBlobStore(store)) {
logger.error("Fail to remove store {} from compaction manager.", id);
return false;
}
stores.remove(id);
stoppedReplicas.remove(id.toPathString());
partitionToReplicaMap.remove(id);
logger.info("Store {} is successfully removed from disk manager", id);
return true;
}

/**
* Set the BlobStore stopped state with given {@link PartitionId} {@code id}.
* @param partitionIds a list of {@link PartitionId} of the {@link BlobStore} whose stopped state should be set.
@@ -441,6 +467,8 @@ boolean areAllStoresDown() {

/**
* Reports any unrecognized directories on disk
* TODO: go to each unrecognized dir and check if there is a remove-store event log. If yes, this method should
* clean up that store dir and return its swap segments to the reserve pool if needed.
*/
private void reportUnrecognizedDirs() {
File[] dirs = new File(disk.getMountPath()).listFiles(File::isDirectory);
@@ -209,7 +209,6 @@ public boolean controlCompactionForBlobStore(PartitionId id, boolean enabled) {

/**
* Shutdown the {@link DiskManager}s for the disks on this node.
* @throws StoreException
* @throws InterruptedException
*/
public void shutdown() throws InterruptedException {
Expand Down Expand Up @@ -290,6 +289,26 @@ public boolean shutdownBlobStore(PartitionId id) {
return diskManager != null && diskManager.shutdownBlobStore(id);
}

/**
* Remove store from storage manager.
* @param id the {@link PartitionId} associated with store
* @return {@code true} if removal succeeds. {@code false} otherwise.
*/
public boolean removeBlobStore(PartitionId id) {
Contributor: Any reason for this method (and the ones in DiskManager and CompactionManager) to return success/failure booleans instead of throwing exceptions? With exceptions, you can log just once at the top level.

Contributor Author: I am slightly leaning towards the current way to keep it consistent with scheduleNextForCompaction, controlCompactionForBlobStore, etc. Also, returning a boolean can be explicitly verified in unit tests.
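A sketch of the kind of direct verification the author has in mind (test wiring assumed):

// The boolean result is directly assertable in a unit test:
assertTrue("Store should be removed from storage manager", storageManager.removeBlobStore(id));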

DiskManager diskManager = partitionToDiskManager.get(id);
if (diskManager == null) {
logger.info("Store {} is not found in storage manager", id);
return true;
}
if (!diskManager.removeBlobStore(id)) {
logger.error("Fail to remove store {} from disk manager", id);
return false;
}
partitionToDiskManager.remove(id);
logger.info("Store {} is successfully removed from storage manager", id);
return true;
}

/**
* Set BlobStore Stopped state with given {@link PartitionId} {@code id}.
* @param partitionIds a list of {@link PartitionId} of the {@link Store} whose stopped state should be set.
@@ -112,6 +112,7 @@ void initializeCompactionThreadsTracker(final StorageManager storageManager, fin
* Deregister the Metrics related to the compaction thread.
*/
void deregisterCompactionThreadsTracker() {
registry.remove(MetricRegistry.name(CompactionManager.class, "CompactionsInProgress"));
registry.remove(MetricRegistry.name(StorageManager.class, "CompactionThreadsAlive"));
registry.remove(MetricRegistry.name(StorageManager.class, "CompactionHealth"));
}
18 changes: 10 additions & 8 deletions ambry-store/src/main/java/com.github.ambry.store/StoreMetrics.java
@@ -27,7 +27,7 @@
* Metrics for store operations
*/
public class StoreMetrics {
private static final String SEPERATOR = ".";
private static final String SEPARATOR = ".";

public final Timer getResponse;
public final Timer putResponse;
@@ -102,7 +102,7 @@ public StoreMetrics(MetricRegistry registry) {

public StoreMetrics(String prefix, MetricRegistry registry) {
this.registry = registry;
String name = !prefix.isEmpty() ? prefix + SEPERATOR : "";
String name = !prefix.isEmpty() ? prefix + SEPARATOR : "";
getResponse = registry.timer(MetricRegistry.name(BlobStore.class, name + "StoreGetResponse"));
putResponse = registry.timer(MetricRegistry.name(BlobStore.class, name + "StorePutResponse"));
deleteResponse = registry.timer(MetricRegistry.name(BlobStore.class, name + "StoreDeleteResponse"));
@@ -201,7 +201,7 @@ public StoreMetrics(String prefix, MetricRegistry registry) {
}

void initializeIndexGauges(String storeId, final PersistentIndex index, final long capacityInBytes) {
String prefix = storeId + SEPERATOR;
String prefix = storeId + SEPARATOR;
Gauge<Long> currentCapacityUsed = index::getLogUsedCapacity;
registry.register(MetricRegistry.name(Log.class, prefix + "CurrentCapacityUsed"), currentCapacityUsed);
Gauge<Double> percentageUsedCapacity = () -> ((double) index.getLogUsedCapacity() / capacityInBytes) * 100;
@@ -215,14 +215,16 @@ void initializeIndexGauges(String storeId, final PersistentIndex index, final lo
* @param storeId the {@link BlobStore} for which the IndexGauges should be deregistered.
*/
private void deregisterIndexGauges(String storeId) {
String prefix = storeId + SEPERATOR;
String prefix = storeId + SEPARATOR;
registry.remove(MetricRegistry.name(Log.class, prefix + "CurrentCapacityUsed"));
registry.remove(MetricRegistry.name(Log.class, prefix + "PercentageUsedCapacity"));
registry.remove(MetricRegistry.name(Log.class, prefix + "CurrentSegmentCount"));
registry.remove(MetricRegistry.name(Log.class, "ByteBufferForAppendTotalCount"));
registry.remove(MetricRegistry.name(Log.class, "UnderCompaction" + SEPARATOR + "ByteBufferForAppendTotalCount"));
}

void initializeHardDeleteMetric(String storeId, final HardDeleter hardDeleter, final PersistentIndex index) {
String prefix = storeId + SEPERATOR;
String prefix = storeId + SEPARATOR;
Gauge<Long> currentHardDeleteProgress = hardDeleter::getProgress;
registry.register(MetricRegistry.name(PersistentIndex.class, prefix + "CurrentHardDeleteProgress"),
currentHardDeleteProgress);
@@ -245,15 +247,15 @@ void initializeHardDeleteMetric(String storeId, final HardDeleter hardDeleter, f
* @param storeId the {@link BlobStore} for which the HardDeleteMetric should be deregistered.
*/
private void deregisterHardDeleteMetric(String storeId) {
String prefix = storeId + SEPERATOR;
String prefix = storeId + SEPARATOR;
registry.remove(MetricRegistry.name(PersistentIndex.class, prefix + "CurrentHardDeleteProgress"));
registry.remove(MetricRegistry.name(Log.class, prefix + "PercentageHardDeleteCompleted"));
registry.remove(MetricRegistry.name(PersistentIndex.class, prefix + "HardDeleteThreadRunning"));
registry.remove(MetricRegistry.name(PersistentIndex.class, prefix + "HardDeleteCaughtUp"));
}

void initializeCompactorGauges(String storeId, final AtomicBoolean compactionInProgress) {
String prefix = storeId + SEPERATOR;
String prefix = storeId + SEPARATOR;
Gauge<Long> compactionInProgressGauge = () -> compactionInProgress.get() ? 1L : 0L;
registry.register(MetricRegistry.name(BlobStoreCompactor.class, prefix + "CompactionInProgress"),
compactionInProgressGauge);
@@ -264,7 +266,7 @@ void initializeCompactorGauges(String storeId, final AtomicBoolean compactionInP
* @param storeId the {@link BlobStore} for which the CompactorGauges should be deregistered.
*/
private void deregisterCompactorGauges(String storeId) {
String prefix = storeId + SEPERATOR;
String prefix = storeId + SEPARATOR;
registry.remove(MetricRegistry.name(BlobStoreCompactor.class, prefix + "CompactionInProgress"));
}

@@ -229,7 +229,7 @@ public void resumeCompactionWithoutAnyInProgressTest() throws Exception {
compactor = getCompactor(state.log, DISK_IO_SCHEDULER);
compactor.initialize(state.index);
assertFalse("Compaction should not be in progress", CompactionLog.isCompactionInProgress(tempDirStr, STORE_ID));
assertEquals("Temp log segment should not be found", 0, compactor.getSwapSegmentsInUse());
assertEquals("Temp log segment should not be found", 0, compactor.getSwapSegmentsInUse().length);
try {
compactor.resumeCompaction(bundleReadBuffer);
fail("Should have failed because there is no compaction in progress");
@@ -1213,7 +1213,7 @@ private void compactAndVerify(List<String> segmentsUnderCompaction, long deleteR
}

assertFalse("No compaction should be in progress", CompactionLog.isCompactionInProgress(tempDirStr, STORE_ID));
assertEquals("Swap segments should not be found", 0, compactor.getSwapSegmentsInUse());
assertEquals("Swap segments should not be found", 0, compactor.getSwapSegmentsInUse().length);
long logSegmentSizeAfterCompaction = getSumOfLogSegmentEndOffsets();
long logSegmentCountAfterCompaction = state.index.getLogSegmentCount();
long indexSegmentCountAfterCompaction = state.index.getIndexSegments().size();
@@ -1290,7 +1290,7 @@ private void compactWithRecoveryAndVerify(Log log, DiskIOScheduler diskIOSchedul
state.initIndex(null);
compactor.initialize(state.index);
assertEquals("Wrong number of swap segments in use",
tempDir.list(BlobStoreCompactor.TEMP_LOG_SEGMENTS_FILTER).length, compactor.getSwapSegmentsInUse());
tempDir.list(BlobStoreCompactor.TEMP_LOG_SEGMENTS_FILTER).length, compactor.getSwapSegmentsInUse().length);
try {
if (CompactionLog.isCompactionInProgress(tempDirStr, STORE_ID)) {
compactor.resumeCompaction(bundleReadBuffer);
@@ -262,7 +262,7 @@ public static List<Object[]> data() {
*/
public BlobStoreTest(boolean isLogSegmented) throws InterruptedException, IOException, StoreException {
this.isLogSegmented = isLogSegmented;
tempDir = StoreTestUtils.createTempDirectory("storeDir-" + UtilsTest.getRandomString(10));
tempDir = StoreTestUtils.createTempDirectory("storeDir-" + storeId);
tempDirStr = tempDir.getAbsolutePath();
StoreConfig config = new StoreConfig(new VerifiableProperties(properties));
long bufferTimeMs = TimeUnit.SECONDS.toMillis(config.storeTtlUpdateBufferTimeSeconds);
@@ -1180,6 +1180,66 @@ public void storeIoErrorCountTest() throws StoreException, IOException {
reloadStore();
}

/**
* Test both success and failure cases when deleting store files.
* @throws Exception
*/
@Test
public void deleteStoreFilesTest() throws Exception {
store.shutdown();
// create test store directory
File storeDir = StoreTestUtils.createTempDirectory("store-" + storeId);
File reserveDir = StoreTestUtils.createTempDirectory("reserve-pool");
DiskSpaceAllocator diskAllocator =
new DiskSpaceAllocator(true, reserveDir, 0, new StorageManagerMetrics(new MetricRegistry()));
StoreConfig config = new StoreConfig(new VerifiableProperties(properties));
MetricRegistry registry = new MetricRegistry();
StoreMetrics metrics = new StoreMetrics(registry);
BlobStore testStore =
new BlobStore(getMockReplicaId(storeDir.getAbsolutePath()), config, scheduler, storeStatsScheduler,
diskIOScheduler, diskAllocator, metrics, metrics, STORE_KEY_FACTORY, recovery, hardDelete, null, time);
testStore.start();
DiskSpaceRequirements diskSpaceRequirements = testStore.getDiskSpaceRequirements();
diskAllocator.initializePool(
diskSpaceRequirements == null ? Collections.emptyList() : Collections.singletonList(diskSpaceRequirements));
// ensure store directory and file exist
assertTrue("Store directory doesn't exist", storeDir.exists());
// test that deletion on a started store should fail
try {
testStore.deleteStoreFiles();
fail("should fail because the store is still started");
} catch (IllegalStateException e) {
// expected
}
// create an unreadable dir in store dir to induce deletion failure
File invalidDir = new File(storeDir, "invalidDir");
assertTrue("Couldn't create dir within store dir", invalidDir.mkdir());
assertTrue("Could not make unreadable", invalidDir.setReadable(false));
testStore.shutdown();
try {
testStore.deleteStoreFiles();
fail("should fail because one invalid dir is unreadable");
} catch (Exception e) {
// expected
}
assertTrue("store directory should exist because deletion failed", storeDir.exists());
// reset permission to allow deletion to succeed.
assertTrue("Could not make readable", invalidDir.setReadable(true));

// put a swap segment into store dir
File tempFile = File.createTempFile("sample-swap",
LogSegmentNameHelper.SUFFIX + BlobStoreCompactor.TEMP_LOG_SEGMENT_NAME_SUFFIX, storeDir);
// test success case (swap segment is returned and store dir is correctly deleted)
assertEquals("Swap reserve dir should be empty initially", 0,
diskAllocator.getSwapReserveFileMap().getFileSizeSet().size());
testStore.deleteStoreFiles();
assertFalse("swap segment still exists", tempFile.exists());
assertEquals("Swap reserve dir should have one swap segment", 1,
diskAllocator.getSwapReserveFileMap().getFileSizeSet().size());
assertFalse("store directory shouldn't exist", storeDir.exists());

reloadStore();
}

// helpers
// general
