Cleanup 11apr25 #4086

Open · wants to merge 3 commits into base: trunk
Changes from all commits
9 changes: 9 additions & 0 deletions src/java/org/apache/cassandra/io/util/File.java
@@ -225,6 +225,15 @@ public void deleteRecursive()
PathUtils.deleteRecursive(toPathForWrite());
}

/**
* Deletes all files and subdirectories under "dir".
* @return false if the root cannot be deleted
*/
public boolean tryDeleteRecursive()
{
return PathUtils.tryDeleteRecursive(toPathForWrite());
}

/**
* Try to delete the file on process exit.
*/
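
A brief usage sketch of the new method (the path and logger below are hypothetical, not part of the patch): tryDeleteRecursive() gives callers a best-effort variant they can branch on, while the pre-existing deleteRecursive() keeps the strict, throwing behaviour.

// Minimal sketch, assuming an slf4j-style logger field; the directory path is illustrative only.
static void cleanupScratch(org.apache.cassandra.io.util.File scratch)
{
    if (!scratch.tryDeleteRecursive())          // best-effort: false means the root could not be deleted
        logger.warn("Could not fully remove {}", scratch);
    // The strict variant throws an unchecked filesystem error instead of returning false:
    // scratch.deleteRecursive();
}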
46 changes: 36 additions & 10 deletions src/java/org/apache/cassandra/io/util/PathUtils.java
@@ -346,11 +346,22 @@ public static Throwable delete(Path file, Throwable accumulate, @Nullable RateLi
private static void deleteRecursiveUsingNixCommand(Path path, boolean quietly)
{
String [] cmd = new String[]{ "rm", quietly ? "-rdf" : "-rd", path.toAbsolutePath().toString() };
IOException failure = null;
if (!quietly && !Files.exists(path))
failure = new NoSuchFileException(path.toString());

if (failure == null)
failure = tryDeleteRecursiveUsingNixCommand(path, quietly);

if (failure != null)
throw propagateUnchecked(failure, path, true);
}

private static IOException tryDeleteRecursiveUsingNixCommand(Path path, boolean quietly)
{
String[] cmd = new String[]{ "rm", quietly ? "-rdf" : "-rd", path.toAbsolutePath().toString() };
try
{
if (!quietly && !Files.exists(path))
throw new NoSuchFileException(path.toString());

Process p = Runtime.getRuntime().exec(cmd);
int result = p.waitFor();

@@ -363,24 +374,39 @@ private static void deleteRecursiveUsingNixCommand(Path path, boolean quietly)
}

if (result != 0 && Files.exists(path))
{
logger.error("{} returned:\nstdout:\n{}\n\nstderr:\n{}", Arrays.toString(cmd), out, err);
throw new IOException(String.format("%s returned non-zero exit code: %d%nstdout:%n%s%n%nstderr:%n%s", Arrays.toString(cmd), result, out, err));
}
return new IOException(String.format("%s returned non-zero exit code: %d%nstdout:%n%s%n%nstderr:%n%s", Arrays.toString(cmd), result, out, err));

onDeletion.accept(path);
return null;
}
catch (IOException e)
{
throw propagateUnchecked(e, path, true);
return e;
}
catch (InterruptedException e)
{
Thread.currentThread().interrupt();
throw new FSWriteError(e, path);
return new IOException("Interrupted while executing command " + Arrays.toString(cmd), e);
}
}


/**
* Deletes all files and subdirectories under "path".
* @param path file to be deleted
* @return false if the root cannot be deleted
*/
public static boolean tryDeleteRecursive(Path path)
{
if (USE_NIX_RECURSIVE_DELETE.getBoolean() && path.getFileSystem() == java.nio.file.FileSystems.getDefault())
return null == tryDeleteRecursiveUsingNixCommand(path, true);

if (isDirectory(path))
forEach(path, PathUtils::tryDeleteRecursive);

// The directory should now be empty, so now it can be smoked
return tryDelete(path);
}

/**
* Deletes all files and subdirectories under "path".
* @param path file to be deleted
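
The refactor follows a "return the failure, let the caller decide" shape: the nix-command helper reports an IOException as a value, the throwing entry point propagates it, and the new try-variant converts it to a boolean. A minimal standalone sketch of that shape (simplified, hypothetical names, not the patch itself):

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Minimal sketch of the shared-helper shape used above (hypothetical class).
final class DeleteSketch
{
    // The helper reports a failure as a value instead of throwing.
    static IOException tryDoDelete(Path path)
    {
        try { Files.delete(path); return null; }
        catch (IOException e) { return e; }
    }

    // Strict entry point: propagate the failure (the patch uses propagateUnchecked(...)).
    static void delete(Path path)
    {
        IOException failure = tryDoDelete(path);
        if (failure != null)
            throw new UncheckedIOException(failure);
    }

    // Best-effort entry point: convert the failure to a boolean.
    static boolean tryDelete(Path path)
    {
        return null == tryDoDelete(path);
    }
}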
18 changes: 15 additions & 3 deletions src/java/org/apache/cassandra/journal/Flusher.java
@@ -143,10 +143,20 @@ public void run(Interruptible.State state) throws InterruptedException
}
}

private boolean hasWork()
{
return hasWork(fsyncStartedFor);
}

private boolean hasWork(long lastStartedAt)
{
return fsyncWaitingSince != lastStartedAt;
}

private void awaitWork() throws InterruptedException
{
long lastStartedAt = fsyncStartedFor;
if (fsyncWaitingSince != lastStartedAt)
if (hasWork(lastStartedAt))
return;

awaitingWork = Thread.currentThread();
@@ -158,7 +168,7 @@ private void awaitWork() throws InterruptedException
throw new InterruptedException();
}

if (fsyncWaitingSince != lastStartedAt)
if (hasWork(lastStartedAt))
break;

LockSupport.park();
@@ -175,7 +185,9 @@ void notify(Thread notify)

public void doRun(Interruptible.State state) throws InterruptedException
{
awaitWork();
if (state == NORMAL) awaitWork();
else if (!hasWork()) return;

if (fsyncing == null)
fsyncing = journal.oldestActiveSegment();
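
The hasWork() pair encapsulates the "has anything been queued since the last fsync started" check, so the non-NORMAL (shutdown) path can flush pending work without blocking for new work. A minimal self-contained sketch of the idiom (class and field semantics simplified, not the actual Flusher):

// Minimal sketch of the work-detection idiom above (hypothetical, simplified class).
final class FlushLoopSketch
{
    volatile long fsyncWaitingSince;   // bumped by writers requesting an fsync
    volatile long fsyncStartedFor;     // value of fsyncWaitingSince when the last fsync began

    boolean hasWork()                   { return hasWork(fsyncStartedFor); }
    boolean hasWork(long lastStartedAt) { return fsyncWaitingSince != lastStartedAt; }

    void doRun(boolean normal) throws InterruptedException
    {
        if (normal) awaitWork();       // steady state: block until a writer signals new work
        else if (!hasWork()) return;   // shutting down: only flush if something is pending
        // ... perform the fsync and advance fsyncStartedFor ...
    }

    void awaitWork() throws InterruptedException
    {
        // park until hasWork(fsyncStartedFor); elided for brevity
    }
}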

Original file line number Diff line number Diff line change
@@ -38,6 +38,7 @@
import accord.topology.Shard;
import accord.topology.Topology;
import accord.utils.Invariants;
import accord.utils.SortedListSet;
import accord.utils.async.AsyncResult;
import accord.utils.async.AsyncResults;
import org.agrona.collections.LongArrayList;
@@ -449,8 +450,7 @@ protected void localSyncComplete(Topology topology, boolean startSync)
epochState.setSyncStatus(SyncStatus.NOTIFYING);
}

// TODO (required): replace with SortedArraySet when it is available
Set<Node.Id> notify = new HashSet<>(topology.nodes());
Set<Node.Id> notify = SortedListSet.allOf(topology.nodes());
notify.remove(localId);
syncPropagator.reportSyncComplete(epoch, notify, localId);
}
Original file line number Diff line number Diff line change
@@ -240,8 +240,6 @@ public void reserialize(JournalKey key, DurableBeforeAccumulator from, DataOutpu
@Override
public void deserialize(JournalKey journalKey, DurableBeforeAccumulator into, DataInputPlus in, Version userVersion) throws IOException
{
// TODO: maybe using local serializer is not the best call here, but how do we distinguish
// between messaging and disk versioning?
into.update(CommandStoreSerializers.durableBefore.deserialize(in));
}
}
Original file line number Diff line number Diff line change
@@ -129,7 +129,6 @@ public class AccordKeyspace

public static final Set<String> TABLE_NAMES = ImmutableSet.of(COMMANDS_FOR_KEY, JOURNAL);

// TODO (desired): implement a custom type so we can get correct sort order
public static final TupleType TIMESTAMP_TYPE = new TupleType(Lists.newArrayList(LongType.instance, LongType.instance, Int32Type.instance));

private static final ClusteringIndexFilter FULL_PARTITION = new ClusteringIndexNamesFilter(BTreeSet.of(new ClusteringComparator(), Clustering.EMPTY), false);
Original file line number Diff line number Diff line change
@@ -225,7 +225,6 @@ public <P1, P2> void visit(Unseekables<?> keysOrRanges, Timestamp startedBefore,
commandsForRanges.visit(keysOrRanges, startedBefore, testKind, visitor, p1, p2);
}

// TODO (expected): instead of accepting a slice, accept the min/max epoch and let implementation handle it
@Override
public boolean visit(Unseekables<?> keysOrRanges, TxnId testTxnId, Txn.Kind.Kinds testKind, TestStartedAt testStartedAt, Timestamp testStartedAtTimestamp, ComputeIsDep computeIsDep, AllCommandVisitor visit)
{
Original file line number Diff line number Diff line change
@@ -89,6 +89,7 @@
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.exceptions.RequestExecutionException;
import org.apache.cassandra.exceptions.RequestFailureException;
import org.apache.cassandra.journal.Params;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.IVerbHandler;
@@ -475,7 +476,8 @@ private TopologyRange fetchTopologies(long from) throws ExecutionException, Inte
}
catch (Throwable e)
{
logger.info("Failed to fetch epochs [{}, {}] from {}", from, metadata.epoch.getEpoch(), peer);
if (e instanceof RequestFailureException) logger.info("Failed to fetch epochs [{}, {}] from {}", from, metadata.epoch.getEpoch(), peer);
else logger.info("Failed to fetch epochs [{}, {}] from {}", from, metadata.epoch.getEpoch(), peer, e);
}
}
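
The logging change keeps the message but only attaches the stack trace when the failure is unexpected; expected peer failures (RequestFailureException) stay on one line. A minimal sketch of the pattern (hypothetical method, assuming an slf4j-style logger):

// Minimal sketch, not the patch: log expected failures tersely, unexpected ones with the trace.
void logFetchFailure(long from, long to, Object peer, Throwable e)
{
    if (e instanceof RequestFailureException)
        logger.info("Failed to fetch epochs [{}, {}] from {}", from, to, peer);      // expected: no trace
    else
        logger.info("Failed to fetch epochs [{}, {}] from {}", from, to, peer, e);   // unexpected: keep it
}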

5 changes: 2 additions & 3 deletions src/java/org/apache/cassandra/service/accord/AccordTask.java
@@ -194,11 +194,11 @@ boolean isComplete()
private final String loggingId;
private static final AtomicLong nextLoggingId = new AtomicLong(Clock.Global.currentTimeMillis());

// TODO (expected): merge all of these maps into one
// TODO (desired): merge all of these maps into one
@Nullable Object2ObjectHashMap<TxnId, AccordSafeCommand> commands;
@Nullable Object2ObjectHashMap<RoutingKey, AccordSafeCommandsForKey> commandsForKey;
@Nullable Object2ObjectHashMap<Object, AccordSafeState<?, ?>> loading;
// TODO (expected): collection supporting faster deletes but still fast poll (e.g. some ordered collection)
// TODO (desired): collection supporting faster deletes but still fast poll (e.g. some ordered collection)
@Nullable ArrayDeque<AccordCacheEntry<?, ?>> waitingToLoad;
@Nullable RangeTxnScanner rangeScanner;
boolean hasRanges;
@@ -662,7 +662,6 @@ public void run()
safeStore = commandStore.begin(this, commandsForRanges);
R result = apply(safeStore);

// TODO (required): currently, we are not very efficient about ensuring that we persist the absolute minimum amount of state. Improve that.
List<Journal.CommandUpdate> changes = null;
if (commands != null)
{
Original file line number Diff line number Diff line change
@@ -46,7 +46,7 @@

import static accord.local.CommandSummaries.SummaryStatus.NOT_DIRECTLY_WITNESSED;

// TODO (required): move to accord-core, merge with existing logic there
// TODO (expected): move to accord-core, merge with existing logic there
public class CommandsForRanges extends TreeMap<Timestamp, Summary> implements CommandSummaries.ByTxnIdSnapshot
{
public CommandsForRanges(Map<? extends Timestamp, ? extends Summary> m)
Original file line number Diff line number Diff line change
@@ -223,7 +223,6 @@ private static ReadDataSerializer serializerFor(ReadType type)

public static final class ReplySerializer<D extends Data> implements IVersionedSerializer<ReadReply>
{
// TODO (expected): use something other than ordinal
final CommitOrReadNack[] nacks = CommitOrReadNack.values();
private final VersionedSerializer<D, Version> dataSerializer;

Original file line number Diff line number Diff line change
@@ -26,7 +26,7 @@

public class ResultSerializers
{
// TODO (expected): this is meant to encode e.g. whether the transaction's condition met or not for clients to later query
// TODO (desired): this is meant to encode e.g. whether the transaction's condition met or not for clients to later query
public static final Result APPLIED = new Result()
{
@Override
Original file line number Diff line number Diff line change
@@ -52,7 +52,7 @@ public final T deserialize(DataInputPlus in, Version version) throws IOException
{
TxnId txnId = CommandSerializers.txnId.deserialize(in);
Route<?> scope = KeySerializers.route.deserialize(in);
// TODO: there should be a base epoch
// TODO (desired): there should be a base epoch
long waitForEpoch = in.readUnsignedVInt();
return deserializeBody(in, version, txnId, scope, waitForEpoch);
}
Original file line number Diff line number Diff line change
@@ -78,7 +78,6 @@ public UpdateParameters updateParameters(TableMetadata metadata, DecoratedKey dk
// For the time being, guardrails are disabled for Accord queries.
ClientState disabledGuardrails = null;

// TODO : How should Accord work with TTL?
int ttl = metadata.params.defaultTimeToLive;
return new RowUpdateParameters(metadata,
disabledGuardrails,
@@ -103,7 +102,6 @@ private Map<DecoratedKey, Partition> prefetchRow(TableMetadata metadata, Decorat
checkState(data.entrySet().size() == 1, "CAS read should only have one entry");
return ImmutableMap.of(dk, value);
case AUTO_READ:
// TODO (review): Is this the right DK being passed into that matches what we used to store in TxnDataName
if (TxnData.txnDataNameIndex(name) == index)
return ImmutableMap.of(dk, value);
default:
Original file line number Diff line number Diff line change
@@ -235,9 +235,6 @@ public AsyncChain<Data> read(TableMetadatas tables, ConsistencyLevel consistency
if (command == null)
return AsyncResults.success(TxnData.NOOP_DATA);

// TODO (required, safety): before release, double check reasoning that this is safe
// AccordCommandsForKey cfk = ((SafeAccordCommandStore)safeStore).commandsForKey(key);
// int nowInSeconds = cfk.nowInSecondsFor(executeAt, isForWriteTxn);
// It's fine for our nowInSeconds to lag slightly our insertion timestamp, as to the user
// this simply looks like the transaction witnessed TTL'd data and the data then expired
// immediately after the transaction executed, and this simplifies things a great deal
Original file line number Diff line number Diff line change
@@ -174,15 +174,15 @@ public boolean preserveTimestamps()
public Update slice(Ranges ranges)
{
Keys keys = this.keys.slice(ranges);
// TODO: Slice the condition.
// TODO (desired): Slice the condition.
return new TxnUpdate(tables, keys, select(this.keys, keys, fragments), condition, cassandraCommitCL, preserveTimestamps);
}

@Override
public Update intersecting(Participants<?> participants)
{
Keys keys = this.keys.intersecting(participants);
// TODO: Slice the condition.
// TODO (desired): Slice the condition.
return new TxnUpdate(tables, keys, select(this.keys, keys, fragments), condition, cassandraCommitCL, preserveTimestamps);
}

@@ -201,9 +201,9 @@ private static ByteBuffer[] select(Keys in, Keys out, ByteBuffer[] from)
@Override
public Update merge(Update update)
{
// TODO: special method for linear merging keyed and non-keyed lists simultaneously
TxnUpdate that = (TxnUpdate) update;
Keys mergedKeys = this.keys.with(that.keys);
// TODO (desired): special method for linear merging keyed and non-keyed lists simultaneously
ByteBuffer[] mergedFragments = merge(this.keys, that.keys, this.fragments, that.fragments, mergedKeys.size());
return new TxnUpdate(tables, mergedKeys, mergedFragments, condition, cassandraCommitCL, preserveTimestamps);
}
Original file line number Diff line number Diff line change
@@ -19,7 +19,6 @@

import java.util.UUID;

import org.junit.Ignore;
import org.junit.Test;

import org.apache.cassandra.auth.CassandraRoleManager;
@@ -44,7 +43,6 @@
@SuppressWarnings("Convert2MethodRef")
public class HintsMaxSizeTest extends TestBaseImpl
{
@Ignore
@Test
public void testMaxHintedHandoffSize() throws Exception
{
Original file line number Diff line number Diff line change
@@ -26,7 +26,6 @@
import org.apache.commons.io.FileUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;

import org.apache.cassandra.config.DatabaseDescriptor;
@@ -106,7 +105,6 @@ public void bulkLoaderSuccessfullyStreamsOverSsl() throws Throwable
assertRows(CLUSTER.get(1).executeInternal("SELECT count(*) FROM ssl_upload_tables.test"), row(42L));
}

@Ignore
@Test
public void bulkLoaderSuccessfullyStreamsOverSslWithDeprecatedSslStoragePort() throws Throwable
{
Original file line number Diff line number Diff line change
@@ -90,7 +90,9 @@ public void runQuery(Cluster cluster, ClusterState clusterState, Operation opera
cluster.coordinator(coordinator).execute(withKeyspace(query), ConsistencyLevel.QUORUM);
fail("should fail");
}
catch (Exception ignored) {}
catch (Exception ignored)
{
}

boolean metricBumped = false;
for (int i = 1; i <= cluster.size(); i++)
Original file line number Diff line number Diff line change
@@ -30,7 +30,6 @@
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;

import org.apache.cassandra.distributed.Cluster;
@@ -57,7 +56,6 @@

public class SplitBrainTest extends TestBaseImpl
{
@Ignore
@Test
public void testSplitBrainStartup() throws IOException, TimeoutException
{
2 changes: 1 addition & 1 deletion test/unit/org/apache/cassandra/ServerTestUtils.java
@@ -238,7 +238,7 @@ private static void cleanupDirectory(File directory)
{
if (directory.exists())
{
Arrays.stream(directory.tryList()).forEach(File::deleteRecursive);
Arrays.stream(directory.tryList()).forEach(File::tryDeleteRecursive);
}
}

Original file line number Diff line number Diff line change
@@ -33,7 +33,6 @@
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;

import org.apache.cassandra.config.DatabaseDescriptor;
@@ -110,7 +109,6 @@ public void testSegmentFlaggingWithNonblockingOnCreation() throws Throwable
testWithNonblockingMode(this::testSegmentFlaggingOnCreation0);
}

@Ignore
@Test
public void testNonblockingShouldMaintainSteadyDiskUsage() throws Throwable
{
Original file line number Diff line number Diff line change
@@ -27,7 +27,6 @@

import com.google.common.collect.ImmutableMap;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;

@@ -58,7 +57,6 @@ public class CommitlogShutdownTest

private final static byte[] entropy = new byte[1024 * 256];

@Ignore
@Test
@BMRule(name = "Make removing commitlog segments slow",
targetClass = "CommitLogSegment",
Original file line number Diff line number Diff line change
@@ -26,7 +26,6 @@
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;

@@ -92,7 +91,6 @@ public void reinstanciateService() throws Throwable
HintsService.instance.startDispatch();
}

@Ignore
@Test
@BMRule(name = "Delay delivering hints",
targetClass = "DispatchHintsTask",
Original file line number Diff line number Diff line change
@@ -22,7 +22,6 @@
import java.util.Random;

import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;

import static org.junit.Assert.assertEquals;
@@ -60,7 +59,6 @@ public void testTrim() throws IOException
testSafeMemoryWriter(CHUNK * 5, CHUNK, 65536);
}

@Ignore
@Test
public void testOver2GBuffer() throws IOException
{
Original file line number Diff line number Diff line change
@@ -69,12 +69,12 @@ private static Gen<KnownMap> foundKnownMap()
switch (domain)
{
case Key:
// TODO (coverage): don't hard code murmur
// TODO (desired): don't hard code murmur
Gen<TokenKey> keyGen = AccordGenerators.routingKeyGen(fromQT(CassandraGenerators.TABLE_ID_GEN), Gens.constant(AccordGenerators.RoutingKeyKind.TOKEN), fromQT(CassandraGenerators.murmurToken()), Murmur3Partitioner.instance);
TokenKey homeKey = keyGen.next(rs);
List<TokenKey> forOrdering = Gens.lists(keyGen).unique().ofSizeBetween(1, 10).next(rs);
forOrdering.sort(Comparator.naturalOrder());
// TODO (coverage): don't hard code keys type
// TODO (desired): don't hard code keys type
keysOrRanges = new FullKeyRoute(homeKey, forOrdering.toArray(RoutingKey[]::new));
break;
case Range:
Original file line number Diff line number Diff line change
@@ -479,7 +479,6 @@ private static void testOne(long seed)
}
}

// TODO (expected): we currently don't explore TruncatedApply statuses because we don't transition through all phases and therefore don't adopt the Applied status
Choices<SaveStatus> saveStatusChoices = Choices.uniform(EnumSet.complementOf(EnumSet.of(SaveStatus.TruncatedApply, SaveStatus.TruncatedUnapplied, SaveStatus.TruncatedApplyWithOutcome)).toArray(SaveStatus[]::new));
Supplier<SaveStatus> saveStatusSupplier = () -> {
SaveStatus result = saveStatusChoices.choose(source);
Original file line number Diff line number Diff line change
@@ -32,7 +32,6 @@

import org.apache.commons.lang3.NotImplementedException;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;

import org.apache.cassandra.config.DatabaseDescriptor;
@@ -78,7 +77,6 @@ public static void setup()
log.readyUnchecked();
}

@Ignore
@Test
public void discoveryTest() throws Throwable
{