Skip to content

Cassandra 19958 4.1 #4061

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: cassandra-4.1
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,346 changes: 1,146 additions & 200 deletions .circleci/config.yml

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
4.1.9
* Assign a separate queue for Hints which is different from Mutation queue (CASSANDRA-19958)
* Fix mixed mode paxos ttl commit hang (CASSANDRA-20514)
* Fix paxos mixed mode infinite loop (CASSANDRA-20493)
* Optionally skip exception logging on invalid legacy protocol magic exception (CASSANDRA-19483)
1 change: 1 addition & 0 deletions src/java/org/apache/cassandra/concurrent/Stage.java
Original file line number Diff line number Diff line change
@@ -55,6 +55,7 @@ public enum Stage
INTERNAL_RESPONSE (false, "InternalResponseStage", "internal", FBUtilities::getAvailableProcessors, null, Stage::multiThreadedStage),
IMMEDIATE (false, "ImmediateStage", "internal", () -> 0, null, Stage::immediateExecutor),
PAXOS_REPAIR (false, "PaxosRepairStage", "internal", FBUtilities::getAvailableProcessors, null, Stage::multiThreadedStage),
HINT (true, "HintStage", "request", DatabaseDescriptor::getConcurrentHints, DatabaseDescriptor::setConcurrentHints, Stage::multiThreadedLowSignalStage),
;

public final String jmxName;
1 change: 1 addition & 0 deletions src/java/org/apache/cassandra/config/Config.java
Original file line number Diff line number Diff line change
@@ -164,6 +164,7 @@ public static Set<String> splitCommaDelimited(String src)

public int concurrent_reads = 32;
public int concurrent_writes = 32;
public int concurrent_hints = 32;
public int concurrent_counter_writes = 32;
public int concurrent_materialized_view_writes = 32;
public int available_processors = -1;
15 changes: 15 additions & 0 deletions src/java/org/apache/cassandra/config/DatabaseDescriptor.java
Original file line number Diff line number Diff line change
@@ -4638,4 +4638,19 @@ public static void setRejectOutOfTokenRangeRequests(boolean enabled)
{
conf.reject_out_of_token_range_requests = enabled;
}

public static int getConcurrentHints()
{
return conf.concurrent_hints;
}

public static void setConcurrentHints(int concurrent_hints)
{
if (concurrent_hints < 0)
{
throw new IllegalArgumentException("Concurrent hints must be non-negative");
}
conf.concurrent_hints = concurrent_hints;
}
}

2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/service/StorageProxy.java
Original file line number Diff line number Diff line change
@@ -2770,7 +2770,7 @@ private static Future<Void> submitHint(HintRunnable runnable)
StorageMetrics.totalHintsInProgress.inc(runnable.targets.size());
for (Replica target : runnable.targets)
getHintsInProgressFor(target.endpoint()).incrementAndGet();
return (Future<Void>) Stage.MUTATION.submit(runnable);
return (Future<Void>) Stage.HINT.submit(runnable);
}

public Long getRpcTimeout() { return DatabaseDescriptor.getRpcTimeout(MILLISECONDS); }
74 changes: 68 additions & 6 deletions test/unit/org/apache/cassandra/service/StorageProxyTest.java
Original file line number Diff line number Diff line change
@@ -21,6 +21,7 @@
import java.net.UnknownHostException;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

import com.google.common.util.concurrent.Uninterruptibles;
@@ -32,16 +33,26 @@
import org.apache.cassandra.ServerTestUtils;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.IMutation;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.gms.HeartBeatState;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.metrics.StorageMetrics;
import org.jboss.byteman.contrib.bmunit.BMRule;
import org.jboss.byteman.contrib.bmunit.BMRules;
import org.jboss.byteman.contrib.bmunit.BMUnitRunner;

import static org.apache.cassandra.locator.ReplicaUtils.full;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;

@RunWith(BMUnitRunner.class)
public class StorageProxyTest
@@ -56,14 +67,14 @@ public static void initDD()
@Test
public void testSetGetPaxosVariant()
{
Assert.assertEquals(Config.PaxosVariant.v1, DatabaseDescriptor.getPaxosVariant());
Assert.assertEquals("v1", StorageProxy.instance.getPaxosVariant());
assertEquals(Config.PaxosVariant.v1, DatabaseDescriptor.getPaxosVariant());
assertEquals("v1", StorageProxy.instance.getPaxosVariant());
StorageProxy.instance.setPaxosVariant("v2");
Assert.assertEquals("v2", StorageProxy.instance.getPaxosVariant());
Assert.assertEquals(Config.PaxosVariant.v2, DatabaseDescriptor.getPaxosVariant());
assertEquals("v2", StorageProxy.instance.getPaxosVariant());
assertEquals(Config.PaxosVariant.v2, DatabaseDescriptor.getPaxosVariant());
DatabaseDescriptor.setPaxosVariant(Config.PaxosVariant.v1);
Assert.assertEquals(Config.PaxosVariant.v1, DatabaseDescriptor.getPaxosVariant());
Assert.assertEquals(Config.PaxosVariant.v1, DatabaseDescriptor.getPaxosVariant());
assertEquals(Config.PaxosVariant.v1, DatabaseDescriptor.getPaxosVariant());
assertEquals(Config.PaxosVariant.v1, DatabaseDescriptor.getPaxosVariant());
}

@Test
@@ -148,4 +159,55 @@ private void shouldHintTest(Consumer<Replica> test) throws UnknownHostException
StorageService.instance.getTokenMetadata().removeEndpoint(testEp);
}
}

public static AtomicBoolean hintsHit = new AtomicBoolean(false);
public static AtomicBoolean mutationsHit = new AtomicBoolean(false);
@Test
@BMRules(rules = {
@BMRule(name = "Separate hints",
targetClass = "org.apache.cassandra.concurrent.Stage",
targetLocation = "ENTRY",
targetMethod = "submit(Runnable)",
condition = "$0.name().equals(Stage.HINT.name())",
action = "org.apache.cassandra.service.StorageProxyTest.hintsHit.set(true);"),
@BMRule(name = "Separate mutations",
targetClass = "org.apache.cassandra.concurrent.Stage",
targetLocation = "ENTRY",
targetMethod = "submit(Runnable)",
condition = "$0.name().equals(Stage.MUTATION.name())",
action = "org.apache.cassandra.service.StorageProxyTest.mutationsHit.set(true);")
})
public void testHintIsPutByDefaultInHintQueueAndNotMutationQueue() throws Exception {
// Mock mutation and create a real replica
Mutation mockMutation = mock(Mutation.class);
Replica realReplica = createRealReplica();

// Mock the response handler
AbstractWriteResponseHandler<IMutation> mockResponseHandler = mock(AbstractWriteResponseHandler.class);

// Capture the metrics before the method call
long hintsBefore = StorageMetrics.totalHintsInProgress.getCount();

// Invoke the submitHint method
StorageProxy.submitHint(mockMutation, realReplica, mockResponseHandler);

// Capture the metrics after the method call
long hintsAfter = StorageMetrics.totalHintsInProgress.getCount();

// Assertions
assertEquals(1, hintsAfter - hintsBefore);
assertEquals(true, hintsHit.get());
assertEquals(false, mutationsHit.get());
}

private Replica createRealReplica() throws UnknownHostException {
InetAddressAndPort inetAddress = InetAddressAndPort.getByName("127.0.0.1");
// Create a Token and Range<Token>
IPartitioner partitioner = Murmur3Partitioner.instance;
Token startToken = partitioner.getMinimumToken();
Token endToken = partitioner.getRandomToken();
Range<Token> tokenRange = new Range<>(startToken, endToken);

return Replica.fullReplica(inetAddress, tokenRange);
}
}