diff --git a/controller/src/main/java/com/automq/rocketmq/controller/metadata/database/S3MetadataManager.java b/controller/src/main/java/com/automq/rocketmq/controller/metadata/database/S3MetadataManager.java index 523b19d8d..07a8280e7 100644 --- a/controller/src/main/java/com/automq/rocketmq/controller/metadata/database/S3MetadataManager.java +++ b/controller/src/main/java/com/automq/rocketmq/controller/metadata/database/S3MetadataManager.java @@ -99,6 +99,10 @@ public CompletableFuture prepareS3Objects(int count, int ttlInMinutes) { }); session.commit(); future.complete(next); + } catch (Exception e) { + LOGGER.error("PrepareS3Objects failed", e); + ControllerException ex = new ControllerException(Code.INTERNAL_VALUE, "PrepareS3Objects failed" + e.getMessage()); + future.completeExceptionally(ex); } } else { PrepareS3ObjectsRequest request = PrepareS3ObjectsRequest.newBuilder() @@ -218,7 +222,7 @@ public CompletableFuture commitWalObject(S3WALObject walObject, object.setObjectId(s3StreamObject.getObjectId()); object.setCommittedTimestamp(new Date()); object.setStartOffset(s3StreamObject.getStartOffset()); - object.setBaseDataTimestamp(new Date(s3StreamObject.getBaseDataTimestamp())); + object.setBaseDataTimestamp(new Date()); object.setEndOffset(s3StreamObject.getEndOffset()); object.setObjectSize(s3StreamObject.getObjectSize()); s3StreamObjectMapper.commit(object); @@ -249,8 +253,10 @@ public CompletableFuture commitWalObject(S3WALObject walObject, LOGGER.info("broker[broke-id={}] commit wal object[object-id={}] success, compacted objects[{}], stream objects[{}]", brokerId, walObject.getObjectId(), compactedObjects, streamObjects); future.complete(null); - } catch (InvalidProtocolBufferException e) { - future.completeExceptionally(e); + } catch (Exception e) { + LOGGER.error("CommitWalObject failed", e); + ControllerException ex = new ControllerException(Code.INTERNAL_VALUE, "CommitWalObject failed" + e.getMessage()); + future.completeExceptionally(ex); } } else { CommitWALObjectRequest request = CommitWALObjectRequest.newBuilder() @@ -341,6 +347,10 @@ public CompletableFuture commitStreamObject(apache.rocketmq.controller.v1. session.commit(); LOGGER.info("S3StreamObject[object-id={}] commit success, compacted objects: {}", streamObject.getObjectId(), compactedObjects); future.complete(null); + } catch (Exception e) { + LOGGER.error("CommitStream failed", e); + ControllerException ex = new ControllerException(Code.INTERNAL_VALUE, "CommitStream failed" + e.getMessage()); + future.completeExceptionally(ex); } } else { CommitStreamObjectRequest request = CommitStreamObjectRequest.newBuilder() @@ -740,6 +750,10 @@ public CompletableFuture trimStream(long streamId, long streamEpoch, long LOGGER.info("Node[node-id={}] trim stream [stream-id={}] with epoch={} and newStartOffset={}", metadataStore.config().nodeId(), streamId, streamEpoch, newStartOffset); future.complete(null); + } catch (Exception e) { + LOGGER.error("TrimStream failed", e); + ControllerException ex = new ControllerException(Code.INTERNAL_VALUE, "TrimStream failed" + e.getMessage()); + future.completeExceptionally(ex); } } else { TrimStreamRequest request = TrimStreamRequest.newBuilder() diff --git a/controller/src/test/java/com/automq/rocketmq/controller/metadata/database/DefaultMetadataStoreTest.java b/controller/src/test/java/com/automq/rocketmq/controller/metadata/database/DefaultMetadataStoreTest.java index bf0ac68ed..1138ca5e2 100644 --- a/controller/src/test/java/com/automq/rocketmq/controller/metadata/database/DefaultMetadataStoreTest.java +++ b/controller/src/test/java/com/automq/rocketmq/controller/metadata/database/DefaultMetadataStoreTest.java @@ -23,17 +23,12 @@ import apache.rocketmq.controller.v1.CreateTopicRequest; import apache.rocketmq.controller.v1.GroupStatus; import apache.rocketmq.controller.v1.GroupType; -import apache.rocketmq.controller.v1.S3ObjectState; -import apache.rocketmq.controller.v1.S3StreamObject; -import apache.rocketmq.controller.v1.S3WALObject; import apache.rocketmq.controller.v1.StreamMetadata; import apache.rocketmq.controller.v1.StreamRole; import apache.rocketmq.controller.v1.StreamState; -import apache.rocketmq.controller.v1.SubStream; import apache.rocketmq.controller.v1.TopicStatus; import apache.rocketmq.controller.v1.MessageType; import apache.rocketmq.controller.v1.UpdateTopicRequest; -import com.automq.rocketmq.common.system.StreamConstants; import com.automq.rocketmq.controller.exception.ControllerException; import com.automq.rocketmq.controller.metadata.ControllerClient; import com.automq.rocketmq.controller.metadata.DatabaseTestBase; @@ -43,8 +38,6 @@ import com.automq.rocketmq.controller.metadata.database.dao.Lease; import com.automq.rocketmq.controller.metadata.database.dao.Node; import com.automq.rocketmq.controller.metadata.database.dao.QueueAssignment; -import com.automq.rocketmq.controller.metadata.database.dao.S3Object; -import com.automq.rocketmq.controller.metadata.database.dao.S3WalObject; import com.automq.rocketmq.controller.metadata.database.dao.Stream; import com.automq.rocketmq.controller.metadata.database.dao.Range; import com.automq.rocketmq.controller.metadata.database.dao.StreamCriteria; @@ -53,9 +46,6 @@ import com.automq.rocketmq.controller.metadata.database.mapper.NodeMapper; import com.automq.rocketmq.controller.metadata.database.mapper.QueueAssignmentMapper; import com.automq.rocketmq.controller.metadata.database.mapper.RangeMapper; -import com.automq.rocketmq.controller.metadata.database.mapper.S3ObjectMapper; -import com.automq.rocketmq.controller.metadata.database.mapper.S3StreamObjectMapper; -import com.automq.rocketmq.controller.metadata.database.mapper.S3WalObjectMapper; import com.automq.rocketmq.controller.metadata.database.mapper.StreamMapper; import com.automq.rocketmq.controller.metadata.database.mapper.TopicMapper; @@ -63,15 +53,11 @@ import com.google.protobuf.util.JsonFormat; import java.io.IOException; import java.util.ArrayList; -import java.util.Calendar; -import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import org.apache.commons.lang3.tuple.Pair; import org.apache.ibatis.session.SqlSession; import org.awaitility.Awaitility; import org.junit.jupiter.api.Assertions; @@ -499,349 +485,6 @@ public void testMarkMessageQueueAssignable() throws IOException, ExecutionExcept } } - @Test - public void testListStreamObjects() throws IOException, ExecutionException, InterruptedException { - long startOffset = 0L, interval = 1000L, endOffset; - int limit = 1; - long streamId; - try (SqlSession session = getSessionFactory().openSession()) { - S3StreamObjectMapper s3StreamObjectMapper = session.getMapper(S3StreamObjectMapper.class); - List s3StreamObjects = buildS3StreamObjs(1, 1, startOffset, interval); - s3StreamObjects.forEach(s3StreamObjectMapper::create); - streamId = s3StreamObjects.get(0).getStreamId(); - endOffset = s3StreamObjects.get(0).getEndOffset(); - session.commit(); - } - - try (DefaultMetadataStore metadataStore = new DefaultMetadataStore(client, getSessionFactory(), config)) { - List s3StreamObjects = metadataStore.listStreamObjects(streamId, startOffset, endOffset, limit).get(); - S3StreamObject s3StreamObject = s3StreamObjects.get(0); - Assertions.assertEquals(1, s3StreamObject.getObjectId()); - Assertions.assertEquals(100, s3StreamObject.getObjectSize()); - Assertions.assertEquals(1, s3StreamObject.getStreamId()); - Assertions.assertEquals(startOffset, s3StreamObject.getStartOffset()); - Assertions.assertEquals(endOffset, s3StreamObject.getEndOffset()); - } - } - - @Test - public void testListWALObjects_WithPrams() throws IOException, ExecutionException, InterruptedException { - long streamId, startOffset, endOffset; - streamId = 1; - startOffset = 0L; - endOffset = 9L; - int limit = 1; - - try (SqlSession session = getSessionFactory().openSession()) { - RangeMapper rangeMapper = session.getMapper(RangeMapper.class); - Range range = new Range(); - range.setStreamId(streamId); - range.setRangeId(0); - range.setStartOffset(startOffset); - range.setEndOffset(endOffset); - range.setEpoch(1L); - range.setNodeId(1); - rangeMapper.create(range); - - S3WalObjectMapper s3WALObjectMapper = session.getMapper(S3WalObjectMapper.class); - buildS3WalObjs(1, 1).stream().peek(s3WalObject -> { - Map subStreams = buildWalSubStreams(4, 0, 10); - s3WalObject.setSubStreams(toJson(subStreams)); - }).forEach(s3WALObjectMapper::create); - - session.commit(); - } - - Map expectedSubStream = buildWalSubStreams(1, 0, 10); - - try (DefaultMetadataStore metadataStore = new DefaultMetadataStore(client, getSessionFactory(), config)) { - Assertions.assertNull(metadataStore.getLease()); - Lease lease = new Lease(); - lease.setNodeId(config.nodeId()); - metadataStore.setLease(lease); - List s3WALObjects = metadataStore.listWALObjects(streamId, startOffset, endOffset, limit).get(); - - Assertions.assertFalse(s3WALObjects.isEmpty()); - S3WALObject s3WALObject = s3WALObjects.get(0); - Assertions.assertEquals(1, s3WALObject.getObjectId()); - Assertions.assertEquals(100, s3WALObject.getObjectSize()); - Assertions.assertEquals(1, s3WALObject.getBrokerId()); - Assertions.assertEquals(1, s3WALObject.getSequenceId()); - Assertions.assertEquals(expectedSubStream, s3WALObject.getSubStreams().getSubStreamsMap()); - } - - try (SqlSession session = getSessionFactory().openSession()) { - S3WalObjectMapper s3WALObjectMapper = session.getMapper(S3WalObjectMapper.class); - s3WALObjectMapper.delete(123L, 1, null); - session.commit(); - } - } - - @Test - public void testListWALObjects_NotParams() throws IOException, ExecutionException, InterruptedException { - try (SqlSession session = getSessionFactory().openSession()) { - S3WalObjectMapper s3WALObjectMapper = session.getMapper(S3WalObjectMapper.class); - - buildS3WalObjs(1, 1).stream().peek(s3WalObject1 -> { - Map subStreams = buildWalSubStreams(4, 0, 10); - s3WalObject1.setSubStreams(toJson(subStreams)); - }).forEach(s3WALObjectMapper::create); - - session.commit(); - } - - Map subStreams = buildWalSubStreams(4, 0, 10); - - try (DefaultMetadataStore metadataStore = new DefaultMetadataStore(client, getSessionFactory(), config)) { - Assertions.assertNull(metadataStore.getLease()); - Lease lease = new Lease(); - lease.setNodeId(config.nodeId()); - metadataStore.setLease(lease); - List s3WALObjects = metadataStore.listWALObjects().get(); - - Assertions.assertFalse(s3WALObjects.isEmpty()); - S3WALObject s3WALObject = s3WALObjects.get(0); - Assertions.assertEquals(1, s3WALObject.getObjectId()); - Assertions.assertEquals(100, s3WALObject.getObjectSize()); - Assertions.assertEquals(1, s3WALObject.getBrokerId()); - Assertions.assertEquals(1, s3WALObject.getSequenceId()); - Assertions.assertEquals(subStreams, s3WALObject.getSubStreams().getSubStreamsMap()); - } - - try (SqlSession session = getSessionFactory().openSession()) { - S3WalObjectMapper s3WALObjectMapper = session.getMapper(S3WalObjectMapper.class); - s3WALObjectMapper.delete(123L, 1, null); - session.commit(); - } - } - - @Test - public void testListObjects_OnlyStream() throws IOException, ExecutionException, InterruptedException { - long startOffset, endOffset; - startOffset = 0L; - endOffset = 9L; - int limit = 3; - - try (SqlSession session = getSessionFactory().openSession()) { - S3WalObjectMapper s3WALObjectMapper = session.getMapper(S3WalObjectMapper.class); - - buildS3WalObjs(1, 1).stream().peek(s3WalObject1 -> { - Map subStreams = buildWalSubStreams(4, 10, 10); - s3WalObject1.setSubStreams(toJson(subStreams)); - }).forEach(s3WALObjectMapper::create); - - S3StreamObjectMapper s3StreamObjectMapper = session.getMapper(S3StreamObjectMapper.class); - buildS3StreamObjs(5, 1, 0, 10).forEach(s3StreamObjectMapper::create); - session.commit(); - } - - try (DefaultMetadataStore metadataStore = new DefaultMetadataStore(client, getSessionFactory(), config)) { - Assertions.assertNull(metadataStore.getLease()); - Lease lease = new Lease(); - lease.setNodeId(config.nodeId()); - metadataStore.setLease(lease); - Pair, List> listPair = metadataStore.listObjects(1, startOffset, endOffset, limit).get(); - - Assertions.assertFalse(listPair.getLeft().isEmpty()); - Assertions.assertTrue(listPair.getRight().isEmpty()); - S3StreamObject s3StreamObject = listPair.getLeft().get(0); - Assertions.assertEquals(5, s3StreamObject.getObjectId()); - Assertions.assertEquals(100, s3StreamObject.getObjectSize()); - Assertions.assertEquals(1, s3StreamObject.getStreamId()); - Assertions.assertEquals(0, s3StreamObject.getStartOffset()); - Assertions.assertEquals(10, s3StreamObject.getEndOffset()); - } - - try (SqlSession session = getSessionFactory().openSession()) { - S3StreamObjectMapper s3StreamObjectMapper = session.getMapper(S3StreamObjectMapper.class); - s3StreamObjectMapper.delete(null, 1L, 122L); - session.commit(); - } - } - - @Test - public void testListObjects_OnlyWAL() throws IOException, ExecutionException, InterruptedException { - long streamId, startOffset, endOffset; - streamId = 1; - startOffset = 11L; - endOffset = 19L; - int limit = 3; - - try (SqlSession session = getSessionFactory().openSession()) { - S3WalObjectMapper s3WALObjectMapper = session.getMapper(S3WalObjectMapper.class); - buildS3WalObjs(1, 1).stream().peek(s3WalObject1 -> { - Map subStreams = buildWalSubStreams(4, 10, 10); - s3WalObject1.setSubStreams(toJson(subStreams)); - }).forEach(s3WALObjectMapper::create); - - S3StreamObjectMapper s3StreamObjectMapper = session.getMapper(S3StreamObjectMapper.class); - buildS3StreamObjs(5, 1, 0, 10).forEach(s3StreamObjectMapper::create); - session.commit(); - } - - Map subStreams = buildWalSubStreams(1, 10, 10); - - try (DefaultMetadataStore metadataStore = new DefaultMetadataStore(client, getSessionFactory(), config)) { - Assertions.assertNull(metadataStore.getLease()); - Lease lease = new Lease(); - lease.setNodeId(config.nodeId()); - metadataStore.setLease(lease); - Pair, List> listPair = metadataStore.listObjects(streamId, startOffset, endOffset, limit).get(); - - Assertions.assertTrue(listPair.getLeft().isEmpty()); - Assertions.assertFalse(listPair.getRight().isEmpty()); - - S3WALObject s3WALObject = listPair.getRight().get(0); - Assertions.assertEquals(1, s3WALObject.getObjectId()); - Assertions.assertEquals(100, s3WALObject.getObjectSize()); - Assertions.assertEquals(1, s3WALObject.getBrokerId()); - Assertions.assertEquals(1, s3WALObject.getSequenceId()); - Assertions.assertEquals(subStreams, s3WALObject.getSubStreams().getSubStreamsMap()); - } - - try (SqlSession session = getSessionFactory().openSession()) { - S3WalObjectMapper s3WALObjectMapper = session.getMapper(S3WalObjectMapper.class); - s3WALObjectMapper.delete(123L, 1, null); - session.commit(); - } - } - - @Test - public void testListObjects_Both() throws IOException, ExecutionException, InterruptedException { - long streamId, startOffset, endOffset; - streamId = 1; - startOffset = 0L; - endOffset = 21L; - int limit = 3; - - try (SqlSession session = getSessionFactory().openSession()) { - S3WalObjectMapper s3WALObjectMapper = session.getMapper(S3WalObjectMapper.class); - buildS3WalObjs(1, 1).stream().peek(s3WalObject1 -> { - Map subStreams = buildWalSubStreams(4, 10, 10); - s3WalObject1.setSubStreams(toJson(subStreams)); - }).forEach(s3WALObjectMapper::create); - - S3StreamObjectMapper s3StreamObjectMapper = session.getMapper(S3StreamObjectMapper.class); - buildS3StreamObjs(5, 1, 0, 10).forEach(s3StreamObjectMapper::create); - - session.commit(); - } - - Map subStreams = buildWalSubStreams(1, 10, 10); - - try (DefaultMetadataStore metadataStore = new DefaultMetadataStore(client, getSessionFactory(), config)) { - Assertions.assertNull(metadataStore.getLease()); - Lease lease = new Lease(); - lease.setNodeId(config.nodeId()); - metadataStore.setLease(lease); - Pair, List> listPair = metadataStore.listObjects(streamId, startOffset, endOffset, limit).get(); - - Assertions.assertFalse(listPair.getLeft().isEmpty()); - Assertions.assertFalse(listPair.getRight().isEmpty()); - S3StreamObject s3StreamObject = listPair.getLeft().get(0); - Assertions.assertEquals(5, s3StreamObject.getObjectId()); - Assertions.assertEquals(100, s3StreamObject.getObjectSize()); - Assertions.assertEquals(streamId, s3StreamObject.getStreamId()); - Assertions.assertEquals(0, s3StreamObject.getStartOffset()); - Assertions.assertEquals(10, s3StreamObject.getEndOffset()); - - S3WALObject s3WALObject = listPair.getRight().get(0); - Assertions.assertEquals(1, s3WALObject.getObjectId()); - Assertions.assertEquals(100, s3WALObject.getObjectSize()); - Assertions.assertEquals(1, s3WALObject.getBrokerId()); - Assertions.assertEquals(1, s3WALObject.getSequenceId()); - Assertions.assertEquals(subStreams, s3WALObject.getSubStreams().getSubStreamsMap()); - } - - try (SqlSession session = getSessionFactory().openSession()) { - S3WalObjectMapper s3WALObjectMapper = session.getMapper(S3WalObjectMapper.class); - s3WALObjectMapper.delete(123L, 1, null); - - S3StreamObjectMapper s3StreamObjectMapper = session.getMapper(S3StreamObjectMapper.class); - s3StreamObjectMapper.delete(null, streamId, 122L); - session.commit(); - } - } - - @Test - public void testListObjects_Both_Interleaved() throws IOException, ExecutionException, InterruptedException { - long streamId, startOffset, endOffset; - streamId = 1; - startOffset = 0L; - endOffset = 40L; - int limit = 3; - - try (SqlSession session = getSessionFactory().openSession()) { - S3WalObjectMapper s3WALObjectMapper = session.getMapper(S3WalObjectMapper.class); - buildS3WalObjs(1, 1).stream().peek(s3WalObject -> { - Map subStreams = buildWalSubStreams(1, 0, 10); - s3WalObject.setSubStreams(toJson(subStreams)); - }).forEach(s3WALObjectMapper::create); - - buildS3WalObjs(2, 1).stream().peek(s3WalObject -> { - Map subStreams = buildWalSubStreams(1, 20, 20); - s3WalObject.setSubStreams(toJson(subStreams)); - }).forEach(s3WALObjectMapper::create); - - S3StreamObjectMapper s3StreamObjectMapper = session.getMapper(S3StreamObjectMapper.class); - buildS3StreamObjs(5, 1, 10, 10).forEach(s3StreamObjectMapper::create); - buildS3StreamObjs(6, 1, 40, 10).forEach(s3StreamObjectMapper::create); - - session.commit(); - } - - Map subStreams1 = buildWalSubStreams(1, 0, 10); - Map subStreams2 = buildWalSubStreams(1, 20, 20); - - try (DefaultMetadataStore metadataStore = new DefaultMetadataStore(client, getSessionFactory(), config)) { - Assertions.assertNull(metadataStore.getLease()); - Lease lease = new Lease(); - lease.setNodeId(config.nodeId()); - metadataStore.setLease(lease); - Pair, List> listPair = metadataStore.listObjects(streamId, startOffset, endOffset, limit).get(); - - Assertions.assertFalse(listPair.getLeft().isEmpty()); - Assertions.assertFalse(listPair.getRight().isEmpty()); - List s3StreamObjects = listPair.getLeft(); - Assertions.assertEquals(1, s3StreamObjects.size()); - S3StreamObject s3StreamObject = s3StreamObjects.get(0); - - Assertions.assertEquals(5, s3StreamObject.getObjectId()); - Assertions.assertEquals(100, s3StreamObject.getObjectSize()); - Assertions.assertEquals(streamId, s3StreamObject.getStreamId()); - Assertions.assertEquals(10, s3StreamObject.getStartOffset()); - Assertions.assertEquals(20, s3StreamObject.getEndOffset()); - - List s3WALObjects = listPair.getRight(); - Assertions.assertEquals(2, s3WALObjects.size()); - S3WALObject s3WALObject = s3WALObjects.get(0); - - Assertions.assertEquals(1, s3WALObject.getObjectId()); - Assertions.assertEquals(100, s3WALObject.getObjectSize()); - Assertions.assertEquals(1, s3WALObject.getBrokerId()); - Assertions.assertEquals(1, s3WALObject.getSequenceId()); - Assertions.assertEquals(subStreams1, s3WALObject.getSubStreams().getSubStreamsMap()); - - S3WALObject s3WALObject1 = s3WALObjects.get(1); - Assertions.assertEquals(2, s3WALObject1.getObjectId()); - Assertions.assertEquals(100, s3WALObject1.getObjectSize()); - Assertions.assertEquals(1, s3WALObject1.getBrokerId()); - Assertions.assertEquals(2, s3WALObject1.getSequenceId()); - Assertions.assertEquals(subStreams2, s3WALObject1.getSubStreams().getSubStreamsMap()); - } - - try (SqlSession session = getSessionFactory().openSession()) { - S3WalObjectMapper s3WALObjectMapper = session.getMapper(S3WalObjectMapper.class); - s3WALObjectMapper.delete(1L, 1, null); - s3WALObjectMapper.delete(2L, 2, null); - - S3StreamObjectMapper s3StreamObjectMapper = session.getMapper(S3StreamObjectMapper.class); - s3StreamObjectMapper.delete(null, streamId, 122L); - s3StreamObjectMapper.delete(null, streamId, 121L); - session.commit(); - } - } - @Test public void testOpenStream_WithCloseStream_AtStart() throws IOException, ExecutionException, InterruptedException { @@ -1360,65 +1003,6 @@ public void testListOpenStreams() throws IOException, ExecutionException, Interr } } - @Test - public void testTrimStream() throws IOException { - long streamId, streamEpoch = 1, newStartOffset = 2000; - int nodeId = 1, rangId = 0; - try (SqlSession session = this.getSessionFactory().openSession()) { - StreamMapper streamMapper = session.getMapper(StreamMapper.class); - RangeMapper rangeMapper = session.getMapper(RangeMapper.class); - - com.automq.rocketmq.controller.metadata.database.dao.Stream stream = new com.automq.rocketmq.controller.metadata.database.dao.Stream(); - stream.setSrcNodeId(nodeId); - stream.setDstNodeId(nodeId); - stream.setStartOffset(1234L); - stream.setEpoch(0L); - stream.setRangeId(rangId); - stream.setTopicId(1L); - stream.setQueueId(2); - stream.setState(StreamState.OPEN); - stream.setStreamRole(StreamRole.STREAM_ROLE_DATA); - streamMapper.create(stream); - streamId = stream.getId(); - - Range range = new Range(); - range.setRangeId(rangId); - range.setStreamId(streamId); - range.setEpoch(0L); - range.setStartOffset(1234L); - range.setEndOffset(2345L); - range.setNodeId(nodeId); - rangeMapper.create(range); - - session.commit(); - } - - try (MetadataStore metadataStore = new DefaultMetadataStore(client, getSessionFactory(), config)) { - metadataStore.start(); - Awaitility.await().with().atMost(10, TimeUnit.SECONDS).pollInterval(100, TimeUnit.MILLISECONDS) - .until(metadataStore::isLeader); - - metadataStore.trimStream(streamId, streamEpoch, newStartOffset); - } - - try (SqlSession session = this.getSessionFactory().openSession()) { - StreamMapper streamMapper = session.getMapper(StreamMapper.class); - Stream stream = streamMapper.getByStreamId(streamId); - Assertions.assertEquals(newStartOffset, stream.getStartOffset()); - Assertions.assertEquals(nodeId, stream.getSrcNodeId()); - Assertions.assertEquals(nodeId, stream.getDstNodeId()); - Assertions.assertEquals(0, stream.getRangeId()); - - RangeMapper rangeMapper = session.getMapper(RangeMapper.class); - Range range = rangeMapper.get(rangId, streamId, null); - Assertions.assertEquals(newStartOffset, range.getStartOffset()); - Assertions.assertEquals(2345, range.getEndOffset()); - Assertions.assertEquals(nodeId, range.getNodeId()); - Assertions.assertEquals(streamId, range.getStreamId()); - } - - } - @Test public void testListOpenStream() throws IOException, ExecutionException, InterruptedException { long streamId; @@ -1527,429 +1111,6 @@ public void testListOpenStream_NotFound() throws IOException, ExecutionException } } - @Test - public void testPrepareS3Objects() throws IOException { - long objectId; - - try (DefaultMetadataStore metadataStore = new DefaultMetadataStore(client, getSessionFactory(), config)) { - Assertions.assertNull(metadataStore.getLease()); - Lease lease = new Lease(); - lease.setNodeId(config.nodeId()); - metadataStore.setLease(lease); - metadataStore.setRole(Role.Leader); - objectId = metadataStore.prepareS3Objects(3, 5).get(); - } catch (Exception e) { - throw new RuntimeException(e); - } - - try (SqlSession session = getSessionFactory().openSession()) { - S3ObjectMapper s3ObjectMapper = session.getMapper(S3ObjectMapper.class); - for (long index = objectId; index < objectId + 3; index++) { - S3Object object = s3ObjectMapper.getById(index); - Assertions.assertEquals(S3ObjectState.BOS_PREPARED, object.getState()); - } - } - } - - @Test - public void testCommitStreamObject() throws IOException, ControllerException { - long objectId, streamId = 1; - - try (DefaultMetadataStore metadataStore = new DefaultMetadataStore(client, getSessionFactory(), config)) { - Assertions.assertNull(metadataStore.getLease()); - Lease lease = new Lease(); - lease.setNodeId(config.nodeId()); - metadataStore.setLease(lease); - metadataStore.setRole(Role.Leader); - objectId = metadataStore.prepareS3Objects(3, 5).get(); - } catch (Exception e) { - throw new RuntimeException(e); - } - - S3StreamObject news3StreamObject = S3StreamObject.newBuilder() - .setObjectId(objectId + 2) - .setStreamId(streamId) - .setObjectSize(111L) - .build(); - - try (SqlSession session = this.getSessionFactory().openSession()) { - S3StreamObjectMapper s3StreamObjectMapper = session.getMapper(S3StreamObjectMapper.class); - buildS3StreamObjs(objectId, 2, 3, 100L).forEach(s3StreamObjectMapper::create); - session.commit(); - } - - try (DefaultMetadataStore metadataStore = new DefaultMetadataStore(client, getSessionFactory(), config)) { - Assertions.assertNull(metadataStore.getLease()); - Lease lease = new Lease(); - lease.setNodeId(config.nodeId()); - metadataStore.setLease(lease); - metadataStore.setRole(Role.Leader); - - List compactedObjects = new ArrayList<>(); - compactedObjects.add(objectId); - compactedObjects.add(objectId + 1); - metadataStore.commitStreamObject(news3StreamObject, compactedObjects); - } - - try (SqlSession session = getSessionFactory().openSession()) { - S3ObjectMapper s3ObjectMapper = session.getMapper(S3ObjectMapper.class); - S3Object s3Object = s3ObjectMapper.getById(objectId); - Assertions.assertEquals(S3ObjectState.BOS_WILL_DELETE, s3Object.getState()); - - S3Object s3Object1 = s3ObjectMapper.getById(objectId); - Assertions.assertEquals(S3ObjectState.BOS_WILL_DELETE, s3Object1.getState()); - - S3StreamObjectMapper s3StreamObjectMapper = session.getMapper(S3StreamObjectMapper.class); - for (long index = objectId; index < objectId + 2; index++) { - com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject object = s3StreamObjectMapper.getByObjectId(index); - Assertions.assertNull(object); - } - - com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject object = s3StreamObjectMapper.getByObjectId(objectId + 2); - Assertions.assertEquals(111L, object.getObjectSize()); - Assertions.assertEquals(streamId, object.getStreamId()); - Assertions.assertTrue(object.getBaseDataTimestamp().getTime() > 0); - Assertions.assertTrue(object.getCommittedTimestamp().getTime() > 0); - } - } - - @Test - public void testCommitStreamObject_NoCompacted() throws IOException, ControllerException { - long objectId, streamId = 1; - - try (DefaultMetadataStore metadataStore = new DefaultMetadataStore(client, getSessionFactory(), config)) { - Assertions.assertNull(metadataStore.getLease()); - Lease lease = new Lease(); - lease.setNodeId(config.nodeId()); - metadataStore.setLease(lease); - metadataStore.setRole(Role.Leader); - objectId = metadataStore.prepareS3Objects(3, 5).get(); - } catch (Exception e) { - throw new RuntimeException(e); - } - - S3StreamObject news3StreamObject = S3StreamObject.newBuilder() - .setObjectId(objectId + 2) - .setStreamId(streamId) - .setObjectSize(111L) - .build(); - - try (DefaultMetadataStore metadataStore = new DefaultMetadataStore(client, getSessionFactory(), config)) { - Assertions.assertNull(metadataStore.getLease()); - Lease lease = new Lease(); - lease.setNodeId(config.nodeId()); - metadataStore.setLease(lease); - metadataStore.setLease(lease); - metadataStore.setRole(Role.Leader); - - metadataStore.commitStreamObject(news3StreamObject, Collections.emptyList()); - } - - try (SqlSession session = getSessionFactory().openSession()) { - S3ObjectMapper s3ObjectMapper = session.getMapper(S3ObjectMapper.class); - S3StreamObjectMapper s3StreamObjectMapper = session.getMapper(S3StreamObjectMapper.class); - S3Object s3Object = s3ObjectMapper.getById(objectId + 2); - Assertions.assertEquals(S3ObjectState.BOS_COMMITTED, s3Object.getState()); - - com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject object = s3StreamObjectMapper.getByObjectId(objectId + 2); - Assertions.assertTrue(object.getBaseDataTimestamp().getTime() > 0); - Assertions.assertTrue(object.getCommittedTimestamp().getTime() > 0); - Assertions.assertEquals(111L, object.getObjectSize()); - Assertions.assertEquals(streamId, object.getStreamId()); - } - } - - @Test - public void testCommitStreamObject_ObjectNotExist() throws IOException { - long streamId = 1; - - S3StreamObject s3StreamObject = S3StreamObject.newBuilder() - .setObjectId(1) - .setStreamId(streamId) - .setObjectSize(111L) - .build(); - - try (SqlSession session = this.getSessionFactory().openSession()) { - S3StreamObjectMapper s3StreamObjectMapper = session.getMapper(S3StreamObjectMapper.class); - buildS3StreamObjs(1, 1, 100L, 100L).forEach(s3StreamObjectMapper::create); - } - - List compactedObjects = new ArrayList<>(); - try (DefaultMetadataStore metadataStore = new DefaultMetadataStore(client, getSessionFactory(), config)) { - Assertions.assertNull(metadataStore.getLease()); - Lease lease = new Lease(); - lease.setNodeId(config.nodeId()); - metadataStore.setLease(lease); - metadataStore.setRole(Role.Leader); - - Assertions.assertThrows(ExecutionException.class, () -> metadataStore.commitStreamObject(s3StreamObject, compactedObjects).get()); - } - - } - - @Test - public void testCommitStreamObject_StreamNotExist() throws IOException { - long streamId = 1; - - S3StreamObject s3StreamObject = S3StreamObject.newBuilder() - .setObjectId(-1) - .setStreamId(streamId) - .setObjectSize(111L) - .build(); - - try (SqlSession session = this.getSessionFactory().openSession()) { - S3StreamObjectMapper s3StreamObjectMapper = session.getMapper(S3StreamObjectMapper.class); - buildS3StreamObjs(1, 1, 100L, 100L).forEach(s3StreamObjectMapper::create); - } - - List compactedObjects = new ArrayList<>(); - try (DefaultMetadataStore metadataStore = new DefaultMetadataStore(client, getSessionFactory(), config)) { - Assertions.assertNull(metadataStore.getLease()); - Lease lease = new Lease(); - lease.setNodeId(config.nodeId()); - metadataStore.setLease(lease); - metadataStore.setRole(Role.Leader); - - Assertions.assertThrows(ExecutionException.class, () -> metadataStore.commitStreamObject(s3StreamObject, compactedObjects).get()); - } - - } - - @Test - public void testCommitWALObject() throws IOException, ExecutionException, InterruptedException { - long objectId; - int nodeId = 1; - - try (DefaultMetadataStore metadataStore = new DefaultMetadataStore(client, getSessionFactory(), config)) { - Assertions.assertNull(metadataStore.getLease()); - Lease lease = new Lease(); - lease.setNodeId(config.nodeId()); - metadataStore.setLease(lease); - metadataStore.setRole(Role.Leader); - objectId = metadataStore.prepareS3Objects(5, 5).get(); - } catch (Exception e) { - throw new RuntimeException(e); - } - - S3WALObject walObject = S3WALObject.newBuilder() - .setObjectId(objectId + 4) - .setObjectSize(222L) - .setBrokerId(nodeId) - .build(); - - try (SqlSession session = this.getSessionFactory().openSession()) { - S3WalObjectMapper s3WALObjectMapper = session.getMapper(S3WalObjectMapper.class); - - buildS3WalObjs(objectId + 2, 1).stream().peek(s3WalObject -> { - Map subStreams = buildWalSubStreams(1, 20L, 10L); - s3WalObject.setSubStreams(toJson(subStreams)); - }).forEach(s3WALObjectMapper::create); - - buildS3WalObjs(objectId + 3, 1).stream().peek(s3WalObject -> { - Map subStreams = buildWalSubStreams(1, 30L, 10L); - s3WalObject.setSubStreams(toJson(subStreams)); - }).forEach(s3WALObjectMapper::create); - - session.commit(); - } - - List compactedObjects = new ArrayList<>(); - compactedObjects.add(objectId + 2); - compactedObjects.add(objectId + 3); - - List s3StreamObjects = buildS3StreamObjs(objectId, 2, 0, 10) - .stream().map(s3StreamObject2 -> S3StreamObject.newBuilder() - .setObjectId(s3StreamObject2.getObjectId()) - .setStreamId(s3StreamObject2.getStreamId()) - .setObjectSize(s3StreamObject2.getObjectSize()) - .setBaseDataTimestamp(s3StreamObject2.getBaseDataTimestamp().getTime()) - .setStartOffset(s3StreamObject2.getStartOffset()) - .setEndOffset(s3StreamObject2.getEndOffset()) - .build()).toList(); - - try (DefaultMetadataStore metadataStore = new DefaultMetadataStore(client, getSessionFactory(), config)) { - Assertions.assertNull(metadataStore.getLease()); - Lease lease = new Lease(); - lease.setNodeId(config.nodeId()); - metadataStore.setLease(lease); - metadataStore.setRole(Role.Leader); - - metadataStore.commitWalObject(walObject, s3StreamObjects, compactedObjects).get(); - } - - try (SqlSession session = getSessionFactory().openSession()) { - S3ObjectMapper s3ObjectMapper = session.getMapper(S3ObjectMapper.class); - for (long index = objectId; index < objectId + 2; index++) { - S3Object object = s3ObjectMapper.getById(index); - Assertions.assertEquals(S3ObjectState.BOS_COMMITTED, object.getState()); - } - - for (long index = objectId + 2; index < objectId + 4; index++) { - S3Object object = s3ObjectMapper.getById(index); - Assertions.assertEquals(S3ObjectState.BOS_WILL_DELETE, object.getState()); - } - - S3StreamObjectMapper s3StreamObjectMapper = session.getMapper(S3StreamObjectMapper.class); - for (long index = objectId; index < objectId + 2; index++) { - com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject object = s3StreamObjectMapper.getByObjectId(index); - Assertions.assertTrue(object.getCommittedTimestamp().getTime() > 0); - } - - S3Object s3Object = s3ObjectMapper.getById(objectId + 4); - Assertions.assertEquals(222L, s3Object.getObjectSize()); - Assertions.assertEquals(StreamConstants.NOOP_STREAM_ID, s3Object.getStreamId()); - - S3WalObjectMapper s3WALObjectMapper = session.getMapper(S3WalObjectMapper.class); - S3WalObject object = s3WALObjectMapper.getByObjectId(objectId + 4); - Assertions.assertEquals(objectId + 2, object.getSequenceId()); - Assertions.assertTrue(object.getBaseDataTimestamp().getTime() > 0); - Assertions.assertTrue(object.getCommittedTimestamp().getTime() > 0); - } - } - - @Test - public void testCommitWalObject_ObjectNotPrepare() throws IOException, ExecutionException, InterruptedException { - long streamId, startOffset = 0, endOffset = 10; - Integer nodeId = 1; - - S3WALObject walObject = S3WALObject.newBuilder() - .setObjectId(3) - .setBrokerId(-1) - .build(); - - try (SqlSession session = this.getSessionFactory().openSession()) { - S3StreamObjectMapper s3StreamObjectMapper = session.getMapper(S3StreamObjectMapper.class); - RangeMapper rangeMapper = session.getMapper(RangeMapper.class); - StreamMapper streamMapper = session.getMapper(StreamMapper.class); - Stream stream = new Stream(); - stream.setState(StreamState.OPEN); - stream.setStreamRole(StreamRole.STREAM_ROLE_DATA); - stream.setTopicId(2L); - stream.setRangeId(1); - stream.setEpoch(3L); - stream.setTopicId(1L); - stream.setQueueId(2); - streamMapper.create(stream); - streamId = stream.getId(); - Range range = new Range(); - - range.setStreamId(streamId); - range.setRangeId(1); - range.setEpoch(3L); - range.setStartOffset(0L); - range.setEndOffset(0L); - range.setNodeId(nodeId); - rangeMapper.create(range); - - buildS3StreamObjs(1, 1, 0L, 100L).forEach(s3StreamObjectMapper::create); - session.commit(); - } - - List compactedObjects = new ArrayList<>(); - try (DefaultMetadataStore metadataStore = new DefaultMetadataStore(client, getSessionFactory(), config)) { - Assertions.assertNull(metadataStore.getLease()); - Lease lease = new Lease(); - lease.setNodeId(config.nodeId()); - metadataStore.setLease(lease); - metadataStore.setRole(Role.Leader); - - List s3StreamObjects = metadataStore.listStreamObjects(streamId, startOffset, endOffset, 2).get(); - Assertions.assertThrows(ExecutionException.class, () -> metadataStore.commitWalObject(walObject, s3StreamObjects, compactedObjects).get()); - } - - } - - @Test - public void testCommitWalObject_WalNotExist() throws IOException, ExecutionException, InterruptedException { - long streamId; - int nodeId = 1; - long objectId; - Calendar calendar = Calendar.getInstance(); - - S3WALObject walObject = S3WALObject.newBuilder() - .setObjectId(-1) - .setSequenceId(-1) - .setBrokerId(-1) - .build(); - - try (SqlSession session = this.getSessionFactory().openSession()) { - RangeMapper rangeMapper = session.getMapper(RangeMapper.class); - StreamMapper streamMapper = session.getMapper(StreamMapper.class); - Stream stream = new Stream(); - stream.setState(StreamState.OPEN); - stream.setStreamRole(StreamRole.STREAM_ROLE_DATA); - stream.setTopicId(2L); - stream.setRangeId(1); - stream.setEpoch(3L); - stream.setTopicId(1L); - stream.setQueueId(2); - streamMapper.create(stream); - streamId = stream.getId(); - Range range = new Range(); - - range.setStreamId(streamId); - range.setRangeId(1); - range.setEpoch(3L); - range.setStartOffset(0L); - range.setEndOffset(0L); - range.setNodeId(nodeId); - rangeMapper.create(range); - - S3ObjectMapper objectMapper = session.getMapper(S3ObjectMapper.class); - S3Object s3Object = new S3Object(); - s3Object.setId(nextS3ObjectId()); - s3Object.setState(S3ObjectState.BOS_PREPARED); - s3Object.setStreamId(streamId); - s3Object.setObjectSize(2139L); - - calendar.add(Calendar.HOUR, 1); - s3Object.setExpiredTimestamp(calendar.getTime()); - objectMapper.prepare(s3Object); - objectId = s3Object.getId(); - - session.commit(); - } - - List compactedObjects = new ArrayList<>(); - try (DefaultMetadataStore metadataStore = new DefaultMetadataStore(client, getSessionFactory(), config)) { - metadataStore.start(); - Awaitility.await().with() - .pollInterval(100, TimeUnit.MILLISECONDS) - .atMost(10, TimeUnit.SECONDS) - .until(metadataStore::isLeader); - - calendar.add(Calendar.HOUR, 2); - S3StreamObject streamObject = S3StreamObject.newBuilder() - .setObjectId(objectId) - .setStreamId(streamId) - .setBaseDataTimestamp(calendar.getTimeInMillis()) - .setStartOffset(0) - .setEndOffset(2) - .setObjectSize(2139) - .build(); - - List s3StreamObjects = new ArrayList<>(); - s3StreamObjects.add(streamObject); - metadataStore.commitWalObject(walObject, s3StreamObjects, compactedObjects).get(); - } - - try (SqlSession session = getSessionFactory().openSession()) { - S3StreamObjectMapper mapper = session.getMapper(S3StreamObjectMapper.class); - com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject s3StreamObject = mapper.getByObjectId(objectId); - Assertions.assertTrue(s3StreamObject.getCommittedTimestamp().getTime() > 0); - - S3ObjectMapper objectMapper = session.getMapper(S3ObjectMapper.class); - S3Object s3Object = objectMapper.getById(objectId); - Assertions.assertNotNull(s3Object.getCommittedTimestamp()); - Assertions.assertEquals(S3ObjectState.BOS_COMMITTED, s3Object.getState()); - - RangeMapper rangeMapper = session.getMapper(RangeMapper.class); - Range range = rangeMapper.get(1, streamId, null); - Assertions.assertEquals(2, range.getEndOffset()); - } - - } - @Test public void testConsumerOffset() throws IOException, ExecutionException, InterruptedException { long groupId = 2, topicId = 1; diff --git a/controller/src/test/java/com/automq/rocketmq/controller/metadata/database/S3MetadataManagerTest.java b/controller/src/test/java/com/automq/rocketmq/controller/metadata/database/S3MetadataManagerTest.java new file mode 100644 index 000000000..046e56b9b --- /dev/null +++ b/controller/src/test/java/com/automq/rocketmq/controller/metadata/database/S3MetadataManagerTest.java @@ -0,0 +1,890 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.automq.rocketmq.controller.metadata.database; + +import apache.rocketmq.controller.v1.S3ObjectState; +import apache.rocketmq.controller.v1.S3StreamObject; +import apache.rocketmq.controller.v1.S3WALObject; +import apache.rocketmq.controller.v1.StreamRole; +import apache.rocketmq.controller.v1.StreamState; +import apache.rocketmq.controller.v1.SubStream; +import com.automq.rocketmq.common.system.StreamConstants; +import com.automq.rocketmq.controller.exception.ControllerException; +import com.automq.rocketmq.controller.metadata.ControllerClient; +import com.automq.rocketmq.controller.metadata.DatabaseTestBase; +import com.automq.rocketmq.controller.metadata.MetadataStore; +import com.automq.rocketmq.controller.metadata.Role; +import com.automq.rocketmq.controller.metadata.database.dao.Lease; +import com.automq.rocketmq.controller.metadata.database.dao.Range; +import com.automq.rocketmq.controller.metadata.database.dao.S3Object; +import com.automq.rocketmq.controller.metadata.database.dao.S3WalObject; +import com.automq.rocketmq.controller.metadata.database.dao.Stream; +import com.automq.rocketmq.controller.metadata.database.mapper.RangeMapper; +import com.automq.rocketmq.controller.metadata.database.mapper.S3ObjectMapper; +import com.automq.rocketmq.controller.metadata.database.mapper.S3StreamObjectMapper; +import com.automq.rocketmq.controller.metadata.database.mapper.S3WalObjectMapper; +import com.automq.rocketmq.controller.metadata.database.mapper.StreamMapper; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.ibatis.session.SqlSession; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +public class S3MetadataManagerTest extends DatabaseTestBase { + + ControllerClient client; + + public S3MetadataManagerTest() { + this.client = Mockito.mock(ControllerClient.class); + } + + @Test + public void testListStreamObjects() throws IOException, ExecutionException, InterruptedException { + long startOffset = 0L, interval = 1000L, endOffset; + int limit = 1; + long streamId; + try (SqlSession session = getSessionFactory().openSession()) { + S3StreamObjectMapper s3StreamObjectMapper = session.getMapper(S3StreamObjectMapper.class); + List s3StreamObjects = buildS3StreamObjs(1, 1, startOffset, interval); + s3StreamObjects.forEach(s3StreamObjectMapper::create); + streamId = s3StreamObjects.get(0).getStreamId(); + endOffset = s3StreamObjects.get(0).getEndOffset(); + session.commit(); + } + + try (DefaultMetadataStore metadataStore = new DefaultMetadataStore(client, getSessionFactory(), config)) { + List s3StreamObjects = metadataStore.listStreamObjects(streamId, startOffset, endOffset, limit).get(); + S3StreamObject s3StreamObject = s3StreamObjects.get(0); + Assertions.assertEquals(1, s3StreamObject.getObjectId()); + Assertions.assertEquals(100, s3StreamObject.getObjectSize()); + Assertions.assertEquals(1, s3StreamObject.getStreamId()); + Assertions.assertEquals(startOffset, s3StreamObject.getStartOffset()); + Assertions.assertEquals(endOffset, s3StreamObject.getEndOffset()); + } + } + + @Test + public void testListWALObjects_WithPrams() throws IOException, ExecutionException, InterruptedException { + long streamId, startOffset, endOffset; + streamId = 1; + startOffset = 0L; + endOffset = 9L; + int limit = 1; + + try (SqlSession session = getSessionFactory().openSession()) { + RangeMapper rangeMapper = session.getMapper(RangeMapper.class); + Range range = new Range(); + range.setStreamId(streamId); + range.setRangeId(0); + range.setStartOffset(startOffset); + range.setEndOffset(endOffset); + range.setEpoch(1L); + range.setNodeId(1); + rangeMapper.create(range); + + S3WalObjectMapper s3WALObjectMapper = session.getMapper(S3WalObjectMapper.class); + buildS3WalObjs(1, 1).stream().peek(s3WalObject -> { + Map subStreams = buildWalSubStreams(4, 0, 10); + s3WalObject.setSubStreams(toJson(subStreams)); + }).forEach(s3WALObjectMapper::create); + + session.commit(); + } + + Map expectedSubStream = buildWalSubStreams(1, 0, 10); + + try (DefaultMetadataStore metadataStore = new DefaultMetadataStore(client, getSessionFactory(), config)) { + Assertions.assertNull(metadataStore.getLease()); + Lease lease = new Lease(); + lease.setNodeId(config.nodeId()); + metadataStore.setLease(lease); + List s3WALObjects = metadataStore.listWALObjects(streamId, startOffset, endOffset, limit).get(); + + Assertions.assertFalse(s3WALObjects.isEmpty()); + S3WALObject s3WALObject = s3WALObjects.get(0); + Assertions.assertEquals(1, s3WALObject.getObjectId()); + Assertions.assertEquals(100, s3WALObject.getObjectSize()); + Assertions.assertEquals(1, s3WALObject.getBrokerId()); + Assertions.assertEquals(1, s3WALObject.getSequenceId()); + Assertions.assertEquals(expectedSubStream, s3WALObject.getSubStreams().getSubStreamsMap()); + } + + try (SqlSession session = getSessionFactory().openSession()) { + S3WalObjectMapper s3WALObjectMapper = session.getMapper(S3WalObjectMapper.class); + s3WALObjectMapper.delete(123L, 1, null); + session.commit(); + } + } + + @Test + public void testListWALObjects_NotParams() throws IOException, ExecutionException, InterruptedException { + try (SqlSession session = getSessionFactory().openSession()) { + S3WalObjectMapper s3WALObjectMapper = session.getMapper(S3WalObjectMapper.class); + + buildS3WalObjs(1, 1).stream().peek(s3WalObject1 -> { + Map subStreams = buildWalSubStreams(4, 0, 10); + s3WalObject1.setSubStreams(toJson(subStreams)); + }).forEach(s3WALObjectMapper::create); + + session.commit(); + } + + Map subStreams = buildWalSubStreams(4, 0, 10); + + try (DefaultMetadataStore metadataStore = new DefaultMetadataStore(client, getSessionFactory(), config)) { + Assertions.assertNull(metadataStore.getLease()); + Lease lease = new Lease(); + lease.setNodeId(config.nodeId()); + metadataStore.setLease(lease); + List s3WALObjects = metadataStore.listWALObjects().get(); + + Assertions.assertFalse(s3WALObjects.isEmpty()); + S3WALObject s3WALObject = s3WALObjects.get(0); + Assertions.assertEquals(1, s3WALObject.getObjectId()); + Assertions.assertEquals(100, s3WALObject.getObjectSize()); + Assertions.assertEquals(1, s3WALObject.getBrokerId()); + Assertions.assertEquals(1, s3WALObject.getSequenceId()); + Assertions.assertEquals(subStreams, s3WALObject.getSubStreams().getSubStreamsMap()); + } + + try (SqlSession session = getSessionFactory().openSession()) { + S3WalObjectMapper s3WALObjectMapper = session.getMapper(S3WalObjectMapper.class); + s3WALObjectMapper.delete(123L, 1, null); + session.commit(); + } + } + + @Test + public void testListObjects_OnlyStream() throws IOException, ExecutionException, InterruptedException { + long startOffset, endOffset; + startOffset = 0L; + endOffset = 9L; + int limit = 3; + + try (SqlSession session = getSessionFactory().openSession()) { + S3WalObjectMapper s3WALObjectMapper = session.getMapper(S3WalObjectMapper.class); + + buildS3WalObjs(1, 1).stream().peek(s3WalObject1 -> { + Map subStreams = buildWalSubStreams(4, 10, 10); + s3WalObject1.setSubStreams(toJson(subStreams)); + }).forEach(s3WALObjectMapper::create); + + S3StreamObjectMapper s3StreamObjectMapper = session.getMapper(S3StreamObjectMapper.class); + buildS3StreamObjs(5, 1, 0, 10).forEach(s3StreamObjectMapper::create); + session.commit(); + } + + try (DefaultMetadataStore metadataStore = new DefaultMetadataStore(client, getSessionFactory(), config)) { + Assertions.assertNull(metadataStore.getLease()); + Lease lease = new Lease(); + lease.setNodeId(config.nodeId()); + metadataStore.setLease(lease); + Pair, List> listPair = metadataStore.listObjects(1, startOffset, endOffset, limit).get(); + + Assertions.assertFalse(listPair.getLeft().isEmpty()); + Assertions.assertTrue(listPair.getRight().isEmpty()); + S3StreamObject s3StreamObject = listPair.getLeft().get(0); + Assertions.assertEquals(5, s3StreamObject.getObjectId()); + Assertions.assertEquals(100, s3StreamObject.getObjectSize()); + Assertions.assertEquals(1, s3StreamObject.getStreamId()); + Assertions.assertEquals(0, s3StreamObject.getStartOffset()); + Assertions.assertEquals(10, s3StreamObject.getEndOffset()); + } + + try (SqlSession session = getSessionFactory().openSession()) { + S3StreamObjectMapper s3StreamObjectMapper = session.getMapper(S3StreamObjectMapper.class); + s3StreamObjectMapper.delete(null, 1L, 122L); + session.commit(); + } + } + + @Test + public void testListObjects_OnlyWAL() throws IOException, ExecutionException, InterruptedException { + long streamId, startOffset, endOffset; + streamId = 1; + startOffset = 11L; + endOffset = 19L; + int limit = 3; + + try (SqlSession session = getSessionFactory().openSession()) { + S3WalObjectMapper s3WALObjectMapper = session.getMapper(S3WalObjectMapper.class); + buildS3WalObjs(1, 1).stream().peek(s3WalObject1 -> { + Map subStreams = buildWalSubStreams(4, 10, 10); + s3WalObject1.setSubStreams(toJson(subStreams)); + }).forEach(s3WALObjectMapper::create); + + S3StreamObjectMapper s3StreamObjectMapper = session.getMapper(S3StreamObjectMapper.class); + buildS3StreamObjs(5, 1, 0, 10).forEach(s3StreamObjectMapper::create); + session.commit(); + } + + Map subStreams = buildWalSubStreams(1, 10, 10); + + try (DefaultMetadataStore metadataStore = new DefaultMetadataStore(client, getSessionFactory(), config)) { + Assertions.assertNull(metadataStore.getLease()); + Lease lease = new Lease(); + lease.setNodeId(config.nodeId()); + metadataStore.setLease(lease); + Pair, List> listPair = metadataStore.listObjects(streamId, startOffset, endOffset, limit).get(); + + Assertions.assertTrue(listPair.getLeft().isEmpty()); + Assertions.assertFalse(listPair.getRight().isEmpty()); + + S3WALObject s3WALObject = listPair.getRight().get(0); + Assertions.assertEquals(1, s3WALObject.getObjectId()); + Assertions.assertEquals(100, s3WALObject.getObjectSize()); + Assertions.assertEquals(1, s3WALObject.getBrokerId()); + Assertions.assertEquals(1, s3WALObject.getSequenceId()); + Assertions.assertEquals(subStreams, s3WALObject.getSubStreams().getSubStreamsMap()); + } + + try (SqlSession session = getSessionFactory().openSession()) { + S3WalObjectMapper s3WALObjectMapper = session.getMapper(S3WalObjectMapper.class); + s3WALObjectMapper.delete(123L, 1, null); + session.commit(); + } + } + + @Test + public void testListObjects_Both() throws IOException, ExecutionException, InterruptedException { + long streamId, startOffset, endOffset; + streamId = 1; + startOffset = 0L; + endOffset = 21L; + int limit = 3; + + try (SqlSession session = getSessionFactory().openSession()) { + S3WalObjectMapper s3WALObjectMapper = session.getMapper(S3WalObjectMapper.class); + buildS3WalObjs(1, 1).stream().peek(s3WalObject1 -> { + Map subStreams = buildWalSubStreams(4, 10, 10); + s3WalObject1.setSubStreams(toJson(subStreams)); + }).forEach(s3WALObjectMapper::create); + + S3StreamObjectMapper s3StreamObjectMapper = session.getMapper(S3StreamObjectMapper.class); + buildS3StreamObjs(5, 1, 0, 10).forEach(s3StreamObjectMapper::create); + + session.commit(); + } + + Map subStreams = buildWalSubStreams(1, 10, 10); + + try (DefaultMetadataStore metadataStore = new DefaultMetadataStore(client, getSessionFactory(), config)) { + Assertions.assertNull(metadataStore.getLease()); + Lease lease = new Lease(); + lease.setNodeId(config.nodeId()); + metadataStore.setLease(lease); + Pair, List> listPair = metadataStore.listObjects(streamId, startOffset, endOffset, limit).get(); + + Assertions.assertFalse(listPair.getLeft().isEmpty()); + Assertions.assertFalse(listPair.getRight().isEmpty()); + S3StreamObject s3StreamObject = listPair.getLeft().get(0); + Assertions.assertEquals(5, s3StreamObject.getObjectId()); + Assertions.assertEquals(100, s3StreamObject.getObjectSize()); + Assertions.assertEquals(streamId, s3StreamObject.getStreamId()); + Assertions.assertEquals(0, s3StreamObject.getStartOffset()); + Assertions.assertEquals(10, s3StreamObject.getEndOffset()); + + S3WALObject s3WALObject = listPair.getRight().get(0); + Assertions.assertEquals(1, s3WALObject.getObjectId()); + Assertions.assertEquals(100, s3WALObject.getObjectSize()); + Assertions.assertEquals(1, s3WALObject.getBrokerId()); + Assertions.assertEquals(1, s3WALObject.getSequenceId()); + Assertions.assertEquals(subStreams, s3WALObject.getSubStreams().getSubStreamsMap()); + } + + try (SqlSession session = getSessionFactory().openSession()) { + S3WalObjectMapper s3WALObjectMapper = session.getMapper(S3WalObjectMapper.class); + s3WALObjectMapper.delete(123L, 1, null); + + S3StreamObjectMapper s3StreamObjectMapper = session.getMapper(S3StreamObjectMapper.class); + s3StreamObjectMapper.delete(null, streamId, 122L); + session.commit(); + } + } + + @Test + public void testListObjects_Both_Interleaved() throws IOException, ExecutionException, InterruptedException { + long streamId, startOffset, endOffset; + streamId = 1; + startOffset = 0L; + endOffset = 40L; + int limit = 3; + + try (SqlSession session = getSessionFactory().openSession()) { + S3WalObjectMapper s3WALObjectMapper = session.getMapper(S3WalObjectMapper.class); + buildS3WalObjs(1, 1).stream().peek(s3WalObject -> { + Map subStreams = buildWalSubStreams(1, 0, 10); + s3WalObject.setSubStreams(toJson(subStreams)); + }).forEach(s3WALObjectMapper::create); + + buildS3WalObjs(2, 1).stream().peek(s3WalObject -> { + Map subStreams = buildWalSubStreams(1, 20, 20); + s3WalObject.setSubStreams(toJson(subStreams)); + }).forEach(s3WALObjectMapper::create); + + S3StreamObjectMapper s3StreamObjectMapper = session.getMapper(S3StreamObjectMapper.class); + buildS3StreamObjs(5, 1, 10, 10).forEach(s3StreamObjectMapper::create); + buildS3StreamObjs(6, 1, 40, 10).forEach(s3StreamObjectMapper::create); + + session.commit(); + } + + Map subStreams1 = buildWalSubStreams(1, 0, 10); + Map subStreams2 = buildWalSubStreams(1, 20, 20); + + try (DefaultMetadataStore metadataStore = new DefaultMetadataStore(client, getSessionFactory(), config)) { + Assertions.assertNull(metadataStore.getLease()); + Lease lease = new Lease(); + lease.setNodeId(config.nodeId()); + metadataStore.setLease(lease); + Pair, List> listPair = metadataStore.listObjects(streamId, startOffset, endOffset, limit).get(); + + Assertions.assertFalse(listPair.getLeft().isEmpty()); + Assertions.assertFalse(listPair.getRight().isEmpty()); + List s3StreamObjects = listPair.getLeft(); + Assertions.assertEquals(1, s3StreamObjects.size()); + S3StreamObject s3StreamObject = s3StreamObjects.get(0); + + Assertions.assertEquals(5, s3StreamObject.getObjectId()); + Assertions.assertEquals(100, s3StreamObject.getObjectSize()); + Assertions.assertEquals(streamId, s3StreamObject.getStreamId()); + Assertions.assertEquals(10, s3StreamObject.getStartOffset()); + Assertions.assertEquals(20, s3StreamObject.getEndOffset()); + + List s3WALObjects = listPair.getRight(); + Assertions.assertEquals(2, s3WALObjects.size()); + S3WALObject s3WALObject = s3WALObjects.get(0); + + Assertions.assertEquals(1, s3WALObject.getObjectId()); + Assertions.assertEquals(100, s3WALObject.getObjectSize()); + Assertions.assertEquals(1, s3WALObject.getBrokerId()); + Assertions.assertEquals(1, s3WALObject.getSequenceId()); + Assertions.assertEquals(subStreams1, s3WALObject.getSubStreams().getSubStreamsMap()); + + S3WALObject s3WALObject1 = s3WALObjects.get(1); + Assertions.assertEquals(2, s3WALObject1.getObjectId()); + Assertions.assertEquals(100, s3WALObject1.getObjectSize()); + Assertions.assertEquals(1, s3WALObject1.getBrokerId()); + Assertions.assertEquals(2, s3WALObject1.getSequenceId()); + Assertions.assertEquals(subStreams2, s3WALObject1.getSubStreams().getSubStreamsMap()); + } + + try (SqlSession session = getSessionFactory().openSession()) { + S3WalObjectMapper s3WALObjectMapper = session.getMapper(S3WalObjectMapper.class); + s3WALObjectMapper.delete(1L, 1, null); + s3WALObjectMapper.delete(2L, 2, null); + + S3StreamObjectMapper s3StreamObjectMapper = session.getMapper(S3StreamObjectMapper.class); + s3StreamObjectMapper.delete(null, streamId, 122L); + s3StreamObjectMapper.delete(null, streamId, 121L); + session.commit(); + } + } + + @Test + public void testTrimStream() throws IOException { + long streamId, streamEpoch = 1, newStartOffset = 2000; + int nodeId = 1, rangId = 0; + try (SqlSession session = this.getSessionFactory().openSession()) { + StreamMapper streamMapper = session.getMapper(StreamMapper.class); + RangeMapper rangeMapper = session.getMapper(RangeMapper.class); + + com.automq.rocketmq.controller.metadata.database.dao.Stream stream = new com.automq.rocketmq.controller.metadata.database.dao.Stream(); + stream.setSrcNodeId(nodeId); + stream.setDstNodeId(nodeId); + stream.setStartOffset(1234L); + stream.setEpoch(0L); + stream.setRangeId(rangId); + stream.setTopicId(1L); + stream.setQueueId(2); + stream.setState(StreamState.OPEN); + stream.setStreamRole(StreamRole.STREAM_ROLE_DATA); + streamMapper.create(stream); + streamId = stream.getId(); + + Range range = new Range(); + range.setRangeId(rangId); + range.setStreamId(streamId); + range.setEpoch(0L); + range.setStartOffset(1234L); + range.setEndOffset(2345L); + range.setNodeId(nodeId); + rangeMapper.create(range); + + session.commit(); + } + + try (MetadataStore metadataStore = new DefaultMetadataStore(client, getSessionFactory(), config)) { + metadataStore.start(); + Awaitility.await().with().atMost(10, TimeUnit.SECONDS).pollInterval(100, TimeUnit.MILLISECONDS) + .until(metadataStore::isLeader); + + metadataStore.trimStream(streamId, streamEpoch, newStartOffset); + } + + try (SqlSession session = this.getSessionFactory().openSession()) { + StreamMapper streamMapper = session.getMapper(StreamMapper.class); + Stream stream = streamMapper.getByStreamId(streamId); + Assertions.assertEquals(newStartOffset, stream.getStartOffset()); + Assertions.assertEquals(nodeId, stream.getSrcNodeId()); + Assertions.assertEquals(nodeId, stream.getDstNodeId()); + Assertions.assertEquals(0, stream.getRangeId()); + + RangeMapper rangeMapper = session.getMapper(RangeMapper.class); + Range range = rangeMapper.get(rangId, streamId, null); + Assertions.assertEquals(newStartOffset, range.getStartOffset()); + Assertions.assertEquals(2345, range.getEndOffset()); + Assertions.assertEquals(nodeId, range.getNodeId()); + Assertions.assertEquals(streamId, range.getStreamId()); + } + + } + @Test + public void testPrepareS3Objects() throws IOException { + long objectId; + + try (DefaultMetadataStore metadataStore = new DefaultMetadataStore(client, getSessionFactory(), config)) { + Assertions.assertNull(metadataStore.getLease()); + Lease lease = new Lease(); + lease.setNodeId(config.nodeId()); + metadataStore.setLease(lease); + metadataStore.setRole(Role.Leader); + objectId = metadataStore.prepareS3Objects(3, 5).get(); + } catch (Exception e) { + throw new RuntimeException(e); + } + + try (SqlSession session = getSessionFactory().openSession()) { + S3ObjectMapper s3ObjectMapper = session.getMapper(S3ObjectMapper.class); + for (long index = objectId; index < objectId + 3; index++) { + S3Object object = s3ObjectMapper.getById(index); + Assertions.assertEquals(S3ObjectState.BOS_PREPARED, object.getState()); + } + } + } + + @Test + public void testCommitStreamObject() throws IOException, ControllerException { + long objectId, streamId = 1; + + try (DefaultMetadataStore metadataStore = new DefaultMetadataStore(client, getSessionFactory(), config)) { + Assertions.assertNull(metadataStore.getLease()); + Lease lease = new Lease(); + lease.setNodeId(config.nodeId()); + metadataStore.setLease(lease); + metadataStore.setRole(Role.Leader); + objectId = metadataStore.prepareS3Objects(3, 5).get(); + } catch (Exception e) { + throw new RuntimeException(e); + } + + S3StreamObject news3StreamObject = S3StreamObject.newBuilder() + .setObjectId(objectId + 2) + .setStreamId(streamId) + .setObjectSize(111L) + .build(); + + try (SqlSession session = this.getSessionFactory().openSession()) { + S3StreamObjectMapper s3StreamObjectMapper = session.getMapper(S3StreamObjectMapper.class); + buildS3StreamObjs(objectId, 2, 3, 100L).forEach(s3StreamObjectMapper::create); + session.commit(); + } + + try (DefaultMetadataStore metadataStore = new DefaultMetadataStore(client, getSessionFactory(), config)) { + Assertions.assertNull(metadataStore.getLease()); + Lease lease = new Lease(); + lease.setNodeId(config.nodeId()); + metadataStore.setLease(lease); + metadataStore.setRole(Role.Leader); + + List compactedObjects = new ArrayList<>(); + compactedObjects.add(objectId); + compactedObjects.add(objectId + 1); + metadataStore.commitStreamObject(news3StreamObject, compactedObjects); + } + + try (SqlSession session = getSessionFactory().openSession()) { + S3ObjectMapper s3ObjectMapper = session.getMapper(S3ObjectMapper.class); + S3Object s3Object = s3ObjectMapper.getById(objectId); + Assertions.assertEquals(S3ObjectState.BOS_WILL_DELETE, s3Object.getState()); + + S3Object s3Object1 = s3ObjectMapper.getById(objectId); + Assertions.assertEquals(S3ObjectState.BOS_WILL_DELETE, s3Object1.getState()); + + S3StreamObjectMapper s3StreamObjectMapper = session.getMapper(S3StreamObjectMapper.class); + for (long index = objectId; index < objectId + 2; index++) { + com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject object = s3StreamObjectMapper.getByObjectId(index); + Assertions.assertNull(object); + } + + com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject object = s3StreamObjectMapper.getByObjectId(objectId + 2); + Assertions.assertEquals(111L, object.getObjectSize()); + Assertions.assertEquals(streamId, object.getStreamId()); + Assertions.assertTrue(object.getBaseDataTimestamp().getTime() > 0); + Assertions.assertTrue(object.getCommittedTimestamp().getTime() > 0); + } + } + + @Test + public void testCommitStreamObject_NoCompacted() throws IOException, ControllerException { + long objectId, streamId = 1; + + try (DefaultMetadataStore metadataStore = new DefaultMetadataStore(client, getSessionFactory(), config)) { + Assertions.assertNull(metadataStore.getLease()); + Lease lease = new Lease(); + lease.setNodeId(config.nodeId()); + metadataStore.setLease(lease); + metadataStore.setRole(Role.Leader); + objectId = metadataStore.prepareS3Objects(3, 5).get(); + } catch (Exception e) { + throw new RuntimeException(e); + } + + S3StreamObject news3StreamObject = S3StreamObject.newBuilder() + .setObjectId(objectId + 2) + .setStreamId(streamId) + .setObjectSize(111L) + .build(); + + try (DefaultMetadataStore metadataStore = new DefaultMetadataStore(client, getSessionFactory(), config)) { + Assertions.assertNull(metadataStore.getLease()); + Lease lease = new Lease(); + lease.setNodeId(config.nodeId()); + metadataStore.setLease(lease); + metadataStore.setLease(lease); + metadataStore.setRole(Role.Leader); + + metadataStore.commitStreamObject(news3StreamObject, Collections.emptyList()); + } + + try (SqlSession session = getSessionFactory().openSession()) { + S3ObjectMapper s3ObjectMapper = session.getMapper(S3ObjectMapper.class); + S3StreamObjectMapper s3StreamObjectMapper = session.getMapper(S3StreamObjectMapper.class); + S3Object s3Object = s3ObjectMapper.getById(objectId + 2); + Assertions.assertEquals(S3ObjectState.BOS_COMMITTED, s3Object.getState()); + + com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject object = s3StreamObjectMapper.getByObjectId(objectId + 2); + Assertions.assertTrue(object.getBaseDataTimestamp().getTime() > 0); + Assertions.assertTrue(object.getCommittedTimestamp().getTime() > 0); + Assertions.assertEquals(111L, object.getObjectSize()); + Assertions.assertEquals(streamId, object.getStreamId()); + } + } + + @Test + public void testCommitStreamObject_ObjectNotExist() throws IOException { + long streamId = 1; + + S3StreamObject s3StreamObject = S3StreamObject.newBuilder() + .setObjectId(1) + .setStreamId(streamId) + .setObjectSize(111L) + .build(); + + try (SqlSession session = this.getSessionFactory().openSession()) { + S3StreamObjectMapper s3StreamObjectMapper = session.getMapper(S3StreamObjectMapper.class); + buildS3StreamObjs(1, 1, 100L, 100L).forEach(s3StreamObjectMapper::create); + } + + List compactedObjects = new ArrayList<>(); + try (DefaultMetadataStore metadataStore = new DefaultMetadataStore(client, getSessionFactory(), config)) { + Assertions.assertNull(metadataStore.getLease()); + Lease lease = new Lease(); + lease.setNodeId(config.nodeId()); + metadataStore.setLease(lease); + metadataStore.setRole(Role.Leader); + + Assertions.assertThrows(ExecutionException.class, () -> metadataStore.commitStreamObject(s3StreamObject, compactedObjects).get()); + } + + } + + @Test + public void testCommitStreamObject_StreamNotExist() throws IOException { + long streamId = 1; + + S3StreamObject s3StreamObject = S3StreamObject.newBuilder() + .setObjectId(-1) + .setStreamId(streamId) + .setObjectSize(111L) + .build(); + + try (SqlSession session = this.getSessionFactory().openSession()) { + S3StreamObjectMapper s3StreamObjectMapper = session.getMapper(S3StreamObjectMapper.class); + buildS3StreamObjs(1, 1, 100L, 100L).forEach(s3StreamObjectMapper::create); + } + + List compactedObjects = new ArrayList<>(); + try (DefaultMetadataStore metadataStore = new DefaultMetadataStore(client, getSessionFactory(), config)) { + Assertions.assertNull(metadataStore.getLease()); + Lease lease = new Lease(); + lease.setNodeId(config.nodeId()); + metadataStore.setLease(lease); + metadataStore.setRole(Role.Leader); + + Assertions.assertThrows(ExecutionException.class, () -> metadataStore.commitStreamObject(s3StreamObject, compactedObjects).get()); + } + + } + + @Test + public void testCommitWALObject() throws IOException, ExecutionException, InterruptedException { + long objectId; + int nodeId = 1; + + try (DefaultMetadataStore metadataStore = new DefaultMetadataStore(client, getSessionFactory(), config)) { + Assertions.assertNull(metadataStore.getLease()); + Lease lease = new Lease(); + lease.setNodeId(config.nodeId()); + metadataStore.setLease(lease); + metadataStore.setRole(Role.Leader); + objectId = metadataStore.prepareS3Objects(5, 5).get(); + } catch (Exception e) { + throw new RuntimeException(e); + } + + S3WALObject walObject = S3WALObject.newBuilder() + .setObjectId(objectId + 4) + .setObjectSize(222L) + .setBrokerId(nodeId) + .build(); + + try (SqlSession session = this.getSessionFactory().openSession()) { + S3WalObjectMapper s3WALObjectMapper = session.getMapper(S3WalObjectMapper.class); + + buildS3WalObjs(objectId + 2, 1).stream().peek(s3WalObject -> { + Map subStreams = buildWalSubStreams(1, 20L, 10L); + s3WalObject.setSubStreams(toJson(subStreams)); + }).forEach(s3WALObjectMapper::create); + + buildS3WalObjs(objectId + 3, 1).stream().peek(s3WalObject -> { + Map subStreams = buildWalSubStreams(1, 30L, 10L); + s3WalObject.setSubStreams(toJson(subStreams)); + }).forEach(s3WALObjectMapper::create); + + session.commit(); + } + + List compactedObjects = new ArrayList<>(); + compactedObjects.add(objectId + 2); + compactedObjects.add(objectId + 3); + + List s3StreamObjects = buildS3StreamObjs(objectId, 2, 0, 10) + .stream().map(s3StreamObject2 -> S3StreamObject.newBuilder() + .setObjectId(s3StreamObject2.getObjectId()) + .setStreamId(s3StreamObject2.getStreamId()) + .setObjectSize(s3StreamObject2.getObjectSize()) + .setBaseDataTimestamp(s3StreamObject2.getBaseDataTimestamp().getTime()) + .setStartOffset(s3StreamObject2.getStartOffset()) + .setEndOffset(s3StreamObject2.getEndOffset()) + .build()).toList(); + + try (DefaultMetadataStore metadataStore = new DefaultMetadataStore(client, getSessionFactory(), config)) { + Assertions.assertNull(metadataStore.getLease()); + Lease lease = new Lease(); + lease.setNodeId(config.nodeId()); + metadataStore.setLease(lease); + metadataStore.setRole(Role.Leader); + + metadataStore.commitWalObject(walObject, s3StreamObjects, compactedObjects).get(); + } + + try (SqlSession session = getSessionFactory().openSession()) { + S3ObjectMapper s3ObjectMapper = session.getMapper(S3ObjectMapper.class); + for (long index = objectId; index < objectId + 2; index++) { + S3Object object = s3ObjectMapper.getById(index); + Assertions.assertEquals(S3ObjectState.BOS_COMMITTED, object.getState()); + } + + for (long index = objectId + 2; index < objectId + 4; index++) { + S3Object object = s3ObjectMapper.getById(index); + Assertions.assertEquals(S3ObjectState.BOS_WILL_DELETE, object.getState()); + } + + S3StreamObjectMapper s3StreamObjectMapper = session.getMapper(S3StreamObjectMapper.class); + for (long index = objectId; index < objectId + 2; index++) { + com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject object = s3StreamObjectMapper.getByObjectId(index); + Assertions.assertTrue(object.getCommittedTimestamp().getTime() > 0); + } + + S3Object s3Object = s3ObjectMapper.getById(objectId + 4); + Assertions.assertEquals(222L, s3Object.getObjectSize()); + Assertions.assertEquals(StreamConstants.NOOP_STREAM_ID, s3Object.getStreamId()); + + S3WalObjectMapper s3WALObjectMapper = session.getMapper(S3WalObjectMapper.class); + S3WalObject object = s3WALObjectMapper.getByObjectId(objectId + 4); + Assertions.assertEquals(objectId + 2, object.getSequenceId()); + Assertions.assertTrue(object.getBaseDataTimestamp().getTime() > 0); + Assertions.assertTrue(object.getCommittedTimestamp().getTime() > 0); + } + } + + @Test + public void testCommitWalObject_ObjectNotPrepare() throws IOException, ExecutionException, InterruptedException { + long streamId, startOffset = 0, endOffset = 10; + Integer nodeId = 1; + + S3WALObject walObject = S3WALObject.newBuilder() + .setObjectId(3) + .setBrokerId(-1) + .build(); + + try (SqlSession session = this.getSessionFactory().openSession()) { + S3StreamObjectMapper s3StreamObjectMapper = session.getMapper(S3StreamObjectMapper.class); + RangeMapper rangeMapper = session.getMapper(RangeMapper.class); + StreamMapper streamMapper = session.getMapper(StreamMapper.class); + Stream stream = new Stream(); + stream.setState(StreamState.OPEN); + stream.setStreamRole(StreamRole.STREAM_ROLE_DATA); + stream.setTopicId(2L); + stream.setRangeId(1); + stream.setEpoch(3L); + stream.setTopicId(1L); + stream.setQueueId(2); + streamMapper.create(stream); + streamId = stream.getId(); + Range range = new Range(); + + range.setStreamId(streamId); + range.setRangeId(1); + range.setEpoch(3L); + range.setStartOffset(0L); + range.setEndOffset(0L); + range.setNodeId(nodeId); + rangeMapper.create(range); + + buildS3StreamObjs(1, 1, 0L, 100L).forEach(s3StreamObjectMapper::create); + session.commit(); + } + + List compactedObjects = new ArrayList<>(); + try (DefaultMetadataStore metadataStore = new DefaultMetadataStore(client, getSessionFactory(), config)) { + Assertions.assertNull(metadataStore.getLease()); + Lease lease = new Lease(); + lease.setNodeId(config.nodeId()); + metadataStore.setLease(lease); + metadataStore.setRole(Role.Leader); + + List s3StreamObjects = metadataStore.listStreamObjects(streamId, startOffset, endOffset, 2).get(); + Assertions.assertThrows(ExecutionException.class, () -> metadataStore.commitWalObject(walObject, s3StreamObjects, compactedObjects).get()); + } + + } + + @Test + public void testCommitWalObject_WalNotExist() throws IOException, ExecutionException, InterruptedException { + long streamId; + int nodeId = 1; + long objectId; + Calendar calendar = Calendar.getInstance(); + + S3WALObject walObject = S3WALObject.newBuilder() + .setObjectId(-1) + .setSequenceId(-1) + .setBrokerId(-1) + .build(); + + try (SqlSession session = this.getSessionFactory().openSession()) { + RangeMapper rangeMapper = session.getMapper(RangeMapper.class); + StreamMapper streamMapper = session.getMapper(StreamMapper.class); + Stream stream = new Stream(); + stream.setState(StreamState.OPEN); + stream.setStreamRole(StreamRole.STREAM_ROLE_DATA); + stream.setTopicId(2L); + stream.setRangeId(1); + stream.setEpoch(3L); + stream.setTopicId(1L); + stream.setQueueId(2); + streamMapper.create(stream); + streamId = stream.getId(); + Range range = new Range(); + + range.setStreamId(streamId); + range.setRangeId(1); + range.setEpoch(3L); + range.setStartOffset(0L); + range.setEndOffset(0L); + range.setNodeId(nodeId); + rangeMapper.create(range); + + S3ObjectMapper objectMapper = session.getMapper(S3ObjectMapper.class); + S3Object s3Object = new S3Object(); + s3Object.setId(nextS3ObjectId()); + s3Object.setState(S3ObjectState.BOS_PREPARED); + s3Object.setStreamId(streamId); + s3Object.setObjectSize(2139L); + + calendar.add(Calendar.HOUR, 1); + s3Object.setExpiredTimestamp(calendar.getTime()); + objectMapper.prepare(s3Object); + objectId = s3Object.getId(); + + session.commit(); + } + + List compactedObjects = new ArrayList<>(); + try (DefaultMetadataStore metadataStore = new DefaultMetadataStore(client, getSessionFactory(), config)) { + metadataStore.start(); + Awaitility.await().with() + .pollInterval(100, TimeUnit.MILLISECONDS) + .atMost(10, TimeUnit.SECONDS) + .until(metadataStore::isLeader); + + calendar.add(Calendar.HOUR, 2); + S3StreamObject streamObject = S3StreamObject.newBuilder() + .setObjectId(objectId) + .setStreamId(streamId) + .setBaseDataTimestamp(calendar.getTimeInMillis()) + .setStartOffset(0) + .setEndOffset(2) + .setObjectSize(2139) + .build(); + + List s3StreamObjects = new ArrayList<>(); + s3StreamObjects.add(streamObject); + metadataStore.commitWalObject(walObject, s3StreamObjects, compactedObjects).get(); + } + + try (SqlSession session = getSessionFactory().openSession()) { + S3StreamObjectMapper mapper = session.getMapper(S3StreamObjectMapper.class); + com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject s3StreamObject = mapper.getByObjectId(objectId); + Assertions.assertTrue(s3StreamObject.getCommittedTimestamp().getTime() > 0); + + S3ObjectMapper objectMapper = session.getMapper(S3ObjectMapper.class); + S3Object s3Object = objectMapper.getById(objectId); + Assertions.assertNotNull(s3Object.getCommittedTimestamp()); + Assertions.assertEquals(S3ObjectState.BOS_COMMITTED, s3Object.getState()); + + RangeMapper rangeMapper = session.getMapper(RangeMapper.class); + Range range = rangeMapper.get(1, streamId, null); + Assertions.assertEquals(2, range.getEndOffset()); + } + + } +}