diff --git a/rxlib/src/main/java/org/rx/io/KeyValueStore.java b/rxlib/src/main/java/org/rx/io/KeyValueStore.java index c8c9a240..41679cd8 100644 --- a/rxlib/src/main/java/org/rx/io/KeyValueStore.java +++ b/rxlib/src/main/java/org/rx/io/KeyValueStore.java @@ -79,7 +79,7 @@ public boolean enableCompress() { @RequiredArgsConstructor class IteratorContext { final Entry[] buf; - long logPos = wal.meta.getLogPosition(); + long logPos = wal.getPosition(); int writePos; int readPos; int remaining; @@ -130,7 +130,7 @@ public KeyValueStore(@NonNull KeyValueStoreConfig config, @NonNull Serializer se indexer = new ExternalSortingIndexer<>(new File(String.format("%s/%s", config.getDirectoryPath(), idxName)), config.getIndexBufferSize(), config.getIndexReaderCount()); wal.lock.writeInvoke(() -> { - long pos = wal.meta.getLogPosition(); + long pos = wal.getPosition(); Entry val; $ endPos = $(); while ((val = unsafeRead(pos, null, endPos)) != null) { @@ -146,18 +146,18 @@ public KeyValueStore(@NonNull KeyValueStoreConfig config, @NonNull Serializer se } key.logPosition = pos; - wal.meta.setLogPosition(endPos.v); + wal.setPosition(endPos.v); indexer.save(key); if (incr) { - wal.meta.incrementSize(); + wal.setSize(wal.getSize() + 1); } log.debug("recover {}", key); pos = endPos.v; } }); - if (wal.meta.extra == null) { - wal.meta.extra = new AtomicInteger(); + if (wal.extra == null) { + wal.extra = new AtomicInteger(); } if (config.getApiPort() > 0) { @@ -186,13 +186,13 @@ public void fastPut(@NonNull TK k, TV v) { incr = true; } - long pos = wal.meta.getLogPosition(); + long pos = wal.getPosition(); if (key.logPosition >= WALFileStream.HEADER_SIZE) { KeyIndexer.KeyEntity finalKey = key; // wal.lock.writeInvoke(() -> { - wal.meta.setLogPosition(finalKey.logPosition); + wal.setPosition(finalKey.logPosition); wal.write(TOMB_MARK); - wal.meta.setLogPosition(pos); + wal.setPosition(pos); log.debug("fastPut mark TOMB {} <- {}", finalKey.logPosition, pos); // }, key.logPosition, 1); } @@ -200,13 +200,13 @@ public void fastPut(@NonNull TK k, TV v) { key.logPosition = pos; wal.write(0); serializer.serialize(val, wal); - int size = (int) (wal.meta.getLogPosition() - key.logPosition); + int size = (int) (wal.getPosition() - key.logPosition); wal.writeInt(size); // log.debug("fastPut {} {}", key, val); indexer.save(key); if (incr) { - wal.meta.incrementSize(); + wal.setSize(wal.getSize() + 1); } }, WALFileStream.HEADER_SIZE); } @@ -220,15 +220,15 @@ public void fastRemove(@NonNull TK k) { return; } - long pos = wal.meta.getLogPosition(); - wal.meta.setLogPosition(key.logPosition); + long pos = wal.getPosition(); + wal.setPosition(key.logPosition); wal.write(TOMB_MARK); - wal.meta.setLogPosition(pos); + wal.setPosition(pos); log.debug("fastRemove {}", key); key.logPosition = TOMB_MARK; indexer.save(key); - wal.meta.decrementSize(); + wal.setSize(wal.getSize() - 1); }, WALFileStream.HEADER_SIZE); } @@ -257,7 +257,7 @@ private Entry unsafeRead(long logPosition, TK k, $ position) { val = serializer.deserialize(wal, true); if (k != null && !k.equals(val.key)) { - AtomicInteger counter = (AtomicInteger) wal.meta.extra; + AtomicInteger counter = (AtomicInteger) wal.extra; int total = counter == null ? -1 : counter.incrementAndGet(); log.warn("LogPosError hash collision {} total={}", k, total); Files.writeLines("./hc_err.log", Linq.from(String.format("%s %s hc=%s total=%s", DateTime.now(), logName @@ -436,7 +436,7 @@ private void apiCheck(ServerRequest req) { //region map @Override public int size() { - return wal.meta.getSize(); + return (int) wal.getSize(); } @Override diff --git a/rxlib/src/main/java/org/rx/io/WALFileStream.java b/rxlib/src/main/java/org/rx/io/WALFileStream.java index 4a99615c..c021b632 100644 --- a/rxlib/src/main/java/org/rx/io/WALFileStream.java +++ b/rxlib/src/main/java/org/rx/io/WALFileStream.java @@ -14,7 +14,6 @@ import java.io.*; import java.nio.channels.FileChannel; import java.util.concurrent.LinkedTransferQueue; -import java.util.concurrent.atomic.AtomicInteger; import static org.rx.core.Extends.require; @@ -45,63 +44,21 @@ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundE throw new UnsupportedEncodingException(); } - static class MetaHeader implements Serializable { + private static class MetaHeader implements Serializable { private static final long serialVersionUID = 3894764623767567837L; private void writeObject(ObjectOutputStream out) throws IOException { out.writeLong(logPos); - out.writeInt(size.get()); + out.writeLong(size); } private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { logPos = in.readLong(); - size = new AtomicInteger(); - size.set(in.readInt()); + size = in.readLong(); } - private transient WALFileStream owner; - private volatile long logPos = HEADER_SIZE; - private AtomicInteger size = new AtomicInteger(); - Object extra; - - public long getLogPosition() { - return logPos; - } - - public void setLogPosition(long logPosition) { - require(logPosition, logPosition >= HEADER_SIZE); - - logPos = logPosition; - writeBack(); - } - - public int getSize() { - return size.get(); - } - - public void setSize(int size) { - this.size.set(size); - writeBack(); - } - - public int incrementSize() { - int s = size.incrementAndGet(); - writeBack(); - return s; - } - - public int decrementSize() { - int s = size.decrementAndGet(); - writeBack(); - return s; - } - - private void writeBack() { - if (owner == null) { - return; - } - owner.saveMeta(); - } + long logPos = HEADER_SIZE; + long size; } static final float GROW_FACTOR = 0.75f; @@ -198,12 +155,17 @@ public boolean canSeek() { @Override public long getPosition() { - return lock.readInvoke(meta::getLogPosition); + return lock.readInvoke(() -> meta.logPos); } @Override public void setPosition(long position) { - lock.writeInvoke(() -> meta.setLogPosition(position)); + require(position, position >= HEADER_SIZE); + + lock.writeInvoke(() -> { + meta.logPos = position; + saveMeta(); + }, 0, HEADER_SIZE); } @Override @@ -211,6 +173,19 @@ public long getLength() { return lock.readInvoke(file::getLength); } + Object extra; + + public long getSize() { + return lock.readInvoke(() -> meta.size); + } + + public void setSize(long size) { + lock.writeInvoke(() -> { + meta.size = size; + saveMeta(); + }, 0, HEADER_SIZE); + } + public WALFileStream(File file, long growSize, int readerCount, @NonNull Serializer serializer) { this.growSize = growSize; this.readerCount = readerCount; @@ -223,7 +198,6 @@ public WALFileStream(File file, long growSize, int readerCount, @NonNull Seriali } meta = loadMeta(); - meta.owner = this; } @Override @@ -234,14 +208,13 @@ protected void freeObjects() { public void clear() { lock.writeInvoke(() -> { - meta.setLogPosition(HEADER_SIZE); - meta.setSize(0); + meta.size = 0; + setPosition(HEADER_SIZE); }); } - public void saveMeta() { - checkNotClosed(); - + void saveMeta() { +// checkNotClosed(); lock.writeInvoke(() -> { writer.setPosition(0); serializer.serialize(meta, writer); @@ -302,7 +275,7 @@ private boolean ensureGrow() { return lock.writeInvoke(() -> { long length = file.getLength(); if (length < growSize - || (meta != null && meta.getLogPosition() / (float) length > GROW_FACTOR)) { + if (length < growSize || (meta != null && meta.logPos / (float) length > GROW_FACTOR)) { long resize = length + growSize; log.info("growSize {} {}->{}", getName(), length, resize); _setLength(resize); @@ -374,9 +347,9 @@ public void write(ByteBuf src, int length) { } private void ensureWrite(BiAction action) { - long logPosition = meta.getLogPosition(); + long logPosition = meta.logPos; lock.writeInvoke(() -> { - if (logPosition != meta.getLogPosition()) { + if (logPosition != meta.logPos) { throw new InvalidException("Concurrent error"); // log.warn("Fallback lock"); // lock.writeInvoke(() -> innerWrite(meta.getLogPosition(), action)); @@ -393,7 +366,8 @@ void innerWrite(long logPosition, BiAction action) { writer.setPosition(logPosition); action.accept(writer); _flush(); - meta.setLogPosition(writer.getPosition()); + + setPosition(writer.getPosition()); } private void _flush() {