Skip to content

Commit

Permalink
feat(#37): Cache inconsistency
Browse files Browse the repository at this point in the history
Refactored work with QueryContext dividing it into two - QueryPlanningContext and QueryExecutionContext. The latter is unique per (isolated for) QueryPlan execution. This lead to simplification of the QueryPlanning and avoiding issues with multiple plans execution and result comparison for debug requirements.
  • Loading branch information
novoj committed Jun 14, 2024
1 parent 4132c9f commit 3d79f7b
Show file tree
Hide file tree
Showing 181 changed files with 3,460 additions and 2,919 deletions.
18 changes: 13 additions & 5 deletions evita_api/src/main/java/io/evitadb/api/CatalogContract.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.evitadb.api.exception.EntityTypeAlreadyPresentInCatalogSchemaException;
import io.evitadb.api.exception.InvalidMutationException;
import io.evitadb.api.exception.SchemaAlteringException;
import io.evitadb.api.exception.TemporalDataNotAvailableException;
import io.evitadb.api.requestResponse.EvitaRequest;
import io.evitadb.api.requestResponse.EvitaResponse;
import io.evitadb.api.requestResponse.mutation.Mutation;
Expand All @@ -40,12 +41,14 @@
import io.evitadb.api.requestResponse.system.CatalogVersionDescriptor;
import io.evitadb.api.requestResponse.system.TimeFlow;
import io.evitadb.api.requestResponse.transaction.TransactionMutation;
import io.evitadb.api.task.ProgressiveCompletableFuture;
import io.evitadb.dataType.PaginatedList;
import io.evitadb.exception.UnexpectedIOException;

import javax.annotation.Nonnull;
import java.io.OutputStream;
import javax.annotation.Nullable;
import java.io.Serializable;
import java.nio.file.Path;
import java.time.OffsetDateTime;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -305,10 +308,15 @@ boolean renameCollectionOfEntity(@Nonnull String entityType, @Nonnull String new
/**
* Creates a backup of the specified catalog and returns an InputStream to read the binary data of the zip file.
*
* @param outputStream an OutputStream to write the binary data of the zip file
* @throws UnexpectedIOException if an I/O error occurs during reading the catalog contents
* @param pastMoment leave null for creating backup for actual dataset, or specify past moment to create backup for
* the dataset as it was at that moment
* @param includingWAL if true, the backup will include the Write-Ahead Log (WAL) file and when the catalog is
* restored, it'll replay the WAL contents locally to bring the catalog to the current state
* @return jobId of the backup process
* @throws TemporalDataNotAvailableException when the past data is not available
*/
void backup(OutputStream outputStream) throws UnexpectedIOException;
@Nonnull
ProgressiveCompletableFuture<Path> backup(@Nullable OffsetDateTime pastMoment, boolean includingWAL) throws TemporalDataNotAvailableException;

/**
* Terminates catalog instance and frees all claimed resources. Prepares catalog instance to be garbage collected.
Expand Down
17 changes: 12 additions & 5 deletions evita_api/src/main/java/io/evitadb/api/EvitaContract.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,14 @@
import io.evitadb.api.TransactionContract.CommitBehavior;
import io.evitadb.api.exception.CatalogAlreadyPresentException;
import io.evitadb.api.exception.InstanceTerminatedException;
import io.evitadb.api.exception.PastDataNotAvailableException;
import io.evitadb.api.exception.TemporalDataNotAvailableException;
import io.evitadb.api.exception.TransactionException;
import io.evitadb.api.job.JobStatus;
import io.evitadb.api.requestResponse.schema.CatalogSchemaEditor.CatalogSchemaBuilder;
import io.evitadb.api.requestResponse.schema.SealedCatalogSchema;
import io.evitadb.api.requestResponse.schema.mutation.TopLevelCatalogSchemaMutation;
import io.evitadb.api.requestResponse.system.SystemStatus;
import io.evitadb.api.task.JobStatus;
import io.evitadb.api.task.ProgressiveCompletableFuture;
import io.evitadb.dataType.PaginatedList;
import io.evitadb.exception.EvitaInternalError;
import io.evitadb.exception.EvitaInvalidUsageException;
Expand All @@ -44,6 +45,7 @@
import javax.annotation.concurrent.ThreadSafe;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Path;
import java.time.OffsetDateTime;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -366,22 +368,27 @@ CompletableFuture<Long> updateCatalogAsync(
* @param includingWAL if true, the backup will include the Write-Ahead Log (WAL) file and when the catalog is
* restored, it'll replay the WAL contents locally to bring the catalog to the current state
* @return jobId of the backup process
* @throws PastDataNotAvailableException when the past data is not available
* @throws TemporalDataNotAvailableException when the past data is not available
*/
@Nonnull
UUID backupCatalog(@Nonnull String catalogName, @Nullable OffsetDateTime pastMoment, boolean includingWAL) throws PastDataNotAvailableException;
ProgressiveCompletableFuture<Path> backupCatalog(@Nonnull String catalogName, @Nullable OffsetDateTime pastMoment, boolean includingWAL) throws TemporalDataNotAvailableException;

/**
* Restores a catalog from the provided InputStream which contains the binary data of a previously backed up zip
* file. The input stream is closed within the method.
*
* @param catalogName the name of the catalog to restore
* @param totalBytesExpected total bytes expected to be read from the input stream
* @param inputStream an InputStream to read the binary data of the zip file
* @return jobId of the restore process
* @throws UnexpectedIOException if an I/O error occurs
*/
@Nonnull
UUID restoreCatalog(@Nonnull String catalogName, @Nonnull InputStream inputStream) throws UnexpectedIOException;
ProgressiveCompletableFuture<Void> restoreCatalog(
@Nonnull String catalogName,
long totalBytesExpected,
@Nonnull InputStream inputStream
) throws UnexpectedIOException;

/**
* TODO JNO - document me
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
import io.evitadb.api.exception.EntityClassInvalidException;
import io.evitadb.api.exception.EntityTypeAlreadyPresentInCatalogSchemaException;
import io.evitadb.api.exception.InstanceTerminatedException;
import io.evitadb.api.exception.PastDataNotAvailableException;
import io.evitadb.api.exception.SchemaAlteringException;
import io.evitadb.api.exception.TemporalDataNotAvailableException;
import io.evitadb.api.exception.UnexpectedResultCountException;
import io.evitadb.api.exception.UnexpectedResultException;
import io.evitadb.api.proxy.ProxyFactory;
Expand Down Expand Up @@ -65,6 +65,7 @@
import io.evitadb.api.requestResponse.schema.mutation.LocalCatalogSchemaMutation;
import io.evitadb.api.requestResponse.schema.mutation.catalog.ModifyCatalogSchemaMutation;
import io.evitadb.api.requestResponse.schema.mutation.catalog.ModifyEntitySchemaMutation;
import io.evitadb.api.task.ProgressiveCompletableFuture;
import io.evitadb.exception.EvitaInvalidUsageException;
import io.evitadb.utils.ArrayUtils;
import io.evitadb.utils.Assert;
Expand All @@ -74,6 +75,7 @@
import javax.annotation.concurrent.NotThreadSafe;
import java.io.Closeable;
import java.io.Serializable;
import java.nio.file.Path;
import java.time.OffsetDateTime;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -995,10 +997,10 @@ <T extends Serializable> DeletedHierarchy<T> deleteEntityAndItsHierarchy(@Nonnul
* @param includingWAL if true, the backup will include the Write-Ahead Log (WAL) file and when the catalog is
* restored, it'll replay the WAL contents locally to bring the catalog to the current state
* @return jobId of the backup process
* @throws PastDataNotAvailableException when the past data is not available
* @throws TemporalDataNotAvailableException when the past data is not available
*/
@Nonnull
UUID backupCatalog(@Nullable OffsetDateTime pastMoment, boolean includingWAL) throws PastDataNotAvailableException;
ProgressiveCompletableFuture<Path> backupCatalog(@Nullable OffsetDateTime pastMoment, boolean includingWAL) throws TemporalDataNotAvailableException;

/**
* Default implementation uses ID for comparing two sessions (and to distinguish one session from another).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
* limitations under the License.
*/

package io.evitadb.api.job;
package io.evitadb.api.task;

import java.io.Serializable;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
* limitations under the License.
*/

package io.evitadb.api.job;
package io.evitadb.api.task;

import java.io.Serializable;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
* limitations under the License.
*/

package io.evitadb.api.job;
package io.evitadb.api.task;

import java.io.Serializable;
import java.time.OffsetDateTime;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
* limitations under the License.
*/

package io.evitadb.api.job;
package io.evitadb.api.task;

/**
* This enumeration contains listing of all job types.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
/*
*
* _ _ ____ ____
* _____ _(_) |_ __ _| _ \| __ )
* / _ \ \ / / | __/ _` | | | | _ \
* | __/\ V /| | || (_| | |_| | |_) |
* \___| \_/ |_|\__\__,_|____/|____/
*
* Copyright (c) 2024
*
* Licensed under the Business Source License, Version 1.1 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://github.com/FgForrest/evitaDB/blob/master/LICENSE
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.evitadb.api.task;

import io.evitadb.utils.Assert;
import io.evitadb.utils.UUIDUtil;
import lombok.Getter;

import javax.annotation.Nonnull;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.IntConsumer;

/**
* This extension of {@link CompletableFuture} is used to track the progress of the task and to provide a unique id for
* the task.
*
* @author Jan Novotný (novotny@fg.cz), FG Forrest a.s. (c) 2024
*/
public class ProgressiveCompletableFuture<T> extends CompletableFuture<T> {
/**
* Unique id of the task.
*/
@Getter
private final UUID id = UUIDUtil.randomUUID();
/**
* This counter is used to track the progress of the task.
* The value -1 means that the future is still waiting for the task to start.
* The value 0 means that the task has started.
* The value 100 means that the task has finished.
*/
private final AtomicInteger progress = new AtomicInteger(-1);
/**
* Listeners that are notified when the progress of the task changes.
*/
private final CopyOnWriteArrayList<IntConsumer> progressListeners = new CopyOnWriteArrayList<>();

/**
* Creates a new completed instance of {@link ProgressiveCompletableFuture}.
*/
@Nonnull
public static ProgressiveCompletableFuture<Void> completed() {
final ProgressiveCompletableFuture<Void> future = new ProgressiveCompletableFuture<>();
future.complete(null);
return future;
}

/**
* Returns progress of the task.
*
* @return progress of the task in percents
*/
public int getProgress() {
return progress.get();
}

/**
* Method updates the progress of the task.
*
* @param progress new progress of the task in percents
*/
public void updateProgress(int progress) {
Assert.isPremiseValid(progress >= 0 && progress <= 100, "Progress must be in range 0-100");
final int currentProgress = this.progress.updateAndGet(operand -> Math.max(progress, operand));
for (IntConsumer progressListener : progressListeners) {
progressListener.accept(currentProgress);
}
}

@Override
public boolean complete(T value) {
final boolean complete = super.complete(value);
if (complete) {
updateProgress(100);
this.progressListeners.clear();
}
return complete;
}

@Override
public boolean completeExceptionally(Throwable ex) {
final boolean completeExceptionally = super.completeExceptionally(ex);
if (completeExceptionally) {
updateProgress(100);
this.progressListeners.clear();
}
return completeExceptionally;
}

/**
* Returns a new {@link ProgressiveCompletableFuture} that is completed when this task completes. The progress will
* be correctly adjusted to reach 100% when both tasks are finished.
*
* @param nextStage function that is applied to the result of this task
* @return new {@link ProgressiveCompletableFuture} that is completed when this task completes
* @param <U> type of the result of the next task
*/
@Nonnull
public <U> ProgressiveCompletableFuture<U> andThen(@Nonnull Function<T, ProgressiveCompletableFuture<U>> nextStage) {
final ProgressiveCompletableFuture<U> combinedFuture = new ProgressiveCompletableFuture<>();
this.addProgressListener(progress -> combinedFuture.combineProgressOfTwoSequentialParts(progress, 1))
.whenComplete((t, throwable) -> {
if (throwable != null) {
combinedFuture.completeExceptionally(throwable);
} else {
final ProgressiveCompletableFuture<U> nextFuture = nextStage.apply(t);
nextFuture
.addProgressListener(progress -> combinedFuture.combineProgressOfTwoSequentialParts(progress, 2))
.whenComplete((u, throwable1) -> {
if (throwable1 != null) {
combinedFuture.completeExceptionally(throwable1);
} else {
combinedFuture.complete(u);
}
});
}
});
return combinedFuture;
}

/**
* The second part is expected to start when the first part is finished. The progress of the second part is combined
* with the progress of the first part.
* @param progress progress of the particular part
* @param part order number of the part
*/
private void combineProgressOfTwoSequentialParts(int progress, int part) {
this.updateProgress(((part - 1) * 50) + progress / 2);
}

/**
* Adds a listener that is notified when the progress of the task changes.
*
* @param listener listener that is notified when the progress of the task changes
* @return this instance
*/
@Nonnull
ProgressiveCompletableFuture<T> addProgressListener(@Nonnull IntConsumer listener) {
progressListeners.add(listener);
return this;
}
}
2 changes: 1 addition & 1 deletion evita_api/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
exports io.evitadb.api.configuration;
exports io.evitadb.api.configuration.metric;
exports io.evitadb.api.exception;
exports io.evitadb.api.job;
exports io.evitadb.api.task;
exports io.evitadb.api.proxy;
exports io.evitadb.api.proxy.impl;
exports io.evitadb.api.requestResponse;
Expand Down
11 changes: 7 additions & 4 deletions evita_engine/src/main/java/io/evitadb/core/Catalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import io.evitadb.api.exception.InvalidSchemaMutationException;
import io.evitadb.api.exception.SchemaAlteringException;
import io.evitadb.api.exception.SchemaNotFoundException;
import io.evitadb.api.exception.TemporalDataNotAvailableException;
import io.evitadb.api.exception.TransactionException;
import io.evitadb.api.observability.trace.TracingContext;
import io.evitadb.api.proxy.ProxyFactory;
Expand Down Expand Up @@ -68,6 +69,8 @@
import io.evitadb.api.requestResponse.system.CatalogVersionDescriptor;
import io.evitadb.api.requestResponse.system.TimeFlow;
import io.evitadb.api.requestResponse.transaction.TransactionMutation;
import io.evitadb.api.task.ProgressiveCompletableFuture;
import io.evitadb.core.async.BackgroundCallableTask;
import io.evitadb.core.async.ObservableExecutorService;
import io.evitadb.core.async.Scheduler;
import io.evitadb.core.buffer.DataStoreChanges;
Expand All @@ -78,6 +81,7 @@
import io.evitadb.core.query.QueryContext;
import io.evitadb.core.query.QueryPlan;
import io.evitadb.core.query.QueryPlanner;
import io.evitadb.core.query.QueryPlanningContext;
import io.evitadb.core.query.algebra.Formula;
import io.evitadb.core.sequence.SequenceService;
import io.evitadb.core.sequence.SequenceType;
Expand Down Expand Up @@ -1558,14 +1562,13 @@ private EntityCollection getOrCreateCollectionForEntityInternal(@Nonnull String
}

/**
* Method creates {@link QueryContext} that is used for read operations.
* Method creates {@link QueryPlanningContext} that is used for read operations.
*/
@Nonnull
private QueryContext createQueryContext(@Nonnull EvitaRequest evitaRequest, @Nonnull EvitaSessionContract session) {
return new QueryContext(
private QueryPlanningContext createQueryContext(@Nonnull EvitaRequest evitaRequest, @Nonnull EvitaSessionContract session) {
return new QueryPlanningContext(
this,
null,
new CatalogReadOnlyEntityStorageContainerAccessor(this),
session, evitaRequest,
evitaRequest.isQueryTelemetryRequested() ? new QueryTelemetry(QueryPhase.OVERALL) : null,
Collections.emptyMap(),
Expand Down
Loading

0 comments on commit 3d79f7b

Please sign in to comment.