Skip to content

Commit

Permalink
Add usesTablets property to KeyspaceMetadata
Browse files Browse the repository at this point in the history
  • Loading branch information
Bouncheck committed Jan 3, 2025
1 parent f4de8e7 commit 0a108c2
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ public class KeyspaceMetadata {
final Map<String, AggregateMetadata> aggregates =
new ConcurrentHashMap<String, AggregateMetadata>();

// Scylla feature
private boolean usesTablets = false;

@VisibleForTesting
@Deprecated
KeyspaceMetadata(String name, boolean durableWrites, Map<String, String> replication) {
Expand Down Expand Up @@ -458,4 +461,12 @@ void add(UserType type) {
ReplicationStrategy replicationStrategy() {
return strategy;
}

void setUsesTablets(boolean predicate) {
this.usesTablets = predicate;
}

boolean usesTablets() {
return this.usesTablets;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,13 @@
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -56,6 +58,9 @@ abstract class SchemaParser {
private static final SchemaParser V3_PARSER = new V3SchemaParser();
private static final SchemaParser V4_PARSER = new V4SchemaParser();

private static final String SELECT_SCYLLA_KEYSPACES =
"SELECT keyspace_name FROM system_schema.scylla_keyspaces";

static SchemaParser forVersion(VersionNumber cassandraVersion) {
if (cassandraVersion.getMajor() >= 4) return V4_PARSER;
if (cassandraVersion.getMajor() >= 3) return V3_PARSER;
Expand Down Expand Up @@ -239,6 +244,7 @@ private Map<String, KeyspaceMetadata> buildKeyspaces(
for (MaterializedViewMetadata view : views.values()) {
keyspace.add(view);
}
keyspace.setUsesTablets(rows.scyllaKeyspaces.contains(keyspace.getName()));
keyspaces.put(keyspace.getName(), keyspace);
}
if (rows.virtualKeyspaces != null) {
Expand Down Expand Up @@ -619,6 +625,16 @@ private void updateViews(
}
}

static Set<String> toKeyspaceSet(ResultSet rs) {
if (rs == null) return Collections.emptySet();

Set<String> result = new HashSet<>();
for (Row row : rs) {
result.add(row.getString(KeyspaceMetadata.KS_NAME));
}
return result;
}

static Map<String, List<Row>> groupByKeyspace(ResultSet rs) {
if (rs == null) return Collections.emptyMap();

Expand Down Expand Up @@ -713,6 +729,7 @@ private static class SystemRows {
final ResultSet virtualKeyspaces;
final Map<String, List<Row>> virtualTables;
final Map<String, Map<String, Map<String, ColumnMetadata.Raw>>> virtualColumns;
final Set<String> scyllaKeyspaces;

public SystemRows(
ResultSet keyspaces,
Expand All @@ -725,7 +742,8 @@ public SystemRows(
Map<String, Map<String, List<Row>>> indexes,
ResultSet virtualKeyspaces,
Map<String, List<Row>> virtualTables,
Map<String, Map<String, Map<String, ColumnMetadata.Raw>>> virtualColumns) {
Map<String, Map<String, Map<String, ColumnMetadata.Raw>>> virtualColumns,
Set<String> scyllaKeyspaces) {
this.keyspaces = keyspaces;
this.tables = tables;
this.columns = columns;
Expand All @@ -737,6 +755,7 @@ public SystemRows(
this.virtualKeyspaces = virtualKeyspaces;
this.virtualTables = virtualTables;
this.virtualColumns = virtualColumns;
this.scyllaKeyspaces = scyllaKeyspaces;
}
}

Expand Down Expand Up @@ -790,7 +809,8 @@ else if (targetType == AGGREGATE)
cfFuture = null,
colsFuture = null,
functionsFuture = null,
aggregatesFuture = null;
aggregatesFuture = null,
scyllaKsFuture = null;

ProtocolVersion protocolVersion =
cluster.getConfiguration().getProtocolOptions().getProtocolVersion();
Expand All @@ -812,6 +832,21 @@ else if (targetType == AGGREGATE)
if (isSchemaOrKeyspace && supportsUdfs(cassandraVersion) || targetType == AGGREGATE)
aggregatesFuture = queryAsync(SELECT_AGGREGATES + whereClause, connection, protocolVersion);

if (isSchemaOrKeyspace) {
if (targetType == KEYSPACE) {
scyllaKsFuture =
queryAsync(
SELECT_SCYLLA_KEYSPACES
+ " WHERE keyspace_name = '"
+ targetKeyspace
+ "' LIMIT 1;",
connection,
protocolVersion);
} else {
scyllaKsFuture = queryAsync(SELECT_SCYLLA_KEYSPACES, connection, protocolVersion);
}
}

return new SystemRows(
get(ksFuture),
groupByKeyspace(get(cfFuture)),
Expand All @@ -824,7 +859,8 @@ else if (targetType == AGGREGATE)
Collections.<String, Map<String, List<Row>>>emptyMap(),
null,
Collections.<String, List<Row>>emptyMap(),
Collections.<String, Map<String, Map<String, ColumnMetadata.Raw>>>emptyMap());
Collections.<String, Map<String, Map<String, ColumnMetadata.Raw>>>emptyMap(),
toKeyspaceSet(get(scyllaKsFuture)));
}

@Override
Expand Down Expand Up @@ -1288,7 +1324,8 @@ SystemRows fetchSystemRows(
functionsFuture = null,
aggregatesFuture = null,
indexesFuture = null,
viewsFuture = null;
viewsFuture = null,
scyllaKsFuture = null;

ProtocolVersion protocolVersion =
cluster.getConfiguration().getProtocolOptions().getProtocolVersion();
Expand Down Expand Up @@ -1356,6 +1393,21 @@ SystemRows fetchSystemRows(
connection,
protocolVersion);

if (isSchemaOrKeyspace) {
if (targetType == KEYSPACE) {
scyllaKsFuture =
queryAsync(
SELECT_SCYLLA_KEYSPACES
+ " WHERE keyspace_name = '"
+ targetKeyspace
+ "' LIMIT 1;",
connection,
protocolVersion);
} else {
scyllaKsFuture = queryAsync(SELECT_SCYLLA_KEYSPACES, connection, protocolVersion);
}
}

return new SystemRows(
get(ksFuture),
groupByKeyspace(get(cfFuture)),
Expand All @@ -1367,7 +1419,8 @@ SystemRows fetchSystemRows(
groupByKeyspaceAndCf(get(indexesFuture), TABLE_NAME),
null,
Collections.<String, List<Row>>emptyMap(),
Collections.<String, Map<String, Map<String, ColumnMetadata.Raw>>>emptyMap());
Collections.<String, Map<String, Map<String, ColumnMetadata.Raw>>>emptyMap(),
toKeyspaceSet(get(scyllaKsFuture)));
}

@Override
Expand Down Expand Up @@ -1499,7 +1552,8 @@ SystemRows fetchSystemRows(
viewsFuture = null,
virtualKeyspacesFuture = null,
virtualTableFuture = null,
virtualColumnsFuture = null;
virtualColumnsFuture = null,
scyllaKsFuture = null;

ProtocolVersion protocolVersion =
cluster.getConfiguration().getProtocolOptions().getProtocolVersion();
Expand Down Expand Up @@ -1589,6 +1643,21 @@ SystemRows fetchSystemRows(
protocolVersion);
}

if (isSchemaOrKeyspace) {
if (targetType == KEYSPACE) {
scyllaKsFuture =
queryAsync(
SELECT_SCYLLA_KEYSPACES
+ " WHERE keyspace_name = '"
+ targetKeyspace
+ "' LIMIT 1;",
connection,
protocolVersion);
} else {
scyllaKsFuture = queryAsync(SELECT_SCYLLA_KEYSPACES, connection, protocolVersion);
}
}

return new SystemRows(
get(ksFuture),
groupByKeyspace(get(cfFuture)),
Expand All @@ -1600,7 +1669,8 @@ SystemRows fetchSystemRows(
groupByKeyspaceAndCf(get(indexesFuture), TABLE_NAME),
get(virtualKeyspacesFuture),
groupByKeyspace(get(virtualTableFuture)),
groupByKeyspaceAndCf(get(virtualColumnsFuture), cassandraVersion, TABLE_NAME));
groupByKeyspaceAndCf(get(virtualColumnsFuture), cassandraVersion, TABLE_NAME),
toKeyspaceSet(get(scyllaKsFuture)));
}
}
}

0 comments on commit 0a108c2

Please sign in to comment.