Skip to content

Commit

Permalink
feat(#41): backup & restore over the client
Browse files Browse the repository at this point in the history
Implemented and tested
  • Loading branch information
novoj committed May 6, 2024
1 parent 3e17056 commit b2c0d75
Show file tree
Hide file tree
Showing 22 changed files with 3,340 additions and 424 deletions.
3 changes: 2 additions & 1 deletion evita_api/src/main/java/io/evitadb/api/EvitaContract.java
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,8 @@ default void updateCatalog(
void backupCatalog(@Nonnull String catalogName, @Nonnull OutputStream outputStream) throws UnexpectedIOException;

/**
* Restores a catalog from the provided InputStream which contains the binary data of a previously backed up zip file.
* Restores a catalog from the provided InputStream which contains the binary data of a previously backed up zip
* file. The input stream is closed within the method.
*
* @param catalogName the name of the catalog to restore
* @param inputStream an InputStream to read the binary data of the zip file
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

package io.evitadb.driver;

import com.google.protobuf.ByteString;
import com.google.protobuf.Empty;
import io.evitadb.api.EvitaContract;
import io.evitadb.api.EvitaSessionContract;
Expand Down Expand Up @@ -53,6 +54,7 @@
import io.evitadb.externalApi.grpc.dataType.EvitaDataTypesConverter;
import io.evitadb.externalApi.grpc.generated.*;
import io.evitadb.externalApi.grpc.generated.EvitaServiceGrpc.EvitaServiceFutureStub;
import io.evitadb.externalApi.grpc.generated.EvitaServiceGrpc.EvitaServiceStub;
import io.evitadb.externalApi.grpc.requestResponse.EvitaEnumConverter;
import io.evitadb.externalApi.grpc.requestResponse.schema.mutation.DelegatingTopLevelCatalogSchemaMutationConverter;
import io.evitadb.externalApi.grpc.requestResponse.schema.mutation.SchemaMutationConverter;
Expand All @@ -67,14 +69,17 @@
import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.stub.StreamObserver;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
Expand All @@ -90,6 +95,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
Expand Down Expand Up @@ -155,6 +161,18 @@ public class EvitaClient implements EvitaContract {
*/
private final ThreadLocal<LinkedList<Timeout>> timeout;

@Nonnull
private static ClientTracingContext getClientTracingContext(@Nonnull EvitaClientConfiguration configuration) {
final ClientTracingContext context = ClientTracingContextProvider.getContext();
final Object openTelemetryInstance = configuration.openTelemetryInstance();
if (openTelemetryInstance != null && context instanceof DefaultClientTracingContext) {
throw new EvitaInvalidUsageException(
"OpenTelemetry instance is set, but tracing context is not configured!"
);
}
return context;
}

public EvitaClient(@Nonnull EvitaClientConfiguration configuration) {
this(configuration, null);
}
Expand Down Expand Up @@ -262,33 +280,11 @@ public EvitaClient(
}
}

@Nonnull
private static ClientTracingContext getClientTracingContext(@Nonnull EvitaClientConfiguration configuration) {
final ClientTracingContext context = ClientTracingContextProvider.getContext();
final Object openTelemetryInstance = configuration.openTelemetryInstance();
if (openTelemetryInstance != null && context instanceof DefaultClientTracingContext) {
throw new EvitaInvalidUsageException(
"OpenTelemetry instance is set, but tracing context is not configured!"
);
}
return context;
}

@Override
public boolean isActive() {
return active.get();
}

/**
* Retrieves the version number of the evitaDB client.
*
* @return The version number as a string.
*/
@Nonnull
public String getVersion() {
return VersionUtils.readVersion();
}

@Nonnull
@Override
public EvitaClientSession createSession(@Nonnull SessionTraits traits) {
Expand Down Expand Up @@ -590,6 +586,92 @@ public void updateCatalog(@Nonnull String catalogName, @Nonnull Consumer<EvitaSe
}
}

@Override
public void backupCatalog(@Nonnull String catalogName, @Nonnull OutputStream outputStream) throws UnexpectedIOException {
assertActive();
try (final EvitaSessionContract session = this.createReadOnlySession(catalogName)) {
session.backupCatalog(outputStream);
}
}

@Override
public void restoreCatalog(@Nonnull String catalogName, @Nonnull InputStream inputStream) throws UnexpectedIOException {
assertActive();

executeWithAsyncEvitaService(
evitaService -> {
final CompletableFuture<Void> result = new CompletableFuture<>();
final AtomicLong bytesSent = new AtomicLong(0);
final StreamObserver<GrpcRestoreCatalogRequest> requestObserver = evitaService.restoreCatalog(
new StreamObserver<>() {
final AtomicLong bytesReceived = new AtomicLong(0);

@Override
public void onNext(GrpcRestoreCatalogResponse value) {
bytesReceived.accumulateAndGet(value.getRead(), Math::max);
}

@Override
public void onError(Throwable t) {
log.error("Error occurred during catalog restoration: {}", t.getMessage(), t);
result.completeExceptionally(t);
}

@Override
public void onCompleted() {
if (bytesSent.get() == bytesReceived.get()) {
result.complete(null);
} else {
result.completeExceptionally(
new UnexpectedIOException(
"Number of bytes sent and received during catalog restoration does not match (sent " + bytesSent.get() + ", received " + bytesReceived.get() + ")!",
"Number of bytes sent and received during catalog restoration does not match!"
)
);
}
}
}
);

// Send data in chunks
final ByteBuffer buffer = ByteBuffer.allocate(65_536);
try (inputStream) {
while (inputStream.available() > 0) {
final int read = inputStream.read(buffer.array());
if (read == -1) {
requestObserver.onCompleted();
}
buffer.limit(read);
requestObserver.onNext(
GrpcRestoreCatalogRequest.newBuilder()
.setCatalogName(catalogName)
.setBackupFile(ByteString.copyFrom(buffer))
.build()
);
buffer.clear();
bytesSent.addAndGet(read);
}

requestObserver.onCompleted();
} catch (IOException e) {
requestObserver.onError(e);
throw new RuntimeException(e);
}

// wait for result and rethrow original exception if available
try {
return result.join();
} catch (RuntimeException ex) {
if (ex.getCause() instanceof RuntimeException runtimeException) {
throw runtimeException;
} else {
throw ex;
}
}
}
);
}

@Override
public CompletableFuture<Long> updateCatalogAsync(
@Nonnull String catalogName,
Expand All @@ -616,16 +698,6 @@ public CompletableFuture<Long> updateCatalogAsync(
return closeFuture;
}

@Override
public void backupCatalog(@Nonnull String catalogName, @Nonnull OutputStream outputStream) throws UnexpectedIOException {
/* TODO JNO - implement me */
}

@Override
public void restoreCatalog(@Nonnull String catalogName, @Nonnull InputStream inputStream) throws UnexpectedIOException {
/* TODO JNO - implement me */
}

@Nonnull
@Override
public SystemStatus getSystemStatus() {
Expand All @@ -648,6 +720,16 @@ public SystemStatus getSystemStatus() {
);
}

/**
* Retrieves the version number of the evitaDB client.
*
* @return The version number as a string.
*/
@Nonnull
public String getVersion() {
return VersionUtils.readVersion();
}

@Override
public void close() {
if (active.compareAndSet(true, false)) {
Expand All @@ -662,9 +744,9 @@ public void close() {
* Method executes lambda using specified timeout for the call ignoring the defaults specified
* in {@link EvitaClientConfiguration#timeout()}.
*
* @param lambda logic to be executed
* @param lambda logic to be executed
* @param timeout timeout value
* @param unit time unit of the timeout
* @param unit time unit of the timeout
*/
public void executeWithExtendedTimeout(@Nonnull Runnable lambda, long timeout, @Nonnull TimeUnit unit) {
try {
Expand All @@ -679,11 +761,11 @@ public void executeWithExtendedTimeout(@Nonnull Runnable lambda, long timeout, @
* Method executes lambda using specified timeout for the call ignoring the defaults specified
* in {@link EvitaClientConfiguration#timeout()}.
*
* @param lambda logic to be executed
* @param lambda logic to be executed
* @param timeout timeout value
* @param unit time unit of the timeout
* @param unit time unit of the timeout
* @param <T> type of the result
* @return result of the lambda
* @param <T> type of the result
*/
public <T> T executeWithExtendedTimeout(@Nonnull Supplier<T> lambda, long timeout, @Nonnull TimeUnit unit) {
try {
Expand All @@ -707,14 +789,63 @@ protected void assertActive() {
* Method that is called within the {@link EvitaClientSession} to apply the wanted logic on a channel retrieved
* from a channel pool.
*
* @param evitaServiceBlockingStub function that holds a logic passed by the caller
* @param lambda function that holds a logic passed by the caller
* @param <T> return type of the function
* @return result of the applied function
*/
private <T> T executeWithEvitaService(@Nonnull AsyncCallFunction<EvitaServiceFutureStub, T> evitaServiceBlockingStub) {
private <T> T executeWithEvitaService(@Nonnull AsyncCallFunction<EvitaServiceFutureStub, T> lambda) {
final ManagedChannel managedChannel = this.channelPool.getChannel();
try {
return lambda.apply(EvitaServiceGrpc.newFutureStub(managedChannel));
} catch (StatusRuntimeException statusRuntimeException) {
final Code statusCode = statusRuntimeException.getStatus().getCode();
final String description = ofNullable(statusRuntimeException.getStatus().getDescription())
.orElse("No description.");
if (statusCode == Code.INVALID_ARGUMENT) {
final Matcher expectedFormat = ERROR_MESSAGE_PATTERN.matcher(description);
if (expectedFormat.matches()) {
throw EvitaInvalidUsageException.createExceptionWithErrorCode(
expectedFormat.group(2), expectedFormat.group(1)
);
} else {
throw new EvitaInvalidUsageException(description);
}
} else {
final Matcher expectedFormat = ERROR_MESSAGE_PATTERN.matcher(description);
if (expectedFormat.matches()) {
throw GenericEvitaInternalError.createExceptionWithErrorCode(
expectedFormat.group(2), expectedFormat.group(1)
);
} else {
throw new GenericEvitaInternalError(description);
}
}
} catch (EvitaInvalidUsageException | EvitaInternalError evitaError) {
throw evitaError;
} catch (Throwable e) {
log.error("Unexpected internal Evita error occurred: {}", e.getMessage(), e);
throw new GenericEvitaInternalError(
"Unexpected internal Evita error occurred: " + e.getMessage(),
"Unexpected internal Evita error occurred.",
e
);
} finally {
this.channelPool.releaseChannel(managedChannel);
}
}

/**
* Method that is called within the {@link EvitaClientSession} to apply the wanted logic on a channel retrieved
* from a channel pool.
*
* @param lambda function that holds a logic passed by the caller
* @param <T> return type of the function
* @return result of the applied function
*/
private <T> T executeWithAsyncEvitaService(@Nonnull AsyncCallFunction<EvitaServiceStub, T> lambda) {
final ManagedChannel managedChannel = this.channelPool.getChannel();
try {
return evitaServiceBlockingStub.apply(EvitaServiceGrpc.newFutureStub(managedChannel));
return lambda.apply(EvitaServiceGrpc.newStub(managedChannel));
} catch (StatusRuntimeException statusRuntimeException) {
final Code statusCode = statusRuntimeException.getStatus().getCode();
final String description = ofNullable(statusRuntimeException.getStatus().getDescription())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1260,7 +1260,58 @@ public SealedEntity[] deleteSealedEntitiesAndReturnBodies(@Nonnull Query query)

@Override
public void backupCatalog(@Nonnull OutputStream outputStream) throws UnexpectedIOException {
/* TODO JNO - Implement me */
assertActive();
final CompletableFuture<Void> result = new CompletableFuture<>();
executeWithAsyncEvitaSessionService(
evitaSessionService -> {
evitaSessionService.backupCatalog(
Empty.newBuilder().build(),
new StreamObserver<>() {
@Override
public void onNext(GrpcBackupCatalogResponse grpcBackupCatalogResponse) {
try {
grpcBackupCatalogResponse.getBackupFile().writeTo(outputStream);
} catch (Exception ex) {
result.completeExceptionally(
new UnexpectedIOException(
"Unexpected exception occurred while backing up the catalog: " + ex.getMessage(),
"Unexpected exception occurred while backing up the catalog!",
ex
)
);
}
}

@Override
public void onError(Throwable throwable) {
result.completeExceptionally(
new UnexpectedIOException(
"Unexpected exception occurred while backing up the catalog: " + throwable.getMessage(),
"Unexpected exception occurred while backing up the catalog!",
throwable
)
);
}

@Override
public void onCompleted() {
result.complete(null);
}
}
);
return null;
}
);
// wait for result and rethrow original exception if available
try {
result.join();
} catch (RuntimeException ex) {
if (ex.getCause() instanceof RuntimeException runtimeException) {
throw runtimeException;
} else {
throw ex;
}
}
}

@Nonnull
Expand Down
Loading

0 comments on commit b2c0d75

Please sign in to comment.