From a40fe35600ec12ccd5cd2b0ff664115645a6aa97 Mon Sep 17 00:00:00 2001 From: Ankit Kumar Date: Tue, 28 Jan 2025 12:44:21 +0530 Subject: [PATCH 1/6] Support gRPC client stream/unary oneway --- .../GrpcKafkaWithProduceConfigAdapter.java | 6 +- .../config/produce.proxy.rpc.oneway.yaml | 43 ++++++++++ .../kafka/schema/grpc.kafka.schema.patch.json | 6 +- .../client.stream.rpc.oneway/client.rpt | 49 +++++++++++ .../client.stream.rpc.oneway/server.rpt | 48 +++++++++++ .../grpc/produce/unary.rpc.oneway/client.rpt | 44 ++++++++++ .../grpc/produce/unary.rpc.oneway/server.rpt | 44 ++++++++++ .../client.stream.rpc.oneway/client.rpt | 82 +++++++++++++++++++ .../client.stream.rpc.oneway/server.rpt | 81 ++++++++++++++++++ .../kafka/produce/unary.rpc.oneway/client.rpt | 65 +++++++++++++++ .../kafka/produce/unary.rpc.oneway/server.rpt | 66 +++++++++++++++ .../grpc/kafka/streams/GrpcProduceIT.java | 18 ++++ .../grpc/kafka/streams/KafkaProduceIT.java | 18 ++++ .../binding/grpc/config/protobuf/oneway.proto | 35 ++++++++ 14 files changed, 599 insertions(+), 6 deletions(-) create mode 100644 specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/config/produce.proxy.rpc.oneway.yaml create mode 100644 specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/streams/grpc/produce/client.stream.rpc.oneway/client.rpt create mode 100644 specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/streams/grpc/produce/client.stream.rpc.oneway/server.rpt create mode 100644 specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/streams/grpc/produce/unary.rpc.oneway/client.rpt create mode 100644 specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/streams/grpc/produce/unary.rpc.oneway/server.rpt create mode 100644 specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/streams/kafka/produce/client.stream.rpc.oneway/client.rpt create mode 100644 specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/streams/kafka/produce/client.stream.rpc.oneway/server.rpt create mode 100644 specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/streams/kafka/produce/unary.rpc.oneway/client.rpt create mode 100644 specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/streams/kafka/produce/unary.rpc.oneway/server.rpt create mode 100644 specs/binding-grpc.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/config/protobuf/oneway.proto diff --git a/runtime/binding-grpc-kafka/src/main/java/io/aklivity/zilla/runtime/binding/grpc/kafka/internal/config/GrpcKafkaWithProduceConfigAdapter.java b/runtime/binding-grpc-kafka/src/main/java/io/aklivity/zilla/runtime/binding/grpc/kafka/internal/config/GrpcKafkaWithProduceConfigAdapter.java index 299bbefde0..8ff40865d3 100644 --- a/runtime/binding-grpc-kafka/src/main/java/io/aklivity/zilla/runtime/binding/grpc/kafka/internal/config/GrpcKafkaWithProduceConfigAdapter.java +++ b/runtime/binding-grpc-kafka/src/main/java/io/aklivity/zilla/runtime/binding/grpc/kafka/internal/config/GrpcKafkaWithProduceConfigAdapter.java @@ -101,7 +101,11 @@ public GrpcKafkaWithConfig adaptFromJson( } } - String newReplyTo = object.getString(REPLY_TO_NAME); + String newReplyTo = null; + if (object.containsKey(REPLY_TO_NAME)) + { + newReplyTo = object.getString(REPLY_TO_NAME); + } return new GrpcKafkaWithConfig( new GrpcKafkaWithProduceConfig(newTopic, newProduceAcks, newProduceKey, newOverrides, newReplyTo)); diff --git a/specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/config/produce.proxy.rpc.oneway.yaml b/specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/config/produce.proxy.rpc.oneway.yaml new file mode 100644 index 0000000000..70313cccf5 --- /dev/null +++ b/specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/config/produce.proxy.rpc.oneway.yaml @@ -0,0 +1,43 @@ +# +# Copyright 2021-2024 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +--- +name: test +guards: + test0: + type: test + options: + roles: + - service:oneway +bindings: + grpc0: + type: grpc-kafka + kind: proxy + routes: + - guarded: + test0: + - service:oneway + exit: kafka0 + when: + - method: example.OnewayService/* + metadata: + custom: test + with: + capability: produce + topic: requests + acks: leader_only + key: test + overrides: + zilla:identity: "${guarded['test0'].identity}" diff --git a/specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/schema/grpc.kafka.schema.patch.json b/specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/schema/grpc.kafka.schema.patch.json index e0b38727a8..4a9a249cbb 100644 --- a/specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/schema/grpc.kafka.schema.patch.json +++ b/specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/schema/grpc.kafka.schema.patch.json @@ -262,11 +262,7 @@ "type": "string" } }, - "additionalProperties": false, - "required": - [ - "reply-to" - ] + "additionalProperties": false } ], "required": diff --git a/specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/streams/grpc/produce/client.stream.rpc.oneway/client.rpt b/specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/streams/grpc/produce/client.stream.rpc.oneway/client.rpt new file mode 100644 index 0000000000..6868dd6436 --- /dev/null +++ b/specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/streams/grpc/produce/client.stream.rpc.oneway/client.rpt @@ -0,0 +1,49 @@ +# +# Copyright 2021-2024 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +connect "zilla://streams/grpc0" + option zilla:window 8192 + option zilla:transmission "half-duplex" + option zilla:update "proactive" + +write zilla:begin.ext ${grpc:beginEx() + .typeId(zilla:id("grpc")) + .scheme("http") + .authority("localhost:8080") + .service("example.OnewayService") + .method("OnewayClientStream") + .build()} +connected + +write ${grpc:protobuf() + .string(1, "Hello World1") + .build()} +write flush + +write ${grpc:protobuf() + .string(1, "Hello World2") + .build()} +write flush + +write close + +read zilla:begin.ext ${grpc:matchBeginEx() + .typeId(zilla:id("grpc")) + .build()} + +read ${grpc:protobuf() + .build()} + +read closed diff --git a/specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/streams/grpc/produce/client.stream.rpc.oneway/server.rpt b/specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/streams/grpc/produce/client.stream.rpc.oneway/server.rpt new file mode 100644 index 0000000000..2b5198c834 --- /dev/null +++ b/specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/streams/grpc/produce/client.stream.rpc.oneway/server.rpt @@ -0,0 +1,48 @@ +# +# Copyright 2021-2024 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +accept "zilla://streams/grpc0" + option zilla:window 8192 + option zilla:transmission "half-duplex" +accepted + +read zilla:begin.ext ${grpc:matchBeginEx() + .typeId(zilla:id("grpc")) + .scheme("http") + .authority("localhost:8080") + .service("example.OnewayService") + .method("OnewayClientStream") + .build()} +connected + +read ${grpc:protobuf() + .string(1, "Hello World1") + .build()} + +read ${grpc:protobuf() + .string(1, "Hello World2") + .build()} + +read closed + +write zilla:begin.ext ${grpc:beginEx() + .typeId(zilla:id("grpc")) + .build()} + +write ${grpc:protobuf() + .build()} +write flush + +write close diff --git a/specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/streams/grpc/produce/unary.rpc.oneway/client.rpt b/specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/streams/grpc/produce/unary.rpc.oneway/client.rpt new file mode 100644 index 0000000000..aa8dfc599a --- /dev/null +++ b/specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/streams/grpc/produce/unary.rpc.oneway/client.rpt @@ -0,0 +1,44 @@ +# +# Copyright 2021-2024 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +connect "zilla://streams/grpc0" + option zilla:window 8192 + option zilla:transmission "half-duplex" + option zilla:update "proactive" + +write zilla:begin.ext ${grpc:beginEx() + .typeId(zilla:id("grpc")) + .scheme("http") + .authority("localhost:8080") + .service("example.OnewayService") + .method("OnewayUnary") + .build()} +connected + +write ${grpc:protobuf() + .string(1, "Hello World") + .build()} +write flush + +write close + +read zilla:begin.ext ${grpc:matchBeginEx() + .typeId(zilla:id("grpc")) + .build()} + +read ${grpc:protobuf() + .build()} + +read closed diff --git a/specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/streams/grpc/produce/unary.rpc.oneway/server.rpt b/specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/streams/grpc/produce/unary.rpc.oneway/server.rpt new file mode 100644 index 0000000000..fc016b9882 --- /dev/null +++ b/specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/streams/grpc/produce/unary.rpc.oneway/server.rpt @@ -0,0 +1,44 @@ +# +# Copyright 2021-2024 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +accept "zilla://streams/grpc0" + option zilla:window 8192 + option zilla:transmission "half-duplex" +accepted + +read zilla:begin.ext ${grpc:matchBeginEx() + .typeId(zilla:id("grpc")) + .scheme("http") + .authority("localhost:8080") + .service("example.OnewayService") + .method("OnewayUnary") + .build()} +connected + +read ${grpc:protobuf() + .string(1, "Hello World") + .build()} + +read closed + +write zilla:begin.ext ${grpc:beginEx() + .typeId(zilla:id("grpc")) + .build()} + +write ${grpc:protobuf() + .build()} +write flush + +write close diff --git a/specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/streams/kafka/produce/client.stream.rpc.oneway/client.rpt b/specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/streams/kafka/produce/client.stream.rpc.oneway/client.rpt new file mode 100644 index 0000000000..6630580ef6 --- /dev/null +++ b/specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/streams/kafka/produce/client.stream.rpc.oneway/client.rpt @@ -0,0 +1,82 @@ +# +# Copyright 2021-2024 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +connect "zilla://streams/kafka0" + option zilla:window 8192 + option zilla:transmission "duplex" + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .merged() + .capabilities("PRODUCE_ONLY") + .topic("requests") + .partition(-1, -2) + .ackMode("LEADER_ONLY") + .build() + .build()} + +connected + +write zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .merged() + .produce() + .deferred(0) + .partition(-1, -1) + .key("test") + .header("zilla:identity", "test") + .header("zilla:service", "example.OnewayService") + .header("zilla:method", "OnewayClientStream") + .build() + .build()} +write ${grpc:protobuf() + .string(1, "Hello World1") + .build()} +write flush + +write zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .merged() + .produce() + .deferred(0) + .partition(-1, -1) + .key("test") + .header("zilla:identity", "test") + .header("zilla:service", "example.OnewayService") + .header("zilla:method", "OnewayClientStream") + .build() + .build()} +write ${grpc:protobuf() + .string(1, "Hello World2") + .build()} +write flush + +write zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .merged() + .produce() + .deferred(0) + .partition(-1, -1) + .key("test") + .header("zilla:identity", "test") + .header("zilla:service", "example.OnewayService") + .header("zilla:method", "OnewayClientStream") + .build() + .build()} + +write flush + +write close +read closed diff --git a/specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/streams/kafka/produce/client.stream.rpc.oneway/server.rpt b/specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/streams/kafka/produce/client.stream.rpc.oneway/server.rpt new file mode 100644 index 0000000000..a81371e8ca --- /dev/null +++ b/specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/streams/kafka/produce/client.stream.rpc.oneway/server.rpt @@ -0,0 +1,81 @@ +# +# Copyright 2021-2024 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +accept "zilla://streams/kafka0" + option zilla:window 8192 + option zilla:transmission "duplex" + +accepted + +read zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .merged() + .capabilities("PRODUCE_ONLY") + .topic("requests") + .partition(-1, -2) + .ackMode("LEADER_ONLY") + .build() + .build()} + +connected + +read zilla:data.ext ${kafka:matchDataEx() + .typeId(zilla:id("kafka")) + .merged() + .produce() + .deferred(0) + .partition(-1, -1) + .key("test") + .header("zilla:identity", "test") + .header("zilla:service", "example.OnewayService") + .header("zilla:method", "OnewayClientStream") + .build() + .build()} +read ${grpc:protobuf() + .string(1, "Hello World1") + .build()} + +read zilla:data.ext ${kafka:matchDataEx() + .typeId(zilla:id("kafka")) + .merged() + .produce() + .deferred(0) + .partition(-1, -1) + .key("test") + .header("zilla:identity", "test") + .header("zilla:service", "example.OnewayService") + .header("zilla:method", "OnewayClientStream") + .build() + .build()} +read ${grpc:protobuf() + .string(1, "Hello World2") + .build()} + +read zilla:data.ext ${kafka:matchDataEx() + .typeId(zilla:id("kafka")) + .merged() + .produce() + .deferred(0) + .partition(-1, -1) + .key("test") + .header("zilla:identity", "test") + .header("zilla:service", "example.OnewayService") + .header("zilla:method", "OnewayClientStream") + .build() + .build()} +read zilla:data.null + +read closed +write close diff --git a/specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/streams/kafka/produce/unary.rpc.oneway/client.rpt b/specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/streams/kafka/produce/unary.rpc.oneway/client.rpt new file mode 100644 index 0000000000..33c58554c5 --- /dev/null +++ b/specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/streams/kafka/produce/unary.rpc.oneway/client.rpt @@ -0,0 +1,65 @@ +# +# Copyright 2021-2024 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +connect "zilla://streams/kafka0" + option zilla:window 8192 + option zilla:transmission "duplex" + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .merged() + .capabilities("PRODUCE_ONLY") + .topic("requests") + .partition(-1, -2) + .ackMode("LEADER_ONLY") + .build() + .build()} + +connected + +write zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .merged() + .produce() + .deferred(0) + .partition(-1, -1) + .key("test") + .header("zilla:identity", "test") + .header("zilla:service", "example.OnewayService") + .header("zilla:method", "OnewayUnary") + .build() + .build()} +write ${grpc:protobuf() + .string(1, "Hello World") + .build()} +write flush + +write zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .merged() + .produce() + .deferred(0) + .partition(-1, -1) + .key("test") + .header("zilla:identity", "test") + .header("zilla:service", "example.OnewayService") + .header("zilla:method", "OnewayUnary") + .build() + .build()} + +write flush + +write close +read closed diff --git a/specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/streams/kafka/produce/unary.rpc.oneway/server.rpt b/specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/streams/kafka/produce/unary.rpc.oneway/server.rpt new file mode 100644 index 0000000000..31113a2df5 --- /dev/null +++ b/specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/streams/kafka/produce/unary.rpc.oneway/server.rpt @@ -0,0 +1,66 @@ +# +# Copyright 2021-2024 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +accept "zilla://streams/kafka0" + option zilla:window 8192 + option zilla:transmission "duplex" + +accepted + +read zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .merged() + .capabilities("PRODUCE_ONLY") + .topic("requests") + .partition(-1, -2) + .ackMode("LEADER_ONLY") + .build() + .build()} + +connected + +read zilla:data.ext ${kafka:matchDataEx() + .typeId(zilla:id("kafka")) + .merged() + .produce() + .deferred(0) + .partition(-1, -1) + .key("test") + .header("zilla:identity", "test") + .header("zilla:service", "example.OnewayService") + .header("zilla:method", "OnewayUnary") + .build() + .build()} + +read ${grpc:protobuf() + .string(1, "Hello World") + .build()} + +read zilla:data.ext ${kafka:matchDataEx() + .typeId(zilla:id("kafka")) + .merged() + .produce() + .deferred(0) + .partition(-1, -1) + .key("test") + .header("zilla:identity", "test") + .header("zilla:service", "example.OnewayService") + .header("zilla:method", "OnewayUnary") + .build() + .build()} +read zilla:data.null + +read closed +write close diff --git a/specs/binding-grpc-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/grpc/kafka/streams/GrpcProduceIT.java b/specs/binding-grpc-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/grpc/kafka/streams/GrpcProduceIT.java index d203be7a29..0fbd88ca0a 100644 --- a/specs/binding-grpc-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/grpc/kafka/streams/GrpcProduceIT.java +++ b/specs/binding-grpc-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/grpc/kafka/streams/GrpcProduceIT.java @@ -63,6 +63,15 @@ public void shouldExchangeMessageInClientStream() throws Exception k3po.finish(); } + @Test + @Specification({ + "${grpc}/client.stream.rpc.oneway/client", + "${grpc}/client.stream.rpc.oneway/server"}) + public void shouldExchangeMessageInClientStreamOneway() throws Exception + { + k3po.finish(); + } + @Test @Specification({ "${grpc}/client.stream.rpc.write.abort/client", @@ -99,6 +108,15 @@ public void shouldExchangeMessageInUnary() throws Exception k3po.finish(); } + @Test + @Specification({ + "${grpc}/unary.rpc.oneway/client", + "${grpc}/unary.rpc.oneway/server"}) + public void shouldExchangeMessageInUnaryOneway() throws Exception + { + k3po.finish(); + } + @Test @Specification({ "${grpc}/unary.rpc.error/client", diff --git a/specs/binding-grpc-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/grpc/kafka/streams/KafkaProduceIT.java b/specs/binding-grpc-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/grpc/kafka/streams/KafkaProduceIT.java index c75f197fb9..27df230df5 100644 --- a/specs/binding-grpc-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/grpc/kafka/streams/KafkaProduceIT.java +++ b/specs/binding-grpc-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/grpc/kafka/streams/KafkaProduceIT.java @@ -63,6 +63,15 @@ public void shouldExchangeMessageInClientStream() throws Exception k3po.finish(); } + @Test + @Specification({ + "${kafka}/client.stream.rpc.oneway/client", + "${kafka}/client.stream.rpc.oneway/server"}) + public void shouldExchangeMessageInClientStreamOneway() throws Exception + { + k3po.finish(); + } + @Test @Specification({ "${kafka}/client.stream.rpc.write.abort/client", @@ -99,6 +108,15 @@ public void shouldExchangeMessageInUnary() throws Exception k3po.finish(); } + @Test + @Specification({ + "${kafka}/unary.rpc.oneway/client", + "${kafka}/unary.rpc.oneway/server"}) + public void shouldExchangeMessageInUnaryOneway() throws Exception + { + k3po.finish(); + } + @Test @Specification({ "${kafka}/unary.rpc.rejected/client", diff --git a/specs/binding-grpc.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/config/protobuf/oneway.proto b/specs/binding-grpc.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/config/protobuf/oneway.proto new file mode 100644 index 0000000000..a6b706f035 --- /dev/null +++ b/specs/binding-grpc.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/config/protobuf/oneway.proto @@ -0,0 +1,35 @@ +/* + * Copyright 2021-2024 Aklivity Inc + * + * Licensed under the Aklivity Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * https://www.aklivity.io/aklivity-community-license/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +syntax = "proto3"; + +package example; + +import "google/protobuf/empty.proto"; + +option java_multiple_files = true; +option java_outer_classname = "OnewayProto"; + +service OnewayService +{ + rpc OnewayUnary(OnewayMessage) returns (google.protobuf.Empty); + + rpc OnewayClientStream(stream OnewayMessage) returns (google.protobuf.Empty); +} + +message OnewayMessage +{ + string message = 1; +} + From 2582f6ec218b1c7507ff56aa918aede70264286b Mon Sep 17 00:00:00 2001 From: Ankit Kumar Date: Wed, 29 Jan 2025 16:57:43 +0530 Subject: [PATCH 2/6] impl update --- runtime/binding-grpc-kafka/pom.xml | 2 +- .../config/GrpcKafkaWithProduceResult.java | 15 +- .../stream/GrpcKafkaProxyFactory.java | 462 +++++++++++++++++- .../stream/GrpcKafkaProduceProxyIT.java | 20 + .../grpc/kafka/streams/GrpcProduceIT.java | 4 +- .../grpc/kafka/streams/KafkaProduceIT.java | 4 +- 6 files changed, 490 insertions(+), 17 deletions(-) diff --git a/runtime/binding-grpc-kafka/pom.xml b/runtime/binding-grpc-kafka/pom.xml index cebcf678e4..8c7a204d2a 100644 --- a/runtime/binding-grpc-kafka/pom.xml +++ b/runtime/binding-grpc-kafka/pom.xml @@ -24,7 +24,7 @@ - 0.88 + 0.87 0 diff --git a/runtime/binding-grpc-kafka/src/main/java/io/aklivity/zilla/runtime/binding/grpc/kafka/internal/config/GrpcKafkaWithProduceResult.java b/runtime/binding-grpc-kafka/src/main/java/io/aklivity/zilla/runtime/binding/grpc/kafka/internal/config/GrpcKafkaWithProduceResult.java index d66e764ffb..f19459e78e 100644 --- a/runtime/binding-grpc-kafka/src/main/java/io/aklivity/zilla/runtime/binding/grpc/kafka/internal/config/GrpcKafkaWithProduceResult.java +++ b/runtime/binding-grpc-kafka/src/main/java/io/aklivity/zilla/runtime/binding/grpc/kafka/internal/config/GrpcKafkaWithProduceResult.java @@ -94,7 +94,11 @@ public class GrpcKafkaWithProduceResult hash.updateHash(correlation.method.value()); hash.updateHash(method.value()); hash.updateHash(correlation.replyTo.value()); - hash.updateHash(replyTo.value()); + + if (replyTo != null && replyTo.value() != null) + { + hash.updateHash(replyTo.value()); + } if (overrides != null) { @@ -155,8 +159,13 @@ public void headers( builder.item(this::service); builder.item(this::method); - builder.item(this::replyTo); - builder.item(this::correlationId); + + if (replyTo != null && replyTo.value() != null) + { + builder.item(this::replyTo); + builder.item(this::correlationId); + } + metadata.forEach(m -> builder.item(i -> metadata(i, m))); } diff --git a/runtime/binding-grpc-kafka/src/main/java/io/aklivity/zilla/runtime/binding/grpc/kafka/internal/stream/GrpcKafkaProxyFactory.java b/runtime/binding-grpc-kafka/src/main/java/io/aklivity/zilla/runtime/binding/grpc/kafka/internal/stream/GrpcKafkaProxyFactory.java index 4b26a8642c..e6d3e90b3e 100644 --- a/runtime/binding-grpc-kafka/src/main/java/io/aklivity/zilla/runtime/binding/grpc/kafka/internal/stream/GrpcKafkaProxyFactory.java +++ b/runtime/binding-grpc-kafka/src/main/java/io/aklivity/zilla/runtime/binding/grpc/kafka/internal/stream/GrpcKafkaProxyFactory.java @@ -242,13 +242,26 @@ public MessageConsumer newStream( final GrpcKafkaWithProduceResult result = route.with.resolveProduce(authorization, grpcBeginEx); - newStream = new GrpcProduceProxy( - grpc, - originId, - routedId, - initialId, - resolvedId, - result)::onGrpcMessage; + if (result.replyTo() != null && result.replyTo().value() != null) + { + newStream = new GrpcProduceProxy( + grpc, + originId, + routedId, + initialId, + resolvedId, + result)::onGrpcMessage; + } + else + { + newStream = new GrpcProduceNoReplyProxy( + grpc, + originId, + routedId, + initialId, + resolvedId, + result)::onGrpcMessage; + } break; } @@ -1636,6 +1649,437 @@ private void doGrpcReset( } } + private final class GrpcProduceNoReplyProxy extends GrpcProxy + { + private final KafkaProduceProxy delegate; + private final long resolvedId; + + private GrpcProduceNoReplyProxy( + MessageConsumer grpc, + long originId, + long routedId, + long initialId, + long resolvedId, + GrpcKafkaWithProduceResult result) + { + super(grpc, originId, routedId, initialId); + this.resolvedId = resolvedId; + this.delegate = new KafkaProduceProxy(routedId, resolvedId, this, result); + } + + private void onGrpcMessage( + int msgTypeId, + DirectBuffer buffer, + int index, + int length) + { + switch (msgTypeId) + { + case BeginFW.TYPE_ID: + final BeginFW begin = beginRO.wrap(buffer, index, index + length); + onGrpcBegin(begin); + break; + case DataFW.TYPE_ID: + final DataFW data = dataRO.wrap(buffer, index, index + length); + onGrpcData(data); + break; + case EndFW.TYPE_ID: + final EndFW end = endRO.wrap(buffer, index, index + length); + onGrpcEnd(end); + break; + case AbortFW.TYPE_ID: + final AbortFW abort = abortRO.wrap(buffer, index, index + length); + onGrpcAbort(abort); + break; + case ResetFW.TYPE_ID: + final ResetFW reset = resetRO.wrap(buffer, index, index + length); + onGrpcReset(reset); + break; + case WindowFW.TYPE_ID: + final WindowFW window = windowRO.wrap(buffer, index, index + length); + onGrpcWindow(window); + break; + } + } + + private void onGrpcBegin( + BeginFW begin) + { + final long sequence = begin.sequence(); + final long acknowledge = begin.acknowledge(); + final long traceId = begin.traceId(); + final long authorization = begin.authorization(); + final long affinity = begin.affinity(); + + assert acknowledge <= sequence; + assert sequence >= initialSeq; + assert acknowledge >= initialAck; + + initialSeq = sequence; + initialAck = acknowledge; + state = GrpcKafkaState.openingInitial(state); + + assert initialAck <= initialSeq; + + delegate.doKafkaBegin(traceId, authorization, affinity); + } + + private void onGrpcData( + DataFW data) + { + final long sequence = data.sequence(); + final long acknowledge = data.acknowledge(); + final long traceId = data.traceId(); + final long authorization = data.authorization(); + final long budgetId = data.budgetId(); + final int reserved = data.reserved(); + final int flags = data.flags(); + final OctetsFW payload = data.payload(); + final OctetsFW extension = data.extension(); + + assert acknowledge <= sequence; + assert sequence >= initialSeq; + + initialSeq = sequence + reserved; + + assert initialAck <= initialSeq; + + Flyweight kafkaDataEx = emptyRO; + if ((flags & DATA_FLAG_INIT) != 0x00) + { + GrpcDataExFW dataEx = null; + if (extension.sizeof() > 0) + { + dataEx = extension.get(grpcDataExRO::tryWrap); + } + + final int deferred = dataEx != null ? dataEx.deferred() : 0; + kafkaDataEx = kafkaDataExRW + .wrap(extBuffer, 0, extBuffer.capacity()) + .typeId(kafkaTypeId) + .merged(m -> m.produce(mp -> mp + .deferred(deferred) + .timestamp(now().toEpochMilli()) + .partition(p -> p.partitionId(-1).partitionOffset(-1)) + .key(delegate.result::key) + .headers(delegate.result::headers))) + .build(); + } + + delegate.doKafkaData(traceId, authorization, budgetId, reserved, flags, payload, kafkaDataEx); + } + + private void onGrpcEnd( + EndFW end) + { + final long sequence = end.sequence(); + final long acknowledge = end.acknowledge(); + final long traceId = end.traceId(); + final long authorization = end.authorization(); + + assert acknowledge <= sequence; + assert sequence >= initialSeq; + + initialSeq = sequence; + state = GrpcKafkaState.closeInitial(state); + + assert initialAck <= initialSeq; + + delegate.doKafkaEnd(traceId, authorization); + + if (!GrpcKafkaState.replyOpening(state)) + { + doGrpcBegin(traceId, authorization); + } + + if (GrpcKafkaState.replyOpening(state)) + { + doGrpcData(traceId, authorization); + } + + doGrpcEnd(traceId, authorization); + } + + private void onGrpcAbort( + AbortFW abort) + { + final long sequence = abort.sequence(); + final long acknowledge = abort.acknowledge(); + final long traceId = abort.traceId(); + final long authorization = abort.authorization(); + + assert acknowledge <= sequence; + assert sequence >= initialSeq; + + initialSeq = sequence; + state = GrpcKafkaState.closeInitial(state); + + assert initialAck <= initialSeq; + + delegate.doKafkaAbort(traceId, authorization); + } + + private void onGrpcReset( + ResetFW reset) + { + final long sequence = reset.sequence(); + final long acknowledge = reset.acknowledge(); + final int maximum = reset.maximum(); + final long traceId = reset.traceId(); + final long authorization = reset.authorization(); + + assert acknowledge <= sequence; + assert sequence <= replySeq; + assert acknowledge >= replyAck; + assert maximum >= replyMax; + + replyAck = acknowledge; + replyMax = maximum; + state = GrpcKafkaState.closeReply(state); + + assert replyAck <= replySeq; + + delegate.doKafkaReset(traceId, authorization); + } + + private void onGrpcWindow( + WindowFW window) + { + final long sequence = window.sequence(); + final long acknowledge = window.acknowledge(); + final int maximum = window.maximum(); + final long traceId = window.traceId(); + final long budgetId = window.budgetId(); + final int padding = window.padding(); + final int capabilities = window.capabilities(); + + assert acknowledge <= sequence; + assert sequence <= replySeq; + assert acknowledge >= replyAck; + assert maximum >= replyMax; + + replyAck = acknowledge; + replyMax = maximum; + replyBud = budgetId; + replyPad = padding; + replyCap = capabilities; + state = GrpcKafkaState.openReply(state); + + assert replyAck <= replySeq; + + delegate.doKafkaWindow(traceId); + } + + @Override + protected void onKafkaReset( + long traceId, + long authorization) + { + cleanup(traceId, authorization); + } + + @Override + protected void onKafkaAbort( + long traceId, + long authorization) + { + if (!GrpcKafkaState.replyOpening(state)) + { + doGrpcBegin(traceId, authorization); + } + + cleanup(traceId, authorization); + } + + @Override + protected void onKafkaBegin( + long traceId, + long authorization, + OctetsFW extension) + { + if (!GrpcKafkaState.replyOpening(state)) + { + doGrpcBegin(traceId, authorization); + } + } + + @Override + protected void onKafkaData( + long traceId, + long authorization, + long budgetId, + int reserved, + int flags, + OctetsFW payload, + KafkaDataExFW kafkaDataEx) + { + if (!GrpcKafkaState.replyOpening(state)) + { + doGrpcBegin(traceId, authorization); + } + + if (GrpcKafkaState.replyClosing(state)) + { + replySeq += reserved; + } + else + { + if (payload == null) + { + KafkaHeaderFW grpcStatus = kafkaDataEx.merged().fetch().headers() + .matchFirst(h -> HEADER_NAME_ZILLA_GRPC_STATUS.value().equals(h.name().value())); + + if (grpcStatus != null && + !HEADER_VALUE_GRPC_OK.value().equals(grpcStatus.value().value())) + { + OctetsFW value = grpcStatus.value(); + String16FW status = statusRW + .set(value.buffer(), value.offset(), value.sizeof()) + .build(); + doGrpcAbort(traceId, authorization, status); + } + else + { + doGrpcEnd(traceId, traceId); + } + } + else if (GrpcKafkaState.replyOpening(state)) + { + doGrpcData(traceId, authorization); + } + } + } + + @Override + protected void onKafkaEnd( + long traceId, + long authorization) + { + if (GrpcKafkaState.initialClosed(state)) + { + delegate.doKafkaEnd(traceId, authorization); + } + } + + @Override + protected void onKafkaWindow( + long authorization, + long traceId, + long budgetId, + int padding, + int capabilities) + { + doGrpcWindow(authorization, traceId, budgetId, padding, capabilities); + } + + private void cleanup( + long traceId, + long authorization) + { + doGrpcReset(traceId, authorization); + doGrpcAbort(traceId, authorization, HEADER_VALUE_GRPC_INTERNAL_ERROR); + + delegate.doKafkaAbort(traceId, authorization); + delegate.doKafkaReset(traceId, authorization); + } + + private void doGrpcBegin( + long traceId, + long authorization) + { + state = GrpcKafkaState.openingReply(state); + + GrpcBeginExFW beginEx = grpcBeginExRW + .wrap(extBuffer, 0, extBuffer.capacity()) + .typeId(grpcTypeId) + .build(); + + doBegin(grpc, originId, routedId, replyId, replySeq, replyAck, replyMax, + traceId, authorization, 0, beginEx); + } + + private void doGrpcData( + long traceId, + long authorization) + { + GrpcDataExFW dataEx = grpcDataExRW + .wrap(extBuffer, 0, extBuffer.capacity()) + .typeId(grpcTypeId) + .build(); + + doData(grpc, originId, routedId, replyId, replySeq, replyAck, replyMax, + traceId, authorization, 0L, DATA_FLAG_COMPLETE, 0, null, dataEx); + + assert replySeq <= replyAck + replyMax; + } + + private void doGrpcAbort( + long traceId, + long authorization, + String16FW status) + { + if (GrpcKafkaState.replyOpening(state) && + !GrpcKafkaState.replyClosed(state)) + { + final GrpcAbortExFW grpcAbortEx = + grpcAbortExRW.wrap(extBuffer, 0, extBuffer.capacity()) + .typeId(grpcTypeId) + .status(status) + .build(); + + doAbort(grpc, originId, routedId, replyId, replySeq, replyAck, replyMax, + traceId, authorization, grpcAbortEx); + } + state = GrpcKafkaState.closeReply(state); + } + + private void doGrpcEnd( + long traceId, + long authorization) + { + if (!GrpcKafkaState.replyClosed(state)) + { + state = GrpcKafkaState.closeReply(state); + + doEnd(grpc, originId, routedId, replyId, replySeq, replyAck, replyMax, + traceId, authorization); + } + } + + private void doGrpcWindow( + long authorization, + long traceId, + long budgetId, + int padding, + int capabilities) + { + initialAck = delegate.initialAck; + initialMax = delegate.initialMax; + + doWindow(grpc, originId, routedId, initialId, initialSeq, initialAck, initialMax, + traceId, authorization, budgetId, padding, capabilities); + } + + private void doGrpcReset( + long traceId, + long authorization) + { + if (!GrpcKafkaState.initialClosed(state)) + { + state = GrpcKafkaState.closeInitial(state); + + final GrpcResetExFW grpcResetEx = + grpcResetExRW.wrap(extBuffer, 0, extBuffer.capacity()) + .typeId(grpcTypeId) + .status(HEADER_VALUE_GRPC_INTERNAL_ERROR) + .build(); + + doReset(grpc, originId, routedId, initialId, initialSeq, initialAck, initialMax, + traceId, authorization, grpcResetEx); + } + } + } + private final class KafkaProduceProxy { private MessageConsumer kafka; @@ -1644,7 +2088,7 @@ private final class KafkaProduceProxy private final long initialId; private final long replyId; private final GrpcKafkaWithProduceResult result; - private final GrpcProduceProxy delegate; + private final GrpcProxy delegate; private int state; @@ -1663,7 +2107,7 @@ private final class KafkaProduceProxy private KafkaProduceProxy( long originId, long routedId, - GrpcProduceProxy delegate, + GrpcProxy delegate, GrpcKafkaWithProduceResult result) { this.originId = originId; diff --git a/runtime/binding-grpc-kafka/src/test/java/io/aklivity/zilla/runtime/blinding/grpc/kafka/internal/stream/GrpcKafkaProduceProxyIT.java b/runtime/binding-grpc-kafka/src/test/java/io/aklivity/zilla/runtime/blinding/grpc/kafka/internal/stream/GrpcKafkaProduceProxyIT.java index 5a0913164f..283dfc4dfa 100644 --- a/runtime/binding-grpc-kafka/src/test/java/io/aklivity/zilla/runtime/blinding/grpc/kafka/internal/stream/GrpcKafkaProduceProxyIT.java +++ b/runtime/binding-grpc-kafka/src/test/java/io/aklivity/zilla/runtime/blinding/grpc/kafka/internal/stream/GrpcKafkaProduceProxyIT.java @@ -58,6 +58,16 @@ public void shouldExchangeMessageWithUnaryRpc() throws Exception k3po.finish(); } + @Test + @Configuration("produce.proxy.rpc.oneway.yaml") + @Specification({ + "${grpc}/unary.rpc.oneway/client", + "${kafka}/unary.rpc.oneway/server"}) + public void shouldSendMessageWithUnaryRpcOneway() throws Exception + { + k3po.finish(); + } + @Test @Configuration("produce.proxy.rpc.yaml") @Specification({ @@ -108,6 +118,16 @@ public void shouldExchangeMessageWithClientStreamRpc() throws Exception k3po.finish(); } + @Test + @Configuration("produce.proxy.rpc.oneway.yaml") + @Specification({ + "${grpc}/client.stream.rpc.oneway/client", + "${kafka}/client.stream.rpc.oneway/server"}) + public void shouldSendMessageWithClientStreamRpcOneway() throws Exception + { + k3po.finish(); + } + @Test @Configuration("produce.proxy.rpc.yaml") @Specification({ diff --git a/specs/binding-grpc-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/grpc/kafka/streams/GrpcProduceIT.java b/specs/binding-grpc-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/grpc/kafka/streams/GrpcProduceIT.java index 0fbd88ca0a..1133e33728 100644 --- a/specs/binding-grpc-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/grpc/kafka/streams/GrpcProduceIT.java +++ b/specs/binding-grpc-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/grpc/kafka/streams/GrpcProduceIT.java @@ -67,7 +67,7 @@ public void shouldExchangeMessageInClientStream() throws Exception @Specification({ "${grpc}/client.stream.rpc.oneway/client", "${grpc}/client.stream.rpc.oneway/server"}) - public void shouldExchangeMessageInClientStreamOneway() throws Exception + public void shouldSendMessageInClientStreamOneway() throws Exception { k3po.finish(); } @@ -112,7 +112,7 @@ public void shouldExchangeMessageInUnary() throws Exception @Specification({ "${grpc}/unary.rpc.oneway/client", "${grpc}/unary.rpc.oneway/server"}) - public void shouldExchangeMessageInUnaryOneway() throws Exception + public void shouldSendMessageInUnaryOneway() throws Exception { k3po.finish(); } diff --git a/specs/binding-grpc-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/grpc/kafka/streams/KafkaProduceIT.java b/specs/binding-grpc-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/grpc/kafka/streams/KafkaProduceIT.java index 27df230df5..5481a5200c 100644 --- a/specs/binding-grpc-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/grpc/kafka/streams/KafkaProduceIT.java +++ b/specs/binding-grpc-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/grpc/kafka/streams/KafkaProduceIT.java @@ -67,7 +67,7 @@ public void shouldExchangeMessageInClientStream() throws Exception @Specification({ "${kafka}/client.stream.rpc.oneway/client", "${kafka}/client.stream.rpc.oneway/server"}) - public void shouldExchangeMessageInClientStreamOneway() throws Exception + public void shouldSendMessageInClientStreamOneway() throws Exception { k3po.finish(); } @@ -112,7 +112,7 @@ public void shouldExchangeMessageInUnary() throws Exception @Specification({ "${kafka}/unary.rpc.oneway/client", "${kafka}/unary.rpc.oneway/server"}) - public void shouldExchangeMessageInUnaryOneway() throws Exception + public void shouldSendMessageInUnaryOneway() throws Exception { k3po.finish(); } From 5c22eb95c38d7d9e82192d5da6181a3ecc8091ed Mon Sep 17 00:00:00 2001 From: Ankit Kumar Date: Fri, 31 Jan 2025 09:46:51 +0530 Subject: [PATCH 3/6] tombstone event update --- .../stream/GrpcKafkaProxyFactory.java | 57 +++++-------------- 1 file changed, 13 insertions(+), 44 deletions(-) diff --git a/runtime/binding-grpc-kafka/src/main/java/io/aklivity/zilla/runtime/binding/grpc/kafka/internal/stream/GrpcKafkaProxyFactory.java b/runtime/binding-grpc-kafka/src/main/java/io/aklivity/zilla/runtime/binding/grpc/kafka/internal/stream/GrpcKafkaProxyFactory.java index e6d3e90b3e..524c08b748 100644 --- a/runtime/binding-grpc-kafka/src/main/java/io/aklivity/zilla/runtime/binding/grpc/kafka/internal/stream/GrpcKafkaProxyFactory.java +++ b/runtime/binding-grpc-kafka/src/main/java/io/aklivity/zilla/runtime/binding/grpc/kafka/internal/stream/GrpcKafkaProxyFactory.java @@ -91,7 +91,7 @@ public final class GrpcKafkaProxyFactory implements GrpcKafkaStreamFactory private final byte[] headerSuffix = new byte[BIN_SUFFIX_LENGTH]; private final Varuint32FW.Builder lenRW = - new Varuint32FW.Builder().wrap(new UnsafeBuffer(new byte[1024 * 8]), 0, 1024 * 8);; + new Varuint32FW.Builder().wrap(new UnsafeBuffer(new byte[1024 * 8]), 0, 1024 * 8); private final OctetsFW emptyRO = new OctetsFW().wrap(new UnsafeBuffer(0L, 0), 0, 0); @@ -1287,7 +1287,7 @@ private void onGrpcEnd( assert initialAck <= initialSeq; - producer.doKafkaEnd(traceId, authorization); + producer.doKafkaEnd(traceId, authorization, true); } private void onGrpcAbort( @@ -1470,7 +1470,7 @@ protected void onKafkaEnd( { if (GrpcKafkaState.initialClosed(state)) { - producer.doKafkaEnd(traceId, authorization); + producer.doKafkaEnd(traceId, authorization, true); correlater.doKafkaEnd(traceId, authorization); } } @@ -1785,7 +1785,7 @@ private void onGrpcEnd( assert initialAck <= initialSeq; - delegate.doKafkaEnd(traceId, authorization); + delegate.doKafkaEnd(traceId, authorization, false); if (!GrpcKafkaState.replyOpening(state)) { @@ -1913,41 +1913,6 @@ protected void onKafkaData( OctetsFW payload, KafkaDataExFW kafkaDataEx) { - if (!GrpcKafkaState.replyOpening(state)) - { - doGrpcBegin(traceId, authorization); - } - - if (GrpcKafkaState.replyClosing(state)) - { - replySeq += reserved; - } - else - { - if (payload == null) - { - KafkaHeaderFW grpcStatus = kafkaDataEx.merged().fetch().headers() - .matchFirst(h -> HEADER_NAME_ZILLA_GRPC_STATUS.value().equals(h.name().value())); - - if (grpcStatus != null && - !HEADER_VALUE_GRPC_OK.value().equals(grpcStatus.value().value())) - { - OctetsFW value = grpcStatus.value(); - String16FW status = statusRW - .set(value.buffer(), value.offset(), value.sizeof()) - .build(); - doGrpcAbort(traceId, authorization, status); - } - else - { - doGrpcEnd(traceId, traceId); - } - } - else if (GrpcKafkaState.replyOpening(state)) - { - doGrpcData(traceId, authorization); - } - } } @Override @@ -1957,7 +1922,7 @@ protected void onKafkaEnd( { if (GrpcKafkaState.initialClosed(state)) { - delegate.doKafkaEnd(traceId, authorization); + delegate.doKafkaEnd(traceId, authorization, false); } } @@ -2008,7 +1973,7 @@ private void doGrpcData( .build(); doData(grpc, originId, routedId, replyId, replySeq, replyAck, replyMax, - traceId, authorization, 0L, DATA_FLAG_COMPLETE, 0, null, dataEx); + traceId, authorization, 0L, DATA_FLAG_COMPLETE, 0, emptyRO, dataEx); assert replySeq <= replyAck + replyMax; } @@ -2151,7 +2116,8 @@ private void doKafkaData( private void doKafkaEnd( long traceId, - long authorization) + long authorization, + boolean tombstone) { if (!GrpcKafkaState.initialClosed(state)) { @@ -2160,7 +2126,10 @@ private void doKafkaEnd( initialMax = delegate.initialMax; state = GrpcKafkaState.closeInitial(state); - doKafkaDataNull(traceId, authorization); + if (tombstone) + { + doKafkaDataNull(traceId, authorization); + } doEnd(kafka, originId, routedId, initialId, initialSeq, initialAck, initialMax, traceId, authorization); @@ -2306,7 +2275,7 @@ private void doKafkaEndAck(long authorization, long traceId) { if (GrpcKafkaState.initialClosing(state) && initialSeq == initialAck) { - doKafkaEnd(traceId, authorization); + doKafkaEnd(traceId, authorization, true); } } From 3d7ba712e91e3068ff8b337c6f955ecfae07511f Mon Sep 17 00:00:00 2001 From: Ankit Kumar Date: Fri, 31 Jan 2025 20:09:23 +0530 Subject: [PATCH 4/6] addressing review comments --- .../config/GrpcKafkaWithProduceResult.java | 25 +++++++++++-------- .../stream/GrpcKafkaProxyFactory.java | 2 +- .../client.stream.rpc.oneway/client.rpt | 19 -------------- .../client.stream.rpc.oneway/server.rpt | 18 ------------- .../kafka/produce/unary.rpc.oneway/client.rpt | 17 ------------- .../kafka/produce/unary.rpc.oneway/server.rpt | 16 ------------ 6 files changed, 15 insertions(+), 82 deletions(-) diff --git a/runtime/binding-grpc-kafka/src/main/java/io/aklivity/zilla/runtime/binding/grpc/kafka/internal/config/GrpcKafkaWithProduceResult.java b/runtime/binding-grpc-kafka/src/main/java/io/aklivity/zilla/runtime/binding/grpc/kafka/internal/config/GrpcKafkaWithProduceResult.java index f19459e78e..20cea5dfde 100644 --- a/runtime/binding-grpc-kafka/src/main/java/io/aklivity/zilla/runtime/binding/grpc/kafka/internal/config/GrpcKafkaWithProduceResult.java +++ b/runtime/binding-grpc-kafka/src/main/java/io/aklivity/zilla/runtime/binding/grpc/kafka/internal/config/GrpcKafkaWithProduceResult.java @@ -89,14 +89,13 @@ public class GrpcKafkaWithProduceResult this.nameBuffer = new ExpandableDirectByteBuffer(); this.nameBuffer.putStringWithoutLengthAscii(0, META_PREFIX); - hash.updateHash(correlation.service.value()); - hash.updateHash(service.value()); - hash.updateHash(correlation.method.value()); - hash.updateHash(method.value()); - hash.updateHash(correlation.replyTo.value()); - - if (replyTo != null && replyTo.value() != null) + if (hasReplyTo()) { + hash.updateHash(correlation.service.value()); + hash.updateHash(service.value()); + hash.updateHash(correlation.method.value()); + hash.updateHash(method.value()); + hash.updateHash(correlation.replyTo.value()); hash.updateHash(replyTo.value()); } @@ -157,11 +156,10 @@ public void headers( overrides.forEach(o -> builder.item(o::header)); } - builder.item(this::service); - builder.item(this::method); - - if (replyTo != null && replyTo.value() != null) + if (hasReplyTo()) { + builder.item(this::service); + builder.item(this::method); builder.item(this::replyTo); builder.item(this::correlationId); } @@ -250,4 +248,9 @@ public void filters( .valueLen(hashCorrelationId.sizeof()) .value(hashCorrelationId.value(), 0, hashCorrelationId.sizeof())))); } + + public boolean hasReplyTo() + { + return replyTo != null && replyTo.value() != null; + } } diff --git a/runtime/binding-grpc-kafka/src/main/java/io/aklivity/zilla/runtime/binding/grpc/kafka/internal/stream/GrpcKafkaProxyFactory.java b/runtime/binding-grpc-kafka/src/main/java/io/aklivity/zilla/runtime/binding/grpc/kafka/internal/stream/GrpcKafkaProxyFactory.java index 524c08b748..199b0baf45 100644 --- a/runtime/binding-grpc-kafka/src/main/java/io/aklivity/zilla/runtime/binding/grpc/kafka/internal/stream/GrpcKafkaProxyFactory.java +++ b/runtime/binding-grpc-kafka/src/main/java/io/aklivity/zilla/runtime/binding/grpc/kafka/internal/stream/GrpcKafkaProxyFactory.java @@ -242,7 +242,7 @@ public MessageConsumer newStream( final GrpcKafkaWithProduceResult result = route.with.resolveProduce(authorization, grpcBeginEx); - if (result.replyTo() != null && result.replyTo().value() != null) + if (result.hasReplyTo()) { newStream = new GrpcProduceProxy( grpc, diff --git a/specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/streams/kafka/produce/client.stream.rpc.oneway/client.rpt b/specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/streams/kafka/produce/client.stream.rpc.oneway/client.rpt index 6630580ef6..23070b5a1b 100644 --- a/specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/streams/kafka/produce/client.stream.rpc.oneway/client.rpt +++ b/specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/streams/kafka/produce/client.stream.rpc.oneway/client.rpt @@ -37,8 +37,6 @@ write zilla:data.ext ${kafka:dataEx() .partition(-1, -1) .key("test") .header("zilla:identity", "test") - .header("zilla:service", "example.OnewayService") - .header("zilla:method", "OnewayClientStream") .build() .build()} write ${grpc:protobuf() @@ -54,8 +52,6 @@ write zilla:data.ext ${kafka:dataEx() .partition(-1, -1) .key("test") .header("zilla:identity", "test") - .header("zilla:service", "example.OnewayService") - .header("zilla:method", "OnewayClientStream") .build() .build()} write ${grpc:protobuf() @@ -63,20 +59,5 @@ write ${grpc:protobuf() .build()} write flush -write zilla:data.ext ${kafka:dataEx() - .typeId(zilla:id("kafka")) - .merged() - .produce() - .deferred(0) - .partition(-1, -1) - .key("test") - .header("zilla:identity", "test") - .header("zilla:service", "example.OnewayService") - .header("zilla:method", "OnewayClientStream") - .build() - .build()} - -write flush - write close read closed diff --git a/specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/streams/kafka/produce/client.stream.rpc.oneway/server.rpt b/specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/streams/kafka/produce/client.stream.rpc.oneway/server.rpt index a81371e8ca..b3e177b4a0 100644 --- a/specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/streams/kafka/produce/client.stream.rpc.oneway/server.rpt +++ b/specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/streams/kafka/produce/client.stream.rpc.oneway/server.rpt @@ -39,8 +39,6 @@ read zilla:data.ext ${kafka:matchDataEx() .partition(-1, -1) .key("test") .header("zilla:identity", "test") - .header("zilla:service", "example.OnewayService") - .header("zilla:method", "OnewayClientStream") .build() .build()} read ${grpc:protobuf() @@ -55,27 +53,11 @@ read zilla:data.ext ${kafka:matchDataEx() .partition(-1, -1) .key("test") .header("zilla:identity", "test") - .header("zilla:service", "example.OnewayService") - .header("zilla:method", "OnewayClientStream") .build() .build()} read ${grpc:protobuf() .string(1, "Hello World2") .build()} -read zilla:data.ext ${kafka:matchDataEx() - .typeId(zilla:id("kafka")) - .merged() - .produce() - .deferred(0) - .partition(-1, -1) - .key("test") - .header("zilla:identity", "test") - .header("zilla:service", "example.OnewayService") - .header("zilla:method", "OnewayClientStream") - .build() - .build()} -read zilla:data.null - read closed write close diff --git a/specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/streams/kafka/produce/unary.rpc.oneway/client.rpt b/specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/streams/kafka/produce/unary.rpc.oneway/client.rpt index 33c58554c5..1ac7f53e9b 100644 --- a/specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/streams/kafka/produce/unary.rpc.oneway/client.rpt +++ b/specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/streams/kafka/produce/unary.rpc.oneway/client.rpt @@ -37,8 +37,6 @@ write zilla:data.ext ${kafka:dataEx() .partition(-1, -1) .key("test") .header("zilla:identity", "test") - .header("zilla:service", "example.OnewayService") - .header("zilla:method", "OnewayUnary") .build() .build()} write ${grpc:protobuf() @@ -46,20 +44,5 @@ write ${grpc:protobuf() .build()} write flush -write zilla:data.ext ${kafka:dataEx() - .typeId(zilla:id("kafka")) - .merged() - .produce() - .deferred(0) - .partition(-1, -1) - .key("test") - .header("zilla:identity", "test") - .header("zilla:service", "example.OnewayService") - .header("zilla:method", "OnewayUnary") - .build() - .build()} - -write flush - write close read closed diff --git a/specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/streams/kafka/produce/unary.rpc.oneway/server.rpt b/specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/streams/kafka/produce/unary.rpc.oneway/server.rpt index 31113a2df5..3798c0bff1 100644 --- a/specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/streams/kafka/produce/unary.rpc.oneway/server.rpt +++ b/specs/binding-grpc-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/grpc/kafka/streams/kafka/produce/unary.rpc.oneway/server.rpt @@ -39,8 +39,6 @@ read zilla:data.ext ${kafka:matchDataEx() .partition(-1, -1) .key("test") .header("zilla:identity", "test") - .header("zilla:service", "example.OnewayService") - .header("zilla:method", "OnewayUnary") .build() .build()} @@ -48,19 +46,5 @@ read ${grpc:protobuf() .string(1, "Hello World") .build()} -read zilla:data.ext ${kafka:matchDataEx() - .typeId(zilla:id("kafka")) - .merged() - .produce() - .deferred(0) - .partition(-1, -1) - .key("test") - .header("zilla:identity", "test") - .header("zilla:service", "example.OnewayService") - .header("zilla:method", "OnewayUnary") - .build() - .build()} -read zilla:data.null - read closed write close From b4eab8bd41a5dbca3bad29f4af8582c5b196561c Mon Sep 17 00:00:00 2001 From: Ankit Kumar Date: Tue, 4 Feb 2025 15:13:35 +0530 Subject: [PATCH 5/6] addressing review comments --- runtime/binding-grpc-kafka/pom.xml | 2 +- .../stream/GrpcKafkaProxyFactory.java | 131 ++++++++++-------- 2 files changed, 74 insertions(+), 59 deletions(-) diff --git a/runtime/binding-grpc-kafka/pom.xml b/runtime/binding-grpc-kafka/pom.xml index 8c7a204d2a..cebcf678e4 100644 --- a/runtime/binding-grpc-kafka/pom.xml +++ b/runtime/binding-grpc-kafka/pom.xml @@ -24,7 +24,7 @@ - 0.87 + 0.88 0 diff --git a/runtime/binding-grpc-kafka/src/main/java/io/aklivity/zilla/runtime/binding/grpc/kafka/internal/stream/GrpcKafkaProxyFactory.java b/runtime/binding-grpc-kafka/src/main/java/io/aklivity/zilla/runtime/binding/grpc/kafka/internal/stream/GrpcKafkaProxyFactory.java index 199b0baf45..20e50aa752 100644 --- a/runtime/binding-grpc-kafka/src/main/java/io/aklivity/zilla/runtime/binding/grpc/kafka/internal/stream/GrpcKafkaProxyFactory.java +++ b/runtime/binding-grpc-kafka/src/main/java/io/aklivity/zilla/runtime/binding/grpc/kafka/internal/stream/GrpcKafkaProxyFactory.java @@ -1287,7 +1287,7 @@ private void onGrpcEnd( assert initialAck <= initialSeq; - producer.doKafkaEnd(traceId, authorization, true); + producer.doKafkaEnd(traceId, authorization); } private void onGrpcAbort( @@ -1470,7 +1470,7 @@ protected void onKafkaEnd( { if (GrpcKafkaState.initialClosed(state)) { - producer.doKafkaEnd(traceId, authorization, true); + producer.doKafkaEnd(traceId, authorization); correlater.doKafkaEnd(traceId, authorization); } } @@ -1651,7 +1651,7 @@ private void doGrpcReset( private final class GrpcProduceNoReplyProxy extends GrpcProxy { - private final KafkaProduceProxy delegate; + private final KafkaProduceNoReplyProxy delegate; private final long resolvedId; private GrpcProduceNoReplyProxy( @@ -1664,7 +1664,7 @@ private GrpcProduceNoReplyProxy( { super(grpc, originId, routedId, initialId); this.resolvedId = resolvedId; - this.delegate = new KafkaProduceProxy(routedId, resolvedId, this, result); + this.delegate = new KafkaProduceNoReplyProxy(routedId, resolvedId, this, result); } private void onGrpcMessage( @@ -1785,7 +1785,7 @@ private void onGrpcEnd( assert initialAck <= initialSeq; - delegate.doKafkaEnd(traceId, authorization, false); + delegate.doKafkaEnd(traceId, authorization); if (!GrpcKafkaState.replyOpening(state)) { @@ -1903,18 +1903,6 @@ protected void onKafkaBegin( } } - @Override - protected void onKafkaData( - long traceId, - long authorization, - long budgetId, - int reserved, - int flags, - OctetsFW payload, - KafkaDataExFW kafkaDataEx) - { - } - @Override protected void onKafkaEnd( long traceId, @@ -1922,7 +1910,7 @@ protected void onKafkaEnd( { if (GrpcKafkaState.initialClosed(state)) { - delegate.doKafkaEnd(traceId, authorization, false); + delegate.doKafkaEnd(traceId, authorization); } } @@ -2045,31 +2033,31 @@ private void doGrpcReset( } } - private final class KafkaProduceProxy + private class KafkaProduceProxy { - private MessageConsumer kafka; - private final long originId; - private final long routedId; - private final long initialId; - private final long replyId; - private final GrpcKafkaWithProduceResult result; - private final GrpcProxy delegate; + protected MessageConsumer kafka; + protected final long originId; + protected final long routedId; + protected final long initialId; + protected final long replyId; + protected final GrpcKafkaWithProduceResult result; + protected final GrpcProxy delegate; - private int state; + protected int state; - private long initialSeq; - private long initialAck; - private int initialMax; - private long initialBud; + protected long initialSeq; + protected long initialAck; + protected int initialMax; + protected long initialBud; - private long replySeq; - private long replyAck; - private int replyMax; - private long replyBud; - private int replyPad; - private int replyCap; + protected long replySeq; + protected long replyAck; + protected int replyMax; + protected long replyBud; + protected int replyPad; + protected int replyCap; - private KafkaProduceProxy( + protected KafkaProduceProxy( long originId, long routedId, GrpcProxy delegate, @@ -2083,7 +2071,7 @@ private KafkaProduceProxy( this.replyId = supplyReplyId.applyAsLong(initialId); } - private void doKafkaBegin( + protected void doKafkaBegin( long traceId, long authorization, long affinity) @@ -2097,7 +2085,7 @@ private void doKafkaBegin( traceId, authorization, affinity, result); } - private void doKafkaData( + protected void doKafkaData( long traceId, long authorization, long budgetId, @@ -2114,10 +2102,9 @@ private void doKafkaData( assert initialSeq <= initialAck + initialMax; } - private void doKafkaEnd( + protected void doKafkaEnd( long traceId, - long authorization, - boolean tombstone) + long authorization) { if (!GrpcKafkaState.initialClosed(state)) { @@ -2126,17 +2113,14 @@ private void doKafkaEnd( initialMax = delegate.initialMax; state = GrpcKafkaState.closeInitial(state); - if (tombstone) - { - doKafkaDataNull(traceId, authorization); - } + doKafkaDataNull(traceId, authorization); doEnd(kafka, originId, routedId, initialId, initialSeq, initialAck, initialMax, traceId, authorization); } } - private void doKafkaAbort( + protected void doKafkaAbort( long traceId, long authorization) { @@ -2185,7 +2169,7 @@ private void onKafkaMessage( } } - private void onKafkaBegin( + protected void onKafkaBegin( BeginFW begin) { final long sequence = begin.sequence(); @@ -2205,7 +2189,7 @@ private void onKafkaBegin( doKafkaWindow(traceId); } - private void onKafkaEnd( + protected void onKafkaEnd( EndFW end) { final long sequence = end.sequence(); @@ -2224,7 +2208,7 @@ private void onKafkaEnd( delegate.onKafkaEnd(traceId, authorization); } - private void onKafkaAbort( + protected void onKafkaAbort( AbortFW abort) { final long sequence = abort.sequence(); @@ -2243,7 +2227,7 @@ private void onKafkaAbort( delegate.onKafkaAbort(traceId, authorization); } - private void onKafkaWindow( + protected void onKafkaWindow( WindowFW window) { final long sequence = window.sequence(); @@ -2271,15 +2255,17 @@ private void onKafkaWindow( doKafkaEndAck(authorization, traceId); } - private void doKafkaEndAck(long authorization, long traceId) + protected void doKafkaEndAck( + long authorization, + long traceId) { if (GrpcKafkaState.initialClosing(state) && initialSeq == initialAck) { - doKafkaEnd(traceId, authorization, true); + doKafkaEnd(traceId, authorization); } } - private void onKafkaReset( + protected void onKafkaReset( ResetFW reset) { final long sequence = reset.sequence(); @@ -2300,7 +2286,7 @@ private void onKafkaReset( doKafkaReset(traceId, authorization); } - private void doKafkaReset( + protected void doKafkaReset( long traceId, long authorization) { @@ -2313,7 +2299,7 @@ private void doKafkaReset( } } - private void doKafkaWindow( + protected void doKafkaWindow( long traceId) { if (kafka != null && !GrpcKafkaState.replyClosed(state)) @@ -2329,7 +2315,7 @@ private void doKafkaWindow( } } - private void doKafkaDataNull( + protected void doKafkaDataNull( long traceId, long authorization) { @@ -2347,6 +2333,35 @@ private void doKafkaDataNull( } } + private final class KafkaProduceNoReplyProxy extends KafkaProduceProxy + { + private KafkaProduceNoReplyProxy( + long originId, + long routedId, + GrpcProxy delegate, + GrpcKafkaWithProduceResult result) + { + super(originId, routedId, delegate, result); + } + + @Override + protected void doKafkaEnd( + long traceId, + long authorization) + { + if (!GrpcKafkaState.initialClosed(state)) + { + initialSeq = delegate.initialSeq; + initialAck = delegate.initialAck; + initialMax = delegate.initialMax; + state = GrpcKafkaState.closeInitial(state); + + doEnd(kafka, originId, routedId, initialId, initialSeq, initialAck, initialMax, + traceId, authorization); + } + } + } + private final class KafkaCorrelateProxy { private MessageConsumer kafka; From da6d1373e4a9e8cc7d6668c704d792df724cabf4 Mon Sep 17 00:00:00 2001 From: Ankit Kumar Date: Tue, 4 Feb 2025 15:32:13 +0530 Subject: [PATCH 6/6] addressing review comments --- .../grpc/kafka/internal/stream/GrpcKafkaProxyFactory.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runtime/binding-grpc-kafka/src/main/java/io/aklivity/zilla/runtime/binding/grpc/kafka/internal/stream/GrpcKafkaProxyFactory.java b/runtime/binding-grpc-kafka/src/main/java/io/aklivity/zilla/runtime/binding/grpc/kafka/internal/stream/GrpcKafkaProxyFactory.java index 20e50aa752..9e33f92c1f 100644 --- a/runtime/binding-grpc-kafka/src/main/java/io/aklivity/zilla/runtime/binding/grpc/kafka/internal/stream/GrpcKafkaProxyFactory.java +++ b/runtime/binding-grpc-kafka/src/main/java/io/aklivity/zilla/runtime/binding/grpc/kafka/internal/stream/GrpcKafkaProxyFactory.java @@ -1651,7 +1651,7 @@ private void doGrpcReset( private final class GrpcProduceNoReplyProxy extends GrpcProxy { - private final KafkaProduceNoReplyProxy delegate; + private final KafkaProduceProxy delegate; private final long resolvedId; private GrpcProduceNoReplyProxy(