From e59951700d71206586a0f3390cd7601860e5ba25 Mon Sep 17 00:00:00 2001 From: Mateusz Rzeszutek Date: Fri, 14 Jan 2022 09:34:23 +0100 Subject: [PATCH] One more micrometer AsyncInstrumentRegistry fix --- .../v1_5/AsyncInstrumentRegistry.java | 280 ++++++------------ 1 file changed, 98 insertions(+), 182 deletions(-) diff --git a/instrumentation/micrometer/micrometer-1.5/library/src/main/java/io/opentelemetry/instrumentation/micrometer/v1_5/AsyncInstrumentRegistry.java b/instrumentation/micrometer/micrometer-1.5/library/src/main/java/io/opentelemetry/instrumentation/micrometer/v1_5/AsyncInstrumentRegistry.java index 15b4b24c824e..71afa599b2d7 100644 --- a/instrumentation/micrometer/micrometer-1.5/library/src/main/java/io/opentelemetry/instrumentation/micrometer/v1_5/AsyncInstrumentRegistry.java +++ b/instrumentation/micrometer/micrometer-1.5/library/src/main/java/io/opentelemetry/instrumentation/micrometer/v1_5/AsyncInstrumentRegistry.java @@ -12,10 +12,9 @@ import io.opentelemetry.api.metrics.Meter; import io.opentelemetry.api.metrics.ObservableDoubleMeasurement; import io.opentelemetry.api.metrics.ObservableLongMeasurement; -import io.opentelemetry.instrumentation.api.internal.GuardedBy; import java.lang.ref.WeakReference; -import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.Consumer; import java.util.function.ToDoubleFunction; import java.util.function.ToLongFunction; @@ -32,23 +31,17 @@ final class AsyncInstrumentRegistry { // OpentelemetryMeterRegistry is GC'd private final WeakReference meter; - // we're always locking lock on the whole instrument map; the add/remove methods aren't called - // that often, so it's probably better to opt for correctness in that case - there is a small - // window between removing a single measurement and removing the whole instrument (if it has no - // more measurements) when potentially a new measurement could be added; a ConcurrentHashMap - // wouldn't be enough in this case + // values from the maps below are never removed - that is because the underlying OpenTelemetry + // async instruments are never removed; if we removed the recorder and tried to register it once + // again OTel would log an error and basically ignore the new callback + // these maps are GC'd together with this AsyncInstrumentRegistry instance - that is, when the + // whole OpenTelemetry Meter gets GC'd - @GuardedBy("gauges") - private final Map gauges = new HashMap<>(); - - @GuardedBy("doubleCounters") - private final Map doubleCounters = new HashMap<>(); - - @GuardedBy("longCounters") - private final Map longCounters = new HashMap<>(); - - @GuardedBy("upDownDoubleCounters") - private final Map upDownDoubleCounters = new HashMap<>(); + private final Map gauges = new ConcurrentHashMap<>(); + private final Map doubleCounters = new ConcurrentHashMap<>(); + private final Map longCounters = new ConcurrentHashMap<>(); + private final Map upDownDoubleCounters = + new ConcurrentHashMap<>(); AsyncInstrumentRegistry(Meter meter) { this.meter = new WeakReference<>(meter); @@ -71,29 +64,22 @@ AsyncMeasurementHandle buildGauge( @Nullable T obj, ToDoubleFunction objMetric) { - synchronized (gauges) { - // use the gauges map as lock for the recorder state - this way all gauge-related mutable - // state will always be accessed in synchronized(gauges) - Object recorderLock = gauges; - - DoubleMeasurementsRecorder recorder = - gauges.computeIfAbsent( - name, - n -> { - DoubleMeasurementsRecorder recorderCallback = - new DoubleMeasurementsRecorder(recorderLock); - otelMeter() - .gaugeBuilder(name) - .setDescription(description) - .setUnit(baseUnit) - .buildWithCallback(recorderCallback); - return recorderCallback; - }); - recorder.addMeasurement( - attributes, new DoubleMeasurementSource(obj, (ToDoubleFunction) objMetric)); - - return new AsyncMeasurementHandle(gauges, name, attributes); - } + DoubleMeasurementsRecorder recorder = + gauges.computeIfAbsent( + name, + n -> { + DoubleMeasurementsRecorder recorderCallback = new DoubleMeasurementsRecorder(); + otelMeter() + .gaugeBuilder(name) + .setDescription(description) + .setUnit(baseUnit) + .buildWithCallback(recorderCallback); + return recorderCallback; + }); + recorder.addMeasurement( + attributes, new DoubleMeasurementSource(obj, (ToDoubleFunction) objMetric)); + + return new AsyncMeasurementHandle(recorder, attributes); } AsyncMeasurementHandle buildDoubleCounter( @@ -113,30 +99,23 @@ AsyncMeasurementHandle buildDoubleCounter( @Nullable T obj, ToDoubleFunction objMetric) { - synchronized (doubleCounters) { - // use the counters map as lock for the recorder state - this way all double counter-related - // mutable state will always be accessed in synchronized(doubleCounters) - Object recorderLock = doubleCounters; - - DoubleMeasurementsRecorder recorder = - doubleCounters.computeIfAbsent( - name, - n -> { - DoubleMeasurementsRecorder recorderCallback = - new DoubleMeasurementsRecorder(recorderLock); - otelMeter() - .counterBuilder(name) - .setDescription(description) - .setUnit(baseUnit) - .ofDoubles() - .buildWithCallback(recorderCallback); - return recorderCallback; - }); - recorder.addMeasurement( - attributes, new DoubleMeasurementSource(obj, (ToDoubleFunction) objMetric)); - - return new AsyncMeasurementHandle(doubleCounters, name, attributes); - } + DoubleMeasurementsRecorder recorder = + doubleCounters.computeIfAbsent( + name, + n -> { + DoubleMeasurementsRecorder recorderCallback = new DoubleMeasurementsRecorder(); + otelMeter() + .counterBuilder(name) + .setDescription(description) + .setUnit(baseUnit) + .ofDoubles() + .buildWithCallback(recorderCallback); + return recorderCallback; + }); + recorder.addMeasurement( + attributes, new DoubleMeasurementSource(obj, (ToDoubleFunction) objMetric)); + + return new AsyncMeasurementHandle(recorder, attributes); } AsyncMeasurementHandle buildLongCounter( @@ -147,29 +126,22 @@ AsyncMeasurementHandle buildLongCounter( @Nullable T obj, ToLongFunction objMetric) { - synchronized (longCounters) { - // use the counters map as lock for the recorder state - this way all gauge-related mutable - // state will always be accessed in synchronized(longCounters) - Object recorderLock = longCounters; - - LongMeasurementsRecorder recorder = - longCounters.computeIfAbsent( - name, - n -> { - LongMeasurementsRecorder recorderCallback = - new LongMeasurementsRecorder(recorderLock); - otelMeter() - .counterBuilder(name) - .setDescription(description) - .setUnit(baseUnit) - .buildWithCallback(recorderCallback); - return recorderCallback; - }); - recorder.addMeasurement( - attributes, new LongMeasurementSource(obj, (ToLongFunction) objMetric)); - - return new AsyncMeasurementHandle(longCounters, name, attributes); - } + LongMeasurementsRecorder recorder = + longCounters.computeIfAbsent( + name, + n -> { + LongMeasurementsRecorder recorderCallback = new LongMeasurementsRecorder(); + otelMeter() + .counterBuilder(name) + .setDescription(description) + .setUnit(baseUnit) + .buildWithCallback(recorderCallback); + return recorderCallback; + }); + recorder.addMeasurement( + attributes, new LongMeasurementSource(obj, (ToLongFunction) objMetric)); + + return new AsyncMeasurementHandle(recorder, attributes); } AsyncMeasurementHandle buildUpDownDoubleCounter( @@ -180,30 +152,23 @@ AsyncMeasurementHandle buildUpDownDoubleCounter( T obj, ToDoubleFunction objMetric) { - synchronized (upDownDoubleCounters) { - // use the counters map as lock for the recorder state - this way all double counter-related - // mutable state will always be accessed in synchronized(upDownDoubleCounters) - Object recorderLock = upDownDoubleCounters; - - DoubleMeasurementsRecorder recorder = - upDownDoubleCounters.computeIfAbsent( - name, - n -> { - DoubleMeasurementsRecorder recorderCallback = - new DoubleMeasurementsRecorder(recorderLock); - otelMeter() - .upDownCounterBuilder(name) - .setDescription(description) - .setUnit(baseUnit) - .ofDoubles() - .buildWithCallback(recorderCallback); - return recorderCallback; - }); - recorder.addMeasurement( - attributes, new DoubleMeasurementSource(obj, (ToDoubleFunction) objMetric)); - - return new AsyncMeasurementHandle(upDownDoubleCounters, name, attributes); - } + DoubleMeasurementsRecorder recorder = + upDownDoubleCounters.computeIfAbsent( + name, + n -> { + DoubleMeasurementsRecorder recorderCallback = new DoubleMeasurementsRecorder(); + otelMeter() + .upDownCounterBuilder(name) + .setDescription(description) + .setUnit(baseUnit) + .ofDoubles() + .buildWithCallback(recorderCallback); + return recorderCallback; + }); + recorder.addMeasurement( + attributes, new DoubleMeasurementSource(obj, (ToDoubleFunction) objMetric)); + + return new AsyncMeasurementHandle(recorder, attributes); } private Meter otelMeter() { @@ -217,37 +182,14 @@ private Meter otelMeter() { private abstract static class MeasurementsRecorder { - private final Object lock; - - @GuardedBy("lock") - private final Map measurements = new HashMap<>(); - - protected MeasurementsRecorder(Object lock) { - this.lock = lock; - } - - Map copyForRead() { - synchronized (lock) { - return new HashMap<>(measurements); - } - } + final Map measurements = new ConcurrentHashMap<>(); void addMeasurement(Attributes attributes, I info) { - synchronized (lock) { - measurements.put(attributes, info); - } + measurements.put(attributes, info); } void removeMeasurement(Attributes attributes) { - synchronized (lock) { - measurements.remove(attributes); - } - } - - boolean isEmpty() { - synchronized (lock) { - return measurements.isEmpty(); - } + measurements.remove(attributes); } } @@ -255,20 +197,15 @@ private static final class DoubleMeasurementsRecorder extends MeasurementsRecorder implements Consumer { - private DoubleMeasurementsRecorder(Object lock) { - super(lock); - } - @Override public void accept(ObservableDoubleMeasurement measurement) { - copyForRead() - .forEach( - (attributes, gauge) -> { - Object obj = gauge.objWeakRef.get(); - if (obj != null) { - measurement.record(gauge.metricFunction.applyAsDouble(obj), attributes); - } - }); + measurements.forEach( + (attributes, gauge) -> { + Object obj = gauge.objWeakRef.get(); + if (obj != null) { + measurement.record(gauge.metricFunction.applyAsDouble(obj), attributes); + } + }); } } @@ -276,20 +213,15 @@ private static final class LongMeasurementsRecorder extends MeasurementsRecorder implements Consumer { - private LongMeasurementsRecorder(Object lock) { - super(lock); - } - @Override public void accept(ObservableLongMeasurement measurement) { - copyForRead() - .forEach( - (attributes, gauge) -> { - Object obj = gauge.objWeakRef.get(); - if (obj != null) { - measurement.record(gauge.metricFunction.applyAsLong(obj), attributes); - } - }); + measurements.forEach( + (attributes, gauge) -> { + Object obj = gauge.objWeakRef.get(); + if (obj != null) { + measurement.record(gauge.metricFunction.applyAsLong(obj), attributes); + } + }); } } @@ -317,32 +249,16 @@ private LongMeasurementSource(@Nullable Object obj, ToLongFunction metri static final class AsyncMeasurementHandle { - @GuardedBy("instrumentRegistry") - private final Map> instrumentRegistry; - - private final String name; + private final MeasurementsRecorder measurementsRecorder; private final Attributes attributes; - AsyncMeasurementHandle( - Map> instrumentRegistry, - String name, - Attributes attributes) { - this.instrumentRegistry = instrumentRegistry; - this.name = name; + AsyncMeasurementHandle(MeasurementsRecorder measurementsRecorder, Attributes attributes) { + this.measurementsRecorder = measurementsRecorder; this.attributes = attributes; } void remove() { - synchronized (instrumentRegistry) { - MeasurementsRecorder recorder = instrumentRegistry.get(name); - if (recorder != null) { - recorder.removeMeasurement(attributes); - // if this was the last measurement then let's remove the whole recorder - if (recorder.isEmpty()) { - instrumentRegistry.remove(name); - } - } - } + measurementsRecorder.removeMeasurement(attributes); } } }