Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(store): ignore operation with illegal argument in recovery process #429

Merged
merged 1 commit into from
Oct 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 8 additions & 18 deletions store/src/main/java/com/automq/rocketmq/store/S3StreamStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import com.automq.stream.s3.wal.BlockWALService;
import com.automq.stream.s3.wal.WriteAheadLog;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
Expand All @@ -52,28 +53,20 @@ public class S3StreamStore implements StreamStore {
private static final Logger LOGGER = LoggerFactory.getLogger(S3StreamStore.class);
private final Config s3Config;
private final StreamClient streamClient;
private final StoreMetadataService metadataService;
private final StreamManager streamManager;
private final ObjectManager objectManager;
private final WriteAheadLog writeAheadLog;
private final S3Operator operator;
private final Storage storage;
private final CompactionManager compactionManager;
private final S3BlockCache blockCache;
private final ThreadPoolExecutor storeWorkingThreadPool;

public S3StreamStore(StoreConfig storeConfig, S3StreamConfig streamConfig, StoreMetadataService metadataService,
S3Operator operator) {
this.s3Config = configFrom(streamConfig);

// Build meta service and related manager
this.metadataService = metadataService;
this.streamManager = new S3StreamManager(metadataService);
this.objectManager = new S3ObjectManager(metadataService);
StreamManager streamManager = new S3StreamManager(metadataService);
ObjectManager objectManager = new S3ObjectManager(metadataService);

this.operator = operator;
this.writeAheadLog = BlockWALService.builder(s3Config.s3WALPath(), s3Config.s3WALCapacity()).config(s3Config).build();
this.blockCache = new DefaultS3BlockCache(s3Config.s3CacheSize(), objectManager, operator);
WriteAheadLog writeAheadLog = BlockWALService.builder(s3Config.s3WALPath(), s3Config.s3WALCapacity()).config(s3Config).build();
S3BlockCache blockCache = new DefaultS3BlockCache(s3Config.s3CacheSize(), objectManager, operator);

// Build the s3 storage
this.storage = new S3Storage(s3Config, writeAheadLog, streamManager, objectManager, blockCache, operator);
Expand Down Expand Up @@ -117,13 +110,10 @@ public CompletableFuture<Void> close(List<Long> streamIds) {
List<CompletableFuture<Void>> futureList = streamIds.stream()
.map(streamId -> {
Optional<Stream> stream = streamClient.getStream(streamId);
if (stream.isEmpty()) {
return stream.get().close();
}
return null;
return stream.map(Stream::close).orElse(null);
})
.filter(x -> x != null)
.collect(java.util.stream.Collectors.toList());
.filter(Objects::nonNull)
.toList();
return CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0]))
.thenApplyAsync(result -> result, storeWorkingThreadPool);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -424,10 +424,11 @@ public CompletableFuture<AckResult> ack(String receiptHandle) {
AckOperation operation = new AckOperation(handle.topicId(), handle.queueId(), operationStreamId,
snapshotStreamId, stateMachine, handle.consumerGroupId(), handle.operationId(), System.currentTimeMillis(),
AckOperation.AckOperationType.ACK_NORMAL);
return operationLogService.logAckOperation(operation).thenApply(nil -> {
inflightService.decreaseInflightCount(handle.consumerGroupId(), handle.topicId(), handle.queueId(), 1);
return new AckResult(AckResult.Status.SUCCESS);
}).exceptionally(throwable -> new AckResult(AckResult.Status.ERROR));
return operationLogService.logAckOperation(operation)
.thenApply(nil -> {
inflightService.decreaseInflightCount(handle.consumerGroupId(), handle.topicId(), handle.queueId(), 1);
return new AckResult(AckResult.Status.SUCCESS);
}).exceptionally(throwable -> new AckResult(AckResult.Status.ERROR));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.automq.rocketmq.common.config.StoreConfig;
import com.automq.rocketmq.store.api.MessageStateMachine;
import com.automq.rocketmq.store.api.StreamStore;
import com.automq.rocketmq.store.exception.StoreErrorCode;
import com.automq.rocketmq.store.exception.StoreException;
import com.automq.rocketmq.store.model.operation.AckOperation;
import com.automq.rocketmq.store.model.operation.ChangeInvisibleDurationOperation;
Expand Down Expand Up @@ -87,7 +88,9 @@ public CompletableFuture<Void> recover(MessageStateMachine stateMachine, long op
replay(batchWithContext.baseOffset(), operation);
} catch (StoreException e) {
LOGGER.error("Topic {}, queue: {}: Replay operation:{} failed when recover", stateMachine.topicId(), stateMachine.queueId(), operation, e);
throw new CompletionException(e);
if (e.code() != StoreErrorCode.ILLEGAL_ARGUMENT) {
throw new CompletionException(e);
}
}
}
}));
Expand Down Expand Up @@ -122,7 +125,8 @@ public CompletableFuture<LogResult> logAckOperation(AckOperation operation) {
}

@Override
public CompletableFuture<LogResult> logChangeInvisibleDurationOperation(ChangeInvisibleDurationOperation operation) {
public CompletableFuture<LogResult> logChangeInvisibleDurationOperation(
ChangeInvisibleDurationOperation operation) {
return streamStore.append(operation.operationStreamId(),
new SingleRecord(ByteBuffer.wrap(SerializeUtil.encodeChangeInvisibleDurationOperation(operation))))
.thenApply(result -> {
Expand Down