Skip to content

Commit

Permalink
Merge pull request #30 from reactivegroup/feature/rpc_log
Browse files Browse the repository at this point in the history
feat: add metric for rpc headers
  • Loading branch information
kevinten10 authored Nov 2, 2021
2 parents 0b7f314 + 8f2fe7e commit 3edbe01
Show file tree
Hide file tree
Showing 11 changed files with 97 additions and 25 deletions.
2 changes: 1 addition & 1 deletion examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
<parent>
<artifactId>capa-parent</artifactId>
<groupId>group.rxcloud</groupId>
<version>1.0.4-SNAPSHOT</version>
<version>1.0.4.RELEASE</version>
</parent>

<artifactId>capa-examples</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class DemoRpcClient {
/**
* Identifier in Capa for the service this client will invoke.
*/
private static final String SERVICE_APP_ID = "test";
private static final String SERVICE_APP_ID = "12345.helloworld";

public static void main(String[] args) {
CapaRpcClient capaRpcClient = new CapaRpcClientBuilder().build();
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
<groupId>group.rxcloud</groupId>
<artifactId>capa-parent</artifactId>
<packaging>pom</packaging>
<version>1.0.4-SNAPSHOT</version>
<version>1.0.4.RELEASE</version>
<name>capa-sdk-parent</name>
<description>SDK for Capa.</description>
<url>https://github.com/reactivegroup</url>
Expand Down
2 changes: 1 addition & 1 deletion sdk-component/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
<parent>
<groupId>group.rxcloud</groupId>
<artifactId>capa-parent</artifactId>
<version>1.0.4-SNAPSHOT</version>
<version>1.0.4.RELEASE</version>
</parent>

<artifactId>capa-sdk-component</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion sdk-infrastructure/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
<parent>
<artifactId>capa-parent</artifactId>
<groupId>group.rxcloud</groupId>
<version>1.0.4-SNAPSHOT</version>
<version>1.0.4.RELEASE</version>
</parent>

<artifactId>capa-sdk-infrastructure</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion sdk-spi-demo/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
<parent>
<artifactId>capa-parent</artifactId>
<groupId>group.rxcloud</groupId>
<version>1.0.4-SNAPSHOT</version>
<version>1.0.4.RELEASE</version>
</parent>

<artifactId>capa-sdk-spi-demo</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion sdk-spi/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
<parent>
<artifactId>capa-parent</artifactId>
<groupId>group.rxcloud</groupId>
<version>1.0.4-SNAPSHOT</version>
<version>1.0.4.RELEASE</version>
</parent>

<artifactId>capa-sdk-spi</artifactId>
Expand Down
44 changes: 42 additions & 2 deletions sdk-spi/src/main/java/group/rxcloud/capa/spi/http/CapaHttpSpi.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import group.rxcloud.capa.spi.config.CapaSpiOptionsLoader;
import group.rxcloud.capa.spi.config.CapaSpiProperties;
import group.rxcloud.capa.spi.config.RpcServiceOptions;
import group.rxcloud.cloudruntimes.domain.core.invocation.HttpExtension;
import group.rxcloud.cloudruntimes.utils.TypeRef;
import okhttp3.OkHttpClient;
import org.slf4j.Logger;
Expand All @@ -47,8 +48,8 @@ public CapaHttpSpi(OkHttpClient httpClient, CapaObjectSerializer objectSerialize
/**
* Templates, delegate to specific http invoker.
*
* @param httpMethod Ignore, fix to POST. TODO
* @param urlParameters Ignore, fix to EMPTY. TODO
* @param httpMethod Ignore, fix to POST. FIXME
* @param urlParameters Ignore, fix to EMPTY. FIXME
*/
@Override
protected <T> CompletableFuture<HttpResponse<T>> doInvokeApi(String httpMethod,
Expand All @@ -72,6 +73,22 @@ protected <T> CompletableFuture<HttpResponse<T>> doInvokeApi(String httpMethod,
logger.debug("[CapaHttpSpi] invoke rpc context[{}]", context);
}
}
// FIXME Ignore, fix to POST.
if (!HttpExtension.POST.getMethod().toString().equalsIgnoreCase(httpMethod)) {
if (logger.isWarnEnabled()) {
logger.warn("[CapaHttpSpi] invoke rpc httpMethod[{}] only support POST now.",
httpMethod);
}
httpMethod = HttpExtension.POST.getMethod().toString();
}
// FIXME Ignore, fix to EMPTY.
if (urlParameters != null && !urlParameters.isEmpty()) {
if (logger.isWarnEnabled()) {
logger.warn("[CapaHttpSpi] invoke rpc urlParameters[{}] not supported now.",
urlParameters);
}
urlParameters = null;
}

// parse url path segments
Objects.requireNonNull(pathSegments, "pathSegments");
Expand All @@ -96,6 +113,29 @@ protected <T> CompletableFuture<HttpResponse<T>> doInvokeApi(String httpMethod,
// spi invoke
CompletableFuture<HttpResponse<T>> invokeSpiApi =
invokeSpiApi(appId, method, requestData, headers, type, rpcServiceOptions);
invokeSpiApi.whenComplete((tHttpResponse, throwable) -> {
if (throwable != null) {
if (logger.isWarnEnabled()) {
logger.warn("[CapaHttpSpi] invoke rpc response error",
throwable);
}
return;
}
if (tHttpResponse == null) {
if (logger.isWarnEnabled()) {
logger.warn("[CapaHttpSpi] invoke rpc response empty[{}]",
tHttpResponse);
}
return;
}
final int responseStatusCode = tHttpResponse.getStatusCode();
final Map<String, String> responseHeaders = tHttpResponse.getHeaders();
final T responseBody = tHttpResponse.getBody();
if (logger.isDebugEnabled()) {
logger.debug("[CapaHttpSpi] invoke rpc response code[{}] headers[{}] body[{}]",
responseStatusCode, responseHeaders, responseBody);
}
});
return invokeSpiApi;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@
import group.rxcloud.cloudruntimes.domain.core.invocation.Metadata;
import group.rxcloud.cloudruntimes.utils.TypeRef;
import okhttp3.Call;
import okhttp3.OkHttpClient;
import okhttp3.Callback;
import okhttp3.Headers;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.MediaType;
import okhttp3.Response;
import okhttp3.ResponseBody;
import org.slf4j.Logger;
Expand All @@ -42,7 +43,7 @@
import java.util.concurrent.CompletableFuture;

/**
* The type Capa serialize http spi.
* The Capa http spi with default serializer process.
*/
public abstract class CapaSerializeHttpSpi extends CapaHttpSpi {

Expand All @@ -68,9 +69,17 @@ protected byte[] getRequestWithSerialize(Object requestData) {
try {
return objectSerializer.serialize(requestData);
} catch (IOException e) {
if (logger.isWarnEnabled()) {
logger.warn("[CapaSerializeHttpSpi] serialize rpc request[{}] io error",
requestData, e);
}
throw new CapaException(CapaErrorContext.PARAMETER_RPC_REQUEST_SERIALIZE_ERROR,
"Request Type: " + requestData.getClass().getName());
} catch (Exception e) {
if (logger.isWarnEnabled()) {
logger.warn("[CapaSerializeHttpSpi] serialize rpc request[{}] error",
requestData, e);
}
throw new CapaException(CapaErrorContext.PARAMETER_RPC_REQUEST_SERIALIZE_ERROR,
"Request Type: " + requestData.getClass().getName(), e);
}
Expand All @@ -84,7 +93,6 @@ protected byte[] getRequestWithSerialize(Object requestData) {
* @return the request body with byte[] serialize
*/
protected RequestBody getRequestBodyWithSerialize(Object requestData, Map<String, String> headers) {
byte[] serializedRequestBody = getRequestWithSerialize(requestData);
final String contentType = headers != null
? headers.get(Metadata.CONTENT_TYPE)
: null;
Expand All @@ -97,11 +105,27 @@ protected RequestBody getRequestBodyWithSerialize(Object requestData, Map<String
? REQUEST_BODY_EMPTY_JSON
: RequestBody.Companion.create(new byte[0], mediaType);
} else {
byte[] serializedRequestBody = getRequestWithSerialize(requestData);
body = RequestBody.Companion.create(serializedRequestBody, mediaType);
}
return body;
}

/**
* Gets request headers with given params.
*
* @param headersParams user given params
* @return the request headers
*/
protected Headers getRequestHeaderWithParams(Map<String, String> headersParams) {
okhttp3.Headers.Builder headersBuilder = new okhttp3.Headers.Builder();
if (headersParams == null || headersParams.size() == 0) {
return headersBuilder.build();
}
headersParams.forEach(headersBuilder::add);
return headersBuilder.build();
}

/**
* Http async call
*/
Expand Down Expand Up @@ -130,21 +154,23 @@ protected <T> CompletableFuture<HttpResponse<T>> doAsyncInvoke0(Request request,
* @return the response body with byte[] deserialize
*/
protected <T> HttpResponse<T> getResponseBodyWithDeserialize(TypeRef<T> type, HttpResponse<byte[]> httpResponse) {
final int httpResponseStatusCode = httpResponse.getStatusCode();
final Map<String, String> httpResponseHeaders = httpResponse.getHeaders();
final byte[] httpResponseBody = httpResponse.getBody();
try {
T responseObject = objectSerializer.deserialize(httpResponseBody, type);
return new HttpResponse<>(responseObject, httpResponse.getHeaders(), httpResponse.getStatusCode());
return new HttpResponse<>(responseObject, httpResponseHeaders, httpResponseStatusCode);
} catch (IOException e) {
if (logger.isWarnEnabled()) {
logger.warn("[CapaSerializeHttpSpi] deserialize rpc response[{}] type[{}] io error",
httpResponseBody, type, e);
logger.warn("[CapaSerializeHttpSpi] deserialize rpc statusCode[{}] headers[{}] response[{}] type[{}] io error",
httpResponseStatusCode, httpResponseHeaders, httpResponseBody, type, e);
}
throw new CapaException(CapaErrorContext.PARAMETER_RPC_RESPONSE_DESERIALIZE_ERROR,
"Response Type: " + type, e);
} catch (Exception e) {
if (logger.isWarnEnabled()) {
logger.warn("[CapaSerializeHttpSpi] deserialize rpc response[{}] type[{}] error",
httpResponseBody, type, e);
logger.warn("[CapaSerializeHttpSpi] deserialize rpc statusCode[{}] headers[{}] response[{}] type[{}] error",
httpResponseStatusCode, httpResponseHeaders, httpResponseBody, type, e);
}
throw new CapaException(CapaErrorContext.PARAMETER_RPC_RESPONSE_DESERIALIZE_ERROR,
"Response Type: " + type, e);
Expand Down Expand Up @@ -190,9 +216,15 @@ public void onResponse(Call call, Response response) throws IOException {
return;
}

Map<String, String> mapHeaders = new HashMap<>();
// response.headers()
// .forEach(pair -> mapHeaders.put(pair.getFirst(), pair.getSecond()));
Map<String, String> mapHeaders;
Headers responseHeaders = response.headers();
if (responseHeaders == null || responseHeaders.size() == 0) {
mapHeaders = new HashMap<>(2, 1);
} else {
mapHeaders = new HashMap<>(responseHeaders.size() << 1);
responseHeaders.forEach(pair -> mapHeaders.put(pair.getFirst(), pair.getSecond()));
}

HttpResponse<byte[]> httpResponse = new HttpResponse<>(bodyBytes, mapHeaders, response.code());
future.complete(httpResponse);
}
Expand Down
2 changes: 1 addition & 1 deletion sdk/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
<parent>
<groupId>group.rxcloud</groupId>
<artifactId>capa-parent</artifactId>
<version>1.0.4-SNAPSHOT</version>
<version>1.0.4.RELEASE</version>
</parent>

<artifactId>capa-sdk</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,9 @@ public <T> Mono<T> invokeMethod(InvokeMethodRequest invokeMethodRequest, TypeRef
context,
type))
.flatMap(httpResponse -> {
T object = httpResponse.getBody();
if (object != null) {
return Mono.just(object);
final T responseBody = httpResponse.getBody();
if (responseBody != null) {
return Mono.just(responseBody);
}
return Mono.empty();
});
Expand Down

0 comments on commit 3edbe01

Please sign in to comment.