Skip to content

Commit

Permalink
up
Browse files Browse the repository at this point in the history
  • Loading branch information
RockyLOMO committed Dec 7, 2024
1 parent d9573ac commit 3c334b3
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 77 deletions.
34 changes: 17 additions & 17 deletions rxlib/src/main/java/org/rx/io/KeyValueStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public boolean enableCompress() {
@RequiredArgsConstructor
class IteratorContext {
final Entry<TK, TV>[] buf;
long logPos = wal.meta.getLogPosition();
long logPos = wal.getPosition();
int writePos;
int readPos;
int remaining;
Expand Down Expand Up @@ -130,7 +130,7 @@ public KeyValueStore(@NonNull KeyValueStoreConfig config, @NonNull Serializer se
indexer = new ExternalSortingIndexer<>(new File(String.format("%s/%s", config.getDirectoryPath(), idxName)), config.getIndexBufferSize(), config.getIndexReaderCount());

wal.lock.writeInvoke(() -> {
long pos = wal.meta.getLogPosition();
long pos = wal.getPosition();
Entry<TK, TV> val;
$<Long> endPos = $();
while ((val = unsafeRead(pos, null, endPos)) != null) {
Expand All @@ -146,18 +146,18 @@ public KeyValueStore(@NonNull KeyValueStoreConfig config, @NonNull Serializer se
}

key.logPosition = pos;
wal.meta.setLogPosition(endPos.v);
wal.setPosition(endPos.v);
indexer.save(key);

if (incr) {
wal.meta.incrementSize();
wal.setSize(wal.getSize() + 1);
}
log.debug("recover {}", key);
pos = endPos.v;
}
});
if (wal.meta.extra == null) {
wal.meta.extra = new AtomicInteger();
if (wal.extra == null) {
wal.extra = new AtomicInteger();
}

if (config.getApiPort() > 0) {
Expand Down Expand Up @@ -186,27 +186,27 @@ public void fastPut(@NonNull TK k, TV v) {
incr = true;
}

long pos = wal.meta.getLogPosition();
long pos = wal.getPosition();
if (key.logPosition >= WALFileStream.HEADER_SIZE) {
KeyIndexer.KeyEntity<TK> finalKey = key;
// wal.lock.writeInvoke(() -> {
wal.meta.setLogPosition(finalKey.logPosition);
wal.setPosition(finalKey.logPosition);
wal.write(TOMB_MARK);
wal.meta.setLogPosition(pos);
wal.setPosition(pos);
log.debug("fastPut mark TOMB {} <- {}", finalKey.logPosition, pos);
// }, key.logPosition, 1);
}

key.logPosition = pos;
wal.write(0);
serializer.serialize(val, wal);
int size = (int) (wal.meta.getLogPosition() - key.logPosition);
int size = (int) (wal.getPosition() - key.logPosition);
wal.writeInt(size);
// log.debug("fastPut {} {}", key, val);

indexer.save(key);
if (incr) {
wal.meta.incrementSize();
wal.setSize(wal.getSize() + 1);
}
}, WALFileStream.HEADER_SIZE);
}
Expand All @@ -220,15 +220,15 @@ public void fastRemove(@NonNull TK k) {
return;
}

long pos = wal.meta.getLogPosition();
wal.meta.setLogPosition(key.logPosition);
long pos = wal.getPosition();
wal.setPosition(key.logPosition);
wal.write(TOMB_MARK);
wal.meta.setLogPosition(pos);
wal.setPosition(pos);
log.debug("fastRemove {}", key);

key.logPosition = TOMB_MARK;
indexer.save(key);
wal.meta.decrementSize();
wal.setSize(wal.getSize() - 1);
}, WALFileStream.HEADER_SIZE);
}

Expand Down Expand Up @@ -257,7 +257,7 @@ private Entry<TK, TV> unsafeRead(long logPosition, TK k, $<Long> position) {
val = serializer.deserialize(wal, true);

if (k != null && !k.equals(val.key)) {
AtomicInteger counter = (AtomicInteger) wal.meta.extra;
AtomicInteger counter = (AtomicInteger) wal.extra;
int total = counter == null ? -1 : counter.incrementAndGet();
log.warn("LogPosError hash collision {} total={}", k, total);
Files.writeLines("./hc_err.log", Linq.from(String.format("%s %s hc=%s total=%s", DateTime.now(), logName
Expand Down Expand Up @@ -436,7 +436,7 @@ private void apiCheck(ServerRequest req) {
//region map
@Override
public int size() {
return wal.meta.getSize();
return (int) wal.getSize();
}

@Override
Expand Down
94 changes: 34 additions & 60 deletions rxlib/src/main/java/org/rx/io/WALFileStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import java.io.*;
import java.nio.channels.FileChannel;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.atomic.AtomicInteger;

import static org.rx.core.Extends.require;

Expand Down Expand Up @@ -45,63 +44,21 @@ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundE
throw new UnsupportedEncodingException();
}

static class MetaHeader implements Serializable {
private static class MetaHeader implements Serializable {
private static final long serialVersionUID = 3894764623767567837L;

private void writeObject(ObjectOutputStream out) throws IOException {
out.writeLong(logPos);
out.writeInt(size.get());
out.writeLong(size);
}

private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
logPos = in.readLong();
size = new AtomicInteger();
size.set(in.readInt());
size = in.readLong();
}

private transient WALFileStream owner;
private volatile long logPos = HEADER_SIZE;
private AtomicInteger size = new AtomicInteger();
Object extra;

public long getLogPosition() {
return logPos;
}

public void setLogPosition(long logPosition) {
require(logPosition, logPosition >= HEADER_SIZE);

logPos = logPosition;
writeBack();
}

public int getSize() {
return size.get();
}

public void setSize(int size) {
this.size.set(size);
writeBack();
}

public int incrementSize() {
int s = size.incrementAndGet();
writeBack();
return s;
}

public int decrementSize() {
int s = size.decrementAndGet();
writeBack();
return s;
}

private void writeBack() {
if (owner == null) {
return;
}
owner.saveMeta();
}
long logPos = HEADER_SIZE;
long size;
}

static final float GROW_FACTOR = 0.75f;
Expand Down Expand Up @@ -198,19 +155,37 @@ public boolean canSeek() {

@Override
public long getPosition() {
return lock.readInvoke(meta::getLogPosition);
return lock.readInvoke(() -> meta.logPos);
}

@Override
public void setPosition(long position) {
lock.writeInvoke(() -> meta.setLogPosition(position));
require(position, position >= HEADER_SIZE);

lock.writeInvoke(() -> {
meta.logPos = position;
saveMeta();
}, 0, HEADER_SIZE);
}

@Override
public long getLength() {
return lock.readInvoke(file::getLength);
}

Object extra;

public long getSize() {
return lock.readInvoke(() -> meta.size);
}

public void setSize(long size) {
lock.writeInvoke(() -> {
meta.size = size;
saveMeta();
}, 0, HEADER_SIZE);
}

public WALFileStream(File file, long growSize, int readerCount, @NonNull Serializer serializer) {
this.growSize = growSize;
this.readerCount = readerCount;
Expand All @@ -223,7 +198,6 @@ public WALFileStream(File file, long growSize, int readerCount, @NonNull Seriali
}

meta = loadMeta();
meta.owner = this;
}

@Override
Expand All @@ -234,14 +208,13 @@ protected void freeObjects() {

public void clear() {
lock.writeInvoke(() -> {
meta.setLogPosition(HEADER_SIZE);
meta.setSize(0);
meta.size = 0;
setPosition(HEADER_SIZE);
});
}

public void saveMeta() {
checkNotClosed();

void saveMeta() {
// checkNotClosed();
lock.writeInvoke(() -> {
writer.setPosition(0);
serializer.serialize(meta, writer);
Expand Down Expand Up @@ -302,7 +275,7 @@ private boolean ensureGrow() {
return lock.writeInvoke(() -> {
long length = file.getLength();
if (length < growSize
|| (meta != null && meta.getLogPosition() / (float) length > GROW_FACTOR)) {
if (length < growSize || (meta != null && meta.logPos / (float) length > GROW_FACTOR)) {
long resize = length + growSize;
log.info("growSize {} {}->{}", getName(), length, resize);
_setLength(resize);
Expand Down Expand Up @@ -374,9 +347,9 @@ public void write(ByteBuf src, int length) {
}

private void ensureWrite(BiAction<IOStream> action) {
long logPosition = meta.getLogPosition();
long logPosition = meta.logPos;
lock.writeInvoke(() -> {
if (logPosition != meta.getLogPosition()) {
if (logPosition != meta.logPos) {
throw new InvalidException("Concurrent error");
// log.warn("Fallback lock");
// lock.writeInvoke(() -> innerWrite(meta.getLogPosition(), action));
Expand All @@ -393,7 +366,8 @@ void innerWrite(long logPosition, BiAction<IOStream> action) {
writer.setPosition(logPosition);
action.accept(writer);
_flush();
meta.setLogPosition(writer.getPosition());

setPosition(writer.getPosition());
}

private void _flush() {
Expand Down

0 comments on commit 3c334b3

Please sign in to comment.