The Cloud Storage CSV to BigQuery pipeline is a batch pipeline that reads CSV files stored in Cloud Storage and appends the results to a BigQuery table. The CSV files can be uncompressed or compressed in the formats listed at https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/Compression.html.
💡 This documentation is generated from Metadata Annotations. Do not change this file directly.
- inputFilePattern : The Cloud Storage path to the CSV file that contains the text to process. (Example: gs://your-bucket/path/*.csv).
- schemaJSONPath : The Cloud Storage path to the JSON file that defines your BigQuery schema.
- outputTable : The name of the BigQuery table that stores your processed data. If you reuse an existing BigQuery table, the data is appended to the destination table.
- bigQueryLoadingTemporaryDirectory : The temporary directory to use during the BigQuery loading process. (Example: gs://your-bucket/your-files/temp_dir).
- badRecordsOutputTable : The name of the BigQuery table used to store records rejected while processing the CSV files. If you reuse an existing BigQuery table, the data is appended to the destination table. The schema of this table must match the error table schema (https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-storage-csv-to-bigquery#GcsCSVToBigQueryBadRecordsSchema).
- delimiter : The column delimiter that the CSV file uses. (Example: ,).
- csvFormat : The CSV format, as defined by Apache Commons CSV. Defaults to: Default.
- containsHeaders : Whether headers are included in the CSV file. Defaults to: false.
- csvFileEncoding : The CSV file character encoding format. Allowed values are US-ASCII, ISO-8859-1, UTF-8, and UTF-16. Defaults to: UTF-8.
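As an illustration of the inputFilePattern and schemaJSONPath parameters above, the following is a minimal sketch of preparing the input files. The bucket and file names are placeholders, and the "BigQuery Schema" wrapper shown follows the schema file layout documented for this classic template; verify it against the template documentation for your version.

```sh
# Placeholder bucket name -- adjust to your environment.
export BUCKET_NAME=<bucket-name>

# Schema file: a JSON object describing the destination table columns.
cat > schema.json <<'EOF'
{
  "BigQuery Schema": [
    {"name": "id",   "type": "INTEGER"},
    {"name": "name", "type": "STRING"}
  ]
}
EOF

# A small CSV file matching that schema (no header row in this sketch).
cat > sample.csv <<'EOF'
1,alice
2,bob
EOF

gsutil cp schema.json "gs://$BUCKET_NAME/schemas/schema.json"
gsutil cp sample.csv "gs://$BUCKET_NAME/path/sample.csv"
```

With these files in place, inputFilePattern would be gs://<bucket-name>/path/*.csv and schemaJSONPath would be gs://<bucket-name>/schemas/schema.json.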
- Java 11
- Maven
- gcloud CLI, and execution of the following commands:
gcloud auth login
gcloud auth application-default login
🌟 Those dependencies are pre-installed if you use Google Cloud Shell!
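If you are not running in Cloud Shell, you may also need to enable the relevant APIs in your project before launching jobs. A typical (non-exhaustive) example:

```sh
# Enable services commonly needed by this pipeline; the exact set depends on your project.
gcloud services enable dataflow.googleapis.com bigquery.googleapis.com storage.googleapis.com
```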
This README provides instructions using the Templates Plugin.
This template is a Classic Template, meaning that the pipeline code is executed once at staging time, and the resulting template is saved to Google Cloud Storage for later reuse. Please check Creating classic Dataflow templates and Running classic templates for more information.
If the plan is just to stage the template (i.e., make it available to use) via the gcloud command or the Dataflow "Create job from template" UI, use the -PtemplatesStage profile:
export PROJECT=<my-project>
export BUCKET_NAME=<bucket-name>
mvn clean package -PtemplatesStage \
-DskipTests \
-DprojectId="$PROJECT" \
-DbucketName="$BUCKET_NAME" \
-DstagePrefix="templates" \
-DtemplateName="GCS_CSV_to_BigQuery" \
-f v1
The -DgcpTempLocation=<temp-bucket-name> parameter can be specified to set the GCS bucket that the DataflowRunner uses to write temp files during serialization. The path used will be gs://<temp-bucket-name>/temp/.
The command should build and save the template to Google Cloud, and then print the complete location on Cloud Storage:
Classic Template was staged! gs://<bucket-name>/templates/GCS_CSV_to_BigQuery
The specific path should be copied as it will be used in the following steps.
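Optionally, you can confirm that the template spec was written to the bucket before moving on, for example:

```sh
# The path below matches the staged location printed by the previous command.
gsutil ls "gs://$BUCKET_NAME/templates/GCS_CSV_to_BigQuery"
```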
Using the staged template:
You can use the path above to run the template (or share it with others for execution).
To start a job with the template at any time using gcloud, you need valid resources for the required parameters. With those in place, the following command line can be used:
export PROJECT=<my-project>
export BUCKET_NAME=<bucket-name>
export REGION=us-central1
export TEMPLATE_SPEC_GCSPATH="gs://$BUCKET_NAME/templates/GCS_CSV_to_BigQuery"
### Required
export INPUT_FILE_PATTERN=<inputFilePattern>
export SCHEMA_JSONPATH=<schemaJSONPath>
export OUTPUT_TABLE=<outputTable>
export BIG_QUERY_LOADING_TEMPORARY_DIRECTORY=<bigQueryLoadingTemporaryDirectory>
export BAD_RECORDS_OUTPUT_TABLE=<badRecordsOutputTable>
export DELIMITER=<delimiter>
export CSV_FORMAT=<csvFormat>
### Optional
export CONTAINS_HEADERS=false
export CSV_FILE_ENCODING=UTF-8
gcloud dataflow jobs run "gcs-csv-to-bigquery-job" \
--project "$PROJECT" \
--region "$REGION" \
--gcs-location "$TEMPLATE_SPEC_GCSPATH" \
--parameters "inputFilePattern=$INPUT_FILE_PATTERN" \
--parameters "schemaJSONPath=$SCHEMA_JSONPATH" \
--parameters "outputTable=$OUTPUT_TABLE" \
--parameters "bigQueryLoadingTemporaryDirectory=$BIG_QUERY_LOADING_TEMPORARY_DIRECTORY" \
--parameters "badRecordsOutputTable=$BAD_RECORDS_OUTPUT_TABLE" \
--parameters "containsHeaders=$CONTAINS_HEADERS" \
--parameters "delimiter=$DELIMITER" \
--parameters "csvFormat=$CSV_FORMAT" \
--parameters "csvFileEncoding=$CSV_FILE_ENCODING"
For more information about the command, please check: https://cloud.google.com/sdk/gcloud/reference/dataflow/jobs/run
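After the job is submitted, you can also follow its progress from the command line, for example:

```sh
# List recent Dataflow jobs in the region and note the JOB_ID of the job launched above.
gcloud dataflow jobs list --project "$PROJECT" --region "$REGION"

# Inspect a specific job by its ID.
gcloud dataflow jobs describe <JOB_ID> --project "$PROJECT" --region "$REGION"
```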
Using the plugin:
Instead of just generating the template in the folder, it is possible to stage and run the template in a single command. This may be useful for testing when changing the templates.
export PROJECT=<my-project>
export BUCKET_NAME=<bucket-name>
export REGION=us-central1
### Required
export INPUT_FILE_PATTERN=<inputFilePattern>
export SCHEMA_JSONPATH=<schemaJSONPath>
export OUTPUT_TABLE=<outputTable>
export BIG_QUERY_LOADING_TEMPORARY_DIRECTORY=<bigQueryLoadingTemporaryDirectory>
export BAD_RECORDS_OUTPUT_TABLE=<badRecordsOutputTable>
export DELIMITER=<delimiter>
export CSV_FORMAT=<csvFormat>
### Optional
export CONTAINS_HEADERS=false
export CSV_FILE_ENCODING=UTF-8
mvn clean package -PtemplatesRun \
-DskipTests \
-DprojectId="$PROJECT" \
-DbucketName="$BUCKET_NAME" \
-Dregion="$REGION" \
-DjobName="gcs-csv-to-bigquery-job" \
-DtemplateName="GCS_CSV_to_BigQuery" \
-Dparameters="inputFilePattern=$INPUT_FILE_PATTERN,schemaJSONPath=$SCHEMA_JSONPATH,outputTable=$OUTPUT_TABLE,bigQueryLoadingTemporaryDirectory=$BIG_QUERY_LOADING_TEMPORARY_DIRECTORY,badRecordsOutputTable=$BAD_RECORDS_OUTPUT_TABLE,containsHeaders=$CONTAINS_HEADERS,delimiter=$DELIMITER,csvFormat=$CSV_FORMAT,csvFileEncoding=$CSV_FILE_ENCODING" \
-f v1
Dataflow supports using Terraform to manage template jobs; see dataflow_job.
Terraform modules have been generated for most templates in this repository, including the parameters specific to each template. If available, they can be used instead of dataflow_job directly.
To use the autogenerated module, execute the standard terraform workflow:
cd v1/terraform/GCS_CSV_to_BigQuery
terraform init
terraform apply
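The module's variables are not listed here, so the -var flags below are assumptions that mirror the template parameters; check the generated module's variable definitions (e.g., its variables.tf) for the authoritative names. A sketch of a non-interactive apply:

```sh
# Variable names are assumed to mirror the template parameters; confirm them
# against the generated module before running.
terraform apply \
  -var="project=$PROJECT" \
  -var="region=$REGION" \
  -var="inputFilePattern=$INPUT_FILE_PATTERN" \
  -var="schemaJSONPath=$SCHEMA_JSONPATH" \
  -var="outputTable=$OUTPUT_TABLE" \
  -var="bigQueryLoadingTemporaryDirectory=$BIG_QUERY_LOADING_TEMPORARY_DIRECTORY" \
  -var="badRecordsOutputTable=$BAD_RECORDS_OUTPUT_TABLE" \
  -var="delimiter=$DELIMITER" \
  -var="csvFormat=$CSV_FORMAT"
```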
To use dataflow_job directly:
provider "google-beta" {
project = var.project
}
variable "project" {
default = "<my-project>"
}
variable "region" {
default = "us-central1"
}
resource "google_dataflow_job" "gcs_csv_to_bigquery" {
provider = google-beta
template_gcs_path = "gs://dataflow-templates-${var.region}/latest/GCS_CSV_to_BigQuery"
name = "gcs-csv-to-bigquery"
region = var.region
temp_gcs_location = "gs://bucket-name-here/temp"
parameters = {
inputFilePattern = "gs://your-bucket/path/*.csv"
schemaJSONPath = "<schemaJSONPath>"
outputTable = "<outputTable>"
bigQueryLoadingTemporaryDirectory = "gs://your-bucket/your-files/temp_dir"
badRecordsOutputTable = "<badRecordsOutputTable>"
delimiter = ","
csvFormat = "<csvFormat>"
# containsHeaders = "false"
# csvFileEncoding = "UTF-8"
}
}