-
Notifications
You must be signed in to change notification settings - Fork 63
/
Copy pathcapture.proto
310 lines (291 loc) · 12.3 KB
/
capture.proto
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
syntax = "proto3";
package capture;
option go_package = "github.com/estuary/flow/go/protocols/capture";
import "go/protocols/flow/flow.proto";
import "gogoproto/gogo.proto";
option (gogoproto.marshaler_all) = true;
option (gogoproto.protosizer_all) = true;
option (gogoproto.unmarshaler_all) = true;
option (gogoproto.goproto_getters_all) = false;
// Capture is a very long-lived RPC through which the Flow runtime and a
// connector cooperatively execute an unbounded number of transactions.
//
// The Pull workflow pulls streams of documents into capturing Flow
// collections. Streams are incremental and resume-able, with resumption
// semantics defined by the connector. The Flow Runtime uses a transactional
// recovery log to support this workflow, and the connector may persist arbitrary
// driver checkpoints into that log as part of the RPC lifecycle,
// to power its chosen resumption semantics.
//
// Pull tasks are split-able, and many concurrent invocations of the RPC
// may collectively capture from a source, where each task split has an
// identified range of keys it's responsible for. The meaning of a "key",
// and its application within the remote store being captured from, is up
// to the connector. The connector might map partitions or shards into the keyspace,
// and from there to a covering task split. Or, it might map distinct files,
// or some other unit of scaling.
//
// RPC Lifecycle
// =============
//
// :Request.Open:
// - The Flow runtime opens the pull stream.
// :Response.Opened:
// - The connector responds with Opened.
//
// Request.Open and Response.Opened are sent only once, at the
// commencement of the stream. Thereafter the protocol loops:
//
// :Response.Captured:
// - The connector tells the runtime of documents,
// which are pending a future Checkpoint.
// - If the connector sends multiple Captured messages without an
// interleaving Checkpoint, the Flow runtime MUST commit
// documents of all such messages in a single transaction.
// :Response.Checkpoint:
// - The connector tells the runtime of a checkpoint: a watermark in the
// captured documents stream which is eligible to be used as a
// transaction commit boundary.
// - Whether the checkpoint becomes a commit boundary is at the
// discretion of the Flow runtime. It may combine multiple checkpoints
// into a single transaction.
// :Request.Acknowledge:
// - Sent only if the connector set Response.Opened.explicit_acknowledgements.
// - The Flow runtime tells the connector that its Checkpoint has committed.
// - The runtime sends one ordered Acknowledge for each Checkpoint.
service Connector {
  // Capture is a bidirectional stream: the Flow runtime sends Request
  // messages and the connector answers with Response messages, following
  // the lifecycle described above.
  rpc Capture(stream Request) returns (stream Response) {}
}
// Request is a message sent by the Flow runtime to the connector.
// Each nested message corresponds to one phase of the Capture RPC.
message Request {
  // Spec requests the specification definition of this connector.
  // Notably this includes its configuration JSON schemas.
  message Spec {
    // Connector type addressed by this request.
    flow.CaptureSpec.ConnectorType connector_type = 1;
    // Connector configuration, as an encoded JSON object.
    // This may be a partial specification (for example, a Docker image),
    // providing only enough information to fetch the remainder of the
    // specification schema.
    string config_json = 2 [
      (gogoproto.casttype) = "encoding/json.RawMessage",
      json_name = "config"
    ];
  }
  Spec spec = 1;

  // Discover requests the set of resources available from this connector.
  message Discover {
    // Connector type addressed by this request.
    flow.CaptureSpec.ConnectorType connector_type = 1;
    // Connector configuration, as an encoded JSON object.
    string config_json = 2 [
      (gogoproto.casttype) = "encoding/json.RawMessage",
      json_name = "config"
    ];
  }
  Discover discover = 2;

  // Validate a capture configuration and proposed bindings.
  // Validate is run out-of-band with ongoing capture invocations.
  // Its purpose is to confirm that the proposed configuration
  // is likely to succeed if applied and run, or to report any
  // potential issues for the user to address.
  message Validate {
    // Name of the capture being validated.
    string name = 1
        [ (gogoproto.casttype) = "github.com/estuary/flow/go/protocols/flow.Capture" ];
    // Connector type addressed by this request.
    flow.CaptureSpec.ConnectorType connector_type = 2;
    // Connector configuration, as an encoded JSON object.
    string config_json = 3 [
      (gogoproto.casttype) = "encoding/json.RawMessage",
      json_name = "config"
    ];
    // Bindings of endpoint resources and collections to which they would be
    // captured. Bindings are ordered and unique on the bound collection name.
    message Binding {
      // JSON-encoded object which specifies the endpoint resource to be
      // captured.
      string resource_config_json = 1 [
        (gogoproto.casttype) = "encoding/json.RawMessage",
        json_name = "resourceConfig"
      ];
      // Collection to be captured.
      flow.CollectionSpec collection = 2 [ (gogoproto.nullable) = false ];
      // Backfill counter for this binding.
      uint32 backfill = 3;
    }
    repeated Binding bindings = 4;
    // Last CaptureSpec which was validated and published.
    // Note that this CaptureSpec may not have been applied.
    flow.CaptureSpec last_capture = 5;
    // Version of the last validated CaptureSpec.
    string last_version = 6;
  }
  Validate validate = 3;

  // Apply an updated capture specification to its endpoint,
  // in preparation for an Open of a capture session.
  // Apply is run by the leader shard of a capture task
  // (having key_begin: 0) while the capture is quiescent.
  // Apply may be called multiple times for a given `version` and
  // `last_version`, even if a prior call succeeded from the connector's
  // perspective, so implementations must be idempotent. However, the next
  // session will not Open until its preceding Apply has durably completed.
  message Apply {
    // Field number 3 is not assigned to any field of Apply. Reserve it so
    // that it can never be reused with a different meaning. Reuse would
    // cause data written under a prior meaning of tag 3 to be misread.
    reserved 3;

    // Capture to be applied.
    flow.CaptureSpec capture = 1;
    // Version of the CaptureSpec being applied.
    string version = 2;
    // Last CaptureSpec which was successfully applied.
    flow.CaptureSpec last_capture = 4;
    // Version of the last applied CaptureSpec.
    string last_version = 5;
  }
  Apply apply = 4;

  // Open a capture for reading documents from the endpoint.
  // Unless the connector requests explicit acknowledgements
  // (Response.Opened.explicit_acknowledgements), Open is the last message
  // which will be sent to the connector.
  message Open {
    // CaptureSpec to be pulled.
    flow.CaptureSpec capture = 1;
    // Version of the opened CaptureSpec.
    // The connector may want to require that this match the version last
    // provided to a successful Apply RPC. It's possible that it won't,
    // due to expected propagation races in Flow's distributed runtime.
    string version = 2;
    // Range of documents to be processed by this invocation.
    flow.RangeSpec range = 3;
    // Last-persisted connector checkpoint state from a previous invocation.
    string state_json = 4 [
      (gogoproto.casttype) = "encoding/json.RawMessage",
      json_name = "state"
    ];
  }
  Open open = 5;

  // Tell the connector that some number of its preceding Checkpoints have
  // committed to the Flow recovery log.
  //
  // Acknowledge is sent only if the connector set
  // Response.Opened.explicit_acknowledgements.
  message Acknowledge {
    // Number of preceding Response.Checkpoint messages which have
    // committed and are being acknowledged. Always one or more.
    uint32 checkpoints = 1;
  }
  Acknowledge acknowledge = 6;

  // Reserved for internal use.
  bytes internal = 100 [ json_name = "$internal" ];
}
// Response is a message sent by the connector back to the Flow runtime.
// Each nested message answers the Request phase of the same name.
message Response {
  // Spec answers a Request.Spec.
  message Spec {
    // Protocol version implemented by the connector; must be 3032023.
    uint32 protocol = 1;
    // JSON schema describing the connector's endpoint configuration.
    string config_schema_json = 2 [
      (gogoproto.casttype) = "encoding/json.RawMessage",
      json_name = "configSchema"
    ];
    // JSON schema describing the connector's per-binding resource
    // configuration.
    string resource_config_schema_json = 3 [
      (gogoproto.casttype) = "encoding/json.RawMessage",
      json_name = "resourceConfigSchema"
    ];
    // Link to the connector's documentation.
    string documentation_url = 4;
    // OAuth2 configuration, if the connector supports it (optional).
    flow.OAuth2 oauth2 = 5;
    // JSON pointers (one or more) used to extract a resource path from a
    // resource configuration of this connector. As an example, a database
    // connector whose resource config looks like
    //   {"schema": "foo", "table": "bar", "other": "config", "answer": 42}
    // would declare `resource_path_pointers: ["/schema", "/table"]`,
    // yielding a `resource_path` of `["foo", "bar"]`.
    repeated string resource_path_pointers = 6;
  }
  Spec spec = 1;

  // Discovered answers a Request.Discover.
  message Discovered {
    // A candidate binding the capture could provide.
    // Bindings may appear in any order.
    message Binding {
      // Suggested name for this discovered binding. It is normalized and
      // appended to a catalog prefix of the proposed capture to produce
      // the name of its recommended collection.
      string recommended_name = 1;
      // JSON-encoded object specifying the captured resource configuration.
      string resource_config_json = 2 [
        (gogoproto.casttype) = "encoding/json.RawMessage",
        json_name = "resourceConfig"
      ];
      // JSON schema of the documents this binding produces.
      string document_schema_json = 3 [
        (gogoproto.casttype) = "encoding/json.RawMessage",
        json_name = "documentSchema"
      ];
      // Composite key of documents as JSON pointers, when known.
      repeated string key = 4;
      // If set, this binding should be added but marked disabled. The user
      // may explicitly enable it, but the garden-path expectation is that
      // most users likely wouldn't want it.
      bool disable = 5;
      // Resource path components which fully qualify the resource this
      // binding identifies, for instance:
      // - an RDBMS might use []{dbname, schema, table};
      // - Kafka might use []{topic};
      // - Redis might use []{key_prefix}.
      repeated string resource_path = 6;
    }
    repeated Binding bindings = 1;
  }
  Discovered discovered = 2;

  // Validated answers a Request.Validate.
  message Validated {
    // Per-binding validation result, one for each binding of the request
    // and in the same order. Every Binding must have a distinct
    // resource_path.
    message Binding {
      // Resource path components which fully qualify the resource this
      // binding identifies, for instance:
      // - an RDBMS might use []{dbname, schema, table};
      // - Kafka might use []{topic};
      // - Redis might use []{key_prefix}.
      repeated string resource_path = 1;
    }
    repeated Binding bindings = 1;
  }
  Validated validated = 3;

  // Applied answers a Request.Apply.
  message Applied {
    // Human-readable summary of what the connector did.
    // An empty value means this Apply is treated as a "no-op".
    string action_description = 1;
  }
  Applied applied = 4;

  // Opened answers a Request.Open.
  // Once Opened is sent, the connector begins sending Captured and
  // Checkpoint messages.
  message Opened {
    // When set, the runtime sends one Request.Acknowledge for each
    // Response.Checkpoint from the connector, once that Checkpoint has
    // fully committed.
    bool explicit_acknowledgements = 1;
  }
  Opened opened = 5;

  // A document captured by this connector invocation.
  // Captured documents remain pending, and are not committed to their bound
  // collection until a subsequent Checkpoint is emitted.
  message Captured {
    // Index of the Open binding for which this document was captured.
    uint32 binding = 1;
    // The published JSON document.
    string doc_json = 2 [
      (gogoproto.casttype) = "encoding/json.RawMessage",
      json_name = "doc"
    ];
  }
  Captured captured = 6;

  // Checkpoint covers every document of this invocation emitted since the
  // previous checkpoint. The Flow runtime may begin committing those
  // documents in a transaction, and a single transaction may span more than
  // one checkpoint.
  message Checkpoint {
    // Connector state associated with this checkpoint, which the connector
    // persists into the Flow recovery log. NOTE(review): see
    // flow.ConnectorState for how successive states combine; that is not
    // visible from this file.
    flow.ConnectorState state = 1;
  }
  Checkpoint checkpoint = 7;

  // Reserved for internal use.
  bytes internal = 100 [ json_name = "$internal" ];
}