From 7d73bec19dec72c58290d07be149cc3d86391bc5 Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Wed, 18 Sep 2024 18:08:49 -0700 Subject: [PATCH 01/11] Fix describe cluster api --- .../KafkaClientDescribeClusterFactory.java | 30 +++++++++++++------ .../binding-kafka/src/main/zilla/protocol.idl | 21 +++++++++---- .../cluster.brokers.info/client.rpt | 26 +++++++++------- .../cluster.brokers.info/server.rpt | 27 ++++++++++------- 4 files changed, 68 insertions(+), 36 deletions(-) diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientDescribeClusterFactory.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientDescribeClusterFactory.java index 4c1dd223cc..51619eb6b8 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientDescribeClusterFactory.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientDescribeClusterFactory.java @@ -39,11 +39,11 @@ import io.aklivity.zilla.runtime.binding.kafka.internal.config.KafkaRouteConfig; import io.aklivity.zilla.runtime.binding.kafka.internal.types.Flyweight; import io.aklivity.zilla.runtime.binding.kafka.internal.types.OctetsFW; -import io.aklivity.zilla.runtime.binding.kafka.internal.types.String16FW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.codec.RequestHeaderFW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.codec.ResponseHeaderFW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.codec.describe_cluster.ClusterBrokerFW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.codec.describe_cluster.DescribeClusterRequestFW; +import io.aklivity.zilla.runtime.binding.kafka.internal.types.codec.describe_cluster.DescribeClusterRequestPart2FW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.codec.describe_cluster.DescribeClusterResponseFW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.codec.describe_cluster.DescribeClusterResponsePart2FW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.AbortFW; @@ -99,6 +99,8 @@ public final class KafkaClientDescribeClusterFactory extends KafkaClientSaslHand private final RequestHeaderFW.Builder requestHeaderRW = new RequestHeaderFW.Builder(); private final DescribeClusterRequestFW.Builder describeClusterRequestRW = new DescribeClusterRequestFW.Builder(); + private final DescribeClusterRequestPart2FW.Builder describeClusterRequestPart2RW = + new DescribeClusterRequestPart2FW.Builder(); private final ResponseHeaderFW responseHeaderRO = new ResponseHeaderFW(); private final DescribeClusterResponseFW describeClusterResponseRO = new DescribeClusterResponseFW(); @@ -494,8 +496,8 @@ private int decodeDescribeClusterResponse( final int throttle = describeClusterResponse.throttle(); final short error = describeClusterResponse.error(); - final String16FW message = describeClusterResponse.message(); - final String16FW clusterId = describeClusterResponse.clusterId(); + final String message = describeClusterResponse.message().asString(); + final String clusterId = describeClusterResponse.clusterId().asString(); final int controllerId = describeClusterResponse.controllerId(); final int brokerCount = describeClusterResponse.brokerCount(); @@ -517,7 +519,9 @@ private int decodeDescribeClusterResponse( final DescribeClusterResponsePart2FW responsePart2 = describeClusterResponsePart2RO.wrap(buffer, progress, 
limit); - final int authorizedOperations = responsePart2.authorizedOperations(); + final int authorizedOperations = responsePart2.clusterAuthorizedOperations(); + + progress = responsePart2.limit(); client.onDecodeDescribeClusterResponse(traceId, authorization, throttle, error, message, clusterId, controllerId, brokers, authorizedOperations); @@ -720,8 +724,8 @@ private void doApplicationBegin( long authorization, int throttle, short error, - String16FW message, - String16FW clusterId, + String message, + String clusterId, int controllerId, List brokers, int authorizedOperations) @@ -1239,11 +1243,19 @@ private void doEncodeDescribeClusterRequest( final DescribeClusterRequestFW describeClusterRequest = describeClusterRequestRW.wrap(encodeBuffer, encodeProgress, encodeLimit) - .includeAuthorizedOperations(authorizedOperations) + .taggedFields(0) .build(); encodeProgress = describeClusterRequest.limit(); + DescribeClusterRequestPart2FW describeClusterRequestPart2 = + describeClusterRequestPart2RW.wrap(encodeBuffer, encodeProgress, encodeLimit) + .includeAuthorizedOperations(authorizedOperations) + .taggedFields(0) + .build(); + + encodeProgress = describeClusterRequestPart2.limit(); + final int requestId = nextRequestId++; final int requestSize = encodeProgress - encodeOffset - RequestHeaderFW.FIELD_OFFSET_API_KEY; @@ -1470,8 +1482,8 @@ private void onDecodeDescribeClusterResponse( long authorization, int throttle, short error, - String16FW message, - String16FW clusterId, + String message, + String clusterId, int controllerId, List brokers, int authorizedOperations) diff --git a/runtime/binding-kafka/src/main/zilla/protocol.idl b/runtime/binding-kafka/src/main/zilla/protocol.idl index ff606c0107..a3a8fce48f 100644 --- a/runtime/binding-kafka/src/main/zilla/protocol.idl +++ b/runtime/binding-kafka/src/main/zilla/protocol.idl @@ -584,32 +584,41 @@ scope protocol scope describe_cluster { struct DescribeClusterRequest // v0 + { + varuint32 taggedFields; + } + + struct DescribeClusterRequestPart2 { uint8 includeAuthorizedOperations; + varuint32 taggedFields; } struct DescribeClusterResponse { int32 correlationId; + varuint32 taggedFields; int32 throttle; int16 error; - string16 message; - string16 clusterId = null; + varstring message; + varstring clusterId; int32 controllerId; - int32 brokerCount; + varuint32n brokerCount; } struct ClusterBroker { int32 brokerId; - string16 host; + varstring host; int32 port; - string16 rack = null; + varstring rack; + varuint32 taggedFields; } struct DescribeClusterResponsePart2 { - int32 authorizedOperations; + int32 clusterAuthorizedOperations; + varuint32 taggedFields; } } diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/describe.cluster.v0/cluster.brokers.info/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/describe.cluster.v0/cluster.brokers.info/client.rpt index 37fe52f5a9..57bdaff151 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/describe.cluster.v0/cluster.brokers.info/client.rpt +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/describe.cluster.v0/cluster.brokers.info/client.rpt @@ -28,27 +28,33 @@ connect "zilla://streams/net0" connected -write 16 # size +write 18 # size 60s # describe cluster 0s # v0 ${newRequestId} 5s "zilla" # client id - [0x00] # resources + [0x00] # tagged fields + [0x00] # include cluster 
authorized ops + [0x00] # tagged fields -read 97 # size +read 92 # size (int:newRequestId) + [0x00] # tagged fields 0 # throttle time ms 0s # error code - -1s # error message - 9s "cluster-0" # cluster id + [0x00] # error message + [0x0a] "cluster-0" # cluster id 0 # controller id - 2 # brokers + [0x03] # brokers 1 # broker id - 19s "broker1.example.com" # host + [0x14] "broker1.example.com" # host 9092 # port - -1s # rack + [0x00] # rack + [0x00] # tagged fields 2 # broker id - 19s "broker2.example.com" # host + [0x14] "broker2.example.com" # host 9092 # port - -1s # rack + [0x00] # rack + [0x00] # tagged fields 0 # cluster authorized operations + [0x00] # tagged fields diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/describe.cluster.v0/cluster.brokers.info/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/describe.cluster.v0/cluster.brokers.info/server.rpt index 473762e7dc..f1ce509e6b 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/describe.cluster.v0/cluster.brokers.info/server.rpt +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/describe.cluster.v0/cluster.brokers.info/server.rpt @@ -25,28 +25,33 @@ accepted connected -read 16 # size +read 18 # size 60s # describe cluster 0s # v0 (int:newRequestId) 5s "zilla" # client id - [0x00] # resources + [0x00] # tagged fields + [0x00] # include cluster authorized ops + [0x00] # tagged fields - -write 97 # size +write 92 # size ${newRequestId} + [0x00] # tagged fields 0 # throttle time ms 0s # error code - -1s # error message - 9s "cluster-0" # cluster id + [0x00] # error message + [0x0a] "cluster-0" # cluster id 0 # controller id - 2 # brokers + [0x03] # brokers 1 # broker id - 19s "broker1.example.com" # host + [0x14] "broker1.example.com" # host 9092 # port - -1s # rack + [0x00] # rack + [0x00] # tagged fields 2 # broker id - 19s "broker2.example.com" # host + [0x14] "broker2.example.com" # host 9092 # port - -1s # rack + [0x00] # rack + [0x00] # tagged fields 0 # cluster authorized operations + [0x00] # tagged fields From f556baaea40e60374846328d3718075ad23b6aeb Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Wed, 18 Sep 2024 18:42:24 -0700 Subject: [PATCH 02/11] WIP --- .../binding/pgsql/kafka/config/proxy.yaml | 19 ++----------- .../PgsqlKafkaValueAvroSchemaTemplate.java | 27 +++++++++++-------- 2 files changed, 18 insertions(+), 28 deletions(-) diff --git a/incubator/binding-pgsql-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/pgsql/kafka/config/proxy.yaml b/incubator/binding-pgsql-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/pgsql/kafka/config/proxy.yaml index 6170c8532d..017ac77e93 100644 --- a/incubator/binding-pgsql-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/pgsql/kafka/config/proxy.yaml +++ b/incubator/binding-pgsql-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/pgsql/kafka/config/proxy.yaml @@ -23,23 +23,8 @@ catalogs: id: 1 schema: |- { - "type": "record", - "name": "cities", - "namespace": "dev", - "fields": [ - { - "name": "description", - "type": string - }, - { - "name": "id", - "type": string - }, - { - "name": "name", - "type": string - } - ] + "schemaType": "AVRO", + "schema": "{\"type\": \"record\", \"name\": \"cities\", \"namespace\": \"dev\", \"fields\": [ {\"name\": \"description\", \"type\": \"string\"}, {\"name\": \"id\", 
\"type\": \"string\"}, {\"name\": \"name\", \"type\": \"string\"}]}" } bindings: app0: diff --git a/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/schema/PgsqlKafkaValueAvroSchemaTemplate.java b/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/schema/PgsqlKafkaValueAvroSchemaTemplate.java index 193243ca24..0d18df3809 100644 --- a/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/schema/PgsqlKafkaValueAvroSchemaTemplate.java +++ b/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/schema/PgsqlKafkaValueAvroSchemaTemplate.java @@ -43,10 +43,14 @@ public String generateSchema( final String recordName = createTable.getTable().getName(); schemaBuilder.append("{\n"); - schemaBuilder.append("\"type\": \"record\",\n"); - schemaBuilder.append("\"name\": \"").append(recordName).append("\",\n"); - schemaBuilder.append("\"namespace\": \"").append(newNamespace).append("\",\n"); - schemaBuilder.append("\"fields\": [\n"); + schemaBuilder.append("\"schemaType\": \"AVRO\",\n"); + schemaBuilder.append("\"schema\": \""); // Begin the schema field + + // Building the actual Avro schema + schemaBuilder.append("{\\\"type\\\": \\\"record\\\","); + schemaBuilder.append(" \\\"name\\\": \\\"").append(recordName).append("\\\","); + schemaBuilder.append(" \\\"namespace\\\": \\\"").append(newNamespace).append("\\\","); + schemaBuilder.append(" \\\"fields\\\": ["); for (ColumnDefinition column : createTable.getColumnDefinitions()) { @@ -55,15 +59,16 @@ public String generateSchema( String avroType = convertPgsqlTypeToAvro(pgsqlType); - schemaBuilder.append(" {\n"); - schemaBuilder.append(" \"name\": \"").append(fieldName).append("\",\n"); - schemaBuilder.append(" \"type\": ").append(avroType).append("\n"); - schemaBuilder.append(" },\n"); + schemaBuilder.append(" {\\\"name\\\": \\\"").append(fieldName).append("\\\","); + schemaBuilder.append(" \\\"type\\\": \\\"").append(avroType).append("\\\"},"); } - // Remove the last comma after the last field and close the JSON array - schemaBuilder.setLength(schemaBuilder.length() - 2); // Remove last comma - schemaBuilder.append("\n]\n}"); + // Remove the last comma and close the fields array + schemaBuilder.setLength(schemaBuilder.length() - 1); + schemaBuilder.append("]"); + + // Closing the Avro schema + schemaBuilder.append("}\"\n}"); return schemaBuilder.toString(); } From a1275ca90d9ec4b1a61cfc54dab7a1b395eba5eb Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Wed, 18 Sep 2024 20:35:33 -0700 Subject: [PATCH 03/11] No need for describe cluster --- .../streams/kafka/create.topic/client.rpt | 35 ------- .../streams/kafka/create.topic/server.rpt | 34 ------- incubator/binding-pgsql-kafka/pom.xml | 2 +- .../stream/PgsqlKafkaProxyFactory.java | 93 +------------------ 4 files changed, 3 insertions(+), 161 deletions(-) diff --git a/incubator/binding-pgsql-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/pgsql/kafka/streams/kafka/create.topic/client.rpt b/incubator/binding-pgsql-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/pgsql/kafka/streams/kafka/create.topic/client.rpt index 7a21d70c2c..c39986a2ab 100644 --- a/incubator/binding-pgsql-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/pgsql/kafka/streams/kafka/create.topic/client.rpt +++ 
b/incubator/binding-pgsql-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/pgsql/kafka/streams/kafka/create.topic/client.rpt @@ -13,40 +13,6 @@ # specific language governing permissions and limitations under the License. # -connect "zilla://streams/app1" - option zilla:window 8192 - option zilla:transmission "half-duplex" - -write zilla:begin.ext ${kafka:beginEx() - .typeId(zilla:id("kafka")) - .request() - .describeCluster() - .includeAuthorizedOperations("false") - .build() - .build()} - -connected - -read zilla:begin.ext ${kafka:matchBeginEx() - .typeId(zilla:id("kafka")) - .response() - .describeCluster() - .throttle(0) - .error(0) - .clusterId("cluster-0") - .controllerId(0) - .broker() - .brokerId(1) - .host("broker1.example.com") - .port(9092) - .build() - .authorizedOperations(0) - .build() - .build()} - -write close -read closed - connect "zilla://streams/app1" option zilla:window 8192 option zilla:transmission "half-duplex" @@ -59,7 +25,6 @@ write zilla:begin.ext ${kafka:beginEx() .name("dev.cities") .partitionCount(1) .replicas(1) - .assignment(0, 1) .config("cleanup.policy", "compact") .build() .timeout(30000) diff --git a/incubator/binding-pgsql-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/pgsql/kafka/streams/kafka/create.topic/server.rpt b/incubator/binding-pgsql-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/pgsql/kafka/streams/kafka/create.topic/server.rpt index 5c3159fad5..2504756e81 100644 --- a/incubator/binding-pgsql-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/pgsql/kafka/streams/kafka/create.topic/server.rpt +++ b/incubator/binding-pgsql-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/pgsql/kafka/streams/kafka/create.topic/server.rpt @@ -21,39 +21,6 @@ accept ${serverAddress} accepted -read zilla:begin.ext ${kafka:matchBeginEx() - .typeId(zilla:id("kafka")) - .request() - .describeCluster() - .includeAuthorizedOperations("false") - .build() - .build()} - -connected - -write zilla:begin.ext ${kafka:beginEx() - .typeId(zilla:id("kafka")) - .response() - .describeCluster() - .throttle(0) - .error(0) - .clusterId("cluster-0") - .controllerId(0) - .broker() - .brokerId(1) - .host("broker1.example.com") - .port(9092) - .build() - .authorizedOperations(0) - .build() - .build()} -write flush - -read closed -write close - -accepted - read zilla:begin.ext ${kafka:matchBeginEx() .typeId(zilla:id("kafka")) .request() @@ -62,7 +29,6 @@ read zilla:begin.ext ${kafka:matchBeginEx() .name("dev.cities") .partitionCount(1) .replicas(1) - .assignment(0, 1) .config("cleanup.policy", "compact") .build() .timeout(30000) diff --git a/incubator/binding-pgsql-kafka/pom.xml b/incubator/binding-pgsql-kafka/pom.xml index 4bc80c1154..5779c09cb9 100644 --- a/incubator/binding-pgsql-kafka/pom.xml +++ b/incubator/binding-pgsql-kafka/pom.xml @@ -24,7 +24,7 @@ - 0.83 + 0.82 0 diff --git a/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaProxyFactory.java b/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaProxyFactory.java index 7eda97fe93..6e675d262e 100644 --- a/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaProxyFactory.java +++ b/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaProxyFactory.java @@ -48,7 +48,6 @@ import 
io.aklivity.zilla.runtime.binding.pgsql.kafka.internal.types.stream.ExtensionFW; import io.aklivity.zilla.runtime.binding.pgsql.kafka.internal.types.stream.FlushFW; import io.aklivity.zilla.runtime.binding.pgsql.kafka.internal.types.stream.KafkaBeginExFW; -import io.aklivity.zilla.runtime.binding.pgsql.kafka.internal.types.stream.KafkaDescribeClusterResponseBeginExFW; import io.aklivity.zilla.runtime.binding.pgsql.kafka.internal.types.stream.PgsqlBeginExFW; import io.aklivity.zilla.runtime.binding.pgsql.kafka.internal.types.stream.PgsqlDataExFW; import io.aklivity.zilla.runtime.binding.pgsql.kafka.internal.types.stream.PgsqlFlushExFW; @@ -226,9 +225,7 @@ private final class PgsqlProxy private final String database; private final PgsqlKafkaBindingConfig binding; private final KafkaCreateTopicsProxy createTopicsProxy; - private final KafkaDescribeClusterProxy describeClusterProxy; - private final List brokers; private final IntArrayQueue queries; private final long initialId; @@ -275,10 +272,8 @@ private PgsqlProxy( String dbValue = parameters.get("database\u0000"); this.database = dbValue.substring(0, dbValue.length() - 1); this.queries = new IntArrayQueue(); - this.brokers = new ArrayList<>(); this.createTopicsProxy = new KafkaCreateTopicsProxy(routedId, resolvedId, this); - this.describeClusterProxy = new KafkaDescribeClusterProxy(routedId, resolvedId, this); } private void onAppMessage( @@ -686,7 +681,6 @@ private void cleanup( long traceId, long authorization) { - describeClusterProxy.doKafkaAbortAndReset(traceId, authorization); createTopicsProxy.doKafkaAbortAndReset(traceId, authorization); doAppAbortAndReset(traceId, authorization); @@ -974,7 +968,6 @@ private void doKafkaBegin( initialMax = delegate.initialMax; state = PgsqlKafkaState.openingInitial(state); - final int numBrokers = delegate.brokers.size(); final int partitionCount = config.kafkaCreateTopicsPartitionCount(); final KafkaBeginExFW kafkaBeginEx = @@ -985,18 +978,8 @@ private void doKafkaBegin( .topics(ct -> topics.forEach(t -> ct.item(i -> i .name(t) - .partitionCount(1) + .partitionCount(partitionCount) .replicas(config.kafkaCreateTopicsReplicas()) - .assignments(a -> a - .item(ai -> - { - for (int p = 0; p < partitionCount; p++) - { - int brokerIndex = p % numBrokers; - int brokerId = delegate.brokers.get(brokerIndex); - ai.partitionId(p).leaderId(brokerId); - } - })) .configs(cf -> cf .item(ci -> ci.name("cleanup.policy").value(deletionPolicy)))))) .timeout(config.kafkaTopicRequestTimeoutMs()) @@ -1048,74 +1031,6 @@ protected void onKafkaBegin( } } - private final class KafkaDescribeClusterProxy extends KafkaProxy - { - private KafkaDescribeClusterProxy( - long originId, - long routedId, - PgsqlProxy delegate) - { - super(originId, routedId, delegate); - } - - protected void doKafkaBegin( - long traceId, - long authorization) - { - state = PgsqlKafkaState.openingInitial(state); - - final KafkaBeginExFW kafkaBeginEx = - kafkaBeginExRW.wrap(extBuffer, 0, extBuffer.capacity()) - .typeId(kafkaTypeId) - .request(r -> r - .describeCluster(d -> d.includeAuthorizedOperations(0))) - .build(); - - kafka = newKafkaConsumer(this::onKafkaMessage, originId, routedId, initialId, initialSeq, initialAck, initialMax, - traceId, authorization, 0, kafkaBeginEx); - } - - @Override - protected void onKafkaBegin( - BeginFW begin) - { - final long sequence = begin.sequence(); - final long acknowledge = begin.acknowledge(); - final long traceId = begin.traceId(); - final long authorization = begin.authorization(); - final OctetsFW extension 
= begin.extension(); - - assert acknowledge <= sequence; - assert sequence >= replySeq; - assert acknowledge >= replyAck; - - replySeq = sequence; - replyAck = acknowledge; - state = PgsqlKafkaState.openingReply(state); - - assert replyAck <= replySeq; - - final ExtensionFW beginEx = extension.get(extensionRO::tryWrap); - final KafkaBeginExFW kafkaBeginEx = - beginEx != null && beginEx.typeId() == kafkaTypeId ? extension.get(kafkaBeginExRO::tryWrap) : null; - - final KafkaDescribeClusterResponseBeginExFW describeCluster = kafkaBeginEx.response().describeCluster(); - - if (describeCluster.error() == NO_ERROR) - { - describeCluster.brokers().forEach(b -> delegate.brokers.add(b.brokerId())); - delegate.onKafkaDescribeClusterBegin(traceId, authorization); - - doKafkaWindow(traceId, authorization); - doKafkaEnd(traceId, authorization); - } - else - { - delegate.cleanup(traceId, authorization); - } - } - } - private void doBegin( final MessageConsumer receiver, final long originId, @@ -1328,15 +1243,11 @@ private void decodeCreateTopicCommand( int offset, int length) { - if (server.commandsProcessed == 2) + if (server.commandsProcessed == 1) { server.onCommandCompleted(traceId, authorization, length, PgsqlKafkaCompletionCommand.CREATE_TOPIC_COMMAND); } else if (server.commandsProcessed == 0) - { - server.describeClusterProxy.doKafkaBegin(traceId, authorization); - } - else if (server.commandsProcessed == 1) { final CreateTable statement = (CreateTable) parseStatement(buffer, offset, length); final String topic = statement.getTable().getName(); From 9261c929e298cf4356324dcc0e0fe043703a97ed Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Wed, 18 Sep 2024 20:50:59 -0700 Subject: [PATCH 04/11] FIx typos --- .../streams/effective/create.table.with.primary.key/client.rpt | 1 - .../streams/effective/create.table.with.primary.key/server.rpt | 1 - .../binding/risingwave/streams/effective/create.table/client.rpt | 1 - .../binding/risingwave/streams/effective/create.table/server.rpt | 1 - .../streams/effective/query.with.multiple.statements/client.rpt | 1 - .../streams/effective/query.with.multiple.statements/server.rpt | 1 - .../internal/statement/RisingwaveCreateSourceTemplate.java | 1 - .../internal/statement/RisingwaveCreateTableTemplate.java | 1 - 8 files changed, 8 deletions(-) diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key/client.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key/client.rpt index 80ac51e3ed..1063e8a808 100644 --- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key/client.rpt +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key/client.rpt @@ -42,7 +42,6 @@ write "CREATE TABLE IF NOT EXISTS cities (\n" " topic='dev.cities',\n" " scan.startup.mode='latest',\n" " scan.startup.timestamp.millis='140000000'\n" - ")\n" ") FORMAT UPSERT ENCODE AVRO (\n" " schema.registry = 'http://localhost:8081'\n" ");" diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key/server.rpt 
b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key/server.rpt index deea693f80..f3cb49db5e 100644 --- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key/server.rpt +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key/server.rpt @@ -46,7 +46,6 @@ read "CREATE TABLE IF NOT EXISTS cities (\n" " topic='dev.cities',\n" " scan.startup.mode='latest',\n" " scan.startup.timestamp.millis='140000000'\n" - ")\n" ") FORMAT UPSERT ENCODE AVRO (\n" " schema.registry = 'http://localhost:8081'\n" ");" diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table/client.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table/client.rpt index 34b1eb1260..6a33b51641 100644 --- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table/client.rpt +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table/client.rpt @@ -39,7 +39,6 @@ write "CREATE SOURCE IF NOT EXISTS weather (*)\n" " topic='dev.weather',\n" " scan.startup.mode='latest',\n" " scan.startup.timestamp.millis='140000000'\n" - ")\n" ") FORMAT UPSERT ENCODE AVRO (\n" " schema.registry = 'http://localhost:8081'\n" ");" diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table/server.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table/server.rpt index b4e06de315..db2c967a64 100644 --- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table/server.rpt +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table/server.rpt @@ -43,7 +43,6 @@ read "CREATE SOURCE IF NOT EXISTS weather (*)\n" " topic='dev.weather',\n" " scan.startup.mode='latest',\n" " scan.startup.timestamp.millis='140000000'\n" - ")\n" ") FORMAT UPSERT ENCODE AVRO (\n" " schema.registry = 'http://localhost:8081'\n" ");" diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/query.with.multiple.statements/client.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/query.with.multiple.statements/client.rpt index c1a8d42a24..cdd7143e20 100644 --- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/query.with.multiple.statements/client.rpt +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/query.with.multiple.statements/client.rpt @@ -42,7 +42,6 @@ write "CREATE TABLE IF NOT EXISTS cities (\n" " topic='dev.cities',\n" " scan.startup.mode='latest',\n" " scan.startup.timestamp.millis='140000000'\n" - ")\n" ") FORMAT UPSERT ENCODE AVRO (\n" " schema.registry = 'http://localhost:8081'\n" ");" diff --git 
a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/query.with.multiple.statements/server.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/query.with.multiple.statements/server.rpt index 014dd4d23a..1d111d6108 100644 --- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/query.with.multiple.statements/server.rpt +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/query.with.multiple.statements/server.rpt @@ -46,7 +46,6 @@ read "CREATE TABLE IF NOT EXISTS cities (\n" " topic='dev.cities',\n" " scan.startup.mode='latest',\n" " scan.startup.timestamp.millis='140000000'\n" - ")\n" ") FORMAT UPSERT ENCODE AVRO (\n" " schema.registry = 'http://localhost:8081'\n" ");" diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateSourceTemplate.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateSourceTemplate.java index 1ee9f0e1f7..8c3963e642 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateSourceTemplate.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateSourceTemplate.java @@ -27,7 +27,6 @@ public class RisingwaveCreateSourceTemplate extends RisingwaveCommandTemplate topic='%s.%s', scan.startup.mode='latest', scan.startup.timestamp.millis='%d' - ) ) FORMAT UPSERT ENCODE AVRO ( schema.registry = '%s' );\u0000"""; diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateTableTemplate.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateTableTemplate.java index 3b64a685eb..02180e99da 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateTableTemplate.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateTableTemplate.java @@ -30,7 +30,6 @@ PRIMARY KEY (%s) topic='%s.%s', scan.startup.mode='latest', scan.startup.timestamp.millis='%d' - ) ) FORMAT UPSERT ENCODE AVRO ( schema.registry = '%s' );\u0000"""; From cead7aeff50badf06a4575ead0cf824191091497 Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Wed, 18 Sep 2024 23:17:29 -0700 Subject: [PATCH 05/11] WIP --- .../stream/PgsqlKafkaProxyFactory.java | 34 ++++++++++++++++--- .../create.table.with.primary.key/client.rpt | 2 +- .../create.table.with.primary.key/server.rpt | 2 +- .../query.with.multiple.statements/client.rpt | 2 +- .../query.with.multiple.statements/server.rpt | 2 +- .../RisingwaveCreateTableTemplate.java | 5 ++- .../stream/RisingwaveProxyFactory.java | 2 -- 7 files changed, 35 insertions(+), 14 deletions(-) diff --git a/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaProxyFactory.java b/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaProxyFactory.java index 6e675d262e..17860bcb56 100644 --- 
a/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaProxyFactory.java +++ b/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaProxyFactory.java @@ -67,9 +67,16 @@ public final class PgsqlKafkaProxyFactory implements PgsqlKafkaStreamFactory { + private static final String AVRO_KEY_SCHEMA = """ + { + "schemaType": "AVRO", + "schema": "{\\"type\\": \\"string\\"}" + }"""; + + private static final Byte STATEMENT_SEMICOLON = ';'; private static final int END_OF_FIELD = 0x00; - private static final int NO_ERROR = 0x00; + private static final int NO_ERROR_SCHEMA_VERSION_ID = -1; private static final int FLAGS_INIT = 0x02; private static final int FLAGS_CONT = 0x00; @@ -1256,12 +1263,29 @@ else if (server.commandsProcessed == 0) topics.add(String.format("%s.%s", server.database, topic)); final PgsqlKafkaBindingConfig binding = server.binding; - final String schema = binding.avroValueSchema.generateSchema(server.database, statement); - final String subject = String.format("%s.%s-value", server.database, topic); + final String primaryKey = binding.avroValueSchema.primaryKey(statement); + + final String subjectKey = primaryKey != null + ? String.format("%s.%s-key", server.database, topic) + : null; + + final String subjectValue = String.format("%s.%s-value", server.database, topic); + final String schemaValue = binding.avroValueSchema.generateSchema(server.database, statement); + + int versionId = NO_ERROR_SCHEMA_VERSION_ID; + if (subjectKey != null) + { + //TODO: assign versionId to avoid test failure + binding.catalog.register(subjectKey, AVRO_KEY_SCHEMA); + } + if (versionId != NO_VERSION_ID) + { + versionId = binding.catalog.register(subjectValue, schemaValue); + } - if (binding.catalog.register(subject, schema) != NO_VERSION_ID) + if (versionId != NO_VERSION_ID) { - final String policy = binding.avroValueSchema.primaryKey(statement) != null + final String policy = primaryKey != null ? 
"compact" : "delete"; diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key/client.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key/client.rpt index 1063e8a808..fb70359d1e 100644 --- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key/client.rpt +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key/client.rpt @@ -34,7 +34,7 @@ write zilla:data.ext ${pgsql:dataEx() .build()} write "CREATE TABLE IF NOT EXISTS cities (\n" " *,\n" - " PRIMARY KEY (id)\n" + " PRIMARY KEY (key)\n" ") INCLUDE KEY AS key\n" "WITH (\n" " connector='kafka',\n" diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key/server.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key/server.rpt index f3cb49db5e..6479311838 100644 --- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key/server.rpt +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key/server.rpt @@ -38,7 +38,7 @@ read zilla:data.ext ${pgsql:dataEx() .build()} read "CREATE TABLE IF NOT EXISTS cities (\n" " *,\n" - " PRIMARY KEY (id)\n" + " PRIMARY KEY (key)\n" ") INCLUDE KEY AS key\n" "WITH (\n" " connector='kafka',\n" diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/query.with.multiple.statements/client.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/query.with.multiple.statements/client.rpt index cdd7143e20..ec71b51e3c 100644 --- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/query.with.multiple.statements/client.rpt +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/query.with.multiple.statements/client.rpt @@ -34,7 +34,7 @@ write zilla:data.ext ${pgsql:dataEx() .build()} write "CREATE TABLE IF NOT EXISTS cities (\n" " *,\n" - " PRIMARY KEY (id)\n" + " PRIMARY KEY (key)\n" ") INCLUDE KEY AS key\n" "WITH (\n" " connector='kafka',\n" diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/query.with.multiple.statements/server.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/query.with.multiple.statements/server.rpt index 1d111d6108..69ae550915 100644 --- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/query.with.multiple.statements/server.rpt +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/query.with.multiple.statements/server.rpt @@ -38,7 +38,7 @@ read zilla:data.ext ${pgsql:dataEx() .build()} read "CREATE TABLE IF NOT EXISTS cities (\n" " *,\n" - " PRIMARY KEY 
(id)\n" + " PRIMARY KEY (key)\n" ") INCLUDE KEY AS key\n" "WITH (\n" " connector='kafka',\n" diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateTableTemplate.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateTableTemplate.java index 02180e99da..8b264a3576 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateTableTemplate.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateTableTemplate.java @@ -22,7 +22,7 @@ public class RisingwaveCreateTableTemplate extends RisingwaveCommandTemplate private final String sqlFormat = """ CREATE TABLE IF NOT EXISTS %s ( *, - PRIMARY KEY (%s) + PRIMARY KEY (key) ) INCLUDE KEY AS key WITH ( connector='kafka', @@ -54,9 +54,8 @@ public String generate( { CreateTable createTable = (CreateTable) statement; String table = createTable.getTable().getName(); - String primaryKey = getPrimaryKey(createTable); - return String.format(sqlFormat, table, primaryKey, bootstrapServer, database, + return String.format(sqlFormat, table, bootstrapServer, database, table, scanStartupMil, schemaRegistry); } } diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/RisingwaveProxyFactory.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/RisingwaveProxyFactory.java index 6b434124f4..e8852864d2 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/RisingwaveProxyFactory.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/RisingwaveProxyFactory.java @@ -931,8 +931,6 @@ private void onAppEnd( state = RisingwaveState.closeReply(state); doAppEnd(traceId, authorization); - - server.cleanup(traceId, authorization); } } From 91cc040838086df253adbb6a178da1b16ac08221 Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Wed, 18 Sep 2024 23:30:05 -0700 Subject: [PATCH 06/11] Fix source --- .../risingwave/streams/effective/create.table/client.rpt | 2 +- .../risingwave/streams/effective/create.table/server.rpt | 2 +- .../internal/statement/RisingwaveCreateSourceTemplate.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table/client.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table/client.rpt index 6a33b51641..5635455b3c 100644 --- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table/client.rpt +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table/client.rpt @@ -39,7 +39,7 @@ write "CREATE SOURCE IF NOT EXISTS weather (*)\n" " topic='dev.weather',\n" " scan.startup.mode='latest',\n" " scan.startup.timestamp.millis='140000000'\n" - ") FORMAT UPSERT ENCODE AVRO (\n" + ") FORMAT PLAIN ENCODE AVRO (\n" " schema.registry = 'http://localhost:8081'\n" ");" [0x00] diff --git 
a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table/server.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table/server.rpt index db2c967a64..52a524dbc4 100644 --- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table/server.rpt +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table/server.rpt @@ -43,7 +43,7 @@ read "CREATE SOURCE IF NOT EXISTS weather (*)\n" " topic='dev.weather',\n" " scan.startup.mode='latest',\n" " scan.startup.timestamp.millis='140000000'\n" - ") FORMAT UPSERT ENCODE AVRO (\n" + ") FORMAT PLAIN ENCODE AVRO (\n" " schema.registry = 'http://localhost:8081'\n" ");" [0x00] diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateSourceTemplate.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateSourceTemplate.java index 8c3963e642..26cb3e096b 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateSourceTemplate.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateSourceTemplate.java @@ -27,7 +27,7 @@ public class RisingwaveCreateSourceTemplate extends RisingwaveCommandTemplate topic='%s.%s', scan.startup.mode='latest', scan.startup.timestamp.millis='%d' - ) FORMAT UPSERT ENCODE AVRO ( + ) FORMAT PLAIN ENCODE AVRO ( schema.registry = '%s' );\u0000"""; From ee750027a85078199a4b5a5b04832c002cec6bec Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Thu, 19 Sep 2024 00:14:02 -0700 Subject: [PATCH 07/11] Fix stream state --- .../stream/PgsqlKafkaProxyFactory.java | 8 ----- .../stream/RisingwaveProxyFactory.java | 33 ++++++++++++------- 2 files changed, 22 insertions(+), 19 deletions(-) diff --git a/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaProxyFactory.java b/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaProxyFactory.java index 17860bcb56..060724aaca 100644 --- a/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaProxyFactory.java +++ b/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaProxyFactory.java @@ -488,14 +488,6 @@ private void onCommandCompleted( doAppWindow(traceId, authorization); } - private void onKafkaDescribeClusterBegin( - long traceId, - long authorization) - { - commandsProcessed++; - doParseQuery(traceId, authorization); - } - public void onKafkaCreateTopicsBegin( long traceId, long authorization) diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/RisingwaveProxyFactory.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/RisingwaveProxyFactory.java index e8852864d2..b591c0713f 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/RisingwaveProxyFactory.java +++ 
b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/RisingwaveProxyFactory.java @@ -244,6 +244,7 @@ private final class PgsqlServer private int initialMax; private int initialPad; private long initialBudgetId; + private long affinity; private long replySeq; private long replyAck; @@ -329,7 +330,8 @@ private void onAppBegin( { final long traceId = begin.traceId(); final long authorization = begin.authorization(); - final long affinity = begin.affinity(); + + this.affinity = begin.affinity(); state = RisingwaveState.openingInitial(state); @@ -751,11 +753,12 @@ private final class PgsqlClient private MessageConsumer app; - private final long initialId; - private final long replyId; private final long originId; private final long routedId; + private long initialId; + private long replyId; + private long initialSeq; private long initialAck; private int initialMax; @@ -780,8 +783,6 @@ private PgsqlClient( this.server = server; this.originId = originId; this.routedId = routedId; - this.initialId = supplyInitialId.applyAsLong(routedId); - this.replyId = supplyReplyId.applyAsLong(initialId); this.typeCommand = proxyFlushCommand; this.completionCommand = ignoreFlushCommand; @@ -926,12 +927,9 @@ private void onAppEnd( final long traceId = end.traceId(); final long authorization = end.authorization(); - if (!RisingwaveState.closed(server.state)) - { - state = RisingwaveState.closeReply(state); + state = RisingwaveState.closeReply(state); - doAppEnd(traceId, authorization); - } + doAppEnd(traceId, authorization); } private void onAppAbort( @@ -1013,8 +1011,16 @@ private void doAppBegin( long authorization, long affinity) { + if (RisingwaveState.closed(state)) + { + state = 0; + } + if (!RisingwaveState.initialOpening(state)) { + this.initialId = supplyInitialId.applyAsLong(routedId); + this.replyId = supplyReplyId.applyAsLong(initialId); + Consumer beginEx = e -> e.set((b, o, l) -> beginExRW.wrap(b, o, l) .typeId(pgsqlTypeId) .parameters(p -> server.parameters.forEach((k, v) -> p.item(i -> i.name(k).value(v)))) @@ -1064,7 +1070,7 @@ private void doAppEnd( long traceId, long authorization) { - if (RisingwaveState.initialOpened(state)) + if (RisingwaveState.initialOpening(state)) { state = RisingwaveState.closeInitial(state); @@ -1132,6 +1138,7 @@ private void doPgsqlQuery( final int lengthMax = Math.min(initialWin - initialPad, remaining); if (RisingwaveState.initialOpened(state) && + !RisingwaveState.initialClosing(state) && lengthMax > 0 && remaining > 0) { final int deferred = remaining - length; @@ -1157,6 +1164,10 @@ private void doPgsqlQuery( messageOffset += lengthMax; } + else + { + doAppBegin(traceId, authorization, server.affinity); + } } } From 283b03bc415048b4d83383648229eb1ce5536479 Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Thu, 19 Sep 2024 13:50:31 -0700 Subject: [PATCH 08/11] Update materialized view scipts --- .../create.materialized.view/client.rpt | 41 ++++++++++++++++-- .../create.materialized.view/server.rpt | 43 +++++++++++++++++-- .../config/RisingwaveBindingConfig.java | 6 +-- ...e.java => RisingwaveDescribeTemplate.java} | 6 +-- 4 files changed, 84 insertions(+), 12 deletions(-) rename incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/{RisingwaveDescribeMaterializedViewTemplate.java => RisingwaveDescribeTemplate.java} (84%) diff --git 
a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.materialized.view/client.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.materialized.view/client.rpt index e5fd9bcf54..87ee044d5d 100644 --- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.materialized.view/client.rpt +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.materialized.view/client.rpt @@ -56,14 +56,14 @@ write zilla:data.ext ${pgsql:dataEx() .query() .build() .build()} -write "DESCRIBE MATERIALIZED VIEW distinct_cities;" +write "DESCRIBE distinct_cities;" [0x00] read advised zilla:flush ${pgsql:flushEx() .typeId(zilla:id("pgsql")) .type() .column() - .name("id") + .name("Name") .tableOid(0) .index(0) .typeOid(1043) @@ -72,7 +72,7 @@ read advised zilla:flush ${pgsql:flushEx() .format("TEXT") .build() .column() - .name("city") + .name("Type") .tableOid(0) .index(0) .typeOid(1043) @@ -80,8 +80,43 @@ read advised zilla:flush ${pgsql:flushEx() .modifier(-1) .format("TEXT") .build() + .column() + .name("Description") + .tableOid(0) + .index(0) + .typeOid(1043) + .length(4) + .modifier(-1) + .format("TEXT") + .build() + .build() + .build()} + +write zilla:data.ext ${pgsql:dataEx() + .typeId(zilla:id("pgsql")) + .row() + .build() + .build()} +read [0x00 0x03] # Field Count + [0x00 0x00 0x00 0x02] # Length + [0x69 0x64] # Data + [0x00 0x00 0x00 0x011] # Length + [0x63 0x68 0x61 0x72 0x61 0x63 0x74 0x65 0x72 0x20 0x76 0x61 0x72 0x79 0x69 0x6e 0x67 # Data + [0xff 0xff 0xff 0xff] # Length + +write flush + +write zilla:data.ext ${pgsql:dataEx() + .typeId(zilla:id("pgsql")) + .row() .build() .build()} +write [0x00 0x03] # Field Count + [0x00 0x00 0x00 0x04] # Length + [0x63 0x69 0x74 0x79] # Data + [0x00 0x00 0x00 0x011] # Length + [0x63 0x68 0x61 0x72 0x61 0x63 0x74 0x65 0x72 0x20 0x76 0x61 0x72 0x79 0x69 0x6e 0x67] # Data + [0xff 0xff 0xff 0xff] read advised zilla:flush ${pgsql:flushEx() .typeId(zilla:id("pgsql")) diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.materialized.view/server.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.materialized.view/server.rpt index 508ec1a960..350345ff52 100644 --- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.materialized.view/server.rpt +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.materialized.view/server.rpt @@ -57,14 +57,14 @@ read zilla:data.ext ${pgsql:dataEx() .query() .build() .build()} -read "DESCRIBE MATERIALIZED VIEW distinct_cities;" +read "DESCRIBE distinct_cities;" [0x00] write advise zilla:flush ${pgsql:flushEx() .typeId(zilla:id("pgsql")) .type() .column() - .name("id") + .name("Name") .tableOid(0) .index(0) .typeOid(1043) @@ -73,7 +73,7 @@ write advise zilla:flush ${pgsql:flushEx() .format("TEXT") .build() .column() - .name("city") + .name("Type") .tableOid(0) .index(0) .typeOid(1043) @@ -81,8 +81,45 @@ write advise zilla:flush ${pgsql:flushEx() .modifier(-1) .format("TEXT") .build() + .column() + .name("Description") + .tableOid(0) + .index(0) + .typeOid(1043) + .length(4) + 
.modifier(-1) + .format("TEXT") + .build() + .build() + .build()} + +write zilla:data.ext ${pgsql:dataEx() + .typeId(zilla:id("pgsql")) + .row() + .build() + .build()} +write [0x00 0x03] # Field Count + [0x00 0x00 0x00 0x02] # Length + [0x69 0x64] # Data + [0x00 0x00 0x00 0x011] # Length + [0x63 0x68 0x61 0x72 0x61 0x63 0x74 0x65 0x72 0x20 0x76 0x61 0x72 0x79 0x69 0x6e 0x67 # Data + [0xff 0xff 0xff 0xff] # Length + +write flush + +write zilla:data.ext ${pgsql:dataEx() + .typeId(zilla:id("pgsql")) + .row() .build() .build()} +write [0x00 0x03] # Field Count + [0x00 0x00 0x00 0x04] # Length + [0x63 0x69 0x74 0x79] # Data + [0x00 0x00 0x00 0x011] # Length + [0x63 0x68 0x61 0x72 0x61 0x63 0x74 0x65 0x72 0x20 0x76 0x61 0x72 0x79 0x69 0x6e 0x67] # Data + [0xff 0xff 0xff 0xff] # Length + +write flush write advise zilla:flush ${pgsql:flushEx() .typeId(zilla:id("pgsql")) diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/config/RisingwaveBindingConfig.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/config/RisingwaveBindingConfig.java index 49b1b8e8e3..602e124808 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/config/RisingwaveBindingConfig.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/config/RisingwaveBindingConfig.java @@ -30,7 +30,7 @@ import io.aklivity.zilla.runtime.binding.risingwave.internal.statement.RisingwaveCreateSourceTemplate; import io.aklivity.zilla.runtime.binding.risingwave.internal.statement.RisingwaveCreateTableTemplate; import io.aklivity.zilla.runtime.binding.risingwave.internal.statement.RisingwaveCreateTopicTemplate; -import io.aklivity.zilla.runtime.binding.risingwave.internal.statement.RisingwaveDescribeMaterializedViewTemplate; +import io.aklivity.zilla.runtime.binding.risingwave.internal.statement.RisingwaveDescribeTemplate; import io.aklivity.zilla.runtime.engine.catalog.CatalogHandler; import io.aklivity.zilla.runtime.engine.config.BindingConfig; import io.aklivity.zilla.runtime.engine.config.CatalogedConfig; @@ -45,7 +45,7 @@ public final class RisingwaveBindingConfig public final List routes; public final RisingwaveCreateTopicTemplate createTopic; public final RisingwaveCreateMaterializedViewTemplate createView; - public final RisingwaveDescribeMaterializedViewTemplate describeView; + public final RisingwaveDescribeTemplate describeView; public final RisingwaveCreateTableTemplate createTable; public final RisingwaveCreateSourceTemplate createSource; public final RisingwaveCreateSinkTemplate createSink; @@ -88,7 +88,7 @@ public RisingwaveBindingConfig( this.createSink = new RisingwaveCreateSinkTemplate(bootstrapServer, location); this.createTopic = new RisingwaveCreateTopicTemplate(); this.createView = new RisingwaveCreateMaterializedViewTemplate(); - this.describeView = new RisingwaveDescribeMaterializedViewTemplate(); + this.describeView = new RisingwaveDescribeTemplate(); this.createFunction = new RisingwaveCreateFunctionTemplate(udf); } diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveDescribeMaterializedViewTemplate.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveDescribeTemplate.java similarity index 84% rename from 
incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveDescribeMaterializedViewTemplate.java rename to incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveDescribeTemplate.java index 81748ddcdf..238d75bb06 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveDescribeMaterializedViewTemplate.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveDescribeTemplate.java @@ -17,12 +17,12 @@ import net.sf.jsqlparser.statement.Statement; import net.sf.jsqlparser.statement.create.view.CreateView; -public class RisingwaveDescribeMaterializedViewTemplate extends RisingwaveCommandTemplate +public class RisingwaveDescribeTemplate extends RisingwaveCommandTemplate { private final String sqlFormat = """ - DESCRIBE MATERIALIZED VIEW %s;\u0000"""; + DESCRIBE %s;\u0000"""; - public RisingwaveDescribeMaterializedViewTemplate() + public RisingwaveDescribeTemplate() { } From 497bc827a2231f3576e659fd835477bd5725296f Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Thu, 19 Sep 2024 13:52:05 -0700 Subject: [PATCH 09/11] Fix typos --- .../streams/effective/create.materialized.view/client.rpt | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.materialized.view/client.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.materialized.view/client.rpt index 87ee044d5d..ac94f1c604 100644 --- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.materialized.view/client.rpt +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.materialized.view/client.rpt @@ -104,14 +104,12 @@ read [0x00 0x03] [0x63 0x68 0x61 0x72 0x61 0x63 0x74 0x65 0x72 0x20 0x76 0x61 0x72 0x79 0x69 0x6e 0x67 # Data [0xff 0xff 0xff 0xff] # Length -write flush - -write zilla:data.ext ${pgsql:dataEx() +read zilla:data.ext ${pgsql:dataEx() .typeId(zilla:id("pgsql")) .row() .build() .build()} -write [0x00 0x03] # Field Count +read [0x00 0x03] # Field Count [0x00 0x00 0x00 0x04] # Length [0x63 0x69 0x74 0x79] # Data [0x00 0x00 0x00 0x011] # Length From 373792d68b1449b40d30477ea4152d4218a24d2e Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Thu, 19 Sep 2024 19:27:38 -0700 Subject: [PATCH 10/11] Checkpoint --- .../binding/pgsql/kafka/config/proxy.yaml | 2 +- .../streams/pgsql/create.topic/client.rpt | 2 +- .../streams/pgsql/create.topic/server.rpt | 4 +- .../config/PgsqlKafkaBindingConfig.java | 3 + .../PgsqlKafkaKeyAvroSchemaTemplate.java | 99 +++++++++++++ .../PgsqlKafkaValueAvroSchemaTemplate.java | 23 +++ .../stream/PgsqlKafkaProxyFactory.java | 25 ++-- .../create.materialized.view/client.rpt | 33 +++-- .../create.materialized.view/server.rpt | 38 +++-- .../RisingwaveCreateSinkTemplate.java | 16 ++- .../RisingwaveCreateTopicTemplate.java | 12 +- .../statement/RisingwavePgsqlTypeMapping.java | 101 ++----------- .../stream/RisingwaveProxyFactory.java | 134 +++++++++++++++--- 13 files changed, 332 insertions(+), 160 deletions(-) create mode 100644 
incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/schema/PgsqlKafkaKeyAvroSchemaTemplate.java diff --git a/incubator/binding-pgsql-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/pgsql/kafka/config/proxy.yaml b/incubator/binding-pgsql-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/pgsql/kafka/config/proxy.yaml index 017ac77e93..ed7e7b8a13 100644 --- a/incubator/binding-pgsql-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/pgsql/kafka/config/proxy.yaml +++ b/incubator/binding-pgsql-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/pgsql/kafka/config/proxy.yaml @@ -24,7 +24,7 @@ catalogs: schema: |- { "schemaType": "AVRO", - "schema": "{\"type\": \"record\", \"name\": \"cities\", \"namespace\": \"dev\", \"fields\": [ {\"name\": \"description\", \"type\": \"string\"}, {\"name\": \"id\", \"type\": \"string\"}, {\"name\": \"name\", \"type\": \"string\"}]}" + "schema": "{\"type\": \"record\", \"name\": \"cities\", \"namespace\": \"dev\", \"fields\": [ {\"name\": \"id\", \"type\": \"string\"}, {\"name\": \"city\", \"type\": \"string\"}]}" } bindings: app0: diff --git a/incubator/binding-pgsql-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/pgsql/kafka/streams/pgsql/create.topic/client.rpt b/incubator/binding-pgsql-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/pgsql/kafka/streams/pgsql/create.topic/client.rpt index b9944d55aa..837addc041 100644 --- a/incubator/binding-pgsql-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/pgsql/kafka/streams/pgsql/create.topic/client.rpt +++ b/incubator/binding-pgsql-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/pgsql/kafka/streams/pgsql/create.topic/client.rpt @@ -34,7 +34,7 @@ write zilla:data.ext ${pgsql:dataEx() .build() .build()} write "CREATE TOPIC IF NOT EXISTS cities " - "(description VARCHAR, id VARCHAR, name VARCHAR, PRIMARY KEY (id));" + "(id VARCHAR, city VARCHAR, PRIMARY KEY (id, city));" [0x00] write flush diff --git a/incubator/binding-pgsql-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/pgsql/kafka/streams/pgsql/create.topic/server.rpt b/incubator/binding-pgsql-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/pgsql/kafka/streams/pgsql/create.topic/server.rpt index bb35350c94..3f159cb599 100644 --- a/incubator/binding-pgsql-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/pgsql/kafka/streams/pgsql/create.topic/server.rpt +++ b/incubator/binding-pgsql-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/pgsql/kafka/streams/pgsql/create.topic/server.rpt @@ -34,8 +34,8 @@ read zilla:data.ext ${pgsql:dataEx() .query() .build() .build()} -read "CREATE TOPIC IF NOT EXISTS cities " - "(description VARCHAR, id VARCHAR, name VARCHAR, PRIMARY KEY (id));" +read "CREATE TOPIC IF NOT EXISTS cities " + "(id VARCHAR, city VARCHAR, PRIMARY KEY (id, city));" [0x00] write advise zilla:flush ${pgsql:flushEx() diff --git a/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/config/PgsqlKafkaBindingConfig.java b/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/config/PgsqlKafkaBindingConfig.java index fb39285f95..3ad98392cf 100644 --- a/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/config/PgsqlKafkaBindingConfig.java +++ 
b/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/config/PgsqlKafkaBindingConfig.java @@ -20,6 +20,7 @@ import java.util.function.LongFunction; import io.aklivity.zilla.runtime.binding.pgsql.kafka.internal.PgsqlKafkaConfiguration; +import io.aklivity.zilla.runtime.binding.pgsql.kafka.internal.schema.PgsqlKafkaKeyAvroSchemaTemplate; import io.aklivity.zilla.runtime.binding.pgsql.kafka.internal.schema.PgsqlKafkaValueAvroSchemaTemplate; import io.aklivity.zilla.runtime.engine.catalog.CatalogHandler; import io.aklivity.zilla.runtime.engine.config.BindingConfig; @@ -33,6 +34,7 @@ public final class PgsqlKafkaBindingConfig public final List routes; public final CatalogHandler catalog; public final PgsqlKafkaValueAvroSchemaTemplate avroValueSchema; + public final PgsqlKafkaKeyAvroSchemaTemplate avroKeySchema; public PgsqlKafkaBindingConfig( PgsqlKafkaConfiguration config, @@ -46,6 +48,7 @@ public PgsqlKafkaBindingConfig( this.catalog = supplyCatalog.apply(binding.catalogs.get(0).id); this.avroValueSchema = new PgsqlKafkaValueAvroSchemaTemplate(config.kafkaAvroSchemaNamespace()); + this.avroKeySchema = new PgsqlKafkaKeyAvroSchemaTemplate(config.kafkaAvroSchemaNamespace()); } public PgsqlKafkaRouteConfig resolve( diff --git a/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/schema/PgsqlKafkaKeyAvroSchemaTemplate.java b/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/schema/PgsqlKafkaKeyAvroSchemaTemplate.java new file mode 100644 index 0000000000..c92a3319cb --- /dev/null +++ b/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/schema/PgsqlKafkaKeyAvroSchemaTemplate.java @@ -0,0 +1,99 @@ +/* + * Copyright 2021-2023 Aklivity Inc + * + * Licensed under the Aklivity Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * https://www.aklivity.io/aklivity-community-license/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */
+package io.aklivity.zilla.runtime.binding.pgsql.kafka.internal.schema;
+
+import java.util.List;
+
+import net.sf.jsqlparser.statement.create.table.ColumnDefinition;
+import net.sf.jsqlparser.statement.create.table.CreateTable;
+import net.sf.jsqlparser.statement.create.table.Index;
+
+public class PgsqlKafkaKeyAvroSchemaTemplate extends PgsqlKafkaAvroSchemaTemplate
+{
+    private static final String DATABASE_PLACEHOLDER = "{database}";
+
+    private final StringBuilder schemaBuilder = new StringBuilder();
+    private final String namespace;
+
+    public PgsqlKafkaKeyAvroSchemaTemplate(
+        String namespace)
+    {
+        this.namespace = namespace;
+    }
+
+    public String generateSchema(
+        String database,
+        CreateTable createTable)
+    {
+        schemaBuilder.setLength(0);
+
+        final String newNamespace = namespace.replace(DATABASE_PLACEHOLDER, database);
+        final String recordName = String.format("%s_key", createTable.getTable().getName());
+
+        schemaBuilder.append("{\n");
+        schemaBuilder.append("\"schemaType\": \"AVRO\",\n");
+        schemaBuilder.append("\"schema\": \""); // Begin the schema field
+
+        // Building the actual Avro schema
+        schemaBuilder.append("{\\\"type\\\": \\\"record\\\",");
+        schemaBuilder.append(" \\\"name\\\": \\\"").append(recordName).append("\\\",");
+        schemaBuilder.append(" \\\"namespace\\\": \\\"").append(newNamespace).append("\\\",");
+        schemaBuilder.append(" \\\"fields\\\": [");
+
+        for (ColumnDefinition column : createTable.getColumnDefinitions())
+        {
+            String fieldName = column.getColumnName();
+            String pgsqlType = column.getColDataType().getDataType();
+
+            String avroType = convertPgsqlTypeToAvro(pgsqlType);
+
+            schemaBuilder.append(" {\\\"name\\\": \\\"").append(fieldName).append("\\\",");
+            schemaBuilder.append(" \\\"type\\\": [\\\"").append(avroType).append("\\\", \\\"null\\\"] },");
+        }
+
+        // Remove the last comma and close the fields array
+        schemaBuilder.setLength(schemaBuilder.length() - 1);
+        schemaBuilder.append("]");
+
+        // Closing the Avro schema
+        schemaBuilder.append("}\"\n}");
+
+        return schemaBuilder.toString();
+    }
+
+    public String primaryKey(
+        CreateTable statement)
+    {
+        String primaryKey = null;
+
+        final List<Index> indexes = statement.getIndexes();
+
+        if (indexes != null && !indexes.isEmpty())
+        {
+            match:
+            for (Index index : indexes)
+            {
+                if ("PRIMARY KEY".equalsIgnoreCase(index.getType()))
+                {
+                    final List<Index.ColumnParams> primaryKeyColumns = index.getColumns();
+                    primaryKey = primaryKeyColumns.get(0).columnName;
+                    break match;
+                }
+            }
+        }
+
+        return primaryKey;
+    }
+}
diff --git a/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/schema/PgsqlKafkaValueAvroSchemaTemplate.java b/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/schema/PgsqlKafkaValueAvroSchemaTemplate.java
index 0d18df3809..a6a712fe86 100644
--- a/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/schema/PgsqlKafkaValueAvroSchemaTemplate.java
+++ b/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/schema/PgsqlKafkaValueAvroSchemaTemplate.java
@@ -96,4 +96,27 @@ public String primaryKey(
 
         return primaryKey;
     }
+
+    public int primaryKeyCount(
+        CreateTable statement)
+    {
+        int primaryKeyCount = 0;
+
+        final List<Index> indexes = statement.getIndexes();
+
+        if (indexes != null && !indexes.isEmpty())
+        {
+            match:
+            for (Index index : indexes)
+            {
+                if ("PRIMARY KEY".equalsIgnoreCase(index.getType()))
+                {
+                    primaryKeyCount =
index.getColumns().size();
+                    break match;
+                }
+            }
+        }
+
+        return primaryKeyCount;
+    }
 }
diff --git a/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaProxyFactory.java b/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaProxyFactory.java
index 060724aaca..e173ce5ad6 100644
--- a/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaProxyFactory.java
+++ b/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaProxyFactory.java
@@ -1248,30 +1248,31 @@ private void decodeCreateTopicCommand(
         }
         else if (server.commandsProcessed == 0)
         {
-            final CreateTable statement = (CreateTable) parseStatement(buffer, offset, length);
-            final String topic = statement.getTable().getName();
+            final CreateTable createTable = (CreateTable) parseStatement(buffer, offset, length);
+            final String topic = createTable.getTable().getName();
 
             topics.clear();
             topics.add(String.format("%s.%s", server.database, topic));
 
             final PgsqlKafkaBindingConfig binding = server.binding;
-            final String primaryKey = binding.avroValueSchema.primaryKey(statement);
-
-            final String subjectKey = primaryKey != null
-                ? String.format("%s.%s-key", server.database, topic)
-                : null;
-
-            final String subjectValue = String.format("%s.%s-value", server.database, topic);
-            final String schemaValue = binding.avroValueSchema.generateSchema(server.database, statement);
+            final String primaryKey = binding.avroValueSchema.primaryKey(createTable);
 
             int versionId = NO_ERROR_SCHEMA_VERSION_ID;
-            if (subjectKey != null)
+            if (primaryKey != null)
             {
                 //TODO: assign versionId to avoid test failure
-                binding.catalog.register(subjectKey, AVRO_KEY_SCHEMA);
+                final String subjectKey = String.format("%s.%s-key", server.database, topic);
+
+                final int primaryKeyCount = binding.avroValueSchema.primaryKeyCount(createTable);
+                String keySchema = primaryKeyCount > 1
+                    ?
binding.avroKeySchema.generateSchema(server.database, createTable) + : AVRO_KEY_SCHEMA; + binding.catalog.register(subjectKey, keySchema); } if (versionId != NO_VERSION_ID) { + final String subjectValue = String.format("%s.%s-value", server.database, topic); + final String schemaValue = binding.avroValueSchema.generateSchema(server.database, createTable); versionId = binding.catalog.register(subjectValue, schemaValue); } diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.materialized.view/client.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.materialized.view/client.rpt index ac94f1c604..1aa216cf2f 100644 --- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.materialized.view/client.rpt +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.materialized.view/client.rpt @@ -80,6 +80,15 @@ read advised zilla:flush ${pgsql:flushEx() .modifier(-1) .format("TEXT") .build() + .column() + .name("Is Hidden") + .tableOid(0) + .index(0) + .typeOid(1043) + .length(4) + .modifier(-1) + .format("TEXT") + .build() .column() .name("Description") .tableOid(0) @@ -92,16 +101,18 @@ read advised zilla:flush ${pgsql:flushEx() .build() .build()} -write zilla:data.ext ${pgsql:dataEx() +read zilla:data.ext ${pgsql:dataEx() .typeId(zilla:id("pgsql")) .row() .build() .build()} -read [0x00 0x03] # Field Count +read [0x00 0x04] # Field Count [0x00 0x00 0x00 0x02] # Length - [0x69 0x64] # Data - [0x00 0x00 0x00 0x011] # Length - [0x63 0x68 0x61 0x72 0x61 0x63 0x74 0x65 0x72 0x20 0x76 0x61 0x72 0x79 0x69 0x6e 0x67 # Data + "id" # Data + [0x00 0x00 0x00 0x11] # Length + [0x63 0x68 0x61 0x72 0x61 0x63 0x74 0x65 0x72 0x20 0x76 0x61 0x72 0x79 0x69 0x6e 0x67] # Data + [0x00 0x00 0x00 0x05] # Length + "false" # Data [0xff 0xff 0xff 0xff] # Length read zilla:data.ext ${pgsql:dataEx() @@ -109,11 +120,13 @@ read zilla:data.ext ${pgsql:dataEx() .row() .build() .build()} -read [0x00 0x03] # Field Count +read [0x00 0x04] # Field Count [0x00 0x00 0x00 0x04] # Length [0x63 0x69 0x74 0x79] # Data - [0x00 0x00 0x00 0x011] # Length + [0x00 0x00 0x00 0x11] # Length [0x63 0x68 0x61 0x72 0x61 0x63 0x74 0x65 0x72 0x20 0x76 0x61 0x72 0x79 0x69 0x6e 0x67] # Data + [0x00 0x00 0x00 0x05] # Length + "false" # Data [0xff 0xff 0xff 0xff] read advised zilla:flush ${pgsql:flushEx() @@ -135,13 +148,13 @@ write zilla:data.ext ${pgsql:dataEx() .query() .build() .build()} -write "CREATE SINK dev.distinct_cities_sink\n" +write "CREATE SINK distinct_cities_sink\n" "FROM distinct_cities\n" "WITH (\n" " connector='kafka',\n" " properties.bootstrap.server='localhost:9092',\n" " topic='dev.distinct_cities',\n" - " primary_key='key'\n" + " primary_key='city'\n" ") FORMAT UPSERT ENCODE AVRO (\n" " schema.registry='http://localhost:8081'\n" ");" @@ -182,7 +195,7 @@ write zilla:data.ext ${pgsql:dataEx() .build() .build()} write "CREATE TOPIC IF NOT EXISTS distinct_cities " - "(id VARCHAR, city VARCHAR);" + "(id VARCHAR, city VARCHAR, PRIMARY KEY (id, city));" [0x00] write flush diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.materialized.view/server.rpt 
b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.materialized.view/server.rpt index 350345ff52..c6f3fe3afc 100644 --- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.materialized.view/server.rpt +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.materialized.view/server.rpt @@ -81,6 +81,15 @@ write advise zilla:flush ${pgsql:flushEx() .modifier(-1) .format("TEXT") .build() + .column() + .name("Is Hidden") + .tableOid(0) + .index(0) + .typeOid(1043) + .length(4) + .modifier(-1) + .format("TEXT") + .build() .column() .name("Description") .tableOid(0) @@ -93,16 +102,20 @@ write advise zilla:flush ${pgsql:flushEx() .build() .build()} +write flush + write zilla:data.ext ${pgsql:dataEx() .typeId(zilla:id("pgsql")) .row() .build() .build()} -write [0x00 0x03] # Field Count +write [0x00 0x04] # Field Count [0x00 0x00 0x00 0x02] # Length - [0x69 0x64] # Data - [0x00 0x00 0x00 0x011] # Length - [0x63 0x68 0x61 0x72 0x61 0x63 0x74 0x65 0x72 0x20 0x76 0x61 0x72 0x79 0x69 0x6e 0x67 # Data + "id" # Data + [0x00 0x00 0x00 0x11] # Length + [0x63 0x68 0x61 0x72 0x61 0x63 0x74 0x65 0x72 0x20 0x76 0x61 0x72 0x79 0x69 0x6e 0x67] # Data + [0x00 0x00 0x00 0x05] # Length + "false" # Data [0xff 0xff 0xff 0xff] # Length write flush @@ -112,13 +125,14 @@ write zilla:data.ext ${pgsql:dataEx() .row() .build() .build()} -write [0x00 0x03] # Field Count +write [0x00 0x04] # Field Count [0x00 0x00 0x00 0x04] # Length [0x63 0x69 0x74 0x79] # Data - [0x00 0x00 0x00 0x011] # Length + [0x00 0x00 0x00 0x11] # Length [0x63 0x68 0x61 0x72 0x61 0x63 0x74 0x65 0x72 0x20 0x76 0x61 0x72 0x79 0x69 0x6e 0x67] # Data + [0x00 0x00 0x00 0x05] # Length + "false" # Data [0xff 0xff 0xff 0xff] # Length - write flush write advise zilla:flush ${pgsql:flushEx() @@ -142,13 +156,13 @@ read zilla:data.ext ${pgsql:dataEx() .query() .build() .build()} -read "CREATE SINK dev.distinct_cities_sink\n" +read "CREATE SINK distinct_cities_sink\n" "FROM distinct_cities\n" "WITH (\n" " connector='kafka',\n" " properties.bootstrap.server='localhost:9092',\n" " topic='dev.distinct_cities',\n" - " primary_key='key'\n" + " primary_key='city'\n" ") FORMAT UPSERT ENCODE AVRO (\n" " schema.registry='http://localhost:8081'\n" ");" @@ -189,9 +203,9 @@ read zilla:data.ext ${pgsql:dataEx() .query() .build() .build()} -read "CREATE TOPIC IF NOT EXISTS distinct_cities " - "(id VARCHAR, city VARCHAR);" - [0x00] +read "CREATE TOPIC IF NOT EXISTS distinct_cities " + "(id VARCHAR, city VARCHAR, PRIMARY KEY (id, city));" + [0x00] write advise zilla:flush ${pgsql:flushEx() .typeId(zilla:id("pgsql")) diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateSinkTemplate.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateSinkTemplate.java index 5fc3f62614..1c34fcba3c 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateSinkTemplate.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateSinkTemplate.java @@ -14,6 +14,8 @@ */ package io.aklivity.zilla.runtime.binding.risingwave.internal.statement; +import java.util.Map; + import 
net.sf.jsqlparser.statement.Statement; import net.sf.jsqlparser.statement.create.view.CreateView; @@ -25,12 +27,11 @@ public class RisingwaveCreateSinkTemplate extends RisingwaveCommandTemplate WITH ( connector='kafka', properties.bootstrap.server='%s', - topic='%s', - primary_key='key' + topic='%s.%s', + primary_key='%s' ) FORMAT UPSERT ENCODE AVRO ( schema.registry='%s' );\u0000"""; - private final String sinkFormat = "%s.%s"; private final String bootstrapServer; private final String schemaRegistry; @@ -44,13 +45,14 @@ public RisingwaveCreateSinkTemplate( } public String generate( - Statement statement, - String database) + String database, + Map columns, + Statement statement) { CreateView createView = (CreateView) statement; String viewName = createView.getView().getName(); - String sinkName = String.format(sinkFormat, database, viewName); + String primaryKey = columns.keySet().iterator().next(); - return String.format(sqlFormat, sinkName, viewName, bootstrapServer, sinkName, schemaRegistry); + return String.format(sqlFormat, viewName, viewName, bootstrapServer, database, viewName, primaryKey, schemaRegistry); } } diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateTopicTemplate.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateTopicTemplate.java index f6baa8c54f..0e716228ad 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateTopicTemplate.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateTopicTemplate.java @@ -28,6 +28,7 @@ public class RisingwaveCreateTopicTemplate extends RisingwaveCommandTemplate private final String fieldFormat = "%s %s, "; private final StringBuilder fieldBuilder = new StringBuilder(); + private final StringBuilder primaryKeyBuilder = new StringBuilder(); public RisingwaveCreateTopicTemplate() { @@ -57,11 +58,18 @@ public String generate( { String topic = createView.getView().getName(); + primaryKeyBuilder.setLength(0); + columns.keySet().forEach(k -> primaryKeyBuilder.append(k).append(", ")); + primaryKeyBuilder.delete(primaryKeyBuilder.length() - 2, primaryKeyBuilder.length()); + + String primaryKey = String.format(primaryKeyFormat, primaryKeyBuilder); + fieldBuilder.setLength(0); - columns.forEach((k, v) -> fieldBuilder.append(String.format(fieldFormat, k, v))); + columns.forEach((k, v) -> fieldBuilder.append(String.format(fieldFormat, k, + RisingwavePgsqlTypeMapping.typeName(v)))); fieldBuilder.delete(fieldBuilder.length() - 2, fieldBuilder.length()); - return String.format(sqlFormat, topic, fieldBuilder, ""); + return String.format(sqlFormat, topic, fieldBuilder, primaryKey); } } diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwavePgsqlTypeMapping.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwavePgsqlTypeMapping.java index 49a5615935..54079411bd 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwavePgsqlTypeMapping.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwavePgsqlTypeMapping.java @@ -20,98 +20,17 @@ public final class 
RisingwavePgsqlTypeMapping { - private static final Map OID_TO_TYPE_NAME = new Object2ObjectHashMap<>(); + private static final Map TYPE_MAPPINGS = new Object2ObjectHashMap<>(); static { - // Initialize the mapping of OIDs to uppercase type names - OID_TO_TYPE_NAME.put(16, "BOOL"); - OID_TO_TYPE_NAME.put(17, "BYTEA"); - OID_TO_TYPE_NAME.put(18, "CHAR"); - OID_TO_TYPE_NAME.put(19, "NAME"); - OID_TO_TYPE_NAME.put(20, "INT8"); - OID_TO_TYPE_NAME.put(21, "INT2"); - OID_TO_TYPE_NAME.put(22, "INT2VECTOR"); - OID_TO_TYPE_NAME.put(23, "INT4"); - OID_TO_TYPE_NAME.put(24, "REGPROC"); - OID_TO_TYPE_NAME.put(25, "TEXT"); - OID_TO_TYPE_NAME.put(26, "OID"); - OID_TO_TYPE_NAME.put(27, "TID"); - OID_TO_TYPE_NAME.put(28, "XID"); - OID_TO_TYPE_NAME.put(29, "CID"); - OID_TO_TYPE_NAME.put(30, "OIDVECTOR"); - OID_TO_TYPE_NAME.put(114, "JSON"); - OID_TO_TYPE_NAME.put(142, "XML"); - OID_TO_TYPE_NAME.put(194, "PG_NODE_TREE"); - OID_TO_TYPE_NAME.put(600, "POINT"); - OID_TO_TYPE_NAME.put(601, "LSEG"); - OID_TO_TYPE_NAME.put(602, "PATH"); - OID_TO_TYPE_NAME.put(603, "BOX"); - OID_TO_TYPE_NAME.put(604, "POLYGON"); - OID_TO_TYPE_NAME.put(628, "LINE"); - OID_TO_TYPE_NAME.put(700, "FLOAT4"); - OID_TO_TYPE_NAME.put(701, "FLOAT8"); - OID_TO_TYPE_NAME.put(705, "UNKNOWN"); - OID_TO_TYPE_NAME.put(718, "CIRCLE"); - OID_TO_TYPE_NAME.put(790, "MONEY"); - OID_TO_TYPE_NAME.put(829, "MACADDR"); - OID_TO_TYPE_NAME.put(869, "INET"); - OID_TO_TYPE_NAME.put(1000, "_BOOL"); - OID_TO_TYPE_NAME.put(1001, "_BYTEA"); - OID_TO_TYPE_NAME.put(1002, "_CHAR"); - OID_TO_TYPE_NAME.put(1003, "_NAME"); - OID_TO_TYPE_NAME.put(1005, "_INT2"); - OID_TO_TYPE_NAME.put(1006, "_INT2VECTOR"); - OID_TO_TYPE_NAME.put(1007, "_INT4"); - OID_TO_TYPE_NAME.put(1008, "_REGPROC"); - OID_TO_TYPE_NAME.put(1009, "_TEXT"); - OID_TO_TYPE_NAME.put(1010, "_TID"); - OID_TO_TYPE_NAME.put(1011, "_XID"); - OID_TO_TYPE_NAME.put(1012, "_CID"); - OID_TO_TYPE_NAME.put(1013, "_OIDVECTOR"); - OID_TO_TYPE_NAME.put(1014, "_BPCHAR"); - OID_TO_TYPE_NAME.put(1015, "_VARCHAR"); - OID_TO_TYPE_NAME.put(1016, "_INT8"); - OID_TO_TYPE_NAME.put(1017, "_POINT"); - OID_TO_TYPE_NAME.put(1018, "_LSEG"); - OID_TO_TYPE_NAME.put(1019, "_PATH"); - OID_TO_TYPE_NAME.put(1020, "_BOX"); - OID_TO_TYPE_NAME.put(1021, "_FLOAT4"); - OID_TO_TYPE_NAME.put(1022, "_FLOAT8"); - OID_TO_TYPE_NAME.put(1028, "_OID"); - OID_TO_TYPE_NAME.put(1033, "ACLITEM"); - OID_TO_TYPE_NAME.put(1034, "_ACLITEM"); - OID_TO_TYPE_NAME.put(1042, "BPCHAR"); - OID_TO_TYPE_NAME.put(1043, "VARCHAR"); - OID_TO_TYPE_NAME.put(1082, "DATE"); - OID_TO_TYPE_NAME.put(1083, "TIME"); - OID_TO_TYPE_NAME.put(1114, "TIMESTAMP"); - OID_TO_TYPE_NAME.put(1184, "TIMESTAMPTZ"); - OID_TO_TYPE_NAME.put(1186, "INTERVAL"); - OID_TO_TYPE_NAME.put(1266, "TIMETZ"); - OID_TO_TYPE_NAME.put(1560, "BIT"); - OID_TO_TYPE_NAME.put(1562, "VARBIT"); - OID_TO_TYPE_NAME.put(1700, "NUMERIC"); - OID_TO_TYPE_NAME.put(1790, "REFCURSOR"); - OID_TO_TYPE_NAME.put(2202, "REGPROCEDURE"); - OID_TO_TYPE_NAME.put(2203, "REGOPER"); - OID_TO_TYPE_NAME.put(2204, "REGOPERATOR"); - OID_TO_TYPE_NAME.put(2205, "REGCLASS"); - OID_TO_TYPE_NAME.put(2206, "REGTYPE"); - OID_TO_TYPE_NAME.put(2950, "UUID"); - OID_TO_TYPE_NAME.put(3220, "PG_LSN"); - OID_TO_TYPE_NAME.put(3614, "TSVECTOR"); - OID_TO_TYPE_NAME.put(3615, "TSQUERY"); - OID_TO_TYPE_NAME.put(3734, "REGCONFIG"); - OID_TO_TYPE_NAME.put(3769, "REGDICTIONARY"); - OID_TO_TYPE_NAME.put(3802, "JSONB"); - OID_TO_TYPE_NAME.put(3904, "INT4RANGE"); - OID_TO_TYPE_NAME.put(3906, "NUMRANGE"); - OID_TO_TYPE_NAME.put(3908, "TSRANGE"); - 
OID_TO_TYPE_NAME.put(3910, "TSTZRANGE"); - OID_TO_TYPE_NAME.put(3912, "DATERANGE"); - OID_TO_TYPE_NAME.put(3926, "INT8RANGE"); - OID_TO_TYPE_NAME.put(4072, "JSONPATH"); + TYPE_MAPPINGS.put("character varying", "VARCHAR"); + TYPE_MAPPINGS.put("integer", "INT"); + TYPE_MAPPINGS.put("boolean", "BOOL"); + TYPE_MAPPINGS.put("character", "CHAR"); + TYPE_MAPPINGS.put("timestamp without time zone", "TIMESTAMP"); + TYPE_MAPPINGS.put("timestamp with time zone", "TIMESTAMPZ"); + TYPE_MAPPINGS.put("double precision", "DOUBLE"); } private RisingwavePgsqlTypeMapping() @@ -119,8 +38,8 @@ private RisingwavePgsqlTypeMapping() } public static String typeName( - int oid) + String type) { - return OID_TO_TYPE_NAME.get(oid); + return TYPE_MAPPINGS.get(type); } } diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/RisingwaveProxyFactory.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/RisingwaveProxyFactory.java index b591c0713f..1455a884c3 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/RisingwaveProxyFactory.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/RisingwaveProxyFactory.java @@ -20,7 +20,10 @@ import java.io.InputStreamReader; import java.io.Reader; import java.io.StringReader; +import java.nio.ByteOrder; +import java.util.ArrayList; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.function.Consumer; import java.util.function.LongFunction; @@ -39,10 +42,9 @@ import io.aklivity.zilla.runtime.binding.risingwave.internal.config.RisingwaveBindingConfig; import io.aklivity.zilla.runtime.binding.risingwave.internal.config.RisingwaveCommandType; import io.aklivity.zilla.runtime.binding.risingwave.internal.config.RisingwaveRouteConfig; -import io.aklivity.zilla.runtime.binding.risingwave.internal.statement.RisingwavePgsqlTypeMapping; -import io.aklivity.zilla.runtime.binding.risingwave.internal.types.Array32FW; import io.aklivity.zilla.runtime.binding.risingwave.internal.types.Flyweight; import io.aklivity.zilla.runtime.binding.risingwave.internal.types.OctetsFW; +import io.aklivity.zilla.runtime.binding.risingwave.internal.types.String32FW; import io.aklivity.zilla.runtime.binding.risingwave.internal.types.stream.AbortFW; import io.aklivity.zilla.runtime.binding.risingwave.internal.types.stream.BeginFW; import io.aklivity.zilla.runtime.binding.risingwave.internal.types.stream.DataFW; @@ -50,7 +52,6 @@ import io.aklivity.zilla.runtime.binding.risingwave.internal.types.stream.ExtensionFW; import io.aklivity.zilla.runtime.binding.risingwave.internal.types.stream.FlushFW; import io.aklivity.zilla.runtime.binding.risingwave.internal.types.stream.PgsqlBeginExFW; -import io.aklivity.zilla.runtime.binding.risingwave.internal.types.stream.PgsqlColumnInfoFW; import io.aklivity.zilla.runtime.binding.risingwave.internal.types.stream.PgsqlDataExFW; import io.aklivity.zilla.runtime.binding.risingwave.internal.types.stream.PgsqlFlushExFW; import io.aklivity.zilla.runtime.binding.risingwave.internal.types.stream.PgsqlStatus; @@ -99,12 +100,11 @@ public final class RisingwaveProxyFactory implements RisingwaveStreamFactory private final AbortFW.Builder abortRW = new AbortFW.Builder(); private final FlushFW.Builder flushRW = new FlushFW.Builder(); + private final String32FW columnRO = new String32FW(ByteOrder.BIG_ENDIAN); + 
private final ResetFW resetRO = new ResetFW(); private final WindowFW windowRO = new WindowFW(); - private final ResetFW.Builder resetRW = new ResetFW.Builder(); - private final WindowFW.Builder windowRW = new WindowFW.Builder(); - private final ExtensionFW extensionRO = new ExtensionFW(); private final PgsqlBeginExFW pgsqlBeginExRO = new PgsqlBeginExFW(); private final PgsqlDataExFW pgsqlDataExRO = new PgsqlDataExFW(); @@ -114,8 +114,8 @@ public final class RisingwaveProxyFactory implements RisingwaveStreamFactory private final PgsqlDataExFW.Builder dataExRW = new PgsqlDataExFW.Builder(); private final PgsqlFlushExFW.Builder flushExRW = new PgsqlFlushExFW.Builder(); - private final Array32FW.Builder columnsRW = - new Array32FW.Builder<>(new PgsqlColumnInfoFW.Builder(), new PgsqlColumnInfoFW()); + private final ResetFW.Builder resetRW = new ResetFW.Builder(); + private final WindowFW.Builder windowRW = new WindowFW.Builder(); private final BufferPool bufferPool; private final RisingwaveConfiguration config; @@ -136,14 +136,17 @@ public final class RisingwaveProxyFactory implements RisingwaveStreamFactory private final PgsqlFlushCommand proxyFlushCommand = this::proxyFlushCommand; private final PgsqlFlushCommand ignoreFlushCommand = this::ignoreFlushCommand; + private final PgsqlDataCommand proxyDataCommand = this::proxyDataCommand; + private final PgsqlDataCommand rowDataCommand = this::rowDataCommand; + private final Object2ObjectHashMap clientTransforms; { Object2ObjectHashMap clientTransforms = new Object2ObjectHashMap<>(); - clientTransforms.put(RisingwaveCommandType.CREATE_TABLE_COMMAND, this::onDecodeCreateTableCommand); - clientTransforms.put(RisingwaveCommandType.CREATE_MATERIALIZED_VIEW_COMMAND, this::onDecodeCreateMaterializedViewCommand); - clientTransforms.put(RisingwaveCommandType.CREATE_FUNCTION_COMMAND, this::onDecodeCreateFunctionCommand); - clientTransforms.put(RisingwaveCommandType.UNKNOWN_COMMAND, this::onDecodeUnknownCommand); + clientTransforms.put(RisingwaveCommandType.CREATE_TABLE_COMMAND, this::decodeCreateTableCommand); + clientTransforms.put(RisingwaveCommandType.CREATE_MATERIALIZED_VIEW_COMMAND, this::decodeCreateMaterializedViewCommand); + clientTransforms.put(RisingwaveCommandType.CREATE_FUNCTION_COMMAND, this::decodeCreateFunctionCommand); + clientTransforms.put(RisingwaveCommandType.UNKNOWN_COMMAND, this::decodeUnknownCommand); this.clientTransforms = clientTransforms; } @@ -231,6 +234,8 @@ private final class PgsqlServer private final Map parameters; private final LongArrayQueue responses; private final IntArrayQueue queries; + private final List columnTypes; + private final List columnDescriptions; private final Map columns; private final String database; @@ -280,6 +285,8 @@ private PgsqlServer( this.database = dbValue.substring(0, dbValue.length() - 1); this.columns = new Object2ObjectHashMap<>(); + this.columnTypes = new ArrayList<>(); + this.columnDescriptions = new ArrayList<>(); this.streamsByRouteIds = new Long2ObjectHashMap<>(); this.responses = new LongArrayQueue(); this.queries = new IntArrayQueue(); @@ -773,6 +780,7 @@ private final class PgsqlClient private int messageOffset; private PgsqlFlushCommand typeCommand; + private PgsqlDataCommand dataCommand; private PgsqlFlushCommand completionCommand; private PgsqlClient( @@ -784,6 +792,7 @@ private PgsqlClient( this.originId = originId; this.routedId = routedId; + this.dataCommand = proxyDataCommand; this.typeCommand = proxyFlushCommand; this.completionCommand = ignoreFlushCommand; } @@ -869,7 
+878,8 @@ private void onAppData( { final OctetsFW extension = data.extension(); final OctetsFW payload = data.payload(); - server.doAppData(routedId, traceId, authorization, flags, + + dataCommand.handle(server, routedId, traceId, authorization, flags, payload.buffer(), payload.offset(), payload.limit(), extension); } } @@ -1003,6 +1013,7 @@ private void onAppReadyFlush( long authorization, PgsqlFlushExFW pgsqlFlushEx) { + dataCommand = proxyDataCommand; server.onQueryReady(traceId, authorization, pgsqlFlushEx); } @@ -1121,6 +1132,8 @@ private void doAppWindow( { state = RisingwaveState.openReply(state); + replyMax = server.replyMax; + doWindow(app, originId, routedId, replyId, replySeq, replyAck, server.replyMax, traceId, authorization, server.replyBudgetId, server.replyPadding); } @@ -1447,7 +1460,7 @@ private void doWindow( sender.accept(window.typeId(), window.buffer(), window.offset(), window.sizeof()); } - private void onDecodeCreateTableCommand( + private void decodeCreateTableCommand( PgsqlServer server, long traceId, long authorization, @@ -1493,7 +1506,7 @@ else if (server.commandsProcessed == 1) } } - private void onDecodeCreateMaterializedViewCommand( + private void decodeCreateMaterializedViewCommand( PgsqlServer server, long traceId, long authorization, @@ -1511,6 +1524,7 @@ private void onDecodeCreateMaterializedViewCommand( final RisingwaveBindingConfig binding = server.binding; final CreateView statement = (CreateView) parseStatement(buffer, offset, length); PgsqlFlushCommand typeCommand = ignoreFlushCommand; + PgsqlDataCommand dataCommand = proxyDataCommand; String newStatement = ""; int progress = 0; @@ -1523,6 +1537,8 @@ else if (server.commandsProcessed == 1) { newStatement = binding.describeView.generate(statement); typeCommand = typeFlushCommand; + dataCommand = rowDataCommand; + server.columns.clear(); } else if (server.commandsProcessed == 2) { @@ -1530,7 +1546,7 @@ else if (server.commandsProcessed == 2) } else if (server.commandsProcessed == 3) { - newStatement = binding.createSink.generate(statement, server.database); + newStatement = binding.createSink.generate(server.database, server.columns, statement); } statementBuffer.putBytes(progress, newStatement.getBytes()); @@ -1542,11 +1558,12 @@ else if (server.commandsProcessed == 3) final PgsqlClient client = server.streamsByRouteIds.get(route.id); client.doPgsqlQuery(traceId, authorization, statementBuffer, 0, progress); client.typeCommand = typeCommand; + client.dataCommand = dataCommand; client.completionCommand = ignoreFlushCommand; } } - private void onDecodeCreateFunctionCommand( + private void decodeCreateFunctionCommand( PgsqlServer server, long traceId, long authorization, @@ -1583,7 +1600,7 @@ private void onDecodeCreateFunctionCommand( } } - private void onDecodeUnknownCommand( + private void decodeUnknownCommand( PgsqlServer server, long traceId, long authorization, @@ -1630,17 +1647,75 @@ private void typeFlushCommand( long authorization, PgsqlFlushExFW flushEx) { - server.columns.clear(); + server.columnTypes.clear(); flushEx.type().columns() .forEach(c -> { String name = c.name().asString(); name = name.substring(0, name.length() - 1); - String type = RisingwavePgsqlTypeMapping.typeName(c.typeOid()); - server.columns.put(name, type); + server.columnTypes.add(name); }); + } + + private void rowDataCommand( + PgsqlServer server, + long traceId, + long authorization, + long routedId, + int flags, + DirectBuffer buffer, + int offset, + int limit, + OctetsFW extension) + { + int progress = offset; + + 
final List columnDescriptions = server.columnDescriptions; + + if ((flags & FLAGS_INIT) != 0x00) + { + columnDescriptions.clear(); + progress += Short.BYTES; + } + + column: + while (progress < limit) + { + String32FW column = columnRO.tryWrap(buffer, progress, limit); + + if (column == null) + { + break column; + } + + columnDescriptions.add(column.asString()); - server.doParseQuery(traceId, authorization); + progress = column.limit(); + } + + int nameIndex = server.columnTypes.indexOf("Name"); + int typeIndex = server.columnTypes.indexOf("Type"); + int isHiddenIndex = server.columnTypes.indexOf("Is Hidden"); + + if ("false".equals(columnDescriptions.get(isHiddenIndex))) + { + server.columns.put(columnDescriptions.get(nameIndex), columnDescriptions.get(typeIndex)); + } + + } + + private void proxyDataCommand( + PgsqlServer server, + long traceId, + long authorization, + long routedId, + int flags, + DirectBuffer buffer, + int offset, + int limit, + OctetsFW extension) + { + server.doAppData(routedId, traceId, authorization, flags, buffer, offset, limit, extension); } private RisingwaveCommandType decodeCommandType( @@ -1738,4 +1813,19 @@ void handle( long authorization, PgsqlFlushExFW flushEx); } + + @FunctionalInterface + private interface PgsqlDataCommand + { + void handle( + PgsqlServer server, + long traceId, + long authorization, + long routedId, + int flags, + DirectBuffer buffer, + int offset, + int limit, + OctetsFW extension); + } } From 1f96e33b9914f4312de97d44263cecdb0f8d1e9e Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Thu, 19 Sep 2024 19:43:26 -0700 Subject: [PATCH 11/11] Apply feedback from PR --- .../create.materialized.view/client.rpt | 32 +++++++++---------- .../create.materialized.view/server.rpt | 32 +++++++++---------- 2 files changed, 32 insertions(+), 32 deletions(-) diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.materialized.view/client.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.materialized.view/client.rpt index 1aa216cf2f..3da60e9f42 100644 --- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.materialized.view/client.rpt +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.materialized.view/client.rpt @@ -106,28 +106,28 @@ read zilla:data.ext ${pgsql:dataEx() .row() .build() .build()} -read [0x00 0x04] # Field Count - [0x00 0x00 0x00 0x02] # Length - "id" # Data - [0x00 0x00 0x00 0x11] # Length - [0x63 0x68 0x61 0x72 0x61 0x63 0x74 0x65 0x72 0x20 0x76 0x61 0x72 0x79 0x69 0x6e 0x67] # Data - [0x00 0x00 0x00 0x05] # Length - "false" # Data - [0xff 0xff 0xff 0xff] # Length +read [0x00 0x04] # Field Count + [0x00 0x00 0x00 0x02] # Length + "id" # Data + [0x00 0x00 0x00 0x11] # Length + "character varying" # Data + [0x00 0x00 0x00 0x05] # Length + "false" # Data + [0xff 0xff 0xff 0xff] # Length read zilla:data.ext ${pgsql:dataEx() .typeId(zilla:id("pgsql")) .row() .build() .build()} -read [0x00 0x04] # Field Count - [0x00 0x00 0x00 0x04] # Length - [0x63 0x69 0x74 0x79] # Data - [0x00 0x00 0x00 0x11] # Length - [0x63 0x68 0x61 0x72 0x61 0x63 0x74 0x65 0x72 0x20 0x76 0x61 0x72 0x79 0x69 0x6e 0x67] # Data - [0x00 0x00 0x00 0x05] # Length - "false" # Data - [0xff 0xff 0xff 0xff] +read [0x00 0x04] # Field Count + [0x00 0x00 0x00 0x04] # Length + "city" # Data + 
[0x00 0x00 0x00 0x11]            # Length
+      "character varying"        # Data
+      [0x00 0x00 0x00 0x05]      # Length
+      "false"                    # Data
+      [0xff 0xff 0xff 0xff]      # Length
 
 read advised zilla:flush ${pgsql:flushEx()
                              .typeId(zilla:id("pgsql"))
diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.materialized.view/server.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.materialized.view/server.rpt
index c6f3fe3afc..b0e6c2c687 100644
--- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.materialized.view/server.rpt
+++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.materialized.view/server.rpt
@@ -109,14 +109,14 @@ write zilla:data.ext ${pgsql:dataEx()
                              .row()
                              .build()
                              .build()}
-write [0x00 0x04]                # Field Count
-      [0x00 0x00 0x00 0x02]      # Length
-      "id"                       # Data
-      [0x00 0x00 0x00 0x11]      # Length
-      [0x63 0x68 0x61 0x72 0x61 0x63 0x74 0x65 0x72 0x20 0x76 0x61 0x72 0x79 0x69 0x6e 0x67] # Data
-      [0x00 0x00 0x00 0x05]      # Length
-      "false"                    # Data
-      [0xff 0xff 0xff 0xff]      # Length
+write [0x00 0x04]                # Field Count
+      [0x00 0x00 0x00 0x02]      # Length
+      "id"                       # Data
+      [0x00 0x00 0x00 0x11]      # Length
+      "character varying"        # Data
+      [0x00 0x00 0x00 0x05]      # Length
+      "false"                    # Data
+      [0xff 0xff 0xff 0xff]      # Length
 
 write flush
 
@@ -125,14 +125,14 @@ write zilla:data.ext ${pgsql:dataEx()
                              .row()
                              .build()
                              .build()}
-write [0x00 0x04]                # Field Count
-      [0x00 0x00 0x00 0x04]      # Length
-      [0x63 0x69 0x74 0x79]      # Data
-      [0x00 0x00 0x00 0x11]      # Length
-      [0x63 0x68 0x61 0x72 0x61 0x63 0x74 0x65 0x72 0x20 0x76 0x61 0x72 0x79 0x69 0x6e 0x67] # Data
-      [0x00 0x00 0x00 0x05]      # Length
-      "false"                    # Data
-      [0xff 0xff 0xff 0xff]      # Length
+write [0x00 0x04]                # Field Count
+      [0x00 0x00 0x00 0x04]      # Length
+      "city"                     # Data
+      [0x00 0x00 0x00 0x11]      # Length
+      "character varying"        # Data
+      [0x00 0x00 0x00 0x05]      # Length
+      "false"                    # Data
+      [0xff 0xff 0xff 0xff]      # Length
 write flush
 
 write advise zilla:flush ${pgsql:flushEx()