39
39
import io .opentelemetry .sdk .metrics .InstrumentType ;
40
40
import io .opentelemetry .sdk .metrics .data .AggregationTemporality ;
41
41
import io .opentelemetry .sdk .metrics .data .MetricData ;
42
+ import io .opentelemetry .sdk .metrics .data .PointData ;
42
43
import io .opentelemetry .sdk .metrics .export .MetricExporter ;
43
44
import java .io .IOException ;
44
45
import java .time .Duration ;
49
50
import java .util .logging .Level ;
50
51
import java .util .logging .Logger ;
51
52
import java .util .stream .Collectors ;
53
+ import javax .annotation .Nonnull ;
52
54
import javax .annotation .Nullable ;
53
55
54
56
/**
@@ -66,7 +68,7 @@ class SpannerCloudMonitoringExporter implements MetricExporter {
66
68
// https://cloud.google.com/monitoring/quotas#custom_metrics_quotas.
67
69
private static final int EXPORT_BATCH_SIZE_LIMIT = 200 ;
68
70
private final AtomicBoolean spannerExportFailureLogged = new AtomicBoolean (false );
69
- private CompletableResultCode lastExportCode ;
71
+ private final AtomicBoolean lastExportSkippedData = new AtomicBoolean ( false ) ;
70
72
private final MetricServiceClient client ;
71
73
private final String spannerProjectId ;
72
74
@@ -101,44 +103,49 @@ static SpannerCloudMonitoringExporter create(
101
103
}
102
104
103
105
@ Override
104
- public CompletableResultCode export (Collection <MetricData > collection ) {
106
+ public CompletableResultCode export (@ Nonnull Collection <MetricData > collection ) {
105
107
if (client .isShutdown ()) {
106
108
logger .log (Level .WARNING , "Exporter is shut down" );
107
109
return CompletableResultCode .ofFailure ();
108
110
}
109
111
110
- this .lastExportCode = exportSpannerClientMetrics (collection );
111
- return lastExportCode ;
112
+ return exportSpannerClientMetrics (collection );
112
113
}
113
114
114
115
/** Export client built in metrics */
115
116
private CompletableResultCode exportSpannerClientMetrics (Collection <MetricData > collection ) {
116
- // Filter spanner metrics
117
+ // Filter spanner metrics. Only include metrics that contain a project and instance ID.
117
118
List <MetricData > spannerMetricData =
118
119
collection .stream ()
119
120
.filter (md -> SPANNER_METRICS .contains (md .getName ()))
120
121
.collect (Collectors .toList ());
121
122
122
- // Skips exporting if there's none
123
- if (spannerMetricData .isEmpty ()) {
124
- return CompletableResultCode .ofSuccess ();
125
- }
126
-
127
- // Verifies metrics project id is the same as the spanner project id set on this client
128
- if (!spannerMetricData .stream ()
123
+ // Log warnings for metrics that will be skipped.
124
+ boolean mustFilter = false ;
125
+ if (spannerMetricData .stream ()
129
126
.flatMap (metricData -> metricData .getData ().getPoints ().stream ())
130
- .allMatch (
131
- pd -> spannerProjectId . equals ( SpannerCloudMonitoringExporterUtils . getProjectId ( pd )))) {
132
- logger . log ( Level .WARNING , "Metric data has a different projectId. Skipping export ." );
133
- return CompletableResultCode . ofFailure () ;
127
+ .anyMatch ( this :: shouldSkipPointDataDueToProjectId )) {
128
+ logger . log (
129
+ Level .WARNING , "Some metric data contain a different projectId. These will be skipped ." );
130
+ mustFilter = true ;
134
131
}
135
-
136
- // Verifies if metrics data has missing instance id.
137
132
if (spannerMetricData .stream ()
138
133
.flatMap (metricData -> metricData .getData ().getPoints ().stream ())
139
- .anyMatch (pd -> SpannerCloudMonitoringExporterUtils .getInstanceId (pd ) == null )) {
140
- logger .log (Level .WARNING , "Metric data has missing instanceId. Skipping export." );
141
- return CompletableResultCode .ofFailure ();
134
+ .anyMatch (this ::shouldSkipPointDataDueToMissingInstanceId )) {
135
+ logger .log (Level .WARNING , "Some metric data miss instanceId. These will be skipped." );
136
+ mustFilter = true ;
137
+ }
138
+ if (mustFilter ) {
139
+ spannerMetricData =
140
+ spannerMetricData .stream ()
141
+ .filter (this ::shouldSkipMetricData )
142
+ .collect (Collectors .toList ());
143
+ }
144
+ lastExportSkippedData .set (mustFilter );
145
+
146
+ // Skips exporting if there's none
147
+ if (spannerMetricData .isEmpty ()) {
148
+ return CompletableResultCode .ofSuccess ();
142
149
}
143
150
144
151
List <TimeSeries > spannerTimeSeries ;
@@ -190,6 +197,26 @@ public void onSuccess(List<Empty> empty) {
190
197
return spannerExportCode ;
191
198
}
192
199
200
+ private boolean shouldSkipMetricData (MetricData metricData ) {
201
+ return metricData .getData ().getPoints ().stream ()
202
+ .anyMatch (
203
+ pd ->
204
+ shouldSkipPointDataDueToProjectId (pd )
205
+ || shouldSkipPointDataDueToMissingInstanceId (pd ));
206
+ }
207
+
208
+ private boolean shouldSkipPointDataDueToProjectId (PointData pointData ) {
209
+ return !spannerProjectId .equals (SpannerCloudMonitoringExporterUtils .getProjectId (pointData ));
210
+ }
211
+
212
+ private boolean shouldSkipPointDataDueToMissingInstanceId (PointData pointData ) {
213
+ return SpannerCloudMonitoringExporterUtils .getInstanceId (pointData ) == null ;
214
+ }
215
+
216
+ boolean lastExportSkippedData () {
217
+ return this .lastExportSkippedData .get ();
218
+ }
219
+
193
220
private ApiFuture <List <Empty >> exportTimeSeriesInBatch (
194
221
ProjectName projectName , List <TimeSeries > timeSeries ) {
195
222
List <ApiFuture <Empty >> batchResults = new ArrayList <>();
@@ -233,7 +260,7 @@ public CompletableResultCode shutdown() {
233
260
* metric over time.
234
261
*/
235
262
@ Override
236
- public AggregationTemporality getAggregationTemporality (InstrumentType instrumentType ) {
263
+ public AggregationTemporality getAggregationTemporality (@ Nonnull InstrumentType instrumentType ) {
237
264
return AggregationTemporality .CUMULATIVE ;
238
265
}
239
266
}
0 commit comments