Skip to content

Commit

Permalink
fix: Set x-goog-request-params for streaming pull request
Browse files Browse the repository at this point in the history
  • Loading branch information
kamalaboulhosn committed Mar 14, 2023
1 parent 13cbbf9 commit 3185a3e
Showing 1 changed file with 10 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import com.google.api.gax.rpc.StreamController;
import com.google.cloud.pubsub.v1.MessageDispatcher.AckProcessor;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Any;
Expand Down Expand Up @@ -86,6 +88,8 @@ final class StreamingSubscriberConnection extends AbstractApiService implements

private Duration inititalStreamAckDeadline;

private final Map<String, List<String>> streamMetadata;

private final SubscriberStub subscriberStub;
private final int channelAffinity;
private final String subscription;
Expand Down Expand Up @@ -134,6 +138,9 @@ private StreamingSubscriberConnection(Builder builder) {
inititalStreamAckDeadline = builder.maxDurationPerAckExtension;
}

streamMetadata =
ImmutableMap.of("x-goog-request-params", ImmutableList.of("subscription=" + subscription));

subscriberStub = builder.subscriberStub;
channelAffinity = builder.channelAffinity;

Expand Down Expand Up @@ -273,7 +280,9 @@ private void initialize() {
.streamingPullCallable()
.splitCall(
responseObserver,
GrpcCallContext.createDefault().withChannelAffinity(channelAffinity));
GrpcCallContext.createDefault()
.withChannelAffinity(channelAffinity)
.withExtraHeaders(streamMetadata));

logger.log(Level.FINER, "Initializing stream to subscription {0}", subscription);
// We need to set streaming ack deadline, but it's not useful since we'll modack to send receipt
Expand Down

0 comments on commit 3185a3e

Please sign in to comment.