Skip to content

Commit 92ae4c6

Browse files
prateekm jagadish-northguard
authored and committed
SAMZA-1219; Add metrics for operator message received and execution times
Author: Prateek Maheshwari <pmaheshw@linkedin.com>
Reviewers: Jagadish <jagadish@apache.org>
Closes apache#142 from prateekm/operator-metrics
1 parent f7e1736 commit 92ae4c6

20 files changed

+547
-259
lines changed

samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java

Lines changed: 113 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,19 @@
1818
*/
1919
package org.apache.samza.operators.impl;
2020

21+
import org.apache.samza.config.Config;
22+
import org.apache.samza.config.MetricsConfig;
23+
import org.apache.samza.metrics.Counter;
24+
import org.apache.samza.metrics.MetricsRegistry;
25+
import org.apache.samza.metrics.Timer;
26+
import org.apache.samza.operators.spec.OperatorSpec;
2127
import org.apache.samza.task.MessageCollector;
28+
import org.apache.samza.task.TaskContext;
2229
import org.apache.samza.task.TaskCoordinator;
30+
import org.apache.samza.util.HighResolutionClock;
2331

32+
import java.util.Collection;
33+
import java.util.Collections;
2434
import java.util.HashSet;
2535
import java.util.Set;
2636

@@ -29,60 +39,140 @@
2939
* Abstract base class for all stream operator implementations.
3040
*/
3141
public abstract class OperatorImpl<M, RM> {
42+
private static final String METRICS_GROUP = OperatorImpl.class.getName();
3243

33-
private final Set<OperatorImpl<RM, ?>> nextOperators = new HashSet<>();
44+
private boolean initialized;
45+
private Set<OperatorImpl<RM, ?>> registeredOperators;
46+
private HighResolutionClock highResClock;
47+
private Counter numMessage;
48+
private Timer handleMessageNs;
49+
private Timer handleTimerNs;
3450

3551
/**
36-
* Register the next operator in the chain that this operator should propagate its output to.
37-
* @param nextOperator the next operator in the chain.
52+
* Initialize this {@link OperatorImpl} and its user-defined functions.
53+
*
54+
* @param config the {@link Config} for the task
55+
* @param context the {@link TaskContext} for the task
56+
*/
57+
public final void init(Config config, TaskContext context) {
58+
String opName = getOperatorSpec().getOpName();
59+
60+
if (initialized) {
61+
throw new IllegalStateException(String.format("Attempted to initialize Operator %s more than once.", opName));
62+
}
63+
64+
this.highResClock = createHighResClock(config);
65+
registeredOperators = new HashSet<>();
66+
MetricsRegistry metricsRegistry = context.getMetricsRegistry();
67+
this.numMessage = metricsRegistry.newCounter(METRICS_GROUP, opName + "-messages");
68+
this.handleMessageNs = metricsRegistry.newTimer(METRICS_GROUP, opName + "-handle-message-ns");
69+
this.handleTimerNs = metricsRegistry.newTimer(METRICS_GROUP, opName + "-handle-timer-ns");
70+
71+
handleInit(config, context);
72+
73+
initialized = true;
74+
}
75+
76+
/**
77+
* Initialize this {@link OperatorImpl} and its user-defined functions.
78+
*
79+
* @param config the {@link Config} for the task
80+
* @param context the {@link TaskContext} for the task
81+
*/
82+
protected abstract void handleInit(Config config, TaskContext context);
83+
84+
/**
85+
* Register an operator that this operator should propagate its results to.
86+
*
87+
* @param nextOperator the next operator to propagate results to
3888
*/
3989
void registerNextOperator(OperatorImpl<RM, ?> nextOperator) {
40-
nextOperators.add(nextOperator);
90+
if (!initialized) {
91+
throw new IllegalStateException(
92+
String.format("Attempted to register next operator before initializing operator %s.",
93+
getOperatorSpec().getOpName()));
94+
}
95+
this.registeredOperators.add(nextOperator);
4196
}
4297

4398
/**
44-
* Perform the transformation required for this operator and call the downstream operators.
99+
* Handle the incoming {@code message} for this {@link OperatorImpl} and propagate results to registered operators.
100+
* <p>
101+
* Delegates to {@link #handleMessage(Object, MessageCollector, TaskCoordinator)} for handling the message.
45102
*
46-
* Must call {@link #propagateResult} to propagate the output to registered downstream operators correctly.
103+
* @param message the input message
104+
* @param collector the {@link MessageCollector} for this message
105+
* @param coordinator the {@link TaskCoordinator} for this message
106+
*/
107+
public final void onMessage(M message, MessageCollector collector, TaskCoordinator coordinator) {
108+
this.numMessage.inc();
109+
long startNs = this.highResClock.nanoTime();
110+
Collection<RM> results = handleMessage(message, collector, coordinator);
111+
long endNs = this.highResClock.nanoTime();
112+
this.handleMessageNs.update(endNs - startNs);
113+
114+
results.forEach(rm ->
115+
this.registeredOperators.forEach(op ->
116+
op.onMessage(rm, collector, coordinator)));
117+
}
118+
119+
/**
120+
* Handle the incoming {@code message} and return the results to be propagated to registered operators.
47121
*
48122
* @param message the input message
49123
* @param collector the {@link MessageCollector} in the context
50124
* @param coordinator the {@link TaskCoordinator} in the context
125+
* @return results of the transformation
51126
*/
52-
public abstract void onNext(M message, MessageCollector collector, TaskCoordinator coordinator);
127+
protected abstract Collection<RM> handleMessage(M message, MessageCollector collector,
128+
TaskCoordinator coordinator);
53129

54130
/**
55-
* Invoked at every tick. This method delegates to {@link #onTimer(MessageCollector, TaskCoordinator)}
131+
* Handle timer ticks for this {@link OperatorImpl} and propagate the results and timer tick to registered operators.
132+
* <p>
133+
* Delegates to {@link #handleTimer(MessageCollector, TaskCoordinator)} for handling the timer tick.
56134
*
57135
* @param collector the {@link MessageCollector} in the context
58136
* @param coordinator the {@link TaskCoordinator} in the context
59137
*/
60-
public final void onTick(MessageCollector collector, TaskCoordinator coordinator) {
61-
onTimer(collector, coordinator);
62-
nextOperators.forEach(sub -> sub.onTick(collector, coordinator));
138+
public final void onTimer(MessageCollector collector, TaskCoordinator coordinator) {
139+
long startNs = this.highResClock.nanoTime();
140+
Collection<RM> results = handleTimer(collector, coordinator);
141+
long endNs = this.highResClock.nanoTime();
142+
this.handleTimerNs.update(endNs - startNs);
143+
144+
results.forEach(rm ->
145+
this.registeredOperators.forEach(op ->
146+
op.onMessage(rm, collector, coordinator)));
147+
this.registeredOperators.forEach(op ->
148+
op.onTimer(collector, coordinator));
63149
}
64150

65151
/**
66-
* Invoked at every tick. Implementations must call {@link #propagateResult} to propagate any generated output
67-
* to registered downstream operators.
152+
* Handle the timer tick for this operator and return the results to be propagated to registered operators.
153+
* <p>
154+
* Defaults to a no-op implementation.
68155
*
69156
* @param collector the {@link MessageCollector} in the context
70157
* @param coordinator the {@link TaskCoordinator} in the context
158+
* @return results of the timed operation
71159
*/
72-
public void onTimer(MessageCollector collector, TaskCoordinator coordinator) {
160+
protected Collection<RM> handleTimer(MessageCollector collector, TaskCoordinator coordinator) {
161+
return Collections.emptyList();
73162
}
74163

75164
/**
76-
* Helper method to propagate the output of this operator to all registered downstream operators.
77-
*
78-
* This method <b>must</b> be called from {@link #onNext} and {@link #onTimer}
79-
* to propagate the operator output correctly.
165+
* Get the {@link OperatorSpec} for this {@link OperatorImpl}.
80166
*
81-
* @param outputMessage output message
82-
* @param collector the {@link MessageCollector} in the context
83-
* @param coordinator the {@link TaskCoordinator} in the context
167+
* @return the {@link OperatorSpec} for this {@link OperatorImpl}
84168
*/
85-
void propagateResult(RM outputMessage, MessageCollector collector, TaskCoordinator coordinator) {
86-
nextOperators.forEach(sub -> sub.onNext(outputMessage, collector, coordinator));
169+
protected abstract OperatorSpec<RM> getOperatorSpec();
170+
171+
private HighResolutionClock createHighResClock(Config config) {
172+
if (new MetricsConfig(config).getMetricsTimerEnabled()) {
173+
return System::nanoTime;
174+
} else {
175+
return () -> 0;
176+
}
87177
}
88178
}

samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -104,20 +104,22 @@ public Collection<RootOperatorImpl> getAllRootOperators() {
104104
* creates the corresponding DAG of {@link OperatorImpl}s, and returns its root {@link RootOperatorImpl} node.
105105
*
106106
* @param source the input {@link MessageStreamImpl} to instantiate {@link OperatorImpl}s for
107-
* @param <M> the type of messagess in the {@code source} {@link MessageStreamImpl}
108107
* @param config the {@link Config} required to instantiate operators
109108
* @param context the {@link TaskContext} required to instantiate operators
109+
* @param <M> the type of messages in the {@code source} {@link MessageStreamImpl}
110110
* @return root node for the {@link OperatorImpl} DAG
111111
*/
112-
private <M> RootOperatorImpl<M> createOperatorImpls(MessageStreamImpl<M> source, Config config, TaskContext context) {
112+
private <M> RootOperatorImpl<M> createOperatorImpls(MessageStreamImpl<M> source,
113+
Config config, TaskContext context) {
113114
// since the source message stream might have multiple operator specs registered on it,
114115
// create a new root node as a single point of entry for the DAG.
115116
RootOperatorImpl<M> rootOperator = new RootOperatorImpl<>();
117+
rootOperator.init(config, context);
116118
// create the pipeline/topology starting from the source
117119
source.getRegisteredOperatorSpecs().forEach(registeredOperator -> {
118-
// pass in the source and context s.t. stateful stream operators can initialize their stores
120+
// pass in the context so that operator implementations can initialize their functions
119121
OperatorImpl<M, ?> operatorImpl =
120-
createAndRegisterOperatorImpl(registeredOperator, source, config, context);
122+
createAndRegisterOperatorImpl(registeredOperator, config, context);
121123
rootOperator.registerNextOperator(operatorImpl);
122124
});
123125
return rootOperator;
@@ -127,27 +129,26 @@ private <M> RootOperatorImpl<M> createOperatorImpls(MessageStreamImpl<M> source,
127129
* Helper method to recursively traverse the {@link OperatorSpec} DAG and instantiate and link the corresponding
128130
* {@link OperatorImpl}s.
129131
*
130-
* @param operatorSpec the operatorSpec registered with the {@code source}
131-
* @param source the source {@link MessageStreamImpl}
132-
* @param <M> type of input message
132+
* @param operatorSpec the operatorSpec to create the {@link OperatorImpl} for
133133
* @param config the {@link Config} required to instantiate operators
134134
* @param context the {@link TaskContext} required to instantiate operators
135+
* @param <M> type of input message
135136
* @return the operator implementation for the operatorSpec
136137
*/
137138
private <M> OperatorImpl<M, ?> createAndRegisterOperatorImpl(OperatorSpec operatorSpec,
138-
MessageStreamImpl<M> source, Config config, TaskContext context) {
139+
Config config, TaskContext context) {
139140
if (!operatorImpls.containsKey(operatorSpec)) {
140-
OperatorImpl<M, ?> operatorImpl = createOperatorImpl(source, operatorSpec, config, context);
141+
OperatorImpl<M, ?> operatorImpl = createOperatorImpl(operatorSpec, config, context);
141142
if (operatorImpls.putIfAbsent(operatorSpec, operatorImpl) == null) {
142143
// this is the first time we've added the operatorImpl corresponding to the operatorSpec,
143144
// so traverse and initialize and register the rest of the DAG.
144145
// initialize the corresponding operator function
145-
operatorSpec.init(config, context);
146+
operatorImpl.init(config, context);
146147
MessageStreamImpl nextStream = operatorSpec.getNextStream();
147148
if (nextStream != null) {
148149
Collection<OperatorSpec> registeredSpecs = nextStream.getRegisteredOperatorSpecs();
149150
registeredSpecs.forEach(registeredSpec -> {
150-
OperatorImpl subImpl = createAndRegisterOperatorImpl(registeredSpec, nextStream, config, context);
151+
OperatorImpl subImpl = createAndRegisterOperatorImpl(registeredSpec, config, context);
151152
operatorImpl.registerNextOperator(subImpl);
152153
});
153154
}
@@ -163,24 +164,21 @@ private <M> RootOperatorImpl<M> createOperatorImpls(MessageStreamImpl<M> source,
163164
/**
164165
* Creates a new {@link OperatorImpl} instance for the provided {@link OperatorSpec}.
165166
*
166-
* @param source the source {@link MessageStreamImpl}
167-
* @param <M> type of input message
168167
* @param operatorSpec the immutable {@link OperatorSpec} definition.
169168
* @param config the {@link Config} required to instantiate operators
170169
* @param context the {@link TaskContext} required to instantiate operators
170+
* @param <M> type of input message
171171
* @return the {@link OperatorImpl} implementation instance
172172
*/
173-
private <M> OperatorImpl<M, ?> createOperatorImpl(MessageStreamImpl<M> source,
174-
OperatorSpec operatorSpec, Config config, TaskContext context) {
173+
private <M> OperatorImpl<M, ?> createOperatorImpl(OperatorSpec operatorSpec, Config config, TaskContext context) {
175174
if (operatorSpec instanceof StreamOperatorSpec) {
176-
StreamOperatorSpec<M, ?> streamOpSpec = (StreamOperatorSpec<M, ?>) operatorSpec;
177-
return new StreamOperatorImpl<>(streamOpSpec, source, config, context);
175+
return new StreamOperatorImpl<>((StreamOperatorSpec<M, ?>) operatorSpec, config, context);
178176
} else if (operatorSpec instanceof SinkOperatorSpec) {
179177
return new SinkOperatorImpl<>((SinkOperatorSpec<M>) operatorSpec, config, context);
180178
} else if (operatorSpec instanceof WindowOperatorSpec) {
181179
return new WindowOperatorImpl((WindowOperatorSpec<M, ?, ?>) operatorSpec, clock);
182180
} else if (operatorSpec instanceof PartialJoinOperatorSpec) {
183-
return new PartialJoinOperatorImpl<>((PartialJoinOperatorSpec) operatorSpec, source, config, context, clock);
181+
return new PartialJoinOperatorImpl<>((PartialJoinOperatorSpec) operatorSpec, config, context, clock);
184182
}
185183
throw new IllegalArgumentException(
186184
String.format("Unsupported OperatorSpec: %s", operatorSpec.getClass().getName()));

samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java

Lines changed: 33 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,11 @@
1818
*/
1919
package org.apache.samza.operators.impl;
2020

21-
import java.util.ArrayList;
22-
import java.util.List;
2321
import org.apache.samza.config.Config;
24-
import org.apache.samza.operators.MessageStreamImpl;
22+
import org.apache.samza.metrics.Counter;
2523
import org.apache.samza.operators.functions.PartialJoinFunction;
2624
import org.apache.samza.operators.functions.PartialJoinFunction.PartialJoinMessage;
25+
import org.apache.samza.operators.spec.OperatorSpec;
2726
import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
2827
import org.apache.samza.storage.kv.Entry;
2928
import org.apache.samza.storage.kv.KeyValueIterator;
@@ -35,6 +34,11 @@
3534
import org.slf4j.Logger;
3635
import org.slf4j.LoggerFactory;
3736

37+
import java.util.ArrayList;
38+
import java.util.Collection;
39+
import java.util.Collections;
40+
import java.util.List;
41+
3842
/**
3943
* Implementation of a {@link PartialJoinOperatorSpec} that joins messages of type {@code M} in this stream
4044
* with buffered messages of type {@code JM} in the other stream.
@@ -47,35 +51,45 @@ class PartialJoinOperatorImpl<K, M, JM, RM> extends OperatorImpl<M, RM> {
4751

4852
private static final Logger LOGGER = LoggerFactory.getLogger(PartialJoinOperatorImpl.class);
4953

54+
private final PartialJoinOperatorSpec<K, M, JM, RM> partialJoinOpSpec;
5055
private final PartialJoinFunction<K, M, JM, RM> thisPartialJoinFn;
5156
private final PartialJoinFunction<K, JM, M, RM> otherPartialJoinFn;
5257
private final long ttlMs;
53-
private final int opId;
5458
private final Clock clock;
5559

56-
PartialJoinOperatorImpl(PartialJoinOperatorSpec<K, M, JM, RM> partialJoinOperatorSpec, MessageStreamImpl<M> source,
60+
private Counter keysRemoved;
61+
62+
PartialJoinOperatorImpl(PartialJoinOperatorSpec<K, M, JM, RM> partialJoinOpSpec,
5763
Config config, TaskContext context, Clock clock) {
58-
this.thisPartialJoinFn = partialJoinOperatorSpec.getThisPartialJoinFn();
59-
this.otherPartialJoinFn = partialJoinOperatorSpec.getOtherPartialJoinFn();
60-
this.ttlMs = partialJoinOperatorSpec.getTtlMs();
61-
this.opId = partialJoinOperatorSpec.getOpId();
64+
this.partialJoinOpSpec = partialJoinOpSpec;
65+
this.thisPartialJoinFn = partialJoinOpSpec.getThisPartialJoinFn();
66+
this.otherPartialJoinFn = partialJoinOpSpec.getOtherPartialJoinFn();
67+
this.ttlMs = partialJoinOpSpec.getTtlMs();
6268
this.clock = clock;
6369
}
6470

6571
@Override
66-
public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) {
72+
protected void handleInit(Config config, TaskContext context) {
73+
keysRemoved = context.getMetricsRegistry()
74+
.newCounter(OperatorImpl.class.getName(), this.partialJoinOpSpec.getOpName() + "-keys-removed");
75+
this.thisPartialJoinFn.init(config, context);
76+
}
77+
78+
@Override
79+
public Collection<RM> handleMessage(M message, MessageCollector collector, TaskCoordinator coordinator) {
6780
K key = thisPartialJoinFn.getKey(message);
6881
thisPartialJoinFn.getState().put(key, new PartialJoinMessage<>(message, clock.currentTimeMillis()));
6982
PartialJoinMessage<JM> otherMessage = otherPartialJoinFn.getState().get(key);
7083
long now = clock.currentTimeMillis();
7184
if (otherMessage != null && otherMessage.getReceivedTimeMs() > now - ttlMs) {
7285
RM joinResult = thisPartialJoinFn.apply(message, otherMessage.getMessage());
73-
this.propagateResult(joinResult, collector, coordinator);
86+
return Collections.singletonList(joinResult);
7487
}
88+
return Collections.emptyList();
7589
}
7690

7791
@Override
78-
public void onTimer(MessageCollector collector, TaskCoordinator coordinator) {
92+
public Collection<RM> handleTimer(MessageCollector collector, TaskCoordinator coordinator) {
7993
long now = clock.currentTimeMillis();
8094

8195
KeyValueStore<K, PartialJoinMessage<M>> thisState = thisPartialJoinFn.getState();
@@ -87,14 +101,18 @@ public void onTimer(MessageCollector collector, TaskCoordinator coordinator) {
87101
if (entry.getValue().getReceivedTimeMs() < now - ttlMs) {
88102
keysToRemove.add(entry.getKey());
89103
} else {
90-
break;
104+
break; // InternalInMemoryStore uses a LinkedHashMap and will return entries in insertion order
91105
}
92106
}
93107

94108
iterator.close();
95109
thisState.deleteAll(keysToRemove);
96-
97-
LOGGER.info("Operator ID {} onTimer self time: {} ms", opId, clock.currentTimeMillis() - now);
110+
keysRemoved.inc(keysToRemove.size());
111+
return Collections.emptyList();
98112
}
99113

114+
@Override
115+
protected OperatorSpec<RM> getOperatorSpec() {
116+
return partialJoinOpSpec;
117+
}
100118
}

0 commit comments

Comments
 (0)