Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement gRPC support for msf4j platform #488

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 127 additions & 15 deletions core/src/main/java/org/wso2/msf4j/MicroservicesRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wso2.msf4j.grpc.ServerBuilder;
import org.wso2.msf4j.grpc.exception.GrpcServerException;
import org.wso2.msf4j.interceptor.RequestInterceptor;
import org.wso2.msf4j.interceptor.ResponseInterceptor;
import org.wso2.msf4j.internal.DataHolder;
Expand All @@ -39,8 +41,11 @@

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.ServiceLoader;
import javax.ws.rs.Path;
import javax.ws.rs.ext.ExceptionMapper;

Expand All @@ -66,6 +71,14 @@ public class MicroservicesRunner {
private long startTime = System.currentTimeMillis();
private boolean isStarted;

/**
* gRPC Implementation.
*/
private boolean isGrpcService = false;
private ServerBuilder grpcBuilder = null;
private static final String HTTP_SCHEMA_KEY = "http";
private static final int DEFAULT_GRPC_PORT = 8080;

/**
* Creates a MicroservicesRunner instance which will be used for deploying microservices. Allows specifying
* ports on which the microservices in this MicroservicesRunner are deployed.
Expand All @@ -88,6 +101,62 @@ public MicroservicesRunner() {
configureTransport();
}

/**
* Creates a MicroservicesRunner instance which will be used for deploying microservices as gRPC or REST.
* Allows specifying ports on which the microservices in this MicroservicesRunner are deployed.
*
* @param port The port on which the microservices are exposed
* @param isGrpcService flag to determine whether service is deployed as gRPC or REST
*/
public MicroservicesRunner(int port, boolean isGrpcService) {
this.isGrpcService = isGrpcService;
if (isGrpcService) {
ServiceLoader<ServerBuilder> serverBuilders = ServiceLoader.load(ServerBuilder.class);
Iterator<ServerBuilder> iterator = serverBuilders.iterator();
grpcBuilder = iterator.next();
if (grpcBuilder == null) {
throw new RuntimeException("Error while registering gRPC server. Server builder service is not " +
"registered");
}
grpcBuilder.init(port);
} else {
configureTransport(port);
}
}

/**
* Creates a MicroservicesRunner instance which will take care of initializing Netty transports in the file
* pointed to by the System property <code>transports.netty.conf</code>.
* <p>
* If that System property is not specified, it will start a single Netty transport on port 8080.
* <p>
* @param isGrpcService flag to determine whether service is deployed as gRPC or REST
*/
public MicroservicesRunner(boolean isGrpcService) {
this.isGrpcService = isGrpcService;
int port = DEFAULT_GRPC_PORT;
if (isGrpcService) {
TransportsConfiguration transportsConfiguration = ConfigurationBuilder.getInstance().getConfiguration();
if (transportsConfiguration != null) {
Optional<ListenerConfiguration> configuration = transportsConfiguration.getListenerConfigurations()
.stream().filter(listenerConfiguration -> HTTP_SCHEMA_KEY.equals(listenerConfiguration
.getScheme())).findFirst();
port = configuration.isPresent() ? configuration.get().getPort() : port;
}

ServiceLoader<ServerBuilder> serverBuilders = ServiceLoader.load(ServerBuilder.class);
Iterator<ServerBuilder> iterator = serverBuilders.iterator();
grpcBuilder = iterator.next();
if (grpcBuilder == null) {
throw new RuntimeException("Error while registering gRPC server. Server builder service is not " +
"registered");
}
grpcBuilder.init(port);
} else {
configureTransport();
}
}

/**
* Deploy a microservice.
*
Expand All @@ -96,7 +165,11 @@ public MicroservicesRunner() {
*/
public MicroservicesRunner deploy(Object... microservice) {
checkState();
msRegistry.addService(microservice);
if (isGrpcService) {
grpcBuilder.addService(microservice);
} else {
msRegistry.addService(microservice);
}
return this;
}

Expand All @@ -108,6 +181,10 @@ public MicroservicesRunner deploy(Object... microservice) {
* @return this MicroservicesRunner object
*/
public MicroservicesRunner deploy(String basePath, Object microservice) {
if (isGrpcService) {
throw new UnsupportedOperationException("gRPC services cannot register with the base path. only support " +
"in REST microservices");
}
Map<String, Object> valuesMap = new HashMap<>();
valuesMap.put("value", basePath);
RuntimeAnnotations.putAnnotation(microservice.getClass(), Path.class, valuesMap);
Expand All @@ -122,6 +199,9 @@ public MicroservicesRunner deploy(String basePath, Object microservice) {
* @return this MicroservicesRunner object.
*/
public MicroservicesRunner deployWebSocketEndpoint(Object webSocketEndpoint) {
if (isGrpcService) {
throw new UnsupportedOperationException("gRPC streaming services is currently not supported");
}
endpointsRegistry.addEndpoint(webSocketEndpoint);
return this;
}
Expand All @@ -143,6 +223,10 @@ public MicroservicesRunner setSessionManager(SessionManager sessionManager) {
* @param requestInterceptor interceptor instances
*/
public MicroservicesRunner addGlobalRequestInterceptor(RequestInterceptor... requestInterceptor) {
if (isGrpcService) {
throw new UnsupportedOperationException("gRPC service request interceptor is currently not " +
"supported");
}
msRegistry.addGlobalRequestInterceptor(requestInterceptor);
return this;
}
Expand All @@ -153,6 +237,10 @@ public MicroservicesRunner addGlobalRequestInterceptor(RequestInterceptor... req
* @param responseInterceptor interceptor instances
*/
public MicroservicesRunner addGlobalResponseInterceptor(ResponseInterceptor... responseInterceptor) {
if (isGrpcService) {
throw new UnsupportedOperationException("gRPC service response interceptor is currently not " +
"supported");
}
msRegistry.addGlobalResponseInterceptor(responseInterceptor);
return this;
}
Expand All @@ -166,6 +254,10 @@ public MicroservicesRunner addGlobalResponseInterceptor(ResponseInterceptor... r
* @deprecated
*/
public MicroservicesRunner addInterceptor(Interceptor... interceptor) {
if (isGrpcService) {
throw new UnsupportedOperationException("gRPC service interceptor is currently not " +
"supported");
}
msRegistry.addGlobalRequestInterceptor(interceptor);
msRegistry.addGlobalResponseInterceptor(interceptor);
return this;
Expand All @@ -178,6 +270,10 @@ public MicroservicesRunner addInterceptor(Interceptor... interceptor) {
* @return this MicroservicesRunner object
*/
public MicroservicesRunner addExceptionMapper(ExceptionMapper... exceptionMapper) {
if (isGrpcService) {
throw new UnsupportedOperationException("gRPC service exception mapping is currently not " +
"supported");
}
checkState();
msRegistry.addExceptionMapper(exceptionMapper);
return this;
Expand Down Expand Up @@ -245,26 +341,42 @@ private void checkState() {
* Start this Microservices runner. This will startup all the HTTP transports.
*/
public void start() {
msRegistry.getSessionManager().init();
handleServiceLifecycleMethods();
MSF4JHttpConnectorListener msf4JHttpConnectorListener = new MSF4JHttpConnectorListener();
MSF4JWSConnectorListener msf4JWSConnectorListener = new MSF4JWSConnectorListener();
serverConnectors.forEach(serverConnector -> {
ServerConnectorFuture serverConnectorFuture = serverConnector.start();
serverConnectorFuture.setHttpConnectorListener(msf4JHttpConnectorListener);
serverConnectorFuture.setWSConnectorListener(msf4JWSConnectorListener);
serverConnectorFuture.setPortBindingEventListener(new HttpConnectorPortBindingListener());
isStarted = true;
log.info("Microservices server started in " + (System.currentTimeMillis() - startTime) + "ms");
});
if (isGrpcService) {
try {
grpcBuilder.register();
grpcBuilder.start();
log.info("gRPC server server started in " + (System.currentTimeMillis() - startTime) + "ms");
grpcBuilder.blockUntilShutdown();
} catch (GrpcServerException | InterruptedException e) {
throw new RuntimeException("Error while starting the gRPC server.", e);
}
} else {
msRegistry.getSessionManager().init();
handleServiceLifecycleMethods();
MSF4JHttpConnectorListener msf4JHttpConnectorListener = new MSF4JHttpConnectorListener();
MSF4JWSConnectorListener msf4JWSConnectorListener = new MSF4JWSConnectorListener();
serverConnectors.forEach(serverConnector -> {
ServerConnectorFuture serverConnectorFuture = serverConnector.start();
serverConnectorFuture.setHttpConnectorListener(msf4JHttpConnectorListener);
serverConnectorFuture.setWSConnectorListener(msf4JWSConnectorListener);
serverConnectorFuture.setPortBindingEventListener(new HttpConnectorPortBindingListener());
isStarted = true;
log.info("Microservices server started in " + (System.currentTimeMillis() - startTime) + "ms");
});
}
}

/**
* Stop this Microservices runner. This will stop all the HTTP Transports.
*/
public void stop() {
serverConnectors.forEach(ServerConnector::stop);
log.info("Microservices server stopped");
if (isGrpcService) {
grpcBuilder.stop();
log.info("gRPC server stopped");
} else {
serverConnectors.forEach(ServerConnector::stop);
log.info("Microservices server stopped");
}
}

/**
Expand Down
63 changes: 63 additions & 0 deletions core/src/main/java/org/wso2/msf4j/grpc/ServerBuilder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright (c) 2017, WSO2 Inc. (http://wso2.com) All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.wso2.msf4j.grpc;

import org.wso2.msf4j.grpc.exception.GrpcServerException;

/**
* gRPC service builder initializes the gRPC server. deploys the microservices and starts the relevant transports.
*/
public interface ServerBuilder {

/**
* initialized Server Builder instance which will be used for deploying gRPC services.
* Allows specifying ports on which the microservices in this Builder are deployed.
*
* @param port The port on which the microservices are exposed
*/
public void init(int port);

/**
* Deploy a microservice.
*
* @param microservice The microservice which is to be deployed
* @return this Server Builder object
*/
public ServerBuilder addService(Object... microservice);

/**
* Shutdown grpc server.
*/
public void stop();

/**
* Await termination on the main thread since the grpc library uses daemon threads.
*/
public void blockUntilShutdown() throws InterruptedException;

/**
* Register all microservices in Server.
* @return this Server Builder object
* @throws GrpcServerException exception when there is an error in registering microservices.
*/
public ServerBuilder register() throws GrpcServerException;

/**
* Start this gRPC server. This will startup all the gRPC services.
* @throws GrpcServerException exception when there is an error in starting the server.
*/
public void start() throws GrpcServerException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright (c) 2017, WSO2 Inc. (http://wso2.com) All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.wso2.msf4j.grpc.exception;

/**
* Exception that is thrown when gRPC service registration.
*/
public class GrpcServerException extends Exception {

public GrpcServerException(String message, Throwable throwable) {
super(message, throwable);
}

public GrpcServerException(String message) {
super(message);
}
}
Loading