diff --git a/zipkin-server/server-core/src/main/java/zipkin/server/core/CoreModuleConfig.java b/zipkin-server/server-core/src/main/java/zipkin/server/core/CoreModuleConfig.java index 4b45a038157..3edc980af90 100644 --- a/zipkin-server/server-core/src/main/java/zipkin/server/core/CoreModuleConfig.java +++ b/zipkin-server/server-core/src/main/java/zipkin/server/core/CoreModuleConfig.java @@ -17,6 +17,17 @@ import org.apache.skywalking.oap.server.library.module.ModuleConfig; public class CoreModuleConfig extends ModuleConfig { + private String gRPCHost; + private int gRPCPort; + private boolean gRPCSslEnabled = false; + private String gRPCSslKeyPath; + private String gRPCSslCertChainPath; + private String gRPCSslTrustedCAPath; + private int gRPCThreadPoolSize; + private int gRPCThreadPoolQueueSize; + private int gRPCMaxConcurrentCallsPerConnection; + private int gRPCMaxMessageSize; + /** * The max length of the service name. */ @@ -195,4 +206,84 @@ public int getRemoteTimeout() { public void setRemoteTimeout(int remoteTimeout) { this.remoteTimeout = remoteTimeout; } + + public String getGRPCHost() { + return gRPCHost; + } + + public void setGRPCHost(String gRPCHost) { + this.gRPCHost = gRPCHost; + } + + public int getGRPCPort() { + return gRPCPort; + } + + public void setGRPCPort(int gRPCPort) { + this.gRPCPort = gRPCPort; + } + + public boolean getGRPCSslEnabled() { + return gRPCSslEnabled; + } + + public void setGRPCSslEnabled(boolean gRPCSslEnabled) { + this.gRPCSslEnabled = gRPCSslEnabled; + } + + public String getGRPCSslKeyPath() { + return gRPCSslKeyPath; + } + + public void setGRPCSslKeyPath(String gRPCSslKeyPath) { + this.gRPCSslKeyPath = gRPCSslKeyPath; + } + + public String getGRPCSslCertChainPath() { + return gRPCSslCertChainPath; + } + + public void setGRPCSslCertChainPath(String gRPCSslCertChainPath) { + this.gRPCSslCertChainPath = gRPCSslCertChainPath; + } + + public String getGRPCSslTrustedCAPath() { + return gRPCSslTrustedCAPath; + } + + public void setGRPCSslTrustedCAPath(String gRPCSslTrustedCAPath) { + this.gRPCSslTrustedCAPath = gRPCSslTrustedCAPath; + } + + public int getGRPCThreadPoolSize() { + return gRPCThreadPoolSize; + } + + public void setGRPCThreadPoolSize(int gRPCThreadPoolSize) { + this.gRPCThreadPoolSize = gRPCThreadPoolSize; + } + + public int getGRPCThreadPoolQueueSize() { + return gRPCThreadPoolQueueSize; + } + + public void setGRPCThreadPoolQueueSize(int gRPCThreadPoolQueueSize) { + this.gRPCThreadPoolQueueSize = gRPCThreadPoolQueueSize; + } + + public int getGRPCMaxConcurrentCallsPerConnection() { + return gRPCMaxConcurrentCallsPerConnection; + } + + public void setGRPCMaxConcurrentCallsPerConnection(int gRPCMaxConcurrentCallsPerConnection) { + this.gRPCMaxConcurrentCallsPerConnection = gRPCMaxConcurrentCallsPerConnection; + } + + public int getGRPCMaxMessageSize() { + return gRPCMaxMessageSize; + } + + public void setGRPCMaxMessageSize(int gRPCMaxMessageSize) { + this.gRPCMaxMessageSize = gRPCMaxMessageSize; + } } diff --git a/zipkin-server/server-core/src/main/java/zipkin/server/core/CoreModuleProvider.java b/zipkin-server/server-core/src/main/java/zipkin/server/core/CoreModuleProvider.java index 88941af11cd..83d48dd86e8 100644 --- a/zipkin-server/server-core/src/main/java/zipkin/server/core/CoreModuleProvider.java +++ b/zipkin-server/server-core/src/main/java/zipkin/server/core/CoreModuleProvider.java @@ -14,11 +14,15 @@ package zipkin.server.core; import org.apache.skywalking.oap.server.core.CoreModule; +import org.apache.skywalking.oap.server.core.RunningMode; import org.apache.skywalking.oap.server.core.analysis.meter.MeterSystem; import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor; import org.apache.skywalking.oap.server.core.annotation.AnnotationScan; import org.apache.skywalking.oap.server.core.cache.NetworkAddressAliasCache; import org.apache.skywalking.oap.server.core.cache.ProfileTaskCache; +import org.apache.skywalking.oap.server.core.cluster.ClusterCoordinator; +import org.apache.skywalking.oap.server.core.cluster.ClusterModule; +import org.apache.skywalking.oap.server.core.cluster.RemoteInstance; import org.apache.skywalking.oap.server.core.command.CommandService; import org.apache.skywalking.oap.server.core.config.ConfigService; import org.apache.skywalking.oap.server.core.config.DownSamplingConfigService; @@ -48,8 +52,12 @@ import org.apache.skywalking.oap.server.core.query.TopologyQueryService; import org.apache.skywalking.oap.server.core.query.TraceQueryService; import org.apache.skywalking.oap.server.core.remote.RemoteSenderService; +import org.apache.skywalking.oap.server.core.remote.RemoteServiceHandler; +import org.apache.skywalking.oap.server.core.remote.client.Address; import org.apache.skywalking.oap.server.core.remote.client.RemoteClientManager; +import org.apache.skywalking.oap.server.core.remote.health.HealthCheckServiceHandler; import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister; +import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegisterImpl; import org.apache.skywalking.oap.server.core.server.HTTPHandlerRegister; import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine; import org.apache.skywalking.oap.server.core.source.SourceReceiver; @@ -69,8 +77,10 @@ import org.apache.skywalking.oap.server.library.module.ModuleProvider; import org.apache.skywalking.oap.server.library.module.ModuleStartException; import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException; +import org.apache.skywalking.oap.server.library.server.ServerException; +import org.apache.skywalking.oap.server.library.server.grpc.GRPCServer; +import org.apache.skywalking.oap.server.telemetry.api.TelemetryRelatedContext; import zipkin.server.core.services.EmptyComponentLibraryCatalogService; -import zipkin.server.core.services.EmptyGRPCHandlerRegister; import zipkin.server.core.services.EmptyHTTPHandlerRegister; import zipkin.server.core.services.EmptyNetworkAddressAliasCache; import zipkin.server.core.services.ZipkinConfigService; @@ -85,6 +95,8 @@ public class CoreModuleProvider extends ModuleProvider { private final ZipkinSourceReceiverImpl receiver; private final AnnotationScan annotationScan; private final StorageModels storageModels; + private RemoteClientManager remoteClientManager; + private GRPCServer grpcServer; public CoreModuleProvider() { this.annotationScan = new AnnotationScan(); @@ -138,12 +150,43 @@ public void prepare() throws ServiceNotProvidedException, ModuleStartException { throw new ModuleStartException(e.getMessage(), e); } + if (moduleConfig.getGRPCSslEnabled()) { + grpcServer = new GRPCServer(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort(), + moduleConfig.getGRPCSslCertChainPath(), + moduleConfig.getGRPCSslKeyPath(), + null + ); + } else { + grpcServer = new GRPCServer(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort()); + } + if (moduleConfig.getGRPCMaxConcurrentCallsPerConnection() > 0) { + grpcServer.setMaxConcurrentCallsPerConnection(moduleConfig.getGRPCMaxConcurrentCallsPerConnection()); + } + if (moduleConfig.getGRPCMaxMessageSize() > 0) { + grpcServer.setMaxMessageSize(moduleConfig.getGRPCMaxMessageSize()); + } + if (moduleConfig.getGRPCThreadPoolQueueSize() > 0) { + grpcServer.setThreadPoolQueueSize(moduleConfig.getGRPCThreadPoolQueueSize()); + } + if (moduleConfig.getGRPCThreadPoolSize() > 0) { + grpcServer.setThreadPoolSize(moduleConfig.getGRPCThreadPoolSize()); + } + grpcServer.initialize(); + + if (moduleConfig.getGRPCSslEnabled()) { + this.remoteClientManager = new RemoteClientManager(getManager(), moduleConfig.getRemoteTimeout(), + moduleConfig.getGRPCSslTrustedCAPath() + ); + } else { + this.remoteClientManager = new RemoteClientManager(getManager(), moduleConfig.getRemoteTimeout()); + } + final org.apache.skywalking.oap.server.core.CoreModuleConfig swConfig = this.moduleConfig.toSkyWalkingConfig(); this.registerServiceImplementation(MeterSystem.class, new MeterSystem(getManager())); this.registerServiceImplementation(ConfigService.class, new ZipkinConfigService(moduleConfig, this)); this.registerServiceImplementation(ServerStatusService.class, new ServerStatusService(getManager())); this.registerServiceImplementation(DownSamplingConfigService.class, new DownSamplingConfigService(Collections.emptyList())); - this.registerServiceImplementation(GRPCHandlerRegister.class, new EmptyGRPCHandlerRegister()); + this.registerServiceImplementation(GRPCHandlerRegister.class, new GRPCHandlerRegisterImpl(grpcServer)); this.registerServiceImplementation(HTTPHandlerRegister.class, new EmptyHTTPHandlerRegister()); this.registerServiceImplementation(IComponentLibraryCatalogService.class, new EmptyComponentLibraryCatalogService()); this.registerServiceImplementation(SourceReceiver.class, receiver); @@ -177,7 +220,7 @@ public void prepare() throws ServiceNotProvidedException, ModuleStartException { this.registerServiceImplementation(ContinuousProfilingQueryService.class, new ContinuousProfilingQueryService(getManager())); this.registerServiceImplementation(CommandService.class, new CommandService(getManager())); this.registerServiceImplementation(OALEngineLoaderService.class, new OALEngineLoaderService(getManager())); - this.registerServiceImplementation(RemoteClientManager.class, new RemoteClientManager(getManager(), moduleConfig.getRemoteTimeout())); + this.registerServiceImplementation(RemoteClientManager.class, remoteClientManager); this.registerServiceImplementation(UITemplateManagementService.class, new UITemplateManagementService(getManager())); this.registerServiceImplementation(UIMenuManagementService.class, new UIMenuManagementService(getManager(), swConfig)); @@ -198,16 +241,39 @@ public void prepare() throws ServiceNotProvidedException, ModuleStartException { @Override public void start() throws ServiceNotProvidedException, ModuleStartException { + grpcServer.addHandler(new RemoteServiceHandler(getManager())); + grpcServer.addHandler(new HealthCheckServiceHandler()); + try { receiver.scan(); annotationScan.scan(); } catch (IOException | IllegalAccessException | InstantiationException | StorageException e) { throw new ModuleStartException(e.getMessage(), e); } + + Address gRPCServerInstanceAddress = new Address(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort(), true); + TelemetryRelatedContext.INSTANCE.setId(gRPCServerInstanceAddress.toString()); + ClusterCoordinator coordinator = this.getManager() + .find(ClusterModule.NAME) + .provider() + .getService(ClusterCoordinator.class); + coordinator.registerWatcher(remoteClientManager); + coordinator.start(); + RemoteInstance gRPCServerInstance = new RemoteInstance(gRPCServerInstanceAddress); + coordinator.registerRemote(gRPCServerInstance); } @Override public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException { + try { + if (!RunningMode.isInitMode()) { + grpcServer.start(); + remoteClientManager.start(); + } + } catch (ServerException e) { + throw new ModuleStartException(e.getMessage(), e); + } + final org.apache.skywalking.oap.server.core.CoreModuleConfig swConfig = this.moduleConfig.toSkyWalkingConfig(); PersistenceTimer.INSTANCE.start(getManager(), swConfig); DataTTLKeeperTimer.INSTANCE.start(getManager(), swConfig); diff --git a/zipkin-server/server-starter/src/main/resources/application.yml b/zipkin-server/server-starter/src/main/resources/application.yml index 4fd71d24d31..4f7a6cf5127 100644 --- a/zipkin-server/server-starter/src/main/resources/application.yml +++ b/zipkin-server/server-starter/src/main/resources/application.yml @@ -38,6 +38,16 @@ core: prepareThreads: ${ZIPKIN_PREPARE_THREADS:2} # The period of doing data persistence. Unit is second.Default value is 25s persistentPeriod: ${ZIPKIN_PERSISTENT_PERIOD:25} + gRPCHost: ${ZIPKIN_GRPC_HOST:0.0.0.0} + gRPCPort: ${ZIPKIN_GRPC_PORT:11800} + gRPCThreadPoolQueueSize: ${ZIPKIN_GRPC_POOL_QUEUE_SIZE:-1} + gRPCThreadPoolSize: ${ZIPKIN_GRPC_THREAD_POOL_SIZE:-1} + gRPCSslEnabled: ${ZIPKIN_GRPC_SSL_ENABLED:false} + gRPCSslKeyPath: ${ZIPKIN_GRPC_SSL_KEY_PATH:""} + gRPCSslCertChainPath: ${ZIPKIN_GRPC_SSL_CERT_CHAIN_PATH:""} + gRPCSslTrustedCAPath: ${ZIPKIN_GRPC_SSL_TRUSTED_CA_PATH:""} + gRPCMaxConcurrentCallsPerConnection: ${ZIPKIN_GRPC_MAX_CONCURRENT_CALL:0} + gRPCMaxMessageSize: ${ZIPKIN_GRPC_MAX_MESSAGE_SIZE:0} storage: selector: ${ZIPKIN_STORAGE:h2}