proposals for the connector specification #5084
Hi @jgraettinger! Thank you for posting this issue. We'd love to work together on developing this standard! Here is how we are thinking about some of the points you raised.
Most connectors do emit multiple STATE messages over their lifetime. The Airbyte job runner only records the last emitted STATE message (discussed more in #2), but connectors can emit STATE messages incrementally. Let me know if there is something I'm not understanding with regard to how connectors interact with / emit state. It's also helpful if the feedback is just that this was not clear from our docs.
I agree that we should add first-class support for streaming connectors that are designed to run indefinitely. I'd like to explore a little more how to make this intuitive. The way the Airbyte implementation of the protocol works right now, there is (mostly) nothing that prevents a connector from running indefinitely. We do eventually stop a sync job if it has both run for a long time and received no data for a long time, but it will not stop as long as the connector keeps emitting. So as long as the connector emits a STATE message periodically (even if no data has come in), it can run forever. What I'm still thinking through is whether this belongs in the ConfiguredCatalog, or whether the protocol should simply say that as long as a connector is emitting messages, it is allowed to keep running. Would love your perspective on what would be most intuitive to you in this regard.
Agreed.
Similar to #2, this concept fits into how we are thinking about evolving the protocol. The goal here, as you mentioned, is to pass a range to an instantiation of a connector so that it can operate on a shard of the data. I agree that this needs to be an argument passed to a connector at runtime, so it needs to be passed into this interface somehow. I'm less sure whether it makes sense to put this in ConfiguredCatalog, but I think this is just a detail of how we structure the standard. Naively, instead of threading it through the catalog, we want it passed as its own runtime input; a hedged sketch follows.
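As an illustration of that contrast, the signatures below are assumptions for discussion, not the actual Airbyte interface:

```python
# Hypothetical sketch only. Today's entrypoint, with any sharding
# information buried inside the configured catalog:
def read(config: dict, configured_catalog: dict, state: dict):
    ...

# What we want: the shard assignment passed explicitly at invocation
# time, alongside the other runtime inputs. (The trailing underscore on
# `range_` just avoids shadowing Python's builtin.)
def read(config: dict, configured_catalog: dict, state: dict, range_: dict):
    ...
```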
This aligns really well with our vision as well. Our preference is to surface this operation separate from the
Good point! We will brainstorm about this idea.
Thanks, this was an interesting and informative response. I think some of these proposals seem to warrant issues of their own, so that we can discuss further without mixing conversations. We're both out on vacation for another week, but I'm looking forward to discussing further after I'm back. Would it be ok for us to split off some separate issues then to resolve the discussions around proposals 2-6?
Some more thoughts on these:
Yeah, I think we just didn't get the right understanding from the docs on this. I took another look at these docs and they don't seem to specify one way or the other. We can just file a docs issue to clear that up.
I think the issue here is that all connectors that can support running in tailing (blocking) mode can also support reading all available data and then exiting (non-blocking mode), and we want the runtime to determine which mode should be used. Take the source-kinesis connector, for example. In tailing mode, it will never exit normally. In non-tailing mode, it will read and emit all the records up through the current wall clock time, and then exit. We want our runtime to tell the connectors which mode to run in because it supports a mode of running our CLI where the runtime itself can exit once it's processed all the available data. A sketch of this dual-mode loop follows.
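For illustration only; `poll_records` and `emit` are hypothetical helpers standing in for the connector's transport and message output:

```python
import time

def read_stream(poll_records, emit, tail: bool):
    """Read a stream, exiting once caught up unless `tail` is set."""
    cutoff = time.time()  # non-tailing mode: read up through "now", then stop
    while True:
        records = poll_records()
        for record in records:
            if not tail and record.timestamp > cutoff:
                return  # reached data newer than our startup wall-clock time
            emit(record)
        if not records:
            if not tail:
                return  # drained everything currently available
            time.sleep(1)  # tailing: wait for new data and keep going
```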
Excellent! What's the next step with this? Can we just create a docs issue? Do we need to do any additional problem solving to ensure that existing connectors don't break due to the new behavior?
I don't think we really care exactly how this is passed, but the ConfiguredCatalog does seem like the most sensible place. Adding it to the State or Config seems out of place, and adding a new CLI option seems like it would make backward compatibility a problem, since most argument parsers would probably error on encountering an unrecognized option.
This one is really interesting, and I'd love to hear some other perspectives on it. My perspective is that since the connector is already responsible for converting tabular (or whatever kind of) data into JSON, it might as well use the desired field names. Otherwise, you need to re-parse and re-shape the JSON as a separate step. That might seem appropriate to do as a separate step, given that it's likely a capability you'll want anyway, but I think it's preferable to have connectors do it because it minimizes the changes to pipelines when switching from one source and/or format to another. It also potentially avoids needing to re-parse and re-shape the JSON, so it's at least theoretically more efficient.
Hi team Airbyte!
tl;dr: We (team Estuary) want to work with you to evolve the Airbyte spec into a community standard for building data connectors.
Things we especially like about your improvements to the Singer spec are the schema-driven configuration discovery workflow and the use of Docker for packaging and discovery of connectors (rather than PyPI).
The protocol is adaptable to continuous / low-latency streaming contexts, which is a particular focus for Flow.
We've worked to integrate the "source" protocol into Flow, and have also worked up some OSS source connectors for various technical systems (S3 & GCS with pattern matching, incremental bucket scans, CSV and compression handling; Kinesis; Kafka soon).
In the process we've implemented and experimented with several extensions to the spec that we wanted to talk through and propose for inclusion. I've bundled these into this GitHub issue, but they're largely separable and can be discussed/evolved/adopted independently.
Proposals
Allow a connector to produce multiple states over its lifetime.
A connector invocation is permitted to produce many STATE messages over its lifetime.
Each STATE is a checkpoint that reflects the RECORDs which preceded it.
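For example, a hedged sketch of what a connector's output sequence could look like, with message bodies abbreviated from the newline-delimited JSON protocol:

```python
import json
import sys

def emit(msg: dict) -> None:
    sys.stdout.write(json.dumps(msg) + "\n")

# Two records, then a STATE checkpointing them; another record, another STATE.
emit({"type": "RECORD", "record": {"stream": "users", "data": {"id": 1}}})
emit({"type": "RECORD", "record": {"stream": "users", "data": {"id": 2}}})
emit({"type": "STATE", "state": {"data": {"users_cursor": 2}}})
emit({"type": "RECORD", "record": {"stream": "users", "data": {"id": 3}}})
emit({"type": "STATE", "state": {"data": {"users_cursor": 3}}})
```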
Add optional `tail` to ConfiguredCatalog

`tail` is a boolean which tells the connector that it may run indefinitely, producing new RECORDs and STATEs as new data becomes available. If false, the connector should produce available RECORDs and then exit, as is the current behavior.

`tail` is optional (assumed to be `false` if missing), and connectors are not required to do anything with it. A Kafka connector might use `tail` to determine whether it should consume indefinitely, producing new RECORDs as they arrive, or should read through offsets determined at startup and then exit; see the sketch below.
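A minimal sketch of that Kafka example; the `consumer` interface here (`high_water_mark`, `position`, `poll`) is a hypothetical stand-in, not a real client API:

```python
def read_partition(consumer, emit, tail: bool):
    """Read one partition, honoring the proposed `tail` flag."""
    end_offset = consumer.high_water_mark()  # offsets determined at startup
    while tail or consumer.position() < end_offset:
        msg = consumer.poll(timeout=1.0)
        if msg is not None:
            emit(msg)
    # Only non-tailing mode reaches here: we've read through the startup
    # offsets and can exit. Tailing mode loops indefinitely.
```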
Output STATEs are reduced using RFC 7386 JSON Merge Patch

As currently specified, each STATE produced across connector invocations is a complete replacement of a prior STATE.
Consider an AWS Kinesis source which is capturing from many underlying Kinesis shards, where the connector STATE encapsulates separate read offsets for each shard.
Today, when any one shard has data, the connector emits its RECORDs and then must restate the STATE of all shards. Under this proposal, it could instead write a STATE that reflects just the new shard data. The client of the connector then knows to PATCH that delta STATE into the complete STATE.
One option is to say that all STATEs are PATCH merges, which changes the assumed behavior of a bunch of existing connectors. This may be backwards compatible in practice, given how JSON Merge Patch works and the stable of existing Airbyte connectors, but we're not certain of that.
Another option might be to add a `patch` bool property to the STATE message that toggles this behavior. A sketch of merge-patch reduction follows.
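For concreteness, a minimal sketch of how a client of the connector could reduce delta STATEs with RFC 7386 merge-patch semantics (the helper below is our own illustration, not part of any spec):

```python
def json_merge_patch(target, patch):
    """RFC 7386: objects merge recursively, null deletes a key,
    and any non-object patch replaces the target wholesale."""
    if not isinstance(patch, dict):
        return patch
    if not isinstance(target, dict):
        target = {}
    for key, value in patch.items():
        if value is None:
            target.pop(key, None)
        else:
            target[key] = json_merge_patch(target.get(key), value)
    return target

# The Kinesis example: only the shard that moved appears in the delta.
state = {"shard-1": {"offset": 100}, "shard-2": {"offset": 7}}
delta = {"shard-2": {"offset": 9}}
state = json_merge_patch(state, delta)
assert state == {"shard-1": {"offset": 100}, "shard-2": {"offset": 9}}
```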
Add optional `range` to ConfiguredCatalog

`range` is a pair of {begin, end} inclusive uint32 integers which tell each connector invocation what it's responsible for. `range` facilitates connector parallelism: each invocation can consult its `range` to determine whether it should do a unit of work, or whether one of its peers will.

The exact usage of `range` is up to the connector.

Examples of ranges:
{"begin":"00000000","end":"ffffffff"}
: connector has only one invocation, covering the full range{"begin":"00000000","end":"7fffffff"}
,{"begin":"800000","end":"ffffffff"}
: connector has two invocations, each with 1/2 of the range.If not specified,
range
is assumed to be{"begin":"00000000","end":"ffffffff"}
A particular rationale for `range` (over, say, worker count and index) is that it allows for dynamic scale-out without breaking existing STATEs. A `range` and its prior STATE can be subdivided into new ranges, each starting from the common parent STATE. A sketch of range-based work assignment follows.
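One hedged sketch of how a connector might consult its `range`; the hashing scheme is purely illustrative, since the proposal leaves exact usage to the connector:

```python
import zlib

def owns(range_: dict, key: str) -> bool:
    """True if this invocation's inclusive [begin, end] range covers `key`."""
    begin = int(range_["begin"], 16)
    end = int(range_["end"], 16)
    point = zlib.crc32(key.encode()) & 0xFFFFFFFF  # hash key into uint32 space
    return begin <= point <= end

# First of two invocations, each covering half the uint32 space:
r = {"begin": "00000000", "end": "7fffffff"}
shards = ["shard-0001", "shard-0002", "shard-0003"]
mine = [s for s in shards if owns(r, s)]  # peers pick up the complement
```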
Add optional `projections` to ConfiguredStream

Projections is a map of {field name: JSON-pointer}.
Many sources of data are tabular in nature (SQL tables, CSVs), while connectors work with JSON documents. A reasonable projection used to map between these models is to map table columns into properties of a root document object. But it's not the only one! `projections` instructs the connector of an alternative location where a given field should be inserted into a RECORD document.

To give a grounded use case, Citi Bike provides somewhat messy CSVs of their system data. Our S3 connector is able to use projections to map CSV columns into more natural document locations in the RECORDs it produces; a sketch follows.
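To illustrate, a simplified sketch of applying a {field name: JSON-pointer} map to one CSV row. The column names are plausible stand-ins for Citi Bike's headers, and the pointer handling skips RFC 6901 escape processing:

```python
def apply_projections(row: dict, projections: dict) -> dict:
    """Place each tabular field at its projected JSON-pointer location."""
    doc = {}
    for field, pointer in projections.items():
        if field not in row:
            continue
        *parents, leaf = pointer.lstrip("/").split("/")
        node = doc
        for part in parents:
            node = node.setdefault(part, {})  # create intermediate objects
        node[leaf] = row[field]
    return doc

projections = {
    "tripduration": "/duration_seconds",
    "start station id": "/start/station/id",
    "start station name": "/start/station/name",
}
row = {"tripduration": 432, "start station id": 72, "start station name": "W 52 St"}
doc = apply_projections(row, projections)
# {"duration_seconds": 432, "start": {"station": {"id": 72, "name": "W 52 St"}}}
```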
Adopting community-oriented naming for the protocol
It will be challenging to build multi-company momentum behind a specification named for one company.
While admittedly bland, would you be open to something like the "Open Connector Specification"? Or do you have better ideas?