Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Snapshot temporary directory support configurable #933

Merged
merged 1 commit into from
Mar 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,10 @@ public interface JRaftServiceFactory {

/**
* Creates a raft snapshot storage
* @param uri The snapshot storage uri from {@link NodeOptions#getSnapshotUri()}
* @param raftOptions the raft options.
* @param nodeOptions the node options.
* @return storage to store state machine snapshot.
*/
SnapshotStorage createSnapshotStorage(final String uri, final RaftOptions raftOptions);
SnapshotStorage createSnapshotStorage(final NodeOptions nodeOptions);

/**
* Creates a raft meta storage.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package com.alipay.sofa.jraft.core;

import com.alipay.sofa.jraft.option.NodeOptions;
import org.apache.commons.lang.StringUtils;

import com.alipay.sofa.jraft.JRaftServiceFactory;
Expand Down Expand Up @@ -51,9 +52,11 @@ public LogStorage createLogStorage(final String uri, final RaftOptions raftOptio
}

@Override
public SnapshotStorage createSnapshotStorage(final String uri, final RaftOptions raftOptions) {
public SnapshotStorage createSnapshotStorage(final NodeOptions nodeOptions) {
String uri = nodeOptions.getSnapshotUri();
String tempUri = nodeOptions.getSnapshotTempUri();
Requires.requireTrue(!StringUtils.isBlank(uri), "Blank snapshot storage uri.");
return new LocalSnapshotStorage(uri, raftOptions);
return new LocalSnapshotStorage(uri, tempUri, nodeOptions.getRaftOptions());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,6 @@ private boolean initSnapshotStorage() {
}
this.snapshotExecutor = new SnapshotExecutorImpl();
final SnapshotExecutorOptions opts = new SnapshotExecutorOptions();
opts.setUri(this.options.getSnapshotUri());
opts.setFsmCaller(this.fsmCaller);
opts.setNode(this);
opts.setLogManager(this.logManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
// Describe a specific SnapshotStorage in format ${type}://${parameters}
private String snapshotUri;

private String snapshotTempUri;

// If enable, we will filter duplicate files before copy remote snapshot,
// to avoid useless transmission. Two files in local and remote are duplicate,
// only if they has the same filename and the same checksum (stored in file meta).
Expand Down Expand Up @@ -366,6 +368,14 @@ public void setSnapshotUri(final String snapshotUri) {
this.snapshotUri = snapshotUri;
}

public String getSnapshotTempUri() {
return snapshotTempUri;
}

public void setSnapshotTempUri(String snapshotTempUri) {
this.snapshotTempUri = snapshotTempUri;
}

public boolean isFilterBeforeCopyRemote() {
return this.filterBeforeCopyRemote;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@
*/
public class SnapshotExecutorOptions {

// URI of SnapshotStorage
private String uri;
private FSMCaller fsmCaller;
private NodeImpl node;
private LogManager logManager;
Expand All @@ -49,14 +47,6 @@ public void setSnapshotThrottle(SnapshotThrottle snapshotThrottle) {
this.snapshotThrottle = snapshotThrottle;
}

public String getUri() {
return this.uri;
}

public void setUri(String uri) {
this.uri = uri;
}

public FSMCaller getFsmCaller() {
return this.fsmCaller;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@
package com.alipay.sofa.jraft.storage.snapshot;

import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import com.alipay.sofa.jraft.option.NodeOptions;
import com.alipay.sofa.jraft.util.ThreadPoolsFactory;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
Expand Down Expand Up @@ -224,16 +226,18 @@ public SnapshotReader start() {

@Override
public boolean init(final SnapshotExecutorOptions opts) {
if (StringUtils.isBlank(opts.getUri())) {
this.node = opts.getNode();
Objects.requireNonNull(this.node, "Node is null.");
NodeOptions nodeOptions = this.node.getOptions();
String snapshotUri = nodeOptions.getSnapshotUri();
if (StringUtils.isBlank(snapshotUri)) {
LOG.error("Snapshot uri is empty.");
return false;
}
this.logManager = opts.getLogManager();
this.fsmCaller = opts.getFsmCaller();
this.node = opts.getNode();
this.term = opts.getInitTerm();
this.snapshotStorage = this.node.getServiceFactory().createSnapshotStorage(opts.getUri(),
this.node.getRaftOptions());
this.snapshotStorage = this.node.getServiceFactory().createSnapshotStorage(nodeOptions);
if (opts.isFilterBeforeCopyRemote()) {
this.snapshotStorage.setFilterBeforeCopyRemote();
}
Expand All @@ -257,7 +261,7 @@ public boolean init(final SnapshotExecutorOptions opts) {
}
this.loadingSnapshotMeta = reader.load();
if (this.loadingSnapshotMeta == null) {
LOG.error("Fail to load meta from {}.", opts.getUri());
LOG.error("Fail to load meta from {}.", snapshotUri);
Utils.closeQuietly(reader);
return false;
}
Expand All @@ -276,7 +280,7 @@ public boolean init(final SnapshotExecutorOptions opts) {
Utils.closeQuietly(reader);
}
if (!done.status.isOk()) {
LOG.error("Fail to load snapshot from {}, FirstSnapshotLoadDone status is {}.", opts.getUri(), done.status);
LOG.error("Fail to load snapshot from {}, FirstSnapshotLoadDone status is {}.", snapshotUri, done.status);
return false;
}
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand All @@ -28,6 +29,7 @@
import java.util.concurrent.locks.ReentrantLock;

import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -48,7 +50,7 @@
* Snapshot storage based on local file storage.
*
* @author boyan (boyan@alibaba-inc.com)
*
* <p>
* 2018-Mar-13 2:11:30 PM
*/
public class LocalSnapshotStorage implements SnapshotStorage {
Expand All @@ -58,6 +60,7 @@ public class LocalSnapshotStorage implements SnapshotStorage {
private static final String TEMP_PATH = "temp";
private final ConcurrentMap<Long, AtomicInteger> refMap = new ConcurrentHashMap<>();
private final String path;
private final String tempPath;
private Endpoint addr;
private boolean filterBeforeCopyRemote;
private long lastSnapshotIndex;
Expand All @@ -78,14 +81,33 @@ public void setServerAddr(Endpoint addr) {
this.addr = addr;
}

public LocalSnapshotStorage(String path, RaftOptions raftOptions) {
public LocalSnapshotStorage(String path, String tempPath, RaftOptions raftOptions) {
super();
this.path = path;
if (StringUtils.isEmpty(tempPath)) {
this.tempPath = buildTempPath(this.path);
} else {
File pathFile = new File(path);
File tempPathFile = new File(tempPath);

String pathAbsolutePath = pathFile.getAbsolutePath();
String tempPathAbsolutePath = tempPathFile.getAbsolutePath();
if (pathAbsolutePath.equals(tempPathAbsolutePath) || pathAbsolutePath.startsWith(tempPathAbsolutePath)) {
this.tempPath = buildTempPath(this.path);
} else {
this.tempPath = tempPath;
}
LOG.info("The snapshot temp path is {}", this.tempPath);
}
this.lastSnapshotIndex = 0;
this.raftOptions = raftOptions;
this.lock = new ReentrantLock();
}

private String buildTempPath(String path) {
return Paths.get(path, TEMP_PATH).toString();
}

public long getLastSnapshotIndex() {
this.lock.lock();
try {
Expand All @@ -108,13 +130,12 @@ public boolean init(final Void v) {

// delete temp snapshot
if (!this.filterBeforeCopyRemote) {
final String tempSnapshotPath = this.path + File.separator + TEMP_PATH;
final File tempFile = new File(tempSnapshotPath);
final File tempFile = new File(this.tempPath);
if (tempFile.exists()) {
try {
FileUtils.forceDelete(tempFile);
} catch (final IOException e) {
LOG.error("Fail to delete temp snapshot path {}.", tempSnapshotPath, e);
LOG.error("Fail to delete temp snapshot path {}.", this.tempPath, e);
return false;
}
}
Expand Down Expand Up @@ -223,7 +244,6 @@ void close(final LocalSnapshotWriter writer, final boolean keepDataOnError) thro
break;
}
// rename temp to new
final String tempPath = this.path + File.separator + TEMP_PATH;
final String newPath = getSnapshotPath(newIndex);

if (!destroySnapshot(newPath)) {
Expand All @@ -232,11 +252,11 @@ void close(final LocalSnapshotWriter writer, final boolean keepDataOnError) thro
ioe = new IOException("Fail to delete new snapshot path: " + newPath);
break;
}
LOG.info("Renaming {} to {}.", tempPath, newPath);
if (!Utils.atomicMoveFile(new File(tempPath), new File(newPath), true)) {
LOG.error("Renamed temp snapshot failed, from path {} to path {}.", tempPath, newPath);
LOG.info("Renaming {} to {}.", this.tempPath, newPath);
if (!Utils.atomicMoveFile(new File(this.tempPath), new File(newPath), true)) {
LOG.error("Renamed temp snapshot failed, from path {} to path {}.", this.tempPath, newPath);
ret = RaftError.EIO.getNumber();
ioe = new IOException("Fail to rename temp snapshot from: " + tempPath + " to: " + newPath);
ioe = new IOException("Fail to rename temp snapshot from: " + this.tempPath + " to: " + newPath);
break;
}
ref(newIndex);
Expand Down Expand Up @@ -283,15 +303,14 @@ public SnapshotWriter create(final boolean fromEmpty) {
LocalSnapshotWriter writer = null;
// noinspection ConstantConditions
do {
final String snapshotPath = this.path + File.separator + TEMP_PATH;
// delete temp
// TODO: Notify watcher before deleting
if (new File(snapshotPath).exists() && fromEmpty) {
if (!destroySnapshot(snapshotPath)) {
if (new File(this.tempPath).exists() && fromEmpty) {
if (!destroySnapshot(this.tempPath)) {
break;
}
}
writer = new LocalSnapshotWriter(snapshotPath, this, this.raftOptions);
writer = new LocalSnapshotWriter(this.tempPath, this, this.raftOptions);
if (!writer.init(null)) {
LOG.error("Fail to init snapshot writer.");
writer = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,19 +118,22 @@ public void setup() throws Exception {
this.uri = "remote://" + this.hostPort + "/" + this.readerId;
this.copyOpts = new CopyOptions();

NodeOptions nodeOptions = new NodeOptions();
nodeOptions.setSnapshotUri(this.path);
Mockito.when(this.node.getGroupId()).thenReturn(GROUP_ID);
Mockito.when(this.node.getRaftOptions()).thenReturn(new RaftOptions());
Mockito.when(this.node.getOptions()).thenReturn(new NodeOptions());
Mockito.when(this.node.getRaftOptions()).thenReturn(this.raftOptions);
Mockito.when(this.node.getOptions()).thenReturn(nodeOptions);
Mockito.when(this.node.getRpcService()).thenReturn(this.raftClientService);
Mockito.when(this.node.getTimerManager()).thenReturn(this.timerManager);
Mockito.when(this.node.getServiceFactory()).thenReturn(DefaultJRaftServiceFactory.newInstance());

this.executor = new SnapshotExecutorImpl();
final SnapshotExecutorOptions opts = new SnapshotExecutorOptions();
opts.setFsmCaller(this.fSMCaller);
opts.setInitTerm(0);
opts.setNode(this.node);
opts.setLogManager(this.logManager);
opts.setUri(this.path);

this.addr = new Endpoint("localhost", 8081);
opts.setAddr(this.addr);
assertTrue(this.executor.init(opts));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public void setup() throws Exception {
.setLastIncludedTerm(1).build());
this.table.saveToFile(snapshotPath + File.separator + Snapshot.JRAFT_SNAPSHOT_META_FILE);

this.snapshotStorage = new LocalSnapshotStorage(path, new RaftOptions());
this.snapshotStorage = new LocalSnapshotStorage(path, null, new RaftOptions());
assertTrue(this.snapshotStorage.init(null));
}

Expand Down