Skip to content

Commit

Permalink
[ISSUE apache#8481] Improve delete and rolling strategy for tiered st…
Browse files Browse the repository at this point in the history
…orage modules
  • Loading branch information
lizhimins committed Aug 5, 2024
1 parent 304987d commit 5198472
Show file tree
Hide file tree
Showing 11 changed files with 153 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.io.File;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.Duration;

public class MessageStoreConfig {

Expand Down Expand Up @@ -59,6 +60,7 @@ public int getValue() {
return value;
}

@SuppressWarnings("DuplicatedCode")
public static TieredStorageLevel valueOf(int value) {
switch (value) {
case 1:
Expand Down Expand Up @@ -91,18 +93,18 @@ public boolean check(TieredStorageLevel targetLevel) {
private long tieredStoreConsumeQueueMaxSize = 100 * 1024 * 1024;
private int tieredStoreIndexFileMaxHashSlotNum = 5000000;
private int tieredStoreIndexFileMaxIndexNum = 5000000 * 4;
// index file will force rolling to next file after idle specified time, default is 3h
private int tieredStoreIndexFileRollingIdleInterval = 3 * 60 * 60 * 1000;

private String tieredMetadataServiceProvider = "org.apache.rocketmq.tieredstore.metadata.DefaultMetadataStore";
private String tieredBackendServiceProvider = "org.apache.rocketmq.tieredstore.provider.MemoryFileSegment";

// file reserved time, default is 72 hour
private boolean tieredStoreDeleteFileEnable = true;
private int tieredStoreFileReservedTime = 72;
private long tieredStoreDeleteFileInterval = Duration.ofHours(1).toMillis();

// time after which commitLog is forced to roll to the next file, default is 24 hours
private int commitLogRollingInterval = 24;
// rolling will only happen if file segment size is larger than commitLogRollingMinimumSize, default is 128M
private int commitLogRollingMinimumSize = 128 * 1024 * 1024;
// default is 100, unit is millisecond
private int maxCommitJitter = 100;
private int commitLogRollingMinimumSize = 16 * 1024 * 1024;

private boolean tieredStoreGroupCommit = true;
private int tieredStoreGroupCommitTimeout = 30 * 1000;
Expand All @@ -112,7 +114,6 @@ public boolean check(TieredStorageLevel targetLevel) {
private int tieredStoreGroupCommitSize = 4 * 1024 * 1024;
// Cached message count larger than this value will suspend append. default is 10000
private int tieredStoreMaxGroupCommitCount = 10000;
private long tieredStoreMaxFallBehindSize = 128 * 1024 * 1024;

private boolean readAheadCacheEnable = true;
private int readAheadMessageCountThreshold = 4096;
Expand Down Expand Up @@ -226,14 +227,6 @@ public void setTieredStoreIndexFileMaxIndexNum(int tieredStoreIndexFileMaxIndexN
this.tieredStoreIndexFileMaxIndexNum = tieredStoreIndexFileMaxIndexNum;
}

/**
 * Returns the idle time after which an index file is forced to roll to the next file.
 * The field's default is 3 hours, written out in milliseconds.
 *
 * @return the configured idle interval
 */
public int getTieredStoreIndexFileRollingIdleInterval() {
return tieredStoreIndexFileRollingIdleInterval;
}

/**
 * Sets the idle time after which an index file is forced to roll to the next file.
 *
 * @param tieredStoreIndexFileRollingIdleInterval the new idle interval; the field's default
 *        (3 hours) is written out in milliseconds
 */
public void setTieredStoreIndexFileRollingIdleInterval(int tieredStoreIndexFileRollingIdleInterval) {
this.tieredStoreIndexFileRollingIdleInterval = tieredStoreIndexFileRollingIdleInterval;
}

/**
 * Returns the configured metadata service provider.
 * The field's default is {@code org.apache.rocketmq.tieredstore.metadata.DefaultMetadataStore}.
 * NOTE(review): presumably a fully qualified class name; confirm how callers load it.
 *
 * @return the metadata service provider string
 */
public String getTieredMetadataServiceProvider() {
return tieredMetadataServiceProvider;
}
Expand All @@ -250,6 +243,14 @@ public void setTieredBackendServiceProvider(String tieredBackendServiceProvider)
this.tieredBackendServiceProvider = tieredBackendServiceProvider;
}

/**
 * Returns whether deletion of expired tiered-store files is enabled (field default: {@code true}).
 * When this is {@code false}, {@code FlatFileStore#scheduleDeleteExpireFile} returns immediately.
 *
 * @return {@code true} if expired-file deletion is enabled
 */
public boolean isTieredStoreDeleteFileEnable() {
return tieredStoreDeleteFileEnable;
}

/**
 * Enables or disables deletion of expired tiered-store files.
 *
 * @param tieredStoreDeleteFileEnable {@code true} to allow the delete task to remove expired files
 */
public void setTieredStoreDeleteFileEnable(boolean tieredStoreDeleteFileEnable) {
this.tieredStoreDeleteFileEnable = tieredStoreDeleteFileEnable;
}

/**
 * Returns the file reserved time. According to the field comment the default is 72 hours.
 * NOTE(review): unit is presumably hours; confirm against the code that consumes this value.
 *
 * @return the configured file reserved time
 */
public int getTieredStoreFileReservedTime() {
return tieredStoreFileReservedTime;
}
Expand All @@ -258,6 +259,14 @@ public void setTieredStoreFileReservedTime(int tieredStoreFileReservedTime) {
this.tieredStoreFileReservedTime = tieredStoreFileReservedTime;
}

/**
 * Returns the interval, in milliseconds, of the scheduled expired-file deletion
 * (field default: one hour). The message store passes this value as both the initial
 * delay and the fixed delay when scheduling the delete task.
 *
 * @return the delete interval in milliseconds
 */
public long getTieredStoreDeleteFileInterval() {
return tieredStoreDeleteFileInterval;
}

/**
 * Sets the interval of the scheduled expired-file deletion.
 *
 * @param tieredStoreDeleteFileInterval the new interval in milliseconds
 */
public void setTieredStoreDeleteFileInterval(long tieredStoreDeleteFileInterval) {
this.tieredStoreDeleteFileInterval = tieredStoreDeleteFileInterval;
}

/**
 * Returns the time after which the commit log is forced to roll to the next file.
 * The field default is 24; the field comment gives the unit as hours.
 *
 * @return the configured commit log rolling interval
 */
public int getCommitLogRollingInterval() {
return commitLogRollingInterval;
}
Expand All @@ -274,14 +283,6 @@ public void setCommitLogRollingMinimumSize(int commitLogRollingMinimumSize) {
this.commitLogRollingMinimumSize = commitLogRollingMinimumSize;
}

/**
 * Returns the maximum commit jitter. Per the field comment the default is 100 and the
 * unit is milliseconds.
 *
 * @return the maximum commit jitter in milliseconds
 */
public int getMaxCommitJitter() {
return maxCommitJitter;
}

/**
 * Sets the maximum commit jitter.
 *
 * @param maxCommitJitter the new maximum jitter; the field comment gives the unit as milliseconds
 */
public void setMaxCommitJitter(int maxCommitJitter) {
this.maxCommitJitter = maxCommitJitter;
}

/**
 * Returns the value of the tiered-store group commit switch (field default: {@code true}).
 *
 * @return {@code true} if group commit is switched on
 */
public boolean isTieredStoreGroupCommit() {
return tieredStoreGroupCommit;
}
Expand Down Expand Up @@ -322,14 +323,6 @@ public void setTieredStoreMaxGroupCommitCount(int tieredStoreMaxGroupCommitCount
this.tieredStoreMaxGroupCommitCount = tieredStoreMaxGroupCommitCount;
}

/**
 * Returns the configured maximum fall-behind size (field default: 128 * 1024 * 1024).
 * NOTE(review): unit is presumably bytes; confirm against the consumer of this value.
 *
 * @return the maximum fall-behind size
 */
public long getTieredStoreMaxFallBehindSize() {
return tieredStoreMaxFallBehindSize;
}

/**
 * Sets the maximum fall-behind size.
 *
 * @param tieredStoreMaxFallBehindSize the new maximum fall-behind size
 *        (presumably in bytes; TODO confirm)
 */
public void setTieredStoreMaxFallBehindSize(long tieredStoreMaxFallBehindSize) {
this.tieredStoreMaxFallBehindSize = tieredStoreMaxFallBehindSize;
}

/**
 * Returns whether the read-ahead cache is enabled (field default: {@code true}).
 *
 * @return {@code true} if the read-ahead cache is enabled
 */
public boolean isReadAheadCacheEnable() {
return readAheadCacheEnable;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ public boolean load() {
if (result) {
indexService.start();
dispatcher.start();
storeExecutor.commonExecutor.scheduleWithFixedDelay(
flatFileStore::scheduleDeleteExpireFile, storeConfig.getTieredStoreDeleteFileInterval(),
storeConfig.getTieredStoreDeleteFileInterval(), TimeUnit.MILLISECONDS);
}
return result;
}
Expand Down Expand Up @@ -457,12 +460,12 @@ public synchronized void shutdown() {
if (dispatcher != null) {
dispatcher.shutdown();
}
if (flatFileStore != null) {
flatFileStore.shutdown();
}
if (indexService != null) {
indexService.shutdown();
}
if (flatFileStore != null) {
flatFileStore.shutdown();
}
if (storeExecutor != null) {
storeExecutor.shutdown();
}
Expand All @@ -473,6 +476,9 @@ public void destroy() {
if (next != null) {
next.destroy();
}
if (indexService != null) {
indexService.destroy();
}
if (flatFileStore != null) {
flatFileStore.destroy();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,13 @@ public CompletableFuture<Boolean> doScheduleDispatch(FlatFileInterface flatFile,

// If set to max offset here, some written messages may be lost
if (!flatFile.isFlatFileInit()) {
currentOffset = Math.max(minOffsetInQueue,
maxOffsetInQueue - storeConfig.getTieredStoreGroupCommitSize());
currentOffset = defaultStore.getOffsetInQueueByTime(
topic, queueId, System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(2));
currentOffset = Math.max(currentOffset, minOffsetInQueue);
currentOffset = Math.min(currentOffset, maxOffsetInQueue);
flatFile.initOffset(currentOffset);
log.warn("MessageDispatcher#dispatch init, topic={}, queueId={}, offset={}-{}, current={}",
topic, queueId, minOffsetInQueue, maxOffsetInQueue, currentOffset);
return CompletableFuture.completedFuture(true);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.rocketmq.tieredstore.file.FlatFileStore;
import org.apache.rocketmq.tieredstore.file.FlatMessageFile;
import org.apache.rocketmq.tieredstore.index.IndexItem;
import org.apache.rocketmq.tieredstore.index.IndexService;
import org.apache.rocketmq.tieredstore.metadata.MetadataStore;
import org.apache.rocketmq.tieredstore.metadata.entity.TopicMetadata;
import org.apache.rocketmq.tieredstore.util.MessageFormatUtil;
Expand All @@ -56,15 +57,24 @@ public class MessageStoreFetcherImpl implements MessageStoreFetcher {
private final MetadataStore metadataStore;
private final MessageStoreConfig storeConfig;
private final TieredMessageStore messageStore;
private final IndexService indexService;
private final FlatFileStore flatFileStore;
private final long memoryMaxSize;
private final Cache<String /* topic@queueId@offset */, SelectBufferResult> fetcherCache;

public MessageStoreFetcherImpl(TieredMessageStore messageStore) {
this.storeConfig = messageStore.getStoreConfig();
this(messageStore, messageStore.getStoreConfig(),
messageStore.getFlatFileStore(), messageStore.getIndexService());
}

public MessageStoreFetcherImpl(TieredMessageStore messageStore, MessageStoreConfig storeConfig,
FlatFileStore flatFileStore, IndexService indexService) {

this.storeConfig = storeConfig;
this.brokerName = storeConfig.getBrokerName();
this.flatFileStore = messageStore.getFlatFileStore();
this.flatFileStore = flatFileStore;
this.messageStore = messageStore;
this.indexService = indexService;
this.metadataStore = flatFileStore.getMetadataStore();
this.memoryMaxSize =
(long) (Runtime.getRuntime().maxMemory() * storeConfig.getReadAheadCacheSizeThresholdRate());
Expand Down Expand Up @@ -192,7 +202,11 @@ public CompletableFuture<GetMessageResult> getMessageFromCacheAsync(
log.debug("MessageFetcher cache miss, group={}, topic={}, queueId={}, offset={}, maxCount={}, lag={}",
group, mq.getTopic(), mq.getQueueId(), queueOffset, maxCount, result.getMaxOffset() - result.getNextBeginOffset());

return fetchMessageThenPutToCache(flatFile, queueOffset, storeConfig.getReadAheadMessageCountThreshold())
// To optimize the performance of pop consumption
// Pop revive will cause a large number of random reads,
// so the amount of pre-fetch message num needs to be reduced.
int fetchSize = maxCount == 1 ? 32 : storeConfig.getReadAheadMessageCountThreshold();
return fetchMessageThenPutToCache(flatFile, queueOffset, fetchSize)
.thenApply(maxOffset -> getMessageFromCache(flatFile, queueOffset, maxCount, messageFilter));
}

Expand Down Expand Up @@ -414,8 +428,7 @@ public CompletableFuture<QueryMessageResult> queryMessageAsync(
return CompletableFuture.completedFuture(new QueryMessageResult());
}

CompletableFuture<List<IndexItem>> future =
messageStore.getIndexService().queryAsync(topic, key, maxCount, begin, end);
CompletableFuture<List<IndexItem>> future = indexService.queryAsync(topic, key, maxCount, begin, end);

return future.thenCompose(indexItemList -> {
List<CompletableFuture<SelectMappedBufferResult>> futureList = new ArrayList<>(maxCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.tieredstore.common.FileSegmentType;
import org.apache.rocketmq.tieredstore.provider.FileSegment;
import org.apache.rocketmq.tieredstore.provider.FileSegmentFactory;
import org.apache.rocketmq.tieredstore.util.MessageFormatUtil;

Expand All @@ -33,10 +34,19 @@ public FlatCommitLogFile(FileSegmentFactory fileSegmentFactory, String filePath)
this.initOffset(0L);
}

/**
* Two rules are set here:
* 1. Single file must be saved for more than one day as default.
* 2. Single file must reach the minimum size before switching.
* When calculating storage space, due to the limitation of condition 2,
* the actual usage of storage space may be slightly higher than expected.
*/
public boolean tryRollingFile(long interval) {
long timestamp = this.getFileToWrite().getMinTimestamp();
if (timestamp != Long.MAX_VALUE &&
timestamp + interval < System.currentTimeMillis()) {
FileSegment fileSegment = this.getFileToWrite();
long timestamp = fileSegment.getMinTimestamp();
if (timestamp != Long.MAX_VALUE && timestamp + interval < System.currentTimeMillis() &&
fileSegment.getAppendPosition() >=
fileSegmentFactory.getStoreConfig().getCommitLogRollingMinimumSize()) {
this.rollingNewFile(this.getAppendOffset());
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,6 @@ public boolean load() {
try {
this.flatFileConcurrentMap.clear();
this.recover();
this.executor.commonExecutor.scheduleWithFixedDelay(() -> {
for (FlatMessageFile flatFile : deepCopyFlatFileToList()) {
long expiredTimeStamp = System.currentTimeMillis() -
TimeUnit.HOURS.toMillis(flatFile.getFileReservedHours());
flatFile.destroyExpiredFile(expiredTimeStamp);
if (flatFile.consumeQueue.fileSegmentTable.isEmpty()) {
this.destroyFile(flatFile.getMessageQueue());
}
}
}, 60, 60, TimeUnit.SECONDS);
log.info("FlatFileStore recover finished, total cost={}ms", stopwatch.elapsed(TimeUnit.MILLISECONDS));
} catch (Exception e) {
long costTime = stopwatch.elapsed(TimeUnit.MILLISECONDS);
Expand Down Expand Up @@ -113,6 +103,27 @@ public CompletableFuture<Void> recoverAsync(TopicMetadata topicMetadata) {
}, executor.bufferCommitExecutor);
}

/**
 * Task body for the periodic expired-file cleanup.
 * <p>
 * Does nothing when deletion is disabled in the store config. Otherwise it takes a
 * copy of the current flat file list and, for each flat file, holds that file's lock
 * while asking it to destroy everything older than its own reserved hours. A failure
 * on one flat file is logged and does not stop the remaining files from being processed.
 * The number of files visited and the elapsed time are logged at the end.
 */
public void scheduleDeleteExpireFile() {
    boolean deleteEnabled = storeConfig.isTieredStoreDeleteFileEnable();
    if (deleteEnabled) {
        Stopwatch timer = Stopwatch.createStarted();
        ImmutableList<FlatMessageFile> snapshot = this.deepCopyFlatFileToList();
        snapshot.forEach(messageFile -> {
            // Acquire the lock before entering try so unlock only runs when lock succeeded.
            messageFile.getFileLock().lock();
            try {
                long now = System.currentTimeMillis();
                long reservedMillis = TimeUnit.HOURS.toMillis(messageFile.getFileReservedHours());
                messageFile.destroyExpiredFile(now - reservedMillis);
            } catch (Exception e) {
                log.error("FlatFileStore delete expire file error", e);
            } finally {
                messageFile.getFileLock().unlock();
            }
        });
        long elapsedMillis = timer.elapsed(TimeUnit.MILLISECONDS);
        log.info("FlatFileStore schedule delete expired file, count={}, cost={}ms",
            snapshot.size(), elapsedMillis);
    }
}

public MetadataStore getMetadataStore() {
return metadataStore;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ AppendResult putKey(
*/
CompletableFuture<List<IndexItem>> queryAsync(String topic, String key, int maxCount, long beginTime, long endTime);

/**
 * Optional hook; the default implementation does nothing.
 * NOTE(review): presumably implementations use this to push pending index data to the
 * backend immediately; confirm against the implementing classes.
 */
default void forceUpload() {
}

/**
* Shutdown the index service.
*/
Expand Down
Loading

0 comments on commit 5198472

Please sign in to comment.