Skip to content

Commit

Permalink
Removed FunctionStats interface and added FunctionStatus class (apach…
Browse files Browse the repository at this point in the history
  • Loading branch information
srkukarni authored and sijie committed Mar 4, 2018
1 parent ee0ba86 commit ab3007f
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,32 +18,33 @@
*/
package org.apache.pulsar.functions.fs;

import lombok.*;
import lombok.experimental.Accessors;

import java.io.Serializable;

/**
* Stats of a running function.
*/
public interface FunctionStats extends Serializable {

/**
* Return total number of messages begin to process.
*
* @return total number of messages begin to process.
*/
long getTotalMsgsProcess();

/**
* Return total number of messages succeed on processing.
*
* @return total number of messages succeed on processing.
*/
long getTotalMsgsProcessSucceed();

/**
* Return total number of messages failed on processing.
*
* @return total number of messages failed on processing.
*/
long getTotalMsgsProcessFailed();
@Data
@Setter
@Getter
@EqualsAndHashCode
@ToString
@Accessors(chain = true)
public class FunctionStatus implements Serializable {

private boolean running;

private Exception startupException;

private long numProcessed;

private long numSuccessfullyProcessed;

private long numTimeouts;

private long numUserExceptions;

private long numSystemExceptions;
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
import org.apache.pulsar.functions.runtime.instance.JavaInstanceConfig;
import org.apache.pulsar.functions.runtime.functioncache.FunctionCacheManager;
import org.apache.pulsar.functions.runtime.serde.SerDe;
import org.apache.pulsar.functions.stats.FunctionStatsImpl;
import org.apache.pulsar.functions.stats.FunctionStats;

/**
* A function container implemented using java thread.
Expand Down Expand Up @@ -71,7 +71,7 @@ class ThreadFunctionContainer implements FunctionContainer {
private Consumer sourceConsumer;

// function stats
private final FunctionStatsImpl stats;
private final FunctionStats stats;

ThreadFunctionContainer(JavaInstanceConfig instanceConfig,
int maxBufferedTuples,
Expand All @@ -88,7 +88,7 @@ class ThreadFunctionContainer implements FunctionContainer {
this.id = "fn-" + instanceConfig.getFunctionConfig().getName() + "-instance-" + instanceConfig.getInstanceId();
this.jarFile = jarFile;
this.client = (PulsarClientImpl) pulsarClient;
this.stats = new FunctionStatsImpl(
this.stats = new FunctionStats(
id,
client.getConfiguration().getStatsIntervalSeconds(),
client.timer());
Expand Down Expand Up @@ -201,13 +201,13 @@ private void startSourceConsumer() throws Exception {
private void processResult(Message msg, JavaExecutionResult result, long processAt, SerDe serDe) {
if (result.getUserException() != null) {
log.info("Encountered user exception when processing message {}", msg, result.getUserException());
stats.incrementProcessFailure();
stats.incrementUserException();
} else if (result.getSystemException() != null) {
log.info("Encountered system exception when processing message {}", msg, result.getSystemException());
stats.incrementProcessFailure();
stats.incrementSystemException();
} else if (result.getTimeoutException() != null) {
log.info("Timedout when processing message {}", msg, result.getTimeoutException());
stats.incrementProcessFailure();
stats.incrementTimeoutException();
} else if (result.getResult() != null) {
stats.incrementProcessSuccess(System.nanoTime() - processAt);
if (sinkProducer != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,12 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.functions.fs.FunctionStats;

/**
* Function stats.
*/
@Slf4j
public class FunctionStatsImpl implements FunctionStats, AutoCloseable {
public class FunctionStats implements AutoCloseable {

private static final long serialVersionUID = 1L;

Expand All @@ -43,12 +42,16 @@ public class FunctionStatsImpl implements FunctionStats, AutoCloseable {
private final String functionName;
private final long statsIntervalSeconds;

private final LongAdder numMsgsProcess;
private final LongAdder numMsgsProcessSucceed;
private final LongAdder numMsgsProcessFailed;
private final LongAdder totalMsgsProcess;
private final LongAdder totalMsgsProcessSucceed;
private final LongAdder totalMsgsProcessFailed;
private final LongAdder numProcessed;
private final LongAdder numSuccessfullyProcessed;
private final LongAdder numUserExceptions;
private final LongAdder numSystemExceptions;
private final LongAdder numTimeoutExceptions;
private final LongAdder totalProcessed;
private final LongAdder totalSuccessfullyProcessed;
private final LongAdder totalUserExceptions;
private final LongAdder totalSystemExceptions;
private final LongAdder totalTimeoutExceptions;

private final DoublesSketch ds;

Expand All @@ -57,19 +60,23 @@ public class FunctionStatsImpl implements FunctionStats, AutoCloseable {
private final TimerTask statsTask;
private Timeout statsTimeout;

public FunctionStatsImpl(String functionName,
long statsIntervalSeconds,
Timer timer) {
public FunctionStats(String functionName,
long statsIntervalSeconds,
Timer timer) {
this.functionName = functionName;
this.statsIntervalSeconds = statsIntervalSeconds;
this.timer = timer;

this.totalMsgsProcess = new LongAdder();
this.totalMsgsProcessFailed = new LongAdder();
this.totalMsgsProcessSucceed = new LongAdder();
this.numMsgsProcess = new LongAdder();
this.numMsgsProcessFailed = new LongAdder();
this.numMsgsProcessSucceed = new LongAdder();
this.totalProcessed = new LongAdder();
this.totalSuccessfullyProcessed = new LongAdder();
this.totalUserExceptions = new LongAdder();
this.totalSystemExceptions = new LongAdder();
this.totalTimeoutExceptions = new LongAdder();
this.numProcessed = new LongAdder();
this.numSuccessfullyProcessed = new LongAdder();
this.numUserExceptions = new LongAdder();
this.numSystemExceptions = new LongAdder();
this.numTimeoutExceptions = new LongAdder();
this.ds = DoublesSketch.builder().build(256);

this.statsTask = initializeTimerTask();
Expand All @@ -92,21 +99,26 @@ private TimerTask initializeTimerTask() {
double elapsed = (now - oldTime) / 1e9;
oldTime = now;

long currentNumMsgsProcessed = numMsgsProcess.sumThenReset();
long currentNumMsgsProcessFailed = numMsgsProcessFailed.sumThenReset();
long currentNumMsgsProcessSucceed = numMsgsProcessSucceed.sumThenReset();
long currentNumMsgsProcessed = numProcessed.sumThenReset();
long currentNumMsgsProcessSucceed = numSuccessfullyProcessed.sumThenReset();
long currentNumMsgsUserExceptions = numUserExceptions.sumThenReset();
long currentNumMsgsSystemExceptions = numSystemExceptions.sumThenReset();
long currentNumMsgsTimeoutExceptions = numTimeoutExceptions.sumThenReset();

totalMsgsProcess.add(currentNumMsgsProcessed);
totalMsgsProcessFailed.add(currentNumMsgsProcessFailed);
totalMsgsProcessSucceed.add(currentNumMsgsProcessSucceed);
totalProcessed.add(currentNumMsgsProcessed);
totalSuccessfullyProcessed.add(currentNumMsgsProcessSucceed);
totalUserExceptions.add(currentNumMsgsUserExceptions);
totalSystemExceptions.add(currentNumMsgsSystemExceptions);
totalTimeoutExceptions.add(currentNumMsgsTimeoutExceptions);

double[] percentileValues;
synchronized (ds) {
percentileValues = ds.getQuantiles(percentiles);
ds.reset();
}

if ((currentNumMsgsProcessed | currentNumMsgsProcessFailed | currentNumMsgsProcessSucceed) != 0) {
if ((currentNumMsgsProcessed | currentNumMsgsUserExceptions | currentNumMsgsProcessSucceed
| currentNumMsgsSystemExceptions | currentNumMsgsTimeoutExceptions) != 0) {

for (int i = 0; i < percentileValues.length; i++) {
if (percentileValues[i] == Double.NaN) {
Expand All @@ -123,7 +135,8 @@ private TimerTask initializeTimerTask() {
dec.format(percentileValues[0] / 1000.0), dec.format(percentileValues[1] / 1000.0),
dec.format(percentileValues[2] / 1000.0), dec.format(percentileValues[3] / 1000.0),
dec.format(percentileValues[4] / 1000.0),
throughputFormat.format(currentNumMsgsProcessSucceed / elapsed), currentNumMsgsProcessFailed);
throughputFormat.format(currentNumMsgsProcessSucceed / elapsed),
currentNumMsgsUserExceptions + currentNumMsgsSystemExceptions + currentNumMsgsTimeoutExceptions);
}
} catch (Exception e) {
log.error("[{}]: {}", functionName, e.getMessage());
Expand Down Expand Up @@ -154,38 +167,46 @@ public void close() {
// Internal use only
//

public void incrementProcessFailure() {
this.numMsgsProcessFailed.increment();
public void incrementUserException() {
this.numUserExceptions.increment();
}

public void incrementSystemException() {
this.numSystemExceptions.increment();
}

public void incrementTimeoutException() {
this.numTimeoutExceptions.increment();
}

public void incrementProcessSuccess(long latencyNs) {
this.numMsgsProcessSucceed.increment();
this.numSuccessfullyProcessed.increment();
synchronized (ds) {
ds.update(TimeUnit.NANOSECONDS.toMicros(latencyNs));
}
}

public void incrementProcess() {
this.numMsgsProcess.increment();
this.numProcessed.increment();
}

public long getTotalProcessed() {
return totalProcessed.longValue();
}

//
// User Public API
//
public long getTotalUserExceptions() {
return totalUserExceptions.longValue();
}

@Override
public long getTotalMsgsProcess() {
return totalMsgsProcess.longValue();
public long getTotalSystemExceptions() {
return totalSystemExceptions.longValue();
}

@Override
public long getTotalMsgsProcessFailed() {
return totalMsgsProcessFailed.longValue();
public long getTotalTimeoutExceptions() {
return totalTimeoutExceptions.longValue();
}

@Override
public long getTotalMsgsProcessSucceed() {
return totalMsgsProcessSucceed.longValue();
public long getTotalSuccessfullyProcessed() {
return totalSuccessfullyProcessed.longValue();
}
}

0 comments on commit ab3007f

Please sign in to comment.