Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring DiskSpaceAllocator to support dynamically adding/removing stores #1210

Merged
merged 3 commits into from
Jul 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -751,9 +751,10 @@ AtomicInteger getErrorCount() {
*/
DiskSpaceRequirements getDiskSpaceRequirements() throws StoreException {
checkStarted();
DiskSpaceRequirements requirements = log.isLogSegmented() ? new DiskSpaceRequirements(log.getSegmentCapacity(),
DiskSpaceRequirements requirements = log.isLogSegmented() ? new DiskSpaceRequirements(
replicaId.getPartitionId().toPathString(), log.getSegmentCapacity(),
log.getRemainingUnallocatedSegments(), compactor.getSwapSegmentsInUse()) : null;
logger.debug("Store {} has disk space requirements: {}", storeId, requirements);
logger.info("Store {} has disk space requirements: {}", storeId, requirements);
return requirements;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ A single compaction job could be performed across multiple compaction cycles (if
* @return the number of temporary log segment files this compactor is currently using.
*/
int getSwapSegmentsInUse() throws StoreException {
// TODO: use this method to return swap segment to pool when removing store
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

address this TODO?

Copy link
Contributor Author

@jsjtzyy jsjtzyy Jul 19, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This TODO is just a reminder that a future PR (dynamically removing stores) could invoke this method to find the swap segments in use and return them to the pool. For the current PR, no extra changes are needed. I will remove this comment in the "remove store" PR :)

String[] tempSegments = dataDir.list(TEMP_LOG_SEGMENTS_FILTER);
if (tempSegments == null) {
throw new StoreException("Error occurred while listing files in data dir:" + dataDir.getAbsolutePath(),
Expand Down Expand Up @@ -339,11 +340,10 @@ private void checkSanity(CompactionDetails details) {
* <p/>
* Splits the compaction into two cycles if there aren't enough swap spaces and completes the copy for the current
* cycle.
* @throws InterruptedException if the compaction was interrupted
* @throws IOException if there were I/O errors during copying.
* @throws StoreException if there were exceptions reading to writing to store components.
*/
private void copy() throws InterruptedException, IOException, StoreException {
private void copy() throws IOException, StoreException {
setupState();
List<String> logSegmentsUnderCompaction = compactionLog.getCompactionDetails().getLogSegmentsUnderCompaction();
FileSpan duplicateSearchSpan = null;
Expand Down Expand Up @@ -405,10 +405,9 @@ private void copy() throws InterruptedException, IOException, StoreException {
* 2. Adding them to the log segments maintained by the application log.
* 3. Atomically switching the old set of index segments for the new ones (if not recovering).
* @param recovering {@code true} if this function was called in the context of recovery. {@code false} otherwise.
* @throws IOException if there were I/O errors during committing.
* @throws StoreException if there were exceptions reading to writing to store components.
*/
private void commit(boolean recovering) throws IOException, StoreException {
private void commit(boolean recovering) throws StoreException {
List<String> logSegmentNames = getTargetLogSegmentNames();
logger.debug("Target log segments are {} for {}", logSegmentNames, storeId);
renameLogSegments(logSegmentNames);
Expand Down
315 changes: 253 additions & 62 deletions ambry-store/src/main/java/com.github.ambry.store/DiskSpaceAllocator.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ class DiskSpaceRequirements {
private final long segmentSizeInBytes;
private final long segmentsNeeded;
private final long swapSegmentsInUse;
private final String storeId;

/**
* @param storeId the store which the {@link DiskSpaceRequirements} is associated with.
* @param segmentSizeInBytes the size of each segment needed, in bytes.
* @param segmentsNeeded the number of additional segments needed in the disk space pool.
* @param swapSegmentsInUse the number of swap segments currently in use by this entity.
*/
DiskSpaceRequirements(long segmentSizeInBytes, long segmentsNeeded, long swapSegmentsInUse) {
DiskSpaceRequirements(String storeId, long segmentSizeInBytes, long segmentsNeeded, long swapSegmentsInUse) {
if (segmentSizeInBytes <= 0 || segmentsNeeded < 0 || swapSegmentsInUse < 0) {
throw new IllegalArgumentException(
"Arguments cannot be negative. segmentSizeInBytes: " + segmentSizeInBytes + ", segmentsNeeded: "
Expand All @@ -37,6 +39,7 @@ class DiskSpaceRequirements {
this.segmentSizeInBytes = segmentSizeInBytes;
this.segmentsNeeded = segmentsNeeded;
this.swapSegmentsInUse = swapSegmentsInUse;
this.storeId = storeId;
}

/**
Expand All @@ -60,6 +63,10 @@ long getSwapSegmentsInUse() {
return swapSegmentsInUse;
}

  /**
   * @return the id of the store that this {@code DiskSpaceRequirements} instance is associated with
   *         (as supplied to the constructor).
   */
  String getStoreId() {
    return storeId;
  }

@Override
public String toString() {
return "DiskSpaceRequirements{segmentSizeInBytes=" + segmentSizeInBytes + ", segmentsNeeded=" + segmentsNeeded
Expand Down
45 changes: 28 additions & 17 deletions ambry-store/src/main/java/com.github.ambry.store/Log.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class Log implements Write {
new ConcurrentSkipListMap<>(LogSegmentNameHelper.COMPARATOR);
private final Logger logger = LoggerFactory.getLogger(getClass());
private final AtomicLong remainingUnallocatedSegments = new AtomicLong(0);
private final String storeId;

private boolean isLogSegmented;
private LogSegment activeSegment = null;
Expand All @@ -63,27 +64,29 @@ class Log implements Write {
* {@code totalCapacityInBytes} > {@code segmentCapacityInBytes} and {@code totalCapacityInBytes} is not a perfect
* multiple of {@code segmentCapacityInBytes}.
*/
Log(String dataDir, long totalCapacityInBytes, DiskSpaceAllocator diskSpaceAllocator, StoreConfig config, StoreMetrics metrics) throws StoreException {
Log(String dataDir, long totalCapacityInBytes, DiskSpaceAllocator diskSpaceAllocator, StoreConfig config,
StoreMetrics metrics) throws StoreException {
this.dataDir = dataDir;
this.capacityInBytes = totalCapacityInBytes;
this.isLogSegmented = totalCapacityInBytes > config.storeSegmentSizeInBytes;
this.diskSpaceAllocator = diskSpaceAllocator;
this.config = config;
this.metrics = metrics;
this.segmentNameAndFileNameIterator = Collections.EMPTY_LIST.iterator();
storeId = dataDir.substring(dataDir.lastIndexOf(File.separator) + File.separator.length());

File dir = new File(dataDir);
File[] segmentFiles = dir.listFiles(LogSegmentNameHelper.LOG_FILE_FILTER);
if (segmentFiles == null) {
throw new StoreException("Could not read from directory: " + dataDir, StoreErrorCodes.File_Not_Found);
} else {
initialize(getSegmentsToLoad(segmentFiles), config.storeSegmentSizeInBytes);
initialize(getSegmentsToLoad(segmentFiles), config.storeSegmentSizeInBytes, false);
this.isLogSegmented = isExistingLogSegmented();
}
}

/**
* Create a Log instance
* Create a Log instance in COPY phase during compaction.
* @param dataDir the directory where the segments of the log need to be loaded from.
* @param totalCapacityInBytes the total capacity of this log.
* @param diskSpaceAllocator the {@link DiskSpaceAllocator} to use to allocate new log segments.
Expand All @@ -99,7 +102,8 @@ class Log implements Write {
* {@code totalCapacityInBytes} > {@code segmentCapacityInBytes} and {@code totalCapacityInBytes} is not a perfect
* multiple of {@code segmentCapacityInBytes}.
*/
Log(String dataDir, long totalCapacityInBytes, DiskSpaceAllocator diskSpaceAllocator, StoreConfig config, StoreMetrics metrics, boolean isLogSegmented, List<LogSegment> segmentsToLoad,
Log(String dataDir, long totalCapacityInBytes, DiskSpaceAllocator diskSpaceAllocator, StoreConfig config,
StoreMetrics metrics, boolean isLogSegmented, List<LogSegment> segmentsToLoad,
Iterator<Pair<String, String>> segmentNameAndFileNameIterator) throws StoreException {
this.dataDir = dataDir;
this.capacityInBytes = totalCapacityInBytes;
Expand All @@ -108,8 +112,9 @@ class Log implements Write {
this.config = config;
this.metrics = metrics;
this.segmentNameAndFileNameIterator = segmentNameAndFileNameIterator;
storeId = dataDir.substring(dataDir.lastIndexOf(File.separator) + File.separator.length());

initialize(segmentsToLoad, config.storeSegmentSizeInBytes);
initialize(segmentsToLoad, config.storeSegmentSizeInBytes, true);
}

/**
Expand Down Expand Up @@ -176,7 +181,7 @@ void setActiveSegment(String name) throws StoreException {
while (iterator.hasNext()) {
Map.Entry<String, LogSegment> entry = iterator.next();
logger.info("Freeing extra segment with name [{}] ", entry.getValue().getName());
free(entry.getValue());
free(entry.getValue(), false);
remainingUnallocatedSegments.getAndIncrement();
iterator.remove();
}
Expand Down Expand Up @@ -299,10 +304,11 @@ void close(boolean skipDiskFlush) throws IOException {
* Checks the provided arguments for consistency and allocates the first segment file and creates the
* {@link LogSegment} instance for it.
* @param segmentCapacity the intended capacity of each segment of the log.
* @param needSwapSegment whether a swap segment is needed by {@link BlobStoreCompactor}
* @return the {@link LogSegment} instance that is created.
* @throws StoreException if there is store exception when creating the segment files or creating {@link LogSegment} instances.
*/
private LogSegment checkArgsAndGetFirstSegment(long segmentCapacity) throws StoreException {
private LogSegment checkArgsAndGetFirstSegment(long segmentCapacity, boolean needSwapSegment) throws StoreException {
if (capacityInBytes <= 0 || segmentCapacity <= 0) {
throw new IllegalArgumentException(
"One of totalCapacityInBytes [" + capacityInBytes + "] or " + "segmentCapacityInBytes [" + segmentCapacity
Expand All @@ -320,7 +326,7 @@ private LogSegment checkArgsAndGetFirstSegment(long segmentCapacity) throws Stor
logger.info("Allocating first segment with name [{}], back by file {} and capacity {} bytes. Total number of "
+ "segments is {}", segmentNameAndFilename.getFirst(), segmentNameAndFilename.getSecond(), segmentCapacity,
numSegments);
File segmentFile = allocate(segmentNameAndFilename.getSecond(), segmentCapacity);
File segmentFile = allocate(segmentNameAndFilename.getSecond(), segmentCapacity, needSwapSegment);
// to be backwards compatible, headers are not written for a log segment if it is the only log segment.
return new LogSegment(segmentNameAndFilename.getFirst(), segmentFile, segmentCapacity, config, metrics,
isLogSegmented);
Expand Down Expand Up @@ -363,12 +369,15 @@ private boolean isExistingLogSegmented() {
* Initializes the log.
* @param segmentsToLoad the {@link LogSegment} instances to include as a part of the log. These are not in any order
* @param segmentCapacityInBytes the capacity of a single {@link LogSegment}.
* @param mayNeedSwapSegment whether compactor may need swap segment.
* @throws StoreException if there is any store exception during initialization.
*/
private void initialize(List<LogSegment> segmentsToLoad, long segmentCapacityInBytes) throws StoreException {
private void initialize(List<LogSegment> segmentsToLoad, long segmentCapacityInBytes, boolean mayNeedSwapSegment)
throws StoreException {
if (segmentsToLoad.size() == 0) {
// bootstrapping log.
segmentsToLoad = Collections.singletonList(checkArgsAndGetFirstSegment(segmentCapacityInBytes));
segmentsToLoad =
Collections.singletonList(checkArgsAndGetFirstSegment(segmentCapacityInBytes, mayNeedSwapSegment));
}

LogSegment anySegment = segmentsToLoad.get(0);
Expand All @@ -386,14 +395,15 @@ private void initialize(List<LogSegment> segmentsToLoad, long segmentCapacityInB
* Allocates a file named {@code filename} and of capacity {@code size}.
* @param filename the intended filename of the file.
* @param size the intended size of the file.
* @param requestSwapSegment whether swap segment is requested by {@link BlobStoreCompactor}
* @return a {@link File} instance that points to the created file named {@code filename} and capacity {@code size}.
* @throws StoreException if the there is any store exception while allocating the file.
*/
File allocate(String filename, long size) throws StoreException {
File allocate(String filename, long size, boolean requestSwapSegment) throws StoreException {
File segmentFile = new File(dataDir, filename);
if (!segmentFile.exists()) {
try {
diskSpaceAllocator.allocate(segmentFile, size);
diskSpaceAllocator.allocate(segmentFile, size, storeId, requestSwapSegment);
} catch (IOException e) {
StoreErrorCodes errorCode = StoreException.resolveErrorCode(e);
throw new StoreException(errorCode.toString() + " while allocating the file", e, errorCode);
Expand All @@ -405,13 +415,14 @@ File allocate(String filename, long size) throws StoreException {
/**
* Frees the given {@link LogSegment} and its backing segment file.
* @param logSegment the {@link LogSegment} instance whose backing file needs to be freed.
* @param isSwapSegment whether the segment to free is a swap segment.
* @throws StoreException if there is any store exception when freeing the log segment.
*/
private void free(LogSegment logSegment) throws StoreException {
private void free(LogSegment logSegment, boolean isSwapSegment) throws StoreException {
File segmentFile = logSegment.getView().getFirst();
try {
logSegment.close(false);
diskSpaceAllocator.free(segmentFile, logSegment.getCapacityInBytes());
diskSpaceAllocator.free(segmentFile, logSegment.getCapacityInBytes(), storeId, isSwapSegment);
} catch (IOException e) {
StoreErrorCodes errorCode = StoreException.resolveErrorCode(e);
throw new StoreException(errorCode.toString() + " while freeing log segment", e, errorCode);
Expand Down Expand Up @@ -466,14 +477,14 @@ private void ensureCapacity(long writeSize) throws StoreException {
logger.info("Allocating new segment with name: " + segmentNameAndFilename.getFirst());
File newSegmentFile = null;
try {
newSegmentFile = allocate(segmentNameAndFilename.getSecond(), segmentCapacity);
newSegmentFile = allocate(segmentNameAndFilename.getSecond(), segmentCapacity, false);
LogSegment newSegment =
new LogSegment(segmentNameAndFilename.getFirst(), newSegmentFile, segmentCapacity, config, metrics, true);
segmentsByName.put(segmentNameAndFilename.getFirst(), newSegment);
} catch (StoreException e) {
try {
if (newSegmentFile != null) {
diskSpaceAllocator.free(newSegmentFile, segmentCapacity);
diskSpaceAllocator.free(newSegmentFile, segmentCapacity, storeId, false);
}
remainingUnallocatedSegments.incrementAndGet();
throw e;
Expand Down Expand Up @@ -540,7 +551,7 @@ void dropSegment(String segmentName, boolean decreaseUsedSegmentCount) throws St
throw new IllegalArgumentException("Segment does not exist or is the active segment: " + segmentName);
}
segmentsByName.remove(segmentName);
free(segment);
free(segment, !decreaseUsedSegmentCount);
if (decreaseUsedSegmentCount) {
remainingUnallocatedSegments.incrementAndGet();
}
Expand Down
Loading