Skip to content

Commit

Permalink
Add BigQuery Loader V2
Browse files Browse the repository at this point in the history
  • Loading branch information
pondzix committed May 7, 2024
1 parent 2d5b074 commit 1a692b2
Show file tree
Hide file tree
Showing 21 changed files with 891 additions and 325 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
```mdx-code-block
import Admonition from '@theme/Admonition';
import Tabs from '@theme/Tabs';
import TabItem from '@theme/TabItem';
import LoaderDiagram from '@site/docs/pipeline-components-and-applications/loaders-storage-targets/bigquery-loader/_diagram.md';
```

<Tabs groupId="cloud" queryString lazy>
<TabItem value="aws" label="AWS">
<LoaderDiagram {...props} stream="Kinesis" cloud="AWS"/>
</TabItem>
<TabItem value="gcp" label="GCP">
<LoaderDiagram {...props} stream="Pub/Sub" cloud="GCP"/>
</TabItem>
<TabItem value="azure" label="Azure">
<LoaderDiagram {...props} stream="Kafka"cloud="Azure"/>
</TabItem>
</Tabs>
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
```mdx-code-block
import {versions} from '@site/src/componentVersions';
import CodeBlock from '@theme/CodeBlock';
```

<p>The BigQuery Loader is published as a Docker image which you can run on any {props.cloud} VM.</p>

<CodeBlock language="bash">{
`docker pull snowplow/bigquery-loader-${props.stream}:${versions.bqLoader}
`}</CodeBlock>

To run the loader, mount your config file into the docker image, and then provide the file path on the command line.

<CodeBlock language="bash">{
`docker run \\
--mount=type=bind,source=/path/to/myconfig,destination=/myconfig \\
snowplow/bigquery-loader-${props.stream}:${versions.bqLoader} \\
--config=/myconfig/loader.hocon \\
--iglu-config /myconfig/iglu.hocon
`}</CodeBlock>

Where `loader.hocon` is loader's [configuration file](/docs/pipeline-components-and-applications/loaders-storage-targets/bigquery-loader/#configuring-the-loader) and `iglu.hocon` is [iglu resolver](/docs/pipeline-components-and-applications/iglu/iglu-resolver/index.md) configuration.
Original file line number Diff line number Diff line change
@@ -1,24 +1,16 @@
At the high level, BigQuery loader reads enriched Snowplow events in real time and loads them in BigQuery using the Storage Write API.

```mermaid
flowchart LR
stream[["<b>Enriched events</b>\n(Pub/Sub stream)"]]
loader{{"<b>BigQuery Loader</b>\n(Loader, Mutator and Repeater apps)"}}
subgraph BigQuery
table[("<b>Events table</b>")]
end
stream-->loader-->BigQuery
```mdx-code-block
import Mermaid from '@theme/Mermaid';
import Link from '@docusaurus/Link';
```

BigQuery loader consists of three applications: Loader, Mutator and Repeater. The following diagram illustrates the interaction between them and BigQuery:
<p>The BigQuery Streaming Loader on {props.cloud} is a fully streaming application that continually pulls events from {props.stream} and writes to BigQuery using the <Link to="https://cloud.google.com/bigquery/docs/reference/storage/libraries#client-libraries-install-java">BigQuery Storage API</Link></p>

```mermaid
sequenceDiagram
loop
Note over Loader: Read a small batch of events
Loader-->>+Mutator: Communicate event types (via Pub/Sub)
Loader->>BigQuery: Send events using the Storage Write API
Mutator-->>-BigQuery: Adjust column types if necessary
Repeater->>BigQuery: Resend events that failed<br/>because columns were not up to date
<Mermaid value={`
flowchart LR
stream[["<b>Enriched Events</b>\n(${props.stream} stream)"]]
loader{{"<b>BigQuery Loader</b>"}}
subgraph bigquery [BigQuery]
table[("<b>Events table</b>")]
end
```
stream-->loader-->|BigQuery Storage API|bigquery
`}/>
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<tr>
<td><code>output.good.project</code></td>
<td>Required. The GCP project to which the BigQuery dataset belongs</td>
</tr>
<tr>
<td><code>output.good.dataset</code></td>
<td>Required. The BigQuery dataset to which events will be loaded</td>
</tr>
<tr>
<td><code>output.good.table</code></td>
<td>Optional. Default value <code>events</code>. Name to use for the events table</td>
</tr>
<tr>
<td><code>output.good.credentials</code></td>
<td>Optional. Service account credentials (JSON). If not set, default credentials will be sourced from the usual locations, e.g. file pointed to by the <code>GOOGLE_APPLICATION_CREDENTIALS</code> environment variable </td>
</tr>
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
```mdx-code-block
import Link from '@docusaurus/Link';
```

<tr>
<td><code>batching.maxBytes</code></td>
<td>Optional. Default value <code>16000000</code>. Events are emitted to BigQuery when the batch reaches this size in bytes</td>
</tr>
<tr>
<td><code>batching.maxDelay</code></td>
<td>Optional. Default value <code>1 second</code>. Events are emitted to BigQuery after a maximum of this duration, even if the <code>maxBytes</code> size has not been reached</td>
</tr>
<tr>
<td><code>batching.uploadConcurrency</code></td>
<td>Optional. Default value 3. How many batches can we send simultaneously over the network to BigQuery</td>
</tr>
<tr>
<td><code>retries.setupErrors.delay</code></td>
<td>
Optional. Default value <code>30 seconds</code>.
Configures exponential backoff on errors related to how BigQuery is set up for this loader.
Examples include authentication errors and permissions errors.
This class of errors are reported periodically to the monitoring webhook.
</td>
</tr>
<tr>
<td><code>retries.transientErrors.delay</code></td>
<td>
Optional. Default value <code>1 second</code>.
Configures exponential backoff on errors that are likely to be transient.
Examples include server errors and network errors.
</td>
</tr>
<tr>
<td><code>retries.transientErrors.attempts</code></td>
<td>Optional. Default value 5. Maximum number of attempts to make before giving up on a transient error.</td>
</tr>
<tr>
<td><code>skipSchemas</code></td>
<td>Optional, e.g. <code>["iglu:com.example/skipped1/jsonschema/1-0-0"]</code> or with wildcards <code>["iglu:com.example/skipped2/jsonschema/1-*-*"]</code>. A list of schemas that won't be loaded to BigQuery. This feature could be helpful when recovering from edge-case schemas which for some reason cannot be loaded to the table.</td>
</tr>
<tr>
<td><code>monitoring.metrics.statsd.hostname</code></td>
<td>Optional. If set, the loader sends statsd metrics over UDP to a server on this host name.</td>
</tr>
<tr>
<td><code>monitoring.metrics.statsd.port</code></td>
<td>Optional. Default value 8125. If the statsd server is configured, this UDP port is used for sending metrics.</td>
</tr>
<tr>
<td><code>monitoring.metrics.statsd.tags.*</code></td>
<td>Optional. A map of key/value pairs to be sent along with the statsd metric.</td>
</tr>
<tr>
<td><code>monitoring.metrics.statsd.period</code></td>
<td>Optional. Default <code>1 minute</code>. How often to report metrics to statsd.</td>
</tr>
<tr>
<td><code>monitoring.metrics.statsd.prefix</code></td>
<td>Optional. Default <code>snowplow.bigquery-loader</code>. Prefix used for the metric name when sending to statsd.</td>
</tr>
<tr>
<td><code>monitoring.webhook.endpoint</code></td>
<td>Optional, e.g. <code>https://webhook.example.com</code>. The loader will send to the webhook a payload containing details of any error related to how BigQuery is set up for this loader.</td>
</tr>
<tr>
<td><code>monitoring.webhook.tags.*</code></td>
<td>Optional. A map of key/value strings to be included in the payload content sent to the webhook.</td>
</tr>
<tr>
<td><code>sentry.dsn</code></td>
<td>Optional. Set to a Sentry URI to report unexpected runtime exceptions.</td>
</tr>
<tr>
<td><code>sentry.tags.*</code></td>
<td>Optional. A map of key/value strings which are passed as tags when reporting exceptions to Sentry.</td>
</tr>
<tr>
<td><code>telemetry.disable</code></td>
<td>Optional. Set to <code>true</code> to disable <Link to="/docs/getting-started-on-community-edition/telemetry/">telemetry</Link>.</td>
</tr>
<tr>
<td><code>telemetry.userProvidedId</code></td>
<td>Optional. See <Link to="/docs/getting-started-on-community-edition/telemetry/#how-can-i-help">here</Link> for more information.</td>
</tr>
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<tr>
<td><code>input.topicName</code></td>
<td>Required. Name of the Kafka topic for the source of enriched events.</td>
</tr>
<tr>
<td><code>input.bootstrapServers</code></td>
<td>Required. Hostname and port of Kafka bootstrap servers hosting the source of enriched events.</td>
</tr>
<tr>
<td><code>input.consumerConf.*</code></td>
<td>Optional. A map of key/value pairs for <a href="https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html" target="_blank">any standard Kafka consumer configuration option</a>.</td>
</tr>
<tr>
<td><code>output.bad.topicName</code></td>
<td>Required. Name of the Kafka topic that will receive failed events.</td>
</tr>
<tr>
<td><code>output.bad.bootstrapServers</code></td>
<td>Required. Hostname and port of Kafka bootstrap servers hosting the bad topic</td>
</tr>
<tr>
<td><code>output.bad.producerConf.*</code></td>
<td>Optional. A map of key/value pairs for <a href="https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html" target="_blank">any standard Kafka producer configuration option</a>.</td>
</tr>
<tr>
<td><code>output.bad.maxRecordSize.*</code></td>
<td>Optional. Default value 1000000. Any single failed event sent to Kafka should not exceed this size in bytes</td>
</tr>
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
<tr>
<td><code>input.streamName</code></td>
<td>Required. Name of the Kinesis stream with the enriched events</td>
</tr>
<tr>
<td><code>input.appName</code></td>
<td>Optional, default <code>snowplow-bigquery-loader</code>. Name to use for the dynamodb table, used by the underlying Kinesis Consumer Library for managing leases.</td>
</tr>
<tr>
<td><code>input.initialPosition</code></td>
<td>Optional, default <code>LATEST</code>. Allowed values are <code>LATEST</code>, <code>TRIM_HORIZON</code>, <code>AT_TIMESTAMP</code>. When the loader is deployed for the first time, this controls from where in the kinesis stream it should start consuming events. On all subsequent deployments of the loader, the loader will resume from the offsets stored in the DynamoDB table.</td>
</tr>
<tr>
<td><code>input.initialPosition.timestamp</code></td>
<td>Required if <code>input.initialPosition</code> is <code>AT_TIMESTAMP</code>. A timestamp in ISO8601 format from where the loader should start consuming events.</td>
</tr>
<tr>
<td><code>input.retrievalMode</code></td>
<td>Optional, default Polling. Change to FanOut to enable the enhance fan-out feature of Kinesis.</td>
</tr>
<tr>
<td><code>input.retrievalMode.maxRecords</code></td>
<td>Optional. Default value 1000. How many events the Kinesis client may fetch in a single poll. Only used when `input.retrievalMode` is Polling.</td>
</tr>
<tr>
<td><code>input.bufferSize</code></td>
<td>Optional. Default value 1. The number of batches of events which are pre-fetched from kinesis. The default value is known to work well.</td>
</tr>
<tr>
<td><code>output.bad.streamName</code></td>
<td>Required. Name of the Kinesis stream that will receive failed events.</td>
</tr>
<tr>
<td><code>output.bad.throttledBackoffPolicy.minBackoff</code></td>
<td>Optional. Default value <code>100 milliseconds</code>. Initial backoff used to retry sending failed events if we exceed the Kinesis write throughput limits.</td>
</tr>
<tr>
<td><code>output.bad.throttledBackoffPolicy.maxBackoff</code></td>
<td>Optional. Default value <code>1 second</code>. Maximum backoff used to retry sending failed events if we exceed the Kinesis write throughput limits.</td>
</tr>
<tr>
<td><code>output.bad.recordLimit</code></td>
<td>Optional. Default value 500. The maximum number of records we are allowed to send to Kinesis in 1 PutRecords request.</td>
</tr>
<tr>
<td><code>output.bad.byteLimit</code></td>
<td>Optional. Default value 5242880. The maximum number of bytes we are allowed to send to Kinesis in 1 PutRecords request.</td>
</tr>
<tr>
<td><code>output.bad.maxRecordSize.*</code></td>
<td>Optional. Default value 1000000. Any single event failed event sent to Kinesis should not exceed this size in bytes</td>
</tr>
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
<tr>
<td><code>input.subscription</code></td>
<td>Required, e.g. <code>projects/myproject/subscriptions/snowplow-enriched</code>. Name of the Pub/Sub subscription with the enriched events</td>
</tr>
<tr>
<td><code>input.parallelPullCount</code></td>
<td>Optional. Default value 1. Number of threads used internally by the pubsub client library for fetching events</td>
</tr>
<tr>
<td><code>input.bufferMaxBytes</code></td>
<td>Optional. Default value 10000000. How many bytes can be buffered by the loader app before blocking the pubsub client library from fetching more events. This is a balance between memory usage vs how efficiently the app can operate. The default value works well.</td>
</tr>
<tr>
<td><code>input.maxAckExtensionPeriod</code></td>
<td>Optional. Default value 1 hour. For how long the pubsub client library will continue to re-extend the ack deadline of an unprocessed event.</td>
</tr>
<tr>
<td><code>input.minDurationPerAckExtension</code></td>
<td>Optional. Default value 60 seconds. Sets min boundary on the value by which an ack deadline is extended. The actual value used is guided by runtime statistics collected by the pubsub client library.</td>
</tr>
<tr>
<td><code>input.maxDurationPerAckExtension</code></td>
<td>Optional. Default value 600 seconds. Sets max boundary on the value by which an ack deadline is extended. The actual value used is guided by runtime statistics collected by the pubsub client library.</td>
</tr>
<tr>
<td><code>output.bad.topic</code></td>
<td>Required, e.g. <code>projects/myproject/topics/snowplow-bad</code>. Name of the Pub/Sub topic that will receive failed events.</td>
</tr>
<tr>
<td><code>output.bad.batchSize</code></td>
<td>Optional. Default value 1000. Bad events are sent to Pub/Sub in batches not exceeding this count.</td>
</tr>
<tr>
<td><code>output.bad.requestByteThreshold</code></td>
<td>Optional. Default value 1000000. Bad events are sent to Pub/Sub in batches with a total size not exceeding this byte threshold</td>
</tr>
<tr>
<td><code>output.bad.maxRecordSize</code></td>
<td>Optional. Default value 10000000. Any single failed event sent to Pub/Sub should not exceed this size in bytes</td>
</tr>
Loading

0 comments on commit 1a692b2

Please sign in to comment.