Skip to content

Commit

Permalink
Implement BufferLease using Phaser
Browse files Browse the repository at this point in the history
Implement BufferLease using Phaser for #61 

---------

Co-authored-by: Mikko Kortelainen <1@teragrep.com>
  • Loading branch information
eemhu and kortemik authored Mar 6, 2024
1 parent 1def6db commit 7d5c9ba
Show file tree
Hide file tree
Showing 12 changed files with 155 additions and 120 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ public void handleEvent(SelectionKey selectionKey) {

@Override
public InterestOps interestOps() {
return this.interestOps;
return interestOps;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.teragrep.rlp_03.context.buffer;

import java.nio.ByteBuffer;

/**
* Interface for a buffer container object.
*/
public interface BufferContainer {
long id();
ByteBuffer buffer();
boolean isStub();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package com.teragrep.rlp_03.context.buffer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;

/**
* Implementation of the BufferContainer interface. Contains the buffer with a synchronized (lock-free)
* way of accessing it.
*/
public class BufferContainerImpl implements BufferContainer {
private static final Logger LOGGER = LoggerFactory.getLogger(BufferContainerImpl.class);
private final long id;
private final ByteBuffer buffer;

public BufferContainerImpl(long id, ByteBuffer buffer) {
this.id = id;
this.buffer = buffer;
}

@Override
public long id() {
return id;
}

@Override
public synchronized ByteBuffer buffer() {
return buffer;
}

@Override
public String toString() {
return "BufferContainer{" +
"buffer=" + buffer +
", id=" + id +
'}';
}

@Override
public boolean isStub() {
LOGGER.debug("id <{}>", id);
return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.teragrep.rlp_03.context.buffer;

import java.nio.ByteBuffer;

/**
* Buffer container stub object. Use isStub() to check.
* Other methods will result in an IllegalStateException.
*/
public class BufferContainerStub implements BufferContainer {
@Override
public long id() {
throw new IllegalStateException("BufferContainerStub does not have an id!");
}

@Override
public ByteBuffer buffer() {
throw new IllegalStateException("BufferContainerStub does not allow access to the buffer!");
}

@Override
public boolean isStub() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public interface BufferLease {
long id();

long refs();

ByteBuffer buffer();

void addRef();
Expand All @@ -61,6 +62,4 @@ public interface BufferLease {
boolean isRefCountZero();

boolean isStub();

boolean attemptRelease();
}
116 changes: 34 additions & 82 deletions src/main/java/com/teragrep/rlp_03/context/buffer/BufferLeaseImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,128 +46,80 @@

package com.teragrep.rlp_03.context.buffer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.Phaser;

public class BufferLeaseImpl implements BufferLease {
private static final Logger LOGGER = LoggerFactory.getLogger(BufferLeaseImpl.class);
private final long id;
private final ByteBuffer buffer;
private long refCount; // TODO consider using a semaphore
private final Lock lock;
private final BufferContainer bufferContainer;
private final Phaser phaser;
private final BufferLeasePool bufferLeasePool;

public BufferLeaseImpl(BufferContainer bc, BufferLeasePool bufferLeasePool) {
this.bufferContainer = bc;
this.bufferLeasePool = bufferLeasePool;

public BufferLeaseImpl(long id, ByteBuffer buffer) {
this.id = id;
this.buffer = buffer;
this.refCount = 0;
this.lock = new ReentrantLock();
// initial registered parties set to 1
this.phaser = new ClearingPhaser(1);
}

@Override
public long id() {
return id;
return bufferContainer.id();
}

@Override
public long refs() {
lock.lock();
try {
return refCount;
}
finally {
lock.unlock();
}
// initial number of registered parties is 1
return phaser.getRegisteredParties();
}

@Override
public ByteBuffer buffer() {
lock.lock();
try {
return buffer;
}
finally {
lock.unlock();
}

return bufferContainer.buffer();
}

@Override
public void addRef() {
lock.lock();
try {
refCount++;
}
finally {
lock.unlock();
if (phaser.register() < 0) {
throw new IllegalStateException("Cannot add reference, BufferLease phaser was already terminated!");
}
}

@Override
public void removeRef() {
lock.lock();
try {

long newRefs = refCount - 1;
if (newRefs < 0) {
throw new IllegalStateException("refs must not be negative");
}

refCount = newRefs;
}
finally {
lock.unlock();
if (phaser.arriveAndDeregister() < 0) {
throw new IllegalStateException("Cannot remove reference, BufferLease phaser was already terminated!");
}
}

@Override
public boolean isRefCountZero() {
lock.lock();
try {
return refCount == 0;
}
finally {
lock.unlock();
}
}


@Override
public String toString() {
lock.lock();
try {
return "BufferLease{" +
"buffer=" + buffer +
", refCount=" + refCount +
'}';
} finally {
lock.unlock();
}
return phaser.isTerminated();
}

@Override
public boolean isStub() {
LOGGER.debug("id <{}>", id);
return false;
return bufferContainer.isStub();
}

@Override
public boolean attemptRelease() {
lock.lock();
try {
/**
* Phaser that clears the buffer on termination (registeredParties=0)
*/
private class ClearingPhaser extends Phaser {
public ClearingPhaser(int i) {
super(i);
}

@Override
protected boolean onAdvance(int phase, int registeredParties) {
boolean rv = false;
removeRef();
if (isRefCountZero()) {
if (registeredParties == 0) {
buffer().clear();
bufferLeasePool.internalOffer(bufferContainer);
rv = true;
}
return rv;

} finally {
lock.unlock();
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,10 @@ public class BufferLeasePool {

private final Supplier<ByteBuffer> byteBufferSupplier;

private final ConcurrentLinkedQueue<BufferLease> queue;
private final ConcurrentLinkedQueue<BufferContainer> queue;

private final BufferLease bufferLeaseStub;
private final BufferContainer bufferContainerStub;
private final AtomicBoolean close;

private final int segmentSize;
Expand All @@ -84,23 +85,33 @@ public BufferLeasePool() {
this.byteBufferSupplier = () -> ByteBuffer.allocateDirect(segmentSize); // TODO configurable extents
this.queue = new ConcurrentLinkedQueue<>();
this.bufferLeaseStub = new BufferLeaseStub();
this.bufferContainerStub = new BufferContainerStub();
this.close = new AtomicBoolean();
this.bufferId = new AtomicLong();
this.lock = new ReentrantLock();
}

private BufferLease take() {
// get or create
BufferLease bufferLease = queue.poll();
if (bufferLease == null) {
bufferLease = new BufferLeaseImpl(bufferId.incrementAndGet(), byteBufferSupplier.get());
BufferContainer bufferContainer = queue.poll();
BufferLease bufferLease;
if (bufferContainer == null) {
// if queue is empty or stub object, create a new BufferContainer and BufferLease.
bufferLease = new BufferLeaseImpl(
new BufferContainerImpl(bufferId.incrementAndGet(), byteBufferSupplier.get()), this);
} else {
// otherwise, wrap bufferContainer with phaser decorator (bufferLease)
bufferLease = new BufferLeaseImpl(bufferContainer, this);
}

bufferLease.addRef(); // all start with one ref

if (LOGGER.isDebugEnabled()) {
LOGGER.debug("returning bufferLease id <{}> with refs <{}> at buffer position <{}>", bufferLease.id(), bufferLease.refs(), bufferLease.buffer().position());
}

if (bufferLease.buffer().position() != 0) {
throw new IllegalStateException("Dirty buffer in pool, terminating!");
}

return bufferLease;

}
Expand All @@ -124,26 +135,20 @@ public List<BufferLease> take(long size) {
}

public void offer(BufferLease bufferLease) {
if (bufferLease.attemptRelease()) {
internalOffer(bufferLease);
}
bufferLease.removeRef();
}

private void internalOffer(BufferLease bufferLease) {
if (!bufferLease.isStub()) {
queue.add(bufferLease);
void internalOffer(BufferContainer bufferContainer) {
// Add buffer back to pool if it is not a stub object
if (!bufferContainer.isStub()) {
queue.add(bufferContainer);
}

if (close.get()) {
LOGGER.debug("closing in offer");
while (queue.peek() != null) {
while (!queue.isEmpty()) {
if (lock.tryLock()) {
while (true) {
BufferLease queuedBufferLease = queue.poll();
if (queuedBufferLease == null) {
break;
}
}
queue.clear();
lock.unlock();
} else {
break;
Expand All @@ -162,11 +167,11 @@ public void close() {
close.set(true);

// close all that are in the pool right now
internalOffer(bufferLeaseStub);
internalOffer(bufferContainerStub);

}

public int estimatedSize() {
return queue.size();
}
}
}
Loading

0 comments on commit 7d5c9ba

Please sign in to comment.