2323import java .util .Collections ;
2424import java .util .HashMap ;
2525import java .util .HashSet ;
26- import java .util .Iterator ;
2726import java .util .List ;
2827import java .util .Map ;
2928import java .util .Objects ;
30- import java .util .Optional ;
3129import java .util .Set ;
3230import java .util .concurrent .TimeUnit ;
3331import java .util .function .BiConsumer ;
3432import java .util .function .Consumer ;
33+ import java .util .stream .Collectors ;
34+
3535import org .elasticsearch .ElasticsearchParseException ;
3636import org .elasticsearch .ExceptionsHelper ;
3737import org .elasticsearch .ResourceNotFoundException ;
4949import org .elasticsearch .cluster .metadata .MetaData ;
5050import org .elasticsearch .cluster .node .DiscoveryNode ;
5151import org .elasticsearch .cluster .service .ClusterService ;
52- import org .elasticsearch .common .metrics .CounterMetric ;
53- import org .elasticsearch .common .metrics .MeanMetric ;
5452import org .elasticsearch .common .regex .Regex ;
5553import org .elasticsearch .common .unit .TimeValue ;
5654import org .elasticsearch .common .util .concurrent .AbstractRunnable ;
@@ -79,8 +77,7 @@ public class IngestService implements ClusterStateApplier {
7977 // are loaded, so in the cluster state we just save the pipeline config and here we keep the actual pipelines around.
8078 private volatile Map <String , Pipeline > pipelines = new HashMap <>();
8179 private final ThreadPool threadPool ;
82- private final StatsHolder totalStats = new StatsHolder ();
83- private volatile Map <String , StatsHolder > statsHolderPerPipeline = Collections .emptyMap ();
80+ private final IngestMetric totalMetrics = new IngestMetric ();
8481
8582 public IngestService (ClusterService clusterService , ThreadPool threadPool ,
8683 Environment env , ScriptService scriptService , AnalysisRegistry analysisRegistry ,
@@ -257,10 +254,16 @@ Map<String, Pipeline> pipelines() {
257254 @ Override
258255 public void applyClusterState (final ClusterChangedEvent event ) {
259256 ClusterState state = event .state ();
257+ Map <String , Pipeline > originalPipelines = pipelines ;
260258 innerUpdatePipelines (event .previousState (), state );
261- IngestMetadata ingestMetadata = state .getMetaData ().custom (IngestMetadata .TYPE );
262- if (ingestMetadata != null ) {
263- updatePipelineStats (ingestMetadata );
259+ //pipelines changed, so add the old metrics to the new metrics
260+ if (originalPipelines != pipelines ) {
261+ pipelines .forEach ((id , pipeline ) -> {
262+ Pipeline originalPipeline = originalPipelines .get (id );
263+ if (originalPipeline != null ) {
264+ pipeline .getMetrics ().add (originalPipeline .getMetrics ());
265+ }
266+ });
264267 }
265268 }
266269
@@ -325,6 +328,7 @@ void validatePipeline(Map<DiscoveryNode, IngestInfo> ingestInfos, PutPipelineReq
325328 public void executeBulkRequest (Iterable <DocWriteRequest <?>> actionRequests ,
326329 BiConsumer <IndexRequest , Exception > itemFailureHandler , Consumer <Exception > completionHandler ,
327330 Consumer <IndexRequest > itemDroppedHandler ) {
331+
328332 threadPool .executor (ThreadPool .Names .WRITE ).execute (new AbstractRunnable () {
329333
330334 @ Override
@@ -367,37 +371,11 @@ protected void doRun() {
367371 }
368372
369373 public IngestStats stats () {
370- Map <String , StatsHolder > statsHolderPerPipeline = this .statsHolderPerPipeline ;
371374
372- Map <String , IngestStats .Stats > statsPerPipeline = new HashMap <>(statsHolderPerPipeline .size ());
373- for (Map .Entry <String , StatsHolder > entry : statsHolderPerPipeline .entrySet ()) {
374- statsPerPipeline .put (entry .getKey (), entry .getValue ().createStats ());
375- }
375+ Map <String , IngestStats .Stats > statsPerPipeline =
376+ pipelines .entrySet ().stream ().collect (Collectors .toMap (Map .Entry ::getKey , v -> v .getValue ().getMetrics ().createStats ()));
376377
377- return new IngestStats (totalStats .createStats (), statsPerPipeline );
378- }
379-
380- void updatePipelineStats (IngestMetadata ingestMetadata ) {
381- boolean changed = false ;
382- Map <String , StatsHolder > newStatsPerPipeline = new HashMap <>(statsHolderPerPipeline );
383- Iterator <String > iterator = newStatsPerPipeline .keySet ().iterator ();
384- while (iterator .hasNext ()) {
385- String pipeline = iterator .next ();
386- if (ingestMetadata .getPipelines ().containsKey (pipeline ) == false ) {
387- iterator .remove ();
388- changed = true ;
389- }
390- }
391- for (String pipeline : ingestMetadata .getPipelines ().keySet ()) {
392- if (newStatsPerPipeline .containsKey (pipeline ) == false ) {
393- newStatsPerPipeline .put (pipeline , new StatsHolder ());
394- changed = true ;
395- }
396- }
397-
398- if (changed ) {
399- statsHolderPerPipeline = Collections .unmodifiableMap (newStatsPerPipeline );
400- }
378+ return new IngestStats (totalMetrics .createStats (), statsPerPipeline );
401379 }
402380
403381 private void innerExecute (IndexRequest indexRequest , Pipeline pipeline , Consumer <IndexRequest > itemDroppedHandler ) throws Exception {
@@ -408,10 +386,8 @@ private void innerExecute(IndexRequest indexRequest, Pipeline pipeline, Consumer
408386 long startTimeInNanos = System .nanoTime ();
409387 // the pipeline specific stat holder may not exist and that is fine:
410388 // (e.g. the pipeline may have been removed while we're ingesting a document
411- Optional <StatsHolder > pipelineStats = Optional .ofNullable (statsHolderPerPipeline .get (pipeline .getId ()));
412389 try {
413- totalStats .preIngest ();
414- pipelineStats .ifPresent (StatsHolder ::preIngest );
390+ totalMetrics .preIngest ();
415391 String index = indexRequest .index ();
416392 String type = indexRequest .type ();
417393 String id = indexRequest .id ();
@@ -437,13 +413,11 @@ private void innerExecute(IndexRequest indexRequest, Pipeline pipeline, Consumer
437413 indexRequest .source (ingestDocument .getSourceAndMetadata ());
438414 }
439415 } catch (Exception e ) {
440- totalStats .ingestFailed ();
441- pipelineStats .ifPresent (StatsHolder ::ingestFailed );
416+ totalMetrics .ingestFailed ();
442417 throw e ;
443418 } finally {
444419 long ingestTimeInMillis = TimeUnit .NANOSECONDS .toMillis (System .nanoTime () - startTimeInNanos );
445- totalStats .postIngest (ingestTimeInMillis );
446- pipelineStats .ifPresent (statsHolder -> statsHolder .postIngest (ingestTimeInMillis ));
420+ totalMetrics .postIngest (ingestTimeInMillis );
447421 }
448422 }
449423
@@ -480,27 +454,4 @@ private void innerUpdatePipelines(ClusterState previousState, ClusterState state
480454 ExceptionsHelper .rethrowAndSuppress (exceptions );
481455 }
482456
483- private static class StatsHolder {
484-
485- private final MeanMetric ingestMetric = new MeanMetric ();
486- private final CounterMetric ingestCurrent = new CounterMetric ();
487- private final CounterMetric ingestFailed = new CounterMetric ();
488-
489- void preIngest () {
490- ingestCurrent .inc ();
491- }
492-
493- void postIngest (long ingestTimeInMillis ) {
494- ingestCurrent .dec ();
495- ingestMetric .inc (ingestTimeInMillis );
496- }
497-
498- void ingestFailed () {
499- ingestFailed .inc ();
500- }
501-
502- IngestStats .Stats createStats () {
503- return new IngestStats .Stats (ingestMetric .count (), ingestMetric .sum (), ingestCurrent .count (), ingestFailed .count ());
504- }
505- }
506457}
0 commit comments