Skip to content

Commit

Permalink
Merge pull request #32 from benjchristensen/servo-eventstream
Browse files Browse the repository at this point in the history
servo-event-stream module
  • Loading branch information
benjchristensen committed Dec 4, 2012
2 parents 505b1d9 + d66ce66 commit a476fee
Show file tree
Hide file tree
Showing 7 changed files with 988 additions and 1 deletion.
6 changes: 6 additions & 0 deletions hystrix-contrib/hystrix-servo-stream/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
apply plugin: 'java'
dependencies {
compile project(':hystrix-core')
compile 'javax.servlet:javax.servlet-api:3.0.1'
compile 'org.json:json:20090211'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package com.netflix.hystrix.contrib.servostream;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

import javax.servlet.http.HttpServletResponse;

import org.json.JSONException;
import org.json.JSONObject;

import com.netflix.servo.Metric;
import com.netflix.servo.publish.MetricObserver;
import com.netflix.servo.tag.Tag;
import com.netflix.servo.tag.TagList;

/**
* Groups Servo metrics into a MetricGroup and then writes them to HttpServletResponse in text/event-stream format.
*/
public class HystrixEventStreamMetricsObserver implements MetricObserver {

private final HashMap<String, MetricGroup> metricsGroups = new HashMap<String, MetricGroup>();
private final HashMap<String, String> metricsCache = new HashMap<String, String>();
private final String CurrentTime = "currentTime";
private final HttpServletResponse response;
private volatile boolean isRunning = true;

HystrixEventStreamMetricsObserver(HttpServletResponse response) {
this.response = response;
}

public boolean isRunning() {
return isRunning;
}

@Override
public void update(List<Metric> metrics) {
try {
List<JSONObject> groupedJson = getServoMetricsGroupedAsJson(metrics);

for (JSONObject json : groupedJson) {
response.getWriter().println("data: " + json.toString() + "\n");
}
response.flushBuffer();
} catch (Exception e) {
HystrixServoPoller.logger.error("Failed to write metric group.", e);
// the servlet itself will handle closing the connection using 'isRunning'
isRunning = false;
}
}

@Override
public String getName() {
return HystrixEventStreamMetricsObserver.class.getSimpleName();
}

private List<JSONObject> getServoMetricsGroupedAsJson(List<Metric> metrics) {
List<JSONObject> events = new ArrayList<JSONObject>();

for (Metric metric : metrics) {

String type = null;
String id = null;
TagList tagList = metric.getConfig().getTags();
if (tagList != null) {
Tag tag = tagList.getTag("type");
if (tag != null) {
type = tag.getValue();
}
tag = tagList.getTag("instance");
if (tag != null) {
id = tag.getValue();
}
}

String cacheKey = type + id;

MetricGroup group = metricsGroups.get(cacheKey);
if (group == null) {
group = new MetricGroup(type, id);
metricsGroups.put(cacheKey, group);
}

try {
group.addMetricFields(metric);
} catch (Throwable t) {
HystrixServoPoller.logger.error("Caught failure when adding metric: " + metric + " to group: " + group, t);
}
}

for (MetricGroup mg : metricsGroups.values()) {
try {
if (mg.isDirty()) {
// ok we have data in the metric group e.g HystrixCommand : CinematchGetRatings
// but we should check with a cache and see if the metric has changed. Do not send data that has not changed

JSONObject json = mg.getJsonObject();

long currentTime = -1;
if (json.has(CurrentTime)) {
currentTime = (Long) json.remove(CurrentTime);
}
String jsonString = json.toString();
String cacheKey = mg.getCacheKey();
String prev = metricsCache.get(cacheKey);

if (prev == null || (prev != null && !prev.equals(jsonString))) {
metricsCache.put(cacheKey, jsonString);
if (currentTime != -1) {
json.put(CurrentTime, currentTime);
}
events.add(json);
}

mg.clear();
}
} catch (JSONException e) {
HystrixServoPoller.logger.error("Caught failure when getting json from group: " + mg, e);
}
}
return events;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package com.netflix.hystrix.contrib.servostream;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.netflix.config.DynamicIntProperty;
import com.netflix.config.DynamicPropertyFactory;

/**
* Streams Hystrix metrics in text/event-stream format.
*/
public class HystrixMetricsStreamServlet extends HttpServlet {

private static final long serialVersionUID = -7548505095303313237L;

private static final Logger logger = LoggerFactory.getLogger(HystrixMetricsStreamServlet.class);

/* used to track number of connections and throttle */
private static AtomicInteger concurrentConnections = new AtomicInteger(0);
private static DynamicIntProperty maxConcurrentConnections = DynamicPropertyFactory.getInstance().getIntProperty("hystrix.servo.stream.maxConcurrentConnections", 5);

/**
* Handle incoming GETs
*/
@Override
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
handleRequest(request, response);
}

/**
* - maintain an open connection with the client
* - on initial connection send latest data of each requested event type
* - subsequently send all changes for each requested event type
*
* @param request
* @param response
* @throws javax.servlet.ServletException
* @throws java.io.IOException
*/
private void handleRequest(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
/* wrap so we synchronize writes since the response object will be shared across multiple threads for async writing */
response = new SynchronizedHttpServletResponse(response);

/* ensure we aren't allowing more connections than we want */
int numberConnections = concurrentConnections.incrementAndGet();

int delay = 500;
try {
String d = request.getParameter("delay");
if (d != null) {
delay = Integer.parseInt(d);
}
} catch (Exception e) {
// ignore if it's not a number
}

HystrixServoPoller poller = null;
try {
if (numberConnections > maxConcurrentConnections.get()) {
response.sendError(503, "MaxConcurrentConnections reached: " + maxConcurrentConnections.get());
} else {

/* initialize response */
response.setHeader("Content-Type", "text/event-stream");
response.setHeader("Cache-Control", "no-cache, no-store, max-age=0, must-revalidate");
response.setHeader("Pragma", "no-cache");

poller = new HystrixServoPoller(delay);
// start polling and it will write directly to the output stream
HystrixEventStreamMetricsObserver observer = new HystrixEventStreamMetricsObserver(response);
poller.start(observer);
logger.info("Starting poller");

try {
while (true && observer.isRunning()) {
response.getWriter().println(":ping\n");
response.flushBuffer();
Thread.sleep(2000);
}
} catch (Exception e) {
// do nothing on interruptions.
logger.error("Failed to write", e);
}
logger.error("Stopping Turbine stream to connection");
}
} catch (Exception e) {
logger.error("Error initializing servlet for Servo event stream.", e);
} finally {
concurrentConnections.decrementAndGet();
if (poller != null) {
poller.stop();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package com.netflix.hystrix.contrib.servostream;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.netflix.servo.DefaultMonitorRegistry;
import com.netflix.servo.monitor.MonitorConfig;
import com.netflix.servo.publish.MetricFilter;
import com.netflix.servo.publish.MonitorRegistryMetricPoller;
import com.netflix.servo.publish.PollRunnable;
import com.netflix.servo.tag.Tag;
import com.netflix.servo.tag.TagList;

/**
* Polls Servo for Hystrix metrics and sends them to a MetricsObserver.
*/
public class HystrixServoPoller {

static final Logger logger = LoggerFactory.getLogger(HystrixServoPoller.class);
private final MonitorRegistryMetricPoller monitorPoller;
private final ScheduledExecutorService executor;
private final int delay;

public HystrixServoPoller(int delay) {
executor = new ScheduledThreadPoolExecutor(1, new TurbineMetricsPollerThreadFactory());
monitorPoller = new MonitorRegistryMetricPoller(DefaultMonitorRegistry.getInstance(), 1, TimeUnit.MINUTES, false);
this.delay = delay;
}

public synchronized void start(HystrixEventStreamMetricsObserver observer) {
logger.info("Starting HystrixServoPoller");
PollRunnable task = new PollRunnable(monitorPoller, new HystrixMetricFilter(), observer);
executor.scheduleWithFixedDelay(task, 0, delay, TimeUnit.MILLISECONDS);
}

public synchronized void stop() {
logger.info("Stopping the Servo Metrics Poller");
executor.shutdownNow();
if (monitorPoller != null) {
monitorPoller.shutdown();
}
}

private class TurbineMetricsPollerThreadFactory implements ThreadFactory {
private static final String MetricsThreadName = "ServoMetricPoller";

private final ThreadFactory defaultFactory = Executors.defaultThreadFactory();

public Thread newThread(Runnable r) {
Thread thread = defaultFactory.newThread(r);
thread.setName(MetricsThreadName);
return thread;
}
}

private class HystrixMetricFilter implements MetricFilter {

private HystrixMetricFilter() {
}

@Override
public boolean matches(MonitorConfig mConfig) {

TagList tagList = mConfig.getTags();
if (tagList != null) {
Tag classTag = tagList.getTag("type");
logger.info("HystrixMetricFilter matches: " + classTag);
if (classTag == null) {
return false;
}
if (classTag.getValue().startsWith("Hystrix")) {
return true;
}
}

return false;
}
}

}
Loading

0 comments on commit a476fee

Please sign in to comment.