diff --git a/incubator/binding-pgsql-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/pgsql/kafka/config/proxy.yaml b/incubator/binding-pgsql-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/pgsql/kafka/config/proxy.yaml
index 6170c8532d..ed7e7b8a13 100644
--- a/incubator/binding-pgsql-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/pgsql/kafka/config/proxy.yaml
+++ b/incubator/binding-pgsql-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/pgsql/kafka/config/proxy.yaml
@@ -23,23 +23,8 @@ catalogs:
id: 1
schema: |-
{
- "type": "record",
- "name": "cities",
- "namespace": "dev",
- "fields": [
- {
- "name": "description",
- "type": string
- },
- {
- "name": "id",
- "type": string
- },
- {
- "name": "name",
- "type": string
- }
- ]
+ "schemaType": "AVRO",
+ "schema": "{\"type\": \"record\", \"name\": \"cities\", \"namespace\": \"dev\", \"fields\": [ {\"name\": \"id\", \"type\": \"string\"}, {\"name\": \"city\", \"type\": \"string\"}]}"
}
bindings:
app0:
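The catalog entry above now stores the registry-style payload, a JSON wrapper with a
"schemaType" and a stringified "schema", rather than a bare Avro record. A minimal
sketch of unwrapping such a payload, assuming Jackson and Avro are on the classpath;
the class name is illustrative only:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.avro.Schema;

    public final class UnwrapRegistrySketch
    {
        public static void main(String[] args) throws Exception
        {
            String payload = """
                {
                  "schemaType": "AVRO",
                  "schema": "{\\"type\\": \\"record\\", \\"name\\": \\"cities\\", \\"namespace\\": \\"dev\\", \\"fields\\": [ {\\"name\\": \\"id\\", \\"type\\": \\"string\\"}, {\\"name\\": \\"city\\", \\"type\\": \\"string\\"}]}"
                }""";
            // The "schema" field holds the Avro schema as an escaped JSON string
            String avro = new ObjectMapper().readTree(payload).get("schema").asText();
            Schema schema = new Schema.Parser().parse(avro);
            System.out.println(schema.getFullName()); // dev.cities
        }
    }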
diff --git a/incubator/binding-pgsql-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/pgsql/kafka/streams/kafka/create.topic/client.rpt b/incubator/binding-pgsql-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/pgsql/kafka/streams/kafka/create.topic/client.rpt
index 7a21d70c2c..c39986a2ab 100644
--- a/incubator/binding-pgsql-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/pgsql/kafka/streams/kafka/create.topic/client.rpt
+++ b/incubator/binding-pgsql-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/pgsql/kafka/streams/kafka/create.topic/client.rpt
@@ -13,40 +13,6 @@
# specific language governing permissions and limitations under the License.
#
-connect "zilla://streams/app1"
- option zilla:window 8192
- option zilla:transmission "half-duplex"
-
-write zilla:begin.ext ${kafka:beginEx()
- .typeId(zilla:id("kafka"))
- .request()
- .describeCluster()
- .includeAuthorizedOperations("false")
- .build()
- .build()}
-
-connected
-
-read zilla:begin.ext ${kafka:matchBeginEx()
- .typeId(zilla:id("kafka"))
- .response()
- .describeCluster()
- .throttle(0)
- .error(0)
- .clusterId("cluster-0")
- .controllerId(0)
- .broker()
- .brokerId(1)
- .host("broker1.example.com")
- .port(9092)
- .build()
- .authorizedOperations(0)
- .build()
- .build()}
-
-write close
-read closed
-
connect "zilla://streams/app1"
option zilla:window 8192
option zilla:transmission "half-duplex"
@@ -59,7 +25,6 @@ write zilla:begin.ext ${kafka:beginEx()
.name("dev.cities")
.partitionCount(1)
.replicas(1)
- .assignment(0, 1)
.config("cleanup.policy", "compact")
.build()
.timeout(30000)
diff --git a/incubator/binding-pgsql-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/pgsql/kafka/streams/kafka/create.topic/server.rpt b/incubator/binding-pgsql-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/pgsql/kafka/streams/kafka/create.topic/server.rpt
index 5c3159fad5..2504756e81 100644
--- a/incubator/binding-pgsql-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/pgsql/kafka/streams/kafka/create.topic/server.rpt
+++ b/incubator/binding-pgsql-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/pgsql/kafka/streams/kafka/create.topic/server.rpt
@@ -21,39 +21,6 @@ accept ${serverAddress}
accepted
-read zilla:begin.ext ${kafka:matchBeginEx()
- .typeId(zilla:id("kafka"))
- .request()
- .describeCluster()
- .includeAuthorizedOperations("false")
- .build()
- .build()}
-
-connected
-
-write zilla:begin.ext ${kafka:beginEx()
- .typeId(zilla:id("kafka"))
- .response()
- .describeCluster()
- .throttle(0)
- .error(0)
- .clusterId("cluster-0")
- .controllerId(0)
- .broker()
- .brokerId(1)
- .host("broker1.example.com")
- .port(9092)
- .build()
- .authorizedOperations(0)
- .build()
- .build()}
-write flush
-
-read closed
-write close
-
-accepted
-
read zilla:begin.ext ${kafka:matchBeginEx()
.typeId(zilla:id("kafka"))
.request()
@@ -62,7 +29,6 @@ read zilla:begin.ext ${kafka:matchBeginEx()
.name("dev.cities")
.partitionCount(1)
.replicas(1)
- .assignment(0, 1)
.config("cleanup.policy", "compact")
.build()
.timeout(30000)
diff --git a/incubator/binding-pgsql-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/pgsql/kafka/streams/pgsql/create.topic/client.rpt b/incubator/binding-pgsql-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/pgsql/kafka/streams/pgsql/create.topic/client.rpt
index b9944d55aa..837addc041 100644
--- a/incubator/binding-pgsql-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/pgsql/kafka/streams/pgsql/create.topic/client.rpt
+++ b/incubator/binding-pgsql-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/pgsql/kafka/streams/pgsql/create.topic/client.rpt
@@ -34,7 +34,7 @@ write zilla:data.ext ${pgsql:dataEx()
.build()
.build()}
write "CREATE TOPIC IF NOT EXISTS cities "
- "(description VARCHAR, id VARCHAR, name VARCHAR, PRIMARY KEY (id));"
+ "(id VARCHAR, city VARCHAR, PRIMARY KEY (id, city));"
[0x00]
write flush
diff --git a/incubator/binding-pgsql-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/pgsql/kafka/streams/pgsql/create.topic/server.rpt b/incubator/binding-pgsql-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/pgsql/kafka/streams/pgsql/create.topic/server.rpt
index bb35350c94..3f159cb599 100644
--- a/incubator/binding-pgsql-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/pgsql/kafka/streams/pgsql/create.topic/server.rpt
+++ b/incubator/binding-pgsql-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/pgsql/kafka/streams/pgsql/create.topic/server.rpt
@@ -34,8 +34,8 @@ read zilla:data.ext ${pgsql:dataEx()
.query()
.build()
.build()}
-read "CREATE TOPIC IF NOT EXISTS cities "
- "(description VARCHAR, id VARCHAR, name VARCHAR, PRIMARY KEY (id));"
+read "CREATE TOPIC IF NOT EXISTS cities "
+ "(id VARCHAR, city VARCHAR, PRIMARY KEY (id, city));"
[0x00]
write advise zilla:flush ${pgsql:flushEx()
diff --git a/incubator/binding-pgsql-kafka/pom.xml b/incubator/binding-pgsql-kafka/pom.xml
index 4bc80c1154..5779c09cb9 100644
--- a/incubator/binding-pgsql-kafka/pom.xml
+++ b/incubator/binding-pgsql-kafka/pom.xml
@@ -24,7 +24,7 @@
- 0.83
+ 0.82
0
diff --git a/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/config/PgsqlKafkaBindingConfig.java b/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/config/PgsqlKafkaBindingConfig.java
index fb39285f95..3ad98392cf 100644
--- a/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/config/PgsqlKafkaBindingConfig.java
+++ b/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/config/PgsqlKafkaBindingConfig.java
@@ -20,6 +20,7 @@
import java.util.function.LongFunction;
import io.aklivity.zilla.runtime.binding.pgsql.kafka.internal.PgsqlKafkaConfiguration;
+import io.aklivity.zilla.runtime.binding.pgsql.kafka.internal.schema.PgsqlKafkaKeyAvroSchemaTemplate;
import io.aklivity.zilla.runtime.binding.pgsql.kafka.internal.schema.PgsqlKafkaValueAvroSchemaTemplate;
import io.aklivity.zilla.runtime.engine.catalog.CatalogHandler;
import io.aklivity.zilla.runtime.engine.config.BindingConfig;
@@ -33,6 +34,7 @@ public final class PgsqlKafkaBindingConfig
public final List<PgsqlKafkaRouteConfig> routes;
public final CatalogHandler catalog;
public final PgsqlKafkaValueAvroSchemaTemplate avroValueSchema;
+ public final PgsqlKafkaKeyAvroSchemaTemplate avroKeySchema;
public PgsqlKafkaBindingConfig(
PgsqlKafkaConfiguration config,
@@ -46,6 +48,7 @@ public PgsqlKafkaBindingConfig(
this.catalog = supplyCatalog.apply(binding.catalogs.get(0).id);
this.avroValueSchema = new PgsqlKafkaValueAvroSchemaTemplate(config.kafkaAvroSchemaNamespace());
+ this.avroKeySchema = new PgsqlKafkaKeyAvroSchemaTemplate(config.kafkaAvroSchemaNamespace());
}
public PgsqlKafkaRouteConfig resolve(
diff --git a/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/schema/PgsqlKafkaKeyAvroSchemaTemplate.java b/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/schema/PgsqlKafkaKeyAvroSchemaTemplate.java
new file mode 100644
index 0000000000..c92a3319cb
--- /dev/null
+++ b/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/schema/PgsqlKafkaKeyAvroSchemaTemplate.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2021-2023 Aklivity Inc
+ *
+ * Licensed under the Aklivity Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * https://www.aklivity.io/aklivity-community-license/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package io.aklivity.zilla.runtime.binding.pgsql.kafka.internal.schema;
+
+import java.util.List;
+
+import net.sf.jsqlparser.statement.create.table.ColumnDefinition;
+import net.sf.jsqlparser.statement.create.table.CreateTable;
+import net.sf.jsqlparser.statement.create.table.Index;
+
+public class PgsqlKafkaKeyAvroSchemaTemplate extends PgsqlKafkaAvroSchemaTemplate
+{
+ private static final String DATABASE_PLACEHOLDER = "{database}";
+
+ private final StringBuilder schemaBuilder = new StringBuilder();
+ private final String namespace;
+
+ public PgsqlKafkaKeyAvroSchemaTemplate(
+ String namespace)
+ {
+ this.namespace = namespace;
+ }
+
+ public String generateSchema(
+ String database,
+ CreateTable createTable)
+ {
+ schemaBuilder.setLength(0);
+
+ final String newNamespace = namespace.replace(DATABASE_PLACEHOLDER, database);
+ final String recordName = String.format("%s_key", createTable.getTable().getName());
+
+ schemaBuilder.append("{\n");
+ schemaBuilder.append("\"schemaType\": \"AVRO\",\n");
+ schemaBuilder.append("\"schema\": \""); // Begin the schema field
+
+ // Building the actual Avro schema
+ schemaBuilder.append("{\\\"type\\\": \\\"record\\\",");
+ schemaBuilder.append(" \\\"name\\\": \\\"").append(recordName).append("\\\",");
+ schemaBuilder.append(" \\\"namespace\\\": \\\"").append(newNamespace).append("\\\",");
+ schemaBuilder.append(" \\\"fields\\\": [");
+
+ for (ColumnDefinition column : createTable.getColumnDefinitions())
+ {
+ String fieldName = column.getColumnName();
+ String pgsqlType = column.getColDataType().getDataType();
+
+ String avroType = convertPgsqlTypeToAvro(pgsqlType);
+
+ schemaBuilder.append(" {\\\"name\\\": \\\"").append(fieldName).append("\\\",");
+ schemaBuilder.append(" \\\"type\\\": [\\\"").append(avroType).append("\\\", \\\"null\\\"] },");
+ }
+
+ // Remove the last comma and close the fields array
+ schemaBuilder.setLength(schemaBuilder.length() - 1);
+ schemaBuilder.append("]");
+
+ // Closing the Avro schema
+ schemaBuilder.append("}\"\n}");
+
+ return schemaBuilder.toString();
+ }
+
+ public String primaryKey(
+ CreateTable statement)
+ {
+ String primaryKey = null;
+
+ final List<Index> indexes = statement.getIndexes();
+
+ if (indexes != null && !indexes.isEmpty())
+ {
+ match:
+ for (Index index : indexes)
+ {
+ if ("PRIMARY KEY".equalsIgnoreCase(index.getType()))
+ {
+ final List<Index.ColumnParams> primaryKeyColumns = index.getColumns();
+ primaryKey = primaryKeyColumns.get(0).columnName;
+ break match;
+ }
+ }
+ }
+
+ return primaryKey;
+ }
+}
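A hedged usage sketch for the new key template: "CREATE TOPIC ..." is parsed by the
binding itself, so this demo feeds an equivalent CREATE TABLE through jsqlparser and
prints the registry payload; it assumes the configured namespace is the "{database}"
placeholder, as the DATABASE_PLACEHOLDER substitution suggests.

    import net.sf.jsqlparser.parser.CCJSqlParserUtil;
    import net.sf.jsqlparser.statement.create.table.CreateTable;

    public final class KeySchemaSketch
    {
        public static void main(String[] args) throws Exception
        {
            CreateTable stmt = (CreateTable) CCJSqlParserUtil.parse(
                "CREATE TABLE cities (id VARCHAR, city VARCHAR, PRIMARY KEY (id, city))");
            PgsqlKafkaKeyAvroSchemaTemplate keys = new PgsqlKafkaKeyAvroSchemaTemplate("{database}");
            // Emits a record named "cities_key" in namespace "dev", with one
            // nullable field per column definition
            System.out.println(keys.generateSchema("dev", stmt));
        }
    }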
diff --git a/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/schema/PgsqlKafkaValueAvroSchemaTemplate.java b/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/schema/PgsqlKafkaValueAvroSchemaTemplate.java
index 193243ca24..a6a712fe86 100644
--- a/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/schema/PgsqlKafkaValueAvroSchemaTemplate.java
+++ b/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/schema/PgsqlKafkaValueAvroSchemaTemplate.java
@@ -43,10 +43,14 @@ public String generateSchema(
final String recordName = createTable.getTable().getName();
schemaBuilder.append("{\n");
- schemaBuilder.append("\"type\": \"record\",\n");
- schemaBuilder.append("\"name\": \"").append(recordName).append("\",\n");
- schemaBuilder.append("\"namespace\": \"").append(newNamespace).append("\",\n");
- schemaBuilder.append("\"fields\": [\n");
+ schemaBuilder.append("\"schemaType\": \"AVRO\",\n");
+ schemaBuilder.append("\"schema\": \""); // Begin the schema field
+
+ // Building the actual Avro schema
+ schemaBuilder.append("{\\\"type\\\": \\\"record\\\",");
+ schemaBuilder.append(" \\\"name\\\": \\\"").append(recordName).append("\\\",");
+ schemaBuilder.append(" \\\"namespace\\\": \\\"").append(newNamespace).append("\\\",");
+ schemaBuilder.append(" \\\"fields\\\": [");
for (ColumnDefinition column : createTable.getColumnDefinitions())
{
@@ -55,15 +59,16 @@ public String generateSchema(
String avroType = convertPgsqlTypeToAvro(pgsqlType);
- schemaBuilder.append(" {\n");
- schemaBuilder.append(" \"name\": \"").append(fieldName).append("\",\n");
- schemaBuilder.append(" \"type\": ").append(avroType).append("\n");
- schemaBuilder.append(" },\n");
+ schemaBuilder.append(" {\\\"name\\\": \\\"").append(fieldName).append("\\\",");
+ schemaBuilder.append(" \\\"type\\\": \\\"").append(avroType).append("\\\"},");
}
- // Remove the last comma after the last field and close the JSON array
- schemaBuilder.setLength(schemaBuilder.length() - 2); // Remove last comma
- schemaBuilder.append("\n]\n}");
+ // Remove the last comma and close the fields array
+ schemaBuilder.setLength(schemaBuilder.length() - 1);
+ schemaBuilder.append("]");
+
+ // Closing the Avro schema
+ schemaBuilder.append("}\"\n}");
return schemaBuilder.toString();
}
@@ -91,4 +96,27 @@ public String primaryKey(
return primaryKey;
}
+
+ public int primaryKeyCount(
+ CreateTable statement)
+ {
+ int primaryKeyCount = 0;
+
+ final List<Index> indexes = statement.getIndexes();
+
+ if (indexes != null && !indexes.isEmpty())
+ {
+ match:
+ for (Index index : indexes)
+ {
+ if ("PRIMARY KEY".equalsIgnoreCase(index.getType()))
+ {
+ primaryKeyCount = index.getColumns().size();
+ break match;
+ }
+ }
+ }
+
+ return primaryKeyCount;
+ }
}
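The triple-escaped appends above are easy to misread; each fragment passes through two
string layers before a registry client sees plain JSON. For one fragment:

    // Java source:                     schemaBuilder.append("{\\\"type\\\": \\\"record\\\",");
    // String built at runtime:         {\"type\": \"record\",
    // After JSON-decoding the "schema"
    // field of the registry payload:   {"type": "record",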
diff --git a/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaProxyFactory.java b/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaProxyFactory.java
index 7eda97fe93..e173ce5ad6 100644
--- a/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaProxyFactory.java
+++ b/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaProxyFactory.java
@@ -48,7 +48,6 @@
import io.aklivity.zilla.runtime.binding.pgsql.kafka.internal.types.stream.ExtensionFW;
import io.aklivity.zilla.runtime.binding.pgsql.kafka.internal.types.stream.FlushFW;
import io.aklivity.zilla.runtime.binding.pgsql.kafka.internal.types.stream.KafkaBeginExFW;
-import io.aklivity.zilla.runtime.binding.pgsql.kafka.internal.types.stream.KafkaDescribeClusterResponseBeginExFW;
import io.aklivity.zilla.runtime.binding.pgsql.kafka.internal.types.stream.PgsqlBeginExFW;
import io.aklivity.zilla.runtime.binding.pgsql.kafka.internal.types.stream.PgsqlDataExFW;
import io.aklivity.zilla.runtime.binding.pgsql.kafka.internal.types.stream.PgsqlFlushExFW;
@@ -68,9 +67,16 @@
public final class PgsqlKafkaProxyFactory implements PgsqlKafkaStreamFactory
{
+ private static final String AVRO_KEY_SCHEMA = """
+ {
+ "schemaType": "AVRO",
+ "schema": "{\\"type\\": \\"string\\"}"
+ }""";
+
private static final Byte STATEMENT_SEMICOLON = ';';
private static final int END_OF_FIELD = 0x00;
- private static final int NO_ERROR = 0x00;
+ private static final int NO_ERROR_SCHEMA_VERSION_ID = -1;
private static final int FLAGS_INIT = 0x02;
private static final int FLAGS_CONT = 0x00;
@@ -226,9 +232,7 @@ private final class PgsqlProxy
private final String database;
private final PgsqlKafkaBindingConfig binding;
private final KafkaCreateTopicsProxy createTopicsProxy;
- private final KafkaDescribeClusterProxy describeClusterProxy;
- private final List<Integer> brokers;
private final IntArrayQueue queries;
private final long initialId;
@@ -275,10 +279,8 @@ private PgsqlProxy(
String dbValue = parameters.get("database\u0000");
this.database = dbValue.substring(0, dbValue.length() - 1);
this.queries = new IntArrayQueue();
- this.brokers = new ArrayList<>();
this.createTopicsProxy = new KafkaCreateTopicsProxy(routedId, resolvedId, this);
- this.describeClusterProxy = new KafkaDescribeClusterProxy(routedId, resolvedId, this);
}
private void onAppMessage(
@@ -486,14 +488,6 @@ private void onCommandCompleted(
doAppWindow(traceId, authorization);
}
- private void onKafkaDescribeClusterBegin(
- long traceId,
- long authorization)
- {
- commandsProcessed++;
- doParseQuery(traceId, authorization);
- }
-
public void onKafkaCreateTopicsBegin(
long traceId,
long authorization)
@@ -686,7 +680,6 @@ private void cleanup(
long traceId,
long authorization)
{
- describeClusterProxy.doKafkaAbortAndReset(traceId, authorization);
createTopicsProxy.doKafkaAbortAndReset(traceId, authorization);
doAppAbortAndReset(traceId, authorization);
@@ -974,7 +967,6 @@ private void doKafkaBegin(
initialMax = delegate.initialMax;
state = PgsqlKafkaState.openingInitial(state);
- final int numBrokers = delegate.brokers.size();
final int partitionCount = config.kafkaCreateTopicsPartitionCount();
final KafkaBeginExFW kafkaBeginEx =
@@ -985,18 +977,8 @@ private void doKafkaBegin(
.topics(ct ->
topics.forEach(t -> ct.item(i -> i
.name(t)
- .partitionCount(1)
+ .partitionCount(partitionCount)
.replicas(config.kafkaCreateTopicsReplicas())
- .assignments(a -> a
- .item(ai ->
- {
- for (int p = 0; p < partitionCount; p++)
- {
- int brokerIndex = p % numBrokers;
- int brokerId = delegate.brokers.get(brokerIndex);
- ai.partitionId(p).leaderId(brokerId);
- }
- }))
.configs(cf -> cf
.item(ci -> ci.name("cleanup.policy").value(deletionPolicy))))))
.timeout(config.kafkaTopicRequestTimeoutMs())
@@ -1048,74 +1030,6 @@ protected void onKafkaBegin(
}
}
- private final class KafkaDescribeClusterProxy extends KafkaProxy
- {
- private KafkaDescribeClusterProxy(
- long originId,
- long routedId,
- PgsqlProxy delegate)
- {
- super(originId, routedId, delegate);
- }
-
- protected void doKafkaBegin(
- long traceId,
- long authorization)
- {
- state = PgsqlKafkaState.openingInitial(state);
-
- final KafkaBeginExFW kafkaBeginEx =
- kafkaBeginExRW.wrap(extBuffer, 0, extBuffer.capacity())
- .typeId(kafkaTypeId)
- .request(r -> r
- .describeCluster(d -> d.includeAuthorizedOperations(0)))
- .build();
-
- kafka = newKafkaConsumer(this::onKafkaMessage, originId, routedId, initialId, initialSeq, initialAck, initialMax,
- traceId, authorization, 0, kafkaBeginEx);
- }
-
- @Override
- protected void onKafkaBegin(
- BeginFW begin)
- {
- final long sequence = begin.sequence();
- final long acknowledge = begin.acknowledge();
- final long traceId = begin.traceId();
- final long authorization = begin.authorization();
- final OctetsFW extension = begin.extension();
-
- assert acknowledge <= sequence;
- assert sequence >= replySeq;
- assert acknowledge >= replyAck;
-
- replySeq = sequence;
- replyAck = acknowledge;
- state = PgsqlKafkaState.openingReply(state);
-
- assert replyAck <= replySeq;
-
- final ExtensionFW beginEx = extension.get(extensionRO::tryWrap);
- final KafkaBeginExFW kafkaBeginEx =
- beginEx != null && beginEx.typeId() == kafkaTypeId ? extension.get(kafkaBeginExRO::tryWrap) : null;
-
- final KafkaDescribeClusterResponseBeginExFW describeCluster = kafkaBeginEx.response().describeCluster();
-
- if (describeCluster.error() == NO_ERROR)
- {
- describeCluster.brokers().forEach(b -> delegate.brokers.add(b.brokerId()));
- delegate.onKafkaDescribeClusterBegin(traceId, authorization);
-
- doKafkaWindow(traceId, authorization);
- doKafkaEnd(traceId, authorization);
- }
- else
- {
- delegate.cleanup(traceId, authorization);
- }
- }
- }
-
private void doBegin(
final MessageConsumer receiver,
final long originId,
@@ -1328,29 +1242,43 @@ private void decodeCreateTopicCommand(
int offset,
int length)
{
- if (server.commandsProcessed == 2)
+ if (server.commandsProcessed == 1)
{
server.onCommandCompleted(traceId, authorization, length, PgsqlKafkaCompletionCommand.CREATE_TOPIC_COMMAND);
}
else if (server.commandsProcessed == 0)
{
- server.describeClusterProxy.doKafkaBegin(traceId, authorization);
- }
- else if (server.commandsProcessed == 1)
- {
- final CreateTable statement = (CreateTable) parseStatement(buffer, offset, length);
- final String topic = statement.getTable().getName();
+ final CreateTable createTable = (CreateTable) parseStatement(buffer, offset, length);
+ final String topic = createTable.getTable().getName();
topics.clear();
topics.add(String.format("%s.%s", server.database, topic));
final PgsqlKafkaBindingConfig binding = server.binding;
- final String schema = binding.avroValueSchema.generateSchema(server.database, statement);
- final String subject = String.format("%s.%s-value", server.database, topic);
+ final String primaryKey = binding.avroValueSchema.primaryKey(createTable);
+
+ int versionId = NO_ERROR_SCHEMA_VERSION_ID;
+ if (primaryKey != null)
+ {
+ //TODO: assign versionId to avoid test failure
+ final String subjectKey = String.format("%s.%s-key", server.database, topic);
+
+ final int primaryKeyCount = binding.avroValueSchema.primaryKeyCount(createTable);
+ String keySchema = primaryKeyCount > 1
+ ? binding.avroKeySchema.generateSchema(server.database, createTable)
+ : AVRO_KEY_SCHEMA;
+ binding.catalog.register(subjectKey, keySchema);
+ }
+ if (versionId != NO_VERSION_ID)
+ {
+ final String subjectValue = String.format("%s.%s-value", server.database, topic);
+ final String schemaValue = binding.avroValueSchema.generateSchema(server.database, createTable);
+ versionId = binding.catalog.register(subjectValue, schemaValue);
+ }
- if (binding.catalog.register(subject, schema) != NO_VERSION_ID)
+ if (versionId != NO_VERSION_ID)
{
- final String policy = binding.avroValueSchema.primaryKey(statement) != null
+ final String policy = primaryKey != null
? "compact"
: "delete";
diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.materialized.view/client.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.materialized.view/client.rpt
index e5fd9bcf54..3da60e9f42 100644
--- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.materialized.view/client.rpt
+++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.materialized.view/client.rpt
@@ -56,14 +56,14 @@ write zilla:data.ext ${pgsql:dataEx()
.query()
.build()
.build()}
-write "DESCRIBE MATERIALIZED VIEW distinct_cities;"
+write "DESCRIBE distinct_cities;"
[0x00]
read advised zilla:flush ${pgsql:flushEx()
.typeId(zilla:id("pgsql"))
.type()
.column()
- .name("id")
+ .name("Name")
.tableOid(0)
.index(0)
.typeOid(1043)
@@ -72,7 +72,7 @@ read advised zilla:flush ${pgsql:flushEx()
.format("TEXT")
.build()
.column()
- .name("city")
+ .name("Type")
.tableOid(0)
.index(0)
.typeOid(1043)
@@ -80,8 +80,54 @@ read advised zilla:flush ${pgsql:flushEx()
.modifier(-1)
.format("TEXT")
.build()
+ .column()
+ .name("Is Hidden")
+ .tableOid(0)
+ .index(0)
+ .typeOid(1043)
+ .length(4)
+ .modifier(-1)
+ .format("TEXT")
+ .build()
+ .column()
+ .name("Description")
+ .tableOid(0)
+ .index(0)
+ .typeOid(1043)
+ .length(4)
+ .modifier(-1)
+ .format("TEXT")
+ .build()
+ .build()
+ .build()}
+
+read zilla:data.ext ${pgsql:dataEx()
+ .typeId(zilla:id("pgsql"))
+ .row()
+ .build()
+ .build()}
+read [0x00 0x04] # Field Count
+ [0x00 0x00 0x00 0x02] # Length
+ "id" # Data
+ [0x00 0x00 0x00 0x11] # Length
+ "character varying" # Data
+ [0x00 0x00 0x00 0x05] # Length
+ "false" # Data
+ [0xff 0xff 0xff 0xff] # Length
+
+read zilla:data.ext ${pgsql:dataEx()
+ .typeId(zilla:id("pgsql"))
+ .row()
.build()
.build()}
+read [0x00 0x04] # Field Count
+ [0x00 0x00 0x00 0x04] # Length
+ "city" # Data
+ [0x00 0x00 0x00 0x11] # Length
+ "character varying" # Data
+ [0x00 0x00 0x00 0x05] # Length
+ "false" # Data
+ [0xff 0xff 0xff 0xff] # Length
read advised zilla:flush ${pgsql:flushEx()
.typeId(zilla:id("pgsql"))
@@ -102,13 +148,13 @@ write zilla:data.ext ${pgsql:dataEx()
.query()
.build()
.build()}
-write "CREATE SINK dev.distinct_cities_sink\n"
+write "CREATE SINK distinct_cities_sink\n"
"FROM distinct_cities\n"
"WITH (\n"
" connector='kafka',\n"
" properties.bootstrap.server='localhost:9092',\n"
" topic='dev.distinct_cities',\n"
- " primary_key='key'\n"
+ " primary_key='city'\n"
") FORMAT UPSERT ENCODE AVRO (\n"
" schema.registry='http://localhost:8081'\n"
");"
@@ -149,7 +195,7 @@ write zilla:data.ext ${pgsql:dataEx()
.build()
.build()}
write "CREATE TOPIC IF NOT EXISTS distinct_cities "
- "(id VARCHAR, city VARCHAR);"
+ "(id VARCHAR, city VARCHAR, PRIMARY KEY (id, city));"
[0x00]
write flush
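The row frames added above follow the PostgreSQL DataRow layout: a 2-byte field count,
then a 4-byte big-endian length and the raw bytes per field, with a length of -1
(0xffffffff) marking a NULL, here the empty Description column. A minimal encoding
sketch of the first row:

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    public final class RowEncodingSketch
    {
        public static void main(String[] args)
        {
            ByteBuffer row = ByteBuffer.allocate(64);  // ByteBuffer is big-endian by default
            row.putShort((short) 4);                   // Field Count
            for (String value : new String[] {"id", "character varying", "false"})
            {
                byte[] bytes = value.getBytes(StandardCharsets.US_ASCII);
                row.putInt(bytes.length).put(bytes);   // Length + Data
            }
            row.putInt(0xffffffff);                    // Length -1: NULL Description
        }
    }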
diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.materialized.view/server.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.materialized.view/server.rpt
index 508ec1a960..b0e6c2c687 100644
--- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.materialized.view/server.rpt
+++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.materialized.view/server.rpt
@@ -57,14 +57,14 @@ read zilla:data.ext ${pgsql:dataEx()
.query()
.build()
.build()}
-read "DESCRIBE MATERIALIZED VIEW distinct_cities;"
+read "DESCRIBE distinct_cities;"
[0x00]
write advise zilla:flush ${pgsql:flushEx()
.typeId(zilla:id("pgsql"))
.type()
.column()
- .name("id")
+ .name("Name")
.tableOid(0)
.index(0)
.typeOid(1043)
@@ -73,7 +73,25 @@ write advise zilla:flush ${pgsql:flushEx()
.format("TEXT")
.build()
.column()
- .name("city")
+ .name("Type")
+ .tableOid(0)
+ .index(0)
+ .typeOid(1043)
+ .length(4)
+ .modifier(-1)
+ .format("TEXT")
+ .build()
+ .column()
+ .name("Is Hidden")
+ .tableOid(0)
+ .index(0)
+ .typeOid(1043)
+ .length(4)
+ .modifier(-1)
+ .format("TEXT")
+ .build()
+ .column()
+ .name("Description")
.tableOid(0)
.index(0)
.typeOid(1043)
@@ -84,6 +102,39 @@ write advise zilla:flush ${pgsql:flushEx()
.build()
.build()}
+write flush
+
+write zilla:data.ext ${pgsql:dataEx()
+ .typeId(zilla:id("pgsql"))
+ .row()
+ .build()
+ .build()}
+write [0x00 0x04] # Field Count
+ [0x00 0x00 0x00 0x02] # Length
+ "id" # Data
+ [0x00 0x00 0x00 0x11] # Length
+ "character varying" # Data
+ [0x00 0x00 0x00 0x05] # Length
+ "false" # Data
+ [0xff 0xff 0xff 0xff] # Length
+
+write flush
+
+write zilla:data.ext ${pgsql:dataEx()
+ .typeId(zilla:id("pgsql"))
+ .row()
+ .build()
+ .build()}
+write [0x00 0x04] # Field Count
+ [0x00 0x00 0x00 0x04] # Length
+ "city" # Data
+ [0x00 0x00 0x00 0x11] # Length
+ "character varying" # Data
+ [0x00 0x00 0x00 0x05] # Length
+ "false" # Data
+ [0xff 0xff 0xff 0xff] # Length
+write flush
+
write advise zilla:flush ${pgsql:flushEx()
.typeId(zilla:id("pgsql"))
.completion()
@@ -105,13 +156,13 @@ read zilla:data.ext ${pgsql:dataEx()
.query()
.build()
.build()}
-read "CREATE SINK dev.distinct_cities_sink\n"
+read "CREATE SINK distinct_cities_sink\n"
"FROM distinct_cities\n"
"WITH (\n"
" connector='kafka',\n"
" properties.bootstrap.server='localhost:9092',\n"
" topic='dev.distinct_cities',\n"
- " primary_key='key'\n"
+ " primary_key='city'\n"
") FORMAT UPSERT ENCODE AVRO (\n"
" schema.registry='http://localhost:8081'\n"
");"
@@ -152,9 +203,9 @@ read zilla:data.ext ${pgsql:dataEx()
.query()
.build()
.build()}
-read "CREATE TOPIC IF NOT EXISTS distinct_cities "
- "(id VARCHAR, city VARCHAR);"
- [0x00]
+read "CREATE TOPIC IF NOT EXISTS distinct_cities "
+ "(id VARCHAR, city VARCHAR, PRIMARY KEY (id, city));"
+ [0x00]
write advise zilla:flush ${pgsql:flushEx()
.typeId(zilla:id("pgsql"))
diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key/client.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key/client.rpt
index 80ac51e3ed..fb70359d1e 100644
--- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key/client.rpt
+++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key/client.rpt
@@ -34,7 +34,7 @@ write zilla:data.ext ${pgsql:dataEx()
.build()}
write "CREATE TABLE IF NOT EXISTS cities (\n"
" *,\n"
- " PRIMARY KEY (id)\n"
+ " PRIMARY KEY (key)\n"
") INCLUDE KEY AS key\n"
"WITH (\n"
" connector='kafka',\n"
@@ -42,7 +42,6 @@ write "CREATE TABLE IF NOT EXISTS cities (\n"
" topic='dev.cities',\n"
" scan.startup.mode='latest',\n"
" scan.startup.timestamp.millis='140000000'\n"
- ")\n"
") FORMAT UPSERT ENCODE AVRO (\n"
" schema.registry = 'http://localhost:8081'\n"
");"
diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key/server.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key/server.rpt
index deea693f80..6479311838 100644
--- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key/server.rpt
+++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key/server.rpt
@@ -38,7 +38,7 @@ read zilla:data.ext ${pgsql:dataEx()
.build()}
read "CREATE TABLE IF NOT EXISTS cities (\n"
" *,\n"
- " PRIMARY KEY (id)\n"
+ " PRIMARY KEY (key)\n"
") INCLUDE KEY AS key\n"
"WITH (\n"
" connector='kafka',\n"
@@ -46,7 +46,6 @@ read "CREATE TABLE IF NOT EXISTS cities (\n"
" topic='dev.cities',\n"
" scan.startup.mode='latest',\n"
" scan.startup.timestamp.millis='140000000'\n"
- ")\n"
") FORMAT UPSERT ENCODE AVRO (\n"
" schema.registry = 'http://localhost:8081'\n"
");"
diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table/client.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table/client.rpt
index 34b1eb1260..5635455b3c 100644
--- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table/client.rpt
+++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table/client.rpt
@@ -39,8 +39,7 @@ write "CREATE SOURCE IF NOT EXISTS weather (*)\n"
" topic='dev.weather',\n"
" scan.startup.mode='latest',\n"
" scan.startup.timestamp.millis='140000000'\n"
- ")\n"
- ") FORMAT UPSERT ENCODE AVRO (\n"
+ ") FORMAT PLAIN ENCODE AVRO (\n"
" schema.registry = 'http://localhost:8081'\n"
");"
[0x00]
diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table/server.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table/server.rpt
index b4e06de315..52a524dbc4 100644
--- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table/server.rpt
+++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table/server.rpt
@@ -43,8 +43,7 @@ read "CREATE SOURCE IF NOT EXISTS weather (*)\n"
" topic='dev.weather',\n"
" scan.startup.mode='latest',\n"
" scan.startup.timestamp.millis='140000000'\n"
- ")\n"
- ") FORMAT UPSERT ENCODE AVRO (\n"
+ ") FORMAT PLAIN ENCODE AVRO (\n"
" schema.registry = 'http://localhost:8081'\n"
");"
[0x00]
diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/query.with.multiple.statements/client.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/query.with.multiple.statements/client.rpt
index c1a8d42a24..ec71b51e3c 100644
--- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/query.with.multiple.statements/client.rpt
+++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/query.with.multiple.statements/client.rpt
@@ -34,7 +34,7 @@ write zilla:data.ext ${pgsql:dataEx()
.build()}
write "CREATE TABLE IF NOT EXISTS cities (\n"
" *,\n"
- " PRIMARY KEY (id)\n"
+ " PRIMARY KEY (key)\n"
") INCLUDE KEY AS key\n"
"WITH (\n"
" connector='kafka',\n"
@@ -42,7 +42,6 @@ write "CREATE TABLE IF NOT EXISTS cities (\n"
" topic='dev.cities',\n"
" scan.startup.mode='latest',\n"
" scan.startup.timestamp.millis='140000000'\n"
- ")\n"
") FORMAT UPSERT ENCODE AVRO (\n"
" schema.registry = 'http://localhost:8081'\n"
");"
diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/query.with.multiple.statements/server.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/query.with.multiple.statements/server.rpt
index 014dd4d23a..69ae550915 100644
--- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/query.with.multiple.statements/server.rpt
+++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/query.with.multiple.statements/server.rpt
@@ -38,7 +38,7 @@ read zilla:data.ext ${pgsql:dataEx()
.build()}
read "CREATE TABLE IF NOT EXISTS cities (\n"
" *,\n"
- " PRIMARY KEY (id)\n"
+ " PRIMARY KEY (key)\n"
") INCLUDE KEY AS key\n"
"WITH (\n"
" connector='kafka',\n"
@@ -46,7 +46,6 @@ read "CREATE TABLE IF NOT EXISTS cities (\n"
" topic='dev.cities',\n"
" scan.startup.mode='latest',\n"
" scan.startup.timestamp.millis='140000000'\n"
- ")\n"
") FORMAT UPSERT ENCODE AVRO (\n"
" schema.registry = 'http://localhost:8081'\n"
");"
diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/config/RisingwaveBindingConfig.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/config/RisingwaveBindingConfig.java
index 49b1b8e8e3..602e124808 100644
--- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/config/RisingwaveBindingConfig.java
+++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/config/RisingwaveBindingConfig.java
@@ -30,7 +30,7 @@
import io.aklivity.zilla.runtime.binding.risingwave.internal.statement.RisingwaveCreateSourceTemplate;
import io.aklivity.zilla.runtime.binding.risingwave.internal.statement.RisingwaveCreateTableTemplate;
import io.aklivity.zilla.runtime.binding.risingwave.internal.statement.RisingwaveCreateTopicTemplate;
-import io.aklivity.zilla.runtime.binding.risingwave.internal.statement.RisingwaveDescribeMaterializedViewTemplate;
+import io.aklivity.zilla.runtime.binding.risingwave.internal.statement.RisingwaveDescribeTemplate;
import io.aklivity.zilla.runtime.engine.catalog.CatalogHandler;
import io.aklivity.zilla.runtime.engine.config.BindingConfig;
import io.aklivity.zilla.runtime.engine.config.CatalogedConfig;
@@ -45,7 +45,7 @@ public final class RisingwaveBindingConfig
public final List<RisingwaveRouteConfig> routes;
public final RisingwaveCreateTopicTemplate createTopic;
public final RisingwaveCreateMaterializedViewTemplate createView;
- public final RisingwaveDescribeMaterializedViewTemplate describeView;
+ public final RisingwaveDescribeTemplate describeView;
public final RisingwaveCreateTableTemplate createTable;
public final RisingwaveCreateSourceTemplate createSource;
public final RisingwaveCreateSinkTemplate createSink;
@@ -88,7 +88,7 @@ public RisingwaveBindingConfig(
this.createSink = new RisingwaveCreateSinkTemplate(bootstrapServer, location);
this.createTopic = new RisingwaveCreateTopicTemplate();
this.createView = new RisingwaveCreateMaterializedViewTemplate();
- this.describeView = new RisingwaveDescribeMaterializedViewTemplate();
+ this.describeView = new RisingwaveDescribeTemplate();
this.createFunction = new RisingwaveCreateFunctionTemplate(udf);
}
diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateSinkTemplate.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateSinkTemplate.java
index 5fc3f62614..1c34fcba3c 100644
--- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateSinkTemplate.java
+++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateSinkTemplate.java
@@ -14,6 +14,8 @@
*/
package io.aklivity.zilla.runtime.binding.risingwave.internal.statement;
+import java.util.Map;
+
import net.sf.jsqlparser.statement.Statement;
import net.sf.jsqlparser.statement.create.view.CreateView;
@@ -25,12 +27,11 @@ public class RisingwaveCreateSinkTemplate extends RisingwaveCommandTemplate
WITH (
connector='kafka',
properties.bootstrap.server='%s',
- topic='%s',
- primary_key='key'
+ topic='%s.%s',
+ primary_key='%s'
) FORMAT UPSERT ENCODE AVRO (
schema.registry='%s'
);\u0000""";
- private final String sinkFormat = "%s.%s";
private final String bootstrapServer;
private final String schemaRegistry;
@@ -44,13 +45,14 @@ public RisingwaveCreateSinkTemplate(
}
public String generate(
- Statement statement,
- String database)
+ String database,
+ Map<String, String> columns,
+ Statement statement)
{
CreateView createView = (CreateView) statement;
String viewName = createView.getView().getName();
- String sinkName = String.format(sinkFormat, database, viewName);
+ String primaryKey = columns.keySet().iterator().next();
- return String.format(sqlFormat, sinkName, viewName, bootstrapServer, sinkName, schemaRegistry);
+ return String.format(sqlFormat, viewName, viewName, bootstrapServer, database, viewName, primaryKey, schemaRegistry);
}
}
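With the reworked format arguments, for view "distinct_cities" in database "dev" and the
key column the binding selects from the view's columns (the spec expects "city"), the
template renders the statement the spec scripts above read back (modulo whitespace):

    CREATE SINK distinct_cities_sink
    FROM distinct_cities
    WITH (
        connector='kafka',
        properties.bootstrap.server='localhost:9092',
        topic='dev.distinct_cities',
        primary_key='city'
    ) FORMAT UPSERT ENCODE AVRO (
        schema.registry='http://localhost:8081'
    );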
diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateSourceTemplate.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateSourceTemplate.java
index 1ee9f0e1f7..26cb3e096b 100644
--- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateSourceTemplate.java
+++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateSourceTemplate.java
@@ -27,8 +27,7 @@ public class RisingwaveCreateSourceTemplate extends RisingwaveCommandTemplate
topic='%s.%s',
scan.startup.mode='latest',
scan.startup.timestamp.millis='%d'
- )
- ) FORMAT UPSERT ENCODE AVRO (
+ ) FORMAT PLAIN ENCODE AVRO (
schema.registry = '%s'
);\u0000""";
diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateTableTemplate.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateTableTemplate.java
index 3b64a685eb..8b264a3576 100644
--- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateTableTemplate.java
+++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateTableTemplate.java
@@ -22,7 +22,7 @@ public class RisingwaveCreateTableTemplate extends RisingwaveCommandTemplate
private final String sqlFormat = """
CREATE TABLE IF NOT EXISTS %s (
*,
- PRIMARY KEY (%s)
+ PRIMARY KEY (key)
) INCLUDE KEY AS key
WITH (
connector='kafka',
@@ -30,7 +30,6 @@ PRIMARY KEY (%s)
topic='%s.%s',
scan.startup.mode='latest',
scan.startup.timestamp.millis='%d'
- )
) FORMAT UPSERT ENCODE AVRO (
schema.registry = '%s'
);\u0000""";
@@ -55,9 +54,8 @@ public String generate(
{
CreateTable createTable = (CreateTable) statement;
String table = createTable.getTable().getName();
- String primaryKey = getPrimaryKey(createTable);
- return String.format(sqlFormat, table, primaryKey, bootstrapServer, database,
+ return String.format(sqlFormat, table, bootstrapServer, database,
table, scanStartupMil, schemaRegistry);
}
}
diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateTopicTemplate.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateTopicTemplate.java
index f6baa8c54f..0e716228ad 100644
--- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateTopicTemplate.java
+++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateTopicTemplate.java
@@ -28,6 +28,7 @@ public class RisingwaveCreateTopicTemplate extends RisingwaveCommandTemplate
private final String fieldFormat = "%s %s, ";
private final StringBuilder fieldBuilder = new StringBuilder();
+ private final StringBuilder primaryKeyBuilder = new StringBuilder();
public RisingwaveCreateTopicTemplate()
{
@@ -57,11 +58,18 @@ public String generate(
{
String topic = createView.getView().getName();
+ primaryKeyBuilder.setLength(0);
+ columns.keySet().forEach(k -> primaryKeyBuilder.append(k).append(", "));
+ primaryKeyBuilder.delete(primaryKeyBuilder.length() - 2, primaryKeyBuilder.length());
+
+ String primaryKey = String.format(primaryKeyFormat, primaryKeyBuilder);
+
fieldBuilder.setLength(0);
- columns.forEach((k, v) -> fieldBuilder.append(String.format(fieldFormat, k, v)));
+ columns.forEach((k, v) -> fieldBuilder.append(String.format(fieldFormat, k,
+ RisingwavePgsqlTypeMapping.typeName(v))));
fieldBuilder.delete(fieldBuilder.length() - 2, fieldBuilder.length());
- return String.format(sqlFormat, topic, fieldBuilder, "");
+ return String.format(sqlFormat, topic, fieldBuilder, primaryKey);
}
}
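Worked example for the updated topic template: with columns {id -> "character varying",
city -> "character varying"}, the field builder now maps the reported type names through
RisingwavePgsqlTypeMapping and the new primary-key builder appends every column, yielding
the statement the spec scripts above expect:

    CREATE TOPIC IF NOT EXISTS distinct_cities (id VARCHAR, city VARCHAR, PRIMARY KEY (id, city));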
diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveDescribeMaterializedViewTemplate.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveDescribeTemplate.java
similarity index 84%
rename from incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveDescribeMaterializedViewTemplate.java
rename to incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveDescribeTemplate.java
index 81748ddcdf..238d75bb06 100644
--- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveDescribeMaterializedViewTemplate.java
+++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveDescribeTemplate.java
@@ -17,12 +17,12 @@
import net.sf.jsqlparser.statement.Statement;
import net.sf.jsqlparser.statement.create.view.CreateView;
-public class RisingwaveDescribeMaterializedViewTemplate extends RisingwaveCommandTemplate
+public class RisingwaveDescribeTemplate extends RisingwaveCommandTemplate
{
private final String sqlFormat = """
- DESCRIBE MATERIALIZED VIEW %s;\u0000""";
+ DESCRIBE %s;\u0000""";
- public RisingwaveDescribeMaterializedViewTemplate()
+ public RisingwaveDescribeTemplate()
{
}
diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwavePgsqlTypeMapping.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwavePgsqlTypeMapping.java
index 49a5615935..54079411bd 100644
--- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwavePgsqlTypeMapping.java
+++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwavePgsqlTypeMapping.java
@@ -20,98 +20,17 @@
public final class RisingwavePgsqlTypeMapping
{
- private static final Map<Integer, String> OID_TO_TYPE_NAME = new Object2ObjectHashMap<>();
+ private static final Map<String, String> TYPE_MAPPINGS = new Object2ObjectHashMap<>();
static
{
- // Initialize the mapping of OIDs to uppercase type names
- OID_TO_TYPE_NAME.put(16, "BOOL");
- OID_TO_TYPE_NAME.put(17, "BYTEA");
- OID_TO_TYPE_NAME.put(18, "CHAR");
- OID_TO_TYPE_NAME.put(19, "NAME");
- OID_TO_TYPE_NAME.put(20, "INT8");
- OID_TO_TYPE_NAME.put(21, "INT2");
- OID_TO_TYPE_NAME.put(22, "INT2VECTOR");
- OID_TO_TYPE_NAME.put(23, "INT4");
- OID_TO_TYPE_NAME.put(24, "REGPROC");
- OID_TO_TYPE_NAME.put(25, "TEXT");
- OID_TO_TYPE_NAME.put(26, "OID");
- OID_TO_TYPE_NAME.put(27, "TID");
- OID_TO_TYPE_NAME.put(28, "XID");
- OID_TO_TYPE_NAME.put(29, "CID");
- OID_TO_TYPE_NAME.put(30, "OIDVECTOR");
- OID_TO_TYPE_NAME.put(114, "JSON");
- OID_TO_TYPE_NAME.put(142, "XML");
- OID_TO_TYPE_NAME.put(194, "PG_NODE_TREE");
- OID_TO_TYPE_NAME.put(600, "POINT");
- OID_TO_TYPE_NAME.put(601, "LSEG");
- OID_TO_TYPE_NAME.put(602, "PATH");
- OID_TO_TYPE_NAME.put(603, "BOX");
- OID_TO_TYPE_NAME.put(604, "POLYGON");
- OID_TO_TYPE_NAME.put(628, "LINE");
- OID_TO_TYPE_NAME.put(700, "FLOAT4");
- OID_TO_TYPE_NAME.put(701, "FLOAT8");
- OID_TO_TYPE_NAME.put(705, "UNKNOWN");
- OID_TO_TYPE_NAME.put(718, "CIRCLE");
- OID_TO_TYPE_NAME.put(790, "MONEY");
- OID_TO_TYPE_NAME.put(829, "MACADDR");
- OID_TO_TYPE_NAME.put(869, "INET");
- OID_TO_TYPE_NAME.put(1000, "_BOOL");
- OID_TO_TYPE_NAME.put(1001, "_BYTEA");
- OID_TO_TYPE_NAME.put(1002, "_CHAR");
- OID_TO_TYPE_NAME.put(1003, "_NAME");
- OID_TO_TYPE_NAME.put(1005, "_INT2");
- OID_TO_TYPE_NAME.put(1006, "_INT2VECTOR");
- OID_TO_TYPE_NAME.put(1007, "_INT4");
- OID_TO_TYPE_NAME.put(1008, "_REGPROC");
- OID_TO_TYPE_NAME.put(1009, "_TEXT");
- OID_TO_TYPE_NAME.put(1010, "_TID");
- OID_TO_TYPE_NAME.put(1011, "_XID");
- OID_TO_TYPE_NAME.put(1012, "_CID");
- OID_TO_TYPE_NAME.put(1013, "_OIDVECTOR");
- OID_TO_TYPE_NAME.put(1014, "_BPCHAR");
- OID_TO_TYPE_NAME.put(1015, "_VARCHAR");
- OID_TO_TYPE_NAME.put(1016, "_INT8");
- OID_TO_TYPE_NAME.put(1017, "_POINT");
- OID_TO_TYPE_NAME.put(1018, "_LSEG");
- OID_TO_TYPE_NAME.put(1019, "_PATH");
- OID_TO_TYPE_NAME.put(1020, "_BOX");
- OID_TO_TYPE_NAME.put(1021, "_FLOAT4");
- OID_TO_TYPE_NAME.put(1022, "_FLOAT8");
- OID_TO_TYPE_NAME.put(1028, "_OID");
- OID_TO_TYPE_NAME.put(1033, "ACLITEM");
- OID_TO_TYPE_NAME.put(1034, "_ACLITEM");
- OID_TO_TYPE_NAME.put(1042, "BPCHAR");
- OID_TO_TYPE_NAME.put(1043, "VARCHAR");
- OID_TO_TYPE_NAME.put(1082, "DATE");
- OID_TO_TYPE_NAME.put(1083, "TIME");
- OID_TO_TYPE_NAME.put(1114, "TIMESTAMP");
- OID_TO_TYPE_NAME.put(1184, "TIMESTAMPTZ");
- OID_TO_TYPE_NAME.put(1186, "INTERVAL");
- OID_TO_TYPE_NAME.put(1266, "TIMETZ");
- OID_TO_TYPE_NAME.put(1560, "BIT");
- OID_TO_TYPE_NAME.put(1562, "VARBIT");
- OID_TO_TYPE_NAME.put(1700, "NUMERIC");
- OID_TO_TYPE_NAME.put(1790, "REFCURSOR");
- OID_TO_TYPE_NAME.put(2202, "REGPROCEDURE");
- OID_TO_TYPE_NAME.put(2203, "REGOPER");
- OID_TO_TYPE_NAME.put(2204, "REGOPERATOR");
- OID_TO_TYPE_NAME.put(2205, "REGCLASS");
- OID_TO_TYPE_NAME.put(2206, "REGTYPE");
- OID_TO_TYPE_NAME.put(2950, "UUID");
- OID_TO_TYPE_NAME.put(3220, "PG_LSN");
- OID_TO_TYPE_NAME.put(3614, "TSVECTOR");
- OID_TO_TYPE_NAME.put(3615, "TSQUERY");
- OID_TO_TYPE_NAME.put(3734, "REGCONFIG");
- OID_TO_TYPE_NAME.put(3769, "REGDICTIONARY");
- OID_TO_TYPE_NAME.put(3802, "JSONB");
- OID_TO_TYPE_NAME.put(3904, "INT4RANGE");
- OID_TO_TYPE_NAME.put(3906, "NUMRANGE");
- OID_TO_TYPE_NAME.put(3908, "TSRANGE");
- OID_TO_TYPE_NAME.put(3910, "TSTZRANGE");
- OID_TO_TYPE_NAME.put(3912, "DATERANGE");
- OID_TO_TYPE_NAME.put(3926, "INT8RANGE");
- OID_TO_TYPE_NAME.put(4072, "JSONPATH");
+ TYPE_MAPPINGS.put("character varying", "VARCHAR");
+ TYPE_MAPPINGS.put("integer", "INT");
+ TYPE_MAPPINGS.put("boolean", "BOOL");
+ TYPE_MAPPINGS.put("character", "CHAR");
+ TYPE_MAPPINGS.put("timestamp without time zone", "TIMESTAMP");
+ TYPE_MAPPINGS.put("timestamp with time zone", "TIMESTAMPZ");
+ TYPE_MAPPINGS.put("double precision", "DOUBLE");
}
private RisingwavePgsqlTypeMapping()
@@ -119,8 +38,8 @@ private RisingwavePgsqlTypeMapping()
}
public static String typeName(
- int oid)
+ String type)
{
- return OID_TO_TYPE_NAME.get(oid);
+ return TYPE_MAPPINGS.get(type);
}
}
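The mapping now keys on the textual type names that DESCRIBE reports rather than OIDs; a
jshell-style sketch of the lookup, noting that unmapped names fall through to null:

    String mapped = RisingwavePgsqlTypeMapping.typeName("character varying"); // "VARCHAR"
    String missing = RisingwavePgsqlTypeMapping.typeName("jsonb");            // null: not mapped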
diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/RisingwaveProxyFactory.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/RisingwaveProxyFactory.java
index 6b434124f4..1455a884c3 100644
--- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/RisingwaveProxyFactory.java
+++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/RisingwaveProxyFactory.java
@@ -20,7 +20,10 @@
import java.io.InputStreamReader;
import java.io.Reader;
import java.io.StringReader;
+import java.nio.ByteOrder;
+import java.util.ArrayList;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.LongFunction;
@@ -39,10 +42,9 @@
import io.aklivity.zilla.runtime.binding.risingwave.internal.config.RisingwaveBindingConfig;
import io.aklivity.zilla.runtime.binding.risingwave.internal.config.RisingwaveCommandType;
import io.aklivity.zilla.runtime.binding.risingwave.internal.config.RisingwaveRouteConfig;
-import io.aklivity.zilla.runtime.binding.risingwave.internal.statement.RisingwavePgsqlTypeMapping;
-import io.aklivity.zilla.runtime.binding.risingwave.internal.types.Array32FW;
import io.aklivity.zilla.runtime.binding.risingwave.internal.types.Flyweight;
import io.aklivity.zilla.runtime.binding.risingwave.internal.types.OctetsFW;
+import io.aklivity.zilla.runtime.binding.risingwave.internal.types.String32FW;
import io.aklivity.zilla.runtime.binding.risingwave.internal.types.stream.AbortFW;
import io.aklivity.zilla.runtime.binding.risingwave.internal.types.stream.BeginFW;
import io.aklivity.zilla.runtime.binding.risingwave.internal.types.stream.DataFW;
@@ -50,7 +52,6 @@
import io.aklivity.zilla.runtime.binding.risingwave.internal.types.stream.ExtensionFW;
import io.aklivity.zilla.runtime.binding.risingwave.internal.types.stream.FlushFW;
import io.aklivity.zilla.runtime.binding.risingwave.internal.types.stream.PgsqlBeginExFW;
-import io.aklivity.zilla.runtime.binding.risingwave.internal.types.stream.PgsqlColumnInfoFW;
import io.aklivity.zilla.runtime.binding.risingwave.internal.types.stream.PgsqlDataExFW;
import io.aklivity.zilla.runtime.binding.risingwave.internal.types.stream.PgsqlFlushExFW;
import io.aklivity.zilla.runtime.binding.risingwave.internal.types.stream.PgsqlStatus;
@@ -99,12 +100,11 @@ public final class RisingwaveProxyFactory implements RisingwaveStreamFactory
private final AbortFW.Builder abortRW = new AbortFW.Builder();
private final FlushFW.Builder flushRW = new FlushFW.Builder();
+ private final String32FW columnRO = new String32FW(ByteOrder.BIG_ENDIAN);
+
private final ResetFW resetRO = new ResetFW();
private final WindowFW windowRO = new WindowFW();
- private final ResetFW.Builder resetRW = new ResetFW.Builder();
- private final WindowFW.Builder windowRW = new WindowFW.Builder();
-
private final ExtensionFW extensionRO = new ExtensionFW();
private final PgsqlBeginExFW pgsqlBeginExRO = new PgsqlBeginExFW();
private final PgsqlDataExFW pgsqlDataExRO = new PgsqlDataExFW();
@@ -114,8 +114,8 @@ public final class RisingwaveProxyFactory implements RisingwaveStreamFactory
private final PgsqlDataExFW.Builder dataExRW = new PgsqlDataExFW.Builder();
private final PgsqlFlushExFW.Builder flushExRW = new PgsqlFlushExFW.Builder();
- private final Array32FW.Builder<PgsqlColumnInfoFW.Builder, PgsqlColumnInfoFW> columnsRW =
- new Array32FW.Builder<>(new PgsqlColumnInfoFW.Builder(), new PgsqlColumnInfoFW());
+ private final ResetFW.Builder resetRW = new ResetFW.Builder();
+ private final WindowFW.Builder windowRW = new WindowFW.Builder();
private final BufferPool bufferPool;
private final RisingwaveConfiguration config;
@@ -136,14 +136,17 @@ public final class RisingwaveProxyFactory implements RisingwaveStreamFactory
private final PgsqlFlushCommand proxyFlushCommand = this::proxyFlushCommand;
private final PgsqlFlushCommand ignoreFlushCommand = this::ignoreFlushCommand;
+ private final PgsqlDataCommand proxyDataCommand = this::proxyDataCommand;
+ private final PgsqlDataCommand rowDataCommand = this::rowDataCommand;
+
private final Object2ObjectHashMap clientTransforms;
{
Object2ObjectHashMap clientTransforms =
new Object2ObjectHashMap<>();
- clientTransforms.put(RisingwaveCommandType.CREATE_TABLE_COMMAND, this::onDecodeCreateTableCommand);
- clientTransforms.put(RisingwaveCommandType.CREATE_MATERIALIZED_VIEW_COMMAND, this::onDecodeCreateMaterializedViewCommand);
- clientTransforms.put(RisingwaveCommandType.CREATE_FUNCTION_COMMAND, this::onDecodeCreateFunctionCommand);
- clientTransforms.put(RisingwaveCommandType.UNKNOWN_COMMAND, this::onDecodeUnknownCommand);
+ clientTransforms.put(RisingwaveCommandType.CREATE_TABLE_COMMAND, this::decodeCreateTableCommand);
+ clientTransforms.put(RisingwaveCommandType.CREATE_MATERIALIZED_VIEW_COMMAND, this::decodeCreateMaterializedViewCommand);
+ clientTransforms.put(RisingwaveCommandType.CREATE_FUNCTION_COMMAND, this::decodeCreateFunctionCommand);
+ clientTransforms.put(RisingwaveCommandType.UNKNOWN_COMMAND, this::decodeUnknownCommand);
this.clientTransforms = clientTransforms;
}
@@ -231,6 +234,8 @@ private final class PgsqlServer
private final Map parameters;
private final LongArrayQueue responses;
private final IntArrayQueue queries;
+ private final List columnTypes;
+ private final List columnDescriptions;
private final Map columns;
private final String database;
@@ -244,6 +249,7 @@ private final class PgsqlServer
private int initialMax;
private int initialPad;
private long initialBudgetId;
+ private long affinity;
private long replySeq;
private long replyAck;
@@ -279,6 +285,8 @@ private PgsqlServer(
this.database = dbValue.substring(0, dbValue.length() - 1);
this.columns = new Object2ObjectHashMap<>();
+ this.columnTypes = new ArrayList<>();
+ this.columnDescriptions = new ArrayList<>();
this.streamsByRouteIds = new Long2ObjectHashMap<>();
this.responses = new LongArrayQueue();
this.queries = new IntArrayQueue();
@@ -329,7 +337,8 @@ private void onAppBegin(
{
final long traceId = begin.traceId();
final long authorization = begin.authorization();
- final long affinity = begin.affinity();
+
+ this.affinity = begin.affinity();
state = RisingwaveState.openingInitial(state);
@@ -751,11 +760,12 @@ private final class PgsqlClient
private MessageConsumer app;
- private final long initialId;
- private final long replyId;
private final long originId;
private final long routedId;
+ private long initialId;
+ private long replyId;
+
private long initialSeq;
private long initialAck;
private int initialMax;
@@ -770,6 +780,7 @@ private final class PgsqlClient
private int messageOffset;
private PgsqlFlushCommand typeCommand;
+ private PgsqlDataCommand dataCommand;
private PgsqlFlushCommand completionCommand;
private PgsqlClient(
@@ -780,9 +791,8 @@ private PgsqlClient(
this.server = server;
this.originId = originId;
this.routedId = routedId;
- this.initialId = supplyInitialId.applyAsLong(routedId);
- this.replyId = supplyReplyId.applyAsLong(initialId);
+ this.dataCommand = proxyDataCommand;
this.typeCommand = proxyFlushCommand;
this.completionCommand = ignoreFlushCommand;
}
@@ -868,7 +878,8 @@ private void onAppData(
{
final OctetsFW extension = data.extension();
final OctetsFW payload = data.payload();
- server.doAppData(routedId, traceId, authorization, flags,
+
+ dataCommand.handle(server, routedId, traceId, authorization, flags,
payload.buffer(), payload.offset(), payload.limit(), extension);
}
}
@@ -926,14 +937,9 @@ private void onAppEnd(
final long traceId = end.traceId();
final long authorization = end.authorization();
- if (!RisingwaveState.closed(server.state))
- {
- state = RisingwaveState.closeReply(state);
-
- doAppEnd(traceId, authorization);
+        state = RisingwaveState.closeReply(state);

-            server.cleanup(traceId, authorization);
- }
+ doAppEnd(traceId, authorization);
}
private void onAppAbort(
@@ -1007,6 +1013,7 @@ private void onAppReadyFlush(
long authorization,
PgsqlFlushExFW pgsqlFlushEx)
{
+ dataCommand = proxyDataCommand;
server.onQueryReady(traceId, authorization, pgsqlFlushEx);
}
@@ -1015,8 +1022,16 @@ private void doAppBegin(
long authorization,
long affinity)
{
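+        // streams may be re-used after close: clear stale state before re-opening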
+ if (RisingwaveState.closed(state))
+ {
+ state = 0;
+ }
+
if (!RisingwaveState.initialOpening(state))
{
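+            // allocate fresh stream ids on every (re)connect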
+ this.initialId = supplyInitialId.applyAsLong(routedId);
+ this.replyId = supplyReplyId.applyAsLong(initialId);
+
            Consumer<OctetsFW.Builder> beginEx = e -> e.set((b, o, l) -> beginExRW.wrap(b, o, l)
.typeId(pgsqlTypeId)
.parameters(p -> server.parameters.forEach((k, v) -> p.item(i -> i.name(k).value(v))))
@@ -1066,7 +1081,7 @@ private void doAppEnd(
long traceId,
long authorization)
{
- if (RisingwaveState.initialOpened(state))
+ if (RisingwaveState.initialOpening(state))
{
state = RisingwaveState.closeInitial(state);
@@ -1117,6 +1132,8 @@ private void doAppWindow(
{
state = RisingwaveState.openReply(state);
+ replyMax = server.replyMax;
+
doWindow(app, originId, routedId, replyId, replySeq, replyAck, server.replyMax,
traceId, authorization, server.replyBudgetId, server.replyPadding);
}
@@ -1134,6 +1151,7 @@ private void doPgsqlQuery(
final int lengthMax = Math.min(initialWin - initialPad, remaining);
if (RisingwaveState.initialOpened(state) &&
+ !RisingwaveState.initialClosing(state) &&
lengthMax > 0 && remaining > 0)
{
final int deferred = remaining - length;
@@ -1159,6 +1177,10 @@ private void doPgsqlQuery(
messageOffset += lengthMax;
}
+ else
+ {
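+            // stream not yet writable: (re)open it using the server's affinity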
+ doAppBegin(traceId, authorization, server.affinity);
+ }
}
}
@@ -1438,7 +1460,7 @@ private void doWindow(
sender.accept(window.typeId(), window.buffer(), window.offset(), window.sizeof());
}
- private void onDecodeCreateTableCommand(
+ private void decodeCreateTableCommand(
PgsqlServer server,
long traceId,
long authorization,
@@ -1484,7 +1506,7 @@ else if (server.commandsProcessed == 1)
}
}
- private void onDecodeCreateMaterializedViewCommand(
+ private void decodeCreateMaterializedViewCommand(
PgsqlServer server,
long traceId,
long authorization,
@@ -1502,6 +1524,7 @@ private void onDecodeCreateMaterializedViewCommand(
final RisingwaveBindingConfig binding = server.binding;
final CreateView statement = (CreateView) parseStatement(buffer, offset, length);
PgsqlFlushCommand typeCommand = ignoreFlushCommand;
+ PgsqlDataCommand dataCommand = proxyDataCommand;
String newStatement = "";
int progress = 0;
@@ -1514,6 +1537,8 @@ else if (server.commandsProcessed == 1)
{
newStatement = binding.describeView.generate(statement);
typeCommand = typeFlushCommand;
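+            // DESCRIBE round-trip: the row handler repopulates server.columns for the CREATE SINK step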
+ dataCommand = rowDataCommand;
+ server.columns.clear();
}
else if (server.commandsProcessed == 2)
{
@@ -1521,7 +1546,7 @@ else if (server.commandsProcessed == 2)
}
else if (server.commandsProcessed == 3)
{
- newStatement = binding.createSink.generate(statement, server.database);
+ newStatement = binding.createSink.generate(server.database, server.columns, statement);
}
statementBuffer.putBytes(progress, newStatement.getBytes());
@@ -1533,11 +1558,12 @@ else if (server.commandsProcessed == 3)
final PgsqlClient client = server.streamsByRouteIds.get(route.id);
client.doPgsqlQuery(traceId, authorization, statementBuffer, 0, progress);
client.typeCommand = typeCommand;
+ client.dataCommand = dataCommand;
client.completionCommand = ignoreFlushCommand;
}
}
- private void onDecodeCreateFunctionCommand(
+ private void decodeCreateFunctionCommand(
PgsqlServer server,
long traceId,
long authorization,
@@ -1574,7 +1600,7 @@ private void onDecodeCreateFunctionCommand(
}
}
- private void onDecodeUnknownCommand(
+ private void decodeUnknownCommand(
PgsqlServer server,
long traceId,
long authorization,
@@ -1621,17 +1647,75 @@ private void typeFlushCommand(
long authorization,
PgsqlFlushExFW flushEx)
{
- server.columns.clear();
+ server.columnTypes.clear();
flushEx.type().columns()
.forEach(c ->
{
String name = c.name().asString();
name = name.substring(0, name.length() - 1);
- String type = RisingwavePgsqlTypeMapping.typeName(c.typeOid());
- server.columns.put(name, type);
+ server.columnTypes.add(name);
});
+ }
+
+ private void rowDataCommand(
+ PgsqlServer server,
+ long traceId,
+ long authorization,
+ long routedId,
+ int flags,
+ DirectBuffer buffer,
+ int offset,
+ int limit,
+ OctetsFW extension)
+ {
+        int progress = offset;

-        server.doParseQuery(traceId, authorization);
+        final List<String> columnDescriptions = server.columnDescriptions;
+
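+        // the first fragment carries the int16 column count; skip it before decoding values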
+ if ((flags & FLAGS_INIT) != 0x00)
+ {
+ columnDescriptions.clear();
+ progress += Short.BYTES;
+ }
+
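+        // decode only complete column values; a partial value waits for the next data fragment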
+ column:
+ while (progress < limit)
+ {
+ String32FW column = columnRO.tryWrap(buffer, progress, limit);
+
+ if (column == null)
+ {
+ break column;
+ }
+
+ columnDescriptions.add(column.asString());
+
+ progress = column.limit();
+ }
+
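+        // "Name", "Type" and "Is Hidden" are headers of the DESCRIBE result captured above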
+ int nameIndex = server.columnTypes.indexOf("Name");
+ int typeIndex = server.columnTypes.indexOf("Type");
+ int isHiddenIndex = server.columnTypes.indexOf("Is Hidden");
+
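+        // only visible columns are retained for the generated statements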
+ if ("false".equals(columnDescriptions.get(isHiddenIndex)))
+ {
+ server.columns.put(columnDescriptions.get(nameIndex), columnDescriptions.get(typeIndex));
+ }
+    }
+
+ private void proxyDataCommand(
+ PgsqlServer server,
+ long traceId,
+ long authorization,
+ long routedId,
+ int flags,
+ DirectBuffer buffer,
+ int offset,
+ int limit,
+ OctetsFW extension)
+ {
+ server.doAppData(routedId, traceId, authorization, flags, buffer, offset, limit, extension);
}
private RisingwaveCommandType decodeCommandType(
@@ -1729,4 +1813,19 @@ void handle(
long authorization,
PgsqlFlushExFW flushEx);
}
+
+ @FunctionalInterface
+ private interface PgsqlDataCommand
+ {
+ void handle(
+ PgsqlServer server,
+ long traceId,
+ long authorization,
+ long routedId,
+ int flags,
+ DirectBuffer buffer,
+ int offset,
+ int limit,
+ OctetsFW extension);
+ }
}
diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientDescribeClusterFactory.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientDescribeClusterFactory.java
index 4c1dd223cc..51619eb6b8 100644
--- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientDescribeClusterFactory.java
+++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientDescribeClusterFactory.java
@@ -39,11 +39,11 @@
import io.aklivity.zilla.runtime.binding.kafka.internal.config.KafkaRouteConfig;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.Flyweight;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.OctetsFW;
-import io.aklivity.zilla.runtime.binding.kafka.internal.types.String16FW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.codec.RequestHeaderFW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.codec.ResponseHeaderFW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.codec.describe_cluster.ClusterBrokerFW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.codec.describe_cluster.DescribeClusterRequestFW;
+import io.aklivity.zilla.runtime.binding.kafka.internal.types.codec.describe_cluster.DescribeClusterRequestPart2FW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.codec.describe_cluster.DescribeClusterResponseFW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.codec.describe_cluster.DescribeClusterResponsePart2FW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.AbortFW;
@@ -99,6 +99,8 @@ public final class KafkaClientDescribeClusterFactory extends KafkaClientSaslHand
private final RequestHeaderFW.Builder requestHeaderRW = new RequestHeaderFW.Builder();
private final DescribeClusterRequestFW.Builder describeClusterRequestRW = new DescribeClusterRequestFW.Builder();
+ private final DescribeClusterRequestPart2FW.Builder describeClusterRequestPart2RW =
+ new DescribeClusterRequestPart2FW.Builder();
private final ResponseHeaderFW responseHeaderRO = new ResponseHeaderFW();
private final DescribeClusterResponseFW describeClusterResponseRO = new DescribeClusterResponseFW();
@@ -494,8 +496,8 @@ private int decodeDescribeClusterResponse(
final int throttle = describeClusterResponse.throttle();
final short error = describeClusterResponse.error();
- final String16FW message = describeClusterResponse.message();
- final String16FW clusterId = describeClusterResponse.clusterId();
+ final String message = describeClusterResponse.message().asString();
+ final String clusterId = describeClusterResponse.clusterId().asString();
final int controllerId = describeClusterResponse.controllerId();
final int brokerCount = describeClusterResponse.brokerCount();
@@ -517,7 +519,9 @@ private int decodeDescribeClusterResponse(
final DescribeClusterResponsePart2FW responsePart2 =
describeClusterResponsePart2RO.wrap(buffer, progress, limit);
- final int authorizedOperations = responsePart2.authorizedOperations();
+ final int authorizedOperations = responsePart2.clusterAuthorizedOperations();
+
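+            // advance decode progress past part2 (tagged fields included)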
+ progress = responsePart2.limit();
client.onDecodeDescribeClusterResponse(traceId, authorization, throttle,
error, message, clusterId, controllerId, brokers, authorizedOperations);
@@ -720,8 +724,8 @@ private void doApplicationBegin(
long authorization,
int throttle,
short error,
- String16FW message,
- String16FW clusterId,
+ String message,
+ String clusterId,
int controllerId,
List brokers,
int authorizedOperations)
@@ -1239,11 +1243,19 @@ private void doEncodeDescribeClusterRequest(
final DescribeClusterRequestFW describeClusterRequest =
describeClusterRequestRW.wrap(encodeBuffer, encodeProgress, encodeLimit)
- .includeAuthorizedOperations(authorizedOperations)
+ .taggedFields(0)
.build();
encodeProgress = describeClusterRequest.limit();
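+        // remaining request fields follow in part2 (flexible v0 encoding)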
+ DescribeClusterRequestPart2FW describeClusterRequestPart2 =
+ describeClusterRequestPart2RW.wrap(encodeBuffer, encodeProgress, encodeLimit)
+ .includeAuthorizedOperations(authorizedOperations)
+ .taggedFields(0)
+ .build();
+
+ encodeProgress = describeClusterRequestPart2.limit();
+
final int requestId = nextRequestId++;
final int requestSize = encodeProgress - encodeOffset - RequestHeaderFW.FIELD_OFFSET_API_KEY;
@@ -1470,8 +1482,8 @@ private void onDecodeDescribeClusterResponse(
long authorization,
int throttle,
short error,
- String16FW message,
- String16FW clusterId,
+ String message,
+ String clusterId,
int controllerId,
List brokers,
int authorizedOperations)
diff --git a/runtime/binding-kafka/src/main/zilla/protocol.idl b/runtime/binding-kafka/src/main/zilla/protocol.idl
index ff606c0107..a3a8fce48f 100644
--- a/runtime/binding-kafka/src/main/zilla/protocol.idl
+++ b/runtime/binding-kafka/src/main/zilla/protocol.idl
@@ -584,32 +584,41 @@ scope protocol
scope describe_cluster
{
struct DescribeClusterRequest // v0
+ {
+ varuint32 taggedFields;
+ }
+
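+    // DescribeCluster v0 is a flexible API: compact strings/arrays and tagged-field sections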
+ struct DescribeClusterRequestPart2
{
uint8 includeAuthorizedOperations;
+ varuint32 taggedFields;
}
struct DescribeClusterResponse
{
int32 correlationId;
+ varuint32 taggedFields;
int32 throttle;
int16 error;
- string16 message;
- string16 clusterId = null;
+ varstring message;
+ varstring clusterId;
int32 controllerId;
- int32 brokerCount;
+ varuint32n brokerCount;
}
struct ClusterBroker
{
int32 brokerId;
- string16 host;
+ varstring host;
int32 port;
- string16 rack = null;
+ varstring rack;
+ varuint32 taggedFields;
}
struct DescribeClusterResponsePart2
{
- int32 authorizedOperations;
+ int32 clusterAuthorizedOperations;
+ varuint32 taggedFields;
}
}
diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/describe.cluster.v0/cluster.brokers.info/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/describe.cluster.v0/cluster.brokers.info/client.rpt
index 37fe52f5a9..57bdaff151 100644
--- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/describe.cluster.v0/cluster.brokers.info/client.rpt
+++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/describe.cluster.v0/cluster.brokers.info/client.rpt
@@ -28,27 +28,33 @@ connect "zilla://streams/net0"
connected
-write 16 # size
+write 18 # size
60s # describe cluster
0s # v0
${newRequestId}
5s "zilla" # client id
- [0x00] # resources
+ [0x00] # tagged fields
+ [0x00] # include cluster authorized ops
+ [0x00] # tagged fields
-read 97 # size
+read 92 # size
(int:newRequestId)
+ [0x00] # tagged fields
0 # throttle time ms
0s # error code
- -1s # error message
- 9s "cluster-0" # cluster id
+ [0x00] # error message
+ [0x0a] "cluster-0" # cluster id
0 # controller id
- 2 # brokers
+ [0x03] # brokers
1 # broker id
- 19s "broker1.example.com" # host
+ [0x14] "broker1.example.com" # host
9092 # port
- -1s # rack
+ [0x00] # rack
+ [0x00] # tagged fields
2 # broker id
- 19s "broker2.example.com" # host
+ [0x14] "broker2.example.com" # host
9092 # port
- -1s # rack
+ [0x00] # rack
+ [0x00] # tagged fields
0 # cluster authorized operations
+ [0x00] # tagged fields
diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/describe.cluster.v0/cluster.brokers.info/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/describe.cluster.v0/cluster.brokers.info/server.rpt
index 473762e7dc..f1ce509e6b 100644
--- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/describe.cluster.v0/cluster.brokers.info/server.rpt
+++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/describe.cluster.v0/cluster.brokers.info/server.rpt
@@ -25,28 +25,33 @@ accepted
connected
-read 16 # size
+read 18 # size
60s # describe cluster
0s # v0
(int:newRequestId)
5s "zilla" # client id
- [0x00] # resources
+ [0x00] # tagged fields
+ [0x00] # include cluster authorized ops
+ [0x00] # tagged fields
-
-write 97 # size
+write 92 # size
${newRequestId}
+ [0x00] # tagged fields
0 # throttle time ms
0s # error code
- -1s # error message
- 9s "cluster-0" # cluster id
+ [0x00] # error message
+ [0x0a] "cluster-0" # cluster id
0 # controller id
- 2 # brokers
+ [0x03] # brokers
1 # broker id
- 19s "broker1.example.com" # host
+ [0x14] "broker1.example.com" # host
9092 # port
- -1s # rack
+ [0x00] # rack
+ [0x00] # tagged fields
2 # broker id
- 19s "broker2.example.com" # host
+ [0x14] "broker2.example.com" # host
9092 # port
- -1s # rack
+ [0x00] # rack
+ [0x00] # tagged fields
0 # cluster authorized operations
+ [0x00] # tagged fields