Skip to content

Commit

Permalink
Use of BlockingQueue.drainTo() rather than multiple poll() calls (htt…
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed Jun 7, 2016
1 parent e4ee360 commit 88d6a21
Showing 1 changed file with 66 additions and 36 deletions.
102 changes: 66 additions & 36 deletions src/org/jgroups/protocols/TransferQueueBundler.java
Original file line number Diff line number Diff line change
@@ -1,84 +1,95 @@
package org.jgroups.protocols;

/**
* @author Bela Ban
* @since x.y
*/

import org.jgroups.Message;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

import static org.jgroups.protocols.TP.assertPositive;

/**
* This bundler adds all (unicast or multicast) messages to a queue until max size has been exceeded, but does send
* messages immediately when no other messages are available. https://issues.jboss.org/browse/JGRP-1540
* @author Bela Ban
*/
public class TransferQueueBundler extends BaseBundler implements Runnable {
protected BlockingQueue<Message> queue;
protected List<Message> remove_queue;
protected volatile Thread bundler_thread;
protected volatile boolean running=true;
protected static final String THREAD_NAME="TransferQueueBundler";

public TransferQueueBundler() {
this.remove_queue=new ArrayList<>(16);
}

protected TransferQueueBundler(BlockingQueue<Message> queue) {
this.queue = queue;
this.queue=queue;
this.remove_queue=new ArrayList<>(16);
}

protected TransferQueueBundler(int capacity) {
public TransferQueueBundler(int capacity) {
this(new ArrayBlockingQueue<Message>(assertPositive(capacity, "bundler capacity cannot be " + capacity)));
}

public Thread getThread() {return bundler_thread;}
public int getBufferSize() {return queue.size();}
public Thread getThread() {return bundler_thread;}
public int getBufferSize() {return queue.size();}
public int removeQueueSize() {return remove_queue.size();}
public TransferQueueBundler removeQueueSize(int size) {this.remove_queue=new ArrayList<>(size); return this;}

public void init(TP transport) {
super.init(transport);
queue=new ArrayBlockingQueue<>(assertPositive(transport.getBundlerCapacity(), "bundler capacity cannot be " + transport.getBundlerCapacity()));
}
if(queue == null)
queue=new ArrayBlockingQueue<>(assertPositive(transport.getBundlerCapacity(), "bundler capacity cannot be " + transport.getBundlerCapacity()));
}

public synchronized void start() {
if(bundler_thread != null)
if(running)
stop();
bundler_thread=transport.getThreadFactory().newThread(this, THREAD_NAME);
running=true;
bundler_thread.start();
}

public synchronized void stop() {
Thread tmp=bundler_thread;
bundler_thread=null;
if(tmp != null) {
tmp.interrupt();
if(tmp.isAlive()) {
try {tmp.join(500);} catch(InterruptedException e) {}
}
}
queue.clear();
_stop(true);
}

public synchronized void stopAndFlush() {
_stop(false);
}

public void send(Message msg) throws Exception {
if(bundler_thread != null)
if(running)
queue.put(msg);
}

public void run() {
while(Thread.currentThread() == bundler_thread) {
while(running) {
Message msg=null;
try {
if(count == 0) {
msg=queue.take();
if(msg == null)
continue;
long size=msg.size();
if(count + size >= transport.getMaxBundleSize())
sendBundledMessages();
addMessage(msg, size);
}
while(null != (msg=queue.poll())) {
long size=msg.size();
if(count + size >= transport.getMaxBundleSize())
sendBundledMessages();
addMessage(msg, size);
if((msg=queue.take()) == null)
continue;
long size=msg.size();
if(count + size >= transport.getMaxBundleSize())
sendBundledMessages();
addMessage(msg, size);
while(true) {
remove_queue.clear();
int num_msgs=queue.drainTo(remove_queue);
if(num_msgs <= 0)
break;
for(int i=0; i < remove_queue.size(); i++) {
msg=remove_queue.get(i);
size=msg.size();
if(count + size >= transport.getMaxBundleSize())
sendBundledMessages();
addMessage(msg, size);
}
}
if(count > 0)
sendBundledMessages();
Expand All @@ -87,5 +98,24 @@ public void run() {
}
}
}
}

protected void _stop(boolean clear_queue) {
running=false;
Thread tmp=bundler_thread;
bundler_thread=null;
if(tmp != null) {
tmp.interrupt();
if(tmp.isAlive()) {
try {tmp.join(500);} catch(InterruptedException e) {}
}
}
if(clear_queue)
queue.clear();
}


protected static int assertPositive(int value, String message) {
if(value <= 0) throw new IllegalArgumentException(message);
return value;
}
}

0 comments on commit 88d6a21

Please sign in to comment.