Update include payload syntax #125

Merged: 1 commit, Dec 12, 2024. Changes from all commits.

`ingestion/ingest-additional-fields-with-include-clause.mdx` (19 additions, 14 deletions)

To add additional columns, use the `INCLUDE` clause.

```sql
INCLUDE { header | key | offset | partition | timestamp | payload } [AS <column_name>]
```

If `<column_name>` is not specified, a default one will be generated in the format `_rw_{connector}_{col}`, where `connector` is the name of the source connector used (Kafka, Pulsar, Kinesis, etc.), and `col` is the type of column being generated (key, offset, timestamp, etc.). For instance, if an offset column is added to a Kafka source, the default column name would be `_rw_kafka_offset`.
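
For instance, a minimal sketch of the default naming (the table name, topic, and broker below are hypothetical):

```sql
-- No AS clause, so the generated column is named _rw_kafka_offset.
CREATE TABLE t_default_name (v int)
INCLUDE offset
WITH (
    connector = 'kafka',
    topic = 'example_topic',                     -- hypothetical topic
    properties.bootstrap.server = 'broker:9092'  -- hypothetical broker
) FORMAT PLAIN ENCODE JSON;
```
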
### Kafka

When ingesting data from Kafka, the following additional fields can be included.

| Allowed components | Default type | Note |
| :----------------- | :----------- | :--- |
| partition | `varchar` | The partition the message is from. |
| offset | `varchar` | The offset in the partition. |
| headers | `struct<varchar, bytea>[]` | Key-value pairs along with the message. |
| payload | `json` | The actual content or data of the message. Only supports `JSON` format. |

In the case of headers, there are two ways to define the column, as sketched below.
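
A hedged sketch of the two forms; the single-header syntax and the header name `trace_id` are assumptions, not confirmed by this diff:

```sql
-- Form 1: one column holding all headers (struct<varchar, bytea>[]).
CREATE TABLE t_all_headers (v int)
INCLUDE header AS all_headers
WITH (
    connector = 'kafka',
    topic = 'example_topic',                     -- hypothetical topic
    properties.bootstrap.server = 'broker:9092'  -- hypothetical broker
) FORMAT PLAIN ENCODE JSON;

-- Form 2 (assumed syntax): a single named header, bytea by default.
CREATE TABLE t_one_header (v int)
INCLUDE header 'trace_id' AS trace_id_header
WITH (
    connector = 'kafka',
    topic = 'example_topic',                     -- hypothetical topic
    properties.bootstrap.server = 'broker:9092'  -- hypothetical broker
) FORMAT PLAIN ENCODE JSON;
```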

### Kinesis

When ingesting data from Kinesis, here are some things to note when including the following fields.

| Allowed components | Default type | Note |
| :----------------- | :---------------- | :-------------- |
| key | `bytea` | Can be overwritten by `ENCODE` and `KEY ENCODE`. |
| timestamp | `timestamp with time zone` | See the approximate\_arrival\_timestamp field at [Struct aws\_sdk\_kinesis::types::Record](https://docs.rs/aws-sdk-kinesis/latest/aws%5Fsdk%5Fkinesis/types/struct.Record.html). |
| partition | `varchar` | The partition the message is from. |
| offset | `varchar` | The offset in the partition, which corresponds to Kinesis sequence numbers. |
| payload | `json` | The actual content or data of the message. Only supports `JSON` format. |

For more components, see [Struct aws\_sdk\_kinesis::types::Record](https://docs.rs/aws-sdk-kinesis/latest/aws%5Fsdk%5Fkinesis/types/struct.Record.html).
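
As a hedged sketch under assumed connection parameters (the stream name and region are hypothetical), a Kinesis source that captures the arrival timestamp and sequence number might look like this:

```sql
CREATE SOURCE kinesis_src (v int)
INCLUDE timestamp AS arrival_ts  -- approximate_arrival_timestamp
INCLUDE offset AS sequence_no    -- Kinesis sequence number
WITH (
    connector = 'kinesis',
    stream = 'example-stream',   -- hypothetical stream name
    aws.region = 'us-east-1'     -- hypothetical region
) FORMAT PLAIN ENCODE JSON;
```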

### Pulsar

When ingesting data from Pulsar, here are some things to note when including the following fields.

| Allowed components | Default type | Note |
| :----------------- | :----------- | :------------------------------------------- |
| key | `bytea` | Can be overwritten by `ENCODE` and `KEY ENCODE`. |
| partition | `varchar` | The partition the message is from. |
| offset | `varchar` | The offset in the partition. |
| payload | `json` | The actual content or data of the message. Only supports `JSON` format. |

For more components, see [Struct pulsar::message::proto::MessageMetadata](https://docs.rs/pulsar/latest/pulsar/message/proto/struct.MessageMetadata.html).
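
A hedged sketch for Pulsar (the topic and service URL are hypothetical):

```sql
CREATE SOURCE pulsar_src (v int)
INCLUDE key AS key_col
INCLUDE offset AS offset_col
WITH (
    connector = 'pulsar',
    topic = 'persistent://public/default/example',  -- hypothetical topic
    service.url = 'pulsar://localhost:6650'         -- hypothetical broker URL
) FORMAT PLAIN ENCODE JSON;
```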

### S3, GCS, and Azure Blob

When ingesting data from AWS S3, GCS, or Azure Blob, the following additional fields can be included.

| Allowed components | Default type | Note |
| :----------------- | :----------- | :--------------------------- |
| file | `varchar` | The file the record is from. |
| offset | `varchar` | The offset in the file. |
| payload | `json` | The actual content or data of the message. Only supports `JSON` format. |
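
A hedged sketch for S3 (the bucket, region, and match pattern are hypothetical; GCS and Azure Blob follow the same `INCLUDE` pattern with their own connector options):

```sql
CREATE SOURCE s3_src (v int)
INCLUDE file AS source_file
INCLUDE offset AS file_offset
WITH (
    connector = 's3',
    s3.region_name = 'us-east-1',       -- hypothetical region
    s3.bucket_name = 'example-bucket',  -- hypothetical bucket
    match_pattern = '*.json'            -- hypothetical pattern
) FORMAT PLAIN ENCODE JSON;
```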

### MQTT

When ingesting data from MQTT, the following additional fields can be included.

## Examples

Here we create a table, `additional_columns`, that ingests data from a Kafka broker. Aside from the `a` column, which is part of the message payload, the additional fields `key`, `partition`, `offset`, `timestamp`, `header`, and `payload` are also added to the table.

```sql
CREATE TABLE additional_columns (
a int,
primary key (key_col)
)
INCLUDE key AS key_col
INCLUDE partition AS partition_col
INCLUDE offset AS offset_col
INCLUDE timestamp AS timestamp_col
INCLUDE header AS header_col
INCLUDE payload AS payload_col
WITH (
connector = 'kafka',
properties.bootstrap.server = 'message_queue:29092',
topic = 'test_topic' -- hypothetical topic name
) FORMAT PLAIN ENCODE JSON;
```
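
Once the table is created, the raw payload can be inspected next to the parsed columns. Assuming the `json` column supports RisingWave's JSON accessors, a hypothetical check:

```sql
-- Compare the parsed column with the field in the raw payload.
SELECT a, payload_col ->> 'a' AS raw_a
FROM additional_columns
LIMIT 5;
```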