Skip to content

Commit

Permalink
cassandra: fail at startup if search enabled, but SASI disabled (#3741)
Browse files Browse the repository at this point in the history
Signed-off-by: Adrian Cole <adrian@tetrate.io>
  • Loading branch information
codefromthecrypt authored Mar 7, 2024
1 parent e535720 commit b6f5f7f
Show file tree
Hide file tree
Showing 16 changed files with 305 additions and 209 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.linecorp.armeria.server.annotation.ExceptionHandlerFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import zipkin2.internal.ClosedComponentException;

import static com.linecorp.armeria.common.HttpStatus.BAD_REQUEST;
import static com.linecorp.armeria.common.HttpStatus.INTERNAL_SERVER_ERROR;
Expand All @@ -29,7 +30,10 @@ public HttpResponse handleException(ServiceRequestContext ctx, HttpRequest req,
if (cause instanceof IllegalArgumentException) {
return HttpResponse.of(BAD_REQUEST, ANY_TEXT_TYPE, message);
} else {
LOGGER.warn("Unexpected error handling request.", cause);
// Don't fill logs with exceptions about closed components.
if (!(cause instanceof ClosedComponentException)) {
LOGGER.warn("Unexpected error handling {} {}", req.method(), req.path());
}

return HttpResponse.of(INTERNAL_SERVER_ERROR, ANY_TEXT_TYPE, message);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,16 @@
*/
package zipkin2.server.internal.health;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import zipkin2.Call;
import zipkin2.CheckResult;
import zipkin2.Component;
import zipkin2.internal.ClosedComponentException;
import zipkin2.internal.Nullable;

final class ComponentHealth {
static final Logger LOGGER = LoggerFactory.getLogger(ComponentHealth.class);
static final String STATUS_UP = "UP", STATUS_DOWN = "DOWN";

static ComponentHealth ofComponent(Component component) {
Expand All @@ -23,7 +27,13 @@ static ComponentHealth ofComponent(Component component) {
}
if (t == null) return new ComponentHealth(component.toString(), STATUS_UP, null);
String message = t.getMessage();
String error = t.getClass().getName() + (message != null ? ": " + message : "");
String error;
if (t instanceof ClosedComponentException) {
error = message;
} else {
error = t.getClass().getSimpleName() + (message != null ? ": " + message : "");
}
LOGGER.debug(error);
return new ComponentHealth(component.toString(), STATUS_DOWN, error);
}

Expand Down
2 changes: 2 additions & 0 deletions zipkin-server/src/main/resources/zipkin-server-shared.yml
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,8 @@ logging:
com.datastax.oss.driver.internal.core.DefaultMavenCoordinates: 'WARN'
# We exclude Geo codec and Graph extensions to keep size down
com.datastax.oss.driver.internal.core.context.InternalDriverContext: 'WARN'
# Avoid logs about adding handlers
com.datastax.oss.driver.internal.core.cql.CqlPrepareAsyncProcessor: 'WARN'
# Use of native clocks in Cassandra is not insightful
com.datastax.oss.driver.internal.core.time.Clock: 'WARN'
# Unless it's serious we don't want to know
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ class ComponentHealthTest {
}
});

assertThat(health.error)
.isEqualTo("java.io.IOException: socket disconnect");
assertThat(health.error).isEqualTo("IOException: socket disconnect");
}

@Test void doesntAddNullMessageToDetails() {
Expand All @@ -31,7 +30,6 @@ class ComponentHealthTest {
}
});

assertThat(health.error)
.isEqualTo("com.linecorp.armeria.common.ClosedSessionException");
assertThat(health.error).isEqualTo("ClosedSessionException");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,19 @@ class CassandraSpanStore implements SpanStore, Traces, ServiceAndSpanNames { //n
@Nullable final SelectTraceIdsFromServiceRemoteService.Factory traceIdsFromServiceRemoteService;

CassandraSpanStore(CassandraStorage storage) {
CqlSession session = storage.session();
Schema.Metadata metadata = storage.metadata();
int maxTraceCols = storage.maxTraceCols;
indexFetchMultiplier = storage.indexFetchMultiplier;
boolean strictTraceId = storage.strictTraceId;
searchEnabled = storage.searchEnabled;
this(storage.session(),
storage.metadata(),
Schema.ensureKeyspaceMetadata(storage.session(), storage.keyspace),
storage.maxTraceCols,
storage.indexFetchMultiplier,
storage.strictTraceId,
storage.searchEnabled);
}

CassandraSpanStore(CqlSession session, Schema.Metadata metadata, KeyspaceMetadata keyspace,
int maxTraceCols, int indexFetchMultiplier, boolean strictTraceId, boolean searchEnabled) {
this.indexFetchMultiplier = indexFetchMultiplier;
this.searchEnabled = searchEnabled;
spans = new SelectFromSpan.Factory(session, strictTraceId, maxTraceCols);
dependencies = new SelectDependencies.Factory(session);

Expand All @@ -72,8 +78,7 @@ class CassandraSpanStore implements SpanStore, Traces, ServiceAndSpanNames { //n
return;
}

KeyspaceMetadata md = Schema.ensureKeyspaceMetadata(session, storage.keyspace);
indexTtl = KeyspaceMetadataUtil.getDefaultTtl(md, TABLE_TRACE_BY_SERVICE_SPAN);
indexTtl = KeyspaceMetadataUtil.getDefaultTtl(keyspace, TABLE_TRACE_BY_SERVICE_SPAN);
serviceNames = new SelectServiceNames.Factory(session).create();
if (metadata.hasRemoteService) {
remoteServiceNames = new SelectRemoteServiceNames.Factory(session);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@
import java.util.Set;
import zipkin2.Call;
import zipkin2.CheckResult;
import zipkin2.internal.ClosedComponentException;
import zipkin2.internal.Nullable;
import zipkin2.storage.AutocompleteTags;
import zipkin2.storage.ServiceAndSpanNames;
import zipkin2.storage.SpanConsumer;
import zipkin2.storage.SpanStore;
import zipkin2.storage.StorageComponent;
import zipkin2.storage.Traces;
import zipkin2.storage.cassandra.internal.CassandraStorageBuilder;
import zipkin2.storage.cassandra.internal.call.ResultSetFutureCall;

/**
Expand Down Expand Up @@ -47,8 +47,6 @@ public static Builder newBuilder() {
}

public static final class Builder extends CassandraStorageBuilder<Builder> {
SessionFactory sessionFactory = SessionFactory.DEFAULT;

Builder() {
super(Schema.DEFAULT_KEYSPACE);
}
Expand All @@ -70,22 +68,8 @@ public static final class Builder extends CassandraStorageBuilder<Builder> {
return super.ensureSchema(ensureSchema);
}

/** Override to control how sessions are created. */
public Builder sessionFactory(SessionFactory sessionFactory) {
if (sessionFactory == null) throw new NullPointerException("sessionFactory == null");
this.sessionFactory = sessionFactory;
return this;
}

@Override public CassandraStorage build() {
AuthProvider authProvider = null;
if (username != null) {
authProvider = new ProgrammaticPlainTextAuthProvider(username, password);
}
return new CassandraStorage(strictTraceId, searchEnabled, autocompleteKeys, autocompleteTtl,
autocompleteCardinality, contactPoints, localDc, poolingOptions(), authProvider, useSsl,
sslHostnameValidation, sessionFactory, keyspace, ensureSchema, maxTraceCols,
indexFetchMultiplier);
return new CassandraStorage(this);
}
}

Expand All @@ -99,39 +83,36 @@ autocompleteCardinality, contactPoints, localDc, poolingOptions(), authProvider,
final boolean useSsl;
final boolean sslHostnameValidation;
final String keyspace;
final boolean ensureSchema;

final int maxTraceCols, indexFetchMultiplier;

final LazySession session;

CassandraStorage(boolean strictTraceId, boolean searchEnabled, Set<String> autocompleteKeys,
int autocompleteTtl, int autocompleteCardinality, String contactPoints, String localDc,
Map<DriverOption, Integer> poolingOptions, AuthProvider authProvider, boolean useSsl,
boolean sslHostnameValidation, SessionFactory sessionFactory, String keyspace,
boolean ensureSchema, int maxTraceCols, int indexFetchMultiplier) {
CassandraStorage(CassandraStorageBuilder<?> builder) {
// Assign generic configuration for all storage components
this.strictTraceId = strictTraceId;
this.searchEnabled = searchEnabled;
this.autocompleteKeys = autocompleteKeys;
this.autocompleteTtl = autocompleteTtl;
this.autocompleteCardinality = autocompleteCardinality;
this.strictTraceId = builder.strictTraceId;
this.searchEnabled = builder.searchEnabled;
this.autocompleteKeys = builder.autocompleteKeys;
this.autocompleteTtl = builder.autocompleteTtl;
this.autocompleteCardinality = builder.autocompleteCardinality;

// Assign configuration used to create a session
this.contactPoints = contactPoints;
this.localDc = localDc;
this.poolingOptions = poolingOptions;
this.authProvider = authProvider;
this.useSsl = useSsl;
this.sslHostnameValidation = sslHostnameValidation;
this.ensureSchema = ensureSchema;
this.keyspace = keyspace;
this.contactPoints = builder.contactPoints;
this.localDc = builder.localDc;
this.poolingOptions = builder.poolingOptions();
if (builder.username != null) {
this.authProvider = new ProgrammaticPlainTextAuthProvider(builder.username, builder.password);
} else {
this.authProvider = null;
}
this.useSsl = builder.useSsl;
this.sslHostnameValidation = builder.sslHostnameValidation;
this.keyspace = builder.keyspace;

// Assign configuration used to control queries
this.maxTraceCols = maxTraceCols;
this.indexFetchMultiplier = indexFetchMultiplier;
this.maxTraceCols = builder.maxTraceCols;
this.indexFetchMultiplier = builder.indexFetchMultiplier;

this.session = new LazySession(sessionFactory, this);
this.session = new LazySession(this, builder.sessionFactory, builder.ensureSchema);
}

/** close is typically called from a different thread */
Expand Down Expand Up @@ -202,7 +183,7 @@ Schema.Metadata metadata() {
}

@Override public CheckResult check() {
if (closeCalled) throw new IllegalStateException("closed");
if (closeCalled) throw new ClosedComponentException();
try {
session.healthCheck();
} catch (Throwable e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,43 +2,45 @@
* Copyright The OpenZipkin Authors
* SPDX-License-Identifier: Apache-2.0
*/
package zipkin2.storage.cassandra.internal;
package zipkin2.storage.cassandra;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.datastax.oss.driver.api.core.config.DriverOption;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import zipkin2.internal.Nullable;
import zipkin2.storage.QueryRequest;
import zipkin2.storage.StorageComponent;

import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.CONNECTION_MAX_REQUESTS;
import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.CONNECTION_POOL_LOCAL_SIZE;

public abstract class CassandraStorageBuilder<B extends CassandraStorageBuilder<B>>
abstract class CassandraStorageBuilder<B extends CassandraStorageBuilder<B>>
extends StorageComponent.Builder {
protected boolean strictTraceId = true, searchEnabled = true;
protected Set<String> autocompleteKeys = Set.of();
protected int autocompleteTtl = (int) TimeUnit.HOURS.toMillis(1);
protected int autocompleteCardinality = 5 * 4000; // Ex. 5 site tags with cardinality 4000 each

protected String contactPoints = "localhost";
CassandraStorage.SessionFactory sessionFactory = CassandraStorage.SessionFactory.DEFAULT;
boolean strictTraceId = true, searchEnabled = true;
Set<String> autocompleteKeys = Set.of();
int autocompleteTtl = (int) TimeUnit.HOURS.toMillis(1);
int autocompleteCardinality = 5 * 4000; // Ex. 5 site tags with cardinality 4000 each

String contactPoints = "localhost";
// Driver v4 requires this, so take a guess! When we are wrong, the user can override anyway
protected String localDc = "datacenter1";
@Nullable protected String username, password;
protected boolean useSsl = false;
protected boolean sslHostnameValidation = true;
String localDc = "datacenter1";
@Nullable String username, password;
boolean useSsl = false;
boolean sslHostnameValidation = true;

protected String keyspace;
protected boolean ensureSchema = true;
String keyspace;
BiFunction<CassandraStorage, CqlSession, Schema.Metadata> ensureSchema = Schema::ensure;

protected int maxTraceCols = 100_000;
protected int indexFetchMultiplier = 3;
int maxTraceCols = 100_000;
int indexFetchMultiplier = 3;

// Zipkin collectors can create out a lot of async requests in bursts, so we
// increase some properties beyond the norm.
Expand All @@ -49,14 +51,14 @@ public abstract class CassandraStorageBuilder<B extends CassandraStorageBuilder<
// Ported from java-driver v3 PoolingOptions.setMaxQueueSize(40960)
final int maxRequestsPerConnection = 40960 / poolLocalSize;

protected Map<DriverOption, Integer> poolingOptions() {
Map<DriverOption, Integer> poolingOptions() {
Map<DriverOption, Integer> result = new LinkedHashMap<>();
result.put(CONNECTION_POOL_LOCAL_SIZE, poolLocalSize);
result.put(CONNECTION_MAX_REQUESTS, maxRequestsPerConnection);
return result;
}

protected CassandraStorageBuilder(String defaultKeyspace) {
CassandraStorageBuilder(String defaultKeyspace) {
keyspace = defaultKeyspace;
}

Expand Down Expand Up @@ -148,8 +150,19 @@ public B keyspace(String keyspace) {
return (B) this;
}

/** Override to control how sessions are created. */
public B sessionFactory(CassandraStorage.SessionFactory sessionFactory) {
if (sessionFactory == null) throw new NullPointerException("sessionFactory == null");
this.sessionFactory = sessionFactory;
return (B) this;
}

public B ensureSchema(boolean ensureSchema) {
this.ensureSchema = ensureSchema;
if (ensureSchema) {
this.ensureSchema = Schema::ensure;
} else {
this.ensureSchema = Schema::validate;
}
return (B) this;
}

Expand Down
Loading

0 comments on commit b6f5f7f

Please sign in to comment.