// Package consumer is a framework for distributed, stateful consumption of Gazette journals.
//
// Most users will want to use package go.gazette.dev/core/mainboilerplate/runconsumer
// to build complete, configured framework applications. This package focuses on
// interface definitions and implementation of the consumer runtime.
//
// The primary entry point to this package is the Application interface, which
// users implement to power the event-driven callback logic of the consumer.
// Service is then the top-level runtime of a consumer process, cooperatively
// executing an Application across a number of distributed peer processes and
// servicing the gRPC shard API.
//
// A number of convenience functions are also supplied for interfacing with a
// remote Shard server endpoint.
package consumer

import (
	"context"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"go.gazette.dev/core/broker/client"
	pb "go.gazette.dev/core/broker/protocol"
	pc "go.gazette.dev/core/consumer/protocol"
	"go.gazette.dev/core/consumer/recoverylog"
	"go.gazette.dev/core/keyspace"
	"go.gazette.dev/core/message"
)

// Shard is the processing context of a ShardSpec which is assigned to the
// local consumer process.
type Shard interface {
	// Context of this shard. Notably, the Context will be cancelled when this
	// process is no longer responsible for the shard.
	Context() context.Context
	// Fully-qualified name of this shard, AKA the Etcd key of the ShardSpec.
	// That key is used as the Shard FQN because it composes the consumer
	// application prefix with this ShardID, and is universally unique
	// within the backing Etcd cluster. An FQN can conflict only if another
	// consumer deployment, utilizing another Etcd cluster, also reuses the
	// same application prefix and ShardID.
	FQN() string
	// Current Spec of the shard. Fields of a returned Spec instance will never
	// change, but the instance returned by Spec may change over time to reflect
	// updated Etcd states.
	Spec() *pc.ShardSpec
	// Assignment of the shard to this process. Fields of a returned KeyValue
	// instance will never change, but the instance returned by Assignment may
	// change over time to reflect updated Etcd states.
	Assignment() keyspace.KeyValue
	// JournalClient to be used for raw journal []byte appends made on behalf
	// of this Shard. Consistent use of this client enables Gazette to ensure
	// that all writes issued within a consumer transaction have completed prior
	// to that transaction being committed. Put differently, if a consumed message
	// triggers a raw append to a journal, Gazette can guarantee that append will
	// occur at-least-once no matter how this Shard or brokers may fail (see the
	// sketch which follows this interface).
	JournalClient() client.AsyncJournalClient
	// RecoveredHints returns the GetHintsResponse which was used in the
	// recovery of this Shard from its recovery log.
	// If the Shard doesn't use a recovery log, RecoveredHints is nil.
	RecoveredHints() *pc.GetHintsResponse
	// Progress of the Shard as of its most recent completed transaction.
	// readThrough holds offsets of source journals which have been read
	// through. publishAt holds journals and offsets this Shard has published
	// through, including acknowledgements. If a read message A results in this
	// Shard publishing messages B, and A falls within readThrough, then all
	// messages B (& their acknowledgements) fall within publishAt.
	//
	// While readThrough is comprehensive and persists across Shard faults,
	// note that publishAt is *advisory* and not necessarily complete: it
	// includes only journals written to since this Shard was assigned to
	// this process.
	Progress() (readThrough, publishAt pb.Offsets)
}
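
// An illustrative sketch (not part of the package API): a raw, at-least-once
// append issued through a Shard's JournalClient from within a transaction.
// The function and journal names are hypothetical. Because the append is
// started through the shard's AsyncJournalClient, the runtime will not
// commit the transaction's checkpoint until the append has completed.
func appendRawEvent(shard Shard, event []byte) error {
	// StartAppend stages an asynchronous append to the journal.
	var aa = shard.JournalClient().StartAppend(
		pb.AppendRequest{Journal: "examples/raw-events"}, nil)
	_, _ = aa.Writer().Write(event) // Buffer |event| into the pending append.
	return aa.Release()             // Release it for background dispatch.
}
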
// Store is a durable and transactional storage backend used to persist arbitrary
// Application-defined states alongside the consumer Checkpoints which produced
// those states. The particular means by which the Store represents transactions
// varies from implementation to implementation, and is not modeled by this
// interface. However, for correct exactly-once processing semantics, it must be
// the case that Store modifications made by Applications are made in transactions
// which are committed by StartCommit, and which incorporate the Checkpoint
// provided to StartCommit.
//
// Often Stores are implemented as embedded databases which record their file
// operations into a provided `recoverylog.Recorder`. Stores which instead utilize
// an external transactional system (eg, an RDBMS) are also supported.
//
// Application implementations control the selection, initialization, and usage
// of an appropriate Store for their use case.
type Store interface {
	// StartCommit starts an asynchronous, atomic "commit" to the Store of state
	// updates from this transaction along with the Checkpoint. If the Store
	// uses an external transactional system, then StartCommit must fail if
	// another process has invoked RestoreCheckpoint after *this* Store instance
	// did so. Put differently, the Store cannot commit a Checkpoint that another
	// Store may never see (because it continues from an earlier Checkpoint
	// that it restored).
	//
	// In general terms, this means StartCommit must verify a "write fence"
	// previously installed by RestoreCheckpoint as part of its transaction.
	// Embedded Stores which use recovery logs can rely on the write fence of
	// the log itself, implemented via journal registers. An illustrative
	// sketch follows this interface.
	//
	// StartCommit may immediately begin a transaction with the external system
	// (if one hasn't already been started from ConsumeMessage), but cannot
	// allow it to commit until all waitFor OpFutures have completed
	// successfully. A failure of any one of these OpFutures must also fail
	// this commit.
	//
	// StartCommit will never be called if the OpFuture returned by a previous
	// StartCommit hasn't yet completed. However, ConsumeMessage *will* be
	// called while a previous StartCommit continues to run. Stores should
	// account for this, typically by starting a new transaction which runs
	// alongside the old.
	StartCommit(_ Shard, _ pc.Checkpoint, waitFor OpFutures) OpFuture
	// RestoreCheckpoint recovers the most recent Checkpoint previously committed
	// to the Store. It is called at Shard start-up, and may be called again
	// if a MessageProducer drains its message channel. If an external system
	// is used, it should install a transactional "write fence" to ensure
	// that an older Store instance of another process cannot successfully
	// StartCommit after this RestoreCheckpoint returns.
	RestoreCheckpoint(Shard) (pc.Checkpoint, error)
	// Destroy releases all resources associated with the Store (eg, local files).
	// It is guaranteed that the Store is no longer being used or referenced at
	// the onset of this call.
	Destroy()
}
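
// What follows is an illustrative sketch only: a hypothetical Store backed
// by an external transactional system, shown to highlight the write-fence
// and commit-ordering requirements above. Fence installation, mutation
// staging, and the actual commit are elided as comments.
type externalStore struct {
	// fence is an assumed monotonic epoch, installed by RestoreCheckpoint,
	// which every commit transaction must verify is still current.
	fence int64
}

func (s *externalStore) RestoreCheckpoint(Shard) (pc.Checkpoint, error) {
	// Transactionally: increment and record a new fence, and read the last
	// committed Checkpoint. Commits by any prior fence holder now must fail.
	panic("illustrative sketch only")
}

func (s *externalStore) StartCommit(_ Shard, cp pc.Checkpoint, waitFor OpFutures) OpFuture {
	var op = client.NewAsyncOperation()
	go func() {
		// The commit cannot proceed until all |waitFor| operations of prior,
		// pipelined transactions have completed successfully.
		for f := range waitFor {
			<-f.Done()
			if f.Err() != nil {
				op.Resolve(f.Err())
				return
			}
		}
		// Transactionally: verify |s.fence| is still current, then apply
		// staged mutations along with |cp|, resolving with the outcome.
		op.Resolve(nil)
	}()
	return op
}

func (s *externalStore) Destroy() {}
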
// OpFuture represents an operation which is executing in the background.
// Aliased for brevity from the "client" package.
type OpFuture = client.OpFuture
type OpFutures = client.OpFutures

// Application is the interface provided by user applications running as Gazette
// consumers. Only unrecoverable errors should be returned by Application.
// A returned error will abort processing of an assigned Shard, and will update
// the assignment's ReplicaStatus to FAILED.
//
// Gazette consumers process messages within pipelined transactions. A
// transaction begins upon the first call to ConsumeMessage, which is invoked
// for each read-committed message of a source journal. In the course of the
// transaction many more messages may be passed to ConsumeMessage. When
// consuming a message the Application is free to:
//
// 1) Begin or continue a transaction with its Store.
// 2) Publish exactly-once Messages to other journals via the provided Publisher.
// 3) Append raw at-least-once []bytes via the Shard's JournalClient.
// 4) Keep in-memory-only aggregates such as counts.
//
// Messages published via PublishUncommitted will be visible to read-committed
// readers once the consumer transaction completes. Read-uncommitted readers will see
// them while the transaction continues to run. Similarly, writes issued directly
// through the shard JournalClient are also readable while the transaction runs.
//
// A transaction *must* continue to run while asynchronous appends of the _prior_
// transaction are ongoing (including appends to the shard recovery log and
// appends of post-commit acknowledgements). The transaction will continue to
// process messages during this time until the ShardSpec's MaxTxnDuration is
// reached, at which point the transaction will stop reading messages but
// continue to wait for prior appends to finish.
//
// It must also continue to run until the ShardSpec's MinTxnDuration is reached.
//
// Assuming both of those conditions are satisfied, the transaction will close
// upon encountering a stall in a buffered channel of decoded input messages.
// If a stall isn't forthcoming (as is frequent at high write rates), it will
// close upon reaching the ShardSpec's MaxTxnDuration.
//
// Upon transaction close, FinalizeTxn is called. At this point the Application
// must publish any pending messages and/or begin related journal appends,
// and must flush any in-memory caches or aggregates into its Store transaction
// (simply starting appends is sufficient: the Application does _not_ need to
// wait for journal appends to complete).
//
// StartCommit of the Store is then called with a Checkpoint. For correct
// exactly-once processing semantics, the Checkpoint must be written in a
// single transaction alongside all other Store mutations made within the
// scope of this consumer transaction:
//
//   * Eg for `store-rocksdb`, all store mutations and the Checkpoint are
//     written together within a single RocksDB WriteBatch.
//   * For `SQLStore`, verification of the write fence, INSERTS, UPDATES, and
//     the Checkpoint itself are written within a single BEGIN/COMMIT transaction.
//   * Similarly, `store-sqlite` persists Checkpoints within the scope of the
//     current SQL transaction.
//
// Note that other, non-transactional Store mutations are permitted, but will
// have a weaker at-least-once processing guarantee with respect to Store state.
// This can make sense for applications filling caches in BigTable, Memcache, or
// other systems which support transactions only over a single key (ie, check-and-set).
// In this case the Store should apply all mutations, followed by a fenced CAS
// Checkpoint update. So long as the Checkpoint itself is fenced properly,
// messages will continue to have exactly-once semantics (though be cautious
// of publishing messages which are derived from other Store keys that may
// have been updated more than once).
//
// Once the commit completes, acknowledgements of messages published during the
// transaction are written to applicable journals, which informs downstream
// readers that those messages have committed and may now be consumed.
//
// Transactions are fully pipelined: once StartCommit has returned, the next
// consumer transaction immediately begins processing even though the prior
// transaction continues to commit in the background (and could even fail). In
// fact, at this point there's no guarantee that any journal writes of the
// previous transaction have even _started_, including those to the recovery log.
//
// To make this work safely and correctly, transactions use barriers to ensure
// that background operations are started and complete in the correct order:
// for example, that the prior transaction doesn't commit until all its writes
// to other journals have also completed, and that writes of message
// acknowledgements don't start until mutations & the checkpoint have committed
// to the Store.
//
// While those operations complete in the background, the next transaction will
// consume new messages concurrently. Its one constraint is that it may not
// itself start to commit until its predecessor transaction has fully completed.
// This means that transactions will almost always exhibit some degree of
// batching, which will depend on the rate of incoming messages, the latency to
// Gazette, and the latency and speed of a utilized external store. If the prior
// commit takes so long that the successor transaction reaches its maximum
// duration, then that successor will stall without processing further messages
// until its predecessor commits. This is the _only_ case under which a consumer
// can be blocked from processing ready messages.
type Application interface {
	// NewStore constructs a Store for the Shard around the initialized file-
	// system *Recorder. If the ShardSpec does not have a configured recovery
	// log, then the *Recorder will be nil.
	NewStore(Shard, *recoverylog.Recorder) (Store, error)
	// NewMessage returns a zero-valued Message of an appropriate representation
	// for the JournalSpec.
	NewMessage(*pb.JournalSpec) (message.Message, error)
	// ConsumeMessage consumes a read-committed message within the scope of a
	// transaction. It should use the provided Publisher to PublishUncommitted
	// messages to other journals. Doing so enables Gazette to properly sequence
	// messages and ensure they are either acknowledged or rolled-back atomically
	// with this consumer transaction.
	ConsumeMessage(Shard, Store, message.Envelope, *message.Publisher) error
	// FinalizeTxn indicates a consumer transaction is ending, and that the
	// Application must flush any in-memory transaction state or caches, and
	// begin any deferred journal appends. At completion:
	//   * All transaction messages must have been published to the provided
	//     Publisher,
	//   * All raw transaction []byte content must have been appended via the
	//     shard JournalClient, and
	//   * All in-memory state must be marshalled to the active Store transaction.
	FinalizeTxn(Shard, Store, *message.Publisher) error
}
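
// A minimal, hypothetical sketch of the Application lifecycle described
// above: ConsumeMessage aggregates counts in memory, and FinalizeTxn flushes
// them into the Store's current transaction. The countEvent message and
// counterStore interface are illustrative assumptions, not part of this
// package.
type countEvent struct {
	UUID message.UUID
	Key  string
}

func (e *countEvent) GetUUID() message.UUID  { return e.UUID }
func (e *countEvent) SetUUID(u message.UUID) { e.UUID = u }
func (e *countEvent) NewAcknowledgement(pb.Journal) message.Message {
	return new(countEvent)
}

// counterStore stands in for a concrete Store (eg, store-rocksdb) which can
// stage key increments into its current transaction.
type counterStore interface {
	Store
	Increment(key string, delta int64)
}

type countingApp struct {
	pending map[string]int64 // In-memory aggregate of the current transaction.
}

func newCountingApp() *countingApp {
	return &countingApp{pending: make(map[string]int64)}
}

func (a *countingApp) NewStore(shard Shard, rec *recoverylog.Recorder) (Store, error) {
	// Eg, open an embedded store which records its file operations into |rec|.
	panic("illustrative sketch only")
}

func (a *countingApp) NewMessage(*pb.JournalSpec) (message.Message, error) {
	return new(countEvent), nil
}

func (a *countingApp) ConsumeMessage(_ Shard, _ Store, env message.Envelope, _ *message.Publisher) error {
	a.pending[env.Message.(*countEvent).Key]++ // Cheap, in-memory aggregation.
	return nil
}

func (a *countingApp) FinalizeTxn(_ Shard, store Store, _ *message.Publisher) error {
	// Flush in-memory aggregates into the Store's transaction. StartCommit,
	// called next by the runtime, persists them with the Checkpoint.
	for key, delta := range a.pending {
		store.(counterStore).Increment(key, delta)
		delete(a.pending, key)
	}
	return nil
}
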
// BeginFinisher is an optional interface of Application which is informed
// when consumer transactions begin or finish.
type BeginFinisher interface {
	// BeginTxn is called to notify the Application that a transaction is beginning
	// (and a call to ConsumeMessage will be immediately forthcoming), allowing
	// the Application to perform any preparatory work. For consumers doing
	// extensive aggregation, it may be beneficial to focus available compute
	// resources on a small number of transactions while completely stalling
	// others: this can be accomplished by blocking in BeginTxn until a semaphore
	// is acquired, as in the sketch which follows this interface. A call to
	// BeginTxn is always paired with a call to FinishedTxn.
	BeginTxn(Shard, Store) error
	// FinishedTxn is called to notify the Application that a previously begun
	// transaction has started to commit, or has errored. It allows the
	// Application to perform related cleanup (eg, releasing a previously
	// acquired semaphore). Note transactions are pipelined, and commit
	// operations of this transaction may still be ongoing. FinishedTxn can
	// await the provided OpFuture for its final status.
	FinishedTxn(Shard, Store, OpFuture)
}
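
// A sketch of the semaphore pattern described by BeginTxn. The semaphoredApp
// wrapper and its limit are hypothetical; it composes over any user
// Application to bound the number of concurrently-processing transactions.
type semaphoredApp struct {
	Application               // The wrapped user Application.
	sem chan struct{}         // Buffered channel used as a counting semaphore.
}

func newSemaphoredApp(app Application, limit int) *semaphoredApp {
	return &semaphoredApp{Application: app, sem: make(chan struct{}, limit)}
}

func (a *semaphoredApp) BeginTxn(shard Shard, store Store) error {
	select {
	case a.sem <- struct{}{}: // Acquire, blocking this shard's transaction.
		return nil
	case <-shard.Context().Done():
		return shard.Context().Err()
	}
}

func (a *semaphoredApp) FinishedTxn(_ Shard, _ Store, _ OpFuture) {
	<-a.sem // Release: the transaction has started to commit (or failed).
}
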
// MessageProducer is an optional interface of Application which controls the
// means by which messages to process are identified and produced into the
// provided channel, for processing by consumer transactions. An illustrative
// sketch follows this interface.
type MessageProducer interface {
	// StartReadingMessages identifies journals and messages to be processed
	// by this consumer Shard, and dispatches them to the provided channel.
	// Any terminal error encountered during initialization or while reading
	// messages should also be delivered over |intoCh|. Reads of journals
	// included in the provided Checkpoint should begin from its corresponding
	// read-through offsets.
	//
	// If |intoCh| closes without having sent an error, then a current transaction,
	// if any, is completed and committed. A Store checkpoint is next restored and
	// StartReadingMessages is called again, effectively restarting processing.
	// Implementations can use this behavior to update the joint read and
	// processing context of a shard at a transaction boundary.
	StartReadingMessages(_ Shard, _ Store, _ pc.Checkpoint, intoCh chan<- EnvelopeOrError)
	// ReplayRange builds and returns a read-uncommitted Iterator over the
	// identified byte-range of the journal. The Journal and offset range are
	// guaranteed to be a journal segment which was previously produced via
	// StartReadingMessages. The returned Iterator must re-produce the exact
	// set and ordering of Messages from the identified Journal. If an error
	// occurs while initializing the replay, it should be returned via
	// Next() of the returned message.Iterator.
	ReplayRange(_ Shard, _ Store, journal pb.Journal, begin, end pb.Offset) message.Iterator
	// ReadThrough is used by Resolver to identify a set of Offsets
	// (eg, a sub-set of the Offsets present in ResolveArgs.ReadThrough)
	// which must be read-through before resolution may complete.
	ReadThrough(Shard, Store, ResolveArgs) (pb.Offsets, error)
}
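
// An illustrative MessageProducer sketch which reads a single, hypothetical
// journal ("examples/input"). The use of client.NewRetryReader and
// message.NewReadUncommittedIter reflects a typical implementation; the
// fixedProducer and fixedJournal names are assumptions for illustration.
type fixedProducer struct{ app Application }

const fixedJournal pb.Journal = "examples/input"

func (p *fixedProducer) StartReadingMessages(shard Shard, _ Store, cp pc.Checkpoint, intoCh chan<- EnvelopeOrError) {
	// Resume from the read-through offset recovered with the checkpoint.
	var offset pb.Offset
	if src, ok := cp.Sources[fixedJournal]; ok {
		offset = src.ReadThrough
	}
	var it = message.NewReadUncommittedIter(
		client.NewRetryReader(shard.Context(), shard.JournalClient(),
			pb.ReadRequest{Journal: fixedJournal, Offset: offset, Block: true}),
		p.app.NewMessage)

	go func() {
		for {
			var env, err = it.Next()
			if err != nil {
				intoCh <- EnvelopeOrError{Error: err} // Deliver a terminal error.
				return
			}
			intoCh <- EnvelopeOrError{Envelope: env}
		}
	}()
}

func (p *fixedProducer) ReplayRange(shard Shard, _ Store, journal pb.Journal, begin, end pb.Offset) message.Iterator {
	// Re-produce the exact messages of a segment read by StartReadingMessages.
	return message.NewReadUncommittedIter(
		client.NewRetryReader(shard.Context(), shard.JournalClient(),
			pb.ReadRequest{Journal: journal, Offset: begin, EndOffset: end, Block: true}),
		p.app.NewMessage)
}

func (p *fixedProducer) ReadThrough(_ Shard, _ Store, args ResolveArgs) (pb.Offsets, error) {
	// Resolution must read through exactly the client-provided offsets.
	return args.ReadThrough, nil
}
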
var (
	shardUpDesc = prometheus.NewDesc(
		"gazette_shard_up",
		"Indicates the processing status of a shard by this consumer.",
		[]string{"shard", "status"}, nil)
	shardReadHeadDesc = prometheus.NewDesc(
		"gazette_shard_read_head",
		"Current read head of the consumer (i.e., next journal byte offset to be read).",
		[]string{"shard", "journal"}, nil)
)

var (
	shardTxnTotal = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "gazette_shard_transactions_total",
		Help: "Total number of consumer transactions.",
	}, []string{"shard"})
	shardReadMsgsTotal = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "gazette_shard_read_messages_total",
		Help: "Total number of messages processed by completed consumer transactions.",
	}, []string{"shard"})
	shardReadBytesTotal = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "gazette_shard_read_bytes_total",
		Help: "Total byte-length of messages processed by completed consumer transactions.",
	}, []string{"shard"})
	shardTxnPhaseSecondsTotal = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "gazette_shard_phase_seconds_total",
		Help: "Cumulative number of seconds processing transactions.",
	}, []string{"shard", "phase", "type"})

	// DEPRECATED metrics to be removed:
	txCountTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "gazette_consumer_tx_count_total",
		Help: "Cumulative number of transactions.",
	})
	txMessagesTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "gazette_consumer_tx_messages_total",
		Help: "Cumulative number of messages.",
	})
	txSecondsTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "gazette_consumer_tx_seconds_total",
		Help: "Cumulative number of seconds processing transactions.",
	})
	txConsumeSecondsTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "gazette_consumer_tx_consume_seconds_total",
		Help: "Cumulative number of seconds transactions were processing messages.",
	})
	txStalledSecondsTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "gazette_consumer_tx_stalled_seconds_total",
		Help: "Cumulative number of seconds transactions were stalled waiting for Gazette IO.",
	})
	txFlushSecondsTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "gazette_consumer_tx_flush_seconds_total",
		Help: "Cumulative number of seconds transactions were flushing their commit.",
	})
	txSyncSecondsTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "gazette_consumer_tx_sync_seconds_total",
		Help: "Cumulative number of seconds transactions were waiting for their commit to sync.",
	})
	bytesConsumedTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "gazette_consumer_consumed_bytes_total",
		Help: "Cumulative number of bytes consumed.",
	})
)