Skip to content

Commit

Permalink
feat(#41): corrected file removal and bootstrap trimming
Browse files Browse the repository at this point in the history
  • Loading branch information
novoj committed Jun 20, 2024
1 parent 312b5e0 commit de2e49b
Show file tree
Hide file tree
Showing 6 changed files with 186 additions and 121 deletions.
8 changes: 4 additions & 4 deletions evita_engine/src/main/java/io/evitadb/core/Catalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -1221,10 +1221,10 @@ public void notifyCatalogPresentInLiveView() {
this, this.evitaConfiguration.server(), this.evitaConfiguration.transaction(),
this.scheduler, this.transactionalExecutor, this.newCatalogVersionConsumer
);
while (current != null) {
while (current != null && !current.isClosed()) {
//noinspection unchecked
final List<Subscriber<?>> subscribers = (List<Subscriber<?>>) current.getSubscribers();
Assert.isPremiseValid(subscribers.size() == 1, "Only one subscriber is expected!");
Assert.isPremiseValid(current.isClosed() || subscribers.size() == 1, "Only one subscriber is expected, " + subscribers.size() + " found!");
for (Subscriber<?> subscriber : subscribers) {
if (subscriber instanceof AbstractTransactionStage<?, ?> stage) {
stage.updateCatalogReference(this);
Expand Down Expand Up @@ -1322,10 +1322,10 @@ private TrunkIncorporationTransactionStage getTrunkIncorporationStage() {
this, this.evitaConfiguration.server(), this.evitaConfiguration.transaction(),
this.scheduler, this.transactionalExecutor, this.newCatalogVersionConsumer
);
while (current != null) {
while (current != null && !current.isClosed()) {
//noinspection unchecked
final List<Subscriber<?>> subscribers = (List<Subscriber<?>>) current.getSubscribers();
Assert.isPremiseValid(subscribers.size() == 1, "Only one subscriber is expected!");
Assert.isPremiseValid(current.isClosed() || subscribers.size() == 1, "Only one subscriber is expected, " + subscribers.size() + " found!");
for (Subscriber<?> subscriber : subscribers) {
if (subscriber instanceof TrunkIncorporationTransactionStage stage) {
return stage;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,9 @@ public final void onNext(UpdatedCatalogTransactionTask task) {
}
} catch (Throwable ex) {
log.error("Error while processing snapshot propagating task for catalog `" + catalogName + "`!", ex);
task.future().completeExceptionally(ex);
if (task.future() != null) {
task.future().completeExceptionally(ex);
}
}

// emit the event
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.evitadb.api.configuration.EvitaConfiguration;
import io.evitadb.api.configuration.ServerOptions;
import io.evitadb.api.configuration.StorageOptions;
import io.evitadb.api.configuration.ThreadPoolOptions;
import io.evitadb.api.configuration.TransactionOptions;
import io.evitadb.api.exception.RollbackException;
import io.evitadb.api.query.QueryConstraints;
Expand Down Expand Up @@ -1171,7 +1172,8 @@ void shouldBackupAndRestoreCatalogDuringHeavyParallelIndexing(EvitaContract orig
assertTrue(backupFilePath.toFile().exists(), "Backup file does not exist!");

final String restoredCatalogName = TEST_CATALOG + "_restored";
evita.restoreCatalog(restoredCatalogName, Files.size(backupFilePath), Files.newInputStream(backupFilePath));
evita.restoreCatalog(restoredCatalogName, Files.size(backupFilePath), Files.newInputStream(backupFilePath))
.get(2, TimeUnit.MINUTES);

final long originalCatalogVersion = evita.queryCatalog(
TEST_CATALOG,
Expand Down Expand Up @@ -1245,6 +1247,13 @@ void shouldCorrectlyRotateAllFiles(GenerationalTestInput input) throws Exception
.queryTimeoutInMilliseconds(-1)
.transactionTimeoutInMilliseconds(-1)
.closeSessionsAfterSecondsOfInactivity(-1)
.transactionThreadPool(
ThreadPoolOptions.transactionThreadPoolBuilder()
.threadPriority(Thread.MAX_PRIORITY)
.maxThreadCount(16)
.queueSize(16_384)
.build()
)
.build()
)
.build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@
import io.evitadb.store.spi.model.storageParts.index.CatalogIndexStoragePart;
import io.evitadb.store.spi.model.storageParts.index.GlobalUniqueIndexStoragePart;
import io.evitadb.store.wal.CatalogWriteAheadLog;
import io.evitadb.store.wal.CatalogWriteAheadLog.WalPurgeCallback;
import io.evitadb.store.wal.WalKryoConfigurer;
import io.evitadb.utils.ArrayUtils;
import io.evitadb.utils.Assert;
Expand Down Expand Up @@ -137,7 +138,6 @@
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.IntConsumer;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.regex.Matcher;
Expand Down Expand Up @@ -589,7 +589,7 @@ private static CatalogWriteAheadLog getCatalogWriteAheadLog(
@Nonnull TransactionOptions transactionOptions,
@Nonnull Scheduler scheduler,
@Nonnull Consumer<OffsetDateTime> bootstrapFileTrimFunction,
@Nonnull LongConsumer onWalPurgeCallback
@Nonnull Supplier<WalPurgeCallback> onWalPurgeCallback
) {
WalFileReference currentWalFileRef = catalogHeader.walFileReference();
if (catalogHeader.catalogState() == CatalogState.ALIVE && currentWalFileRef == null) {
Expand Down Expand Up @@ -624,7 +624,7 @@ private static CatalogWriteAheadLog getCatalogWriteAheadLog(
walFileReference -> new CatalogWriteAheadLog(
catalogVersion, catalogName, catalogStoragePath, catalogKryoPool,
storageOptions, transactionOptions, scheduler,
bootstrapFileTrimFunction, onWalPurgeCallback
bootstrapFileTrimFunction, onWalPurgeCallback.get()
)
)
.orElse(null);
Expand All @@ -650,7 +650,7 @@ private static CatalogWriteAheadLog createWalIfAnyWalFilePresent(
@Nonnull TransactionOptions transactionOptions,
@Nonnull Scheduler scheduler,
@Nonnull Consumer<OffsetDateTime> bootstrapFileTrimFunction,
@Nonnull LongConsumer onWalPurgeCallback,
@Nonnull Supplier<WalPurgeCallback> onWalPurgeCallback,
@Nonnull Path catalogFilePath,
@Nonnull Pool<Kryo> kryoPool
) {
Expand All @@ -662,7 +662,7 @@ private static CatalogWriteAheadLog createWalIfAnyWalFilePresent(
new CatalogWriteAheadLog(
catalogVersion, catalogName, catalogFilePath, kryoPool,
storageOptions, transactionOptions, scheduler,
bootstrapFileTrimFunction, onWalPurgeCallback
bootstrapFileTrimFunction, onWalPurgeCallback.get()
);
}

Expand Down Expand Up @@ -816,8 +816,10 @@ public DefaultCatalogPersistenceService(
this.catalogWal = createWalIfAnyWalFilePresent(
catalogVersion, catalogName,
storageOptions, transactionOptions, scheduler,
this::trimBootstrapFile, this.obsoleteFileMaintainer::firstAvailableCatalogVersionChanged,
this.catalogStoragePath, this.catalogKryoPool
this::trimBootstrapFile,
this.obsoleteFileMaintainer::createWalPurgeCallback,
this.catalogStoragePath,
this.catalogKryoPool
);

final String catalogFileName = getCatalogDataStoreFileName(catalogName, lastCatalogBootstrap.catalogFileIndex());
Expand Down Expand Up @@ -895,7 +897,7 @@ public DefaultCatalogPersistenceService(
final long catalogVersion = this.bootstrapUsed.catalogVersion();
this.catalogWal = createWalIfAnyWalFilePresent(
catalogVersion, catalogName, storageOptions, transactionOptions, scheduler,
this::trimBootstrapFile, this.obsoleteFileMaintainer::firstAvailableCatalogVersionChanged,
this::trimBootstrapFile, this.obsoleteFileMaintainer::createWalPurgeCallback,
this.catalogStoragePath, this.catalogKryoPool
);

Expand Down Expand Up @@ -1385,7 +1387,7 @@ public long appendWalAndDiscard(
this.bootstrapUsed.catalogVersion(), this.catalogName, this.catalogStoragePath, catalogHeader, this.catalogKryoPool,
this.storageOptions, this.transactionOptions, this.scheduler,
this::trimBootstrapFile,
this.obsoleteFileMaintainer::firstAvailableCatalogVersionChanged
this.obsoleteFileMaintainer::createWalPurgeCallback
);
}
Assert.isPremiseValid(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@
import io.evitadb.store.spi.CatalogPersistenceService.EntityTypePrimaryKeyAndFileIndex;
import io.evitadb.store.spi.model.CatalogHeader;
import io.evitadb.store.spi.model.reference.CollectionFileReference;
import io.evitadb.store.wal.CatalogWriteAheadLog.WalPurgeCallback;
import io.evitadb.utils.Assert;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import javax.annotation.Nonnull;
Expand All @@ -42,7 +44,6 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -53,6 +54,7 @@
import static io.evitadb.store.spi.CatalogPersistenceService.ENTITY_COLLECTION_FILE_SUFFIX;
import static io.evitadb.store.spi.CatalogPersistenceService.getEntityPrimaryKeyAndIndexFromEntityCollectionFileName;
import static io.evitadb.store.spi.CatalogPersistenceService.getIndexFromCatalogFileName;
import static java.util.Optional.ofNullable;

/**
* This class is responsible for clearing all the files that were made obsolete either by deleting or renaming to
Expand Down Expand Up @@ -169,70 +171,10 @@ public void removeFileWhenNotUsed(
}
}

/**
* Method synchronously removes all files which indexes are lower than the indexes mentioned in {@link CatalogHeader}
* of the currently first available catalog version. This method is called only when time travel is enabled.
*
* @param catalogVersion the first available catalog version
*/
public void firstAvailableCatalogVersionChanged(long catalogVersion) {
if (this.timeTravelEnabled) {
final DataFilesBulkInfo dataFilesBulkInfo = Optional.ofNullable(this.dataFilesInfoFetcher.apply(catalogVersion))
.orElseThrow(() -> new GenericEvitaInternalError(
"Catalog bootstrap record and header for the catalog version `" + catalogVersion + "` " +
"are not available. Cannot purge obsolete files."
)
);

final int firstUsedCatalogDataFileIndex = dataFilesBulkInfo.bootstrapRecord().catalogFileIndex();
final Map<Integer, Integer> entityFileIndex = dataFilesBulkInfo
.catalogHeader()
.getEntityTypeFileIndexes()
.stream()
.collect(
Collectors.toMap(
CollectionFileReference::entityTypePrimaryKey,
CollectionFileReference::fileIndex
)
);

Arrays.stream(
this.catalogStoragePath.toFile()
.listFiles((dir, name) -> name.endsWith(CATALOG_FILE_SUFFIX))
)
.filter(file -> getIndexFromCatalogFileName(file.getName()) < firstUsedCatalogDataFileIndex)
.forEach(file -> {
if (file.delete()) {
log.info("Deleted obsolete catalog file `{}`", file.getAbsolutePath());
} else {
log.warn("Could not delete obsolete catalog file `{}`", file.getAbsolutePath());
}
});

Arrays.stream(
this.catalogStoragePath.toFile()
.listFiles((dir, name) -> name.endsWith(ENTITY_COLLECTION_FILE_SUFFIX))
)
.filter(file -> {
final EntityTypePrimaryKeyAndFileIndex result = getEntityPrimaryKeyAndIndexFromEntityCollectionFileName(file.getName());
final Integer firstUsedEntityFileIndex = entityFileIndex.get(result.entityTypePrimaryKey());
return firstUsedEntityFileIndex == null || result.fileIndex() < firstUsedEntityFileIndex;
})
.forEach(file -> {
if (file.delete()) {
log.info("Deleted obsolete entity collection file `{}`", file.getAbsolutePath());
} else {
log.warn("Could not delete entity collection file `{}`", file.getAbsolutePath());
}
});
}
}

/**
* Updates the catalog version that is no longer used by any active session and plans the purge task for
* asynchronous file removal. This method does nothing when time travel is enabled, because the files are removed
* when WAL files are removed and this logic is executed in {@link #firstAvailableCatalogVersionChanged(long)}
* method.
* when WAL files are removed and this logic is executed in {@link ObsoleteWalPurgeCallback} callback.
*
* @param minimalActiveCatalogVersion the minimal catalog version that is still being used, NULL when there is no
* active session
Expand Down Expand Up @@ -267,6 +209,21 @@ public void close() {
this.maintainedFiles.clear();
}

/**
* Creates the WAL purge callback that is used to remove all files that are no longer used. The callback is used
* when the WAL history is purged.
*
* @return the WAL purge callback
*/
@Nonnull
public WalPurgeCallback createWalPurgeCallback() {
if (this.timeTravelEnabled) {
return new ObsoleteWalPurgeCallback(this.catalogStoragePath, this.dataFilesInfoFetcher);
} else {
return WalPurgeCallback.NO_OP;
}
}

/**
* Method is called from {@link #purgeTask} to remove all files that are no longer used. The method iterates over
* all maintained files and removes the files whose catalog version is less or equal to the last catalog version
Expand Down Expand Up @@ -321,4 +278,72 @@ public record DataFilesBulkInfo(

}

/**
* Callback synchronously removes all files which indexes are lower than the indexes mentioned in {@link CatalogHeader}
* of the currently first available catalog version. This callback is used only when time travel is enabled.
*/
@RequiredArgsConstructor
private static class ObsoleteWalPurgeCallback implements WalPurgeCallback {
/**
* Folder where the catalog files are stored.
*/
private final Path catalogStoragePath;
/**
* The supplier of the catalog header for the specified catalog version.
*/
private final LongFunction<DataFilesBulkInfo> dataFilesInfoFetcher;


@Override
public void purgeFilesUpTo(long firstActiveCatalogVersion) {
final DataFilesBulkInfo activeFiles = ofNullable(this.dataFilesInfoFetcher.apply(firstActiveCatalogVersion))
.orElseThrow(
() -> new GenericEvitaInternalError(
"Catalog bootstrap record and header for the catalog version `" + firstActiveCatalogVersion + "` " +
"are not available. Cannot purge obsolete files."
)
);
final int firstUsedCatalogDataFileIndex = activeFiles.bootstrapRecord().catalogFileIndex();
final Map<Integer, Integer> entityFileIndex = activeFiles
.catalogHeader()
.getEntityTypeFileIndexes()
.stream()
.collect(
Collectors.toMap(
CollectionFileReference::entityTypePrimaryKey,
CollectionFileReference::fileIndex
)
);

Arrays.stream(
this.catalogStoragePath.toFile()
.listFiles((dir, name) -> name.endsWith(CATALOG_FILE_SUFFIX))
)
.filter(file -> getIndexFromCatalogFileName(file.getName()) < firstUsedCatalogDataFileIndex)
.forEach(file -> {
if (file.delete()) {
log.info("Deleted obsolete catalog file `{}`", file.getAbsolutePath());
} else {
log.warn("Could not delete obsolete catalog file `{}`", file.getAbsolutePath());
}
});

Arrays.stream(
this.catalogStoragePath.toFile()
.listFiles((dir, name) -> name.endsWith(ENTITY_COLLECTION_FILE_SUFFIX))
)
.filter(file -> {
final EntityTypePrimaryKeyAndFileIndex result = getEntityPrimaryKeyAndIndexFromEntityCollectionFileName(file.getName());
final Integer firstUsedEntityFileIndex = entityFileIndex.get(result.entityTypePrimaryKey());
return firstUsedEntityFileIndex == null || result.fileIndex() < firstUsedEntityFileIndex;
})
.forEach(file -> {
if (file.delete()) {
log.info("Deleted obsolete entity collection file `{}`", file.getAbsolutePath());
} else {
log.warn("Could not delete entity collection file `{}`", file.getAbsolutePath());
}
});
}
}
}
Loading

0 comments on commit de2e49b

Please sign in to comment.