diff --git a/.gitignore b/.gitignore index 9c7bed100..9e9710ce3 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,4 @@ target/ *.bak *.log* data/ +jraft-core/src/test/resources/log4j2.xml diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/DefaultJRaftServiceFactory.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/DefaultJRaftServiceFactory.java index 26321f49f..deba4c251 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/DefaultJRaftServiceFactory.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/DefaultJRaftServiceFactory.java @@ -26,7 +26,7 @@ import com.alipay.sofa.jraft.storage.RaftMetaStorage; import com.alipay.sofa.jraft.storage.SnapshotStorage; import com.alipay.sofa.jraft.storage.impl.LocalRaftMetaStorage; -import com.alipay.sofa.jraft.storage.impl.RocksDBLogStorage; +import com.alipay.sofa.jraft.storage.log.RocksDBSegmentLogStorage; import com.alipay.sofa.jraft.storage.snapshot.local.LocalSnapshotStorage; import com.alipay.sofa.jraft.util.Requires; import com.alipay.sofa.jraft.util.SPI; @@ -47,7 +47,7 @@ public static DefaultJRaftServiceFactory newInstance() { @Override public LogStorage createLogStorage(final String uri, final RaftOptions raftOptions) { Requires.requireTrue(StringUtils.isNotBlank(uri), "Blank log storage uri."); - return new RocksDBLogStorage(uri, raftOptions); + return new RocksDBSegmentLogStorage(uri, raftOptions); } @Override diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/impl/RocksDBLogStorage.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/impl/RocksDBLogStorage.java index ca26a1030..68c711813 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/impl/RocksDBLogStorage.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/impl/RocksDBLogStorage.java @@ -17,6 +17,7 @@ package com.alipay.sofa.jraft.storage.impl; import java.io.File; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import 
java.util.List; @@ -84,7 +85,7 @@ public class RocksDBLogStorage implements LogStorage { */ private interface WriteBatchTemplate { - void execute(WriteBatch batch) throws RocksDBException; + void execute(WriteBatch batch) throws RocksDBException, IOException; } private final String path; @@ -96,9 +97,9 @@ private interface WriteBatchTemplate { private ColumnFamilyHandle defaultHandle; private ColumnFamilyHandle confHandle; private ReadOptions totalOrderReadOptions; - private final ReadWriteLock lock = new ReentrantReadWriteLock(false); - private final Lock readLock = this.lock.readLock(); - private final Lock writeLock = this.lock.writeLock(); + private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + private final Lock readLock = this.readWriteLock.readLock(); + private final Lock writeLock = this.readWriteLock.writeLock(); private volatile long firstLogIndex = 1; @@ -114,13 +115,20 @@ public RocksDBLogStorage(final String path, final RaftOptions raftOptions) { } private static BlockBasedTableConfig createTableConfig() { - return new BlockBasedTableConfig(). // - setIndexType(IndexType.kHashSearch). // use hash search(btree) for prefix scan. - setBlockSize(4 * SizeUnit.KB).// - setFilter(new BloomFilter(16, false)). // - setCacheIndexAndFilterBlocks(true). // - setBlockCacheSize(512 * SizeUnit.MB). // - setCacheNumShardBits(8); + return new BlockBasedTableConfig() // + // Begin to use partitioned index filters + // https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters#how-to-use-it + .setIndexType(IndexType.kTwoLevelIndexSearch) // + .setFilter(new BloomFilter(16, false)) // + .setPartitionFilters(true) // + .setMetadataBlockSize(8 * SizeUnit.KB) // + .setCacheIndexAndFilterBlocks(false) // + .setCacheIndexAndFilterBlocksWithHighPriority(true) // + .setPinL0FilterAndIndexBlocksInCache(true) // + // End of partitioned index filters settings. 
+ .setBlockSize(4 * SizeUnit.KB)// + .setBlockCacheSize(512 * SizeUnit.MB) // + .setCacheNumShardBits(8); } public static DBOptions createDBOptions() { @@ -129,11 +137,10 @@ public static DBOptions createDBOptions() { public static ColumnFamilyOptions createColumnFamilyOptions() { final BlockBasedTableConfig tConfig = createTableConfig(); - final ColumnFamilyOptions options = StorageOptionsFactory - .getRocksDBColumnFamilyOptions(RocksDBLogStorage.class); - return options.useFixedLengthPrefixExtractor(8). // - setTableFormatConfig(tConfig). // - setMergeOperator(new StringAppendOperator()); + return StorageOptionsFactory.getRocksDBColumnFamilyOptions(RocksDBLogStorage.class) // + .useFixedLengthPrefixExtractor(8) // + .setTableFormatConfig(tConfig) // + .setMergeOperator(new StringAppendOperator()); } @Override @@ -159,7 +166,7 @@ public boolean init(final LogStorageOptions opts) { return initAndLoad(opts.getConfigurationManager()); } catch (final RocksDBException e) { - LOG.error("Fail to init RocksDBLogStorage, path={}", this.path, e); + LOG.error("Fail to init RocksDBLogStorage, path={}.", this.path, e); return false; } finally { this.writeLock.unlock(); @@ -173,21 +180,28 @@ private boolean initAndLoad(final ConfigurationManager confManager) throws Rocks final List columnFamilyDescriptors = new ArrayList<>(); final ColumnFamilyOptions cfOption = createColumnFamilyOptions(); this.cfOptions.add(cfOption); + // Column family to store configuration log entry. columnFamilyDescriptors.add(new ColumnFamilyDescriptor("Configuration".getBytes(), cfOption)); - // default column family + // Default column family to store user data log entry. columnFamilyDescriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, cfOption)); openDB(columnFamilyDescriptors); load(confManager); - return true; + return onInitLoaded(); } + /** + * First log index and last log index key in configuration column family. 
+ */ + public static final byte[] FIRST_LOG_IDX_KEY = Utils.getBytes("meta/firstLogIndex"); + private void load(final ConfigurationManager confManager) { - try (RocksIterator it = this.db.newIterator(this.confHandle, this.totalOrderReadOptions)) { + checkState(); + try (final RocksIterator it = this.db.newIterator(this.confHandle, this.totalOrderReadOptions)) { it.seekToFirst(); while (it.isValid()) { - final byte[] bs = it.value(); final byte[] ks = it.key(); + final byte[] bs = it.value(); // LogEntry index if (ks.length == 8) { @@ -205,15 +219,15 @@ private void load(final ConfigurationManager confManager) { } } } else { - LOG.warn("Fail to decode conf entry at index {}, the log data is: {}", - Bits.getLong(it.key(), 0), BytesUtil.toHex(bs)); + LOG.warn("Fail to decode conf entry at index {}, the log data is: {}.", Bits.getLong(ks, 0), + BytesUtil.toHex(bs)); } } else { if (Arrays.equals(FIRST_LOG_IDX_KEY, ks)) { setFirstLogIndex(Bits.getLong(bs, 0)); truncatePrefixInBackground(0L, this.firstLogIndex); } else { - LOG.warn("Unknown entry in configuration storage key={}, value={}", BytesUtil.toHex(ks), + LOG.warn("Unknown entry in configuration storage key={}, value={}.", BytesUtil.toHex(ks), BytesUtil.toHex(bs)); } } @@ -227,11 +241,6 @@ private void setFirstLogIndex(final long index) { this.hasLoadFirstLogIndex = true; } - /** - * First log inex and last log index key in configuration column family. - */ - public static final byte[] FIRST_LOG_IDX_KEY = Utils.getBytes("meta/firstLogIndex"); - /** * Save the first log index into conf column family. 
*/ @@ -240,10 +249,11 @@ private boolean saveFirstLogIndex(final long firstLogIndex) { try { final byte[] vs = new byte[8]; Bits.putLong(vs, 0, firstLogIndex); + checkState(); this.db.put(this.confHandle, this.writeOptions, FIRST_LOG_IDX_KEY, vs); return true; } catch (final RocksDBException e) { - LOG.error("Fail to save first log index {}", firstLogIndex, e); + LOG.error("Fail to save first log index {}.", firstLogIndex, e); return false; } finally { this.readLock.unlock(); @@ -259,10 +269,15 @@ private void openDB(final List columnFamilyDescriptors) } this.db = RocksDB.open(this.dbOptions, this.path, columnFamilyDescriptors, columnFamilyHandles); + assert (columnFamilyHandles.size() == 2); this.confHandle = columnFamilyHandles.get(0); this.defaultHandle = columnFamilyHandles.get(1); } + private void checkState() { + Requires.requireNonNull(this.db, "DB not initialized or destroyed"); + } + /** * Execute write batch template. * @@ -270,11 +285,19 @@ private void openDB(final List columnFamilyDescriptors) */ private boolean executeBatch(final WriteBatchTemplate template) { this.readLock.lock(); + if (this.db == null) { + LOG.warn("DB not initialized or destroyed."); + this.readLock.unlock(); + return false; + } try (final WriteBatch batch = new WriteBatch()) { template.execute(batch); this.db.write(this.writeOptions, batch); } catch (final RocksDBException e) { - LOG.error("Execute rocksdb operation failed", e); + LOG.error("Execute batch failed with rocksdb exception.", e); + return false; + } catch (final IOException e) { + LOG.error("Execute batch failed with io exception.", e); return false; } finally { this.readLock.unlock(); @@ -289,6 +312,7 @@ public void shutdown() { // The shutdown order is matter. // 1. close column family handles closeDB(); + onShutdown(); // 2. close column family options. 
for (final ColumnFamilyOptions opt : this.cfOptions) { opt.close(); @@ -303,6 +327,7 @@ public void shutdown() { this.writeOptions = null; this.defaultHandle = null; this.confHandle = null; + LOG.info("DB destroyed, the db path is: {}.", this.path); } finally { this.writeLock.unlock(); } @@ -322,6 +347,7 @@ public long getFirstLogIndex() { if (this.hasLoadFirstLogIndex) { return this.firstLogIndex; } + checkState(); it = this.db.newIterator(this.defaultHandle, this.totalOrderReadOptions); it.seekToFirst(); if (it.isValid()) { @@ -342,6 +368,7 @@ public long getFirstLogIndex() { @Override public long getLastLogIndex() { + /* Check before locking: a throw between lock() and the try block would leave the read lock held forever. */ checkState(); this.readLock.lock(); try (final RocksIterator it = this.db.newIterator(this.defaultHandle, this.totalOrderReadOptions)) { it.seekToLast(); if (it.isValid()) { @@ -360,27 +387,32 @@ public LogEntry getEntry(final long index) { if (this.hasLoadFirstLogIndex && index < this.firstLogIndex) { return null; } - final byte[] bs = this.db.get(this.defaultHandle, getKeyBytes(index)); + final byte[] keyBytes = getKeyBytes(index); + final byte[] bs = onDataGet(index, getValueFromRocksDB(keyBytes)); if (bs != null) { final LogEntry entry = this.logEntryDecoder.decode(bs); if (entry != null) { return entry; } else { - LOG.error("Bad log entry format for index={}, the log data is: {}", index, BytesUtil.toHex(bs)); + LOG.error("Bad log entry format for index={}, the log data is: {}.", index, BytesUtil.toHex(bs)); // invalid data remove? 
TODO return null; } } - } catch (final RocksDBException e) { - LOG.error("Fail to get log entry at index {}", index, e); - return null; + } catch (final RocksDBException | IOException e) { + LOG.error("Fail to get log entry at index {}.", index, e); } finally { this.readLock.unlock(); } return null; } - private byte[] getKeyBytes(final long index) { + protected byte[] getValueFromRocksDB(final byte[] keyBytes) throws RocksDBException { + checkState(); + return this.db.get(this.defaultHandle, keyBytes); + } + + protected byte[] getKeyBytes(final long index) { final byte[] ks = new byte[8]; Bits.putLong(ks, 0, index); return ks; @@ -402,25 +434,33 @@ private void addConfBatch(final LogEntry entry, final WriteBatch batch) throws R batch.put(this.confHandle, ks, content); } - private void addDataBatch(final LogEntry entry, final WriteBatch batch) throws RocksDBException { - final byte[] ks = getKeyBytes(entry.getId().getIndex()); + private void addDataBatch(final LogEntry entry, final WriteBatch batch) throws RocksDBException, IOException { + final long logIndex = entry.getId().getIndex(); final byte[] content = this.logEntryEncoder.encode(entry); - batch.put(this.defaultHandle, ks, content); + batch.put(this.defaultHandle, getKeyBytes(logIndex), onDataAppend(logIndex, content)); } @Override public boolean appendEntry(final LogEntry entry) { if (entry.getType() == EntryType.ENTRY_TYPE_CONFIGURATION) { return executeBatch(batch -> addConfBatch(entry, batch)); - } else { this.readLock.lock(); try { - this.db.put(this.defaultHandle, getKeyBytes(entry.getId().getIndex()), - this.logEntryEncoder.encode(entry)); + if (this.db == null) { + LOG.warn("DB not initialized or destroyed."); + return false; + } + final long logIndex = entry.getId().getIndex(); + final byte[] valueBytes = this.logEntryEncoder.encode(entry); + final byte[] newValueBytes = onDataAppend(logIndex, valueBytes); + this.db.put(this.defaultHandle, this.writeOptions, getKeyBytes(logIndex), newValueBytes); + 
if (newValueBytes != valueBytes) { + doSync(); + } return true; - } catch (final RocksDBException e) { - LOG.error("Fail to append entry", e); + } catch (final RocksDBException | IOException e) { + LOG.error("Fail to append entry.", e); return false; } finally { this.readLock.unlock(); @@ -428,6 +468,12 @@ public boolean appendEntry(final LogEntry entry) { } } + private void doSync() throws IOException { + if (this.sync) { + onSync(); + } + } + @Override public int appendEntries(final List entries) { if (entries == null || entries.isEmpty()) { @@ -443,6 +489,7 @@ public int appendEntries(final List entries) { addDataBatch(entry, batch); } } + doSync(); }); if (ret) { @@ -450,7 +497,6 @@ public int appendEntries(final List entries) { } else { return 0; } - } @Override @@ -478,10 +524,11 @@ private void truncatePrefixInBackground(final long startIndex, final long firstI if (this.db == null) { return; } + onTruncatePrefix(startIndex, firstIndexKept); this.db.deleteRange(this.defaultHandle, getKeyBytes(startIndex), getKeyBytes(firstIndexKept)); this.db.deleteRange(this.confHandle, getKeyBytes(startIndex), getKeyBytes(firstIndexKept)); - } catch (final RocksDBException e) { - LOG.error("Fail to truncatePrefix {}", firstIndexKept, e); + } catch (final RocksDBException | IOException e) { + LOG.error("Fail to truncatePrefix {}.", firstIndexKept, e); } finally { this.readLock.unlock(); } @@ -492,14 +539,17 @@ private void truncatePrefixInBackground(final long startIndex, final long firstI public boolean truncateSuffix(final long lastIndexKept) { this.readLock.lock(); try { - this.db.deleteRange(this.defaultHandle, this.writeOptions, getKeyBytes(lastIndexKept + 1), - getKeyBytes(getLastLogIndex() + 1)); - this.db.deleteRange(this.confHandle, this.writeOptions, getKeyBytes(lastIndexKept + 1), - getKeyBytes(getLastLogIndex() + 1)); + try { + onTruncateSuffix(lastIndexKept); + } finally { + this.db.deleteRange(this.defaultHandle, this.writeOptions, getKeyBytes(lastIndexKept + 
1), + getKeyBytes(getLastLogIndex() + 1)); + this.db.deleteRange(this.confHandle, this.writeOptions, getKeyBytes(lastIndexKept + 1), + getKeyBytes(getLastLogIndex() + 1)); + } return true; - } catch (final RocksDBException e) { - LOG.error("Fail to truncateSuffix {}", lastIndexKept, e); - + } catch (final RocksDBException | IOException e) { + LOG.error("Fail to truncateSuffix {}.", lastIndexKept, e); } finally { this.readLock.unlock(); } @@ -512,28 +562,98 @@ public boolean reset(final long nextLogIndex) { throw new IllegalArgumentException("Invalid next log index."); } this.writeLock.lock(); - try (Options opt = new Options()) { + try (final Options opt = new Options()) { LogEntry entry = getEntry(nextLogIndex); closeDB(); try { RocksDB.destroyDB(this.path, opt); + onReset(nextLogIndex); if (initAndLoad(null)) { if (entry == null) { entry = new LogEntry(); entry.setType(EntryType.ENTRY_TYPE_NO_OP); entry.setId(new LogId(nextLogIndex, 0)); - LOG.warn("Entry not found for nextLogIndex {} when reset", nextLogIndex); + LOG.warn("Entry not found for nextLogIndex {} when reset.", nextLogIndex); } return appendEntry(entry); } else { return false; } } catch (final RocksDBException e) { - LOG.error("Fail to reset next log index", e); + LOG.error("Fail to reset next log index.", e); return false; } } finally { this.writeLock.unlock(); } } + + // Hooks for {@link RocksDBSegmentLogStorage} + + /** + * Called after opening RocksDB and loading configuration into conf manager. + */ + protected boolean onInitLoaded() { + return true; + } + + /** + * Called after closing db. + */ + protected void onShutdown() { + } + + /** + * Called after resetting db. + * + * @param nextLogIndex next log index + */ + protected void onReset(final long nextLogIndex) { + } + + /** + * Called after truncating prefix logs in rocksdb. 
+ * + * @param startIndex the start index + * @param firstIndexKept the first index to keep + */ + protected void onTruncatePrefix(final long startIndex, final long firstIndexKept) throws RocksDBException, + IOException { + } + + /** + * Called from doSync() when the sync flag is set; does nothing by default. + */ + protected void onSync() throws IOException { + } + + /** + * Called by truncateSuffix() before the suffix logs are deleted from rocksdb. + * + * @param lastIndexKept the last index to keep + */ + protected void onTruncateSuffix(final long lastIndexKept) throws RocksDBException, IOException { + } + + /** + * Called before appending data entry. + * + * @param logIndex the log index + * @param value the data value in log entry. + * @return the value that is written to rocksdb; the default returns {@code value} as-is + */ + protected byte[] onDataAppend(final long logIndex, final byte[] value) throws IOException { + return value; + } + + /** + * Called by getEntry() with the value read from rocksdb, before it is decoded. + * + * @param logIndex the log index + * @param value the value in rocksdb + * @return the value to decode; the default returns {@code value} as-is + */ + protected byte[] onDataGet(final long logIndex, final byte[] value) throws IOException { + return value; + } } diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/log/AbortFile.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/log/AbortFile.java new file mode 100644 index 000000000..4288a1a8c --- /dev/null +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/log/AbortFile.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alipay.sofa.jraft.storage.log;
+
+import java.io.File;
+import java.io.IOException;
+
+import com.alipay.sofa.jraft.util.Utils;
+
+/**
+ * Handle on a marker ("abort") file identified by its path; every operation
+ * resolves a fresh {@link File} from that path.
+ *
+ * @author boyan(boyan@antfin.com)
+ */
+public class AbortFile {
+
+    private final String path;
+
+    public AbortFile(final String path) {
+        this.path = path;
+    }
+
+    public String getPath() {
+        return this.path;
+    }
+
+    /** Creates the file; {@code false} means a file with this path already existed. */
+    public boolean create() throws IOException {
+        return new File(this.path).createNewFile();
+    }
+
+    /** Sets the file's last-modified time to {@code Utils.nowMs()}. */
+    public boolean touch() {
+        return new File(this.path).setLastModified(Utils.nowMs());
+    }
+
+    /** Tells whether the path currently denotes an existing regular file. */
+    public boolean exists() {
+        return new File(this.path).isFile();
+    }
+
+    /** Deletes the file; {@code false} means nothing was deleted. */
+    public boolean destroy() {
+        return new File(this.path).delete();
+    }
+}
diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/log/CheckpointFile.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/log/CheckpointFile.java
new file mode 100644
index 000000000..9ad0e7fe3
--- /dev/null
+++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/log/CheckpointFile.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alipay.sofa.jraft.storage.log;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.io.FileUtils;
+
+import com.alipay.sofa.jraft.entity.LocalFileMetaOutter.LocalFileMeta;
+import com.alipay.sofa.jraft.storage.io.ProtoBufFile;
+import com.alipay.sofa.jraft.util.Bits;
+import com.google.protobuf.ZeroByteStringHelper;
+
+/**
+ * Segments checkpoint file: persists a {@link Checkpoint} as 12 bytes of
+ * user metadata inside a {@link LocalFileMeta} stored through {@link ProtoBufFile}.
+ *
+ * @author boyan(boyan@antfin.com)
+ */
+public class CheckpointFile {
+
+    /**
+     * Serialized size: firstLogIndex (8 B at offset 0) + committedPos (4 B at offset 8).
+     */
+    private static final int CHECKPOINT_METADATA_SIZE = 12;
+
+    /**
+     * Checkpoint metadata info.
+     *
+     * @author boyan(boyan@antfin.com)
+     */
+    public static final class Checkpoint {
+        // Segment file start offset
+        public final long firstLogIndex;
+        // Segment file current commit position.
+        public final int  committedPos;
+
+        public Checkpoint(final long firstLogIndex, final int committedPos) {
+            this.firstLogIndex = firstLogIndex;
+            this.committedPos = committedPos;
+        }
+
+        @Override
+        public String toString() {
+            return "Checkpoint [firstLogIndex=" + this.firstLogIndex + ", committedPos=" + this.committedPos + "]";
+        }
+    }
+
+    /**
+     * Deletes the checkpoint file; failures are ignored ({@code deleteQuietly}).
+     */
+    public void destroy() {
+        FileUtils.deleteQuietly(new File(this.path));
+    }
+
+    public String getPath() {
+        return this.path;
+    }
+
+    private final String path;
+
+    public CheckpointFile(final String path) {
+        this.path = path;
+    }
+
+    /**
+     * Encodes the checkpoint into {@link #CHECKPOINT_METADATA_SIZE} bytes and saves
+     * them as the user metadata of a {@link LocalFileMeta}.
+     *
+     * @param checkpoint the checkpoint to persist
+     * @return whatever {@code ProtoBufFile#save(meta, true)} reports
+     * @throws IOException if the underlying save fails
+     */
+    public synchronized boolean save(final Checkpoint checkpoint) throws IOException {
+        final ProtoBufFile file = new ProtoBufFile(this.path);
+        final byte[] data = new byte[CHECKPOINT_METADATA_SIZE];
+        Bits.putLong(data, 0, checkpoint.firstLogIndex);
+        Bits.putInt(data, 8, checkpoint.committedPos);
+
+        final LocalFileMeta meta = LocalFileMeta.newBuilder() //
+            .setUserMeta(ZeroByteStringHelper.wrap(data)) //
+            .build();
+
+        return file.save(meta, true);
+    }
+
+    /**
+     * Reads the checkpoint back from the file.
+     *
+     * @return the decoded checkpoint, or {@code null} when {@code ProtoBufFile#load()}
+     *         returns {@code null}
+     * @throws IOException if loading fails or the stored metadata does not have the
+     *                     expected size (corrupted or foreign file)
+     */
+    public Checkpoint load() throws IOException {
+        final ProtoBufFile file = new ProtoBufFile(this.path);
+        final LocalFileMeta meta = file.load();
+        if (meta == null) {
+            return null;
+        }
+        final byte[] data = meta.getUserMeta().toByteArray();
+        // An assert is disabled in production JVMs; validate explicitly so that a
+        // corrupted file surfaces as the IOException callers already handle rather
+        // than as an unchecked index error while decoding.
+        if (data.length != CHECKPOINT_METADATA_SIZE) {
+            throw new IOException("Corrupted checkpoint file " + this.path + ": expected "
+                                  + CHECKPOINT_METADATA_SIZE + " bytes of metadata but found " + data.length);
+        }
+        return new Checkpoint(Bits.getLong(data, 0), Bits.getInt(data, 8));
+    }
+}
diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/log/RocksDBSegmentLogStorage.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/log/RocksDBSegmentLogStorage.java
new file mode 100644
index 000000000..1799106a0
--- /dev/null
+++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/log/RocksDBSegmentLogStorage.java
@@ -0,0 +1,651 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.jraft.storage.log; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.regex.Pattern; + +import org.apache.commons.io.FileUtils; +import org.rocksdb.RocksDBException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.alipay.sofa.jraft.error.LogEntryCorruptedException; +import com.alipay.sofa.jraft.option.RaftOptions; +import com.alipay.sofa.jraft.storage.impl.RocksDBLogStorage; +import com.alipay.sofa.jraft.storage.log.CheckpointFile.Checkpoint; +import com.alipay.sofa.jraft.storage.log.SegmentFile.SegmentFileOptions; +import com.alipay.sofa.jraft.util.Bits; +import com.alipay.sofa.jraft.util.NamedThreadFactory; +import com.alipay.sofa.jraft.util.SystemPropertyUtil; +import com.alipay.sofa.jraft.util.Utils; + +/** + * Log Storage implementation based on rocksdb and segment files. 
+ * + * @author boyan(boyan@antfin.com) + */ +public class RocksDBSegmentLogStorage extends RocksDBLogStorage { + + private static final Logger LOG = LoggerFactory + .getLogger(RocksDBSegmentLogStorage.class); + + /** + * Default checkpoint interval in milliseconds. + */ + private static final int DEFAULT_CHECKPOINT_INTERVAL_MS = SystemPropertyUtil + .getInt( + "jraft.log_storage.segment.checkpoint.interval.ms", + 5000); + + /** + * Location metadata format: + * 1. magic bytes + * 2. reserved(2 B) + * 3. segmentFileName(8 B) + * 4. wrotePosition(4 B) + */ + private static final int LOCATION_METADATA_SIZE = SegmentFile.MAGIC_BYTES_SIZE + 2 + 8 + 4; + + /** + * Max segment file size, 1G + */ + private static final int MAX_SEGMENT_FILE_SIZE = SystemPropertyUtil.getInt( + "jraft.log_storage.segment.max.size.bytes", + 1024 * 1024 * 1024); + + // Default value size threshold to decide whether it will be stored in segments or rocksdb, default is 4K. + // When the value size is less than 4K, it will be stored in rocksdb directly. 
+    private static int DEFAULT_VALUE_SIZE_THRESHOLD = SystemPropertyUtil
+        .getInt("jraft.log_storage.segment.value.threshold.bytes", 4 * 1024);
+
+    private final int                valueSizeThreshold;
+    private final String             segmentsPath;
+    private final CheckpointFile     checkpointFile;
+    // Segment files in creation order; the methods below access it under readWriteLock.
+    private List<SegmentFile>        segments;
+    private final ReadWriteLock      readWriteLock = new ReentrantReadWriteLock();
+    private final Lock               writeLock     = this.readWriteLock.writeLock();
+    private final Lock               readLock      = this.readWriteLock.readLock();
+    private ScheduledExecutorService checkpointExecutor;
+    private final AbortFile          abortFile;
+
+    /**
+     * Creates a storage that uses {@link #DEFAULT_VALUE_SIZE_THRESHOLD} as value size threshold.
+     */
+    public RocksDBSegmentLogStorage(final String path, final RaftOptions raftOptions) {
+        this(path, raftOptions, DEFAULT_VALUE_SIZE_THRESHOLD);
+    }
+
+    /**
+     * Creates a storage rooted at {@code path}; segment files, the abort file and the
+     * checkpoint file all live under {@code path/segments}.
+     *
+     * @param path               the storage directory, also passed to the parent class
+     * @param raftOptions        raft options passed to the parent class
+     * @param valueSizeThreshold value size threshold in bytes (how it is applied is
+     *                           outside this excerpt - presumably smaller values stay in rocksdb)
+     */
+    public RocksDBSegmentLogStorage(final String path, final RaftOptions raftOptions, final int valueSizeThreshold) {
+        super(path, raftOptions);
+        this.segmentsPath = path + File.separator + "segments";
+        this.abortFile = new AbortFile(this.segmentsPath + File.separator + "abort");
+        this.checkpointFile = new CheckpointFile(this.segmentsPath + File.separator + "checkpoint");
+        this.valueSizeThreshold = valueSizeThreshold;
+    }
+
+    /**
+     * Returns the last segment file if it can still take {@code waitToWroteSize} more bytes,
+     * optionally creating a new segment file when it cannot (or when there is none yet).
+     *
+     * @param logIndex          first log index of the segment file to create, if one is created
+     * @param waitToWroteSize   number of bytes about to be written
+     * @param createIfNecessary whether to create a new segment file when no usable one exists
+     * @return the usable last segment file, or {@code null} if there is none and
+     *         {@code createIfNecessary} is {@code false}
+     * @throws IOException if a new segment file cannot be created
+     */
+    private SegmentFile getLastSegmentFile(final long logIndex, final int waitToWroteSize,
+                                           final boolean createIfNecessary) throws IOException {
+        // Fast path: look up only, under the read lock.
+        this.readLock.lock();
+        try {
+            final SegmentFile lastFile = getLastSegmentFileUnLock(logIndex, waitToWroteSize, false);
+            if (lastFile != null || !createIfNecessary) {
+                return lastFile;
+            }
+        } finally {
+            this.readLock.unlock();
+        }
+        // Slow path: createNewSegmentFile() takes the write lock, and a ReentrantReadWriteLock
+        // cannot be upgraded from read to write - doing so while still holding the read lock
+        // blocks forever. Re-check under the write lock because the state may have changed
+        // after the read lock was released.
+        this.writeLock.lock();
+        try {
+            return getLastSegmentFileUnLock(logIndex, waitToWroteSize, true);
+        } finally {
+            this.writeLock.unlock();
+        }
+    }
+
+    /**
+     * Same lookup as {@link #getLastSegmentFile(long, int, boolean)} without taking any lock.
+     * When {@code createIfNecessary} is {@code true} the caller must not hold only the read
+     * lock, because creating a file acquires the write lock.
+     */
+    private SegmentFile getLastSegmentFileUnLock(final long logIndex, final int waitToWroteSize,
+                                                 final boolean createIfNecessary) throws IOException {
+        SegmentFile lastFile = null;
+        if (!this.segments.isEmpty()) {
+            final SegmentFile currLastFile = this.segments.get(this.segments.size() - 1);
+            if (!currLastFile.reachesFileEndBy(waitToWroteSize)) {
+                lastFile = currLastFile;
+            }
+        }
+
+        if (lastFile == null && createIfNecessary) {
+            lastFile = createNewSegmentFile(logIndex);
+        }
+        return lastFile;
+    }
+
+ 
private SegmentFile createNewSegmentFile(final long logIndex) throws IOException { + this.writeLock.lock(); + try { + if (!this.segments.isEmpty()) { + // Sync current last file and correct it's lastLogIndex. + final SegmentFile currLastFile = this.segments.get(this.segments.size() - 1); + currLastFile.sync(); + currLastFile.setLastLogIndex(logIndex - 1); + } + final SegmentFile segmentFile = new SegmentFile(logIndex, MAX_SEGMENT_FILE_SIZE, this.segmentsPath); + LOG.info("Create a new segment file {} with firstLogIndex={}.", segmentFile.getPath(), logIndex); + if (!segmentFile.init(new SegmentFileOptions(false, true, 0))) { + throw new IOException("Fail to create new segment file for logIndex: " + logIndex); + } + this.segments.add(segmentFile); + return segmentFile; + } finally { + this.writeLock.unlock(); + } + } + + @Override + protected void onSync() throws IOException { + final SegmentFile lastSegmentFile = getLastSegmentFileForRead(); + if (lastSegmentFile != null) { + lastSegmentFile.sync(); + } + } + + private static final Pattern SEGMENT_FILE_NAME_PATTERN = Pattern.compile("[0-9]+"); + + @Override + protected boolean onInitLoaded() { + final long startMs = Utils.monotonicMs(); + this.writeLock.lock(); + try { + final File segmentsDir = new File(this.segmentsPath); + try { + FileUtils.forceMkdir(segmentsDir); + } catch (final IOException e) { + LOG.error("Fail to create segments directory: {}", this.segmentsPath, e); + return false; + } + final Checkpoint checkpoint; + try { + checkpoint = this.checkpointFile.load(); + if (checkpoint != null) { + LOG.info("Loaded checkpoint: {} from {}.", checkpoint, this.checkpointFile.getPath()); + } + } catch (final IOException e) { + LOG.error("Fail to load checkpoint file: {}", this.checkpointFile.getPath(), e); + return false; + } + + final File[] segmentFiles = segmentsDir + .listFiles((final File dir, final String name) -> SEGMENT_FILE_NAME_PATTERN.matcher(name).matches()); + + final boolean normalExit = 
!this.abortFile.exists(); + if (!normalExit) { + LOG.info("{} {} did not exit normally, will try to recover last file.", getServiceName(), + this.segmentsPath); + } + this.segments = new ArrayList<>(segmentFiles == null ? 10 : segmentFiles.length); + if (segmentFiles != null && segmentFiles.length > 0) { + // Sort by file names. + Arrays.sort(segmentFiles, Comparator.comparing(a -> Long.valueOf(a.getName()))); + + final String checkpointFileName = getCheckpointFileName(checkpoint); + + boolean needRecover = false; + SegmentFile prevFile = null; + for (int i = 0; i < segmentFiles.length; i++) { + final File segFile = segmentFiles[i]; + final boolean isLastFile = i == segmentFiles.length - 1; + int pos = (int) segFile.length(); //position to recover or write. + if (segFile.getName().equals(checkpointFileName)) { + needRecover = true; + pos = checkpoint.committedPos; + } else { + if (needRecover) { + pos = 0; + } + } + final long firstLogIndex = Long.parseLong(segFile.getName()); + + final SegmentFile segmentFile = new SegmentFile(firstLogIndex, MAX_SEGMENT_FILE_SIZE, + this.segmentsPath); + + if (!segmentFile.init(new SegmentFileOptions(needRecover && !normalExit, isLastFile, pos))) { + LOG.error("Fail to load segment file {}.", segmentFile.getPath()); + segmentFile.shutdown(); + return false; + } + this.segments.add(segmentFile); + if (prevFile != null) { + prevFile.setLastLogIndex(firstLogIndex - 1); + } + prevFile = segmentFile; + } + + if (getLastLogIndex() > 0) { + prevFile.setLastLogIndex(getLastLogIndex()); + } + + } else { + if (checkpoint != null) { + LOG.warn("Missing segment files, checkpoint is: {}", checkpoint); + return false; + } + } + + LOG.info("{} Loaded {} segment files from path {}.", getServiceName(), this.segments.size(), + this.segmentsPath); + + LOG.info("{} segments: \n{}", getServiceName(), descSegments()); + + startCheckpointTask(); + + if (normalExit) { + if (!this.abortFile.create()) { + LOG.error("Fail to create abort file {}.", 
this.abortFile.getPath()); + return false; + } + } else { + this.abortFile.touch(); + } + return true; + } catch (final Exception e) { + LOG.error("Fail to load segment files from directory {}.", this.segmentsPath, e); + return false; + } finally { + this.writeLock.unlock(); + LOG.info("{} init and load cost {} ms.", getServiceName(), Utils.monotonicMs() - startMs); + } + } + + private String getCheckpointFileName(final Checkpoint checkpoint) { + return checkpoint != null ? SegmentFile.getSegmentFileName(checkpoint.firstLogIndex) : null; + } + + private void startCheckpointTask() { + this.checkpointExecutor = Executors + .newSingleThreadScheduledExecutor(new NamedThreadFactory(getServiceName() + "-Checkpoint-Thread-", true)); + this.checkpointExecutor.scheduleAtFixedRate(this::doCheckpoint, + DEFAULT_CHECKPOINT_INTERVAL_MS, DEFAULT_CHECKPOINT_INTERVAL_MS, TimeUnit.MILLISECONDS); + LOG.info("{} started checkpoint task.", getServiceName()); + } + + private StringBuilder descSegments() { + final StringBuilder segmentsDesc = new StringBuilder("[\n"); + for (final SegmentFile segFile : this.segments) { + segmentsDesc.append(" ").append(segFile.toString()).append("\n"); + } + segmentsDesc.append("]"); + return segmentsDesc; + } + + private String getServiceName() { + return this.getClass().getSimpleName(); + } + + @Override + protected void onShutdown() { + stopCheckpointTask(); + List shutdownFiles = Collections.emptyList(); + this.writeLock.lock(); + try { + doCheckpoint(); + shutdownFiles = new ArrayList<>(shutdownFiles); + this.segments.clear(); + if (!this.abortFile.destroy()) { + LOG.error("Fail to delete abort file {}.", this.abortFile.getPath()); + } + } finally { + this.writeLock.unlock(); + for (final SegmentFile segmentFile : shutdownFiles) { + segmentFile.shutdown(); + } + } + } + + private void stopCheckpointTask() { + if (this.checkpointExecutor != null) { + this.checkpointExecutor.shutdownNow(); + try { + this.checkpointExecutor.awaitTermination(10, 
TimeUnit.SECONDS); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + } + LOG.info("{} stopped checkpoint task.", getServiceName()); + } + } + + private void doCheckpoint() { + SegmentFile lastSegmentFile = null; + try { + lastSegmentFile = getLastSegmentFileForRead(); + if (lastSegmentFile != null) { + this.checkpointFile.save(new Checkpoint(lastSegmentFile.getFirstLogIndex(), lastSegmentFile + .getCommittedPos())); + } + } catch (final IOException e) { + LOG.error("Fatal error, fail to do checkpoint, last segment file is {}.", + lastSegmentFile != null ? lastSegmentFile.getPath() : "null", e); + } + } + + private SegmentFile getLastSegmentFileForRead() throws IOException { + return getLastSegmentFile(-1, 0, false); + } + + @Override + protected void onReset(final long nextLogIndex) { + this.writeLock.lock(); + try { + this.checkpointFile.destroy(); + for (final SegmentFile segmentFile : this.segments) { + segmentFile.destroy(); + } + this.segments.clear(); + LOG.info("Destroyed segments and checkpoint in path {} by resetting.", this.segmentsPath); + } finally { + this.writeLock.unlock(); + } + } + + @Override + protected void onTruncatePrefix(final long startIndex, final long firstIndexKept) throws RocksDBException, + IOException { + this.writeLock.lock(); + try { + int fromIndex = binarySearchFileIndexByLogIndex(startIndex); + final int toIndex = binarySearchFileIndexByLogIndex(firstIndexKept); + + if (fromIndex < 0) { + fromIndex = 0; + } + if (toIndex < 0) { + return; + } + + final List removedFiles = this.segments.subList(fromIndex, toIndex); + for (final SegmentFile segmentFile : removedFiles) { + segmentFile.destroy(); + } + removedFiles.clear(); + doCheckpoint(); + } finally { + this.writeLock.unlock(); + } + } + + private boolean isMetadata(final byte[] data) { + for (int offset = 0; offset < SegmentFile.MAGIC_BYTES_SIZE; offset++) { + if (data[offset] != SegmentFile.MAGIC_BYTES[offset]) { + return false; + } + } + return 
true; + } + + @Override + protected void onTruncateSuffix(final long lastIndexKept) throws RocksDBException, IOException { + this.writeLock.lock(); + try { + final int keptFileIndex = binarySearchFileIndexByLogIndex(lastIndexKept); + int toIndex = binarySearchFileIndexByLogIndex(getLastLogIndex()); + + if (keptFileIndex < 0) { + LOG.warn("Segment file not found by logIndex={} to be truncate_suffix, current segments:\n{}.", + lastIndexKept, descSegments()); + return; + } + + if (toIndex < 0) { + toIndex = this.segments.size() - 1; + } + + // Destroyed files after keptFile + final List removedFiles = this.segments.subList(keptFileIndex + 1, toIndex + 1); + for (final SegmentFile segmentFile : removedFiles) { + segmentFile.destroy(); + } + removedFiles.clear(); + + // Process logs in keptFile(firstLogIndex=lastIndexKept) + + final SegmentFile keptFile = this.segments.get(keptFileIndex); + int logWrotePos = -1; // The truncate position in keptFile. + + // Try to find the right position to be truncated. + { + // First, find in right [lastIndexKept + 1, getLastLogIndex()] + long nextIndex = lastIndexKept + 1; + final long endIndex = Math.min(getLastLogIndex(), keptFile.getLastLogIndex()); + while (nextIndex <= endIndex) { + final byte[] data = getValueFromRocksDB(getKeyBytes(nextIndex)); + if (data != null) { + if (data.length == LOCATION_METADATA_SIZE) { + if (!isMetadata(data)) { + // Stored in rocksdb directly. + nextIndex++; + continue; + } + logWrotePos = getWrotePosition(data); + break; + } else { + // Stored in rocksdb directly. + nextIndex++; + } + } else { + // No more data. + break; + } + } + } + + // Not found in [lastIndexKept + 1, getLastLogIndex()] + if (logWrotePos < 0) { + // Second, try to find in left [firstLogIndex, lastIndexKept) when lastIndexKept is not stored in segments. + final byte[] keptData = getValueFromRocksDB(getKeyBytes(lastIndexKept)); + // The kept log's data is not stored in segments. 
+ if (!isMetadata(keptData)) { + //lastIndexKept's log is stored in rocksdb directly, try to find the first previous log that stored in segment. + long prevIndex = lastIndexKept - 1; + final long startIndex = keptFile.getFirstLogIndex(); + while (prevIndex >= startIndex) { + final byte[] data = getValueFromRocksDB(getKeyBytes(prevIndex)); + if (data != null) { + if (data.length == LOCATION_METADATA_SIZE) { + if (!isMetadata(data)) { + // Stored in rocksdb directly. + prevIndex--; + continue; + } + // Found the position. + logWrotePos = getWrotePosition(data); + final byte[] logData = onDataGet(prevIndex, data); + // Skip this log, it should be kept(logs that are less than lastIndexKept should be kept). + // Fine the next log position. + logWrotePos += SegmentFile.getWriteBytes(logData); + break; + } else { + // Stored in rocksdb directly. + prevIndex--; + } + } else { + // Data not found, should not happen. + throw new LogEntryCorruptedException("Log entry data not found at index=" + prevIndex); + } + } + } + } + + if (logWrotePos >= 0 && logWrotePos < keptFile.getSize()) { + // Truncate the file from wrotePos and set it's lastLogIndex=lastIndexKept. + keptFile.truncateSuffix(logWrotePos, lastIndexKept); + } + // Finally, do checkpoint. + doCheckpoint(); + + } finally { + this.writeLock.unlock(); + } + } + + /** + * Retrieve the log wrote position from metadata. + * + * @param data the metadata + * @return the log wrote position + */ + private int getWrotePosition(final byte[] data) { + return Bits.getInt(data, SegmentFile.MAGIC_BYTES_SIZE + 2 + 8); + } + + @Override + protected byte[] onDataAppend(final long logIndex, final byte[] value) throws IOException { + this.writeLock.lock(); + try { + final SegmentFile lastSegmentFile = getLastSegmentFile(logIndex, SegmentFile.getWriteBytes(value), true); + if (value.length < this.valueSizeThreshold) { + // Small value will be stored in rocksdb directly. 
+ lastSegmentFile.setLastLogIndex(logIndex); + return value; + } + // Large value is stored in segment file and returns an encoded location info that will be stored in rocksdb. + final long firstLogIndex = lastSegmentFile.getFirstLogIndex(); + final int pos = lastSegmentFile.write(logIndex, value); + return encodeLocationMetadata(firstLogIndex, pos); + } finally { + this.writeLock.unlock(); + } + } + + /** + * Encode segment file firstLogIndex(fileName) and position to a byte array in the format of: + *
<ul>
+ * <li>magic bytes(2 B)</li>
+ * <li>reserved (2 B)</li>
+ * <li>firstLogIndex(8 B)</li>
+ * <li>wrote position(4 B)</li>
+ * </ul>
+ * @param firstLogIndex the first log index + * @param pos the wrote position + * @return segment info + */ + private byte[] encodeLocationMetadata(final long firstLogIndex, final int pos) { + final byte[] newData = new byte[LOCATION_METADATA_SIZE]; + System.arraycopy(SegmentFile.MAGIC_BYTES, 0, newData, 0, SegmentFile.MAGIC_BYTES_SIZE); + // 2 bytes reserved + Bits.putLong(newData, SegmentFile.MAGIC_BYTES_SIZE + 2, firstLogIndex); + Bits.putInt(newData, SegmentFile.MAGIC_BYTES_SIZE + 2 + 8, pos); + return newData; + } + + private int binarySearchFileIndexByLogIndex(final long logIndex) { + this.readLock.lock(); + try { + if (this.segments.isEmpty()) { + return -1; + } + if (this.segments.size() == 1) { + final SegmentFile firstFile = this.segments.get(0); + if (firstFile.contains(logIndex)) { + return 0; + } else { + return -1; + } + } + + int low = 0; + int high = this.segments.size() - 1; + + while (low <= high) { + final int mid = (low + high) >>> 1; + + final SegmentFile file = this.segments.get(mid); + if (file.getLastLogIndex() < logIndex) { + low = mid + 1; + } else if (file.getFirstLogIndex() > logIndex) { + high = mid - 1; + } else { + return mid; + } + } + return -(low + 1); + } finally { + this.readLock.unlock(); + } + } + + private SegmentFile binarySearchFileByFirstLogIndex(final long logIndex) { + this.readLock.lock(); + try { + if (this.segments.isEmpty()) { + return null; + } + if (this.segments.size() == 1) { + final SegmentFile firstFile = this.segments.get(0); + if (firstFile.getFirstLogIndex() == logIndex) { + return firstFile; + } else { + return null; + } + } + + int low = 0; + int high = this.segments.size() - 1; + + while (low <= high) { + final int mid = (low + high) >>> 1; + + final SegmentFile file = this.segments.get(mid); + if (file.getFirstLogIndex() < logIndex) { + low = mid + 1; + } else if (file.getFirstLogIndex() > logIndex) { + high = mid - 1; + } else { + return file; + } + } + return null; + } finally { + 
this.readLock.unlock(); + } + } + + @Override + protected byte[] onDataGet(final long logIndex, final byte[] value) throws IOException { + if (value == null || value.length != LOCATION_METADATA_SIZE) { + return value; + } + + int offset = 0; + for (; offset < SegmentFile.MAGIC_BYTES_SIZE; offset++) { + if (value[offset] != SegmentFile.MAGIC_BYTES[offset]) { + return value; + } + } + + // skip reserved + offset += 2; + + final long firstLogIndex = Bits.getLong(value, offset); + final int pos = Bits.getInt(value, offset + 8); + final SegmentFile file = binarySearchFileByFirstLogIndex(firstLogIndex); + if (file == null) { + return null; + } + return file.read(logIndex, pos); + } +} diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/log/SegmentFile.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/log/SegmentFile.java new file mode 100644 index 000000000..92ca7983e --- /dev/null +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/log/SegmentFile.java @@ -0,0 +1,522 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.alipay.sofa.jraft.storage.log; + +import java.io.File; +import java.io.IOException; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.channels.FileChannel.MapMode; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; +import java.util.Arrays; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.commons.io.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.alipay.sofa.jraft.Lifecycle; +import com.alipay.sofa.jraft.storage.log.SegmentFile.SegmentFileOptions; +import com.alipay.sofa.jraft.util.BytesUtil; +import com.alipay.sofa.jraft.util.Utils; + +/** + * A fixed size file. + * + * @author boyan(boyan@antfin.com) + * @since 1.2.6 + */ +public class SegmentFile implements Lifecycle { + + private static final int BLANK_HOLE_SIZE = 64; + + private static final Logger LOG = LoggerFactory.getLogger(SegmentFile.class); + + // 4 Bytes for written data length + private static final int DATA_LENGTH_SIZE = 4; + + /** + * Segment file options. + * + * @author boyan(boyan@antfin.com) + */ + public static class SegmentFileOptions { + // Whether to recover + public final boolean recover; + // Recover start position + public final int pos; + // True when is the last file. + public final boolean isLastFile; + + public SegmentFileOptions(final boolean recover, final boolean isLastFile, final int pos) { + super(); + this.isLastFile = isLastFile; + this.recover = recover; + this.pos = pos; + } + + @Override + public String toString() { + return "SegmentFileOptions [recover=" + recover + ", pos=" + pos + ", isLastFile=" + isLastFile + "]"; + } + } + + /** + * Magic bytes for data buffer. 
+ */ + public static final byte[] MAGIC_BYTES = new byte[] { (byte) 0x57, (byte) 0x8A }; + + public static final int MAGIC_BYTES_SIZE = MAGIC_BYTES.length; + + // The file first log index(inclusive) + private final long firstLogIndex; + // The file last log index(inclusive) + private volatile long lastLogIndex = Long.MAX_VALUE; + // File size + private int size; + // File path + private final String path; + // mmap byte buffer. + private MappedByteBuffer buffer; + // Wrote position. + private volatile int wrotePos; + // Committed position + private volatile int committedPos; + + private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(false); + + private final Lock writeLock = this.readWriteLock.writeLock(); + private final Lock readLock = this.readWriteLock.readLock(); + + public SegmentFile(final long firstLogIndex, final int size, final String parentDir) { + super(); + this.firstLogIndex = firstLogIndex; + this.size = size; + this.path = parentDir + File.separator + getSegmentFileName(this.firstLogIndex); + } + + static String getSegmentFileName(long logIndex) { + return String.format("%019d", logIndex); + } + + long getLastLogIndex() { + return this.lastLogIndex; + } + + int getWrotePos() { + return this.wrotePos; + } + + int getCommittedPos() { + return this.committedPos; + } + + long getFirstLogIndex() { + return this.firstLogIndex; + } + + int getSize() { + return this.size; + } + + String getPath() { + return this.path; + } + + public void setLastLogIndex(final long lastLogIndex) { + this.writeLock.lock(); + try { + this.lastLogIndex = lastLogIndex; + } finally { + this.writeLock.unlock(); + } + } + + /** + * Truncate data from wrotePos(inclusive) to the file end and set lastLogIndex=logIndex. 
+ * @param wrotePos the wrote position(inclusive) + * @param logIndex the log index + */ + public void truncateSuffix(final int wrotePos, final long logIndex) { + this.writeLock.lock(); + try { + final int oldPos = this.wrotePos; + clear(wrotePos); + this.wrotePos = wrotePos; + this.lastLogIndex = logIndex; + this.buffer.position(wrotePos); + LOG.info( + "Segment file {} truncate suffix from pos={}, then set lastLogIndex={}, oldWrotePos={}, newWrotePos={}", + this.path, wrotePos, logIndex, oldPos, this.wrotePos); + } finally { + this.writeLock.unlock(); + } + } + + /** + * Returns true when the segment file contains the log index. + * + * @param logIndex the log index + * @return true if the segment file contains the log index, otherwise return false + */ + public boolean contains(final long logIndex) { + this.readLock.lock(); + try { + return logIndex >= this.firstLogIndex && logIndex <= this.lastLogIndex; + } finally { + this.readLock.unlock(); + } + } + + /** + * Clear data in [startPos, startPos+64). 
+ * + * @param startPos the start position(inclusive) + */ + public void clear(final int startPos) { + this.writeLock.lock(); + try { + if (startPos < 0 || startPos > this.size) { + return; + } + final int endPos = Math.min(this.size, startPos + BLANK_HOLE_SIZE); + for (int i = startPos; i < endPos; i++) { + this.buffer.put(i, (byte) 0); + } + fsync(); + LOG.info("Segment file {} cleared data in [{}, {}).", this.path, startPos, endPos); + } finally { + this.writeLock.unlock(); + } + } + + @Override + public boolean init(final SegmentFileOptions opts) { + this.writeLock.lock(); + if (this.buffer != null) { + this.writeLock.unlock(); + LOG.warn("Segment file {} already initialized, the status: {}.", this.path, toString()); + return true; + } + + final File file = new File(this.path); + + if (file.exists()) { + this.size = (int) file.length(); + } + + try (FileChannel fc = openFileChannel(opts)) { + if (opts.isLastFile) { + this.buffer = fc.map(MapMode.READ_WRITE, 0, this.size); + } else { + this.buffer = fc.map(MapMode.READ_ONLY, 0, this.size); + } + + this.buffer.limit(this.size); + if (opts.recover) { + if (!recover(opts)) { + return false; + } + } else { + this.wrotePos = opts.pos; + this.buffer.position(this.wrotePos); + } + assert (this.wrotePos == this.buffer.position()); + this.committedPos = this.wrotePos; + LOG.info("Loaded segment file {}, wrotePosition={}, bufferPosition={}, mappedSize={}.", this.path, + this.wrotePos, this.buffer.position(), this.size); + return true; + } catch (final IOException e) { + LOG.error("Fail to init segment file {}.", this.path, e); + return false; + } finally { + this.writeLock.unlock(); + } + + } + + private FileChannel openFileChannel(final SegmentFileOptions opts) throws IOException { + if (opts.isLastFile) { + return FileChannel.open(Paths.get(this.path), StandardOpenOption.CREATE, StandardOpenOption.READ, + StandardOpenOption.WRITE); + } else { + return FileChannel.open(Paths.get(this.path), StandardOpenOption.READ); + } 
+ } + + @SuppressWarnings("NonAtomicOperationOnVolatileField") + private boolean recover(final SegmentFileOptions opts) throws IOException { + LOG.info("Start to recover segment file {} from position {}.", this.path, opts.pos); + this.wrotePos = opts.pos; + this.buffer.position(this.wrotePos); + final long start = Utils.monotonicMs(); + while (this.wrotePos < this.size) { + if (this.buffer.remaining() < MAGIC_BYTES_SIZE) { + LOG.error("Fail to recover segment file {}, missing magic bytes.", this.path); + return false; + } + final byte[] magicBytes = new byte[MAGIC_BYTES_SIZE]; + this.buffer.get(magicBytes); + + if (!Arrays.equals(MAGIC_BYTES, magicBytes)) { + + boolean truncateDirty = false; + + int i = 0; + for (final byte b : magicBytes) { + i++; + if (b != 0) { + if (opts.isLastFile) { + // It's the last file + // Truncate the dirty data from wrotePos + LOG.error("Corrupted magic bytes in segment file {} at pos={}, will truncate it.", + this.path, this.wrotePos + i); + truncateDirty = true; + break; + } else { + // It's not the last file, but has invalid magic bytes, the data is corrupted. + LOG.error("Fail to recover segment file {}, invalid magic bytes: {} at pos={}.", this.path, + BytesUtil.toHex(magicBytes), this.wrotePos); + return false; + } + } + } + + if (truncateDirty) { + truncateFile(); + } else { + // Reach blank hole, rewind position. + this.buffer.position(this.buffer.position() - MAGIC_BYTES_SIZE); + } + // Reach end or dirty magic bytes, we should break out. 
+ break; + } + + if (this.buffer.remaining() < DATA_LENGTH_SIZE) { + LOG.error("Corrupted data length in segment file {} at pos={}, will truncate it.", this.path, + this.buffer.position()); + truncateFile(); + break; + } + + final int dataLen = this.buffer.getInt(); + if (this.buffer.remaining() < dataLen) { + LOG.error( + "Corrupted data in segment file {} at pos={}, expectDataLength={}, but remaining is {}, will truncate it.", + this.path, this.buffer.position(), dataLen, this.buffer.remaining()); + truncateFile(); + break; + } + // Skip data + this.buffer.position(this.buffer.position() + dataLen); + this.wrotePos += MAGIC_BYTES_SIZE + DATA_LENGTH_SIZE + dataLen; + } + LOG.info("Recover segment file {} cost {} millis.", this.path, Utils.monotonicMs() - start); + return true; + } + + private void truncateFile() throws IOException { + // Truncate dirty data. + clear(this.wrotePos); + this.buffer.position(this.wrotePos); + LOG.warn("Truncated segment file {} from pos={}.", this.path, this.wrotePos); + } + + public boolean reachesFileEndBy(final long waitToWroteBytes) { + this.readLock.lock(); + try { + return this.wrotePos + waitToWroteBytes > this.size; + } finally { + this.readLock.unlock(); + } + } + + public boolean isFull() { + return reachesFileEndBy(1); + } + + static int getWriteBytes(final byte[] data) { + return MAGIC_BYTES_SIZE + DATA_LENGTH_SIZE + data.length; + } + + /** + * Write the data and return it's wrote position. + * + * @param logIndex the log index + * @param data data to write + * @return the wrote position + */ + @SuppressWarnings("NonAtomicOperationOnVolatileField") + public int write(final long logIndex, final byte[] data) { + this.writeLock.lock(); + try { + assert (this.wrotePos == this.buffer.position()); + final int pos = this.wrotePos; + + this.buffer.put(MAGIC_BYTES); + this.buffer.putInt(data.length); + this.buffer.put(data); + this.wrotePos += MAGIC_BYTES_SIZE + DATA_LENGTH_SIZE + data.length; + // Update last log index. 
+ this.lastLogIndex = logIndex; + return pos; + } finally { + this.writeLock.unlock(); + } + } + + /** + * Read data from the position. + * + * @param logIndex the log index + * @param pos the position to read + * @return read data + */ + public byte[] read(final long logIndex, final int pos) throws IOException { + this.readLock.lock(); + try { + if (logIndex < this.firstLogIndex || logIndex > this.lastLogIndex) { + LOG.warn( + "Try to read data from segment file {} out of range, logIndex={}, readPos={}, firstLogIndex={}, lastLogIndex={}.", + this.path, logIndex, pos, this.firstLogIndex, this.lastLogIndex); + return null; + } + if (pos >= this.committedPos) { + LOG.warn( + "Try to read data from segment file {} out of comitted position, logIndex={}, readPos={}, wrotePos={}, this.committedPos={}.", + this.path, logIndex, pos, this.wrotePos, this.committedPos); + return null; + } + final ByteBuffer readBuffer = this.buffer.asReadOnlyBuffer(); + readBuffer.position(pos); + if (readBuffer.remaining() < MAGIC_BYTES_SIZE) { + throw new IOException("Missing magic buffer."); + } + readBuffer.position(pos + MAGIC_BYTES_SIZE); + final int dataLen = readBuffer.getInt(); + final byte[] data = new byte[dataLen]; + readBuffer.get(data); + return data; + } finally { + this.readLock.unlock(); + } + } + + /** + * Forces any changes made to this segment file's content to be written to the + * storage device containing the mapped file. + */ + public void sync() throws IOException { + if (this.committedPos >= this.wrotePos) { + return; + } + this.writeLock.lock(); + try { + // double check + if (this.committedPos >= this.wrotePos) { + return; + } + fsync(); + this.committedPos = this.wrotePos; + LOG.debug("Commit segment file {} at pos {}.", this.path, this.committedPos); + } finally { + this.writeLock.unlock(); + } + } + + private void fsync() { + if (this.buffer != null) { + this.buffer.force(); + } + } + + /** + * Destroy the file. 
+ */ + public void destroy() { + this.writeLock.lock(); + try { + shutdown(); + FileUtils.deleteQuietly(new File(this.path)); + LOG.info("Deleted segment file {}.", this.path); + } finally { + this.writeLock.unlock(); + } + } + + // See https://stackoverflow.com/questions/2972986/how-to-unmap-a-file-from-memory-mapped-using-filechannel-in-java + // TODO move into utils + @SuppressWarnings({ "unchecked", "rawtypes" }) + private static void closeDirectBuffer(final MappedByteBuffer cb) { + // JavaSpecVer: 1.6, 1.7, 1.8, 9, 10 + final boolean isOldJDK = System.getProperty("java.specification.version", "99").startsWith("1."); + try { + if (isOldJDK) { + final Method cleaner = cb.getClass().getMethod("cleaner"); + cleaner.setAccessible(true); + final Method clean = Class.forName("sun.misc.Cleaner").getMethod("clean"); + clean.setAccessible(true); + clean.invoke(cleaner.invoke(cb)); + } else { + Class unsafeClass; + try { + unsafeClass = Class.forName("sun.misc.Unsafe"); + } catch (final Exception ex) { + // jdk.internal.misc.Unsafe doesn't yet have an invokeCleaner() method, + // but that method should be added if sun.misc.Unsafe is removed. 
+ unsafeClass = Class.forName("jdk.internal.misc.Unsafe"); + } + final Method clean = unsafeClass.getMethod("invokeCleaner", ByteBuffer.class); + clean.setAccessible(true); + final Field theUnsafeField = unsafeClass.getDeclaredField("theUnsafe"); + theUnsafeField.setAccessible(true); + final Object theUnsafe = theUnsafeField.get(null); + clean.invoke(theUnsafe, cb); + } + } catch (final Exception ex) { + LOG.error("Fail to un-mapped segment file.", ex); + } + } + + @Override + public void shutdown() { + this.writeLock.lock(); + try { + if (this.buffer == null) { + return; + } + closeDirectBuffer(this.buffer); + this.buffer = null; + LOG.info("Unloaded segment file {}, current status: {}.", this.path, toString()); + } finally { + this.writeLock.unlock(); + } + } + + @Override + public String toString() { + return "SegmentFile [firstLogIndex=" + this.firstLogIndex + ", lastLogIndex=" + this.lastLogIndex + ", size=" + + this.size + ", path=" + this.path + ", wrotePos=" + this.wrotePos + ", committedPos=" + + this.committedPos + "]"; + } +} diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java index 6325ac00c..e0bd4b1ad 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java @@ -16,6 +16,16 @@ */ package com.alipay.sofa.jraft.core; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + import java.io.File; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -65,16 +75,6 @@ import com.alipay.sofa.jraft.util.Utils; import 
com.codahale.metrics.ConsoleReporter; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNotSame; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - public class NodeTest { static final Logger LOG = LoggerFactory.getLogger(NodeTest.class); @@ -127,7 +127,7 @@ public void testNodeTaskOverload() throws Exception { NodeManager.getInstance().addAddress(addr); final NodeOptions nodeOptions = new NodeOptions(); - RaftOptions raftOptions = new RaftOptions(); + final RaftOptions raftOptions = new RaftOptions(); raftOptions.setDisruptorBufferSize(2); nodeOptions.setRaftOptions(raftOptions); final MockStateMachine fsm = new MockStateMachine(addr); @@ -147,7 +147,7 @@ public void testNodeTaskOverload() throws Exception { } final CountDownLatch latch = new CountDownLatch(10); - AtomicInteger c = new AtomicInteger(0); + final AtomicInteger c = new AtomicInteger(0); for (int i = 0; i < 10; i++) { final ByteBuffer data = ByteBuffer.wrap(("hello" + i).getBytes()); final Task task = new Task(data, status -> { @@ -495,7 +495,7 @@ public void testChecksum() throws Exception { // start with checksum validation { final TestCluster cluster = new TestCluster("unittest", this.dataPath, peers); - RaftOptions raftOptions = new RaftOptions(); + final RaftOptions raftOptions = new RaftOptions(); raftOptions.setEnableLogEntryChecksum(true); for (final PeerId peer : peers) { assertTrue(cluster.start(peer.getEndpoint(), false, 300, true, null, raftOptions)); @@ -537,7 +537,7 @@ public void testChecksum() throws Exception { // restart with no checksum validation { final TestCluster cluster = new TestCluster("unittest", this.dataPath, peers); - RaftOptions raftOptions = new RaftOptions(); + final 
RaftOptions raftOptions = new RaftOptions(); raftOptions.setEnableLogEntryChecksum(false); for (final PeerId peer : peers) { assertTrue(cluster.start(peer.getEndpoint(), false, 300, true, null, raftOptions)); @@ -556,7 +556,7 @@ public void testChecksum() throws Exception { // restart with all peers enable checksum validation { final TestCluster cluster = new TestCluster("unittest", this.dataPath, peers); - RaftOptions raftOptions = new RaftOptions(); + final RaftOptions raftOptions = new RaftOptions(); raftOptions.setEnableLogEntryChecksum(true); for (final PeerId peer : peers) { assertTrue(cluster.start(peer.getEndpoint(), false, 300, true, null, raftOptions)); @@ -1243,6 +1243,8 @@ public void testInstallSnapshotWithThrottle() throws Exception { final Endpoint followerAddr = followers.get(0).getNodeId().getPeerId().getEndpoint(); assertTrue(cluster.stop(followerAddr)); + cluster.waitLeader(); + // apply something more this.sendTestTaskAndWait(leader, 10, RaftError.SUCCESS); @@ -2372,7 +2374,7 @@ public void testChangePeersChaosWithSnapshot() throws Exception { final List peers = new ArrayList<>(); peers.add(new PeerId("127.0.0.1", TestUtils.INIT_PORT)); final TestCluster cluster = new TestCluster("unittest", this.dataPath, peers, 1000); - assertTrue(cluster.start(peers.get(0).getEndpoint(), false, 1)); + assertTrue(cluster.start(peers.get(0).getEndpoint(), false, 2)); // start other peers for (int i = 1; i < 10; i++) { final PeerId peer = new PeerId("127.0.0.1", TestUtils.INIT_PORT + i); @@ -2394,7 +2396,9 @@ public void testChangePeersChaosWithSnapshot() throws Exception { leader.apply(task); final Status status = done.await(); if (status.isOk()) { - LOG.info("Progress:" + (++i)); + if (++i % 100 == 0) { + System.out.println("Progress:" + i); + } } else { assertEquals(RaftError.EPERM, status.getRaftError()); } @@ -2431,7 +2435,8 @@ public void testChangePeersChaosWithoutSnapshot() throws Exception { final ChangeArg arg = new ChangeArg(cluster, peers, false, 
true); final Future future = startChangePeersThread(arg); - for (int i = 0; i < 10000;) { + final int tasks = 5000; + for (int i = 0; i < tasks;) { cluster.waitLeader(); final Node leader = cluster.getLeader(); if (leader == null) { @@ -2442,7 +2447,9 @@ public void testChangePeersChaosWithoutSnapshot() throws Exception { leader.apply(task); final Status status = done.await(); if (status.isOk()) { - LOG.info("Progress:" + (++i)); + if (++i % 100 == 0) { + System.out.println("Progress:" + i); + } } else { assertEquals(RaftError.EPERM, status.getRaftError()); } @@ -2457,8 +2464,8 @@ public void testChangePeersChaosWithoutSnapshot() throws Exception { cluster.ensureSame(); assertEquals(10, cluster.getFsms().size()); for (final MockStateMachine fsm : cluster.getFsms()) { - assertTrue(fsm.getLogs().size() >= 10000); - assertTrue(fsm.getLogs().size() - 10000 < 100); + assertTrue(fsm.getLogs().size() >= tasks); + assertTrue(fsm.getLogs().size() - tasks < 100); } cluster.stopAll(); } @@ -2499,7 +2506,9 @@ public void testChangePeersChaosApplyTasks() throws Exception { leader.apply(task); final Status status = done.await(); if (status.isOk()) { - LOG.info("Progress:" + (++i)); + if (++i % 100 == 0) { + System.out.println("Progress:" + i); + } } else { assertEquals(RaftError.EPERM, status.getRaftError()); } diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/core/TestCluster.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/core/TestCluster.java index b13ac3df8..542fb050b 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/core/TestCluster.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/core/TestCluster.java @@ -50,7 +50,7 @@ */ public class TestCluster { private final String dataPath; - private final String name; // groupId + private final String name; // groupId private final List peers; private final List nodes; private final List fsms; @@ -58,7 +58,7 @@ public class TestCluster { private final int electionTimeoutMs; private final Lock lock = 
new ReentrantLock(); - private JRaftServiceFactory raftServiceFactory = new DefaultJRaftServiceFactory(); + private JRaftServiceFactory raftServiceFactory = new TestJRaftServiceFactory(); public JRaftServiceFactory getRaftServiceFactory() { return this.raftServiceFactory; diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/core/TestJRaftServiceFactory.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/core/TestJRaftServiceFactory.java new file mode 100644 index 000000000..731b1e1e7 --- /dev/null +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/core/TestJRaftServiceFactory.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.jraft.core; + +import com.alipay.sofa.jraft.option.RaftOptions; +import com.alipay.sofa.jraft.storage.LogStorage; +import com.alipay.sofa.jraft.storage.log.RocksDBSegmentLogStorage; + +public class TestJRaftServiceFactory extends DefaultJRaftServiceFactory { + + @Override + public LogStorage createLogStorage(String uri, RaftOptions raftOptions) { + //Force the data to be stored in segments. 
+ return new RocksDBSegmentLogStorage(uri, raftOptions, 0); + } + +} diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/impl/BaseLogStorageTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/impl/BaseLogStorageTest.java new file mode 100644 index 000000000..26e550637 --- /dev/null +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/impl/BaseLogStorageTest.java @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.alipay.sofa.jraft.storage.impl; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import com.alipay.sofa.jraft.JRaftUtils; +import com.alipay.sofa.jraft.conf.ConfigurationEntry; +import com.alipay.sofa.jraft.conf.ConfigurationManager; +import com.alipay.sofa.jraft.entity.EnumOutter; +import com.alipay.sofa.jraft.entity.LogEntry; +import com.alipay.sofa.jraft.entity.LogId; +import com.alipay.sofa.jraft.entity.codec.LogEntryCodecFactory; +import com.alipay.sofa.jraft.entity.codec.v2.LogEntryV2CodecFactory; +import com.alipay.sofa.jraft.option.LogStorageOptions; +import com.alipay.sofa.jraft.storage.BaseStorageTest; +import com.alipay.sofa.jraft.storage.LogStorage; +import com.alipay.sofa.jraft.test.TestUtils; +import com.alipay.sofa.jraft.util.Utils; + +public abstract class BaseLogStorageTest extends BaseStorageTest { + private LogStorage logStorage; + private ConfigurationManager confManager; + private LogEntryCodecFactory logEntryCodecFactory; + + @Override + @Before + public void setup() throws Exception { + super.setup(); + this.confManager = new ConfigurationManager(); + this.logEntryCodecFactory = LogEntryV2CodecFactory.getInstance(); + this.logStorage = newLogStorage(); + + final LogStorageOptions opts = newLogStorageOptions(); + + this.logStorage.init(opts); + } + + protected abstract LogStorage newLogStorage(); + + private LogStorageOptions newLogStorageOptions() { + final LogStorageOptions opts = new LogStorageOptions(); + opts.setConfigurationManager(this.confManager); + opts.setLogEntryCodecFactory(this.logEntryCodecFactory); + return opts; + } + + @Override + @After + public 
void teardown() throws Exception { + this.logStorage.shutdown(); + super.teardown(); + } + + @Test + public void testEmptyState() { + assertEquals(1, this.logStorage.getFirstLogIndex()); + assertEquals(0, this.logStorage.getLastLogIndex()); + assertNull(this.logStorage.getEntry(100)); + assertEquals(0, this.logStorage.getTerm(100)); + } + + @Test + public void testAddOneEntryState() { + final LogEntry entry1 = TestUtils.mockEntry(100, 1); + assertTrue(this.logStorage.appendEntry(entry1)); + + assertEquals(100, this.logStorage.getFirstLogIndex()); + assertEquals(100, this.logStorage.getLastLogIndex()); + Assert.assertEquals(entry1, this.logStorage.getEntry(100)); + assertEquals(1, this.logStorage.getTerm(100)); + + final LogEntry entry2 = TestUtils.mockEntry(200, 2); + assertTrue(this.logStorage.appendEntry(entry2)); + + assertEquals(100, this.logStorage.getFirstLogIndex()); + assertEquals(200, this.logStorage.getLastLogIndex()); + Assert.assertEquals(entry1, this.logStorage.getEntry(100)); + Assert.assertEquals(entry2, this.logStorage.getEntry(200)); + assertEquals(1, this.logStorage.getTerm(100)); + assertEquals(2, this.logStorage.getTerm(200)); + } + + @Test + public void testLoadWithConfigManager() { + assertTrue(this.confManager.getLastConfiguration().isEmpty()); + + final LogEntry confEntry1 = new LogEntry(EnumOutter.EntryType.ENTRY_TYPE_CONFIGURATION); + confEntry1.setId(new LogId(99, 1)); + confEntry1.setPeers(JRaftUtils.getConfiguration("localhost:8081,localhost:8082").listPeers()); + + final LogEntry confEntry2 = new LogEntry(EnumOutter.EntryType.ENTRY_TYPE_CONFIGURATION); + confEntry2.setId(new LogId(100, 2)); + confEntry2.setPeers(JRaftUtils.getConfiguration("localhost:8081,localhost:8082,localhost:8083").listPeers()); + + assertTrue(this.logStorage.appendEntry(confEntry1)); + assertEquals(1, this.logStorage.appendEntries(Arrays.asList(confEntry2))); + + // reload log storage. 
+ this.logStorage.shutdown(); + this.logStorage = newLogStorage(); + this.logStorage.init(newLogStorageOptions()); + + ConfigurationEntry conf = this.confManager.getLastConfiguration(); + assertNotNull(conf); + assertFalse(conf.isEmpty()); + assertEquals("localhost:8081,localhost:8082,localhost:8083", conf.getConf().toString()); + conf = this.confManager.get(99); + assertNotNull(conf); + assertFalse(conf.isEmpty()); + assertEquals("localhost:8081,localhost:8082", conf.getConf().toString()); + } + + @Test + public void testAddManyEntries() { + final List entries = TestUtils.mockEntries(); + + assertEquals(10, this.logStorage.appendEntries(entries)); + + assertEquals(0, this.logStorage.getFirstLogIndex()); + assertEquals(9, this.logStorage.getLastLogIndex()); + for (int i = 0; i < 10; i++) { + assertEquals(i, this.logStorage.getTerm(i)); + final LogEntry entry = this.logStorage.getEntry(i); + assertNotNull(entry); + assertEquals(entries.get(i), entry); + } + } + + @Test + public void testReset() { + testAddManyEntries(); + this.logStorage.reset(5); + assertEquals(5, this.logStorage.getFirstLogIndex()); + assertEquals(5, this.logStorage.getLastLogIndex()); + assertEquals(5, this.logStorage.getTerm(5)); + } + + @Test + public void testTruncatePrefix() { + final List entries = TestUtils.mockEntries(); + + assertEquals(10, this.logStorage.appendEntries(entries)); + this.logStorage.truncatePrefix(5); + assertEquals(5, this.logStorage.getFirstLogIndex()); + assertEquals(9, this.logStorage.getLastLogIndex()); + for (int i = 0; i < 10; i++) { + if (i < 5) { + assertNull(this.logStorage.getEntry(i)); + } else { + Assert.assertEquals(entries.get(i), this.logStorage.getEntry(i)); + } + } + } + + @Test + public void testAppendMantyLargeEntries() { + + appendLargeEntries(10000, 1024, 10); + + final long start = Utils.monotonicMs(); + final int totalLogs = 100000; + final int logSize = 16 * 1024; + final int batch = 100; + + appendLargeEntries(totalLogs, logSize, batch); + + 
System.out.println("Inserted " + totalLogs + " large logs, cost " + (Utils.monotonicMs() - start) + " ms."); + + for (int i = 0; i < totalLogs; i++) { + final LogEntry log = this.logStorage.getEntry(i); + assertNotNull(log); + assertEquals(i, log.getId().getIndex()); + assertEquals(i, log.getId().getTerm()); + assertEquals(logSize, log.getData().remaining()); + } + + this.logStorage.shutdown(); + this.logStorage.init(newLogStorageOptions()); + + for (int i = 0; i < totalLogs; i++) { + final LogEntry log = this.logStorage.getEntry(i); + assertNotNull(log); + assertEquals(i, log.getId().getIndex()); + assertEquals(i, log.getId().getTerm()); + assertEquals(logSize, log.getData().remaining()); + } + } + + private void appendLargeEntries(final int totalLogs, final int logSize, final int batch) { + for (int i = 0; i < totalLogs; i += batch) { + final List entries = new ArrayList<>(batch); + for (int j = i; j < i + batch; j++) { + entries.add(TestUtils.mockEntry(j, j, logSize)); + } + assertEquals(batch, this.logStorage.appendEntries(entries)); + } + } + + @Test + public void testTruncateSuffix() { + final List entries = TestUtils.mockEntries(); + + assertEquals(10, this.logStorage.appendEntries(entries)); + this.logStorage.truncateSuffix(5); + assertEquals(0, this.logStorage.getFirstLogIndex()); + assertEquals(5, this.logStorage.getLastLogIndex()); + for (int i = 0; i < 10; i++) { + if (i <= 5) { + Assert.assertEquals(entries.get(i), this.logStorage.getEntry(i)); + } else { + assertNull(this.logStorage.getEntry(i)); + } + } + } +} diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/impl/LogStorageBenchmark.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/impl/LogStorageBenchmark.java new file mode 100644 index 000000000..8ed686072 --- /dev/null +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/impl/LogStorageBenchmark.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor 
license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.jraft.storage.impl; + +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; + +import com.alipay.sofa.jraft.conf.ConfigurationManager; +import com.alipay.sofa.jraft.entity.LogEntry; +import com.alipay.sofa.jraft.entity.codec.v2.LogEntryV2CodecFactory; +import com.alipay.sofa.jraft.option.LogStorageOptions; +import com.alipay.sofa.jraft.option.RaftOptions; +import com.alipay.sofa.jraft.storage.LogStorage; +import com.alipay.sofa.jraft.storage.log.RocksDBSegmentLogStorage; +import com.alipay.sofa.jraft.test.TestUtils; +import com.alipay.sofa.jraft.util.SystemPropertyUtil; +import com.alipay.sofa.jraft.util.Utils; + +public class LogStorageBenchmark { + + private final LogStorage logStorage; + + private final int logSize; + + private final int totalLogs; + + private final int batchSize; + + public LogStorageBenchmark(final LogStorage logStorage, final int logSize, final int totalLogs, final int batchSize) { + super(); + this.logStorage = logStorage; + this.logSize = logSize; + this.totalLogs = totalLogs; + this.batchSize = batchSize; + } + + private void write(final int batchSize, final int logSize, final int totalLogs) { + List entries = new ArrayList<>(batchSize); + for (int i = 0; i < totalLogs; i += 
batchSize) { + for (int j = i; j < i + batchSize; j++) { + entries.add(TestUtils.mockEntry(j, j, logSize)); + } + int ret = this.logStorage.appendEntries(entries); + if (ret != batchSize) { + System.err.println("Fatal error: write failures, expect " + batchSize + ", but was " + ret); + System.exit(1); + } + entries.clear(); //reuse it + } + } + + private static void assertNotNull(final Object obj) { + if (obj == null) { + System.err.println("Null object"); + System.exit(1); + } + } + + private static void assertEquals(final long x, final long y) { + if (x != y) { + System.err.println("Expect " + x + " but was " + y); + System.exit(1); + } + } + + private void read(final int logSize, final int totalLogs) { + for (int i = 0; i < totalLogs; i++) { + LogEntry log = this.logStorage.getEntry(i); + assertNotNull(log); + assertEquals(i, log.getId().getIndex()); + assertEquals(i, log.getId().getTerm()); + assertEquals(logSize, log.getData().remaining()); + } + } + + private void report(final String op, final long cost) { + System.out.println("Test " + op + ":"); + System.out.println(" Log number :" + this.totalLogs); + System.out.println(" Log Size :" + this.logSize); + System.out.println(" Batch Size :" + this.batchSize); + System.out.println(" Cost time(s) :" + cost / 1000); + System.out.println(" Total size :" + (long) this.totalLogs * this.logSize); + } + + private void doTest() { + System.out.println("Begin test..."); + { + System.out.println("Warm up..."); + write(10, 64, 10000); + read(64, 10000); + } + + System.out.println("Start test..."); + { + long start = Utils.monotonicMs(); + write(this.batchSize, this.logSize, this.totalLogs); + long cost = Utils.monotonicMs() - start; + report("write", cost); + } + + { + long start = Utils.monotonicMs(); + read(this.logSize, this.totalLogs); + long cost = Utils.monotonicMs() - start; + report("read", cost); + } + System.out.println("Test done!"); + } + + public static void main(final String[] args) { + String testPath = 
Paths.get(SystemPropertyUtil.get("user.dir"), "log_storage").toString(); + int batchSize = 100; + int logSize = 16 * 1024; + int totalLogs = 1024 * 1024; + + // LogStorage logStorage = new RocksDBLogStorage(testPath, new RaftOptions()); + LogStorage logStorage = new RocksDBSegmentLogStorage(testPath, new RaftOptions()); + + LogStorageOptions opts = new LogStorageOptions(); + opts.setConfigurationManager(new ConfigurationManager()); + opts.setLogEntryCodecFactory(LogEntryV2CodecFactory.getInstance()); + logStorage.init(opts); + + new LogStorageBenchmark(logStorage, logSize, totalLogs, batchSize).doTest(); + } + +} diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/impl/RocksDBLogStorageTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/impl/RocksDBLogStorageTest.java index d162697e0..ceb243643 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/impl/RocksDBLogStorageTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/impl/RocksDBLogStorageTest.java @@ -16,181 +16,14 @@ */ package com.alipay.sofa.jraft.storage.impl; -import java.util.Arrays; -import java.util.List; - -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import com.alipay.sofa.jraft.JRaftUtils; -import com.alipay.sofa.jraft.conf.ConfigurationEntry; -import com.alipay.sofa.jraft.conf.ConfigurationManager; -import com.alipay.sofa.jraft.entity.EnumOutter; -import com.alipay.sofa.jraft.entity.LogEntry; -import com.alipay.sofa.jraft.entity.LogId; -import com.alipay.sofa.jraft.entity.codec.LogEntryCodecFactory; -import com.alipay.sofa.jraft.entity.codec.v2.LogEntryV2CodecFactory; -import com.alipay.sofa.jraft.option.LogStorageOptions; import com.alipay.sofa.jraft.option.RaftOptions; -import com.alipay.sofa.jraft.storage.BaseStorageTest; import com.alipay.sofa.jraft.storage.LogStorage; -import com.alipay.sofa.jraft.test.TestUtils; - -import static org.junit.Assert.assertEquals; -import static 
org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - -public class RocksDBLogStorageTest extends BaseStorageTest { - private LogStorage logStorage; - private ConfigurationManager confManager; - private LogEntryCodecFactory logEntryCodecFactory; - - @Override - @Before - public void setup() throws Exception { - super.setup(); - this.confManager = new ConfigurationManager(); - this.logEntryCodecFactory = LogEntryV2CodecFactory.getInstance(); - this.logStorage = new RocksDBLogStorage(this.path, new RaftOptions()); - - LogStorageOptions opts = newLogStorageOptions(); - - this.logStorage.init(opts); - } - private LogStorageOptions newLogStorageOptions() { - LogStorageOptions opts = new LogStorageOptions(); - opts.setConfigurationManager(this.confManager); - opts.setLogEntryCodecFactory(this.logEntryCodecFactory); - return opts; - } +public class RocksDBLogStorageTest extends BaseLogStorageTest { @Override - @After - public void teardown() throws Exception { - this.logStorage.shutdown(); - super.teardown(); - } - - @Test - public void testEmptyState() { - assertEquals(1, this.logStorage.getFirstLogIndex()); - assertEquals(0, this.logStorage.getLastLogIndex()); - assertNull(this.logStorage.getEntry(100)); - assertEquals(0, this.logStorage.getTerm(100)); + protected LogStorage newLogStorage() { + return new RocksDBLogStorage(this.path, new RaftOptions()); } - @Test - public void testAddOneEntryState() { - LogEntry entry1 = TestUtils.mockEntry(100, 1); - assertTrue(this.logStorage.appendEntry(entry1)); - - assertEquals(100, this.logStorage.getFirstLogIndex()); - assertEquals(100, this.logStorage.getLastLogIndex()); - Assert.assertEquals(entry1, this.logStorage.getEntry(100)); - assertEquals(1, this.logStorage.getTerm(100)); - - LogEntry entry2 = TestUtils.mockEntry(200, 2); - assertTrue(this.logStorage.appendEntry(entry2)); - - assertEquals(100, 
this.logStorage.getFirstLogIndex()); - assertEquals(200, this.logStorage.getLastLogIndex()); - Assert.assertEquals(entry1, this.logStorage.getEntry(100)); - Assert.assertEquals(entry2, this.logStorage.getEntry(200)); - assertEquals(1, this.logStorage.getTerm(100)); - assertEquals(2, this.logStorage.getTerm(200)); - } - - @Test - public void testLoadWithConfigManager() { - assertTrue(this.confManager.getLastConfiguration().isEmpty()); - - LogEntry confEntry1 = new LogEntry(EnumOutter.EntryType.ENTRY_TYPE_CONFIGURATION); - confEntry1.setId(new LogId(99, 1)); - confEntry1.setPeers(JRaftUtils.getConfiguration("localhost:8081,localhost:8082").listPeers()); - - LogEntry confEntry2 = new LogEntry(EnumOutter.EntryType.ENTRY_TYPE_CONFIGURATION); - confEntry2.setId(new LogId(100, 2)); - confEntry2.setPeers(JRaftUtils.getConfiguration("localhost:8081,localhost:8082,localhost:8083").listPeers()); - - assertTrue(this.logStorage.appendEntry(confEntry1)); - assertEquals(1, this.logStorage.appendEntries(Arrays.asList(confEntry2))); - - //reload log storage. 
- this.logStorage.shutdown(); - this.logStorage = new RocksDBLogStorage(this.path, new RaftOptions()); - this.logStorage.init(newLogStorageOptions()); - - ConfigurationEntry conf = this.confManager.getLastConfiguration(); - assertNotNull(conf); - assertFalse(conf.isEmpty()); - assertEquals("localhost:8081,localhost:8082,localhost:8083", conf.getConf().toString()); - conf = this.confManager.get(99); - assertNotNull(conf); - assertFalse(conf.isEmpty()); - assertEquals("localhost:8081,localhost:8082", conf.getConf().toString()); - } - - @Test - public void testAddManyEntries() { - List entries = TestUtils.mockEntries(); - - assertEquals(10, this.logStorage.appendEntries(entries)); - - assertEquals(0, this.logStorage.getFirstLogIndex()); - assertEquals(9, this.logStorage.getLastLogIndex()); - for (int i = 0; i < 10; i++) { - assertEquals(i, this.logStorage.getTerm(i)); - LogEntry entry = this.logStorage.getEntry(i); - assertNotNull(entry); - assertEquals(entries.get(i), entry); - } - } - - @Test - public void testReset() { - testAddManyEntries(); - this.logStorage.reset(5); - assertEquals(5, this.logStorage.getFirstLogIndex()); - assertEquals(5, this.logStorage.getLastLogIndex()); - assertEquals(5, this.logStorage.getTerm(5)); - } - - @Test - public void testTruncatePrefix() { - List entries = TestUtils.mockEntries(); - - assertEquals(10, this.logStorage.appendEntries(entries)); - this.logStorage.truncatePrefix(5); - assertEquals(5, this.logStorage.getFirstLogIndex()); - assertEquals(9, this.logStorage.getLastLogIndex()); - for (int i = 0; i < 10; i++) { - if (i < 5) { - assertNull(this.logStorage.getEntry(i)); - } else { - Assert.assertEquals(entries.get(i), this.logStorage.getEntry(i)); - } - } - } - - @Test - public void testTruncateSuffix() { - List entries = TestUtils.mockEntries(); - - assertEquals(10, this.logStorage.appendEntries(entries)); - this.logStorage.truncateSuffix(5); - assertEquals(0, this.logStorage.getFirstLogIndex()); - assertEquals(5, 
this.logStorage.getLastLogIndex()); - for (int i = 0; i < 10; i++) { - if (i <= 5) { - Assert.assertEquals(entries.get(i), this.logStorage.getEntry(i)); - } else { - assertNull(this.logStorage.getEntry(i)); - } - } - } } diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/impl/RocksDBSegmentLogStorageTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/impl/RocksDBSegmentLogStorageTest.java new file mode 100644 index 000000000..ca12e2d30 --- /dev/null +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/impl/RocksDBSegmentLogStorageTest.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.alipay.sofa.jraft.storage.impl; + +import com.alipay.sofa.jraft.option.RaftOptions; +import com.alipay.sofa.jraft.storage.LogStorage; +import com.alipay.sofa.jraft.storage.log.RocksDBSegmentLogStorage; + +public class RocksDBSegmentLogStorageTest extends BaseLogStorageTest { + + @Override + protected LogStorage newLogStorage() { + return new RocksDBSegmentLogStorage(this.path, new RaftOptions()); + } + +} diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/io/LocalFileReaderTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/io/LocalFileReaderTest.java index 8458d3a3a..d78452a21 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/io/LocalFileReaderTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/io/LocalFileReaderTest.java @@ -16,6 +16,9 @@ */ package com.alipay.sofa.jraft.storage.io; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + import java.io.File; import java.io.FileNotFoundException; import java.nio.ByteBuffer; @@ -27,9 +30,6 @@ import com.alipay.sofa.jraft.storage.BaseStorageTest; import com.alipay.sofa.jraft.util.ByteBufferCollector; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - public class LocalFileReaderTest extends BaseStorageTest { private LocalDirReader fileReader; diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/log/AbortFileTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/log/AbortFileTest.java new file mode 100644 index 000000000..016873808 --- /dev/null +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/log/AbortFileTest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.jraft.storage.log; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.File; + +import org.junit.Before; +import org.junit.Test; + +import com.alipay.sofa.jraft.storage.BaseStorageTest; + +public class AbortFileTest extends BaseStorageTest { + private AbortFile abortFile; + + @Override + @Before + public void setup() throws Exception { + super.setup(); + this.abortFile = new AbortFile(this.path + File.separator + "abort"); + } + + @Test + public void testMisc() throws Exception { + assertFalse(this.abortFile.exists()); + assertTrue(this.abortFile.create()); + assertTrue(this.abortFile.exists()); + assertFalse(this.abortFile.create()); + this.abortFile.destroy(); + assertFalse(this.abortFile.exists()); + assertTrue(this.abortFile.create()); + assertTrue(this.abortFile.exists()); + } +} diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/log/CheckpointFileTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/log/CheckpointFileTest.java new file mode 100644 index 000000000..daf170200 --- /dev/null +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/log/CheckpointFileTest.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.jraft.storage.log; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +import java.io.File; + +import org.junit.Before; +import org.junit.Test; + +import com.alipay.sofa.jraft.storage.BaseStorageTest; + +public class CheckpointFileTest extends BaseStorageTest { + private CheckpointFile checkpointFile; + + @Override + @Before + public void setup() throws Exception { + super.setup(); + this.checkpointFile = new CheckpointFile(this.path + File.separator + "checkpoint"); + } + + @Test + public void testMisc() throws Exception { + assertNull(this.checkpointFile.load()); + + this.checkpointFile.save(new CheckpointFile.Checkpoint(1, 99)); + CheckpointFile.Checkpoint cp = this.checkpointFile.load(); + assertNotNull(cp); + assertEquals(1, cp.firstLogIndex); + assertEquals(99, cp.committedPos); + + this.checkpointFile.destroy(); + assertNull(this.checkpointFile.load()); + + this.checkpointFile.save(new CheckpointFile.Checkpoint(100, 299)); + cp = this.checkpointFile.load(); + assertNotNull(cp); + assertEquals(100, cp.firstLogIndex); + assertEquals(299, cp.committedPos); + } +} diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/log/SegmentFileTest.java 
b/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/log/SegmentFileTest.java new file mode 100644 index 000000000..2f60d0ad5 --- /dev/null +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/log/SegmentFileTest.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.alipay.sofa.jraft.storage.log; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.channels.FileChannel; +import java.util.concurrent.ThreadLocalRandom; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.alipay.sofa.jraft.storage.BaseStorageTest; +import com.alipay.sofa.jraft.storage.log.SegmentFile.SegmentFileOptions; + +public class SegmentFileTest extends BaseStorageTest { + private static final int FILE_SIZE = 64; + private SegmentFile segmentFile; + + @Override + @Before + public void setup() throws Exception { + super.setup(); + this.segmentFile = new SegmentFile(0, FILE_SIZE, this.path); + } + + @After + public void tearDown() throws Exception { + this.segmentFile.shutdown(); + super.teardown(); + } + + @Test + public void testInitAndLoad() { + assertTrue(init()); + } + + private boolean init() { + return this.segmentFile.init(new SegmentFileOptions(false, true, 0)); + } + + private byte[] genData(final int size) { + final byte[] bs = new byte[size]; + ThreadLocalRandom.current().nextBytes(bs); + return bs; + } + + @Test + public void testWriteRead() throws IOException { + init(); + assertFalse(this.segmentFile.isFull()); + assertNull(this.segmentFile.read(0, 0)); + final byte[] data = genData(32); + assertFalse(this.segmentFile.reachesFileEndBy(SegmentFile.getWriteBytes(data))); + assertEquals(0, this.segmentFile.write(0, data)); + // Can't read before sync + assertNull(this.segmentFile.read(0, 0)); + this.segmentFile.sync(); + assertArrayEquals(data, this.segmentFile.read(0, 0)); + + assertTrue(this.segmentFile.reachesFileEndBy(SegmentFile.getWriteBytes(data))); + assertEquals(38, this.segmentFile.getWrotePos()); + 
assertEquals(38, this.segmentFile.getCommittedPos()); + assertFalse(this.segmentFile.isFull()); + final byte[] data2 = genData(20); + assertFalse(this.segmentFile.reachesFileEndBy(SegmentFile.getWriteBytes(data2))); + assertEquals(38, this.segmentFile.write(1, data2)); + // Can't read before sync + assertNull(this.segmentFile.read(1, 38)); + this.segmentFile.sync(); + assertArrayEquals(data2, this.segmentFile.read(1, 38)); + assertEquals(64, this.segmentFile.getWrotePos()); + assertEquals(64, this.segmentFile.getCommittedPos()); + assertTrue(this.segmentFile.isFull()); + } + + @Test + public void testRecoverFromDirtyMagic() throws Exception { + testWriteRead(); + + { + // Restart segment file, all data is valid. + this.segmentFile.shutdown(); + assertTrue(this.segmentFile.init(new SegmentFileOptions(true, true, 0))); + assertEquals(32, this.segmentFile.read(0, 0).length); + assertEquals(20, this.segmentFile.read(1, 38).length); + } + + { + // Corrupted magic bytes at pos=39 + this.segmentFile.clear(39); + this.segmentFile.shutdown(); + assertTrue(this.segmentFile.init(new SegmentFileOptions(true, true, 0))); + assertEquals(32, this.segmentFile.read(0, 0).length); + assertNull(this.segmentFile.read(1, 38)); + } + + } + + @Test + public void testRecoverFromInvalidData() throws Exception { + testWriteRead(); + + { + // Restart segment file, all data is valid. 
+ this.segmentFile.shutdown(); + assertTrue(this.segmentFile.init(new SegmentFileOptions(true, true, 0))); + assertEquals(32, this.segmentFile.read(0, 0).length); + assertEquals(20, this.segmentFile.read(1, 38).length); + } + + { + // Corrupted magic bytes at pos=39 + + this.segmentFile.shutdown(); + + try (FileOutputStream out = new FileOutputStream(new File(this.segmentFile.getPath()), true); + FileChannel outChan = out.getChannel()) { + outChan.truncate(44); + } + assertTrue(this.segmentFile.init(new SegmentFileOptions(true, true, 0))); + assertEquals(32, this.segmentFile.read(0, 0).length); + assertNull(this.segmentFile.read(1, 38)); + } + + } +} diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/test/TestUtils.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/test/TestUtils.java index 1f8ec7a92..77d82019a 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/test/TestUtils.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/test/TestUtils.java @@ -43,7 +43,7 @@ */ public class TestUtils { - public static ConfigurationEntry getConfEntry(String confStr, String oldConfStr) { + public static ConfigurationEntry getConfEntry(final String confStr, final String oldConfStr) { ConfigurationEntry entry = new ConfigurationEntry(); entry.setConf(JRaftUtils.getConfiguration(confStr)); entry.setOldConf(JRaftUtils.getConfiguration(oldConfStr)); @@ -54,9 +54,18 @@ public static String mkTempDir() { return System.getProperty("java.io.tmpdir", "/tmp") + File.separator + "jraft_test_" + System.nanoTime(); } - public static LogEntry mockEntry(int index, int term) { + public static LogEntry mockEntry(final int index, final int term) { + return mockEntry(index, term, 0); + } + + public static LogEntry mockEntry(final int index, final int term, final int dataSize) { LogEntry entry = new LogEntry(EnumOutter.EntryType.ENTRY_TYPE_NO_OP); entry.setId(new LogId(index, term)); + if (dataSize > 0) { + byte[] bs = new byte[dataSize]; + 
ThreadLocalRandom.current().nextBytes(bs); + entry.setData(ByteBuffer.wrap(bs)); + } return entry; } @@ -90,7 +99,7 @@ public static String getMyIp() { } } - public static List mockEntries(int n) { + public static List mockEntries(final int n) { List entries = new ArrayList<>(); for (int i = 0; i < n; i++) { LogEntry entry = mockEntry(i, i); @@ -110,7 +119,7 @@ public static RpcRequests.PingRequest createPingRequest() { public static final int INIT_PORT = 5003; - public static List generatePeers(int n) { + public static List generatePeers(final int n) { List ret = new ArrayList<>(); for (int i = 0; i < n; i++) { ret.add(new PeerId(getMyIp(), INIT_PORT + i)); diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/RocksRawKVStore.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/RocksRawKVStore.java index 45c2375d3..522968efa 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/RocksRawKVStore.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/RocksRawKVStore.java @@ -45,6 +45,7 @@ import org.rocksdb.DBOptions; import org.rocksdb.Env; import org.rocksdb.EnvOptions; +import org.rocksdb.IndexType; import org.rocksdb.IngestExternalFileOptions; import org.rocksdb.Options; import org.rocksdb.ReadOptions; @@ -1481,10 +1482,19 @@ private void destroyRocksDB(final RocksDBOptions opts) throws RocksDBException { // Creates the config for plain table sst format. 
private static BlockBasedTableConfig createTableConfig() { + // See https://github.com/sofastack/sofa-jraft/pull/156 return new BlockBasedTableConfig() // - .setBlockSize(4 * SizeUnit.KB) // + // Begin to use partitioned index filters + // https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters#how-to-use-it + .setIndexType(IndexType.kTwoLevelIndexSearch) // .setFilter(new BloomFilter(16, false)) // - .setCacheIndexAndFilterBlocks(true) // + .setPartitionFilters(true) // + .setMetadataBlockSize(8 * SizeUnit.KB) // + .setCacheIndexAndFilterBlocks(false) // + .setCacheIndexAndFilterBlocksWithHighPriority(true) // + .setPinL0FilterAndIndexBlocksInCache(true) // + // End of partitioned index filters settings. + .setBlockSize(4 * SizeUnit.KB)// .setBlockCacheSize(512 * SizeUnit.MB) // .setCacheNumShardBits(8); } diff --git a/pom.xml b/pom.xml index 12aeefdae..823b059e9 100644 --- a/pom.xml +++ b/pom.xml @@ -73,7 +73,7 @@ UTF-8 3.5.1 1.6.0 - 5.14.2 + 5.18.3 1.7.21