Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Persistence: Switch to PersistableStoreReaderWriter #672

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 10 additions & 98 deletions persistence/src/main/java/bisq/persistence/Persistence.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,11 @@
package bisq.persistence;

import bisq.common.threading.ExecutorFactory;
import bisq.common.util.FileUtils;
import com.google.protobuf.Any;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
Expand All @@ -37,54 +33,32 @@
public class Persistence<T extends PersistableStore<T>> {
public static final ExecutorService PERSISTENCE_IO_POOL = ExecutorFactory.newFixedThreadPool("Persistence-io-pool");

private final String directory;
@Getter
private final String fileName;
@Getter
private final String storagePath;
private final Object lock = new Object();
private final AtomicReference<T> candidateToPersist = new AtomicReference<>();

private final PersistableStoreReaderWriter<T> persistableStoreReaderWriter;

public Persistence(String directory, String fileName) {
this.directory = directory;
this.fileName = fileName;
storagePath = directory + File.separator + fileName;

Path storePath = Path.of(directory, fileName);
var storeFileManager = new PersistableStoreFileManager(storePath);
persistableStoreReaderWriter = new PersistableStoreReaderWriter<>(storeFileManager);
}

public CompletableFuture<Optional<T>> readAsync(Consumer<T> consumer) {
return readAsync().whenComplete((result, throwable) -> result.ifPresent(consumer));
}

public CompletableFuture<Optional<T>> readAsync() {
return CompletableFuture.supplyAsync(this::read, PERSISTENCE_IO_POOL);
}

public Optional<T> read() {
File storageFile = new File(storagePath);
if (!storageFile.exists()) {
return Optional.empty();
}
try (FileInputStream fileInputStream = new FileInputStream(storagePath)) {
PersistableStore<?> persistableStore;
synchronized (lock) {
// The data we get is of type Any
Any any = Any.parseDelimitedFrom(fileInputStream);
persistableStore = PersistableStore.fromAny(any);
}
//noinspection unchecked,rawtypes
return (Optional) Optional.of(persistableStore);
} catch (Throwable exception) {
log.error("Error at read for " + storagePath, exception);
try {
FileUtils.backupCorruptedFile(directory, storageFile, fileName, "corruptedFilesAtRead");
} catch (IOException e) {
log.error("Error trying to backup corrupted file " + fileName + ": " + e.getMessage(), e);
}
return Optional.empty();
}
return CompletableFuture.supplyAsync(persistableStoreReaderWriter::read, PERSISTENCE_IO_POOL);
}


public CompletableFuture<Boolean> persistAsync(T serializable) {
synchronized (lock) {
candidateToPersist.set(serializable);
Expand All @@ -96,69 +70,7 @@ public CompletableFuture<Boolean> persistAsync(T serializable) {
}

public boolean persist(T persistableStore) {
synchronized (lock) {
boolean success = false;
File tempFile = null;
FileOutputStream fileOutputStream = null;
File storageFile = null;
try {
FileUtils.makeDirs(directory);
// We use a temp file to not risk data corruption in case the write operation fails.
// After write is done we rename the tempFile to our storageFile which is an atomic operation.
tempFile = File.createTempFile("temp_" + fileName, null, new File(directory));
FileUtils.deleteOnExit(tempFile);
storageFile = new File(storagePath);
fileOutputStream = new FileOutputStream(tempFile);

// We use an Any container (byte blob) as we do not have the dependencies to the
// external PersistableStore implementations (at deserialization we would have an issue otherwise as
// it requires static access).
Any any = persistableStore.toAny();
any.writeDelimitedTo(fileOutputStream);
fileOutputStream.flush();
fileOutputStream.getFD().sync();

// close is needed on WinOS otherwise renameFile will fail
fileOutputStream.close();
fileOutputStream = null;

// Atomic rename
boolean renameSucceeded = FileUtils.renameFile(tempFile, storageFile);
if (!renameSucceeded) {
// At shut down we get sometimes renameSucceeded=false.
// As far I observed the temp file was never left and the storage file got updated, so it seems it's
// not a critical issue.
log.debug("Renaming of tempFile to storageFile failed. tempFile={}, storageFile={}",
tempFile, storageFile);
}
//log.debug("Persisted {}", persistableStore);
success = true;
} catch (IOException ex) {
log.error("Error at read for " + storagePath + " msg: " + ex.getMessage(), ex);
try {
if (storageFile != null) {
FileUtils.backupCorruptedFile(directory, storageFile, fileName, "corruptedFilesAtWrite");
}
} catch (IOException e) {
log.error("FileUtils.backupCorruptedFile failed: " + e.getMessage(), e);
}
} finally {
try {
if (fileOutputStream != null) {
fileOutputStream.close();
}
} catch (IOException ioe) {
log.error("Error closing stream " + ioe.getMessage(), ioe); // swallow
}
if (tempFile != null) {
try {
FileUtils.releaseTempFile(tempFile);
} catch (IOException e) {
log.error("Could not delete " + tempFile, e);
}
}
}
return success;
}
persistableStoreReaderWriter.write(persistableStore);
return true;
}
}