Skip to content

Commit

Permalink
Support Cassandra Storage in v3
Browse files Browse the repository at this point in the history
  • Loading branch information
mrproliu committed Oct 7, 2023
1 parent ffbffd0 commit f6c2a53
Show file tree
Hide file tree
Showing 35 changed files with 3,539 additions and 8 deletions.
1 change: 1 addition & 0 deletions .github/workflows/test-v3.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ jobs:
- name: receiver-zipkin-kafka
- name: receiver-zipkin-rabbitmq
- name: receiver-zipkin-scribe
- name: storage-cassandra
steps:
- name: Checkout Repository
uses: actions/checkout@v2
Expand Down
1 change: 1 addition & 0 deletions zipkin-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
<module>receiver-zipkin-activemq</module>
<module>receiver-zipkin-rabbitmq</module>
<module>receiver-zipkin-scribe</module>
<module>storage-cassandra</module>
</modules>

<dependencyManagement>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2019 The OpenZipkin Authors
* Copyright 2015-2023 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand Down Expand Up @@ -64,6 +64,16 @@ public class CoreModuleConfig extends ModuleConfig {
*/
private int traceSampleRate = 10000;

/**
* The number of threads used to prepare metrics data to the storage.
*/
private int prepareThreads = 2;

/**
* The period of doing data persistence. Unit is second.
*/
private int persistentPeriod = 25;

private static final String DEFAULT_SEARCHABLE_TAG_KEYS = String.join(
Const.COMMA,
"http.method"
Expand All @@ -72,6 +82,8 @@ public class CoreModuleConfig extends ModuleConfig {
public org.apache.skywalking.oap.server.core.CoreModuleConfig toSkyWalkingConfig() {
final org.apache.skywalking.oap.server.core.CoreModuleConfig result = new org.apache.skywalking.oap.server.core.CoreModuleConfig();
result.setServiceCacheRefreshInterval(serviceCacheRefreshInterval);
result.setPrepareThreads(prepareThreads);
result.setPersistentPeriod(persistentPeriod);
return result;
}

Expand Down Expand Up @@ -154,4 +166,20 @@ public int getTraceSampleRate() {
public void setTraceSampleRate(int traceSampleRate) {
this.traceSampleRate = traceSampleRate;
}

public int getPrepareThreads() {
return prepareThreads;
}

public void setPrepareThreads(int prepareThreads) {
this.prepareThreads = prepareThreads;
}

public int getPersistentPeriod() {
return persistentPeriod;
}

public void setPersistentPeriod(int persistentPeriod) {
this.persistentPeriod = persistentPeriod;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2019 The OpenZipkin Authors
* Copyright 2015-2023 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand All @@ -16,6 +16,7 @@
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.analysis.meter.MeterSystem;
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
import org.apache.skywalking.oap.server.core.annotation.AnnotationScan;
import org.apache.skywalking.oap.server.core.cache.NetworkAddressAliasCache;
import org.apache.skywalking.oap.server.core.cache.ProfileTaskCache;
import org.apache.skywalking.oap.server.core.command.CommandService;
Expand Down Expand Up @@ -50,13 +51,17 @@
import org.apache.skywalking.oap.server.core.remote.client.RemoteClientManager;
import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
import org.apache.skywalking.oap.server.core.server.HTTPHandlerRegister;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.core.source.SourceReceiverImpl;
import org.apache.skywalking.oap.server.core.status.ServerStatusService;
import org.apache.skywalking.oap.server.core.storage.PersistenceTimer;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.model.IModelManager;
import org.apache.skywalking.oap.server.core.storage.model.ModelCreator;
import org.apache.skywalking.oap.server.core.storage.model.ModelManipulator;
import org.apache.skywalking.oap.server.core.storage.model.StorageModels;
import org.apache.skywalking.oap.server.core.storage.ttl.DataTTLKeeperTimer;
import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceGetter;
import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceSetter;
import org.apache.skywalking.oap.server.core.worker.WorkerInstancesService;
Expand All @@ -69,19 +74,23 @@
import zipkin.server.core.services.EmptyGRPCHandlerRegister;
import zipkin.server.core.services.EmptyHTTPHandlerRegister;
import zipkin.server.core.services.EmptyNetworkAddressAliasCache;
import zipkin.server.core.services.SelfSenderService;
import zipkin.server.core.services.ZipkinConfigService;

import java.io.IOException;
import java.util.Collections;

public class CoreModuleProvider extends ModuleProvider {
private CoreModuleConfig moduleConfig;

private EndpointNameGrouping endpointNameGrouping;
private final SourceReceiverImpl receiver;
private final ZipkinSourceReceiverImpl receiver;
private final AnnotationScan annotationScan;
private final StorageModels storageModels;

public CoreModuleProvider() {
this.receiver = new SourceReceiverImpl();
this.annotationScan = new AnnotationScan();
this.receiver = new ZipkinSourceReceiverImpl();
this.storageModels = new StorageModels();
}

Expand Down Expand Up @@ -121,6 +130,16 @@ public void prepare() throws ServiceNotProvidedException, ModuleStartException {
);
this.registerServiceImplementation(NamingControl.class, namingControl);

annotationScan.registerListener(new ZipkinStreamAnnotationListener(getManager()));

AnnotationScan scopeScan = new AnnotationScan();
scopeScan.registerListener(new DefaultScopeDefine.Listener());
try {
scopeScan.scan();
} catch (Exception e) {
throw new ModuleStartException(e.getMessage(), e);
}

final org.apache.skywalking.oap.server.core.CoreModuleConfig swConfig = this.moduleConfig.toSkyWalkingConfig();
this.registerServiceImplementation(MeterSystem.class, new MeterSystem(getManager()));
this.registerServiceImplementation(ConfigService.class, new ZipkinConfigService(moduleConfig, this));
Expand All @@ -133,8 +152,8 @@ public void prepare() throws ServiceNotProvidedException, ModuleStartException {
final WorkerInstancesService instancesService = new WorkerInstancesService();
this.registerServiceImplementation(IWorkerInstanceGetter.class, instancesService);
this.registerServiceImplementation(IWorkerInstanceSetter.class, instancesService);
this.registerServiceImplementation(RemoteSenderService.class, new RemoteSenderService(getManager()));
this.registerServiceImplementation(RemoteSenderService.class, new RemoteSenderService(getManager()));
// no cluster mode for zipkin, for sending the streaming data to the local
this.registerServiceImplementation(RemoteSenderService.class, new SelfSenderService(getManager()));
this.registerServiceImplementation(ModelCreator.class, storageModels);
this.registerServiceImplementation(IModelManager.class, storageModels);
this.registerServiceImplementation(ModelManipulator.class, storageModels);
Expand Down Expand Up @@ -182,12 +201,19 @@ public void prepare() throws ServiceNotProvidedException, ModuleStartException {

@Override
public void start() throws ServiceNotProvidedException, ModuleStartException {

try {
receiver.scan();
annotationScan.scan();
} catch (IOException | IllegalAccessException | InstantiationException | StorageException e) {
throw new ModuleStartException(e.getMessage(), e);
}
}

@Override
public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {

final org.apache.skywalking.oap.server.core.CoreModuleConfig swConfig = this.moduleConfig.toSkyWalkingConfig();
PersistenceTimer.INSTANCE.start(getManager(), swConfig);
DataTTLKeeperTimer.INSTANCE.start(getManager(), swConfig);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 2015-2023 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package zipkin.server.core;

import org.apache.skywalking.oap.server.core.analysis.DispatcherManager;
import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.TagAutocompleteDispatcher;

public class ZipkinDispatcherManager extends DispatcherManager {

@Override
public void addIfAsSourceDispatcher(Class aClass) throws IllegalAccessException, InstantiationException {
if (aClass.getSimpleName().startsWith("Zipkin") || aClass.equals(TagAutocompleteDispatcher.class)) {
super.addIfAsSourceDispatcher(aClass);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 2015-2023 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package zipkin.server.core;

import org.apache.skywalking.oap.server.core.analysis.DispatcherDetectorListener;
import org.apache.skywalking.oap.server.core.source.ISource;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;

import java.io.IOException;

public class ZipkinSourceReceiverImpl implements SourceReceiver {
private final ZipkinDispatcherManager mgr;

public ZipkinSourceReceiverImpl() {
mgr = new ZipkinDispatcherManager();
}

@Override
public void receive(ISource source) {
mgr.forward(source);
}

@Override
public DispatcherDetectorListener getDispatcherDetectorListener() {
return mgr;
}

public void scan() throws IOException, IllegalAccessException, InstantiationException {
mgr.scan();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2015-2023 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package zipkin.server.core;

import org.apache.skywalking.oap.server.core.analysis.StreamAnnotationListener;
import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.TagAutocompleteData;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;

public class ZipkinStreamAnnotationListener extends StreamAnnotationListener {

public ZipkinStreamAnnotationListener(ModuleDefineHolder moduleDefineHolder) {
super(moduleDefineHolder);
}

@Override
public void notify(Class aClass) throws StorageException {
// only including all zipkin streaming
if (aClass.getSimpleName().startsWith("Zipkin") || aClass.equals(TagAutocompleteData.class)) {
super.notify(aClass);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2015-2023 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package zipkin.server.core.services;

import org.apache.skywalking.oap.server.core.remote.RemoteSenderService;
import org.apache.skywalking.oap.server.core.remote.client.Address;
import org.apache.skywalking.oap.server.core.remote.client.SelfRemoteClient;
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.apache.skywalking.oap.server.core.remote.selector.Selector;
import org.apache.skywalking.oap.server.library.module.ModuleManager;

public class SelfSenderService extends RemoteSenderService {
private final ModuleManager moduleManager;
private SelfRemoteClient self;

public SelfSenderService(ModuleManager moduleManager) {
super(moduleManager);
this.moduleManager = moduleManager;
}

private SelfRemoteClient getSelf() {
if (self == null) {
self = new SelfRemoteClient(moduleManager, new Address("127.0.0.1", 0, true));
}
return self;
}

@Override
public void send(String nextWorkName, StreamData streamData, Selector selector) {
getSelf().push(nextWorkName, streamData);
}
}
5 changes: 5 additions & 0 deletions zipkin-server/server-starter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@
<artifactId>storage-elasticsearch-plugin</artifactId>
<version>${skywalking.version}</version>
</dependency>
<dependency>
<groupId>io.zipkin</groupId>
<artifactId>storage-cassandra</artifactId>
<version>${project.version}</version>
</dependency>

<!-- zipkin receiver -->
<dependency>
Expand Down
19 changes: 19 additions & 0 deletions zipkin-server/server-starter/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ core:
searchableTracesTags: ${ZIPKIN_SEARCHABLE_TAG_KEYS:http.method}
# The trace sample rate precision is 1/10000, should be between 0 and 10000
traceSampleRate: ${ZIPKIN_SAMPLE_RATE:10000}
# The number of threads used to prepare metrics data to the storage.
prepareThreads: ${ZIPKIN_PREPARE_THREADS:2}
# The period of doing data persistence. Unit is second.Default value is 25s
persistentPeriod: ${ZIPKIN_PERSISTENT_PERIOD:25}

storage:
selector: ${ZIPKIN_STORAGE:h2}
Expand Down Expand Up @@ -125,6 +129,21 @@ storage:
superDatasetBlockIntervalHours: ${ZIPKIN_STORAGE_BANYANDB_SUPER_DATASET_BLOCK_INTERVAL_HOURS:4} # Unit is hour
superDatasetSegmentIntervalDays: ${ZIPKIN_STORAGE_BANYANDB_SUPER_DATASET_SEGMENT_INTERVAL_DAYS:1} # Unit is day
specificGroupSettings: ${ZIPKIN_STORAGE_BANYANDB_SPECIFIC_GROUP_SETTINGS:""} # For example, {"group1": {"blockIntervalHours": 4, "segmentIntervalDays": 1}}
cassandra:
keyspace: ${ZIPKIN_STORAGE_CASSANDRA_KEYSPACE:zipkin}
# Comma separated list of host addresses part of Cassandra cluster. Ports default to 9042 but you can also specify a custom port with 'host:port'.
contactPoints: ${ZIPKIN_STORAGE_CASSANDRA_CONTACT_POINTS:localhost}
# Name of the datacenter that will be considered "local" for load balancing.
localDc: ${ZIPKIN_STORAGE_CASSANDRA_LOCAL_DC:datacenter1}
# Will throw an exception on startup if authentication fails.
username: ${ZIPKIN_STORAGE_CASSANDRA_USERNAME:}
password: ${ZIPKIN_STORAGE_CASSANDRA_PASSWORD:}
# Max pooled connections per datacenter-local host.
maxConnections: ${ZIPKIN_STORAGE_CASSANDRA_MAX_CONNECTIONS:8}
# Using ssl for connection, rely on Keystore
use-ssl: ${ZIPKIN_STORAGE_CASSANDRA_USE_SSL:false}
maxSizeOfBatchCql: ${ZIPKIN_STORAGE_CASSANDRA_MAX_SIZE_OF_BATCH_CQL:2000}
asyncBatchPersistentPoolSize: ${ZIPKIN_STORAGE_CASSANDRA_ASYNC_BATCH_PERSISTENT_POOL_SIZE:4}

receiver-zipkin-http:
selector: ${ZIPKIN_RECEIVER_ZIPKIN_HTTP:default}
Expand Down
Loading

0 comments on commit f6c2a53

Please sign in to comment.