Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce support for tablets [3.x] #237

Merged
merged 1 commit into from
Feb 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,8 @@ public ListenableFuture<Void> apply(Message.Response response) throws Exception
if (lwt != null) {
getHost().setLwtInfo(lwt);
}
TabletInfo tabletInfo = TabletInfo.parseTabletInfo(msg.supported);
getHost().setTabletInfo(tabletInfo);
return MoreFutures.VOID_SUCCESS;
case ERROR:
Responses.Error error = (Responses.Error) response;
Expand Down Expand Up @@ -507,6 +509,13 @@ public ListenableFuture<Void> apply(Void input) throws Exception {
if (lwtInfo != null) {
lwtInfo.addOption(extraOptions);
}
TabletInfo tabletInfo = getHost().getTabletInfo();
if (tabletInfo != null
&& tabletInfo.isEnabled()
&& ProtocolFeature.CUSTOM_PAYLOADS.isSupportedBy(protocolVersion)) {
logger.debug("Enabling tablet support in OPTIONS message");
TabletInfo.addOption(extraOptions);
}
Future startupResponseFuture =
write(
new Requests.Startup(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,23 @@ public void onSet(
switch (response.type) {
case RESULT:
Responses.Result rm = (Responses.Result) response;

if (rm.getCustomPayload() != null
&& rm.getCustomPayload().containsKey(TabletInfo.TABLETS_ROUTING_V1_CUSTOM_PAYLOAD_KEY)
&& (statement instanceof BoundStatement)) {
BoundStatement st = (BoundStatement) statement;
String keyspace = statement.getKeyspace();
String table =
st.preparedStatement().getPreparedId().boundValuesMetadata.variables.getTable(0);
session
.getCluster()
.getMetadata()
.getTabletMap()
.processTabletsRoutingV1Payload(
keyspace,
table,
rm.getCustomPayload().get(TabletInfo.TABLETS_ROUTING_V1_CUSTOM_PAYLOAD_KEY));
}
switch (rm.kind) {
case SET_KEYSPACE:
// propagate the keyspace change to other connections
Expand Down
11 changes: 11 additions & 0 deletions driver-core/src/main/java/com/datastax/driver/core/Host.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ public class Host {
// Can be set concurrently but the value should always be the same.
private volatile LwtInfo lwtInfo = null;

// Whether host supports TABLETS_ROUTING_V1
private volatile TabletInfo tabletInfo = null;

enum State {
ADDED,
DOWN,
Expand Down Expand Up @@ -450,6 +453,14 @@ public void setLwtInfo(LwtInfo lwtInfo) {
this.lwtInfo = lwtInfo;
}

/**
 * Returns the tablet support information recorded for this host.
 *
 * @return the stored {@link TabletInfo}, or {@code null} if none has been set (the backing field
 *     is initialized to {@code null}).
 */
public TabletInfo getTabletInfo() {
return tabletInfo;
}

/**
 * Records the tablet support information for this host, replacing any previously stored value.
 *
 * @param tabletInfo the new value; it is stored as-is, without validation.
 */
public void setTabletInfo(TabletInfo tabletInfo) {
this.tabletInfo = tabletInfo;
}

/**
* Returns whether the host is considered up by the driver.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,9 @@ ListenableFuture<Connection> borrowConnection(
TimeUnit unit,
int maxQueueSize,
Token.Factory partitioner,
ByteBuffer routingKey) {
ByteBuffer routingKey,
String keyspace,
String table) {
Phase phase = this.phase.get();
if (phase != Phase.READY)
return Futures.immediateFailedFuture(
Expand All @@ -515,7 +517,17 @@ ListenableFuture<Connection> borrowConnection(
if (routingKey != null) {
Metadata metadata = manager.cluster.getMetadata();
Token t = metadata.newToken(partitioner, routingKey);
shardId = host.getShardingInfo().shardId(t);
shardId = -1;
if (keyspace != null && table != null) {
assert t instanceof Token.TokenLong64;
shardId =
Integer.min(
metadata.getShardForTabletToken(keyspace, table, (Token.TokenLong64) t, host),
host.getShardingInfo().getShardsCount());
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps add protection here against too-large shardId.

Scenario:

  1. we get routing information for a tablet
  2. node restarts with fewer shards
  3. we look up routing information, get a shard that no longer exists

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed 'this' to min(shardCount, 'this')

if (shardId == -1) { // means that tablet lookup failed
shardId = host.getShardingInfo().shardId(t);
}
} else {
shardId = RAND.nextInt(host.getShardingInfo().getShardsCount());
}
Expand Down
102 changes: 98 additions & 4 deletions driver-core/src/main/java/com/datastax/driver/core/Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
*/
package com.datastax.driver.core;

import com.google.common.annotations.Beta;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
Expand All @@ -35,13 +36,15 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -60,8 +63,8 @@ public class Metadata {
final ConcurrentMap<String, KeyspaceMetadata> keyspaces =
new ConcurrentHashMap<String, KeyspaceMetadata>();
private volatile TokenMap tokenMap;

final ReentrantLock lock = new ReentrantLock();
private final TabletMap tabletMap;

// See https://github.com/apache/cassandra/blob/trunk/doc/cql3/CQL.textile#appendixA
private static final IntObjectHashMap<List<char[]>> RESERVED_KEYWORDS =
Expand Down Expand Up @@ -146,6 +149,7 @@ public class Metadata {

Metadata(Cluster.Manager cluster) {
this.cluster = cluster;
this.tabletMap = TabletMap.emptyMap(cluster);
}

// rebuilds the token map with the current hosts, typically when refreshing schema metadata
Expand Down Expand Up @@ -514,21 +518,30 @@ public Set<TokenRange> getTokenRanges(String keyspace, Host host) {
}

/**
* Returns the set of hosts that are replica for a given partition key. Partitioner can be {@code
* null} and then a cluster-wide partitioner will be invoked.
* Extension of legacy method {@link Metadata#getReplicas(String, Token.Factory, ByteBuffer)}.
* Tablets model requires knowledge of the table name to determine the replicas. This method will
* first try to lookup replicas through known tablets metadata. It will default to TokenMap lookup
* if either {@code null} was passed as table name or the tablet lookup is unsuccessful for any
* other reason.
*
* <p>Returns the set of hosts that are replica for a given partition key. Partitioner can be
* {@code null} and then a cluster-wide partitioner will be invoked.
*
* <p>Note that this information is refreshed asynchronously by the control connection, when
* schema or ring topology changes. It might occasionally be stale (or even empty).
*
* @param keyspace the name of the keyspace to get replicas for.
* @param table the name of the table to get replicas for. Necessary for distinction for tablets.
* Unnecessary for regular TokenMap
* @param partitioner the partitioner to use or {@code null} for cluster-wide partitioner.
* @param partitionKey the partition key for which to find the set of replica.
* @return the (immutable) set of replicas for {@code partitionKey} as known by the driver. Note
* that the result might be stale or empty if metadata was explicitly disabled with {@link
* QueryOptions#setMetadataEnabled(boolean)}.
*/
@Beta
public Set<Host> getReplicas(
String keyspace, Token.Factory partitioner, ByteBuffer partitionKey) {
String keyspace, String table, Token.Factory partitioner, ByteBuffer partitionKey) {
keyspace = handleId(keyspace);
TokenMap current = tokenMap;
if (current == null) {
Expand All @@ -537,11 +550,40 @@ public Set<Host> getReplicas(
if (partitioner == null) {
partitioner = current.factory;
}
// If possible, try tablet lookup first
if (keyspace != null && table != null) {
Token token = partitioner.hash(partitionKey);
assert (token instanceof Token.TokenLong64);
Set<UUID> hostUuids = tabletMap.getReplicas(keyspace, table, (long) token.getValue());
if (!hostUuids.isEmpty()) {
return hostUuids.stream().map(this::getHost).collect(Collectors.toSet());
}
}
// Fall back to tokenMap
Set<Host> hosts = current.getReplicas(keyspace, partitioner.hash(partitionKey));
return hosts == null ? Collections.<Host>emptySet() : hosts;
}
}

/**
* Returns the set of hosts that are replica for a given partition key. Partitioner can be {@code
* null} and then a cluster-wide partitioner will be invoked.
*
* <p>This overload delegates to {@link #getReplicas(String, String, Token.Factory, ByteBuffer)}
* with a {@code null} table name.
*
* <p>Note that this information is refreshed asynchronously by the control connection, when
* schema or ring topology changes. It might occasionally be stale (or even empty).
*
* @param keyspace the name of the keyspace to get replicas for.
* @param partitioner the partitioner to use or {@code null} for cluster-wide partitioner.
* @param partitionKey the partition key for which to find the set of replica.
* @return the (immutable) set of replicas for {@code partitionKey} as known by the driver. Note
* that the result might be stale or empty if metadata was explicitly disabled with {@link
* QueryOptions#setMetadataEnabled(boolean)}.
*/
public Set<Host> getReplicas(
String keyspace, Token.Factory partitioner, ByteBuffer partitionKey) {
// No table name is available here, so pass null for it.
return getReplicas(keyspace, null, partitioner, partitionKey);
}

/**
* Returns the set of hosts that are replica for a given token range.
*
Expand Down Expand Up @@ -860,6 +902,58 @@ void triggerOnMaterializedViewRemoved(MaterializedViewMetadata view) {
}
}

/**
 * Looks up, in the tablets metadata, the shard associated with {@code token} on {@code host} for
 * the given table.
 *
 * <p>The lookup takes the set of tablets registered for {@code keyspace}/{@code table}, picks the
 * {@link NavigableSet#ceiling(Object) ceiling} element for a probe tablet built from the token
 * value, and accepts it only if the tablet's first token is strictly lower than the token value.
 * The replicas of that tablet are then scanned for an entry whose host id equals the id of {@code
 * host}. NOTE(review): the ordering of tablets inside the set is defined by {@code
 * TabletMap.Tablet}, which is not visible here; presumably they are ordered by their last token
 * -- confirm.
 *
 * @param keyspace the keyspace name used as part of the tablets metadata key.
 * @param table the table name used as part of the tablets metadata key.
 * @param token the token to locate.
 * @param host the host whose shard is requested.
 * @return the shard recorded for {@code host} in the matching tablet, or {@code -1} when the
 *     tablets metadata is {@code null}, holds no entry for the table, or has no matching tablet
 *     or replica for the host.
 */
@Beta
public int getShardForTabletToken(
    String keyspace, String table, Token.TokenLong64 token, Host host) {
  if (tabletMap == null) {
    logger.trace(
        "Could not determine shard for token {} on host {} because tablets metadata is currently null. "
            + "Returning -1.",
        token,
        host);
    return -1;
  }

  final UUID wantedHostId = host.getHostId();
  final long rawToken = (long) token.getValue();
  final NavigableSet<TabletMap.Tablet> tabletsOfTable =
      tabletMap.getMapping().get(new TabletMap.KeyspaceTableNamePair(keyspace, table));
  if (tabletsOfTable == null) {
    logger.trace(
        "Could not determine shard for token {} on host {} because table {}.{} is not present in tablets "
            + "metadata. Returning -1.",
        token,
        host,
        keyspace,
        table);
    return -1;
  }

  final TabletMap.Tablet candidate =
      tabletsOfTable.ceiling(TabletMap.Tablet.malformedTablet(rawToken));
  // The candidate only counts when the token lies strictly after its first token.
  final boolean candidateCoversToken = candidate != null && candidate.getFirstToken() < rawToken;
  if (candidateCoversToken) {
    for (TabletMap.HostShardPair replica : candidate.getReplicas()) {
      if (replica.getHost().equals(wantedHostId)) {
        return replica.getShard();
      }
    }
  }

  logger.trace(
      "Could not find tablet corresponding to token {} on host {} for table {} in keyspace {}. Returning -1.",
      token,
      host,
      table,
      keyspace);
  return -1;
}

/**
* Getter for current {@link TabletMap}.
*
* <p>The backing field is {@code final}; in the constructor visible in this file it is assigned
* the result of {@code TabletMap.emptyMap(cluster)}, so every call returns that same instance.
*
* @return current {@link TabletMap}
*/
@Beta
public TabletMap getTabletMap() {
return tabletMap;
}

private static class TokenMap {

private final Token.Factory factory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,22 @@ private Iterator<Host> getReplicas(
}

Token.Factory partitioner = statement.getPartitioner();
String tableName = null;
ColumnDefinitions defs = null;
if (statement instanceof BoundStatement) {
defs = ((BoundStatement) statement).preparedStatement().getVariables();
} else if (statement instanceof PreparedStatement) {
defs = ((PreparedStatement) statement).getVariables();
}
if (defs != null && defs.size() > 0) {
tableName = defs.getTable(0);
}

final Set<Host> replicas =
manager
.cluster
.getMetadata()
.getReplicas(Metadata.quote(keyspace), partitioner, partitionKey);
.getReplicas(Metadata.quote(keyspace), tableName, partitioner, partitionKey);

// replicas are stored in the right order starting with the primary replica
return replicas.iterator();
Expand Down Expand Up @@ -437,13 +448,28 @@ private boolean query(final Host host) {
ByteBuffer routingKey = statement.getRoutingKey(protocolVersion, codecRegistry);

PoolingOptions poolingOptions = manager.configuration().getPoolingOptions();
String statementKeyspace = statement.getKeyspace();
String statementTable = null;
ColumnDefinitions defs = null;
if (statement instanceof PreparedStatement) {
defs = ((PreparedStatement) statement).getVariables();
}
if (statement instanceof BoundStatement) {
defs = ((BoundStatement) statement).statement.getVariables();
}
if (defs != null && defs.size() > 0) {
statementTable = defs.getTable(0);
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could capture the tablet map during prepare time and put it into statement itself, avoiding all the casting and checks here. Probably negligible in terms of performance.

ListenableFuture<Connection> connectionFuture =
pool.borrowConnection(
poolingOptions.getPoolTimeoutMillis(),
TimeUnit.MILLISECONDS,
poolingOptions.getMaxQueueSize(),
statement.getPartitioner(),
routingKey);
routingKey,
statementKeyspace,
statementTable);
GuavaCompatibility.INSTANCE.addCallback(
connectionFuture,
new FutureCallback<Connection>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -733,13 +733,22 @@ private ListenableFuture<PreparedStatement> prepare(
if (entry.getKey().getEndPoint().equals(toExclude)) continue;

try {
ColumnDefinitions defs = statement.getVariables();
String statementTable = (defs != null && defs.size() > 0 ? defs.getTable(0) : null);
// Preparing is not critical: if it fails, it will fix itself later when the user tries to
// execute
// the prepared query. So don't wait if no connection is available, simply abort.
ListenableFuture<Connection> connectionFuture =
entry
.getValue()
.borrowConnection(0, TimeUnit.MILLISECONDS, 0, null, statement.getRoutingKey());
.borrowConnection(
0,
TimeUnit.MILLISECONDS,
0,
null,
statement.getRoutingKey(),
statement.getQueryKeyspace(),
statementTable);
ListenableFuture<Response> prepareFuture =
GuavaCompatibility.INSTANCE.transformAsync(
connectionFuture,
Expand Down
33 changes: 33 additions & 0 deletions driver-core/src/main/java/com/datastax/driver/core/TabletInfo.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.datastax.driver.core;

import java.util.List;
import java.util.Map;

/**
 * Immutable holder describing whether the {@code TABLETS_ROUTING_V1} protocol extension is
 * enabled, together with helpers to detect the extension in a map of supported options and to add
 * it to a map of outgoing options.
 */
public class TabletInfo {
  private static final String SCYLLA_TABLETS_STARTUP_OPTION_KEY = "TABLETS_ROUTING_V1";
  private static final String SCYLLA_TABLETS_STARTUP_OPTION_VALUE = "";

  /** Custom payload key under which tablet routing information is looked up. */
  public static final String TABLETS_ROUTING_V1_CUSTOM_PAYLOAD_KEY = "tablets-routing-v1";

  // Final: instances never change after construction.
  private final boolean enabled;

  private TabletInfo(boolean enabled) {
    this.enabled = enabled;
  }

  /**
   * Tells whether the extension is enabled. Currently pertains only to TABLETS_ROUTING_V1.
   *
   * @return {@code true} if the extension was detected by {@link #parseTabletInfo(Map)}.
   */
  public boolean isEnabled() {
    return enabled;
  }

  /**
   * Builds a {@code TabletInfo} from a map of supported options.
   *
   * <p>The extension is considered enabled only when the map holds, under the {@code
   * TABLETS_ROUTING_V1} key, a list with exactly one element equal to the empty string. A {@code
   * null} map, a missing key, a list of any other size, or a {@code null} or different element
   * all yield a disabled instance instead of an exception.
   *
   * @param supported the supported options, may be {@code null}.
   * @return a new, never {@code null}, {@code TabletInfo}.
   */
  public static TabletInfo parseTabletInfo(Map<String, List<String>> supported) {
    if (supported == null) {
      return new TabletInfo(false);
    }
    List<String> values = supported.get(SCYLLA_TABLETS_STARTUP_OPTION_KEY);
    // Constant-first equals keeps a null list element from throwing.
    return new TabletInfo(
        values != null
            && values.size() == 1
            && SCYLLA_TABLETS_STARTUP_OPTION_VALUE.equals(values.get(0)));
  }

  /**
   * Adds the {@code TABLETS_ROUTING_V1} entry (with an empty value) to the given options map.
   *
   * @param options the mutable options map to add the entry to; must not be {@code null}.
   */
  public static void addOption(Map<String, String> options) {
    options.put(SCYLLA_TABLETS_STARTUP_OPTION_KEY, SCYLLA_TABLETS_STARTUP_OPTION_VALUE);
  }
}
Loading
Loading