Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #8481] Improve delete and rolling strategy for tiered storage modules #8493

Merged
merged 2 commits into from
Aug 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.io.File;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.Duration;

public class MessageStoreConfig {

Expand Down Expand Up @@ -59,6 +60,7 @@ public int getValue() {
return value;
}

@SuppressWarnings("DuplicatedCode")
public static TieredStorageLevel valueOf(int value) {
switch (value) {
case 1:
Expand Down Expand Up @@ -91,18 +93,18 @@ public boolean check(TieredStorageLevel targetLevel) {
private long tieredStoreConsumeQueueMaxSize = 100 * 1024 * 1024;
private int tieredStoreIndexFileMaxHashSlotNum = 5000000;
private int tieredStoreIndexFileMaxIndexNum = 5000000 * 4;
// index file will force rolling to next file after idle specified time, default is 3h
private int tieredStoreIndexFileRollingIdleInterval = 3 * 60 * 60 * 1000;

private String tieredMetadataServiceProvider = "org.apache.rocketmq.tieredstore.metadata.DefaultMetadataStore";
private String tieredBackendServiceProvider = "org.apache.rocketmq.tieredstore.provider.MemoryFileSegment";

// file reserved time, default is 72 hour
private boolean tieredStoreDeleteFileEnable = true;
private int tieredStoreFileReservedTime = 72;
private long tieredStoreDeleteFileInterval = Duration.ofHours(1).toMillis();

// time of forcing commitLog to roll to next file, default is 24 hour
private int commitLogRollingInterval = 24;
// rolling will only happen if file segment size is larger than commitLogRollingMinimumSize, default is 128M
private int commitLogRollingMinimumSize = 128 * 1024 * 1024;
// default is 100, unit is millisecond
private int maxCommitJitter = 100;
private int commitLogRollingMinimumSize = 16 * 1024 * 1024;

private boolean tieredStoreGroupCommit = true;
private int tieredStoreGroupCommitTimeout = 30 * 1000;
Expand All @@ -112,7 +114,6 @@ public boolean check(TieredStorageLevel targetLevel) {
private int tieredStoreGroupCommitSize = 4 * 1024 * 1024;
// Cached message count larger than this value will suspend append. default is 10000
private int tieredStoreMaxGroupCommitCount = 10000;
private long tieredStoreMaxFallBehindSize = 128 * 1024 * 1024;

private boolean readAheadCacheEnable = true;
private int readAheadMessageCountThreshold = 4096;
Expand Down Expand Up @@ -226,14 +227,6 @@ public void setTieredStoreIndexFileMaxIndexNum(int tieredStoreIndexFileMaxIndexN
this.tieredStoreIndexFileMaxIndexNum = tieredStoreIndexFileMaxIndexNum;
}

public int getTieredStoreIndexFileRollingIdleInterval() {
return tieredStoreIndexFileRollingIdleInterval;
}

public void setTieredStoreIndexFileRollingIdleInterval(int tieredStoreIndexFileRollingIdleInterval) {
this.tieredStoreIndexFileRollingIdleInterval = tieredStoreIndexFileRollingIdleInterval;
}

public String getTieredMetadataServiceProvider() {
return tieredMetadataServiceProvider;
}
Expand All @@ -250,6 +243,14 @@ public void setTieredBackendServiceProvider(String tieredBackendServiceProvider)
this.tieredBackendServiceProvider = tieredBackendServiceProvider;
}

/**
 * Returns whether deletion of expired tiered-store files is enabled.
 * The backing field defaults to {@code true}.
 *
 * @return the current value of {@code tieredStoreDeleteFileEnable}
 */
public boolean isTieredStoreDeleteFileEnable() {
return tieredStoreDeleteFileEnable;
}

/**
 * Enables or disables deletion of expired tiered-store files.
 *
 * @param tieredStoreDeleteFileEnable {@code true} to allow expired files to be deleted,
 *                                    {@code false} to keep them
 */
public void setTieredStoreDeleteFileEnable(boolean tieredStoreDeleteFileEnable) {
this.tieredStoreDeleteFileEnable = tieredStoreDeleteFileEnable;
}

public int getTieredStoreFileReservedTime() {
return tieredStoreFileReservedTime;
}
Expand All @@ -258,6 +259,14 @@ public void setTieredStoreFileReservedTime(int tieredStoreFileReservedTime) {
this.tieredStoreFileReservedTime = tieredStoreFileReservedTime;
}

/**
 * Returns the interval between expired-file deletion runs, in milliseconds.
 * The backing field defaults to one hour ({@code Duration.ofHours(1).toMillis()}).
 *
 * @return the current value of {@code tieredStoreDeleteFileInterval}
 */
public long getTieredStoreDeleteFileInterval() {
return tieredStoreDeleteFileInterval;
}

/**
 * Sets the interval between expired-file deletion runs.
 * NOTE(review): the interval appears to be read when the delete task is scheduled at store
 * load; changing it afterwards presumably does not reschedule the running task - confirm.
 *
 * @param tieredStoreDeleteFileInterval interval in milliseconds
 */
public void setTieredStoreDeleteFileInterval(long tieredStoreDeleteFileInterval) {
this.tieredStoreDeleteFileInterval = tieredStoreDeleteFileInterval;
}

public int getCommitLogRollingInterval() {
return commitLogRollingInterval;
}
Expand All @@ -274,14 +283,6 @@ public void setCommitLogRollingMinimumSize(int commitLogRollingMinimumSize) {
this.commitLogRollingMinimumSize = commitLogRollingMinimumSize;
}

public int getMaxCommitJitter() {
return maxCommitJitter;
}

public void setMaxCommitJitter(int maxCommitJitter) {
this.maxCommitJitter = maxCommitJitter;
}

public boolean isTieredStoreGroupCommit() {
return tieredStoreGroupCommit;
}
Expand Down Expand Up @@ -322,14 +323,6 @@ public void setTieredStoreMaxGroupCommitCount(int tieredStoreMaxGroupCommitCount
this.tieredStoreMaxGroupCommitCount = tieredStoreMaxGroupCommitCount;
}

public long getTieredStoreMaxFallBehindSize() {
return tieredStoreMaxFallBehindSize;
}

public void setTieredStoreMaxFallBehindSize(long tieredStoreMaxFallBehindSize) {
this.tieredStoreMaxFallBehindSize = tieredStoreMaxFallBehindSize;
}

public boolean isReadAheadCacheEnable() {
return readAheadCacheEnable;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ public boolean load() {
if (result) {
indexService.start();
dispatcher.start();
storeExecutor.commonExecutor.scheduleWithFixedDelay(
flatFileStore::scheduleDeleteExpireFile, storeConfig.getTieredStoreDeleteFileInterval(),
storeConfig.getTieredStoreDeleteFileInterval(), TimeUnit.MILLISECONDS);
}
return result;
}
Expand Down Expand Up @@ -457,12 +460,12 @@ public synchronized void shutdown() {
if (dispatcher != null) {
dispatcher.shutdown();
}
if (flatFileStore != null) {
flatFileStore.shutdown();
}
if (indexService != null) {
indexService.shutdown();
}
if (flatFileStore != null) {
flatFileStore.shutdown();
}
if (storeExecutor != null) {
storeExecutor.shutdown();
}
Expand All @@ -473,6 +476,9 @@ public void destroy() {
if (next != null) {
next.destroy();
}
if (indexService != null) {
indexService.destroy();
}
if (flatFileStore != null) {
flatFileStore.destroy();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,13 @@ public CompletableFuture<Boolean> doScheduleDispatch(FlatFileInterface flatFile,

// If set to max offset here, some written messages may be lost
if (!flatFile.isFlatFileInit()) {
currentOffset = Math.max(minOffsetInQueue,
maxOffsetInQueue - storeConfig.getTieredStoreGroupCommitSize());
currentOffset = defaultStore.getOffsetInQueueByTime(
topic, queueId, System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(2));
currentOffset = Math.max(currentOffset, minOffsetInQueue);
currentOffset = Math.min(currentOffset, maxOffsetInQueue);
flatFile.initOffset(currentOffset);
log.warn("MessageDispatcher#dispatch init, topic={}, queueId={}, offset={}-{}, current={}",
topic, queueId, minOffsetInQueue, maxOffsetInQueue, currentOffset);
return CompletableFuture.completedFuture(true);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.rocketmq.tieredstore.file.FlatFileStore;
import org.apache.rocketmq.tieredstore.file.FlatMessageFile;
import org.apache.rocketmq.tieredstore.index.IndexItem;
import org.apache.rocketmq.tieredstore.index.IndexService;
import org.apache.rocketmq.tieredstore.metadata.MetadataStore;
import org.apache.rocketmq.tieredstore.metadata.entity.TopicMetadata;
import org.apache.rocketmq.tieredstore.util.MessageFormatUtil;
Expand All @@ -56,15 +57,24 @@ public class MessageStoreFetcherImpl implements MessageStoreFetcher {
private final MetadataStore metadataStore;
private final MessageStoreConfig storeConfig;
private final TieredMessageStore messageStore;
private final IndexService indexService;
private final FlatFileStore flatFileStore;
private final long memoryMaxSize;
private final Cache<String /* topic@queueId@offset */, SelectBufferResult> fetcherCache;

public MessageStoreFetcherImpl(TieredMessageStore messageStore) {
this.storeConfig = messageStore.getStoreConfig();
this(messageStore, messageStore.getStoreConfig(),
messageStore.getFlatFileStore(), messageStore.getIndexService());
}

public MessageStoreFetcherImpl(TieredMessageStore messageStore, MessageStoreConfig storeConfig,
FlatFileStore flatFileStore, IndexService indexService) {

this.storeConfig = storeConfig;
this.brokerName = storeConfig.getBrokerName();
this.flatFileStore = messageStore.getFlatFileStore();
this.flatFileStore = flatFileStore;
this.messageStore = messageStore;
this.indexService = indexService;
this.metadataStore = flatFileStore.getMetadataStore();
this.memoryMaxSize =
(long) (Runtime.getRuntime().maxMemory() * storeConfig.getReadAheadCacheSizeThresholdRate());
Expand Down Expand Up @@ -192,7 +202,11 @@ public CompletableFuture<GetMessageResult> getMessageFromCacheAsync(
log.debug("MessageFetcher cache miss, group={}, topic={}, queueId={}, offset={}, maxCount={}, lag={}",
group, mq.getTopic(), mq.getQueueId(), queueOffset, maxCount, result.getMaxOffset() - result.getNextBeginOffset());

return fetchMessageThenPutToCache(flatFile, queueOffset, storeConfig.getReadAheadMessageCountThreshold())
// To optimize the performance of pop consumption:
// pop revive causes a large number of random reads,
// so the number of pre-fetched messages needs to be reduced.
int fetchSize = maxCount == 1 ? 32 : storeConfig.getReadAheadMessageCountThreshold();
return fetchMessageThenPutToCache(flatFile, queueOffset, fetchSize)
.thenApply(maxOffset -> getMessageFromCache(flatFile, queueOffset, maxCount, messageFilter));
}

Expand Down Expand Up @@ -414,8 +428,7 @@ public CompletableFuture<QueryMessageResult> queryMessageAsync(
return CompletableFuture.completedFuture(new QueryMessageResult());
}

CompletableFuture<List<IndexItem>> future =
messageStore.getIndexService().queryAsync(topic, key, maxCount, begin, end);
CompletableFuture<List<IndexItem>> future = indexService.queryAsync(topic, key, maxCount, begin, end);

return future.thenCompose(indexItemList -> {
List<CompletableFuture<SelectMappedBufferResult>> futureList = new ArrayList<>(maxCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.tieredstore.common.FileSegmentType;
import org.apache.rocketmq.tieredstore.provider.FileSegment;
import org.apache.rocketmq.tieredstore.provider.FileSegmentFactory;
import org.apache.rocketmq.tieredstore.util.MessageFormatUtil;

Expand All @@ -33,10 +34,19 @@ public FlatCommitLogFile(FileSegmentFactory fileSegmentFactory, String filePath)
this.initOffset(0L);
}

/**
* Two rules are set here:
* 1. Single file must be saved for more than one day as default.
* 2. Single file must reach the minimum size before switching.
* When calculating storage space, due to the limitation of condition 2,
* the actual usage of storage space may be slightly higher than expected.
*/
public boolean tryRollingFile(long interval) {
long timestamp = this.getFileToWrite().getMinTimestamp();
if (timestamp != Long.MAX_VALUE &&
timestamp + interval < System.currentTimeMillis()) {
FileSegment fileSegment = this.getFileToWrite();
long timestamp = fileSegment.getMinTimestamp();
if (timestamp != Long.MAX_VALUE && timestamp + interval < System.currentTimeMillis() &&
fileSegment.getAppendPosition() >=
fileSegmentFactory.getStoreConfig().getCommitLogRollingMinimumSize()) {
this.rollingNewFile(this.getAppendOffset());
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,6 @@ public boolean load() {
try {
this.flatFileConcurrentMap.clear();
this.recover();
this.executor.commonExecutor.scheduleWithFixedDelay(() -> {
for (FlatMessageFile flatFile : deepCopyFlatFileToList()) {
long expiredTimeStamp = System.currentTimeMillis() -
TimeUnit.HOURS.toMillis(flatFile.getFileReservedHours());
flatFile.destroyExpiredFile(expiredTimeStamp);
if (flatFile.consumeQueue.fileSegmentTable.isEmpty()) {
this.destroyFile(flatFile.getMessageQueue());
}
}
}, 60, 60, TimeUnit.SECONDS);
log.info("FlatFileStore recover finished, total cost={}ms", stopwatch.elapsed(TimeUnit.MILLISECONDS));
} catch (Exception e) {
long costTime = stopwatch.elapsed(TimeUnit.MILLISECONDS);
Expand Down Expand Up @@ -113,6 +103,27 @@ public CompletableFuture<Void> recoverAsync(TopicMetadata topicMetadata) {
}, executor.bufferCommitExecutor);
}

/**
 * Runs a single cleanup pass over a snapshot of all flat files held by this store.
 *
 * <p>The pass is skipped entirely when {@code tieredStoreDeleteFileEnable} is off. Otherwise,
 * for each flat file the file lock is taken, the expiry cutoff is computed as
 * "now minus the file's reserved hours", and {@code destroyExpiredFile} is invoked with it.
 * A failure on one file is logged and does not prevent the remaining files from being
 * processed. The number of files visited and the elapsed time are logged at the end.
 */
public void scheduleDeleteExpireFile() {
    if (storeConfig.isTieredStoreDeleteFileEnable()) {
        final Stopwatch timer = Stopwatch.createStarted();
        final ImmutableList<FlatMessageFile> snapshot = this.deepCopyFlatFileToList();

        snapshot.forEach(messageFile -> {
            // Hold the per-file lock for the whole delete so it cannot interleave
            // with other operations guarded by the same lock.
            messageFile.getFileLock().lock();
            try {
                long now = System.currentTimeMillis();
                long reservedMillis = TimeUnit.HOURS.toMillis(messageFile.getFileReservedHours());
                messageFile.destroyExpiredFile(now - reservedMillis);
            } catch (Exception e) {
                // Keep going: one broken file must not abort the pass.
                log.error("FlatFileStore delete expire file error", e);
            } finally {
                messageFile.getFileLock().unlock();
            }
        });

        log.info("FlatFileStore schedule delete expired file, count={}, cost={}ms",
            snapshot.size(), timer.elapsed(TimeUnit.MILLISECONDS));
    }
}

public MetadataStore getMetadataStore() {
return metadataStore;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ AppendResult putKey(
*/
CompletableFuture<List<IndexItem>> queryAsync(String topic, String key, int maxCount, long beginTime, long endTime);

/**
 * Optional hook to force an upload. The default implementation does nothing, so
 * implementations that do not need it can leave it unimplemented.
 * NOTE(review): presumably implementations push pending index data to the storage
 * backend here - confirm against the concrete index service.
 */
default void forceUpload() {
}

/**
* Shutdown the index service.
*/
Expand Down
Loading
Loading