Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/multi group snapshot #42

Merged
merged 25 commits into from
Apr 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ public interface StateMachine {
* call done.run(status) when snapshot finished.
* Default: Save nothing and returns error.
*
* @param writer snapshot writer
* @param done callback
* @param writer snapshot writer
* @param done callback
*/
void onSnapshotSave(SnapshotWriter writer, Closure done);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1192,7 +1192,7 @@ private static boolean onAppendEntriesReturned(ThreadId id, Inflight inflight, S
r.notifyOnCaughtUp(RaftError.EPERM.getNumber(), true);
r.destroy();
node.increaseTermTo(response.getTerm(), new Status(RaftError.EHIGHERTERMRESPONSE,
"Leader receives higher term hearbeat_response from peer:%s", r.options.getPeerId()));
"Leader receives higher term heartbeat_response from peer:%s", r.options.getPeerId()));
return false;
}
if (isLogDebugEnabled) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public abstract class Snapshot extends Status {
* Snapshot file prefix.
*/
public static final String JRAFT_SNAPSHOT_PREFIX = "snapshot_";
/** Snapshot uri scheme for remote peer*/
/** Snapshot uri scheme for remote peer */
public static final String REMOTE_SNAPSHOT_URI_SCHEME = "remote://";

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ public static boolean isWindows() {
}

private static boolean isWindows0() {
boolean windows = SystemPropertyUtil.get("os.name", "").toLowerCase(Locale.US).contains("win");
final boolean windows = SystemPropertyUtil.get("os.name", "") //
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我觉的这里可以搞成全局的静态变量,然后在 static block 里初始化这个变量,这里就变成 getter 了,是不是性能更好点? 如果这个方法调用不频繁,倒也没关系。

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

恩,其实现在的代码就是和你描述的思路一样的,因为 isWindows0() 是只供内部调用的,并且有一个静态变量,
private static final boolean IS_WINDOWS = isWindows0();

对使用者开放的方法是 isWindows() , 所以说 isWindows0() 只会被调用一次

.toLowerCase(Locale.US) //
.contains("win");
if (windows) {
LOG.debug("Platform: Windows");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ public void testOnRpcReturnedTermMismatch() {
Utils.monotonicMs());
Mockito.verify(this.node).increaseTermTo(
2,
new Status(RaftError.EHIGHERTERMRESPONSE, "Leader receives higher term hearbeat_response from peer:%s",
new Status(RaftError.EHIGHERTERMRESPONSE, "Leader receives higher term heartbeat_response from peer:%s",
peerId));
assertNull(r.id);
}
Expand Down Expand Up @@ -428,7 +428,7 @@ public void testOnHeartbeatReturnedTermMismatch() {
Replicator.onHeartbeatReturned(this.id, Status.OK(), request, response, Utils.monotonicMs());
Mockito.verify(this.node).increaseTermTo(
2,
new Status(RaftError.EHIGHERTERMRESPONSE, "Leader receives higher term hearbeat_response from peer:%s",
new Status(RaftError.EHIGHERTERMRESPONSE, "Leader receives higher term heartbeat_response from peer:%s",
peerId));
assertNull(r.id);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -449,10 +449,11 @@ public void handleKeyLockRequest(final KeyLockRequest request,
try {
checkRegionEpoch(request);
final byte[] key = requireNonNull(request.getKey(), "lock.key");
final byte[] fencingKey = this.regionEngine.getRegion().getStartKey();
final DistributedLock.Acquirer acquirer = requireNonNull(request.getAcquirer(), "lock.acquirer");
requireNonNull(acquirer.getId(), "lock.id");
requirePositive(acquirer.getLeaseMillis(), "lock.leaseMillis");
this.rawKVStore.tryLockWith(key, request.isKeepLease(), acquirer, new BaseKVStoreClosure() {
this.rawKVStore.tryLockWith(key, fencingKey, request.isKeepLease(), acquirer, new BaseKVStoreClosure() {

@Override
public void run(Status status) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public synchronized boolean init(final RegionEngineOptions opts) {
return true;
}
this.regionOpts = Requires.requireNonNull(opts, "opts");
this.fsm = new KVStoreStateMachine(this.region.getId(), this.storeEngine);
this.fsm = new KVStoreStateMachine(this.region, this.storeEngine);

// node options
NodeOptions nodeOpts = opts.getNodeOptions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -515,11 +515,7 @@ public void doSplit(final Long regionId, final Long newRegionId, final byte[] sp
rOpts.setRaftDataPath(baseRaftDataPath + "raft_data_region_" + region.getId() + "_"
+ getSelfEndpoint().getPort());
final RegionEngine engine = new RegionEngine(region, this);
if (engine.init(rOpts)) {
final RegionKVService regionKVService = new DefaultRegionKVService(engine);
registerRegionKVService(regionKVService);
this.regionEngineTable.put(region.getId(), engine);
} else {
if (!engine.init(rOpts)) {
LOG.error("Fail to init [RegionEngine: {}].", region);
if (closure != null) {
// null on follower
Expand All @@ -528,12 +524,20 @@ public void doSplit(final Long regionId, final Long newRegionId, final byte[] sp
}
return;
}

// update parent conf
final Region pRegion = parent.getRegion();
final RegionEpoch pEpoch = pRegion.getRegionEpoch();
final long version = pEpoch.getVersion();
pEpoch.setVersion(version + 1); // version + 1
pRegion.setEndKey(splitKey); // update endKey

// the following two lines of code can make a relation of 'happens-before' for
// read 'pRegion', because that a write to a ConcurrentMap happens-before every
// subsequent read of that ConcurrentMap.
this.regionEngineTable.put(region.getId(), engine);
registerRegionKVService(new DefaultRegionKVService(engine));

// update local regionRouteTable
this.pdClient.getRegionRouteTable().splitRegion(pRegion.getId(), region);
if (closure != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1165,7 +1165,7 @@ private void internalTryLockWith(final byte[] key, final boolean keepLease, fina
retryRunner);
if (regionEngine != null) {
if (ensureOnValidEpoch(region, regionEngine, closure)) {
getRawKVStore(regionEngine).tryLockWith(key, keepLease, acquirer, closure);
getRawKVStore(regionEngine).tryLockWith(key, region.getStartKey(), keepLease, acquirer, closure);
}
} else {
final KeyLockRequest request = new KeyLockRequest();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.jraft.rhea.storage;

import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.concurrent.ExecutorService;
import java.util.zip.ZipOutputStream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alipay.sofa.jraft.Closure;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.error.RaftError;
import com.alipay.sofa.jraft.rhea.metadata.Region;
import com.alipay.sofa.jraft.rhea.serialization.Serializer;
import com.alipay.sofa.jraft.rhea.serialization.Serializers;
import com.alipay.sofa.jraft.rhea.util.StackTraceUtil;
import com.alipay.sofa.jraft.rhea.util.ZipUtil;
import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader;
import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter;
import com.google.protobuf.ByteString;

import static com.alipay.sofa.jraft.entity.LocalFileMetaOutter.LocalFileMeta;

/**
* @author jiachun.fjc
*/
public abstract class AbstractKVStoreSnapshotFile implements KVStoreSnapshotFile {

private static final Logger LOG = LoggerFactory.getLogger(AbstractKVStoreSnapshotFile.class);

private static final String SNAPSHOT_DIR = "kv";
private static final String SNAPSHOT_ARCHIVE = "kv.zip";

protected final Serializer serializer = Serializers.getDefault();

@Override
public void save(final SnapshotWriter writer, final Closure done, final Region region,
final ExecutorService executor) {
final String writerPath = writer.getPath();
final String snapshotPath = Paths.get(writerPath, SNAPSHOT_DIR).toString();
try {
final LocalFileMeta meta = doSnapshotSave(snapshotPath, region);
executor.execute(() -> compressSnapshot(writer, meta, done));
} catch (final Throwable t) {
LOG.error("Fail to save snapshot, path={}, file list={}, {}.", writerPath, writer.listFiles(),
StackTraceUtil.stackTrace(t));
done.run(new Status(RaftError.EIO, "Fail to save snapshot at %s, error is %s", writerPath,
t.getMessage()));
}
}

@Override
public boolean load(final SnapshotReader reader, final Region region) {
final LocalFileMeta meta = (LocalFileMeta) reader.getFileMeta(SNAPSHOT_ARCHIVE);
final String readerPath = reader.getPath();
if (meta == null) {
LOG.error("Can't find kv snapshot file, path={}.", readerPath);
return false;
}
final String snapshotPath = Paths.get(readerPath, SNAPSHOT_DIR).toString();
try {
decompressSnapshot(readerPath);
doSnapshotLoad(snapshotPath, meta, region);
return true;
} catch (final Throwable t) {
LOG.error("Fail to load snapshot, path={}, file list={}, {}.", readerPath, reader.listFiles(),
StackTraceUtil.stackTrace(t));
return false;
}
}

abstract LocalFileMeta doSnapshotSave(final String snapshotPath, final Region region) throws Exception;

abstract void doSnapshotLoad(final String snapshotPath, final LocalFileMeta meta, final Region region)
throws Exception;

protected void compressSnapshot(final SnapshotWriter writer, final LocalFileMeta meta, final Closure done) {
final String writerPath = writer.getPath();
final String outputFile = Paths.get(writerPath, SNAPSHOT_ARCHIVE).toString();
try {
try (final ZipOutputStream out = new ZipOutputStream(new FileOutputStream(outputFile))) {
ZipUtil.compressDirectoryToZipFile(writerPath, SNAPSHOT_DIR, out);
}
if (writer.addFile(SNAPSHOT_ARCHIVE, meta)) {
done.run(Status.OK());
} else {
done.run(new Status(RaftError.EIO, "Fail to add snapshot file: %s", writerPath));
}
} catch (final Throwable t) {
LOG.error("Fail to compress snapshot, path={}, file list={}, {}.", writerPath, writer.listFiles(),
StackTraceUtil.stackTrace(t));
done.run(new Status(RaftError.EIO, "Fail to compress snapshot at %s, error is %s", writerPath, t
.getMessage()));
}
}

protected void decompressSnapshot(final String readerPath) throws IOException {
final String sourceFile = Paths.get(readerPath, SNAPSHOT_ARCHIVE).toString();
ZipUtil.unzipFile(sourceFile, readerPath);
}

protected <T> T readMetadata(final LocalFileMeta meta, final Class<T> cls) {
final ByteString userMeta = meta.getUserMeta();
return this.serializer.readObject(userMeta.toByteArray(), cls);
}

protected <T> LocalFileMeta buildMetadata(final T metadata) {
return metadata == null ? null : LocalFileMeta.newBuilder() //
.setUserMeta(ByteString.copyFrom(this.serializer.writeObject(metadata))) //
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,15 @@
import com.alipay.sofa.jraft.rhea.errors.Errors;
import com.alipay.sofa.jraft.rhea.metrics.KVMetrics;
import com.alipay.sofa.jraft.rhea.util.StackTraceUtil;
import com.alipay.sofa.jraft.util.BytesUtil;
import com.codahale.metrics.Timer;

import static com.alipay.sofa.jraft.entity.LocalFileMetaOutter.LocalFileMeta;
import static com.alipay.sofa.jraft.rhea.metrics.KVMetricNames.DB_TIMER;

/**
* @author jiachun.fjc
*/
public abstract class BaseRawKVStore<T> implements RawKVStore, Lifecycle<T> {

protected static final byte[] LOCK_FENCING_KEY = BytesUtil.writeUtf8("LOCK_FENCING_KEY");

@Override
public void get(final byte[] key, final KVStoreClosure closure) {
get(key, true, closure);
Expand Down Expand Up @@ -96,9 +92,13 @@ public void execute(final NodeExecutor nodeExecutor, final boolean isLeader, fin
*/
public abstract byte[] jumpOver(final byte[] startKey, final long distance);

public abstract LocalFileMeta onSnapshotSave(final String snapshotPath) throws Exception;

public abstract void onSnapshotLoad(final String snapshotPath, final LocalFileMeta meta) throws Exception;
/**
* Init the fencing token of new region.
*
* @param parentKey the fencing key of parent region
* @param childKey the fencing key of new region
*/
public abstract void initFencingToken(final byte[] parentKey, final byte[] childKey);

// static methods
//
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ public void batchTryLockWith(final KVStateOutputList kvStates) {
final KVState kvState = kvStates.get(i);
final KVOperation op = kvState.getOp();
final Pair<Boolean, DistributedLock.Acquirer> acquirerPair = op.getAcquirerPair();
tryLockWith(op.getKey(), acquirerPair.getKey(), acquirerPair.getValue(), kvState.getDone());
tryLockWith(op.getKey(), op.getFencingKey(), acquirerPair.getKey(), acquirerPair.getValue(),
kvState.getDone());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,10 @@ public static KVOperation createNodeExecutor(final NodeExecutor nodeExecutor) {
return new KVOperation(BytesUtil.EMPTY_BYTES, BytesUtil.EMPTY_BYTES, nodeExecutor, NODE_EXECUTE);
}

public static KVOperation createKeyLockRequest(final byte[] key,
public static KVOperation createKeyLockRequest(final byte[] key, final byte[] fencingKey,
final Pair<Boolean, DistributedLock.Acquirer> acquirerPair) {
Requires.requireNonNull(key, "key");
return new KVOperation(key, BytesUtil.EMPTY_BYTES, acquirerPair, KEY_LOCK);
return new KVOperation(key, fencingKey, acquirerPair, KEY_LOCK);
}

public static KVOperation createKeyLockReleaseRequest(final byte[] key, final DistributedLock.Acquirer acquirer) {
Expand Down Expand Up @@ -239,6 +239,10 @@ public byte[] getEndKey() {
return value;
}

public byte[] getFencingKey() {
return value;
}

public void setValue(byte[] value) {
this.value = value;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.jraft.rhea.storage;

import java.util.concurrent.ExecutorService;

import com.alipay.sofa.jraft.Closure;
import com.alipay.sofa.jraft.rhea.metadata.Region;
import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader;
import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter;

/**
*
* @author jiachun.fjc
*/
public interface KVStoreSnapshotFile {

/**
* Save a snapshot for the specified region.
*
* @param writer snapshot writer
* @param done callback
* @param region the region to save snapshot
* @param executor the executor to compress snapshot
*/
void save(final SnapshotWriter writer, final Closure done, final Region region, final ExecutorService executor);

/**
* Load snapshot for the specified region.
*
* @param reader snapshot reader
* @param region the region to load snapshot
* @return true if load succeed
*/
boolean load(final SnapshotReader reader, final Region region);
}
Loading