From 3c334b3ca1ab76a5825469cc3e90cf551fc39e48 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E2=84=9E?= <ilovehaley.kid@gmail.com>
Date: Sat, 7 Dec 2024 18:03:15 +0800
Subject: [PATCH 1/2] up

---
 .../main/java/org/rx/io/KeyValueStore.java    | 34 +++----
 .../main/java/org/rx/io/WALFileStream.java    | 94 +++++++------------
 2 files changed, 51 insertions(+), 77 deletions(-)

diff --git a/rxlib/src/main/java/org/rx/io/KeyValueStore.java b/rxlib/src/main/java/org/rx/io/KeyValueStore.java
index c8c9a240..41679cd8 100644
--- a/rxlib/src/main/java/org/rx/io/KeyValueStore.java
+++ b/rxlib/src/main/java/org/rx/io/KeyValueStore.java
@@ -79,7 +79,7 @@ public boolean enableCompress() {
     @RequiredArgsConstructor
     class IteratorContext {
         final Entry<TK, TV>[] buf;
-        long logPos = wal.meta.getLogPosition();
+        long logPos = wal.getPosition();
         int writePos;
         int readPos;
         int remaining;
@@ -130,7 +130,7 @@ public KeyValueStore(@NonNull KeyValueStoreConfig config, @NonNull Serializer se
         indexer = new ExternalSortingIndexer<>(new File(String.format("%s/%s", config.getDirectoryPath(), idxName)), config.getIndexBufferSize(), config.getIndexReaderCount());
 
         wal.lock.writeInvoke(() -> {
-            long pos = wal.meta.getLogPosition();
+            long pos = wal.getPosition();
             Entry<TK, TV> val;
             $<Long> endPos = $();
             while ((val = unsafeRead(pos, null, endPos)) != null) {
@@ -146,18 +146,18 @@ public KeyValueStore(@NonNull KeyValueStoreConfig config, @NonNull Serializer se
                 }
 
                 key.logPosition = pos;
-                wal.meta.setLogPosition(endPos.v);
+                wal.setPosition(endPos.v);
                 indexer.save(key);
 
                 if (incr) {
-                    wal.meta.incrementSize();
+                    wal.setSize(wal.getSize() + 1);
                 }
                 log.debug("recover {}", key);
                 pos = endPos.v;
             }
         });
-        if (wal.meta.extra == null) {
-            wal.meta.extra = new AtomicInteger();
+        if (wal.extra == null) {
+            wal.extra = new AtomicInteger();
         }
 
         if (config.getApiPort() > 0) {
@@ -186,13 +186,13 @@ public void fastPut(@NonNull TK k, TV v) {
                 incr = true;
             }
 
-            long pos = wal.meta.getLogPosition();
+            long pos = wal.getPosition();
             if (key.logPosition >= WALFileStream.HEADER_SIZE) {
                 KeyIndexer.KeyEntity<TK> finalKey = key;
 //                wal.lock.writeInvoke(() -> {
-                wal.meta.setLogPosition(finalKey.logPosition);
+                wal.setPosition(finalKey.logPosition);
                 wal.write(TOMB_MARK);
-                wal.meta.setLogPosition(pos);
+                wal.setPosition(pos);
                 log.debug("fastPut mark TOMB {} <- {}", finalKey.logPosition, pos);
 //                }, key.logPosition, 1);
             }
@@ -200,13 +200,13 @@ public void fastPut(@NonNull TK k, TV v) {
             key.logPosition = pos;
             wal.write(0);
             serializer.serialize(val, wal);
-            int size = (int) (wal.meta.getLogPosition() - key.logPosition);
+            int size = (int) (wal.getPosition() - key.logPosition);
             wal.writeInt(size);
 //            log.debug("fastPut {} {}", key, val);
 
             indexer.save(key);
             if (incr) {
-                wal.meta.incrementSize();
+                wal.setSize(wal.getSize() + 1);
             }
         }, WALFileStream.HEADER_SIZE);
     }
@@ -220,15 +220,15 @@ public void fastRemove(@NonNull TK k) {
                 return;
             }
 
-            long pos = wal.meta.getLogPosition();
-            wal.meta.setLogPosition(key.logPosition);
+            long pos = wal.getPosition();
+            wal.setPosition(key.logPosition);
             wal.write(TOMB_MARK);
-            wal.meta.setLogPosition(pos);
+            wal.setPosition(pos);
             log.debug("fastRemove {}", key);
 
             key.logPosition = TOMB_MARK;
             indexer.save(key);
-            wal.meta.decrementSize();
+            wal.setSize(wal.getSize() - 1);
         }, WALFileStream.HEADER_SIZE);
     }
 
@@ -257,7 +257,7 @@ private Entry<TK, TV> unsafeRead(long logPosition, TK k, $<Long> position) {
             val = serializer.deserialize(wal, true);
 
             if (k != null && !k.equals(val.key)) {
-                AtomicInteger counter = (AtomicInteger) wal.meta.extra;
+                AtomicInteger counter = (AtomicInteger) wal.extra;
                 int total = counter == null ? -1 : counter.incrementAndGet();
                 log.warn("LogPosError hash collision {} total={}", k, total);
                 Files.writeLines("./hc_err.log", Linq.from(String.format("%s %s hc=%s total=%s", DateTime.now(), logName
@@ -436,7 +436,7 @@ private void apiCheck(ServerRequest req) {
     //region map
     @Override
     public int size() {
-        return wal.meta.getSize();
+        return (int) wal.getSize();
     }
 
     @Override
diff --git a/rxlib/src/main/java/org/rx/io/WALFileStream.java b/rxlib/src/main/java/org/rx/io/WALFileStream.java
index 4a99615c..c021b632 100644
--- a/rxlib/src/main/java/org/rx/io/WALFileStream.java
+++ b/rxlib/src/main/java/org/rx/io/WALFileStream.java
@@ -14,7 +14,6 @@
 import java.io.*;
 import java.nio.channels.FileChannel;
 import java.util.concurrent.LinkedTransferQueue;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.rx.core.Extends.require;
 
@@ -45,63 +44,21 @@ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundE
         throw new UnsupportedEncodingException();
     }
 
-    static class MetaHeader implements Serializable {
+    private static class MetaHeader implements Serializable {
         private static final long serialVersionUID = 3894764623767567837L;
 
         private void writeObject(ObjectOutputStream out) throws IOException {
             out.writeLong(logPos);
-            out.writeInt(size.get());
+            out.writeLong(size);
         }
 
         private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
             logPos = in.readLong();
-            size = new AtomicInteger();
-            size.set(in.readInt());
+            size = in.readLong();
         }
 
-        private transient WALFileStream owner;
-        private volatile long logPos = HEADER_SIZE;
-        private AtomicInteger size = new AtomicInteger();
-        Object extra;
-
-        public long getLogPosition() {
-            return logPos;
-        }
-
-        public void setLogPosition(long logPosition) {
-            require(logPosition, logPosition >= HEADER_SIZE);
-
-            logPos = logPosition;
-            writeBack();
-        }
-
-        public int getSize() {
-            return size.get();
-        }
-
-        public void setSize(int size) {
-            this.size.set(size);
-            writeBack();
-        }
-
-        public int incrementSize() {
-            int s = size.incrementAndGet();
-            writeBack();
-            return s;
-        }
-
-        public int decrementSize() {
-            int s = size.decrementAndGet();
-            writeBack();
-            return s;
-        }
-
-        private void writeBack() {
-            if (owner == null) {
-                return;
-            }
-            owner.saveMeta();
-        }
+        long logPos = HEADER_SIZE;
+        long size;
     }
 
     static final float GROW_FACTOR = 0.75f;
@@ -198,12 +155,17 @@ public boolean canSeek() {
 
     @Override
     public long getPosition() {
-        return lock.readInvoke(meta::getLogPosition);
+        return lock.readInvoke(() -> meta.logPos);
     }
 
     @Override
     public void setPosition(long position) {
-        lock.writeInvoke(() -> meta.setLogPosition(position));
+        require(position, position >= HEADER_SIZE);
+
+        lock.writeInvoke(() -> {
+            meta.logPos = position;
+            saveMeta();
+        }, 0, HEADER_SIZE);
     }
 
     @Override
@@ -211,6 +173,19 @@ public long getLength() {
         return lock.readInvoke(file::getLength);
     }
 
+    Object extra;
+
+    public long getSize() {
+        return lock.readInvoke(() -> meta.size);
+    }
+
+    public void setSize(long size) {
+        lock.writeInvoke(() -> {
+            meta.size = size;
+            saveMeta();
+        }, 0, HEADER_SIZE);
+    }
+
     public WALFileStream(File file, long growSize, int readerCount, @NonNull Serializer serializer) {
         this.growSize = growSize;
         this.readerCount = readerCount;
@@ -223,7 +198,6 @@ public WALFileStream(File file, long growSize, int readerCount, @NonNull Seriali
         }
 
         meta = loadMeta();
-        meta.owner = this;
     }
 
     @Override
@@ -234,14 +208,13 @@ protected void freeObjects() {
 
     public void clear() {
         lock.writeInvoke(() -> {
-            meta.setLogPosition(HEADER_SIZE);
-            meta.setSize(0);
+            meta.size = 0;
+            setPosition(HEADER_SIZE);
         });
     }
 
-    public void saveMeta() {
-        checkNotClosed();
-
+    void saveMeta() {
+//        checkNotClosed();
         lock.writeInvoke(() -> {
             writer.setPosition(0);
             serializer.serialize(meta, writer);
@@ -302,7 +275,7 @@ private boolean ensureGrow() {
         return lock.writeInvoke(() -> {
             long length = file.getLength();
             if (length < growSize
-                    || (meta != null && meta.getLogPosition() / (float) length > GROW_FACTOR)) {
+            if (length < growSize || (meta != null && meta.logPos / (float) length > GROW_FACTOR)) {
                 long resize = length + growSize;
                 log.info("growSize {} {}->{}", getName(), length, resize);
                 _setLength(resize);
@@ -374,9 +347,9 @@ public void write(ByteBuf src, int length) {
     }
 
     private void ensureWrite(BiAction<IOStream> action) {
-        long logPosition = meta.getLogPosition();
+        long logPosition = meta.logPos;
         lock.writeInvoke(() -> {
-            if (logPosition != meta.getLogPosition()) {
+            if (logPosition != meta.logPos) {
                 throw new InvalidException("Concurrent error");
 //                log.warn("Fallback lock");
 //                lock.writeInvoke(() -> innerWrite(meta.getLogPosition(), action));
@@ -393,7 +366,8 @@ void innerWrite(long logPosition, BiAction<IOStream> action) {
         writer.setPosition(logPosition);
         action.accept(writer);
         _flush();
-        meta.setLogPosition(writer.getPosition());
+
+        setPosition(writer.getPosition());
     }
 
     private void _flush() {

From 48d7a4ae6aa9c3d495abb94619f9f3c57a89621f Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E2=84=9E?= <ilovehaley.kid@gmail.com>
Date: Sat, 7 Dec 2024 18:03:25 +0800
Subject: [PATCH 2/2] Update WALFileStream.java

---
 rxlib/src/main/java/org/rx/io/WALFileStream.java | 1 -
 1 file changed, 1 deletion(-)

diff --git a/rxlib/src/main/java/org/rx/io/WALFileStream.java b/rxlib/src/main/java/org/rx/io/WALFileStream.java
index c021b632..fd024c7d 100644
--- a/rxlib/src/main/java/org/rx/io/WALFileStream.java
+++ b/rxlib/src/main/java/org/rx/io/WALFileStream.java
@@ -274,7 +274,6 @@ private void releaseReaderAndWriter() {
     private boolean ensureGrow() {
         return lock.writeInvoke(() -> {
             long length = file.getLength();
-            if (length < growSize
             if (length < growSize || (meta != null && meta.logPos / (float) length > GROW_FACTOR)) {
                 long resize = length + growSize;
                 log.info("growSize {} {}->{}", getName(), length, resize);