These release notes can also view viewed on the Arroyo blog
This release introduces support for metadata from sources, a new RabbitMQ connector, improved CDC support, IAM auth for Kafka, a more efficient core dataflow, among many other improvements.
Arroyo is a community project, and we're very grateful to all of our contributors. We are particularly excited to welcome four new contributors to the project in this release:
- @tiagogcampos made their first contribution in #735
- @emef made their first contribution in #759
- @vaibhawvipul made their first contribution in #765
- @ecarrara made their first contribution in #798
Thanks to all of our contributors for this release:
Features
Source metadata
In Arroyo, users write SQL DDL statements (CREATE TABLE
) to define the schema of incoming data from sources. But the underlying data in those sources is in some format or encoding, like JSON or Avro. Arroyo deserializes the data, and maps the fields in the data into the SQL schema so it can be used in queries.
So that works for data. But there are other bits of context that users might want to be able to use as well, not part of the actual data of the message but related metadata. For example, in Kafka a user might want to access the partition of a message, or the offset, or timestamp.
Previously this was not possible, but in 0.13 we have added support for metadata fields in source tables. It looks like this:
create table users (
id TEXT,
name TEXT,
offset BIGINT GENERATED ALWAYS AS (metadata('offset_id')) STORED,
partition INT GENERATED ALWAYS AS (metadata('partition')) STORED
) with (
connector = 'kafka',
...
);
To access metadata fields, you can now define a generated column on the source table that is defined with the special function metadata
. This takes a single string-literal argument that names the metadata field that should be injected into that column.
Initially, source metadata is supported in the Kafka (offset_id
, partition
, topic
, and timestamp
), and MQTT (topic
) connectors.
Thanks to @vaibhawvipul for this incredible contribution!
- Non-data fields - Metadata fields to Kafka Connector by @vaibhawvipul in #765
- Adding metadata to mqtt connector by @vaibhawvipul in #774
- Add timestamp metadata field to kafka connector by @mwylde in #776
RabbitMQ Streams connector
RabbitMQ is a message broker that supports a wide variety of event-processing patterns and protocols. Arroyo has long supported its MQTT mode via our MQTT connector. Now in 0.13 we are adding support for its native
Streams protocol, which adds capabilities around replay and persistence to support at-least-once semantics for processing.
A RabbitMQ source table looks like this
create table stream (
user_id INT,
item TEXT,
price FLOAT,
timestamp TIMESTAMP
) with (
connector = 'rabbitmq',
host = 'localhost',
stream = 'orders',
type = 'source',
format = 'json'
);
See the connector docs for all the details on how to configure and use the source.
Thanks to @ecarrara for contributing this new source to the project!
- Add RabbitMQ Stream source connector by @ecarrara in #798
- RabbitMQ connection testing by @ecarrara in #808
Atomic update outputs
Arroyo supports two forms of streaming SQL, which we call dataflow semantics and update semantics. Update queries are modeled as materialized views which incrementally update as new events come in.
A simple example is a count query:
SELECT count(*) FROM events;
In a batch system, running this query would return the total number of records in the table. But in a streaming system like Arroyo, records come in indefinitely—to produce a result for count we'd need to wait forever. A query engine that never returns a result isn't very useful, so instead we periodically report the result based on the events we've seen so far.
But how will we report this? In this case there's a single value that we're reporting, so we could just emit an updating count, like
count(*) 10
count(*) 20
count(*) 30
...
Our goal is to allow downstream systems to consume these updates and keep track of the current state of the table. For more complex queries that have multiple rows, we need to tell the consumer which row we're updating. There are three kinds of updates we need to be able to handle: creates, updates, and deletes.
For example, for a query like this one
SELECT user_id, count(*) as count
FROM events
GROUP BY user_id
HAVING count < 10;
we would emit a create when first encountering an event for a user_id, updates on subsequent events, and finally a delete once they accumulated more than 10 events.
At least, in theory. In practice, the internal details of our implementation could not actually handle updates. Instead, we modeled updates as a delete followed by a create. This is not only inefficient, but it's also non-atomic: if the system crashes between the delete and the create, the downstream system be left in an inconsistent state.
This is now addressed in 0.13; internally Arroyo can now correctly ingest updates from CDC sources, model them internally, and emit them to sinks. So in 0.12, that first query would return (in the Debezium JSON format we use)
{"before":null,"after":{"count(*)":10},"op":"c"}
{"before":{"count(*)":10},"after":null,"op":"d"}
{"before":null,"after":{"count(*)":20},"op":"c"}
{"before":{"count(*)":20},"after":null,"op":"d"}
{"before":null,"after":{"count(*)":30},"op":"c"}
{"before":{"count(*)":30},"after":null,"op":"d"}
{"before":null,"after":{"count(*)":40},"op":"c"}
while in 0.13 this becomes
{"before":null,"after":{"count(*)":10},"op":"c"}
{"before":{"count(*)":10},"after":{"count(*)":20},"op":"u"}
{"before":{"count(*)":20},"after":{"count(*)":30},"op":"u"}
{"before":{"count(*)":30},"after":{"count(*)":40},"op":"u"}
There is one breaking change associated with this: for Debezium CDC sources, it's now required to mark primary keys so that we can properly apply this logic. It looks like this:
CREATE TABLE debezium_source (
id INT PRIMARY KEY,
customer_id INT,
price FLOAT,
order_date TIMESTAMP,
status TEXT
) WITH (
connector = 'kafka',
format = 'debezium_json',
type = 'source',
...
);
IAM Auth for Kafka
There are many, many ways to authenticate with Kafka. You can use a SASL username and password. You can use an SSL private key. Mutual TLS. The list goes on.
Or if you are using one of AWS's Managed Streaming for Kafka (MSK) products—in particularly its serverless offering—you must authenticate via a bespoke IAM protocol.
Thanks to contributor @emef, this protocol is now supported in Arroyo, simplifying the process of using it with MSK.
To use it, simply specify AWS_MSK_IAM as the authentication protocol and the AWS region:
Or in SQL:
CREATE TABLE msk (
...
) WITH (
connector = 'kafka',
'auth.type' = 'aws_msk_iam',
'auth.region' = 'us-east-1',
...
)
Operator chaining
If you've made it this far, I'm going to assume you're pretty interested in stream processing engines. Or you just scrolled around for a while before landing here. Either way, we're going to get deep into the details
Arroyo 0.13 introduces a new feature into the core dataflow: operator chaining. What is operator chaining? First we need to understand a bit about the physical structure of a stream processing pipeline. A user-supplied SQL query defining a pipeline goes through several stages of transformation, from SQL text, to a logical plan, and finally to a physical dataflow graph.
The dataflow graph is what we actually execute. Each node is an operator, which consumes data, does some (possibly stateful) transformation, and produces an output; operators include sources, sinks, joins, aggregations, and windows.
That's the logical view. In the physical view (what actually gets executed) the graph is parallelized, with multiple copies (which we call subtasks) of each node, handling different sharded portions of the keyspace. Each of those subtasks may also be running on different machines. There are two different ways operators may be connected in the logical dataflow: forward edges mean we connect each physical subtask to the one subtask in the next operator with the same subtask index. Shuffle edges mean we connect every source substask to every target subtask; these are used when we must repartition the data, for example for a GROUP BY.
In Arroyo, these physical edges are queues, which either connect directly to the next subtask in the graph, or to the network stack if the subtasks are running on different machines. The queues provide backpressure and asynchronicity, key elements in making a high-throughput, reliable stream processing engine.
With that background, we can now talk about operator chaining. The basic idea is to remove some of the queues between operators (effectively merging them into a single operator). Why would we want to do this? There are a few reasons. In some engines like Flink, records might have to be serialized or copied onto the queues, so removing the queue avoids that cost. In Arroyo, all data is Arrow which does not need to be serialized (or rather, supports zero-copy serialization/deserialization) and its immutable, columnar nature means that we don't have to move it or copy to send it between nodes.
However, the writing and reading to the queues still adds some overhead. We also statically provision memory to back the queues, so removing queues will greatly reduce memory usage of complex pipelines. Finally, each of the parallel subtasks of the operators have to be managed by the engine's control plane, so there's benefit to reducing the total number of tasks.
So what can we chain? Two operators are chainable if they're connected by a non-shuffle edge (called a forward) in which each subtask of an operator is connected only to the same subtask in the next operator. Both operators must also have the same parallelism. The previous operator must have a single output, and the next operator must also have just a single input (i.e., isn't a join or a union). If all these conditions are met, we can chain the two operators together and eliminate a task and a queue.
This can have a huge effect on the number of tasks in a large complex pipeline:
Chaining is disabled by default in 0.13, but can be enabled by setting the pipeline.chaining.enabled
config option to true
. If you are seeing large memory consumption or would like to reduce controller resource usage for larger pipelines, you may see benefits from enabling it.
Other Improvements
- Update dependencies to address security warnings by @mwylde in #767
- Upgrade to DataFusion 43/Arrow 53.2 by @mwylde in #790
- Add zstd to rdkafka crate by @harshit2283 in #793
- Add kafka connection properties to profile by @emef in #791
- Add pgcrypto ext by @zhuliquan in #795
- Merge sinks by @zhuliquan in #794
- Allow setting environment variables in helm chart by @mwylde in #80
- Plan hop(x, x) to tumble(x) by @mwylde in #809
Fixes
- Properly return a scalar value from a UDF when only input is scalar by @mwylde in #753
- Urlencode subject in confluent schema registry URLs by @emef in #759
- Skip the whole confluent schema header in proto deser by @emef in #763
- Update object_store to pull in GCS token refresh fix by @mwylde in #770
- Fix panic when running pipelines with UDFs in debug mode by @mwylde in #77
- Plan and optimize generating expressions by @mwylde in #778
- Properly plan subqueries with async UDFs by @mwylde in #780
- Don't overwrite existing UDFs on local filesystem by @mwylde in #784
- Don't deadlock the runtime when fetching MSK auth token by @emef in #803
Project infra and tooling
- Add a new
arroyo visualize
command to help debug query plans by @mwylde in #757 - Support record cpu profile by @zhuliquan in #761
- feat: add docker ARG FEATURES for build arroyo by @zhuliquan in #769
- Fix race condition in integ test that was leading to frequent failures by @mwylde in #775
- Add support for UDFs in visualize output by @mwylde in #781
- Use specific python version binary by @mwylde in #783
- Address new clippy warnings in Rust 1.83 by @mwylde in #799
- Finish renaming arroyo-df crate to arroyo-planner by @mwylde in #810
Full Changelog: v0.12.0...v0.13.0