diff --git a/zipkin-server/pom.xml b/zipkin-server/pom.xml index 2415d960f4c..4ce2185032f 100644 --- a/zipkin-server/pom.xml +++ b/zipkin-server/pom.xml @@ -47,6 +47,11 @@ ../skywalking/oap-server/server-receiver-plugin/receiver-proto ../skywalking/oap-server/server-receiver-plugin/zipkin-receiver-plugin ../skywalking/oap-server/server-cluster-plugin/cluster-standalone-plugin + ../skywalking/oap-server/server-cluster-plugin/cluster-consul-plugin + ../skywalking/oap-server/server-cluster-plugin/cluster-etcd-plugin + ../skywalking/oap-server/server-cluster-plugin/cluster-kubernetes-plugin + ../skywalking/oap-server/server-cluster-plugin/cluster-nacos-plugin + ../skywalking/oap-server/server-cluster-plugin/cluster-zookeeper-plugin ../skywalking/oap-server/server-storage-plugin ../skywalking/oap-server/server-library ../skywalking/oap-server/server-query-plugin/zipkin-query-plugin diff --git a/zipkin-server/server-core/src/main/java/zipkin/server/core/CoreModuleConfig.java b/zipkin-server/server-core/src/main/java/zipkin/server/core/CoreModuleConfig.java index 79bca876bd1..4b45a038157 100644 --- a/zipkin-server/server-core/src/main/java/zipkin/server/core/CoreModuleConfig.java +++ b/zipkin-server/server-core/src/main/java/zipkin/server/core/CoreModuleConfig.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2019 The OpenZipkin Authors + * Copyright 2015-2023 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -64,6 +64,21 @@ public class CoreModuleConfig extends ModuleConfig { */ private int traceSampleRate = 10000; + /** + * The number of threads used to prepare metrics data to the storage. + */ + private int prepareThreads = 2; + + /** + * The period of doing data persistence. Unit is second. + */ + private int persistentPeriod = 25; + + /** + * Timeout for cluster internal communication, in seconds. + */ + private int remoteTimeout = 20; + private static final String DEFAULT_SEARCHABLE_TAG_KEYS = String.join( Const.COMMA, "http.method" @@ -72,6 +87,8 @@ public class CoreModuleConfig extends ModuleConfig { public org.apache.skywalking.oap.server.core.CoreModuleConfig toSkyWalkingConfig() { final org.apache.skywalking.oap.server.core.CoreModuleConfig result = new org.apache.skywalking.oap.server.core.CoreModuleConfig(); result.setServiceCacheRefreshInterval(serviceCacheRefreshInterval); + result.setPrepareThreads(prepareThreads); + result.setPersistentPeriod(persistentPeriod); return result; } @@ -154,4 +171,28 @@ public int getTraceSampleRate() { public void setTraceSampleRate(int traceSampleRate) { this.traceSampleRate = traceSampleRate; } + + public int getPrepareThreads() { + return prepareThreads; + } + + public void setPrepareThreads(int prepareThreads) { + this.prepareThreads = prepareThreads; + } + + public int getPersistentPeriod() { + return persistentPeriod; + } + + public void setPersistentPeriod(int persistentPeriod) { + this.persistentPeriod = persistentPeriod; + } + + public int getRemoteTimeout() { + return remoteTimeout; + } + + public void setRemoteTimeout(int remoteTimeout) { + this.remoteTimeout = remoteTimeout; + } } diff --git a/zipkin-server/server-core/src/main/java/zipkin/server/core/CoreModuleProvider.java b/zipkin-server/server-core/src/main/java/zipkin/server/core/CoreModuleProvider.java index 75eb7781fdb..88941af11cd 100644 --- a/zipkin-server/server-core/src/main/java/zipkin/server/core/CoreModuleProvider.java +++ b/zipkin-server/server-core/src/main/java/zipkin/server/core/CoreModuleProvider.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2019 The OpenZipkin Authors + * Copyright 2015-2023 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -16,6 +16,7 @@ import org.apache.skywalking.oap.server.core.CoreModule; import org.apache.skywalking.oap.server.core.analysis.meter.MeterSystem; import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor; +import org.apache.skywalking.oap.server.core.annotation.AnnotationScan; import org.apache.skywalking.oap.server.core.cache.NetworkAddressAliasCache; import org.apache.skywalking.oap.server.core.cache.ProfileTaskCache; import org.apache.skywalking.oap.server.core.command.CommandService; @@ -50,13 +51,16 @@ import org.apache.skywalking.oap.server.core.remote.client.RemoteClientManager; import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister; import org.apache.skywalking.oap.server.core.server.HTTPHandlerRegister; +import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine; import org.apache.skywalking.oap.server.core.source.SourceReceiver; -import org.apache.skywalking.oap.server.core.source.SourceReceiverImpl; import org.apache.skywalking.oap.server.core.status.ServerStatusService; +import org.apache.skywalking.oap.server.core.storage.PersistenceTimer; +import org.apache.skywalking.oap.server.core.storage.StorageException; import org.apache.skywalking.oap.server.core.storage.model.IModelManager; import org.apache.skywalking.oap.server.core.storage.model.ModelCreator; import org.apache.skywalking.oap.server.core.storage.model.ModelManipulator; import org.apache.skywalking.oap.server.core.storage.model.StorageModels; +import org.apache.skywalking.oap.server.core.storage.ttl.DataTTLKeeperTimer; import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceGetter; import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceSetter; import org.apache.skywalking.oap.server.core.worker.WorkerInstancesService; @@ -71,17 +75,20 @@ import zipkin.server.core.services.EmptyNetworkAddressAliasCache; import zipkin.server.core.services.ZipkinConfigService; +import java.io.IOException; import java.util.Collections; public class CoreModuleProvider extends ModuleProvider { private CoreModuleConfig moduleConfig; private EndpointNameGrouping endpointNameGrouping; - private final SourceReceiverImpl receiver; + private final ZipkinSourceReceiverImpl receiver; + private final AnnotationScan annotationScan; private final StorageModels storageModels; public CoreModuleProvider() { - this.receiver = new SourceReceiverImpl(); + this.annotationScan = new AnnotationScan(); + this.receiver = new ZipkinSourceReceiverImpl(); this.storageModels = new StorageModels(); } @@ -121,6 +128,16 @@ public void prepare() throws ServiceNotProvidedException, ModuleStartException { ); this.registerServiceImplementation(NamingControl.class, namingControl); + annotationScan.registerListener(new ZipkinStreamAnnotationListener(getManager())); + + AnnotationScan scopeScan = new AnnotationScan(); + scopeScan.registerListener(new DefaultScopeDefine.Listener()); + try { + scopeScan.scan(); + } catch (Exception e) { + throw new ModuleStartException(e.getMessage(), e); + } + final org.apache.skywalking.oap.server.core.CoreModuleConfig swConfig = this.moduleConfig.toSkyWalkingConfig(); this.registerServiceImplementation(MeterSystem.class, new MeterSystem(getManager())); this.registerServiceImplementation(ConfigService.class, new ZipkinConfigService(moduleConfig, this)); @@ -134,7 +151,6 @@ public void prepare() throws ServiceNotProvidedException, ModuleStartException { this.registerServiceImplementation(IWorkerInstanceGetter.class, instancesService); this.registerServiceImplementation(IWorkerInstanceSetter.class, instancesService); this.registerServiceImplementation(RemoteSenderService.class, new RemoteSenderService(getManager())); - this.registerServiceImplementation(RemoteSenderService.class, new RemoteSenderService(getManager())); this.registerServiceImplementation(ModelCreator.class, storageModels); this.registerServiceImplementation(IModelManager.class, storageModels); this.registerServiceImplementation(ModelManipulator.class, storageModels); @@ -161,7 +177,7 @@ public void prepare() throws ServiceNotProvidedException, ModuleStartException { this.registerServiceImplementation(ContinuousProfilingQueryService.class, new ContinuousProfilingQueryService(getManager())); this.registerServiceImplementation(CommandService.class, new CommandService(getManager())); this.registerServiceImplementation(OALEngineLoaderService.class, new OALEngineLoaderService(getManager())); - this.registerServiceImplementation(RemoteClientManager.class, new RemoteClientManager(getManager(), 0)); + this.registerServiceImplementation(RemoteClientManager.class, new RemoteClientManager(getManager(), moduleConfig.getRemoteTimeout())); this.registerServiceImplementation(UITemplateManagementService.class, new UITemplateManagementService(getManager())); this.registerServiceImplementation(UIMenuManagementService.class, new UIMenuManagementService(getManager(), swConfig)); @@ -182,12 +198,19 @@ public void prepare() throws ServiceNotProvidedException, ModuleStartException { @Override public void start() throws ServiceNotProvidedException, ModuleStartException { - + try { + receiver.scan(); + annotationScan.scan(); + } catch (IOException | IllegalAccessException | InstantiationException | StorageException e) { + throw new ModuleStartException(e.getMessage(), e); + } } @Override public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException { - + final org.apache.skywalking.oap.server.core.CoreModuleConfig swConfig = this.moduleConfig.toSkyWalkingConfig(); + PersistenceTimer.INSTANCE.start(getManager(), swConfig); + DataTTLKeeperTimer.INSTANCE.start(getManager(), swConfig); } @Override diff --git a/zipkin-server/server-core/src/main/java/zipkin/server/core/ZipkinDispatcherManager.java b/zipkin-server/server-core/src/main/java/zipkin/server/core/ZipkinDispatcherManager.java new file mode 100644 index 00000000000..c4c33d79d2e --- /dev/null +++ b/zipkin-server/server-core/src/main/java/zipkin/server/core/ZipkinDispatcherManager.java @@ -0,0 +1,28 @@ +/* + * Copyright 2015-2023 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package zipkin.server.core; + +import org.apache.skywalking.oap.server.core.analysis.DispatcherManager; +import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.TagAutocompleteDispatcher; + +public class ZipkinDispatcherManager extends DispatcherManager { + + @Override + public void addIfAsSourceDispatcher(Class aClass) throws IllegalAccessException, InstantiationException { + if (aClass.getSimpleName().startsWith("Zipkin") || aClass.equals(TagAutocompleteDispatcher.class)) { + super.addIfAsSourceDispatcher(aClass); + } + } +} diff --git a/zipkin-server/server-core/src/main/java/zipkin/server/core/ZipkinSourceReceiverImpl.java b/zipkin-server/server-core/src/main/java/zipkin/server/core/ZipkinSourceReceiverImpl.java new file mode 100644 index 00000000000..ce29494b666 --- /dev/null +++ b/zipkin-server/server-core/src/main/java/zipkin/server/core/ZipkinSourceReceiverImpl.java @@ -0,0 +1,43 @@ +/* + * Copyright 2015-2023 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package zipkin.server.core; + +import org.apache.skywalking.oap.server.core.analysis.DispatcherDetectorListener; +import org.apache.skywalking.oap.server.core.source.ISource; +import org.apache.skywalking.oap.server.core.source.SourceReceiver; + +import java.io.IOException; + +public class ZipkinSourceReceiverImpl implements SourceReceiver { + private final ZipkinDispatcherManager mgr; + + public ZipkinSourceReceiverImpl() { + mgr = new ZipkinDispatcherManager(); + } + + @Override + public void receive(ISource source) { + mgr.forward(source); + } + + @Override + public DispatcherDetectorListener getDispatcherDetectorListener() { + return mgr; + } + + public void scan() throws IOException, IllegalAccessException, InstantiationException { + mgr.scan(); + } +} diff --git a/zipkin-server/server-core/src/main/java/zipkin/server/core/ZipkinStreamAnnotationListener.java b/zipkin-server/server-core/src/main/java/zipkin/server/core/ZipkinStreamAnnotationListener.java new file mode 100644 index 00000000000..a1ba4398eb6 --- /dev/null +++ b/zipkin-server/server-core/src/main/java/zipkin/server/core/ZipkinStreamAnnotationListener.java @@ -0,0 +1,35 @@ +/* + * Copyright 2015-2023 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package zipkin.server.core; + +import org.apache.skywalking.oap.server.core.analysis.StreamAnnotationListener; +import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.TagAutocompleteData; +import org.apache.skywalking.oap.server.core.storage.StorageException; +import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; + +public class ZipkinStreamAnnotationListener extends StreamAnnotationListener { + + public ZipkinStreamAnnotationListener(ModuleDefineHolder moduleDefineHolder) { + super(moduleDefineHolder); + } + + @Override + public void notify(Class aClass) throws StorageException { + // only including all zipkin streaming + if (aClass.getSimpleName().startsWith("Zipkin") || aClass.equals(TagAutocompleteData.class)) { + super.notify(aClass); + } + } +} diff --git a/zipkin-server/server-starter/src/main/resources/application.yml b/zipkin-server/server-starter/src/main/resources/application.yml index 60e03dce20a..4fd71d24d31 100644 --- a/zipkin-server/server-starter/src/main/resources/application.yml +++ b/zipkin-server/server-starter/src/main/resources/application.yml @@ -34,6 +34,10 @@ core: searchableTracesTags: ${ZIPKIN_SEARCHABLE_TAG_KEYS:http.method} # The trace sample rate precision is 1/10000, should be between 0 and 10000 traceSampleRate: ${ZIPKIN_SAMPLE_RATE:10000} + # The number of threads used to prepare metrics data to the storage. + prepareThreads: ${ZIPKIN_PREPARE_THREADS:2} + # The period of doing data persistence. Unit is second.Default value is 25s + persistentPeriod: ${ZIPKIN_PERSISTENT_PERIOD:25} storage: selector: ${ZIPKIN_STORAGE:h2} @@ -217,4 +221,52 @@ telemetry: cluster: selector: standalone - standalone: \ No newline at end of file + standalone: + # Please check your ZooKeeper is 3.5+, However, it is also compatible with ZooKeeper 3.4.x. Replace the ZooKeeper 3.5+ + # library the oap-libs folder with your ZooKeeper 3.4.x library. + zookeeper: + namespace: ${ZIPKIN_NAMESPACE:""} + hostPort: ${ZIPKIN_CLUSTER_ZK_HOST_PORT:localhost:2181} + # Retry Policy + baseSleepTimeMs: ${ZIPKIN_CLUSTER_ZK_SLEEP_TIME:1000} # initial amount of time to wait between retries + maxRetries: ${ZIPKIN_CLUSTER_ZK_MAX_RETRIES:3} # max number of times to retry + # Enable ACL + enableACL: ${ZIPKIN_ZK_ENABLE_ACL:false} # disable ACL in default + schema: ${ZIPKIN_ZK_SCHEMA:digest} # only support digest schema + expression: ${ZIPKIN_ZK_EXPRESSION:zipkin:zipkin} + internalComHost: ${ZIPKIN_CLUSTER_INTERNAL_COM_HOST:""} + internalComPort: ${ZIPKIN_CLUSTER_INTERNAL_COM_PORT:-1} + kubernetes: + namespace: ${ZIPKIN_CLUSTER_K8S_NAMESPACE:default} + labelSelector: ${ZIPKIN_CLUSTER_K8S_LABEL:app=collector,release=zipkin} + uidEnvName: ${ZIPKIN_CLUSTER_K8S_UID:ZIPKIN_COLLECTOR_UID} + consul: + serviceName: ${ZIPKIN_SERVICE_NAME:"Zipkin_Cluster"} + # Consul cluster nodes, example: 10.0.0.1:8500,10.0.0.2:8500,10.0.0.3:8500 + hostPort: ${ZIPKIN_CLUSTER_CONSUL_HOST_PORT:localhost:8500} + aclToken: ${ZIPKIN_CLUSTER_CONSUL_ACLTOKEN:""} + internalComHost: ${ZIPKIN_CLUSTER_INTERNAL_COM_HOST:""} + internalComPort: ${ZIPKIN_CLUSTER_INTERNAL_COM_PORT:-1} + etcd: + # etcd cluster nodes, example: 10.0.0.1:2379,10.0.0.2:2379,10.0.0.3:2379 + endpoints: ${ZIPKIN_CLUSTER_ETCD_ENDPOINTS:localhost:2379} + namespace: ${ZIPKIN_CLUSTER_ETCD_NAMESPACE:/zipkin} + serviceName: ${ZIPKIN_CLUSTER_ETCD_SERVICE_NAME:"Zipkin_Cluster"} + authentication: ${ZIPKIN_CLUSTER_ETCD_AUTHENTICATION:false} + user: ${ZIPKIN_CLUSTER_ETCD_USER:} + password: ${ZIPKIN_CLUSTER_ETCD_PASSWORD:} + internalComHost: ${ZIPKIN_CLUSTER_INTERNAL_COM_HOST:""} + internalComPort: ${ZIPKIN_CLUSTER_INTERNAL_COM_PORT:-1} + nacos: + serviceName: ${ZIPKIN_SERVICE_NAME:"Zipkin_Cluster"} + hostPort: ${ZIPKIN_CLUSTER_NACOS_HOST_PORT:localhost:8848} + # Nacos Configuration namespace + namespace: ${ZIPKIN_CLUSTER_NACOS_NAMESPACE:"public"} + # Nacos auth username + username: ${ZIPKIN_CLUSTER_NACOS_USERNAME:""} + password: ${ZIPKIN_CLUSTER_NACOS_PASSWORD:""} + # Nacos auth accessKey + accessKey: ${ZIPKIN_CLUSTER_NACOS_ACCESSKEY:""} + secretKey: ${ZIPKIN_CLUSTER_NACOS_SECRETKEY:""} + internalComHost: ${ZIPKIN_CLUSTER_INTERNAL_COM_HOST:""} + internalComPort: ${ZIPKIN_CLUSTER_INTERNAL_COM_PORT:-1} \ No newline at end of file