-
Notifications
You must be signed in to change notification settings - Fork 4.9k
/
Copy pathqueue.go
140 lines (118 loc) · 5.6 KB
/
queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package queue
import (
"errors"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/opt"
)
// Factory for creating a queue used by a pipeline instance.
type Factory func(ACKListener, *logp.Logger, *config.C, int) (Queue, error)
// ACKListener listens to special events to be send by queue implementations.
type ACKListener interface {
OnACK(eventCount int) // number of consecutively published events acked by producers
}
//Metrics is a set of basic-user friendly metrics that report the current state of the queue. These metrics are meant to be relatively generic and high-level, and when reported directly, can be comprehensible to a user.
type Metrics struct {
//EventCount is the total events currently in the queue
EventCount opt.Uint
//ByteCount is the total byte size of the queue
ByteCount opt.Uint
//ByteLimit is the user-configured byte limit of the queue
ByteLimit opt.Uint
//EventLimit is the user-configured event limit of the queue
EventLimit opt.Uint
//UnackedConsumedEvents is the count of events that an output consumer has read, but not yet ack'ed
UnackedConsumedEvents opt.Uint
//OldestActiveTimestamp is the timestamp of the oldest item in the queue.
OldestActiveTimestamp common.Time
}
// ErrMetricsNotImplemented is a hopefully temporary type to mark queue metrics as not yet implemented
var ErrMetricsNotImplemented = errors.New("Queue metrics not implemented")
// Queue is responsible for accepting, forwarding and ACKing events.
// A queue will receive and buffer single events from its producers.
// Consumers will receive events in batches from the queues buffers.
// Once a consumer has finished processing a batch, it must ACK the batch, for
// the queue to advance its buffers. Events in progress or ACKed are not readable
// from the queue.
// When the queue decides it is safe to progress (events have been ACKed by
// consumer or flush to some other intermediate storage), it will send an ACK signal
// with the number of ACKed events to the Producer (ACK happens in batches).
type Queue interface {
Close() error
BufferConfig() BufferConfig
Producer(cfg ProducerConfig) Producer
// Get retrieves a batch of up to eventCount events. If eventCount <= 0,
// there is no bound on the number of returned events.
Get(eventCount int) (Batch, error)
Metrics() (Metrics, error)
}
// BufferConfig returns the pipelines buffering settings,
// for the pipeline to use.
// In case of the pipeline itself storing events for reporting ACKs to clients,
// but still dropping events, the pipeline can use the buffer information,
// to define an upper bound of events being active in the pipeline.
type BufferConfig struct {
// MaxEvents is the maximum number of events the queue can hold at capacity.
// A value <= 0 means there is no fixed limit.
MaxEvents int
}
// ProducerConfig as used by the Pipeline to configure some custom callbacks
// between pipeline and queue.
type ProducerConfig struct {
// if ACK is set, the callback will be called with number of events produced
// by the producer instance and being ACKed by the queue.
ACK func(count int)
// OnDrop provided to the queue, to report events being silently dropped by
// the queue. For example an async producer close and publish event,
// with close happening early might result in the event being dropped. The callback
// gives a queue user a chance to keep track of total number of events
// being buffered by the queue.
OnDrop func(beat.Event)
// DropOnCancel is a hint to the queue to drop events if the producer disconnects
// via Cancel.
DropOnCancel bool
}
// Producer is an interface to be used by the pipelines client to forward
// events to a queue.
type Producer interface {
// Publish adds an event to the queue, blocking if necessary, and returns
// true on success.
Publish(event interface{}) bool
// TryPublish adds an event to the queue if doing so will not block the
// caller, otherwise it immediately returns. The reasons a publish attempt
// might block are defined by the specific queue implementation and its
// configuration. Returns true if the event was successfully added, false
// otherwise.
TryPublish(event interface{}) bool
// Cancel closes this Producer endpoint. If the producer is configured to
// drop its events on Cancel, the number of dropped events is returned.
// Note: A queue may still send ACK signals even after Cancel is called on
// the originating Producer. The pipeline client must accept and
// discard these ACKs.
Cancel() int
}
// Batch of events to be returned to Consumers. The `Done` method will tell the
// queue that the batch has been consumed and its events can be discarded.
type Batch interface {
Count() int
Event(i int) interface{}
Done()
}