Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix streaming service resolution and header passing #291

Merged
merged 9 commits into from
Jun 21, 2021
8 changes: 4 additions & 4 deletions grpc-ballerina/streaming_client.bal
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public client class StreamingClient {
if (externIsBidirectional(self)) {
if (self.serverStream is stream<anydata, Error?>) {
var nextRecord = (<stream<anydata, Error?>>self.serverStream).next();
var headerMap = externGetHeaderMap(self);
var headerMap = externGetHeaderMap(self, true);
if (headerMap is map<string|string[]>) {
headers = headerMap;
}
Expand All @@ -78,7 +78,7 @@ public client class StreamingClient {
if (result is stream<anydata, Error?>) {
self.serverStream = result;
var nextRecord = (<stream<anydata, Error?>>self.serverStream).next();
var headerMap = externGetHeaderMap(self);
var headerMap = externGetHeaderMap(self, true);
if (headerMap is map<string|string[]>) {
headers = headerMap;
}
Expand All @@ -95,7 +95,7 @@ public client class StreamingClient {
}
} else {
var result = externReceive(self);
var headerMap = externGetHeaderMap(self);
var headerMap = externGetHeaderMap(self, false);
if (headerMap is map<string|string[]>) {
headers = headerMap;
}
Expand Down Expand Up @@ -135,7 +135,7 @@ isolated function externIsBidirectional(StreamingClient streamConnection) return
'class: "org.ballerinalang.net.grpc.nativeimpl.streamingclient.FunctionUtils"
} external;

isolated function externGetHeaderMap(StreamingClient streamConnection) returns map<string|string[]>? =
isolated function externGetHeaderMap(StreamingClient streamConnection, boolean isBidirectional) returns map<string|string[]>? =
@java:Method {
'class: "org.ballerinalang.net.grpc.nativeimpl.streamingclient.FunctionUtils"
} external;
8 changes: 4 additions & 4 deletions grpc-ballerina/tests/01_advanced_type_service_pb.bal
Original file line number Diff line number Diff line change
Expand Up @@ -332,10 +332,10 @@ public type ContextNil record {|
map<string|string[]> headers;
|};

public type ContextString record {|
string content;
map<string|string[]> headers;
|};
//public type ContextString record {|
// string content;
// map<string|string[]> headers;
//|};

public type ContextStockRequest record {|
StockRequest content;
Expand Down
54 changes: 27 additions & 27 deletions grpc-ballerina/tests/06_server_streaming_service_pb.bal
Original file line number Diff line number Diff line change
Expand Up @@ -40,29 +40,29 @@ public isolated client class HelloWorld45Client {
}
}

public class StringStream {
private stream<anydata, Error?> anydataStream;

public isolated function init(stream<anydata, Error?> anydataStream) {
self.anydataStream = anydataStream;
}

public isolated function next() returns record {|string value;|}|Error? {
var streamValue = self.anydataStream.next();
if (streamValue is ()) {
return streamValue;
} else if (streamValue is Error) {
return streamValue;
} else {
record {|string value;|} nextRecord = {value: <string>streamValue.value};
return nextRecord;
}
}

public isolated function close() returns Error? {
return self.anydataStream.close();
}
}
//public class StringStream {
// private stream<anydata, Error?> anydataStream;
//
// public isolated function init(stream<anydata, Error?> anydataStream) {
// self.anydataStream = anydataStream;
// }
//
// public isolated function next() returns record {|string value;|}|Error? {
// var streamValue = self.anydataStream.next();
// if (streamValue is ()) {
// return streamValue;
// } else if (streamValue is Error) {
// return streamValue;
// } else {
// record {|string value;|} nextRecord = {value: <string>streamValue.value};
// return nextRecord;
// }
// }
//
// public isolated function close() returns Error? {
// return self.anydataStream.close();
// }
//}

public client class HelloWorld45StringCaller {
private Caller caller;
Expand Down Expand Up @@ -92,10 +92,10 @@ public client class HelloWorld45StringCaller {
}
}

public type ContextStringStream record {|
stream<string, error?> content;
map<string|string[]> headers;
|};
//public type ContextStringStream record {|
// stream<string, error?> content;
// map<string|string[]> headers;
//|};

//public type ContextString record {|
// string content;
Expand Down
34 changes: 34 additions & 0 deletions grpc-ballerina/tests/45_services_with_headers.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright (c) 2021 WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
//
// WSO2 Inc. licenses this file to you under the Apache License,
// Version 2.0 (the "License"); you may not use this file except
// in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

syntax = "proto3";

service HeadersService {
rpc unary (HSReq) returns (HSRes);
rpc serverStr (HSReq) returns (stream HSRes);
rpc clientStr (stream HSReq) returns (HSRes);
rpc bidirectionalStr (stream HSReq) returns (stream HSRes);
}

message HSReq {
string name = 1;
string message = 2;
}

message HSRes {
string name = 1;
string message = 2;
}
95 changes: 95 additions & 0 deletions grpc-ballerina/tests/45_services_with_headers_client.bal
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright (c) 2021 WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
//
// WSO2 Inc. licenses this file to you under the Apache License,
// Version 2.0 (the "License"); you may not use this file except
// in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import ballerina/test;

@test:Config {enable: true}
function testUnaryWithHeadersContext() returns error? {
HeadersServiceClient ep = check new ("http://localhost:9145");
HSReq reqMsg = {name: "Ann", message: "Hey"};
map<string|string[]> headers = {"unary-req-header": ["1234567890", "2233445677"]};

ContextHSRes res = check ep->unaryContext({content: reqMsg, headers: headers});
test:assertEquals(res.content, reqMsg);
test:assertEquals(res.headers["unary-res-header"], "2233445677");
}

@test:Config {enable: true}
function testServerStreamingWithHeadersContext() returns error? {
HeadersServiceClient ep = check new ("http://localhost:9145");
HSReq reqMsg = {name: "Ann", message: "Hey"};
map<string|string[]> headers = {"server-steaming-req-header": ["1234567890", "2233445677"]};

ContextHSResStream res = check ep->serverStrContext({content: reqMsg, headers: headers});
test:assertEquals(res.headers["server-steaming-res-header"], "2233445677");

}

@test:Config {enable: true}
function testClientStreamingWithContextHeaders() returns error? {
HeadersServiceClient ep = check new ("http://localhost:9145");
ClientStrStreamingClient streamingClient = check ep->clientStr();

HSRes[] responses = [
{name: "Ann", message: "Hey"}
];
int i = 0;
map<string|string[]> headers = {"client-steaming-req-header": ["1234567890", "2233445677"]};
foreach HSRes res in responses {
if i == 0 {
check streamingClient->sendContextHSReq({content: res, headers: headers});
} else {
check streamingClient->sendHSReq(res);
}
i += 1;
}
check streamingClient->complete();
ContextHSRes? res = check streamingClient->receiveContextHSRes();
if res is ContextHSRes {
test:assertEquals(res.content, {name: "Ann", message: "Hey"});
test:assertEquals(res.headers["client-steaming-res-header"], "2233445677");
} else {
test:assertFail(msg = "Expected output not found");
}
}

@test:Config {enable: true}
function testBidirectionalStreamingWithContextHeaders() returns error? {
HeadersServiceClient ep = check new ("http://localhost:9145");
BidirectionalStrStreamingClient streamingClient = check ep->bidirectionalStr();

HSRes[] responses = [
{name: "Ann", message: "Hey"}
];
int i = 0;
map<string|string[]> headers = {"bidi-steaming-req-header": ["1234567890", "2233445677"]};
foreach HSRes res in responses {
if i == 0 {
check streamingClient->sendContextHSReq({content: res, headers: headers});
} else {
check streamingClient->sendHSReq(res);
}
i += 1;
}
check streamingClient->complete();
ContextHSRes? res = check streamingClient->receiveContextHSRes();
if res is ContextHSRes {
test:assertEquals(res.content, {name: "Ann", message: "Hey"});
test:assertEquals(res.headers["bidi-steaming-res-header"], "2233445677");
} else {
test:assertFail(msg = "Expected output not found");
}
}
Loading