Skip to content

Commit

Permalink
feat: use json type for topic.accept_message_types and s3walobject.su…
Browse files Browse the repository at this point in the history
…b_streams columns (#530)

Signed-off-by: Li Zhanhui <lizhanhui@gmail.com>
  • Loading branch information
lizhanhui authored Oct 31, 2023
1 parent 67dbe8c commit 53f539c
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,6 @@ int delete(@Param("objectId") Long objectId,
List<S3WalObject> list(@Param("nodeId") Integer nodeId, @Param("sequenceId") Long sequenceId);

int commit(S3WalObject s3WALObject);

boolean streamExclusive(@Param("nodeId") int nodeId, @Param("streamId") long streamId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,13 @@
</where>
</select>

<select id="streamExclusive" resultType="boolean">
SELECT COUNT(node_id) = 0 AS exclusive
FROM
(SELECT node_id, JSON_EXTRACT(sub_streams, '$.subStreams.*.streamId') AS ids
FROM s3walobject
WHERE node_id != #{nodeId}) t
WHERE CONVERT(#{streamId}, CHAR) MEMBER OF(t.ids)
</select>

</mapper>
12 changes: 6 additions & 6 deletions controller/src/main/resources/ddl.sql
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ CREATE TABLE IF NOT EXISTS topic
status TINYINT DEFAULT 0,
create_time DATETIME DEFAULT current_timestamp,
update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
accept_message_types TEXT NOT NULL,
accept_message_types JSON NOT NULL,
UNIQUE INDEX idx_topic_name (name)
);

Expand Down Expand Up @@ -159,11 +159,11 @@ CREATE TABLE IF NOT EXISTS s3streamobject

CREATE TABLE IF NOT EXISTS s3walobject
(
object_id BIGINT NOT NULL,
object_size BIGINT NOT NULL,
node_id INT NOT NULL,
sequence_id BIGINT NOT NULL,
sub_streams LONGTEXT NOT NULL, -- immutable
object_id BIGINT NOT NULL,
object_size BIGINT NOT NULL,
node_id INT NOT NULL,
sequence_id BIGINT NOT NULL,
sub_streams JSON NOT NULL, -- immutable
base_data_timestamp TIMESTAMP(3),
committed_timestamp TIMESTAMP(3),
created_timestamp TIMESTAMP(3),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ public void testCreateGroup() throws IOException, ExecutionException, Interrupte
topic.setName("T1");
topic.setStatus(TopicStatus.TOPIC_STATUS_ACTIVE);
topic.setQueueNum(4);
topic.setAcceptMessageTypes("abc");
topic.setAcceptMessageTypes("{}");
topicMapper.create(topic);
topicId = topic.getId();
session.commit();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,5 +184,4 @@ protected Map<Long, SubStream> buildWalSubStreams(int count, long startOffset, l
}
return subStreams;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@

package com.automq.rocketmq.controller.metadata;

import apache.rocketmq.controller.v1.SubStream;
import apache.rocketmq.controller.v1.SubStreams;
import com.automq.rocketmq.controller.metadata.database.dao.S3WalObject;
import com.automq.rocketmq.controller.metadata.database.mapper.S3WalObjectMapper;
import com.google.protobuf.util.JsonFormat;
import org.apache.ibatis.session.SqlSession;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.MethodOrderer;
Expand All @@ -44,7 +47,7 @@ public void testS3WalObjectCRUD() throws IOException {
s3WALObject.setObjectSize(22L);
s3WALObject.setNodeId(1);
s3WALObject.setSequenceId(999L);
s3WALObject.setSubStreams("<json><json>");
s3WALObject.setSubStreams("{}");

Calendar calendar = Calendar.getInstance();
calendar.add(Calendar.SECOND, 30);
Expand Down Expand Up @@ -72,7 +75,7 @@ public void testS3WalObjectCRUD() throws IOException {
Assertions.assertEquals(22, s3WalObjects.get(0).getObjectSize());
Assertions.assertEquals(1, s3WalObjects.get(0).getNodeId());
Assertions.assertEquals(999, s3WalObjects.get(0).getSequenceId());
Assertions.assertEquals("<json><json>", s3WalObjects.get(0).getSubStreams());
Assertions.assertEquals("{}", s3WalObjects.get(0).getSubStreams());
Assertions.assertEquals(time, s3WalObjects.get(0).getBaseDataTimestamp());

// test delete
Expand All @@ -92,7 +95,7 @@ public void testCommitS3WalObject() throws IOException {
s3WALObject.setObjectSize(22L);
s3WALObject.setNodeId(1);
s3WALObject.setSequenceId(999L);
s3WALObject.setSubStreams("<json><json>");
s3WALObject.setSubStreams("{}");

Calendar calendar = Calendar.getInstance();
calendar.add(Calendar.SECOND, 30);
Expand Down Expand Up @@ -122,4 +125,36 @@ public void testCommitS3WalObject() throws IOException {
Assertions.assertTrue(s3WalObjects.isEmpty());
}
}

@Test
public void testStreamExclusive() throws IOException {
try (SqlSession session = getSessionFactory().openSession()) {
S3WalObjectMapper mapper = session.getMapper(S3WalObjectMapper.class);

S3WalObject walObject = new S3WalObject();
walObject.setNodeId(1);
walObject.setObjectSize(128L);
walObject.setObjectId(2L);
walObject.setSequenceId(3L);
walObject.setBaseDataTimestamp(new Date());
walObject.setCommittedTimestamp(new Date());
walObject.setCreatedTimestamp(new Date());
SubStreams subStreams = SubStreams.newBuilder()
.putSubStreams(1, SubStream.newBuilder().setStreamId(1).setStartOffset(0).setEndOffset(10).build())
.putSubStreams(2, SubStream.newBuilder().setStreamId(2).setStartOffset(0).setEndOffset(10).build())
.putSubStreams(3, SubStream.newBuilder().setStreamId(3).setStartOffset(0).setEndOffset(10).build())
.build();
walObject.setSubStreams(JsonFormat.printer().print(subStreams));
int rowsAffected = mapper.create(walObject);
Assertions.assertEquals(1, rowsAffected);

Assertions.assertTrue(mapper.streamExclusive(1, 1));
Assertions.assertTrue(mapper.streamExclusive(1, 2));
Assertions.assertTrue(mapper.streamExclusive(1, 3));

Assertions.assertFalse(mapper.streamExclusive(2, 1));
Assertions.assertFalse(mapper.streamExclusive(2, 2));
Assertions.assertFalse(mapper.streamExclusive(2, 3));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
import apache.rocketmq.controller.v1.TopicStatus;
import com.automq.rocketmq.controller.metadata.database.dao.Topic;
import com.automq.rocketmq.controller.metadata.database.mapper.TopicMapper;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import java.io.IOException;
import java.util.List;

import org.apache.ibatis.session.SqlSession;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -64,7 +66,8 @@ public void testTopicCRUD() throws IOException {

got = topicMapper.get(topic.getId(), null);
Assertions.assertEquals(TopicStatus.TOPIC_STATUS_ACTIVE, got.getStatus());
Assertions.assertEquals(acceptTypesJson, got.getAcceptMessageTypes());
AcceptTypes gotAcceptedTypes = decodeAcceptTypes(got.getAcceptMessageTypes());
Assertions.assertEquals(acceptTypes, gotAcceptedTypes);

List<Topic> topics = topicMapper.list(null, null);
Assertions.assertEquals(1, topics.size());
Expand All @@ -86,7 +89,14 @@ public void testTopicCRUD() throws IOException {
topicMapper.update(topic3);

Topic topic4 = topicMapper.get(topic.getId(), null);
Assertions.assertEquals(acceptTypesJson, topic4.getAcceptMessageTypes());
Assertions.assertEquals(acceptTypes, decodeAcceptTypes(topic4.getAcceptMessageTypes()));
}
}

@NotNull
private static AcceptTypes decodeAcceptTypes(String json) throws InvalidProtocolBufferException {
AcceptTypes.Builder builder = AcceptTypes.newBuilder();
JsonFormat.parser().ignoringUnknownFields().merge(json, builder);
return builder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import com.automq.rocketmq.controller.metadata.database.mapper.StreamMapper;
import com.automq.rocketmq.controller.metadata.database.mapper.TopicMapper;

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -257,6 +258,12 @@ void testCreateTopic() throws IOException, ExecutionException, InterruptedExcept
}
}

private static AcceptTypes decodeAcceptTypes(String json) throws InvalidProtocolBufferException {
AcceptTypes.Builder builder = AcceptTypes.newBuilder();
JsonFormat.parser().ignoringUnknownFields().merge(json, builder);
return builder.build();
}

@Test
void testUpdateTopic() throws IOException, ExecutionException, InterruptedException {
String address = "localhost:1234";
Expand All @@ -277,6 +284,11 @@ void testUpdateTopic() throws IOException, ExecutionException, InterruptedExcept
int queueNum = 4;
String topicName = "t1";

AcceptTypes acceptTypes = AcceptTypes.newBuilder()
.addTypes(MessageType.NORMAL)
.addTypes(MessageType.TRANSACTION)
.build();

try (MetadataStore metadataStore = new DefaultMetadataStore(client, getSessionFactory(), config)) {
metadataStore.start();
Awaitility.await().with().atMost(10, TimeUnit.SECONDS)
Expand All @@ -301,10 +313,7 @@ void testUpdateTopic() throws IOException, ExecutionException, InterruptedExcept
UpdateTopicRequest updateTopicRequest = UpdateTopicRequest.newBuilder()
.setTopicId(topicId)
.setName(topicName)
.setAcceptTypes(AcceptTypes.newBuilder()
.addTypes(MessageType.NORMAL)
.addTypes(MessageType.TRANSACTION)
.build())
.setAcceptTypes(acceptTypes)
.build();
metadataStore.updateTopic(updateTopicRequest).get();
}
Expand All @@ -314,11 +323,14 @@ void testUpdateTopic() throws IOException, ExecutionException, InterruptedExcept
List<Topic> topics = topicMapper.list(null, null);
topics.stream().filter(topic -> topic.getName().equals("t1")).forEach(topic -> Assertions.assertEquals(4, topic.getQueueNum()));

String json = JsonFormat.printer().print(AcceptTypes.newBuilder()
.addTypes(MessageType.NORMAL)
.addTypes(MessageType.TRANSACTION)
.build());
topics.stream().filter(topic -> topic.getName().equals("t1")).forEach(topic -> Assertions.assertEquals(json, topic.getAcceptMessageTypes()));
topics.stream().filter(topic -> topic.getName().equals("t1"))
.forEach(topic -> {
try {
Assertions.assertEquals(acceptTypes, decodeAcceptTypes(topic.getAcceptMessageTypes()));
} catch (InvalidProtocolBufferException e) {
Assertions.fail(e);
}
});

QueueAssignmentMapper assignmentMapper = session.getMapper(QueueAssignmentMapper.class);
List<QueueAssignment> assignments = assignmentMapper.list(topicId, null, null, null, null);
Expand Down

0 comments on commit 53f539c

Please sign in to comment.