Skip to content

Commit

Permalink
Merge pull request #672 from alvasw/switch_to_persistable_store_reade…
Browse files Browse the repository at this point in the history
…r_writer

Persistence: Switch to PersistableStoreReaderWriter
  • Loading branch information
alvasw authored Mar 3, 2023
2 parents 8852482 + b7a0cb8 commit e799876
Showing 1 changed file with 10 additions and 98 deletions.
108 changes: 10 additions & 98 deletions persistence/src/main/java/bisq/persistence/Persistence.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,11 @@
package bisq.persistence;

import bisq.common.threading.ExecutorFactory;
import bisq.common.util.FileUtils;
import com.google.protobuf.Any;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
Expand All @@ -37,54 +33,32 @@
public class Persistence<T extends PersistableStore<T>> {
public static final ExecutorService PERSISTENCE_IO_POOL = ExecutorFactory.newFixedThreadPool("Persistence-io-pool");

private final String directory;
@Getter
private final String fileName;
@Getter
private final String storagePath;
private final Object lock = new Object();
private final AtomicReference<T> candidateToPersist = new AtomicReference<>();

private final PersistableStoreReaderWriter<T> persistableStoreReaderWriter;

public Persistence(String directory, String fileName) {
this.directory = directory;
this.fileName = fileName;
storagePath = directory + File.separator + fileName;

Path storePath = Path.of(directory, fileName);
var storeFileManager = new PersistableStoreFileManager(storePath);
persistableStoreReaderWriter = new PersistableStoreReaderWriter<>(storeFileManager);
}

public CompletableFuture<Optional<T>> readAsync(Consumer<T> consumer) {
return readAsync().whenComplete((result, throwable) -> result.ifPresent(consumer));
}

public CompletableFuture<Optional<T>> readAsync() {
return CompletableFuture.supplyAsync(this::read, PERSISTENCE_IO_POOL);
}

public Optional<T> read() {
File storageFile = new File(storagePath);
if (!storageFile.exists()) {
return Optional.empty();
}
try (FileInputStream fileInputStream = new FileInputStream(storagePath)) {
PersistableStore<?> persistableStore;
synchronized (lock) {
// The data we get is of type Any
Any any = Any.parseDelimitedFrom(fileInputStream);
persistableStore = PersistableStore.fromAny(any);
}
//noinspection unchecked,rawtypes
return (Optional) Optional.of(persistableStore);
} catch (Throwable exception) {
log.error("Error at read for " + storagePath, exception);
try {
FileUtils.backupCorruptedFile(directory, storageFile, fileName, "corruptedFilesAtRead");
} catch (IOException e) {
log.error("Error trying to backup corrupted file " + fileName + ": " + e.getMessage(), e);
}
return Optional.empty();
}
return CompletableFuture.supplyAsync(persistableStoreReaderWriter::read, PERSISTENCE_IO_POOL);
}


public CompletableFuture<Boolean> persistAsync(T serializable) {
synchronized (lock) {
candidateToPersist.set(serializable);
Expand All @@ -96,69 +70,7 @@ public CompletableFuture<Boolean> persistAsync(T serializable) {
}

public boolean persist(T persistableStore) {
synchronized (lock) {
boolean success = false;
File tempFile = null;
FileOutputStream fileOutputStream = null;
File storageFile = null;
try {
FileUtils.makeDirs(directory);
// We use a temp file to not risk data corruption in case the write operation fails.
// After write is done we rename the tempFile to our storageFile which is an atomic operation.
tempFile = File.createTempFile("temp_" + fileName, null, new File(directory));
FileUtils.deleteOnExit(tempFile);
storageFile = new File(storagePath);
fileOutputStream = new FileOutputStream(tempFile);

// We use an Any container (byte blob) as we do not have the dependencies to the
// external PersistableStore implementations (at deserialization we would have an issue otherwise as
// it requires static access).
Any any = persistableStore.toAny();
any.writeDelimitedTo(fileOutputStream);
fileOutputStream.flush();
fileOutputStream.getFD().sync();

// close is needed on WinOS otherwise renameFile will fail
fileOutputStream.close();
fileOutputStream = null;

// Atomic rename
boolean renameSucceeded = FileUtils.renameFile(tempFile, storageFile);
if (!renameSucceeded) {
// At shut down we get sometimes renameSucceeded=false.
// As far I observed the temp file was never left and the storage file got updated, so it seems it's
// not a critical issue.
log.debug("Renaming of tempFile to storageFile failed. tempFile={}, storageFile={}",
tempFile, storageFile);
}
//log.debug("Persisted {}", persistableStore);
success = true;
} catch (IOException ex) {
log.error("Error at read for " + storagePath + " msg: " + ex.getMessage(), ex);
try {
if (storageFile != null) {
FileUtils.backupCorruptedFile(directory, storageFile, fileName, "corruptedFilesAtWrite");
}
} catch (IOException e) {
log.error("FileUtils.backupCorruptedFile failed: " + e.getMessage(), e);
}
} finally {
try {
if (fileOutputStream != null) {
fileOutputStream.close();
}
} catch (IOException ioe) {
log.error("Error closing stream " + ioe.getMessage(), ioe); // swallow
}
if (tempFile != null) {
try {
FileUtils.releaseTempFile(tempFile);
} catch (IOException e) {
log.error("Could not delete " + tempFile, e);
}
}
}
return success;
}
persistableStoreReaderWriter.write(persistableStore);
return true;
}
}

0 comments on commit e799876

Please sign in to comment.