Skip to content

Commit

Permalink
Add usesTablets property to KeyspaceMetadata
Browse files Browse the repository at this point in the history
Adds the field to `KeyspaceMetadata` that informs if the keyspace has tablets
enabled. It is initialized with value `false` and updated to `true` only if
the keyspace is discovered in `system_schema.scylla_keyspaces` and its entry
has non-null value in `initial_tablets` column. The update happens on schema
refresh.

Updates `Metadata#getReplicas` to use this keyspace metadata information to
decide whether to look for replicas in the token map or tablet map.

Fixes #380.
  • Loading branch information
Bouncheck committed Jan 7, 2025
1 parent f4de8e7 commit 788c35f
Show file tree
Hide file tree
Showing 3 changed files with 157 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ public class KeyspaceMetadata {
final Map<String, AggregateMetadata> aggregates =
new ConcurrentHashMap<String, AggregateMetadata>();

// Scylla feature
private boolean usesTablets = false;

@VisibleForTesting
@Deprecated
KeyspaceMetadata(String name, boolean durableWrites, Map<String, String> replication) {
Expand Down Expand Up @@ -458,4 +461,12 @@ void add(UserType type) {
ReplicationStrategy replicationStrategy() {
return strategy;
}

void setUsesTablets(boolean predicate) {
this.usesTablets = predicate;
}

public boolean usesTablets() {
return this.usesTablets;
}
}
35 changes: 20 additions & 15 deletions driver-core/src/main/java/com/datastax/driver/core/Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -558,26 +558,31 @@ public Set<TokenRange> getTokenRanges(String keyspace, Host host) {
public Set<Host> getReplicas(
String keyspace, String table, Token.Factory partitioner, ByteBuffer partitionKey) {
keyspace = handleId(keyspace);
table = handleId(table);
TokenMap current = tokenMap;
if (current == null) {
return Collections.emptySet();
} else {
if (partitioner == null) {
partitioner = current.factory;
}
// If possible, try tablet lookup first
Token token = null;
if (partitioner == null && current != null) {
partitioner = current.factory;
}
if (partitioner != null) {
token = partitioner.hash(partitionKey);
}

// Tablets:
KeyspaceMetadata ksMetadata = getKeyspace(keyspace);
if (ksMetadata != null && ksMetadata.usesTablets()) {
if (keyspace != null && table != null) {
Token token = partitioner.hash(partitionKey);
assert (token instanceof Token.TokenLong64);
Set<Host> replicas = tabletMap.getReplicas(keyspace, table, (long) token.getValue());
if (!replicas.isEmpty()) {
return replicas;
}
return tabletMap.getReplicas(keyspace, table, (long) token.getValue());
} else {
return Collections.emptySet();
}
// Fall back to tokenMap
Set<Host> hosts = current.getReplicas(keyspace, partitioner.hash(partitionKey));
return hosts == null ? Collections.<Host>emptySet() : hosts;
}

// TokenMap:
if (current == null) return Collections.<Host>emptySet();
Set<Host> hosts = current.getReplicas(keyspace, token);
return hosts == null ? Collections.<Host>emptySet() : hosts;
}

/**
Expand Down
134 changes: 126 additions & 8 deletions driver-core/src/main/java/com/datastax/driver/core/SchemaParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,20 @@

import com.datastax.driver.core.exceptions.BusyConnectionException;
import com.datastax.driver.core.exceptions.ConnectionException;
import com.datastax.driver.core.exceptions.InvalidQueryException;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -56,6 +59,9 @@ abstract class SchemaParser {
private static final SchemaParser V3_PARSER = new V3SchemaParser();
private static final SchemaParser V4_PARSER = new V4SchemaParser();

private static final String SELECT_SCYLLA_KEYSPACES =
"SELECT * FROM system_schema.scylla_keyspaces";

static SchemaParser forVersion(VersionNumber cassandraVersion) {
if (cassandraVersion.getMajor() >= 4) return V4_PARSER;
if (cassandraVersion.getMajor() >= 3) return V3_PARSER;
Expand Down Expand Up @@ -197,7 +203,6 @@ void refresh(

private Map<String, KeyspaceMetadata> buildKeyspaces(
SystemRows rows, VersionNumber cassandraVersion, Cluster cluster) {

Map<String, KeyspaceMetadata> keyspaces = new LinkedHashMap<String, KeyspaceMetadata>();
for (Row keyspaceRow : rows.keyspaces) {
KeyspaceMetadata keyspace = KeyspaceMetadata.build(keyspaceRow, cassandraVersion);
Expand Down Expand Up @@ -239,6 +244,13 @@ private Map<String, KeyspaceMetadata> buildKeyspaces(
for (MaterializedViewMetadata view : views.values()) {
keyspace.add(view);
}
Row scyllaKeyspacesRow = rows.scyllaKeyspaces.getOrDefault(keyspace.getName(), null);
if (scyllaKeyspacesRow != null) {
if (scyllaKeyspacesRow.getColumnDefinitions().contains("initial_tablets")
&& !scyllaKeyspacesRow.isNull("initial_tablets")) {
keyspace.setUsesTablets(true);
}
}
keyspaces.put(keyspace.getName(), keyspace);
}
if (rows.virtualKeyspaces != null) {
Expand Down Expand Up @@ -619,6 +631,29 @@ private void updateViews(
}
}

static Set<String> toKeyspaceSet(ResultSet rs) {
if (rs == null) return Collections.emptySet();

Set<String> result = new HashSet<>();
for (Row row : rs) {
result.add(row.getString(KeyspaceMetadata.KS_NAME));
}
return result;
}

static Map<String, Row> groupByKeyspacePk(ResultSet rs) {
// Assumes keyspace name is full primary key, therefore
// each keyspace name identifies at most one row
if (rs == null) return Collections.emptyMap();

Map<String, Row> result = new HashMap<String, Row>();
for (Row row : rs) {
String ksName = row.getString(KeyspaceMetadata.KS_NAME);
result.put(ksName, row);
}
return result;
}

static Map<String, List<Row>> groupByKeyspace(ResultSet rs) {
if (rs == null) return Collections.emptyMap();

Expand Down Expand Up @@ -696,6 +731,25 @@ private static ResultSet get(ResultSetFuture future)
return (future == null) ? null : future.get();
}

private static ResultSet getIfExists(ResultSetFuture future)
throws InterruptedException, ExecutionException {
// Some of Scylla specific tables/columns may not exist depending on version.
// This method is meant to try to get results without failing whole schema parse
// if something additional does not exist.
if (future == null) return null;
try {
ResultSet resultSet = future.get();
return resultSet;
} catch (ExecutionException ex) {
if (ex.getCause() instanceof InvalidQueryException) {
// meant to handle keyspace/table does not exist exceptions
return null;
}
// rethrow if it's something else
throw ex;
}
}

/**
* The rows from the system tables that we want to parse to metadata classes. The format of these
* rows depends on the Cassandra version, but our parsing code knows how to handle the
Expand All @@ -713,6 +767,7 @@ private static class SystemRows {
final ResultSet virtualKeyspaces;
final Map<String, List<Row>> virtualTables;
final Map<String, Map<String, Map<String, ColumnMetadata.Raw>>> virtualColumns;
final Map<String, Row> scyllaKeyspaces;

public SystemRows(
ResultSet keyspaces,
Expand All @@ -725,7 +780,8 @@ public SystemRows(
Map<String, Map<String, List<Row>>> indexes,
ResultSet virtualKeyspaces,
Map<String, List<Row>> virtualTables,
Map<String, Map<String, Map<String, ColumnMetadata.Raw>>> virtualColumns) {
Map<String, Map<String, Map<String, ColumnMetadata.Raw>>> virtualColumns,
Map<String, Row> scyllaKeyspaces) {
this.keyspaces = keyspaces;
this.tables = tables;
this.columns = columns;
Expand All @@ -737,6 +793,7 @@ public SystemRows(
this.virtualKeyspaces = virtualKeyspaces;
this.virtualTables = virtualTables;
this.virtualColumns = virtualColumns;
this.scyllaKeyspaces = scyllaKeyspaces;
}
}

Expand Down Expand Up @@ -790,7 +847,8 @@ else if (targetType == AGGREGATE)
cfFuture = null,
colsFuture = null,
functionsFuture = null,
aggregatesFuture = null;
aggregatesFuture = null,
scyllaKsFuture = null;

ProtocolVersion protocolVersion =
cluster.getConfiguration().getProtocolOptions().getProtocolVersion();
Expand All @@ -812,6 +870,21 @@ else if (targetType == AGGREGATE)
if (isSchemaOrKeyspace && supportsUdfs(cassandraVersion) || targetType == AGGREGATE)
aggregatesFuture = queryAsync(SELECT_AGGREGATES + whereClause, connection, protocolVersion);

if (isSchemaOrKeyspace) {
if (targetType == KEYSPACE) {
scyllaKsFuture =
queryAsync(
SELECT_SCYLLA_KEYSPACES
+ " WHERE keyspace_name = '"
+ targetKeyspace
+ "' LIMIT 1;",
connection,
protocolVersion);
} else {
scyllaKsFuture = queryAsync(SELECT_SCYLLA_KEYSPACES, connection, protocolVersion);
}
}

return new SystemRows(
get(ksFuture),
groupByKeyspace(get(cfFuture)),
Expand All @@ -824,7 +897,8 @@ else if (targetType == AGGREGATE)
Collections.<String, Map<String, List<Row>>>emptyMap(),
null,
Collections.<String, List<Row>>emptyMap(),
Collections.<String, Map<String, Map<String, ColumnMetadata.Raw>>>emptyMap());
Collections.<String, Map<String, Map<String, ColumnMetadata.Raw>>>emptyMap(),
groupByKeyspacePk(getIfExists(scyllaKsFuture)));
}

@Override
Expand Down Expand Up @@ -1197,9 +1271,19 @@ private Map<String, KeyspaceMetadata> buildSchema(
cluster.getConfiguration().getProtocolOptions().getProtocolVersion();

Map<String, KeyspaceMetadata> keyspaces = new LinkedHashMap<String, KeyspaceMetadata>();
ResultSetFuture scyllaKeyspacesFuture =
queryAsync(SELECT_SCYLLA_KEYSPACES, connection, protocolVersion);
ResultSet keyspacesData = queryAsync(SELECT_KEYSPACES, connection, protocolVersion).get();
Map<String, Row> scyllaKeyspacesData = groupByKeyspacePk(getIfExists(scyllaKeyspacesFuture));
for (Row keyspaceRow : keyspacesData) {
KeyspaceMetadata keyspace = KeyspaceMetadata.build(keyspaceRow, cassandraVersion);
Row scyllaKeyspacesRow = scyllaKeyspacesData.getOrDefault(keyspace.getName(), null);
if (scyllaKeyspacesRow != null) {
if (scyllaKeyspacesRow.getColumnDefinitions().contains("initial_tablets")
&& !scyllaKeyspacesRow.isNull("initial_tablets")) {
keyspace.setUsesTablets(true);
}
}
keyspaces.put(keyspace.getName(), keyspace);
}

Expand Down Expand Up @@ -1288,7 +1372,8 @@ SystemRows fetchSystemRows(
functionsFuture = null,
aggregatesFuture = null,
indexesFuture = null,
viewsFuture = null;
viewsFuture = null,
scyllaKsFuture = null;

ProtocolVersion protocolVersion =
cluster.getConfiguration().getProtocolOptions().getProtocolVersion();
Expand Down Expand Up @@ -1356,6 +1441,21 @@ SystemRows fetchSystemRows(
connection,
protocolVersion);

if (isSchemaOrKeyspace) {
if (targetType == KEYSPACE) {
scyllaKsFuture =
queryAsync(
SELECT_SCYLLA_KEYSPACES
+ " WHERE keyspace_name = '"
+ targetKeyspace
+ "' LIMIT 1;",
connection,
protocolVersion);
} else {
scyllaKsFuture = queryAsync(SELECT_SCYLLA_KEYSPACES, connection, protocolVersion);
}
}

return new SystemRows(
get(ksFuture),
groupByKeyspace(get(cfFuture)),
Expand All @@ -1367,7 +1467,8 @@ SystemRows fetchSystemRows(
groupByKeyspaceAndCf(get(indexesFuture), TABLE_NAME),
null,
Collections.<String, List<Row>>emptyMap(),
Collections.<String, Map<String, Map<String, ColumnMetadata.Raw>>>emptyMap());
Collections.<String, Map<String, Map<String, ColumnMetadata.Raw>>>emptyMap(),
groupByKeyspacePk(getIfExists(scyllaKsFuture)));
}

@Override
Expand Down Expand Up @@ -1499,7 +1600,8 @@ SystemRows fetchSystemRows(
viewsFuture = null,
virtualKeyspacesFuture = null,
virtualTableFuture = null,
virtualColumnsFuture = null;
virtualColumnsFuture = null,
scyllaKsFuture = null;

ProtocolVersion protocolVersion =
cluster.getConfiguration().getProtocolOptions().getProtocolVersion();
Expand Down Expand Up @@ -1589,6 +1691,21 @@ SystemRows fetchSystemRows(
protocolVersion);
}

if (isSchemaOrKeyspace) {
if (targetType == KEYSPACE) {
scyllaKsFuture =
queryAsync(
SELECT_SCYLLA_KEYSPACES
+ " WHERE keyspace_name = '"
+ targetKeyspace
+ "' LIMIT 1;",
connection,
protocolVersion);
} else {
scyllaKsFuture = queryAsync(SELECT_SCYLLA_KEYSPACES, connection, protocolVersion);
}
}

return new SystemRows(
get(ksFuture),
groupByKeyspace(get(cfFuture)),
Expand All @@ -1600,7 +1717,8 @@ SystemRows fetchSystemRows(
groupByKeyspaceAndCf(get(indexesFuture), TABLE_NAME),
get(virtualKeyspacesFuture),
groupByKeyspace(get(virtualTableFuture)),
groupByKeyspaceAndCf(get(virtualColumnsFuture), cassandraVersion, TABLE_NAME));
groupByKeyspaceAndCf(get(virtualColumnsFuture), cassandraVersion, TABLE_NAME),
groupByKeyspacePk(getIfExists(scyllaKsFuture)));
}
}
}

0 comments on commit 788c35f

Please sign in to comment.