Skip to content

Commit

Permalink
Segment log storage (#156)
Browse files Browse the repository at this point in the history
* (feat) Impl a segment log storage, #39

* (feat) revert test log level

* (feat) adds more log

* (feat) Use partitioned index filters for rocksdb block based table config, bump rocksdb to 5.18.3

* (fix) comments

* (feat) format segment file name

* (feat) reserve 2 bytes for location metadata

* (fix) hang on shutdown

* (feat) refactor and add unit test for segment storage

* (fix) race condition in doCheckpoint

* (feat) touch abort file

* (feat) refactor

* (feat) minor changes in RocksDBSegmentLogStorage#onShutdown

* fix/log storage minor fix (#191)

* (fix) minor fix

* (fix) minor fix

* (fix) minor fix

* (feat) minor changes in RocksDBSegmentLogStorage#onShutdown

* (fix) remove special characters

* (fix) forgot to reset buffer position when truncating segment file

* (feat) minor changes

* (feat) remove writeLock in checkpoint thread

* (feat) Compare reference instead of array's length

* feat/follow by storagev2 (#196)

* (feat) follow logStorage's tableConfig

* (fix) fix javadoc

* (fix) format

* (fix) log storage text path

* (fix) compare ref is better

* (fix) format

* (fix) ut fix

* (fix) lambda

* fix/read after shutdown (#215)

* (fix) read after db shutdown

* (fix) check state before access db

* (fix) by review comments

* (fix) RocksDBSegmentLogStorage#onTruncateSuffix when last log not found
  • Loading branch information
killme2008 authored and fengjiachun committed Sep 12, 2019
1 parent 6cb5feb commit 27dadb2
Show file tree
Hide file tree
Showing 21 changed files with 2,286 additions and 264 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ target/
*.bak
*.log*
data/
jraft-core/src/test/resources/log4j2.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import com.alipay.sofa.jraft.storage.RaftMetaStorage;
import com.alipay.sofa.jraft.storage.SnapshotStorage;
import com.alipay.sofa.jraft.storage.impl.LocalRaftMetaStorage;
import com.alipay.sofa.jraft.storage.impl.RocksDBLogStorage;
import com.alipay.sofa.jraft.storage.log.RocksDBSegmentLogStorage;
import com.alipay.sofa.jraft.storage.snapshot.local.LocalSnapshotStorage;
import com.alipay.sofa.jraft.util.Requires;
import com.alipay.sofa.jraft.util.SPI;
Expand All @@ -47,7 +47,7 @@ public static DefaultJRaftServiceFactory newInstance() {
@Override
public LogStorage createLogStorage(final String uri, final RaftOptions raftOptions) {
Requires.requireTrue(StringUtils.isNotBlank(uri), "Blank log storage uri.");
return new RocksDBLogStorage(uri, raftOptions);
return new RocksDBSegmentLogStorage(uri, raftOptions);
}

@Override
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.jraft.storage.log;

import java.io.File;
import java.io.IOException;

import com.alipay.sofa.jraft.util.Utils;

/**
* Abort file
*
* @author boyan(boyan@antfin.com)
*/
public class AbortFile {

private final String path;

public String getPath() {
return this.path;
}

public AbortFile(final String path) {
super();
this.path = path;
}

public boolean create() throws IOException {
return new File(this.path) //
.createNewFile();
}

public boolean touch() {
return new File(this.path) //
.setLastModified(Utils.nowMs());
}

public boolean exists() {
final File file = new File(this.path);
return file.isFile() && file.exists();
}

public boolean destroy() {
return new File(this.path) //
.delete();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.jraft.storage.log;

import java.io.File;
import java.io.IOException;

import org.apache.commons.io.FileUtils;

import com.alipay.sofa.jraft.entity.LocalFileMetaOutter.LocalFileMeta;
import com.alipay.sofa.jraft.storage.io.ProtoBufFile;
import com.alipay.sofa.jraft.util.Bits;
import com.google.protobuf.ZeroByteStringHelper;

/**
* Segments checkpoint file.
*
* @author boyan(boyan@antfin.com)
*/
public class CheckpointFile {

/**
* firstLogIndex(8 B) + commitPos (4 B)
*/
private static final int CHECKPOINT_METADATA_SIZE = 12;

/**
* Checkpoint metadata info.
*
* @author boyan(boyan@antfin.com)
*/
public static final class Checkpoint {
// Segment file start offset
public final long firstLogIndex;
// Segment file current commit position.
public final int committedPos;

public Checkpoint(final long firstLogIndex, final int committedPos) {
super();
this.firstLogIndex = firstLogIndex;
this.committedPos = committedPos;
}

@Override
public String toString() {
return "Checkpoint [firstLogIndex=" + this.firstLogIndex + ", committedPos=" + this.committedPos + "]";
}
}

public void destroy() {
FileUtils.deleteQuietly(new File(this.path));
}

public String getPath() {
return this.path;
}

private final String path;

public CheckpointFile(final String path) {
super();
this.path = path;
}

public synchronized boolean save(final Checkpoint checkpoint) throws IOException {
final ProtoBufFile file = new ProtoBufFile(this.path);
final byte[] data = new byte[CHECKPOINT_METADATA_SIZE];
Bits.putLong(data, 0, checkpoint.firstLogIndex);
Bits.putInt(data, 8, checkpoint.committedPos);

final LocalFileMeta meta = LocalFileMeta.newBuilder() //
.setUserMeta(ZeroByteStringHelper.wrap(data)) //
.build();

return file.save(meta, true);
}

public Checkpoint load() throws IOException {
final ProtoBufFile file = new ProtoBufFile(this.path);
final LocalFileMeta meta = file.load();
if (meta != null) {
final byte[] data = meta.getUserMeta().toByteArray();
assert (data.length == CHECKPOINT_METADATA_SIZE);
return new Checkpoint(Bits.getLong(data, 0), Bits.getInt(data, 8));
}
return null;
}
}
Loading

0 comments on commit 27dadb2

Please sign in to comment.