Skip to content

Commit

Permalink
Merge pull request #59 from joschi/SERVER-30
Browse files Browse the repository at this point in the history
Introduced threadsafe data structures for ChunkedGELFClientManager
  • Loading branch information
Lennart Koopmann committed Nov 7, 2011
2 parents c0f23b4 + 98cebcd commit f9fb83c
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 17 deletions.
3 changes: 2 additions & 1 deletion src/main/java/org/graylog2/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.graylog2.messagehandlers.amqp.AMQPBroker;
import org.graylog2.messagehandlers.amqp.AMQPSubscribedQueue;
import org.graylog2.messagehandlers.amqp.AMQPSubscriberThread;
import org.graylog2.messagehandlers.gelf.ChunkedGELFClientManager;
import org.graylog2.messagehandlers.gelf.GELFMainThread;
import org.graylog2.messagehandlers.syslog.SyslogServerThread;
import org.graylog2.periodical.ChunkedGELFClientManagerThread;
Expand Down Expand Up @@ -172,7 +173,7 @@ private static void initializeGELFThreads(int gelfPort) {
GELFMainThread gelfThread = new GELFMainThread(gelfPort);
gelfThread.start();

ChunkedGELFClientManagerThread gelfManager = new ChunkedGELFClientManagerThread();
ChunkedGELFClientManagerThread gelfManager = new ChunkedGELFClientManagerThread(ChunkedGELFClientManager.getInstance());
gelfManager.start();

LOG.info("GELF threads started");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

package org.graylog2.messagehandlers.gelf;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
* ChunkedGELFClientManager.java: Sep 20, 2010 6:52:36 PM
Expand All @@ -33,7 +33,7 @@
*/
public final class ChunkedGELFClientManager {

private static Map<String, ChunkedGELFMessage> messageMap = new HashMap<String, ChunkedGELFMessage>();
private static ConcurrentMap<String, ChunkedGELFMessage> messageMap = new ConcurrentHashMap<String, ChunkedGELFMessage>();

private static ChunkedGELFClientManager instance;

Expand Down Expand Up @@ -93,7 +93,7 @@ public void dropMessage(String hash) {
*
* @return
*/
public Map<String, ChunkedGELFMessage> getMessageMap() {
public ConcurrentMap<String, ChunkedGELFMessage> getMessageMap() {
return messageMap;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,51 +25,62 @@
import org.graylog2.messagehandlers.gelf.ChunkedGELFMessage;
import org.graylog2.messagehandlers.gelf.EmptyGELFMessageException;

import java.util.Map;
import java.util.concurrent.ConcurrentMap;

/**
* ChunkedGELFClientManagerThread.java: Sep 20, 2010 9:28:37 PM
*
* <p/>
* [description]
*
* @author Lennart Koopmann <lennart@socketfeed.com>
*/
public class ChunkedGELFClientManagerThread extends Thread {

private static final Logger LOG = Logger.getLogger(ChunkedGELFClientManagerThread.class);
private static final int RUN_INTERVAL = 10000;

private ChunkedGELFClientManager gelfClientManager;

public ChunkedGELFClientManagerThread(ChunkedGELFClientManager gelfClientManager) {
this.gelfClientManager = gelfClientManager;
}

/**
* Start the thread. Runs forever.
*/
@Override public void run() {
@Override
public void run() {
// Run forever.
while (true) {
try {
Map<String, ChunkedGELFMessage> messageMap = ChunkedGELFClientManager.getInstance().getMessageMap();
ConcurrentMap<String, ChunkedGELFMessage> messageMap = gelfClientManager.getMessageMap();

for(Map.Entry<String, ChunkedGELFMessage> entry : messageMap.entrySet()) {
for (ConcurrentMap.Entry<String, ChunkedGELFMessage> entry : messageMap.entrySet()) {

String messageId = entry.getKey();
ChunkedGELFMessage message = entry.getValue();

int fiveSecondsAgo = (int) (System.currentTimeMillis()/1000)-5;
int fiveSecondsAgo = (int) (System.currentTimeMillis() / 1000) - 5;

try {
if (message.getFirstChunkArrival() < fiveSecondsAgo) {
this.dropMessage(messageId, "Did not completely arrive in time.");
dropMessage(messageId, "Did not completely arrive in time.");
}
} catch (EmptyGELFMessageException e) {
// getFirstChunkArrival() did not work because first part did not arrive yet. Drop anyways.
this.dropMessage(messageId, "First chunk did not arrive.");
dropMessage(messageId, "First chunk did not arrive.");
}
}

} catch (Exception e) {
LOG.warn("Error in ChunkedGELFClientManagerThread: " + e.getMessage(), e);
}

// Run every 10 seconds.
try { Thread.sleep(10000); } catch(InterruptedException e) {}
// Run every 10 seconds.
try {
Thread.sleep(RUN_INTERVAL);
} catch (InterruptedException e) {
}
}
}

Expand All @@ -81,7 +92,7 @@ public class ChunkedGELFClientManagerThread extends Thread {
*/
public void dropMessage(String messageId, String reason) {
LOG.info("Dropping incomplete chunked GELF message <" + messageId + "> (" + reason + ")");
ChunkedGELFClientManager.getInstance().dropMessage(messageId);
gelfClientManager.dropMessage(messageId);
}

}

0 comments on commit f9fb83c

Please sign in to comment.