Skip to content

Commit

Permalink
fix(controller): fix illegal basedatetTimestamp in commit wal (#535)
Browse files Browse the repository at this point in the history
Signed-off-by: wangxye <xuanyewang.cs@gmail.com>
  • Loading branch information
wangxye authored Nov 1, 2023
1 parent 4d50453 commit a9c47fb
Show file tree
Hide file tree
Showing 3 changed files with 907 additions and 842 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ public CompletableFuture<Long> prepareS3Objects(int count, int ttlInMinutes) {
});
session.commit();
future.complete(next);
} catch (Exception e) {
LOGGER.error("PrepareS3Objects failed", e);
ControllerException ex = new ControllerException(Code.INTERNAL_VALUE, "PrepareS3Objects failed" + e.getMessage());
future.completeExceptionally(ex);
}
} else {
PrepareS3ObjectsRequest request = PrepareS3ObjectsRequest.newBuilder()
Expand Down Expand Up @@ -218,7 +222,7 @@ public CompletableFuture<Void> commitWalObject(S3WALObject walObject,
object.setObjectId(s3StreamObject.getObjectId());
object.setCommittedTimestamp(new Date());
object.setStartOffset(s3StreamObject.getStartOffset());
object.setBaseDataTimestamp(new Date(s3StreamObject.getBaseDataTimestamp()));
object.setBaseDataTimestamp(new Date());
object.setEndOffset(s3StreamObject.getEndOffset());
object.setObjectSize(s3StreamObject.getObjectSize());
s3StreamObjectMapper.commit(object);
Expand Down Expand Up @@ -249,8 +253,10 @@ public CompletableFuture<Void> commitWalObject(S3WALObject walObject,
LOGGER.info("broker[broke-id={}] commit wal object[object-id={}] success, compacted objects[{}], stream objects[{}]",
brokerId, walObject.getObjectId(), compactedObjects, streamObjects);
future.complete(null);
} catch (InvalidProtocolBufferException e) {
future.completeExceptionally(e);
} catch (Exception e) {
LOGGER.error("CommitWalObject failed", e);
ControllerException ex = new ControllerException(Code.INTERNAL_VALUE, "CommitWalObject failed" + e.getMessage());
future.completeExceptionally(ex);
}
} else {
CommitWALObjectRequest request = CommitWALObjectRequest.newBuilder()
Expand Down Expand Up @@ -341,6 +347,10 @@ public CompletableFuture<Void> commitStreamObject(apache.rocketmq.controller.v1.
session.commit();
LOGGER.info("S3StreamObject[object-id={}] commit success, compacted objects: {}", streamObject.getObjectId(), compactedObjects);
future.complete(null);
} catch (Exception e) {
LOGGER.error("CommitStream failed", e);
ControllerException ex = new ControllerException(Code.INTERNAL_VALUE, "CommitStream failed" + e.getMessage());
future.completeExceptionally(ex);
}
} else {
CommitStreamObjectRequest request = CommitStreamObjectRequest.newBuilder()
Expand Down Expand Up @@ -740,6 +750,10 @@ public CompletableFuture<Void> trimStream(long streamId, long streamEpoch, long
LOGGER.info("Node[node-id={}] trim stream [stream-id={}] with epoch={} and newStartOffset={}",
metadataStore.config().nodeId(), streamId, streamEpoch, newStartOffset);
future.complete(null);
} catch (Exception e) {
LOGGER.error("TrimStream failed", e);
ControllerException ex = new ControllerException(Code.INTERNAL_VALUE, "TrimStream failed" + e.getMessage());
future.completeExceptionally(ex);
}
} else {
TrimStreamRequest request = TrimStreamRequest.newBuilder()
Expand Down
Loading

0 comments on commit a9c47fb

Please sign in to comment.