diff --git a/ingestion/ingest-additional-fields-with-include-clause.mdx b/ingestion/ingest-additional-fields-with-include-clause.mdx index a7891ac2..b90947ec 100644 --- a/ingestion/ingest-additional-fields-with-include-clause.mdx +++ b/ingestion/ingest-additional-fields-with-include-clause.mdx @@ -9,7 +9,7 @@ sidebarTitle: "Ingest additional source fields" To add additional columns, use the `INCLUDE` clause. ```sql -INCLUDE { header | key | offset | partition | timestamp } [AS ] +INCLUDE { header | key | offset | partition | timestamp | payload } [AS ] ``` If `` is not specified, a default one will be generated in the format `_rw_{connector}_{col}`, where `connector` is the name of the source connector used (Kafka, Pulsar, Kinesis, etc.), and `col` is the type of column being generated (key, offset, timestamp, etc.). For instance, if an offset column is added to a Kafka source, the default column name would be `_rw_kafka_offset`. @@ -43,6 +43,7 @@ When ingesting data from Kafka, the following additional fields can be included. | partition | `varchar` | The partition the message is from. | | offset | `varchar` | The offset in the partition. | | headers | `struct[]` | Key-value pairs along with the message. | +| payload | `json` | The actual content or data of the message. Only supports `JSON` format. | In the case of headers, there are two ways to define it. @@ -64,10 +65,11 @@ When ingesting data from Kinesis, here are some things to note when including th | Allowed components | Default type | Note | | :----------------- | :---------------- | :-------------- | -| key | bytea | Can be overwritten by encode and key encode. | -| timestamp | timestamp with time zone | See the approximate\_arrival\_timestamp field at [Struct aws\_sdk\_kinesis::types::Record](https://docs.rs/aws-sdk-kinesis/latest/aws%5Fsdk%5Fkinesis/types/struct.Record.html). | -| partition | varchar | The partition the message is from. | -| offset | varchar | The offset in the partition, which corresponds to Kinesis sequence numbers. | +| key | `bytea` | Can be overwritten by `ENCODE` and `KEY ENCODE`. | +| timestamp | `timestamp with time zone` | See the approximate\_arrival\_timestamp field at [Struct aws\_sdk\_kinesis::types::Record](https://docs.rs/aws-sdk-kinesis/latest/aws%5Fsdk%5Fkinesis/types/struct.Record.html). | +| partition | `varchar` | The partition the message is from. | +| offset | `varchar` | The offset in the partition, which corresponds to Kinesis sequence numbers. | +| payload | `json` | The actual content or data of the message. Only supports `JSON` format. | For more components, see [Struct aws\_sdk\_kinesis::types::Record](https://docs.rs/aws-sdk-kinesis/latest/aws%5Fsdk%5Fkinesis/types/struct.Record.html). @@ -77,20 +79,22 @@ When ingesting data from Pulsar, here are some things to note when including the | Allowed components | Default type | Note | | :----------------- | :----------- | :------------------------------------------- | -| key | bytea | Can be overwritten by ENCODE and KEY ENCODE. | -| partition | varchar | The partition the message is from. | -| offset | varchar | The offset in the partition. | +| key | `bytea` | Can be overwritten by `ENCODE` and `KEY ENCODE`. | +| partition | `varchar` | The partition the message is from. | +| offset | `varchar` | The offset in the partition. | +| payload | `json` | The actual content or data of the message. Only supports `JSON` format. | For more components, see [Struct pulsar::message::proto::MessageMetadata](https://docs.rs/pulsar/latest/pulsar/message/proto/struct.MessageMetadata.html). -### S3 and GCS +### S3, GCS, and Azure Blob -When ingesting data from AWS S3 or GCS, the following additional fields can be included. +When ingesting data from AWS S3, GCS or Azure Blob, the following additional fields can be included. | Allowed components | Default type | Note | | :----------------- | :----------- | :--------------------------- | -| file | varchar | The file the record is from. | -| offset | varchar | The offset in the file. | +| file | `varchar` | The file the record is from. | +| offset | `varchar` | The offset in the file. | +| payload | `json` | The actual content or data of the message. Only supports `JSON` format. | ### MQTT @@ -102,9 +106,9 @@ When ingesting data from MQTT, the following additional fields can be included. ## Examples -Here we create a table, `additional_columns`, that ingests data from a Kafka broker. Aside from the `a` column, which is part of the message payload, the additional fields `key`, `partition`, `offset`, `timestamp`, and `header`, are also added to the table. +Here we create a table, `additional_columns`, that ingests data from a Kafka broker. Aside from the `a` column, which is part of the message payload, the additional fields `key`, `partition`, `offset`, `timestamp`, `header`, and `payload` are also added to the table. -```js +```sql CREATE TABLE additional_columns ( a int, primary key (key_col) @@ -114,6 +118,7 @@ INCLUDE partition AS partition_col INCLUDE offset AS offset_col INCLUDE timestamp AS timestamp_col INCLUDE header AS header_col +INCLUDE payload AS payload_col WITH ( connector = 'kafka', properties.bootstrap.server = 'message_queue:29092',