From de2e49ba237334c4ad4a97ed8fccd6387e87c417 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Novotn=C3=BD?= Date: Thu, 20 Jun 2024 10:26:19 +0200 Subject: [PATCH] feat(#41): corrected file removal and bootstrap trimming --- .../main/java/io/evitadb/core/Catalog.java | 8 +- ...ogSnapshotPropagationTransactionStage.java | 4 +- .../api/EvitaTransactionalFunctionalTest.java | 11 +- .../DefaultCatalogPersistenceService.java | 20 +-- .../store/catalog/ObsoleteFileMaintainer.java | 149 ++++++++++-------- .../store/wal/CatalogWriteAheadLog.java | 115 ++++++++------ 6 files changed, 186 insertions(+), 121 deletions(-) diff --git a/evita_engine/src/main/java/io/evitadb/core/Catalog.java b/evita_engine/src/main/java/io/evitadb/core/Catalog.java index 579281e61..938a44635 100644 --- a/evita_engine/src/main/java/io/evitadb/core/Catalog.java +++ b/evita_engine/src/main/java/io/evitadb/core/Catalog.java @@ -1221,10 +1221,10 @@ public void notifyCatalogPresentInLiveView() { this, this.evitaConfiguration.server(), this.evitaConfiguration.transaction(), this.scheduler, this.transactionalExecutor, this.newCatalogVersionConsumer ); - while (current != null) { + while (current != null && !current.isClosed()) { //noinspection unchecked final List> subscribers = (List>) current.getSubscribers(); - Assert.isPremiseValid(subscribers.size() == 1, "Only one subscriber is expected!"); + Assert.isPremiseValid(current.isClosed() || subscribers.size() == 1, "Only one subscriber is expected, " + subscribers.size() + " found!"); for (Subscriber subscriber : subscribers) { if (subscriber instanceof AbstractTransactionStage stage) { stage.updateCatalogReference(this); @@ -1322,10 +1322,10 @@ private TrunkIncorporationTransactionStage getTrunkIncorporationStage() { this, this.evitaConfiguration.server(), this.evitaConfiguration.transaction(), this.scheduler, this.transactionalExecutor, this.newCatalogVersionConsumer ); - while (current != null) { + while (current != null && !current.isClosed()) { //noinspection unchecked final List> subscribers = (List>) current.getSubscribers(); - Assert.isPremiseValid(subscribers.size() == 1, "Only one subscriber is expected!"); + Assert.isPremiseValid(current.isClosed() || subscribers.size() == 1, "Only one subscriber is expected, " + subscribers.size() + " found!"); for (Subscriber subscriber : subscribers) { if (subscriber instanceof TrunkIncorporationTransactionStage stage) { return stage; diff --git a/evita_engine/src/main/java/io/evitadb/core/transaction/stage/CatalogSnapshotPropagationTransactionStage.java b/evita_engine/src/main/java/io/evitadb/core/transaction/stage/CatalogSnapshotPropagationTransactionStage.java index 72887793d..992c66569 100644 --- a/evita_engine/src/main/java/io/evitadb/core/transaction/stage/CatalogSnapshotPropagationTransactionStage.java +++ b/evita_engine/src/main/java/io/evitadb/core/transaction/stage/CatalogSnapshotPropagationTransactionStage.java @@ -103,7 +103,9 @@ public final void onNext(UpdatedCatalogTransactionTask task) { } } catch (Throwable ex) { log.error("Error while processing snapshot propagating task for catalog `" + catalogName + "`!", ex); - task.future().completeExceptionally(ex); + if (task.future() != null) { + task.future().completeExceptionally(ex); + } } // emit the event diff --git a/evita_functional_tests/src/test/java/io/evitadb/api/EvitaTransactionalFunctionalTest.java b/evita_functional_tests/src/test/java/io/evitadb/api/EvitaTransactionalFunctionalTest.java index 41f9cab55..c3189f40f 100644 --- a/evita_functional_tests/src/test/java/io/evitadb/api/EvitaTransactionalFunctionalTest.java +++ b/evita_functional_tests/src/test/java/io/evitadb/api/EvitaTransactionalFunctionalTest.java @@ -31,6 +31,7 @@ import io.evitadb.api.configuration.EvitaConfiguration; import io.evitadb.api.configuration.ServerOptions; import io.evitadb.api.configuration.StorageOptions; +import io.evitadb.api.configuration.ThreadPoolOptions; import io.evitadb.api.configuration.TransactionOptions; import io.evitadb.api.exception.RollbackException; import io.evitadb.api.query.QueryConstraints; @@ -1171,7 +1172,8 @@ void shouldBackupAndRestoreCatalogDuringHeavyParallelIndexing(EvitaContract orig assertTrue(backupFilePath.toFile().exists(), "Backup file does not exist!"); final String restoredCatalogName = TEST_CATALOG + "_restored"; - evita.restoreCatalog(restoredCatalogName, Files.size(backupFilePath), Files.newInputStream(backupFilePath)); + evita.restoreCatalog(restoredCatalogName, Files.size(backupFilePath), Files.newInputStream(backupFilePath)) + .get(2, TimeUnit.MINUTES); final long originalCatalogVersion = evita.queryCatalog( TEST_CATALOG, @@ -1245,6 +1247,13 @@ void shouldCorrectlyRotateAllFiles(GenerationalTestInput input) throws Exception .queryTimeoutInMilliseconds(-1) .transactionTimeoutInMilliseconds(-1) .closeSessionsAfterSecondsOfInactivity(-1) + .transactionThreadPool( + ThreadPoolOptions.transactionThreadPoolBuilder() + .threadPriority(Thread.MAX_PRIORITY) + .maxThreadCount(16) + .queueSize(16_384) + .build() + ) .build() ) .build() diff --git a/evita_store/evita_store_server/src/main/java/io/evitadb/store/catalog/DefaultCatalogPersistenceService.java b/evita_store/evita_store_server/src/main/java/io/evitadb/store/catalog/DefaultCatalogPersistenceService.java index 7f3bc8a04..b06da4b21 100644 --- a/evita_store/evita_store_server/src/main/java/io/evitadb/store/catalog/DefaultCatalogPersistenceService.java +++ b/evita_store/evita_store_server/src/main/java/io/evitadb/store/catalog/DefaultCatalogPersistenceService.java @@ -107,6 +107,7 @@ import io.evitadb.store.spi.model.storageParts.index.CatalogIndexStoragePart; import io.evitadb.store.spi.model.storageParts.index.GlobalUniqueIndexStoragePart; import io.evitadb.store.wal.CatalogWriteAheadLog; +import io.evitadb.store.wal.CatalogWriteAheadLog.WalPurgeCallback; import io.evitadb.store.wal.WalKryoConfigurer; import io.evitadb.utils.ArrayUtils; import io.evitadb.utils.Assert; @@ -137,7 +138,6 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.IntConsumer; -import java.util.function.LongConsumer; import java.util.function.LongSupplier; import java.util.function.Supplier; import java.util.regex.Matcher; @@ -589,7 +589,7 @@ private static CatalogWriteAheadLog getCatalogWriteAheadLog( @Nonnull TransactionOptions transactionOptions, @Nonnull Scheduler scheduler, @Nonnull Consumer bootstrapFileTrimFunction, - @Nonnull LongConsumer onWalPurgeCallback + @Nonnull Supplier onWalPurgeCallback ) { WalFileReference currentWalFileRef = catalogHeader.walFileReference(); if (catalogHeader.catalogState() == CatalogState.ALIVE && currentWalFileRef == null) { @@ -624,7 +624,7 @@ private static CatalogWriteAheadLog getCatalogWriteAheadLog( walFileReference -> new CatalogWriteAheadLog( catalogVersion, catalogName, catalogStoragePath, catalogKryoPool, storageOptions, transactionOptions, scheduler, - bootstrapFileTrimFunction, onWalPurgeCallback + bootstrapFileTrimFunction, onWalPurgeCallback.get() ) ) .orElse(null); @@ -650,7 +650,7 @@ private static CatalogWriteAheadLog createWalIfAnyWalFilePresent( @Nonnull TransactionOptions transactionOptions, @Nonnull Scheduler scheduler, @Nonnull Consumer bootstrapFileTrimFunction, - @Nonnull LongConsumer onWalPurgeCallback, + @Nonnull Supplier onWalPurgeCallback, @Nonnull Path catalogFilePath, @Nonnull Pool kryoPool ) { @@ -662,7 +662,7 @@ private static CatalogWriteAheadLog createWalIfAnyWalFilePresent( new CatalogWriteAheadLog( catalogVersion, catalogName, catalogFilePath, kryoPool, storageOptions, transactionOptions, scheduler, - bootstrapFileTrimFunction, onWalPurgeCallback + bootstrapFileTrimFunction, onWalPurgeCallback.get() ); } @@ -816,8 +816,10 @@ public DefaultCatalogPersistenceService( this.catalogWal = createWalIfAnyWalFilePresent( catalogVersion, catalogName, storageOptions, transactionOptions, scheduler, - this::trimBootstrapFile, this.obsoleteFileMaintainer::firstAvailableCatalogVersionChanged, - this.catalogStoragePath, this.catalogKryoPool + this::trimBootstrapFile, + this.obsoleteFileMaintainer::createWalPurgeCallback, + this.catalogStoragePath, + this.catalogKryoPool ); final String catalogFileName = getCatalogDataStoreFileName(catalogName, lastCatalogBootstrap.catalogFileIndex()); @@ -895,7 +897,7 @@ public DefaultCatalogPersistenceService( final long catalogVersion = this.bootstrapUsed.catalogVersion(); this.catalogWal = createWalIfAnyWalFilePresent( catalogVersion, catalogName, storageOptions, transactionOptions, scheduler, - this::trimBootstrapFile, this.obsoleteFileMaintainer::firstAvailableCatalogVersionChanged, + this::trimBootstrapFile, this.obsoleteFileMaintainer::createWalPurgeCallback, this.catalogStoragePath, this.catalogKryoPool ); @@ -1385,7 +1387,7 @@ public long appendWalAndDiscard( this.bootstrapUsed.catalogVersion(), this.catalogName, this.catalogStoragePath, catalogHeader, this.catalogKryoPool, this.storageOptions, this.transactionOptions, this.scheduler, this::trimBootstrapFile, - this.obsoleteFileMaintainer::firstAvailableCatalogVersionChanged + this.obsoleteFileMaintainer::createWalPurgeCallback ); } Assert.isPremiseValid( diff --git a/evita_store/evita_store_server/src/main/java/io/evitadb/store/catalog/ObsoleteFileMaintainer.java b/evita_store/evita_store_server/src/main/java/io/evitadb/store/catalog/ObsoleteFileMaintainer.java index 63ff2beda..629368187 100644 --- a/evita_store/evita_store_server/src/main/java/io/evitadb/store/catalog/ObsoleteFileMaintainer.java +++ b/evita_store/evita_store_server/src/main/java/io/evitadb/store/catalog/ObsoleteFileMaintainer.java @@ -31,7 +31,9 @@ import io.evitadb.store.spi.CatalogPersistenceService.EntityTypePrimaryKeyAndFileIndex; import io.evitadb.store.spi.model.CatalogHeader; import io.evitadb.store.spi.model.reference.CollectionFileReference; +import io.evitadb.store.wal.CatalogWriteAheadLog.WalPurgeCallback; import io.evitadb.utils.Assert; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import javax.annotation.Nonnull; @@ -42,7 +44,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -53,6 +54,7 @@ import static io.evitadb.store.spi.CatalogPersistenceService.ENTITY_COLLECTION_FILE_SUFFIX; import static io.evitadb.store.spi.CatalogPersistenceService.getEntityPrimaryKeyAndIndexFromEntityCollectionFileName; import static io.evitadb.store.spi.CatalogPersistenceService.getIndexFromCatalogFileName; +import static java.util.Optional.ofNullable; /** * This class is responsible for clearing all the files that were made obsolete either by deleting or renaming to @@ -169,70 +171,10 @@ public void removeFileWhenNotUsed( } } - /** - * Method synchronously removes all files which indexes are lower than the indexes mentioned in {@link CatalogHeader} - * of the currently first available catalog version. This method is called only when time travel is enabled. - * - * @param catalogVersion the first available catalog version - */ - public void firstAvailableCatalogVersionChanged(long catalogVersion) { - if (this.timeTravelEnabled) { - final DataFilesBulkInfo dataFilesBulkInfo = Optional.ofNullable(this.dataFilesInfoFetcher.apply(catalogVersion)) - .orElseThrow(() -> new GenericEvitaInternalError( - "Catalog bootstrap record and header for the catalog version `" + catalogVersion + "` " + - "are not available. Cannot purge obsolete files." - ) - ); - - final int firstUsedCatalogDataFileIndex = dataFilesBulkInfo.bootstrapRecord().catalogFileIndex(); - final Map entityFileIndex = dataFilesBulkInfo - .catalogHeader() - .getEntityTypeFileIndexes() - .stream() - .collect( - Collectors.toMap( - CollectionFileReference::entityTypePrimaryKey, - CollectionFileReference::fileIndex - ) - ); - - Arrays.stream( - this.catalogStoragePath.toFile() - .listFiles((dir, name) -> name.endsWith(CATALOG_FILE_SUFFIX)) - ) - .filter(file -> getIndexFromCatalogFileName(file.getName()) < firstUsedCatalogDataFileIndex) - .forEach(file -> { - if (file.delete()) { - log.info("Deleted obsolete catalog file `{}`", file.getAbsolutePath()); - } else { - log.warn("Could not delete obsolete catalog file `{}`", file.getAbsolutePath()); - } - }); - - Arrays.stream( - this.catalogStoragePath.toFile() - .listFiles((dir, name) -> name.endsWith(ENTITY_COLLECTION_FILE_SUFFIX)) - ) - .filter(file -> { - final EntityTypePrimaryKeyAndFileIndex result = getEntityPrimaryKeyAndIndexFromEntityCollectionFileName(file.getName()); - final Integer firstUsedEntityFileIndex = entityFileIndex.get(result.entityTypePrimaryKey()); - return firstUsedEntityFileIndex == null || result.fileIndex() < firstUsedEntityFileIndex; - }) - .forEach(file -> { - if (file.delete()) { - log.info("Deleted obsolete entity collection file `{}`", file.getAbsolutePath()); - } else { - log.warn("Could not delete entity collection file `{}`", file.getAbsolutePath()); - } - }); - } - } - /** * Updates the catalog version that is no longer used by any active session and plans the purge task for * asynchronous file removal. This method does nothing when time travel is enabled, because the files are removed - * when WAL files are removed and this logic is executed in {@link #firstAvailableCatalogVersionChanged(long)} - * method. + * when WAL files are removed and this logic is executed in {@link ObsoleteWalPurgeCallback} callback. * * @param minimalActiveCatalogVersion the minimal catalog version that is still being used, NULL when there is no * active session @@ -267,6 +209,21 @@ public void close() { this.maintainedFiles.clear(); } + /** + * Creates the WAL purge callback that is used to remove all files that are no longer used. The callback is used + * when the WAL history is purged. + * + * @return the WAL purge callback + */ + @Nonnull + public WalPurgeCallback createWalPurgeCallback() { + if (this.timeTravelEnabled) { + return new ObsoleteWalPurgeCallback(this.catalogStoragePath, this.dataFilesInfoFetcher); + } else { + return WalPurgeCallback.NO_OP; + } + } + /** * Method is called from {@link #purgeTask} to remove all files that are no longer used. The method iterates over * all maintained files and removes the files whose catalog version is less or equal to the last catalog version @@ -321,4 +278,72 @@ public record DataFilesBulkInfo( } + /** + * Callback synchronously removes all files which indexes are lower than the indexes mentioned in {@link CatalogHeader} + * of the currently first available catalog version. This callback is used only when time travel is enabled. + */ + @RequiredArgsConstructor + private static class ObsoleteWalPurgeCallback implements WalPurgeCallback { + /** + * Folder where the catalog files are stored. + */ + private final Path catalogStoragePath; + /** + * The supplier of the catalog header for the specified catalog version. + */ + private final LongFunction dataFilesInfoFetcher; + + + @Override + public void purgeFilesUpTo(long firstActiveCatalogVersion) { + final DataFilesBulkInfo activeFiles = ofNullable(this.dataFilesInfoFetcher.apply(firstActiveCatalogVersion)) + .orElseThrow( + () -> new GenericEvitaInternalError( + "Catalog bootstrap record and header for the catalog version `" + firstActiveCatalogVersion + "` " + + "are not available. Cannot purge obsolete files." + ) + ); + final int firstUsedCatalogDataFileIndex = activeFiles.bootstrapRecord().catalogFileIndex(); + final Map entityFileIndex = activeFiles + .catalogHeader() + .getEntityTypeFileIndexes() + .stream() + .collect( + Collectors.toMap( + CollectionFileReference::entityTypePrimaryKey, + CollectionFileReference::fileIndex + ) + ); + + Arrays.stream( + this.catalogStoragePath.toFile() + .listFiles((dir, name) -> name.endsWith(CATALOG_FILE_SUFFIX)) + ) + .filter(file -> getIndexFromCatalogFileName(file.getName()) < firstUsedCatalogDataFileIndex) + .forEach(file -> { + if (file.delete()) { + log.info("Deleted obsolete catalog file `{}`", file.getAbsolutePath()); + } else { + log.warn("Could not delete obsolete catalog file `{}`", file.getAbsolutePath()); + } + }); + + Arrays.stream( + this.catalogStoragePath.toFile() + .listFiles((dir, name) -> name.endsWith(ENTITY_COLLECTION_FILE_SUFFIX)) + ) + .filter(file -> { + final EntityTypePrimaryKeyAndFileIndex result = getEntityPrimaryKeyAndIndexFromEntityCollectionFileName(file.getName()); + final Integer firstUsedEntityFileIndex = entityFileIndex.get(result.entityTypePrimaryKey()); + return firstUsedEntityFileIndex == null || result.fileIndex() < firstUsedEntityFileIndex; + }) + .forEach(file -> { + if (file.delete()) { + log.info("Deleted obsolete entity collection file `{}`", file.getAbsolutePath()); + } else { + log.warn("Could not delete entity collection file `{}`", file.getAbsolutePath()); + } + }); + } + } } diff --git a/evita_store/evita_store_server/src/main/java/io/evitadb/store/wal/CatalogWriteAheadLog.java b/evita_store/evita_store_server/src/main/java/io/evitadb/store/wal/CatalogWriteAheadLog.java index 6ce93d433..c880749a3 100644 --- a/evita_store/evita_store_server/src/main/java/io/evitadb/store/wal/CatalogWriteAheadLog.java +++ b/evita_store/evita_store_server/src/main/java/io/evitadb/store/wal/CatalogWriteAheadLog.java @@ -96,7 +96,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; -import java.util.function.LongConsumer; +import java.util.function.Supplier; import java.util.stream.Stream; import static io.evitadb.store.spi.CatalogPersistenceService.WAL_FILE_SUFFIX; @@ -212,6 +212,14 @@ public class CatalogWriteAheadLog implements Closeable { * records in them were not yet processed. */ private final List pendingRemovals = new CopyOnWriteArrayList<>(); + /** + * Callback to be called when the WAL file is purged. + */ + private final WalPurgeCallback onWalPurgeCallback; + /** + * Contains the last processed catalog version that analysed and added to pending removals. + */ + private long purgeProcessedCatalogVersion; /** * The index of the WAL file incremented each time the WAL file is rotated. */ @@ -234,10 +242,6 @@ public class CatalogWriteAheadLog implements Closeable { * Field contains current size of the WAL file the records are appended to in Bytes. */ private long currentWalFileSize; - /** - * Callback to be called when the WAL file is purged. - */ - private final LongConsumer onWalPurgeCallback; /** * Method checks if the WAL file is complete and if not, it truncates it. The non-complete WAL file record is @@ -387,6 +391,7 @@ static FirstAndLastCatalogVersions checkAndTruncate(@Nonnull String catalogName, /** * Returns the first and last catalog versions found in the given WAL file. + * * @param walFile the WAL file to read from * @return the first and last catalog versions found in the WAL file */ @@ -541,7 +546,7 @@ public CatalogWriteAheadLog( @Nonnull TransactionOptions transactionOptions, @Nonnull Scheduler scheduler, @Nonnull Consumer bootstrapFileTrimmer, - @Nonnull LongConsumer onWalPurgeCallback + @Nonnull WalPurgeCallback onWalPurgeCallback ) { this.processedCatalogVersion = new AtomicLong(catalogVersion); final int[] firstAndLastWalFileIndex = getFirstAndLastWalFileIndex(catalogStoragePath, catalogName); @@ -602,34 +607,6 @@ public CatalogWriteAheadLog( } } - /** - * Removes the obsolete WAL files from the catalog storage path. - */ - private long removeWalFiles() { - synchronized (this.pendingRemovals) { - final long catalogVersion = this.processedCatalogVersion.get(); - final Set toRemove = new HashSet<>(64); - - long lastPuredCatalogVersion = -1; - for (PendingRemoval pendingRemoval : this.pendingRemovals) { - if (pendingRemoval.catalogVersion <= catalogVersion) { - pendingRemoval.runnable().run(); - toRemove.add(pendingRemoval); - lastPuredCatalogVersion = pendingRemoval.catalogVersion; - } else { - break; - } - } - this.pendingRemovals.removeAll(toRemove); - // call the listener to remove the obsolete files - if (lastPuredCatalogVersion > -1) { - this.onWalPurgeCallback.accept(lastPuredCatalogVersion + 1); - } - - return -1; - } - } - /** * Method for internal use - allows emitting start events when observability facilities are already initialized. * If we didn't postpone this initialization, events would become lost. @@ -1051,6 +1028,40 @@ MutationSupplier createSupplier(long catalogVersion, boolean avoidPartiallyFille } } + /** + * Removes the obsolete WAL files from the catalog storage path. + */ + private long removeWalFiles() { + synchronized (this.pendingRemovals) { + final long catalogVersion = this.processedCatalogVersion.get(); + final Set toRemove = new HashSet<>(64); + + long firstCatalogVersionToBeKept = -1; + OffsetDateTime firstCommitTimestamp = null; + for (PendingRemoval pendingRemoval : this.pendingRemovals) { + if (pendingRemoval.catalogVersion() <= catalogVersion) { + final TransactionMutation firstTxMutationFromRemovedFile = pendingRemoval.removeLambda().get(); + toRemove.add(pendingRemoval); + if (pendingRemoval.catalogVersion() > firstCatalogVersionToBeKept) { + firstCatalogVersionToBeKept = pendingRemoval.catalogVersion(); + firstCommitTimestamp = firstTxMutationFromRemovedFile.getCommitTimestamp(); + } + } else { + break; + } + } + this.pendingRemovals.removeAll(toRemove); + // call the listener to remove the obsolete files + if (firstCatalogVersionToBeKept > -1) { + this.onWalPurgeCallback.purgeFilesUpTo(firstCatalogVersionToBeKept); + } + // now trim the bootstrap record file + this.bootstrapFileTrimmer.accept(firstCommitTimestamp); + + return -1; + } + } + /** * Finds the index of a write-ahead log (WAL) file associated with a given catalog version. * @@ -1229,8 +1240,9 @@ private void rotateWalFile() { try { final FirstAndLastCatalogVersions versionsFromWalFile = getFirstAndLastCatalogVersionsFromWalFile(walFile); final PendingRemoval pendingRemoval = new PendingRemoval( - versionsFromWalFile.lastCatalogVersion(), + versionsFromWalFile.lastCatalogVersion() + 1, () -> { + final TransactionMutation firstTransactionMutation = getFirstTransactionMutationFromWalFile(walFile); try { if (walFile.delete()) { log.info("Deleted WAL file `" + walFile + "`!"); @@ -1241,6 +1253,7 @@ private void rotateWalFile() { } catch (Exception ex) { log.error("Failed to delete WAL file `" + walFile + "`!", ex); } + return firstTransactionMutation; } ); if (!this.pendingRemovals.contains(pendingRemoval)) { @@ -1252,13 +1265,6 @@ private void rotateWalFile() { // the file was deleted in the meantime } } - - // now check the date and time of the leading transaction of the oldest WAL file - final TransactionMutation firstMutation = getFirstTransactionMutationFromWalFile( - walFiles[walFiles.length - this.walFileCountKept] - ); - firstCommitTimestamp = firstMutation.getCommitTimestamp(); - this.bootstrapFileTrimmer.accept(firstCommitTimestamp); } } catch (IOException e) { @@ -1363,6 +1369,25 @@ private void updateCacheSize() { ).commit(); } + /** + * Interface that allows to look up for the active files for the given catalog version and to remove the files up to + * the given active files. + */ + public interface WalPurgeCallback { + + WalPurgeCallback NO_OP = activeFiles -> { + // do nothing + }; + + /** + * Purges the files up to the given active files. + * + * @param firstActiveCatalogVersion the first catalog version that needs to be kept + */ + void purgeFilesUpTo(long firstActiveCatalogVersion); + + } + /** * Contains first and last catalog versions found in current WAL file. * @@ -1377,12 +1402,14 @@ record FirstAndLastCatalogVersions( /** * Record that holds information about pending removal of the WAL file. + * * @param catalogVersion the catalog version that needs to be processed before the removal - * @param runnable the runnable that performs the file removal + * @param removeLambda the removeLambda that performs the file removal + * and returns first transaction mutation in the removed file */ private record PendingRemoval( long catalogVersion, - @Nonnull Runnable runnable + @Nonnull Supplier removeLambda ) { @Override