Skip to content

Commit 120736c

Browse files
authored
[chore] Move obs_queue to the internal/queue package (#13363)
Updates #13077 Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
1 parent e8ccfc3 commit 120736c

File tree

5 files changed

+26
-17
lines changed

5 files changed

+26
-17
lines changed

exporter/exporterhelper/internal/queuebatch/obs_queue.go renamed to exporter/exporterhelper/internal/queue/obs_queue.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// Copyright The OpenTelemetry Authors
22
// SPDX-License-Identifier: Apache-2.0
33

4-
package queuebatch // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
4+
package queue // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queue"
55

66
import (
77
"context"
@@ -11,7 +11,6 @@ import (
1111
"go.opentelemetry.io/otel/trace"
1212

1313
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/metadata"
14-
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queue"
1514
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
1615
"go.opentelemetry.io/collector/pipeline"
1716
)
@@ -26,14 +25,14 @@ const (
2625

2726
// obsQueue is a helper to add observability to a queue.
2827
type obsQueue[T request.Request] struct {
29-
queue.Queue[T]
28+
Queue[T]
3029
tb *metadata.TelemetryBuilder
3130
metricAttr metric.MeasurementOption
3231
enqueueFailedInst metric.Int64Counter
3332
tracer trace.Tracer
3433
}
3534

36-
func newObsQueue[T request.Request](set Settings[T], delegate queue.Queue[T]) (queue.Queue[T], error) {
35+
func newObsQueue[T request.Request](set Settings[T], delegate Queue[T]) (Queue[T], error) {
3736
tb, err := metadata.NewTelemetryBuilder(set.Telemetry)
3837
if err != nil {
3938
return nil, err

exporter/exporterhelper/internal/queuebatch/obs_queue_test.go renamed to exporter/exporterhelper/internal/queue/obs_queue_test.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// Copyright The OpenTelemetry Authors
22
// SPDX-License-Identifier: Apache-2.0
33

4-
package queuebatch
4+
package queue
55

66
import (
77
"context"
@@ -16,7 +16,6 @@ import (
1616
"go.opentelemetry.io/collector/component"
1717
"go.opentelemetry.io/collector/component/componenttest"
1818
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/metadatatest"
19-
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queue"
2019
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
2120
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/requesttest"
2221
"go.opentelemetry.io/collector/exporter/exportertest"
@@ -26,7 +25,7 @@ import (
2625
var exporterID = component.NewID(exportertest.NopType)
2726

2827
type fakeQueue[T any] struct {
29-
queue.Queue[T]
28+
Queue[T]
3029
offerErr error
3130
size int64
3231
capacity int64
@@ -44,7 +43,7 @@ func (fq *fakeQueue[T]) Offer(context.Context, T) error {
4443
return fq.offerErr
4544
}
4645

47-
func newFakeQueue[T request.Request](offerErr error, size, capacity int64) queue.Queue[T] {
46+
func newFakeQueue[T request.Request](offerErr error, size, capacity int64) Queue[T] {
4847
return &fakeQueue[T]{offerErr: offerErr, size: size, capacity: capacity}
4948
}
5049

exporter/exporterhelper/internal/queue/queue.go

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,18 +81,33 @@ func (set *Settings[T]) activeSizer() request.Sizer[T] {
8181
}
8282
}
8383

84-
func NewQueue[T any](set Settings[T], next ConsumeFunc[T]) (Queue[T], error) {
84+
func NewQueue[T request.Request](set Settings[T], next ConsumeFunc[T]) (Queue[T], error) {
85+
q, err := newBaseQueue(set)
86+
if err != nil {
87+
return nil, err
88+
}
89+
90+
oq, err := newObsQueue(set, newAsyncQueue(q, set.NumConsumers, next))
91+
if err != nil {
92+
return nil, err
93+
}
94+
95+
return oq, nil
96+
}
97+
98+
func newBaseQueue[T any](set Settings[T]) (readableQueue[T], error) {
8599
// Configure memory queue or persistent based on the config.
86100
if !set.StorageID.HasValue() {
87-
return newAsyncQueue(newMemoryQueue[T](set), set.NumConsumers, next), nil
101+
return newMemoryQueue[T](set), nil
88102
}
89103
if set.ItemsSizer == nil {
90104
return nil, errors.New("PersistentQueue requires ItemsSizer to be set")
91105
}
92106
if set.BytesSizer == nil {
93107
return nil, errors.New("PersistentQueue requires BytesSizer to be set")
94108
}
95-
return newAsyncQueue(newPersistentQueue[T](set), set.NumConsumers, next), nil
109+
110+
return newPersistentQueue[T](set), nil
96111
}
97112

98113
// TODO: Investigate why linter "unused" fails if add a private "read" func on the Queue.

exporter/exporterhelper/internal/queuebatch/disabled_batcher_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ func TestDisabledBatcher(t *testing.T) {
4343
Capacity: 1000,
4444
BlockOnOverflow: true,
4545
NumConsumers: tt.maxWorkers,
46+
Telemetry: componenttest.NewNopTelemetrySettings(),
4647
}, ba.Consume)
4748
require.NoError(t, err)
4849

exporter/exporterhelper/internal/queuebatch/queue_batch.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -97,12 +97,7 @@ func newQueueBatch(
9797
return nil, err
9898
}
9999

100-
oq, err := newObsQueue(set, q)
101-
if err != nil {
102-
return nil, err
103-
}
104-
105-
return &QueueBatch{queue: oq, batcher: b}, nil
100+
return &QueueBatch{queue: q, batcher: b}, nil
106101
}
107102

108103
// Start is invoked during service startup.

0 commit comments

Comments
 (0)