diff --git a/.github/workflows/test-v3.yml b/.github/workflows/test-v3.yml
index 4af10d7eb68..05e6c080c21 100644
--- a/.github/workflows/test-v3.yml
+++ b/.github/workflows/test-v3.yml
@@ -52,6 +52,7 @@ jobs:
- name: receiver-zipkin-kafka
- name: receiver-zipkin-rabbitmq
- name: receiver-zipkin-scribe
+ - name: storage-cassandra
steps:
- name: Checkout Repository
uses: actions/checkout@v2
diff --git a/zipkin-server/pom.xml b/zipkin-server/pom.xml
index 2415d960f4c..4986f705667 100644
--- a/zipkin-server/pom.xml
+++ b/zipkin-server/pom.xml
@@ -62,6 +62,7 @@
receiver-zipkin-activemqreceiver-zipkin-rabbitmqreceiver-zipkin-scribe
+ storage-cassandra
diff --git a/zipkin-server/server-core/src/main/java/zipkin/server/core/CoreModuleConfig.java b/zipkin-server/server-core/src/main/java/zipkin/server/core/CoreModuleConfig.java
index 79bca876bd1..e93e1a1cfd8 100644
--- a/zipkin-server/server-core/src/main/java/zipkin/server/core/CoreModuleConfig.java
+++ b/zipkin-server/server-core/src/main/java/zipkin/server/core/CoreModuleConfig.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2015-2019 The OpenZipkin Authors
+ * Copyright 2015-2023 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -64,6 +64,16 @@ public class CoreModuleConfig extends ModuleConfig {
*/
private int traceSampleRate = 10000;
+ /**
+ * The number of threads used to prepare metrics data to the storage.
+ */
+ private int prepareThreads = 2;
+
+ /**
+ * The period of doing data persistence. Unit is second.
+ */
+ private int persistentPeriod = 25;
+
private static final String DEFAULT_SEARCHABLE_TAG_KEYS = String.join(
Const.COMMA,
"http.method"
@@ -72,6 +82,8 @@ public class CoreModuleConfig extends ModuleConfig {
public org.apache.skywalking.oap.server.core.CoreModuleConfig toSkyWalkingConfig() {
final org.apache.skywalking.oap.server.core.CoreModuleConfig result = new org.apache.skywalking.oap.server.core.CoreModuleConfig();
result.setServiceCacheRefreshInterval(serviceCacheRefreshInterval);
+ result.setPrepareThreads(prepareThreads);
+ result.setPersistentPeriod(persistentPeriod);
return result;
}
@@ -154,4 +166,20 @@ public int getTraceSampleRate() {
public void setTraceSampleRate(int traceSampleRate) {
this.traceSampleRate = traceSampleRate;
}
+
+ public int getPrepareThreads() {
+ return prepareThreads;
+ }
+
+ public void setPrepareThreads(int prepareThreads) {
+ this.prepareThreads = prepareThreads;
+ }
+
+ public int getPersistentPeriod() {
+ return persistentPeriod;
+ }
+
+ public void setPersistentPeriod(int persistentPeriod) {
+ this.persistentPeriod = persistentPeriod;
+ }
}
diff --git a/zipkin-server/server-core/src/main/java/zipkin/server/core/CoreModuleProvider.java b/zipkin-server/server-core/src/main/java/zipkin/server/core/CoreModuleProvider.java
index 75eb7781fdb..45542b73019 100644
--- a/zipkin-server/server-core/src/main/java/zipkin/server/core/CoreModuleProvider.java
+++ b/zipkin-server/server-core/src/main/java/zipkin/server/core/CoreModuleProvider.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2015-2019 The OpenZipkin Authors
+ * Copyright 2015-2023 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -16,6 +16,7 @@
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.analysis.meter.MeterSystem;
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
+import org.apache.skywalking.oap.server.core.annotation.AnnotationScan;
import org.apache.skywalking.oap.server.core.cache.NetworkAddressAliasCache;
import org.apache.skywalking.oap.server.core.cache.ProfileTaskCache;
import org.apache.skywalking.oap.server.core.command.CommandService;
@@ -50,13 +51,17 @@
import org.apache.skywalking.oap.server.core.remote.client.RemoteClientManager;
import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
import org.apache.skywalking.oap.server.core.server.HTTPHandlerRegister;
+import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.core.source.SourceReceiverImpl;
import org.apache.skywalking.oap.server.core.status.ServerStatusService;
+import org.apache.skywalking.oap.server.core.storage.PersistenceTimer;
+import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.model.IModelManager;
import org.apache.skywalking.oap.server.core.storage.model.ModelCreator;
import org.apache.skywalking.oap.server.core.storage.model.ModelManipulator;
import org.apache.skywalking.oap.server.core.storage.model.StorageModels;
+import org.apache.skywalking.oap.server.core.storage.ttl.DataTTLKeeperTimer;
import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceGetter;
import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceSetter;
import org.apache.skywalking.oap.server.core.worker.WorkerInstancesService;
@@ -69,19 +74,23 @@
import zipkin.server.core.services.EmptyGRPCHandlerRegister;
import zipkin.server.core.services.EmptyHTTPHandlerRegister;
import zipkin.server.core.services.EmptyNetworkAddressAliasCache;
+import zipkin.server.core.services.SelfSenderService;
import zipkin.server.core.services.ZipkinConfigService;
+import java.io.IOException;
import java.util.Collections;
public class CoreModuleProvider extends ModuleProvider {
private CoreModuleConfig moduleConfig;
private EndpointNameGrouping endpointNameGrouping;
- private final SourceReceiverImpl receiver;
+ private final ZipkinSourceReceiverImpl receiver;
+ private final AnnotationScan annotationScan;
private final StorageModels storageModels;
public CoreModuleProvider() {
- this.receiver = new SourceReceiverImpl();
+ this.annotationScan = new AnnotationScan();
+ this.receiver = new ZipkinSourceReceiverImpl();
this.storageModels = new StorageModels();
}
@@ -121,6 +130,16 @@ public void prepare() throws ServiceNotProvidedException, ModuleStartException {
);
this.registerServiceImplementation(NamingControl.class, namingControl);
+ annotationScan.registerListener(new ZipkinStreamAnnotationListener(getManager()));
+
+ AnnotationScan scopeScan = new AnnotationScan();
+ scopeScan.registerListener(new DefaultScopeDefine.Listener());
+ try {
+ scopeScan.scan();
+ } catch (Exception e) {
+ throw new ModuleStartException(e.getMessage(), e);
+ }
+
final org.apache.skywalking.oap.server.core.CoreModuleConfig swConfig = this.moduleConfig.toSkyWalkingConfig();
this.registerServiceImplementation(MeterSystem.class, new MeterSystem(getManager()));
this.registerServiceImplementation(ConfigService.class, new ZipkinConfigService(moduleConfig, this));
@@ -133,8 +152,8 @@ public void prepare() throws ServiceNotProvidedException, ModuleStartException {
final WorkerInstancesService instancesService = new WorkerInstancesService();
this.registerServiceImplementation(IWorkerInstanceGetter.class, instancesService);
this.registerServiceImplementation(IWorkerInstanceSetter.class, instancesService);
- this.registerServiceImplementation(RemoteSenderService.class, new RemoteSenderService(getManager()));
- this.registerServiceImplementation(RemoteSenderService.class, new RemoteSenderService(getManager()));
+ // no cluster mode for zipkin, for sending the streaming data to the local
+ this.registerServiceImplementation(RemoteSenderService.class, new SelfSenderService(getManager()));
this.registerServiceImplementation(ModelCreator.class, storageModels);
this.registerServiceImplementation(IModelManager.class, storageModels);
this.registerServiceImplementation(ModelManipulator.class, storageModels);
@@ -182,12 +201,19 @@ public void prepare() throws ServiceNotProvidedException, ModuleStartException {
@Override
public void start() throws ServiceNotProvidedException, ModuleStartException {
-
+ try {
+ receiver.scan();
+ annotationScan.scan();
+ } catch (IOException | IllegalAccessException | InstantiationException | StorageException e) {
+ throw new ModuleStartException(e.getMessage(), e);
+ }
}
@Override
public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {
-
+ final org.apache.skywalking.oap.server.core.CoreModuleConfig swConfig = this.moduleConfig.toSkyWalkingConfig();
+ PersistenceTimer.INSTANCE.start(getManager(), swConfig);
+ DataTTLKeeperTimer.INSTANCE.start(getManager(), swConfig);
}
@Override
diff --git a/zipkin-server/server-core/src/main/java/zipkin/server/core/ZipkinDispatcherManager.java b/zipkin-server/server-core/src/main/java/zipkin/server/core/ZipkinDispatcherManager.java
new file mode 100644
index 00000000000..c4c33d79d2e
--- /dev/null
+++ b/zipkin-server/server-core/src/main/java/zipkin/server/core/ZipkinDispatcherManager.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2015-2023 The OpenZipkin Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package zipkin.server.core;
+
+import org.apache.skywalking.oap.server.core.analysis.DispatcherManager;
+import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.TagAutocompleteDispatcher;
+
+public class ZipkinDispatcherManager extends DispatcherManager {
+
+ @Override
+ public void addIfAsSourceDispatcher(Class aClass) throws IllegalAccessException, InstantiationException {
+ if (aClass.getSimpleName().startsWith("Zipkin") || aClass.equals(TagAutocompleteDispatcher.class)) {
+ super.addIfAsSourceDispatcher(aClass);
+ }
+ }
+}
diff --git a/zipkin-server/server-core/src/main/java/zipkin/server/core/ZipkinSourceReceiverImpl.java b/zipkin-server/server-core/src/main/java/zipkin/server/core/ZipkinSourceReceiverImpl.java
new file mode 100644
index 00000000000..ce29494b666
--- /dev/null
+++ b/zipkin-server/server-core/src/main/java/zipkin/server/core/ZipkinSourceReceiverImpl.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2015-2023 The OpenZipkin Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package zipkin.server.core;
+
+import org.apache.skywalking.oap.server.core.analysis.DispatcherDetectorListener;
+import org.apache.skywalking.oap.server.core.source.ISource;
+import org.apache.skywalking.oap.server.core.source.SourceReceiver;
+
+import java.io.IOException;
+
+public class ZipkinSourceReceiverImpl implements SourceReceiver {
+ private final ZipkinDispatcherManager mgr;
+
+ public ZipkinSourceReceiverImpl() {
+ mgr = new ZipkinDispatcherManager();
+ }
+
+ @Override
+ public void receive(ISource source) {
+ mgr.forward(source);
+ }
+
+ @Override
+ public DispatcherDetectorListener getDispatcherDetectorListener() {
+ return mgr;
+ }
+
+ public void scan() throws IOException, IllegalAccessException, InstantiationException {
+ mgr.scan();
+ }
+}
diff --git a/zipkin-server/server-core/src/main/java/zipkin/server/core/ZipkinStreamAnnotationListener.java b/zipkin-server/server-core/src/main/java/zipkin/server/core/ZipkinStreamAnnotationListener.java
new file mode 100644
index 00000000000..a1ba4398eb6
--- /dev/null
+++ b/zipkin-server/server-core/src/main/java/zipkin/server/core/ZipkinStreamAnnotationListener.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2015-2023 The OpenZipkin Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package zipkin.server.core;
+
+import org.apache.skywalking.oap.server.core.analysis.StreamAnnotationListener;
+import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.TagAutocompleteData;
+import org.apache.skywalking.oap.server.core.storage.StorageException;
+import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
+
+public class ZipkinStreamAnnotationListener extends StreamAnnotationListener {
+
+ public ZipkinStreamAnnotationListener(ModuleDefineHolder moduleDefineHolder) {
+ super(moduleDefineHolder);
+ }
+
+ @Override
+ public void notify(Class aClass) throws StorageException {
+ // only including all zipkin streaming
+ if (aClass.getSimpleName().startsWith("Zipkin") || aClass.equals(TagAutocompleteData.class)) {
+ super.notify(aClass);
+ }
+ }
+}
diff --git a/zipkin-server/server-core/src/main/java/zipkin/server/core/services/SelfSenderService.java b/zipkin-server/server-core/src/main/java/zipkin/server/core/services/SelfSenderService.java
new file mode 100644
index 00000000000..b94a720da87
--- /dev/null
+++ b/zipkin-server/server-core/src/main/java/zipkin/server/core/services/SelfSenderService.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2015-2023 The OpenZipkin Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package zipkin.server.core.services;
+
+import org.apache.skywalking.oap.server.core.remote.RemoteSenderService;
+import org.apache.skywalking.oap.server.core.remote.client.Address;
+import org.apache.skywalking.oap.server.core.remote.client.SelfRemoteClient;
+import org.apache.skywalking.oap.server.core.remote.data.StreamData;
+import org.apache.skywalking.oap.server.core.remote.selector.Selector;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+
+public class SelfSenderService extends RemoteSenderService {
+ private final ModuleManager moduleManager;
+ private SelfRemoteClient self;
+
+ public SelfSenderService(ModuleManager moduleManager) {
+ super(moduleManager);
+ this.moduleManager = moduleManager;
+ }
+
+ private SelfRemoteClient getSelf() {
+ if (self == null) {
+ self = new SelfRemoteClient(moduleManager, new Address("127.0.0.1", 0, true));
+ }
+ return self;
+ }
+
+ @Override
+ public void send(String nextWorkName, StreamData streamData, Selector selector) {
+ getSelf().push(nextWorkName, streamData);
+ }
+}
diff --git a/zipkin-server/server-starter/pom.xml b/zipkin-server/server-starter/pom.xml
index 7f4344dfe8b..5422c60061d 100644
--- a/zipkin-server/server-starter/pom.xml
+++ b/zipkin-server/server-starter/pom.xml
@@ -55,6 +55,11 @@
storage-elasticsearch-plugin${skywalking.version}
+
+ io.zipkin
+ storage-cassandra
+ ${project.version}
+
diff --git a/zipkin-server/server-starter/src/main/resources/application.yml b/zipkin-server/server-starter/src/main/resources/application.yml
index 60e03dce20a..91c8c99e50c 100644
--- a/zipkin-server/server-starter/src/main/resources/application.yml
+++ b/zipkin-server/server-starter/src/main/resources/application.yml
@@ -34,6 +34,10 @@ core:
searchableTracesTags: ${ZIPKIN_SEARCHABLE_TAG_KEYS:http.method}
# The trace sample rate precision is 1/10000, should be between 0 and 10000
traceSampleRate: ${ZIPKIN_SAMPLE_RATE:10000}
+ # The number of threads used to prepare metrics data to the storage.
+ prepareThreads: ${ZIPKIN_PREPARE_THREADS:2}
+ # The period of doing data persistence. Unit is second.Default value is 25s
+ persistentPeriod: ${ZIPKIN_PERSISTENT_PERIOD:25}
storage:
selector: ${ZIPKIN_STORAGE:h2}
@@ -125,6 +129,21 @@ storage:
superDatasetBlockIntervalHours: ${ZIPKIN_STORAGE_BANYANDB_SUPER_DATASET_BLOCK_INTERVAL_HOURS:4} # Unit is hour
superDatasetSegmentIntervalDays: ${ZIPKIN_STORAGE_BANYANDB_SUPER_DATASET_SEGMENT_INTERVAL_DAYS:1} # Unit is day
specificGroupSettings: ${ZIPKIN_STORAGE_BANYANDB_SPECIFIC_GROUP_SETTINGS:""} # For example, {"group1": {"blockIntervalHours": 4, "segmentIntervalDays": 1}}
+ cassandra:
+ keyspace: ${ZIPKIN_STORAGE_CASSANDRA_KEYSPACE:zipkin}
+ # Comma separated list of host addresses part of Cassandra cluster. Ports default to 9042 but you can also specify a custom port with 'host:port'.
+ contactPoints: ${ZIPKIN_STORAGE_CASSANDRA_CONTACT_POINTS:localhost}
+ # Name of the datacenter that will be considered "local" for load balancing.
+ localDc: ${ZIPKIN_STORAGE_CASSANDRA_LOCAL_DC:datacenter1}
+ # Will throw an exception on startup if authentication fails.
+ username: ${ZIPKIN_STORAGE_CASSANDRA_USERNAME:}
+ password: ${ZIPKIN_STORAGE_CASSANDRA_PASSWORD:}
+ # Max pooled connections per datacenter-local host.
+ maxConnections: ${ZIPKIN_STORAGE_CASSANDRA_MAX_CONNECTIONS:8}
+ # Using ssl for connection, rely on Keystore
+ use-ssl: ${ZIPKIN_STORAGE_CASSANDRA_USE_SSL:false}
+ maxSizeOfBatchCql: ${ZIPKIN_STORAGE_CASSANDRA_MAX_SIZE_OF_BATCH_CQL:2000}
+ asyncBatchPersistentPoolSize: ${ZIPKIN_STORAGE_CASSANDRA_ASYNC_BATCH_PERSISTENT_POOL_SIZE:4}
receiver-zipkin-http:
selector: ${ZIPKIN_RECEIVER_ZIPKIN_HTTP:default}
diff --git a/zipkin-server/storage-cassandra/pom.xml b/zipkin-server/storage-cassandra/pom.xml
new file mode 100644
index 00000000000..3c96072f802
--- /dev/null
+++ b/zipkin-server/storage-cassandra/pom.xml
@@ -0,0 +1,73 @@
+
+
+ 4.0.0
+
+
+ zipkin-server-parent
+ io.zipkin
+ 2.24.4-SNAPSHOT
+
+
+ storage-cassandra
+ Storage: Cassandra
+
+
+
+ io.zipkin
+ zipkin-server-core
+ ${project.version}
+
+
+ org.apache.skywalking
+ storage-jdbc-hikaricp-plugin
+ ${skywalking.version}
+
+
+
+ io.zipkin
+ zipkin-server-core
+ ${project.version}
+
+
+
+ com.google.auto.value
+ auto-value-annotations
+ ${auto-value.version}
+
+
+ com.google.auto.value
+ auto-value
+ ${auto-value.version}
+ provided
+
+
+
+ com.datastax.oss
+ java-driver-core
+ ${java-driver.version}
+
+
+
+ com.esri.geometry
+ *
+
+
+ org.apache.tinkerpop
+ *
+
+
+
+
+
+
+ org.apache.skywalking
+ cluster-standalone-plugin
+ ${skywalking.version}
+ test
+
+
+
+
\ No newline at end of file
diff --git a/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/CQLExecutor.java b/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/CQLExecutor.java
new file mode 100644
index 00000000000..46ea7c34ea4
--- /dev/null
+++ b/zipkin-server/storage-cassandra/src/main/java/zipkin/server/storage/cassandra/CQLExecutor.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package zipkin.server.storage.cassandra;
+
+import org.apache.skywalking.oap.server.core.storage.SessionCacheCallback;
+import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
+import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class CQLExecutor implements InsertRequest, UpdateRequest {
+ private final String cql;
+ private final List