Skip to content

Commit

Permalink
Support gRPC client stream/unary oneway (#1384)
Browse files Browse the repository at this point in the history
  • Loading branch information
ankitk-me authored Feb 4, 2025
1 parent 869533f commit cf16ddb
Show file tree
Hide file tree
Showing 17 changed files with 1,041 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,11 @@ public GrpcKafkaWithConfig adaptFromJson(
}
}

String newReplyTo = object.getString(REPLY_TO_NAME);
String newReplyTo = null;
if (object.containsKey(REPLY_TO_NAME))
{
newReplyTo = object.getString(REPLY_TO_NAME);
}

return new GrpcKafkaWithConfig(
new GrpcKafkaWithProduceConfig(newTopic, newProduceAcks, newProduceKey, newOverrides, newReplyTo));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,15 @@ public class GrpcKafkaWithProduceResult
this.nameBuffer = new ExpandableDirectByteBuffer();
this.nameBuffer.putStringWithoutLengthAscii(0, META_PREFIX);

hash.updateHash(correlation.service.value());
hash.updateHash(service.value());
hash.updateHash(correlation.method.value());
hash.updateHash(method.value());
hash.updateHash(correlation.replyTo.value());
hash.updateHash(replyTo.value());
if (hasReplyTo())
{
hash.updateHash(correlation.service.value());
hash.updateHash(service.value());
hash.updateHash(correlation.method.value());
hash.updateHash(method.value());
hash.updateHash(correlation.replyTo.value());
hash.updateHash(replyTo.value());
}

if (overrides != null)
{
Expand Down Expand Up @@ -153,10 +156,14 @@ public void headers(
overrides.forEach(o -> builder.item(o::header));
}

builder.item(this::service);
builder.item(this::method);
builder.item(this::replyTo);
builder.item(this::correlationId);
if (hasReplyTo())
{
builder.item(this::service);
builder.item(this::method);
builder.item(this::replyTo);
builder.item(this::correlationId);
}

metadata.forEach(m -> builder.item(i -> metadata(i, m)));
}

Expand Down Expand Up @@ -241,4 +248,9 @@ public void filters(
.valueLen(hashCorrelationId.sizeof())
.value(hashCorrelationId.value(), 0, hashCorrelationId.sizeof()))));
}

public boolean hasReplyTo()
{
return replyTo != null && replyTo.value() != null;
}
}
Loading

0 comments on commit cf16ddb

Please sign in to comment.