Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding gRPC server for cluster mode #3572

Merged
merged 1 commit into from
Oct 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,17 @@
import org.apache.skywalking.oap.server.library.module.ModuleConfig;

public class CoreModuleConfig extends ModuleConfig {
private String gRPCHost;
private int gRPCPort;
private boolean gRPCSslEnabled = false;
private String gRPCSslKeyPath;
private String gRPCSslCertChainPath;
private String gRPCSslTrustedCAPath;
private int gRPCThreadPoolSize;
private int gRPCThreadPoolQueueSize;
private int gRPCMaxConcurrentCallsPerConnection;
private int gRPCMaxMessageSize;

/**
* The max length of the service name.
*/
Expand Down Expand Up @@ -195,4 +206,84 @@ public int getRemoteTimeout() {
public void setRemoteTimeout(int remoteTimeout) {
this.remoteTimeout = remoteTimeout;
}

public String getGRPCHost() {
return gRPCHost;
}

public void setGRPCHost(String gRPCHost) {
this.gRPCHost = gRPCHost;
}

public int getGRPCPort() {
return gRPCPort;
}

public void setGRPCPort(int gRPCPort) {
this.gRPCPort = gRPCPort;
}

public boolean getGRPCSslEnabled() {
return gRPCSslEnabled;
}

public void setGRPCSslEnabled(boolean gRPCSslEnabled) {
this.gRPCSslEnabled = gRPCSslEnabled;
}

public String getGRPCSslKeyPath() {
return gRPCSslKeyPath;
}

public void setGRPCSslKeyPath(String gRPCSslKeyPath) {
this.gRPCSslKeyPath = gRPCSslKeyPath;
}

public String getGRPCSslCertChainPath() {
return gRPCSslCertChainPath;
}

public void setGRPCSslCertChainPath(String gRPCSslCertChainPath) {
this.gRPCSslCertChainPath = gRPCSslCertChainPath;
}

public String getGRPCSslTrustedCAPath() {
return gRPCSslTrustedCAPath;
}

public void setGRPCSslTrustedCAPath(String gRPCSslTrustedCAPath) {
this.gRPCSslTrustedCAPath = gRPCSslTrustedCAPath;
}

public int getGRPCThreadPoolSize() {
return gRPCThreadPoolSize;
}

public void setGRPCThreadPoolSize(int gRPCThreadPoolSize) {
this.gRPCThreadPoolSize = gRPCThreadPoolSize;
}

public int getGRPCThreadPoolQueueSize() {
return gRPCThreadPoolQueueSize;
}

public void setGRPCThreadPoolQueueSize(int gRPCThreadPoolQueueSize) {
this.gRPCThreadPoolQueueSize = gRPCThreadPoolQueueSize;
}

public int getGRPCMaxConcurrentCallsPerConnection() {
return gRPCMaxConcurrentCallsPerConnection;
}

public void setGRPCMaxConcurrentCallsPerConnection(int gRPCMaxConcurrentCallsPerConnection) {
this.gRPCMaxConcurrentCallsPerConnection = gRPCMaxConcurrentCallsPerConnection;
}

public int getGRPCMaxMessageSize() {
return gRPCMaxMessageSize;
}

public void setGRPCMaxMessageSize(int gRPCMaxMessageSize) {
this.gRPCMaxMessageSize = gRPCMaxMessageSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,15 @@
package zipkin.server.core;

import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.RunningMode;
import org.apache.skywalking.oap.server.core.analysis.meter.MeterSystem;
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
import org.apache.skywalking.oap.server.core.annotation.AnnotationScan;
import org.apache.skywalking.oap.server.core.cache.NetworkAddressAliasCache;
import org.apache.skywalking.oap.server.core.cache.ProfileTaskCache;
import org.apache.skywalking.oap.server.core.cluster.ClusterCoordinator;
import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import org.apache.skywalking.oap.server.core.command.CommandService;
import org.apache.skywalking.oap.server.core.config.ConfigService;
import org.apache.skywalking.oap.server.core.config.DownSamplingConfigService;
Expand Down Expand Up @@ -48,8 +52,12 @@
import org.apache.skywalking.oap.server.core.query.TopologyQueryService;
import org.apache.skywalking.oap.server.core.query.TraceQueryService;
import org.apache.skywalking.oap.server.core.remote.RemoteSenderService;
import org.apache.skywalking.oap.server.core.remote.RemoteServiceHandler;
import org.apache.skywalking.oap.server.core.remote.client.Address;
import org.apache.skywalking.oap.server.core.remote.client.RemoteClientManager;
import org.apache.skywalking.oap.server.core.remote.health.HealthCheckServiceHandler;
import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegisterImpl;
import org.apache.skywalking.oap.server.core.server.HTTPHandlerRegister;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
Expand All @@ -69,8 +77,10 @@
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
import org.apache.skywalking.oap.server.library.server.ServerException;
import org.apache.skywalking.oap.server.library.server.grpc.GRPCServer;
import org.apache.skywalking.oap.server.telemetry.api.TelemetryRelatedContext;
import zipkin.server.core.services.EmptyComponentLibraryCatalogService;
import zipkin.server.core.services.EmptyGRPCHandlerRegister;
import zipkin.server.core.services.EmptyHTTPHandlerRegister;
import zipkin.server.core.services.EmptyNetworkAddressAliasCache;
import zipkin.server.core.services.ZipkinConfigService;
Expand All @@ -85,6 +95,8 @@ public class CoreModuleProvider extends ModuleProvider {
private final ZipkinSourceReceiverImpl receiver;
private final AnnotationScan annotationScan;
private final StorageModels storageModels;
private RemoteClientManager remoteClientManager;
private GRPCServer grpcServer;

public CoreModuleProvider() {
this.annotationScan = new AnnotationScan();
Expand Down Expand Up @@ -138,12 +150,43 @@ public void prepare() throws ServiceNotProvidedException, ModuleStartException {
throw new ModuleStartException(e.getMessage(), e);
}

if (moduleConfig.getGRPCSslEnabled()) {
grpcServer = new GRPCServer(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort(),
moduleConfig.getGRPCSslCertChainPath(),
moduleConfig.getGRPCSslKeyPath(),
null
);
} else {
grpcServer = new GRPCServer(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort());
}
if (moduleConfig.getGRPCMaxConcurrentCallsPerConnection() > 0) {
grpcServer.setMaxConcurrentCallsPerConnection(moduleConfig.getGRPCMaxConcurrentCallsPerConnection());
}
if (moduleConfig.getGRPCMaxMessageSize() > 0) {
grpcServer.setMaxMessageSize(moduleConfig.getGRPCMaxMessageSize());
}
if (moduleConfig.getGRPCThreadPoolQueueSize() > 0) {
grpcServer.setThreadPoolQueueSize(moduleConfig.getGRPCThreadPoolQueueSize());
}
if (moduleConfig.getGRPCThreadPoolSize() > 0) {
grpcServer.setThreadPoolSize(moduleConfig.getGRPCThreadPoolSize());
}
grpcServer.initialize();

if (moduleConfig.getGRPCSslEnabled()) {
this.remoteClientManager = new RemoteClientManager(getManager(), moduleConfig.getRemoteTimeout(),
moduleConfig.getGRPCSslTrustedCAPath()
);
} else {
this.remoteClientManager = new RemoteClientManager(getManager(), moduleConfig.getRemoteTimeout());
}

final org.apache.skywalking.oap.server.core.CoreModuleConfig swConfig = this.moduleConfig.toSkyWalkingConfig();
this.registerServiceImplementation(MeterSystem.class, new MeterSystem(getManager()));
this.registerServiceImplementation(ConfigService.class, new ZipkinConfigService(moduleConfig, this));
this.registerServiceImplementation(ServerStatusService.class, new ServerStatusService(getManager()));
this.registerServiceImplementation(DownSamplingConfigService.class, new DownSamplingConfigService(Collections.emptyList()));
this.registerServiceImplementation(GRPCHandlerRegister.class, new EmptyGRPCHandlerRegister());
this.registerServiceImplementation(GRPCHandlerRegister.class, new GRPCHandlerRegisterImpl(grpcServer));
this.registerServiceImplementation(HTTPHandlerRegister.class, new EmptyHTTPHandlerRegister());
this.registerServiceImplementation(IComponentLibraryCatalogService.class, new EmptyComponentLibraryCatalogService());
this.registerServiceImplementation(SourceReceiver.class, receiver);
Expand Down Expand Up @@ -177,7 +220,7 @@ public void prepare() throws ServiceNotProvidedException, ModuleStartException {
this.registerServiceImplementation(ContinuousProfilingQueryService.class, new ContinuousProfilingQueryService(getManager()));
this.registerServiceImplementation(CommandService.class, new CommandService(getManager()));
this.registerServiceImplementation(OALEngineLoaderService.class, new OALEngineLoaderService(getManager()));
this.registerServiceImplementation(RemoteClientManager.class, new RemoteClientManager(getManager(), moduleConfig.getRemoteTimeout()));
this.registerServiceImplementation(RemoteClientManager.class, remoteClientManager);
this.registerServiceImplementation(UITemplateManagementService.class, new UITemplateManagementService(getManager()));
this.registerServiceImplementation(UIMenuManagementService.class, new UIMenuManagementService(getManager(), swConfig));

Expand All @@ -198,16 +241,39 @@ public void prepare() throws ServiceNotProvidedException, ModuleStartException {

@Override
public void start() throws ServiceNotProvidedException, ModuleStartException {
grpcServer.addHandler(new RemoteServiceHandler(getManager()));
grpcServer.addHandler(new HealthCheckServiceHandler());

try {
receiver.scan();
annotationScan.scan();
} catch (IOException | IllegalAccessException | InstantiationException | StorageException e) {
throw new ModuleStartException(e.getMessage(), e);
}

Address gRPCServerInstanceAddress = new Address(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort(), true);
TelemetryRelatedContext.INSTANCE.setId(gRPCServerInstanceAddress.toString());
ClusterCoordinator coordinator = this.getManager()
.find(ClusterModule.NAME)
.provider()
.getService(ClusterCoordinator.class);
coordinator.registerWatcher(remoteClientManager);
coordinator.start();
RemoteInstance gRPCServerInstance = new RemoteInstance(gRPCServerInstanceAddress);
coordinator.registerRemote(gRPCServerInstance);
}

@Override
public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {
try {
if (!RunningMode.isInitMode()) {
grpcServer.start();
remoteClientManager.start();
}
} catch (ServerException e) {
throw new ModuleStartException(e.getMessage(), e);
}

final org.apache.skywalking.oap.server.core.CoreModuleConfig swConfig = this.moduleConfig.toSkyWalkingConfig();
PersistenceTimer.INSTANCE.start(getManager(), swConfig);
DataTTLKeeperTimer.INSTANCE.start(getManager(), swConfig);
Expand Down
10 changes: 10 additions & 0 deletions zipkin-server/server-starter/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,16 @@ core:
prepareThreads: ${ZIPKIN_PREPARE_THREADS:2}
# The period of doing data persistence. Unit is second.Default value is 25s
persistentPeriod: ${ZIPKIN_PERSISTENT_PERIOD:25}
gRPCHost: ${ZIPKIN_GRPC_HOST:0.0.0.0}
gRPCPort: ${ZIPKIN_GRPC_PORT:11800}
gRPCThreadPoolQueueSize: ${ZIPKIN_GRPC_POOL_QUEUE_SIZE:-1}
gRPCThreadPoolSize: ${ZIPKIN_GRPC_THREAD_POOL_SIZE:-1}
gRPCSslEnabled: ${ZIPKIN_GRPC_SSL_ENABLED:false}
gRPCSslKeyPath: ${ZIPKIN_GRPC_SSL_KEY_PATH:""}
gRPCSslCertChainPath: ${ZIPKIN_GRPC_SSL_CERT_CHAIN_PATH:""}
gRPCSslTrustedCAPath: ${ZIPKIN_GRPC_SSL_TRUSTED_CA_PATH:""}
gRPCMaxConcurrentCallsPerConnection: ${ZIPKIN_GRPC_MAX_CONCURRENT_CALL:0}
gRPCMaxMessageSize: ${ZIPKIN_GRPC_MAX_MESSAGE_SIZE:0}

storage:
selector: ${ZIPKIN_STORAGE:h2}
Expand Down
Loading