diff --git a/evita_api/src/main/java/io/evitadb/api/configuration/TransactionOptions.java b/evita_api/src/main/java/io/evitadb/api/configuration/TransactionOptions.java index 385eba4bc..b27a7a4a2 100644 --- a/evita_api/src/main/java/io/evitadb/api/configuration/TransactionOptions.java +++ b/evita_api/src/main/java/io/evitadb/api/configuration/TransactionOptions.java @@ -49,9 +49,6 @@ * that the buffer will be full and will have to be copied to the disk. * @param walFileSizeBytes Size of the Write-Ahead Log (WAL) file in bytes before it is rotated. * @param walFileCountKept Number of WAL files to keep. - * @param maxQueueSize Size of the catalog queue for parallel transaction. If there are more - * transaction than the number of free threads in the pool, the transaction - * are queued. If the queue is full, the transaction is rejected. * @param flushFrequencyInMillis The frequency of flushing the transactional data to the disk when they * are sequentially processed. If database process the (small) transaction * very quickly, it may decide to process next transaction before flushing @@ -66,7 +63,6 @@ public record TransactionOptions( int transactionMemoryRegionCount, long walFileSizeBytes, int walFileCountKept, - int maxQueueSize, long flushFrequencyInMillis ) { public static final Path DEFAULT_TX_DIRECTORY = Paths.get(System.getProperty("java.io.tmpdir"), "evita/transaction"); @@ -74,7 +70,6 @@ public record TransactionOptions( public static final int DEFAULT_TRANSACTION_MEMORY_REGION_COUNT = 256; public static final int DEFAULT_WAL_SIZE_BYTES = 16_777_216; public static final int DEFAULT_WAL_FILE_COUNT_KEPT = 8; - public static final int DEFAULT_MAX_QUEUE_SIZE = 1_024; public static final int DEFAULT_FLUSH_FREQUENCY = 1_000; /** @@ -87,7 +82,6 @@ public static TransactionOptions temporary() { 32, 8_388_608, 1, - 16, 100 ); } @@ -113,7 +107,6 @@ public TransactionOptions() { DEFAULT_TRANSACTION_MEMORY_REGION_COUNT, DEFAULT_WAL_SIZE_BYTES, DEFAULT_WAL_FILE_COUNT_KEPT, - DEFAULT_MAX_QUEUE_SIZE, DEFAULT_FLUSH_FREQUENCY ); } @@ -124,7 +117,6 @@ public TransactionOptions( int transactionMemoryRegionCount, long walFileSizeBytes, int walFileCountKept, - int maxQueueSize, long flushFrequencyInMillis ) { this.transactionWorkDirectory = Optional.ofNullable(transactionWorkDirectory).orElse(DEFAULT_TX_DIRECTORY); @@ -132,7 +124,6 @@ public TransactionOptions( this.transactionMemoryRegionCount = transactionMemoryRegionCount; this.walFileSizeBytes = walFileSizeBytes; this.walFileCountKept = walFileCountKept; - this.maxQueueSize = maxQueueSize; this.flushFrequencyInMillis = flushFrequencyInMillis; } @@ -146,7 +137,6 @@ public static class Builder { private int transactionMemoryRegionCount = DEFAULT_TRANSACTION_MEMORY_REGION_COUNT; private long walFileSizeBytes = DEFAULT_WAL_SIZE_BYTES; private int walFileCountKept = DEFAULT_WAL_FILE_COUNT_KEPT; - private int maxQueueSize = DEFAULT_MAX_QUEUE_SIZE; private long flushFrequency = DEFAULT_FLUSH_FREQUENCY; Builder() { @@ -158,7 +148,6 @@ public static class Builder { this.transactionMemoryRegionCount = TransactionOptions.transactionMemoryRegionCount; this.walFileSizeBytes = TransactionOptions.walFileSizeBytes; this.walFileCountKept = TransactionOptions.walFileCountKept; - this.maxQueueSize = TransactionOptions.maxQueueSize; this.flushFrequency = TransactionOptions.flushFrequencyInMillis; } @@ -192,12 +181,6 @@ public TransactionOptions.Builder walFileCountKept(int walFileCountKept) { return this; } - @Nonnull - public TransactionOptions.Builder maxQueueSize(int maxQueueSize) { - this.maxQueueSize = maxQueueSize; - return this; - } - @Nonnull public TransactionOptions.Builder flushFrequency(long flushFrequency) { this.flushFrequency = flushFrequency; @@ -212,7 +195,6 @@ public TransactionOptions build() { transactionMemoryRegionCount, walFileSizeBytes, walFileCountKept, - maxQueueSize, flushFrequency ); } diff --git a/evita_engine/src/main/java/io/evitadb/core/Catalog.java b/evita_engine/src/main/java/io/evitadb/core/Catalog.java index b7e0724ca..579281e61 100644 --- a/evita_engine/src/main/java/io/evitadb/core/Catalog.java +++ b/evita_engine/src/main/java/io/evitadb/core/Catalog.java @@ -32,7 +32,6 @@ import io.evitadb.api.TransactionContract.CommitBehavior; import io.evitadb.api.configuration.EvitaConfiguration; import io.evitadb.api.configuration.StorageOptions; -import io.evitadb.api.configuration.TransactionOptions; import io.evitadb.api.exception.CollectionNotFoundException; import io.evitadb.api.exception.ConcurrentSchemaUpdateException; import io.evitadb.api.exception.InvalidMutationException; @@ -86,15 +85,13 @@ import io.evitadb.core.query.algebra.Formula; import io.evitadb.core.sequence.SequenceService; import io.evitadb.core.sequence.SequenceType; +import io.evitadb.core.transaction.TransactionManager; import io.evitadb.core.transaction.memory.TransactionalLayerMaintainer; import io.evitadb.core.transaction.memory.TransactionalLayerProducer; import io.evitadb.core.transaction.memory.TransactionalObjectVersion; import io.evitadb.core.transaction.stage.AbstractTransactionStage; -import io.evitadb.core.transaction.stage.CatalogSnapshotPropagationTransactionStage; -import io.evitadb.core.transaction.stage.ConflictResolutionTransactionStage; import io.evitadb.core.transaction.stage.ConflictResolutionTransactionStage.ConflictResolutionTransactionTask; import io.evitadb.core.transaction.stage.TrunkIncorporationTransactionStage; -import io.evitadb.core.transaction.stage.WalAppendingTransactionStage; import io.evitadb.dataType.PaginatedList; import io.evitadb.exception.GenericEvitaInternalError; import io.evitadb.index.CatalogIndex; @@ -237,13 +234,9 @@ public final class Catalog implements CatalogContract, CatalogVersionBeyondTheHo */ @Getter private final ProxyFactory proxyFactory; /** - * Reference to the current {@link EvitaConfiguration#storage()} settings. + * Reference to the current {@link EvitaConfiguration} settings. */ - private final StorageOptions storageOptions; - /** - * Reference to the current {@link EvitaConfiguration#transaction()} settings. - */ - private final TransactionOptions transactionOptions; + private final EvitaConfiguration evitaConfiguration; /** * Reference to the shared transactional executor service that provides carrier threads for transaction processing. */ @@ -266,10 +259,9 @@ public final class Catalog implements CatalogContract, CatalogVersionBeyondTheHo **/ private final TracingContext tracingContext; /** - * Java {@link java.util.concurrent.Flow} implementation that allows to process transactional tasks in - * asynchronous reactive manner. + * Transaction manager used for processing the transactions. */ - private final SubmissionPublisher transactionalPipeline; + private final TransactionManager transactionManager; /** * Last persisted schema version of the catalog. */ @@ -278,10 +270,10 @@ public final class Catalog implements CatalogContract, CatalogVersionBeyondTheHo /** * Verifies whether the catalog name could be used for a new catalog. * - * @param catalogName the name of the catalog - * @param storageOptions the storage options + * @param catalogName the name of the catalog + * @param storageOptions the storage options * @param totalBytesExpected the total bytes expected to be read from the input stream - * @param inputStream the input stream with the catalog data + * @param inputStream the input stream with the catalog data * @return future that will be completed with path where the content of the catalog was restored */ public static BackgroundCallableTask restoreCatalogTo( @@ -289,53 +281,17 @@ public static BackgroundCallableTask restoreCatalogTo( @Nonnull StorageOptions storageOptions, long totalBytesExpected, @Nonnull InputStream inputStream - ) { + ) { return ServiceLoader.load(CatalogPersistenceServiceFactory.class) .findFirst() .map(it -> it.restoreCatalogTo(catalogName, storageOptions, totalBytesExpected, inputStream)) .orElseThrow(() -> new IllegalStateException("IO service is unexpectedly not available!")); } - /** - * Creates a transaction pipeline for transaction processing consisting of 4 stages: - * - * - conflict resolution (and catalog version sequence number assignment) - * - WAL appending (writing {@link IsolatedWalPersistenceService} to the shared catalog WAL) - * - trunk incorporation (applying transaction from shared WAL in order to the shared catalog view) - * - catalog snapshot propagation (propagating new catalog version to the "live view" of the evitaDB engine) - * - * @param catalog the catalog instance - * @param transactionOptions the options for the transaction - * @param scheduler the executor service for async processing - * @param newCatalogVersionConsumer the consumer to handle new catalog versions - * @return the submission publisher for conflict resolution transaction tasks - */ - @Nonnull - private static SubmissionPublisher createTransactionPipeline( - @Nonnull Catalog catalog, - @Nonnull TransactionOptions transactionOptions, - @Nonnull Scheduler scheduler, - @Nonnull ObservableExecutorService transactionalExecutor, - @Nonnull Consumer newCatalogVersionConsumer - ) { - final SubmissionPublisher txPublisher = new SubmissionPublisher<>(transactionalExecutor, transactionOptions.maxQueueSize()); - final ConflictResolutionTransactionStage stage1 = new ConflictResolutionTransactionStage(transactionalExecutor, transactionOptions.maxQueueSize(), catalog); - final WalAppendingTransactionStage stage2 = new WalAppendingTransactionStage(transactionalExecutor, transactionOptions.maxQueueSize(), catalog, stage1::notifyCatalogVersionDropped); - final TrunkIncorporationTransactionStage stage3 = new TrunkIncorporationTransactionStage(scheduler, transactionOptions.maxQueueSize(), catalog, transactionOptions.flushFrequencyInMillis()); - final CatalogSnapshotPropagationTransactionStage stage4 = new CatalogSnapshotPropagationTransactionStage(newCatalogVersionConsumer); - - txPublisher.subscribe(stage1); - stage1.subscribe(stage2); - stage2.subscribe(stage3); - stage3.subscribe(stage4); - return txPublisher; - } - public Catalog( @Nonnull CatalogSchemaContract catalogSchema, @Nonnull CacheSupervisor cacheSupervisor, - @Nonnull StorageOptions storageOptions, - @Nonnull TransactionOptions transactionOptions, + @Nonnull EvitaConfiguration evitaConfiguration, @Nonnull ReflectionLookup reflectionLookup, @Nonnull Scheduler scheduler, @Nonnull ObservableExecutorService transactionalExecutor, @@ -353,7 +309,7 @@ public Catalog( this.schema = new TransactionalReference<>(new CatalogSchemaDecorator(internalCatalogSchema)); this.persistenceService = ServiceLoader.load(CatalogPersistenceServiceFactory.class) .findFirst() - .map(it -> it.createNew(this, this.getSchema().getName(), storageOptions, transactionOptions, scheduler)) + .map(it -> it.createNew(this, this.getSchema().getName(), evitaConfiguration.storage(), evitaConfiguration.transaction(), scheduler)) .orElseThrow(StorageImplementationNotFoundException::new); this.catalogId = UUID.randomUUID(); @@ -374,13 +330,15 @@ public Catalog( this.catalogIndex = new CatalogIndex(); this.catalogIndex.attachToCatalog(null, this); this.proxyFactory = ProxyFactory.createInstance(reflectionLookup); - this.storageOptions = storageOptions; - this.transactionOptions = transactionOptions; + this.evitaConfiguration = evitaConfiguration; this.scheduler = scheduler; this.transactionalExecutor = transactionalExecutor; this.newCatalogVersionConsumer = newCatalogVersionConsumer; this.lastPersistedSchemaVersion = this.schema.get().version(); - this.transactionalPipeline = createTransactionPipeline(this, transactionOptions, scheduler, transactionalExecutor, newCatalogVersionConsumer); + this.transactionManager = new TransactionManager( + this, evitaConfiguration.server(), evitaConfiguration.transaction(), scheduler, + transactionalExecutor, newCatalogVersionConsumer + ); this.persistenceService.storeHeader( this.catalogId, CatalogState.WARMING_UP, catalogVersion, 0, null, @@ -392,8 +350,7 @@ public Catalog( public Catalog( @Nonnull String catalogName, @Nonnull CacheSupervisor cacheSupervisor, - @Nonnull StorageOptions storageOptions, - @Nonnull TransactionOptions transactionOptions, + @Nonnull EvitaConfiguration evitaConfiguration, @Nonnull ReflectionLookup reflectionLookup, @Nonnull Scheduler scheduler, @Nonnull ObservableExecutorService transactionalExecutor, @@ -403,7 +360,7 @@ public Catalog( this.tracingContext = tracingContext; this.persistenceService = ServiceLoader.load(CatalogPersistenceServiceFactory.class) .findFirst() - .map(it -> it.load(this, catalogName, storageOptions, transactionOptions, scheduler)) + .map(it -> it.load(this, catalogName, evitaConfiguration.storage(), evitaConfiguration.transaction(), scheduler)) .orElseThrow(() -> new IllegalStateException("IO service is unexpectedly not available!")); final CatalogHeader catalogHeader = this.persistenceService.getCatalogHeader( this.persistenceService.getLastCatalogVersion() @@ -482,13 +439,15 @@ public Catalog( ) ); this.proxyFactory = ProxyFactory.createInstance(reflectionLookup); - this.storageOptions = storageOptions; - this.transactionOptions = transactionOptions; + this.evitaConfiguration = evitaConfiguration; this.scheduler = scheduler; this.transactionalExecutor = transactionalExecutor; this.newCatalogVersionConsumer = newCatalogVersionConsumer; this.lastPersistedSchemaVersion = this.schema.get().version(); - this.transactionalPipeline = createTransactionPipeline(this, transactionOptions, scheduler, transactionalExecutor, newCatalogVersionConsumer); + this.transactionManager = new TransactionManager( + this, evitaConfiguration.server(), evitaConfiguration.transaction(), scheduler, + transactionalExecutor, newCatalogVersionConsumer + ); } Catalog( @@ -527,12 +486,11 @@ public Catalog( this.cacheSupervisor = previousCatalogVersion.cacheSupervisor; this.entityTypeSequence = previousCatalogVersion.entityTypeSequence; this.proxyFactory = previousCatalogVersion.proxyFactory; - this.storageOptions = previousCatalogVersion.storageOptions; - this.transactionOptions = previousCatalogVersion.transactionOptions; + this.evitaConfiguration = previousCatalogVersion.evitaConfiguration; this.scheduler = previousCatalogVersion.scheduler; this.transactionalExecutor = previousCatalogVersion.transactionalExecutor; this.newCatalogVersionConsumer = previousCatalogVersion.newCatalogVersionConsumer; - this.transactionalPipeline = previousCatalogVersion.transactionalPipeline; + this.transactionManager = previousCatalogVersion.transactionManager; catalogIndex.attachToCatalog(null, this); final StoragePartPersistenceService storagePartPersistenceService = persistenceService.getStoragePartPersistenceService(catalogVersion); @@ -969,7 +927,7 @@ public Optional getEntityIndexIfExists(@Nonnull Strin return empty(); } else if (expectedType.isInstance(entityIndex)) { //noinspection unchecked - return of((T)entityIndex); + return of((T) entityIndex); } else { throw new IllegalArgumentException("Expected index of type " + expectedType.getName() + " but got " + entityIndex.getClass().getName()); } @@ -1137,26 +1095,32 @@ public void commitWal( @Nonnull CompletableFuture transactionFinalizationFuture ) { try { - this.transactionalPipeline.offer( - new ConflictResolutionTransactionTask( - getName(), - transactionId, - walPersistenceService.getMutationCount(), - walPersistenceService.getMutationSizeInBytes(), - walPersistenceService.getWalReference(), - commitBehaviour, - transactionFinalizationFuture - ), - (subscriber, task) -> { - transactionFinalizationFuture.completeExceptionally( - new TransactionException( - "Conflict resolution transaction queue is full! Transaction cannot be processed at the moment." - ) - ); - return false; - } - ); + transactionManager.getTransactionalPublisher( + this, this.evitaConfiguration.server(), this.evitaConfiguration.transaction(), + this.scheduler, this.transactionalExecutor, this.newCatalogVersionConsumer + ) + .offer( + new ConflictResolutionTransactionTask( + getName(), + transactionId, + walPersistenceService.getMutationCount(), + walPersistenceService.getMutationSizeInBytes(), + walPersistenceService.getWalReference(), + commitBehaviour, + transactionFinalizationFuture + ), + (subscriber, task) -> { + transactionManager.invalidateTransactionalPublisher(); + transactionFinalizationFuture.completeExceptionally( + new TransactionException( + "Conflict resolution transaction queue is full! Transaction cannot be processed at the moment." + ) + ); + return false; + } + ); } catch (Exception e) { + transactionManager.invalidateTransactionalPublisher(); if (e.getCause() instanceof TransactionException txException) { throw txException; } else { @@ -1253,7 +1217,10 @@ public long appendWalAndDiscard( * This method is used to indicate that a catalog is currently available in the live view. */ public void notifyCatalogPresentInLiveView() { - SubmissionPublisher current = this.transactionalPipeline; + SubmissionPublisher current = transactionManager.getTransactionalPublisher( + this, this.evitaConfiguration.server(), this.evitaConfiguration.transaction(), + this.scheduler, this.transactionalExecutor, this.newCatalogVersionConsumer + ); while (current != null) { //noinspection unchecked final List> subscribers = (List>) current.getSubscribers(); @@ -1284,27 +1251,6 @@ public void emitDeleteObservabilityEvents() { this.persistenceService.emitDeleteObservabilityEvents(); } - /** - * Informs transactional pipeline jobs that the catalog version has advanced due to external reasons (such as - * catalog renaming). - */ - private void advanceVersion(long catalogVersion) { - SubmissionPublisher current = this.transactionalPipeline; - while (current != null) { - //noinspection unchecked - final List> subscribers = (List>) current.getSubscribers(); - Assert.isPremiseValid(subscribers.size() == 1, "Only one subscriber is expected!"); - for (Subscriber subscriber : subscribers) { - if (subscriber instanceof AbstractTransactionStage stage) { - stage.advanceVersion(catalogVersion); - current = stage; - } else { - current = null; - } - } - } - } - /** * We need to forget all volatile data when the data written to catalog aren't going to be committed (incorporated * in the final state). @@ -1372,7 +1318,10 @@ void flush() { */ @Nonnull private TrunkIncorporationTransactionStage getTrunkIncorporationStage() { - SubmissionPublisher current = this.transactionalPipeline; + SubmissionPublisher current = transactionManager.getTransactionalPublisher( + this, this.evitaConfiguration.server(), this.evitaConfiguration.transaction(), + this.scheduler, this.transactionalExecutor, this.newCatalogVersionConsumer + ); while (current != null) { //noinspection unchecked final List> subscribers = (List>) current.getSubscribers(); @@ -1392,6 +1341,30 @@ private TrunkIncorporationTransactionStage getTrunkIncorporationStage() { ); } + /** + * Informs transactional pipeline jobs that the catalog version has advanced due to external reasons (such as + * catalog renaming). + */ + private void advanceVersion(long catalogVersion) { + SubmissionPublisher current = transactionManager.getTransactionalPublisher( + this, this.evitaConfiguration.server(), this.evitaConfiguration.transaction(), + this.scheduler, this.transactionalExecutor, this.newCatalogVersionConsumer + ); + while (current != null) { + //noinspection unchecked + final List> subscribers = (List>) current.getSubscribers(); + Assert.isPremiseValid(subscribers.size() == 1, "Only one subscriber is expected!"); + for (Subscriber subscriber : subscribers) { + if (subscriber instanceof AbstractTransactionStage stage) { + stage.advanceVersion(catalogVersion); + current = stage; + } else { + current = null; + } + } + } + } + /** * Replaces reference to the catalog in this instance. The reference is stored in transactional data structure so * that it doesn't affect parallel clients until committed. diff --git a/evita_engine/src/main/java/io/evitadb/core/Evita.java b/evita_engine/src/main/java/io/evitadb/core/Evita.java index 52cbf72d1..7dfb0dc39 100644 --- a/evita_engine/src/main/java/io/evitadb/core/Evita.java +++ b/evita_engine/src/main/java/io/evitadb/core/Evita.java @@ -660,8 +660,7 @@ private ProgressiveCompletableFuture loadCatalog(@Nonnull String catalogNa final Catalog theCatalog = new Catalog( catalogName, this.cacheSupervisor, - this.configuration.storage(), - this.configuration.transaction(), + this.configuration, this.reflectionLookup, this.serviceExecutor, this.transactionExecutor, @@ -746,8 +745,7 @@ private void createCatalogInternal(@Nonnull CreateCatalogSchemaMutation createCa return new Catalog( catalogSchema, this.cacheSupervisor, - this.configuration.storage(), - this.configuration.transaction(), + this.configuration, this.reflectionLookup, this.serviceExecutor, this.transactionExecutor, diff --git a/evita_engine/src/main/java/io/evitadb/core/metric/event/system/EvitaStartedEvent.java b/evita_engine/src/main/java/io/evitadb/core/metric/event/system/EvitaStartedEvent.java index 850fa13de..dbaecbc75 100644 --- a/evita_engine/src/main/java/io/evitadb/core/metric/event/system/EvitaStartedEvent.java +++ b/evita_engine/src/main/java/io/evitadb/core/metric/event/system/EvitaStartedEvent.java @@ -104,10 +104,6 @@ public class EvitaStartedEvent extends AbstractSystemCatalogEvent { @ExportMetric(metricType = MetricType.GAUGE) private final int transactionMemoryRegions; - @Label("Maximal count of commited transactions in queue") - @ExportMetric(metricType = MetricType.GAUGE) - private final int transactionMaxQueueSize; - @Label("Maximal write-ahead log file size in Bytes") @ExportMetric(metricType = MetricType.GAUGE) private final long walMaxFileSizeBytes; @@ -152,7 +148,6 @@ public EvitaStartedEvent(@Nonnull EvitaConfiguration configuration) { this.transactionMemoryRegions = transactionConfiguration.transactionMemoryRegionCount(); this.walMaxFileSizeBytes = transactionConfiguration.walFileSizeBytes(); this.walMaxFileCountKept = transactionConfiguration.walFileCountKept(); - this.transactionMaxQueueSize = transactionConfiguration.maxQueueSize(); final CacheOptions cacheConfiguration = configuration.cache(); this.cacheReevaluationSeconds = cacheConfiguration.reevaluateEachSeconds(); diff --git a/evita_engine/src/main/java/io/evitadb/core/transaction/TransactionManager.java b/evita_engine/src/main/java/io/evitadb/core/transaction/TransactionManager.java new file mode 100644 index 000000000..0d7fdd487 --- /dev/null +++ b/evita_engine/src/main/java/io/evitadb/core/transaction/TransactionManager.java @@ -0,0 +1,148 @@ +/* + * + * _ _ ____ ____ + * _____ _(_) |_ __ _| _ \| __ ) + * / _ \ \ / / | __/ _` | | | | _ \ + * | __/\ V /| | || (_| | |_| | |_) | + * \___| \_/ |_|\__\__,_|____/|____/ + * + * Copyright (c) 2024 + * + * Licensed under the Business Source License, Version 1.1 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/FgForrest/evitaDB/blob/master/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.evitadb.core.transaction; + +import io.evitadb.api.configuration.ServerOptions; +import io.evitadb.api.configuration.TransactionOptions; +import io.evitadb.core.Catalog; +import io.evitadb.core.async.ObservableExecutorService; +import io.evitadb.core.async.Scheduler; +import io.evitadb.core.transaction.stage.CatalogSnapshotPropagationTransactionStage; +import io.evitadb.core.transaction.stage.ConflictResolutionTransactionStage; +import io.evitadb.core.transaction.stage.ConflictResolutionTransactionStage.ConflictResolutionTransactionTask; +import io.evitadb.core.transaction.stage.TrunkIncorporationTransactionStage; +import io.evitadb.core.transaction.stage.WalAppendingTransactionStage; +import io.evitadb.store.spi.IsolatedWalPersistenceService; + +import javax.annotation.Nonnull; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.SubmissionPublisher; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; + +import static java.util.Optional.ofNullable; + +/** + * Transaction manager is propagated through different versions / instances of the same catalog and is responsible for + * managing the transaction processing pipeline. This pipeline or its parts might be closed anytime due to + * the {@link RejectedExecutionException} and needs to be recreated from scratch when this happens. There must be no + * more than single active transaction pipeline at a time. + * + * @author Jan Novotný (novotny@fg.cz), FG Forrest a.s. (c) 2024 + */ +public class TransactionManager { + /** + * Java {@link java.util.concurrent.Flow} implementation that allows to process transactional tasks in + * asynchronous reactive manner. + */ + private final AtomicReference> transactionalPipeline = new AtomicReference<>(); + + public TransactionManager( + @Nonnull Catalog catalog, + @Nonnull ServerOptions serverOptions, + @Nonnull TransactionOptions transactionOptions, + @Nonnull Scheduler scheduler, + @Nonnull ObservableExecutorService transactionalExecutor, + @Nonnull Consumer newCatalogVersionConsumer + ) { + getTransactionalPublisher( + catalog, serverOptions, transactionOptions, scheduler, transactionalExecutor, newCatalogVersionConsumer + ); + } + + /** + * Method lazily creates and returns the transaction pipeline. The transaction processing consists of 4 stages: + * + * - conflict resolution (and catalog version sequence number assignment) + * - WAL appending (writing {@link IsolatedWalPersistenceService} to the shared catalog WAL) + * - trunk incorporation (applying transaction from shared WAL in order to the shared catalog view) + * - catalog snapshot propagation (propagating new catalog version to the "live view" of the evitaDB engine) + * + * @return the submission publisher for conflict resolution transaction tasks + */ + @Nonnull + public SubmissionPublisher getTransactionalPublisher( + @Nonnull Catalog catalog, + @Nonnull ServerOptions serverOptions, + @Nonnull TransactionOptions transactionOptions, + @Nonnull Scheduler scheduler, + @Nonnull ObservableExecutorService transactionalExecutor, + @Nonnull Consumer newCatalogVersionConsumer + ) { + final SubmissionPublisher thePipeline = transactionalPipeline.get(); + if (thePipeline != null && !thePipeline.isClosed()) { + return thePipeline; + } else { + synchronized (this.transactionalPipeline) { + final int maxBufferCapacity = serverOptions.transactionThreadPool().queueSize(); + + final SubmissionPublisher txPublisher = new SubmissionPublisher<>( + transactionalExecutor, maxBufferCapacity + ); + final ConflictResolutionTransactionStage stage1 = new ConflictResolutionTransactionStage( + transactionalExecutor, maxBufferCapacity, catalog, + this::invalidateTransactionalPublisher + ); + final WalAppendingTransactionStage stage2 = new WalAppendingTransactionStage( + transactionalExecutor, maxBufferCapacity, catalog, + stage1::notifyCatalogVersionDropped, + this::invalidateTransactionalPublisher + ); + final TrunkIncorporationTransactionStage stage3 = new TrunkIncorporationTransactionStage( + scheduler, maxBufferCapacity, + catalog, transactionOptions.flushFrequencyInMillis(), + this::invalidateTransactionalPublisher + ); + final CatalogSnapshotPropagationTransactionStage stage4 = new CatalogSnapshotPropagationTransactionStage( + newCatalogVersionConsumer + ); + + txPublisher.subscribe(stage1); + stage1.subscribe(stage2); + stage2.subscribe(stage3); + stage3.subscribe(stage4); + + this.transactionalPipeline.set(txPublisher); + + return txPublisher; + } + } + } + + /** + * This method is called when any of the {@link SubmissionPublisher} + * gets closed - for example due to the exception in the processing of the transactional task. One of the possible + * issues is that the system can't keep up and throws {@link RejectedExecutionException}. + * + * In such a situation, the submission publisher is automatically closed and needs to be recreated from scratch. + * This is design decision form the authors of the {@link java.util.concurrent.Flow} API. + */ + public void invalidateTransactionalPublisher() { + synchronized (this.transactionalPipeline) { + ofNullable(this.transactionalPipeline.getAndSet(null)) + .ifPresent(SubmissionPublisher::close); + + } + } +} diff --git a/evita_engine/src/main/java/io/evitadb/core/transaction/stage/AbstractTransactionStage.java b/evita_engine/src/main/java/io/evitadb/core/transaction/stage/AbstractTransactionStage.java index 95aa285d9..7a4a2e23b 100644 --- a/evita_engine/src/main/java/io/evitadb/core/transaction/stage/AbstractTransactionStage.java +++ b/evita_engine/src/main/java/io/evitadb/core/transaction/stage/AbstractTransactionStage.java @@ -75,10 +75,15 @@ public sealed abstract class AbstractTransactionStage(catalog); + this.onException = onException; } @Override @@ -99,7 +104,7 @@ public final void onNext(T task) { } catch (Throwable ex) { handleException(task, ex); } - subscription.request(1); + this.subscription.request(1); } /** @@ -113,6 +118,7 @@ protected void handleException(@Nonnull T task, @Nonnull Throwable ex) { if (future != null) { future.completeExceptionally(ex); } + onException.run(); } @Override diff --git a/evita_engine/src/main/java/io/evitadb/core/transaction/stage/ConflictResolutionTransactionStage.java b/evita_engine/src/main/java/io/evitadb/core/transaction/stage/ConflictResolutionTransactionStage.java index 1db2e87ea..1a59e499b 100644 --- a/evita_engine/src/main/java/io/evitadb/core/transaction/stage/ConflictResolutionTransactionStage.java +++ b/evita_engine/src/main/java/io/evitadb/core/transaction/stage/ConflictResolutionTransactionStage.java @@ -65,9 +65,10 @@ public final class ConflictResolutionTransactionStage public ConflictResolutionTransactionStage( @Nonnull Executor executor, int maxBufferCapacity, - @Nonnull Catalog catalog + @Nonnull Catalog catalog, + @Nonnull Runnable onException ) { - super(executor, maxBufferCapacity, catalog); + super(executor, maxBufferCapacity, catalog, onException); this.catalogVersion = new AtomicLong(catalog.getVersion()); } diff --git a/evita_engine/src/main/java/io/evitadb/core/transaction/stage/TrunkIncorporationTransactionStage.java b/evita_engine/src/main/java/io/evitadb/core/transaction/stage/TrunkIncorporationTransactionStage.java index 4bed90813..da147bd13 100644 --- a/evita_engine/src/main/java/io/evitadb/core/transaction/stage/TrunkIncorporationTransactionStage.java +++ b/evita_engine/src/main/java/io/evitadb/core/transaction/stage/TrunkIncorporationTransactionStage.java @@ -129,9 +129,10 @@ public TrunkIncorporationTransactionStage( @Nonnull Executor executor, int maxBufferCapacity, @Nonnull Catalog catalog, - long timeoutInMillis + long timeoutInMillis, + @Nonnull Runnable onException ) { - super(executor, maxBufferCapacity, catalog); + super(executor, maxBufferCapacity, catalog, onException); this.catalog = catalog; this.timeout = timeoutInMillis * 1_000_000; this.lastFinalizedCatalogVersion = catalog.getVersion(); diff --git a/evita_engine/src/main/java/io/evitadb/core/transaction/stage/WalAppendingTransactionStage.java b/evita_engine/src/main/java/io/evitadb/core/transaction/stage/WalAppendingTransactionStage.java index 4b97f51f0..9be1d9948 100644 --- a/evita_engine/src/main/java/io/evitadb/core/transaction/stage/WalAppendingTransactionStage.java +++ b/evita_engine/src/main/java/io/evitadb/core/transaction/stage/WalAppendingTransactionStage.java @@ -64,9 +64,10 @@ public WalAppendingTransactionStage( @Nonnull Executor executor, int maxBufferCapacity, @Nonnull Catalog catalog, - @Nonnull IntConsumer catalogVersionCompensator + @Nonnull IntConsumer catalogVersionCompensator, + @Nonnull Runnable onException ) { - super(executor, maxBufferCapacity, catalog); + super(executor, maxBufferCapacity, catalog, onException); this.catalogVersionCompensator = catalogVersionCompensator; } @@ -77,7 +78,6 @@ protected String getName() { @Override protected void handleNext(@Nonnull WalAppendingTransactionTask task) { - // emit queue event task.transactionQueuedEvent().finish().commit(); @@ -124,7 +124,7 @@ protected void handleNext(@Nonnull WalAppendingTransactionTask task) { @Override protected void handleException(@Nonnull WalAppendingTransactionTask task, @Nonnull Throwable ex) { - catalogVersionCompensator.accept(1); + this.catalogVersionCompensator.accept(1); super.handleException(task, ex); } diff --git a/evita_functional_tests/src/test/java/io/evitadb/api/EvitaTransactionalFunctionalTest.java b/evita_functional_tests/src/test/java/io/evitadb/api/EvitaTransactionalFunctionalTest.java index bd5cce378..41f9cab55 100644 --- a/evita_functional_tests/src/test/java/io/evitadb/api/EvitaTransactionalFunctionalTest.java +++ b/evita_functional_tests/src/test/java/io/evitadb/api/EvitaTransactionalFunctionalTest.java @@ -1128,7 +1128,6 @@ void shouldAutomaticallyGenerateEntitiesInParallel(EvitaContract originalEvita, .storage(originalConfiguration.storage()) .transaction( TransactionOptions.builder() - .maxQueueSize(16_384) .build() ) .server(originalConfiguration.server()) @@ -1154,7 +1153,6 @@ void shouldBackupAndRestoreCatalogDuringHeavyParallelIndexing(EvitaContract orig .storage(originalConfiguration.storage()) .transaction( TransactionOptions.builder() - .maxQueueSize(16_384) .build() ) .server(originalConfiguration.server()) @@ -1240,7 +1238,6 @@ void shouldCorrectlyRotateAllFiles(GenerationalTestInput input) throws Exception TransactionOptions.builder() .walFileSizeBytes(4_096) .walFileCountKept(2) - .maxQueueSize(16_384) .build() ) .server( diff --git a/evita_functional_tests/src/test/java/io/evitadb/store/catalog/DefaultCatalogPersistenceServiceTest.java b/evita_functional_tests/src/test/java/io/evitadb/store/catalog/DefaultCatalogPersistenceServiceTest.java index 28271a3b8..5f7ce06b7 100644 --- a/evita_functional_tests/src/test/java/io/evitadb/store/catalog/DefaultCatalogPersistenceServiceTest.java +++ b/evita_functional_tests/src/test/java/io/evitadb/store/catalog/DefaultCatalogPersistenceServiceTest.java @@ -740,7 +740,6 @@ private TransactionOptions getTransactionOptions() { TransactionOptions.DEFAULT_TRANSACTION_MEMORY_REGION_COUNT, TransactionOptions.DEFAULT_WAL_SIZE_BYTES, TransactionOptions.DEFAULT_WAL_FILE_COUNT_KEPT, - TransactionOptions.DEFAULT_MAX_QUEUE_SIZE, TransactionOptions.DEFAULT_FLUSH_FREQUENCY ); }