Skip to content

Commit

Permalink
Merge branch 'master' into rxsocks
Browse files Browse the repository at this point in the history
  • Loading branch information
RockyLOMO committed Dec 7, 2024
2 parents 71c6936 + d9573ac commit d37e705
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 72 deletions.
56 changes: 0 additions & 56 deletions rxlib/src/main/java/org/rx/io/CompositeLock.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,62 +84,6 @@ private ReadWriteLock overlaps(long position, long size) {
return null;
}

// @SneakyThrows
// private <T> T lock(FileStream.Block block, boolean shared, @NonNull Func<T> fn) {
// RefCounter<ReadWriteLock> rwLock = null;
// Lock lock = null;
// if (flags.has(Flags.READ_WRITE_LOCK)) {
// synchronized (rwLocks) {
// rwLock = rwLocks.computeIfAbsent(block, k -> {
// RefCounter<ReadWriteLock> t = overlaps(k.position, k.size);
// if (t == null) {
// t = new RefCounter<>(new ReentrantReadWriteLock());
// }
// return t;
// });
// int refCnt = rwLock.incrementRefCnt();
// log.info("Lock incrementRefCnt {}[{}] - {}", rwLock, refCnt, block);
// }
// lock = shared ? rwLock.ref.readLock() : rwLock.ref.writeLock();
// lock.lock();
// }
//
// FileLock fLock = null;
// try {
// if (flags.has(Flags.FILE_LOCK)) {
// fLock = owner.getRandomAccessFile().getChannel().lock(block.position, block.size, shared);
// }
//
// return fn.invoke();
// } finally {
// if (fLock != null) {
// fLock.release();
// }
// if (rwLock != null) {
// lock.unlock();
// synchronized (rwLocks) {
// int refCnt = rwLock.decrementRefCnt();
// log.info("Lock decrementRefCnt {}[{}] - {}", rwLock, refCnt, block);
// if (refCnt == 0) {
// rwLocks.remove(block);
// }
// }
// }
// }
// }
//
// private RefCounter<ReadWriteLock> overlaps(long position, long size) {
// for (Map.Entry<FileStream.Block, RefCounter<ReadWriteLock>> entry : rwLocks.entrySet()) {
// FileStream.Block block = entry.getKey();
// if (position + size <= block.position)
// continue; // That is below this
// if (block.position + block.size <= position)
// continue; // This is below that
// return entry.getValue();
// }
// return null;
// }

public void readInvoke(Action action) {
lock(ALL_BLOCK, true, action.toFunc());
}
Expand Down
31 changes: 15 additions & 16 deletions rxlib/src/main/java/org/rx/io/WALFileStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -191,27 +191,19 @@ public void setReaderPosition(long position) {
readerPosition.set(position);
}

// public long getWriterPosition() {
// return meta.getLogPosition();
// }
//
// public void setWriterPosition(long position) {
// meta.setLogPosition(position);
// }

@Override
public boolean canSeek() {
return true;
}

@Override
public long getPosition() {
return meta.getLogPosition();
return lock.readInvoke(meta::getLogPosition);
}

@Override
public void setPosition(long position) {
meta.setLogPosition(position);
lock.writeInvoke(() -> meta.setLogPosition(position));
}

@Override
Expand Down Expand Up @@ -386,17 +378,24 @@ private void ensureWrite(BiAction<IOStream> action) {
lock.writeInvoke(() -> {
if (logPosition != meta.getLogPosition()) {
throw new InvalidException("Concurrent error");
// log.warn("Fallback lock");
// lock.writeInvoke(() -> innerWrite(meta.getLogPosition(), action));
// return;
}

ensureGrow();

writer.setPosition(logPosition);
action.invoke(writer);
_flush();
meta.setLogPosition(writer.getPosition());
innerWrite(logPosition, action);
}, logPosition);
}

void innerWrite(long logPosition, BiAction<IOStream> action) {
ensureGrow();

writer.setPosition(logPosition);
action.accept(writer);
_flush();
meta.setLogPosition(writer.getPosition());
}

private void _flush() {
long delay = flushDelayMillis;
if (delay <= 0) {
Expand Down

0 comments on commit d37e705

Please sign in to comment.