Skip to content

Commit

Permalink
DE-6324: New approach: DecodableMetrics group in example repo
Browse files Browse the repository at this point in the history
  • Loading branch information
maxf-decodable committed Mar 28, 2024
1 parent 01c5980 commit 59cada1
Showing 1 changed file with 10 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

Expand Down Expand Up @@ -63,16 +67,22 @@ public static class NameConverter extends RichMapFunction<String, String> {
private static final long serialVersionUID = 1L;

private transient ObjectMapper mapper;
private Counter recordsProcessed;

@Override
public void open(Configuration parameters) throws Exception {
mapper = new ObjectMapper();
recordsProcessed = getRuntimeContext()
.getMetricGroup()
.addGroup("DecodableMetrics")
.counter("recordsProcessed", new SimpleCounter());
}

@Override
public String map(String value) throws Exception {
ObjectNode purchaseOrder = (ObjectNode) mapper.readTree(value);
purchaseOrder.put("customer_name", purchaseOrder.get("customer_name").asText().toUpperCase());
recordsProcessed.inc();
return mapper.writeValueAsString(purchaseOrder);
}
}
Expand Down

0 comments on commit 59cada1

Please sign in to comment.