[faker] decouple stream state #20492

Merged (16 commits) on Jan 3, 2023
@@ -467,7 +467,7 @@
- name: Sample Data (Faker)
sourceDefinitionId: dfd88b22-b603-4c3d-aad7-3701784586b1
dockerRepository: airbyte/source-faker
-dockerImageTag: 1.0.0
+dockerImageTag: 2.0.0
documentationUrl: https://docs.airbyte.com/integrations/sources/faker
sourceType: api
releaseStage: alpha
13 changes: 11 additions & 2 deletions airbyte-config/init/src/main/resources/seed/source_specs.yaml
@@ -3866,7 +3866,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-faker:1.0.0"
- dockerImage: "airbyte/source-faker:2.0.0"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/faker"
connectionSpecification:
@@ -3907,8 +3907,17 @@
\ before a state message is emitted?"
type: "integer"
minimum: 1
-default: 100
+default: 1000
order: 3
+parallelism:
+title: "Parallelism"
+description: "How many parallel workers should we use to generate fake data?\
+\ Choose a value equal to the number of CPUs you will allocate to this\
+\ source."
+type: "integer"
+minimum: 1
+default: 4
+order: 4
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-faker/Dockerfile
@@ -34,5 +34,5 @@ COPY source_faker ./source_faker
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

-LABEL io.airbyte.version=1.0.0
+LABEL io.airbyte.version=2.0.0
LABEL io.airbyte.name=airbyte/source-faker
@@ -14,7 +14,7 @@ acceptance_tests:
tests:
- config_path: secrets/config.json
backward_compatibility_tests_config:
disable_for_version: "0.2.1"
disable_for_version: "1.0.0" # We changed the cursor field of the Purchases stream in 2.0.0
basic_read:
tests:
- config_path: secrets/config.json
76 changes: 76 additions & 0 deletions airbyte-integrations/connectors/source-faker/csv_export/README.md
@@ -0,0 +1,76 @@
# Python Source CSV Export

This collection of tools runs the source, captures its AirbyteMessages, and converts them into CSV files. This is useful if you want to inspect the data manually or load it into a database by hand.

For speed, we process each stream in parallel and use only command-line tools. The main script (`main.sh`) runs the source via python and tees the RECORD output to per-stream sub-scripts, which use `jq` to convert the records into CSV-delimited output that is finally written to disk.

Because the source takes the standard connector config files, e.g. `--config secrets/config.json --state secrets/state.json --catalog integration_tests/configured_catalog.json`, you can manually step your sync forward if you need to read and store the data in chunks.
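
For example, a single chunked read looks roughly like this (a minimal sketch mirroring what the included `csv_export/main.sh` does, assuming you run it from the connector directory with the secrets files in place):

```bash
# Sketch: read one chunk of the sync and fan the output out to the per-stream CSV scripts.
# The trailing redirect just keeps the raw AirbyteMessages off the terminal.
python main.py read \
  --config secrets/config.json \
  --state secrets/state.json \
  --catalog integration_tests/configured_catalog.json \
  | tee >(./csv_export/users.sh) >(./csv_export/products.sh) >(./csv_export/purchases.sh) > /dev/null
```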

## The road to 1TB of faker data

There's commentary on this at https://github.com/airbytehq/airbyte/pull/20558, along with some cool SQL tricks. The counts below assume roughly 5.02 GB per 10 million users; a quick sanity check of that arithmetic follows the list.

- 2 Billion faker users for 1TB: `10,000,000*(1024/5.02) = 2,039,840,637`
- 200 Million faker users for 100GB: `10,000,000*(100/5.02) = 199,203,187`
- 20 Million faker users for 10GB: `10,000,000*(10/5.02) = 19,920,318`
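
As a sanity check of those figures (the `5.02` divisor is the ~5.02 GB that 10 million users occupy), the counts can be recomputed with `bc`:

```bash
# Recompute the user counts from the ~5.02 GB per 10M users baseline (illustrative only).
echo '10000000 * (1024 / 5.02)' | bc -l   # ~2,039,840,637 users for 1TB
echo '10000000 * (100 / 5.02)'  | bc -l   # ~199,203,187 users for 100GB
echo '10000000 * (10 / 5.02)'   | bc -l   # ~19,920,318 users for 10GB
```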

But let's assume we don't have 1TB of local hard disk. So, we want to make 10 chunks of data, each around 100GB in size.

**`config.json`**

```json
{
"count": 2039840637,
"seed": 0,
"records_per_sync": 203984064
}
```

**`state.json`**

At the end of every sync, increment the `id` in the `users` stream and the `user_id` in the `purchases` stream by `203984064`, the `records_per_sync` chunk size (a `jq` sketch of this follows the JSON below).

```json
[
{
"type": "STREAM",
"stream": {
"stream_state": {
"id": 0
},
"stream_descriptor": {
"name": "users"
}
}
},
{
"type": "STREAM",
"stream": {
"stream_state": {
"id": 0,
"user_id": 0
},
"stream_descriptor": {
"name": "purchases"
}
}
},
{
"type": "STREAM",
"stream": {
"stream_state": {
"id": 0
},
"stream_descriptor": {
"name": "products"
}
}
}
]
```
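
One way to apply that increment between syncs is a small `jq` rewrite of the state file. This is only a sketch; it assumes `jq` is available and that the state lives at `secrets/state.json`:

```bash
# Bump the users cursor (id) and the purchases cursor (user_id) by one chunk,
# matching the records_per_sync value in config.json above.
CHUNK=203984064
jq --argjson n "$CHUNK" 'map(
  if   .stream.stream_descriptor.name == "users"     then .stream.stream_state.id += $n
  elif .stream.stream_descriptor.name == "purchases" then .stream.stream_state.user_id += $n
  else . end
)' secrets/state.json > secrets/state.json.tmp && mv secrets/state.json.tmp secrets/state.json
```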

Finally, ensure that you've opted into all the streams in `integration_tests/configured_catalog.json`.

## TODO

- This is currently set up very manually: we hand-write a bash script for each stream and populate the header information by hand, even though all of that information already lives in the connector's catalog. We could probably generate these bash files on demand with a python script (a rough sketch of the idea follows).
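
As a starting point, here is a bash/jq variant of that idea. It is only a sketch: it assumes the configured catalog carries each stream's `json_schema.properties`, and the generated column order follows the schema rather than the hand-written headers above.

```bash
#!/usr/bin/env bash
# Hypothetical generator: emit one CSV-converter script per stream by pulling
# stream names and column names out of the configured catalog with jq.
CATALOG=integration_tests/configured_catalog.json

for name in $(jq -r '.streams[].stream.name' "$CATALOG"); do
  cols=$(jq -r --arg s "$name" \
    '.streams[] | select(.stream.name == $s) | .stream.json_schema.properties | keys_unsorted | join(", ")' "$CATALOG")
  fields=$(jq -r --arg s "$name" \
    '.streams[] | select(.stream.name == $s) | .stream.json_schema.properties | keys_unsorted | map("." + .) | join(", ")' "$CATALOG")

  cat > "csv_export/${name}.sh" <<EOF
#!/usr/bin/env bash
cd "\$(dirname "\$0")"
FILE="/tmp/csv/${name}.csv"
rm -rf \$FILE
echo "${cols}" >> \$FILE
jq -c 'select((.type | contains("RECORD")) and (.record.stream | contains("${name}"))) .record.data' \\
  | jq -r '[${fields}] | @csv' \\
  >> \$FILE
EOF
  chmod +x "csv_export/${name}.sh"
done
```
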
@@ -0,0 +1,9 @@
#!/usr/bin/env bash

cd "$(dirname "$0")"
cd ".."

mkdir -p /tmp/csv

python main.py read --config secrets/config.json --catalog integration_tests/configured_catalog.json --state secrets/state.json \
| tee >(./csv_export/purchases.sh) >(./csv_export/products.sh) >(./csv_export/users.sh)
@@ -0,0 +1,13 @@
#!/usr/bin/env bash

cd "$(dirname "$0")"

FILE="/tmp/csv/products.csv"

rm -rf $FILE

echo "make, model, price, created_at" >> $FILE

jq -c 'select((.type | contains("RECORD")) and (.record.stream | contains("products"))) .record.data' \
| jq -r '[.make, .model, .price, .created_at] | @csv' \
>> $FILE
@@ -0,0 +1,13 @@
#!/usr/bin/env bash

cd "$(dirname "$0")"

FILE="/tmp/csv/purchases.csv"

rm -rf $FILE

echo "id, product_id, user_id, added_to_cart_at, purchased_at, returned_at" >> $FILE

jq -c 'select((.type | contains("RECORD")) and (.record.stream | contains("purchases"))) .record.data' \
| jq -r '[.id, .product_id, .user_id, .added_to_cart_at, .purchased_at, .returned_at] | @csv' \
>> $FILE
13 changes: 13 additions & 0 deletions airbyte-integrations/connectors/source-faker/csv_export/users.sh
@@ -0,0 +1,13 @@
#!/usr/bin/env bash

cd "$(dirname "$0")"

FILE="/tmp/csv/users.csv"

rm -rf $FILE

echo "id, created_at, updated_at, name, title, age, email, telephone, gender, language, academic_degree, nationality, occupation, height, blood_type, weight" >> $FILE

jq -c 'select((.type | contains("RECORD")) and (.record.stream | contains("users"))) .record.data' \
| jq -r '[.id, .created_at, .updated_at, .name, .title, .age, .email, .telephone, .gender, .language, .academic_degree, .nationality, .occupation, .height, .blood_type, .weight] | @csv' \
>> $FILE