Skip to content

CEP-45: Short read protection for mutation tracking #4091

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: cep-45-mutation-tracking
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
cep-45-mutation-tracking
* CEP-45: Query forwarding (CASSANDRA-20309)
* Fix mutation tracking startup (CASSANDRA-20540)
* Mutation tracking journal integration, read, and write path (CASSANDRA-20304, CASSANDRA-20305, CASSANDRA-20308)
* Introduce MutationJournal for coordinator logs (CASSANDRA-20353)
* Copy over Journal and dependencies from cep-15-accord (CASSANDRA-20321)

5.1
* Throw new IndexBuildInProgressException when queries fail during index build, instead of IndexNotAvailableException (CASSANDRA-20402)
* Fix Paxos repair interrupts running transactions (CASSANDRA-20469)
2 changes: 1 addition & 1 deletion pylib/cqlshlib/test/test_cqlsh_output.py
Original file line number Diff line number Diff line change
@@ -791,7 +791,7 @@ def test_describe_schema_output(self):
self.assertNoHasColors(output)
# Since CASSANDRA-7622 'DESC FULL SCHEMA' also shows all VIRTUAL keyspaces
self.assertIn('VIRTUAL KEYSPACE system_virtual_schema', output)
self.assertIn("\nCREATE KEYSPACE system_auth WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'} AND durable_writes = true;\n",
self.assertIn("\nCREATE KEYSPACE system_auth WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'} AND durable_writes = true AND replication_type = 'untracked';\n",
output)
self.assertRegex(output, r'.*\s*$')

2 changes: 2 additions & 0 deletions src/java/org/apache/cassandra/config/Config.java
Original file line number Diff line number Diff line change
@@ -648,6 +648,8 @@ public static class SSTableConfig

public boolean dynamic_data_masking_enabled = false;

public boolean mutation_tracking_enabled = false;

/**
* Time in milliseconds after a warning will be emitted to the log and to the client that a UDF runs too long.
* (Only valid, if user_defined_functions_threads_enabled==true)
14 changes: 14 additions & 0 deletions src/java/org/apache/cassandra/config/DatabaseDescriptor.java
Original file line number Diff line number Diff line change
@@ -5419,6 +5419,20 @@ public static void setDynamicDataMaskingEnabled(boolean enabled)
}
}

/**
 * @return whether CEP-45 mutation tracking is enabled, as read from
 *         {@code mutation_tracking_enabled} in the loaded {@link Config}
 */
public static boolean getMutationTrackingEnabled()
{
    return conf.mutation_tracking_enabled;
}

/**
 * Enables or disables CEP-45 mutation tracking at runtime.
 * Logs and updates the config only when the value actually changes.
 *
 * @param enabled the new value for {@code mutation_tracking_enabled}
 */
public static void setMutationTrackingEnabled(boolean enabled)
{
    boolean current = conf.mutation_tracking_enabled;
    if (current == enabled)
        return; // no-op: avoid redundant log lines on repeated identical calls

    logger.info("Setting mutation_tracking_enabled to {}", enabled);
    conf.mutation_tracking_enabled = enabled;
}

public static OptionalDouble getSeverityDuringDecommission()
{
return conf.severity_during_decommission > 0 ?
14 changes: 7 additions & 7 deletions src/java/org/apache/cassandra/cql3/QueryProcessor.java
Original file line number Diff line number Diff line change
@@ -58,7 +58,6 @@
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.ReadQuery;
import org.apache.cassandra.db.ReadResponse;
import org.apache.cassandra.db.SinglePartitionReadQuery;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.marshal.AbstractType;
@@ -88,6 +87,7 @@
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.service.pager.QueryPager;
import org.apache.cassandra.service.reads.IReadResponse;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.transport.Dispatcher;
@@ -514,17 +514,17 @@ else if (readQuery instanceof SinglePartitionReadQuery.Group)
{
throw new IllegalArgumentException("Unable to handle; only expected ReadCommands but given " + readQuery.getClass());
}
Future<List<Message<ReadResponse>>> future = FutureCombiner.allOf(commands.stream()
.map(rc -> Message.out(rc.verb(), rc))
.map(m -> MessagingService.instance().<ReadCommand, ReadResponse>sendWithResult(m, address))
.collect(Collectors.toList()));
Future<List<Message<IReadResponse>>> future = FutureCombiner.allOf(commands.stream()
.map(rc -> Message.out(rc.verb(), rc))
.map(m -> MessagingService.instance().<ReadCommand, IReadResponse>sendWithResult(m, address))
.collect(Collectors.toList()));

ResultSetBuilder result = new ResultSetBuilder(select.getResultMetadata(), select.getSelection().newSelectors(options), false);
return future.map(list -> {
int i = 0;
for (Message<ReadResponse> m : list)
for (Message<IReadResponse> m : list)
{
ReadResponse rsp = m.payload;
IReadResponse rsp = m.payload;
try (PartitionIterator it = UnfilteredPartitionIterators.filter(rsp.makeIterator(commands.get(i++)), nowInSec))
{
while (it.hasNext())
Original file line number Diff line number Diff line change
@@ -451,7 +451,7 @@ private void executeWithoutConditions(List<? extends IMutation> mutations, Consi
updatePartitionsPerBatchMetrics(mutations.size());

boolean mutateAtomic = (isLogged() && mutations.size() > 1);
StorageProxy.mutateWithTriggers(mutations, cl, mutateAtomic, requestTime);
StorageProxy.mutateWithoutConditions(mutations, cl, mutateAtomic, requestTime);
ClientRequestSizeMetrics.recordRowAndColumnCountMetrics(mutations);
}

Original file line number Diff line number Diff line change
@@ -32,6 +32,7 @@
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.IMutation;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.replication.MutationId;
import org.apache.cassandra.db.RegularAndStaticColumns;
import org.apache.cassandra.db.commitlog.CommitLogSegment;
import org.apache.cassandra.db.partitions.PartitionUpdate;
@@ -223,7 +224,7 @@ public Mutation build()
PartitionUpdate update = updateEntry.getValue().build();
updates.put(updateEntry.getKey(), update);
}
return new Mutation(keyspaceName, key, updates.build(), createdAt);
return new Mutation(MutationId.none(), keyspaceName, key, updates.build(), createdAt);
}

public PartitionUpdate.Builder get(TableId tableId)
Original file line number Diff line number Diff line change
@@ -23,6 +23,8 @@
import com.google.common.collect.HashMultiset;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import org.apache.cassandra.replication.MutationId;
import org.apache.cassandra.replication.MutationTrackingService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -563,9 +565,10 @@ private ResultMessage executeWithoutCondition(QueryState queryState, QueryOption
options.getTimestamp(queryState),
options.getNowInSeconds(queryState),
requestTime);

if (!mutations.isEmpty())
{
StorageProxy.mutateWithTriggers(mutations, cl, false, requestTime);
StorageProxy.mutateWithoutConditions(mutations, cl, false, requestTime);

if (!SchemaConstants.isSystemKeyspace(metadata.keyspace))
ClientRequestSizeMetrics.recordRowAndColumnCountMetrics(mutations);
@@ -727,8 +730,30 @@ public ResultMessage executeInternalWithoutCondition(QueryState queryState, Quer
{
long timestamp = options.getTimestamp(queryState);
long nowInSeconds = options.getNowInSeconds(queryState);
for (IMutation mutation : getMutations(queryState.getClientState(), options, true, timestamp, nowInSeconds, requestTime))
mutation.apply();
List<? extends IMutation> mutations = getMutations(queryState.getClientState(), options, true, timestamp, nowInSeconds, requestTime);
boolean isTracked = !mutations.isEmpty() && Schema.instance.getKeyspaceMetadata(mutations.get(0).getKeyspaceName()).params.replicationType.isTracked();
if (isTracked)
{
if (mutations.stream().anyMatch(m -> m instanceof CounterMutation))
throw new InvalidRequestException("Mutation tracking is currently unsupported with counters");
if (mutations.size() > 1)
throw new InvalidRequestException("Mutation tracking is currently unsupported with unlogged batches");

for (IMutation m : mutations)
{
Mutation mutation = (Mutation) m;
String keyspaceName = mutation.getKeyspaceName();
Token token = mutation.key().getToken();
MutationId id = MutationTrackingService.instance.nextMutationId(keyspaceName, token);
mutation = mutation.withMutationId(id);
mutation.apply();
}
}
else
{
for (IMutation mutation : mutations)
mutation.apply();
}
return null;
}

107 changes: 63 additions & 44 deletions src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
Original file line number Diff line number Diff line change
@@ -31,6 +31,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;

import org.apache.cassandra.service.reads.ShortReadException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -344,24 +345,35 @@ public ResultMessage.Rows execute(QueryState state, QueryOptions options, Dispat
query.trackWarnings();
ResultMessage.Rows rows;

if (aggregationSpec == null && (pageSize <= 0 || (query.limits().count() <= pageSize) || query.isTopK()))
while (true)
{
rows = execute(query, options, state.getClientState(), selectors, nowInSec, userLimit, null, requestTime, unmask);
}
else
{
QueryPager pager = getPager(query, options);
try
{
if (aggregationSpec == null && (pageSize <= 0 || (query.limits().count() <= pageSize) || query.isTopK()))
{
rows = execute(query, options, state.getClientState(), selectors, nowInSec, userLimit, null, requestTime, unmask);
}
else
{
QueryPager pager = getPager(query, options);

rows = execute(state,
Pager.forDistributedQuery(pager, cl, state.getClientState()),
options,
selectors,
pageSize,
nowInSec,
userLimit,
aggregationSpec,
requestTime,
unmask);
}
break;
} catch (ShortReadException e)
{
// try again
}

rows = execute(state,
Pager.forDistributedQuery(pager, cl, state.getClientState()),
options,
selectors,
pageSize,
nowInSec,
userLimit,
aggregationSpec,
requestTime,
unmask);
}
if (!SchemaConstants.isSystemKeyspace(table.keyspace))
ClientRequestSizeMetrics.recordReadResponseMetrics(rows, restrictions, selection);
@@ -595,39 +607,46 @@ public ResultMessage.Rows executeInternal(QueryState state,
int pageSize = options.getPageSize();
boolean unmask = state.getClientState().hasTablePermission(table, Permission.UNMASK);

Selectors selectors = selection.newSelectors(options);
AggregationSpecification aggregationSpec = getAggregationSpec(options);
ReadQuery query = getQuery(options,
state.getClientState(),
selectors.getColumnFilter(),
nowInSec,
userLimit,
userPerPartitionLimit,
pageSize,
aggregationSpec);

try (ReadExecutionController executionController = query.executionController())
while (true)
{
if (aggregationSpec == null && (pageSize <= 0 || (query.limits().count() <= pageSize) || query.isTopK()))
Selectors selectors = selection.newSelectors(options);
AggregationSpecification aggregationSpec = getAggregationSpec(options);
ReadQuery query = getQuery(options,
state.getClientState(),
selectors.getColumnFilter(),
nowInSec,
userLimit,
userPerPartitionLimit,
pageSize,
aggregationSpec);

try (ReadExecutionController executionController = query.executionController())
{
try (PartitionIterator data = query.executeInternal(executionController))
if (aggregationSpec == null && (pageSize <= 0 || (query.limits().count() <= pageSize) || query.isTopK()))
{
return processResults(data, options, selectors, nowInSec, userLimit, null, unmask, state.getClientState());
try (PartitionIterator data = query.executeInternal(executionController))
{
return processResults(data, options, selectors, nowInSec, userLimit, null, unmask, state.getClientState());
}
}
}

QueryPager pager = getPager(query, options);

return execute(state,
Pager.forInternalQuery(pager, executionController),
options,
selectors,
pageSize,
nowInSec,
userLimit,
aggregationSpec,
requestTime,
unmask);
QueryPager pager = getPager(query, options);

return execute(state,
Pager.forInternalQuery(pager, executionController),
options,
selectors,
pageSize,
nowInSec,
userLimit,
aggregationSpec,
requestTime,
unmask);
}
catch (ShortReadException e)
{
// try again
}
}
}

Original file line number Diff line number Diff line change
@@ -25,6 +25,7 @@
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.schema.KeyspaceParams.Option;
import org.apache.cassandra.schema.ReplicationType;
import org.apache.cassandra.schema.ReplicationParams;

public final class KeyspaceAttributes extends PropertyDefinitions
@@ -66,17 +67,25 @@ private Map<String, String> getAllReplicationOptions()
/**
 * Builds the {@link KeyspaceParams} for a newly created keyspace from the
 * parsed attributes, falling back to {@link KeyspaceParams#DEFAULT_DURABLE_WRITES}
 * and {@link KeyspaceParams#DEFAULT_REPLICATION_TYPE} when the corresponding
 * options were not supplied by the user.
 *
 * @return the fully resolved keyspace params for a CREATE KEYSPACE statement
 * @throws IllegalArgumentException if {@code replication_type} names an unknown
 *         {@link ReplicationType} constant
 */
KeyspaceParams asNewKeyspaceParams()
{
    boolean durableWrites = getBoolean(Option.DURABLE_WRITES.toString(), KeyspaceParams.DEFAULT_DURABLE_WRITES);

    // NOTE(review): Enum.valueOf is case-sensitive — confirm the stored option
    // casing (e.g. 'untracked') matches the ReplicationType constant names.
    String rtypeName = getString(Option.REPLICATION_TYPE.toString());
    ReplicationType replicationType = rtypeName != null ? ReplicationType.valueOf(rtypeName) : KeyspaceParams.DEFAULT_REPLICATION_TYPE;

    return KeyspaceParams.create(durableWrites, getAllReplicationOptions(), replicationType);
}

/**
 * Builds the {@link KeyspaceParams} for an ALTER KEYSPACE statement, carrying
 * over any option the statement did not override from {@code previous}.
 *
 * @param previous the keyspace's current params, used as the source of defaults
 * @return the merged keyspace params
 * @throws IllegalArgumentException if {@code replication_type} names an unknown
 *         {@link ReplicationType} constant
 */
KeyspaceParams asAlteredKeyspaceParams(KeyspaceParams previous)
{
    boolean durableWrites = getBoolean(Option.DURABLE_WRITES.toString(), previous.durableWrites);

    // replication_type falls back to the keyspace's current value when not specified
    String rtypeName = getString(Option.REPLICATION_TYPE.toString());
    ReplicationType replicationType = rtypeName != null ? ReplicationType.valueOf(rtypeName) : previous.replicationType;

    // Merge user-supplied replication options over the existing ones; keep the
    // previous strategy untouched when no replication 'class' was given.
    Map<String, String> previousOptions = previous.replication.options;
    ReplicationParams replication = getReplicationStrategyClass() == null
                                  ? previous.replication
                                  : ReplicationParams.fromMapWithDefaults(getAllReplicationOptions(), previousOptions);
    return new KeyspaceParams(durableWrites, replication, replicationType);
}

public boolean hasOption(Option option)
Loading