Skip to content

Commit

Permalink
add tests and docs for using custom service account in dataflow flex …
Browse files Browse the repository at this point in the history
…template job (#4260) (#2776)

Signed-off-by: Modular Magician <magic-modules@google.com>
  • Loading branch information
modular-magician authored Dec 11, 2020
1 parent 2a6d7bb commit 27a3905
Show file tree
Hide file tree
Showing 3 changed files with 185 additions and 10 deletions.
3 changes: 3 additions & 0 deletions .changelog/4260.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:enhancement
dataflow: added documentation about using `parameters` for custom service account and other pipeline options to `google_dataflow_flex_template_job`
```
179 changes: 174 additions & 5 deletions google-beta/resource_dataflow_flex_template_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@ package google

import (
"fmt"
"strings"
"testing"
"time"

"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
"github.com/hashicorp/terraform-plugin-sdk/v2/terraform"
"google.golang.org/api/compute/v1"
)

func TestAccDataflowFlexTemplateJob_basic(t *testing.T) {
Expand All @@ -23,7 +27,7 @@ func TestAccDataflowFlexTemplateJob_basic(t *testing.T) {
CheckDestroy: testAccCheckDataflowJobDestroyProducer(t),
Steps: []resource.TestStep{
{
Config: testAccDataflowFlowFlexTemplateJob_basic(bucket, job),
Config: testAccDataflowFlexTemplateJob_basic(bucket, job),
Check: resource.ComposeTestCheckFunc(
testAccDataflowJobExists(t, "google_dataflow_flex_template_job.big_data"),
),
Expand All @@ -32,18 +36,105 @@ func TestAccDataflowFlexTemplateJob_basic(t *testing.T) {
})
}

func TestAccDataflowFlexTemplateJob_withServiceAccount(t *testing.T) {
// Dataflow responses include serialized java classes and bash commands
// This makes body comparison infeasible
skipIfVcr(t)
t.Parallel()

randStr := randString(t, 10)
bucket := "tf-test-dataflow-gcs-" + randStr
job := "tf-test-dataflow-job-" + randStr
accountId := "tf-test-dataflow-sa" + randStr
zone := "us-central1-b"

vcrTest(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: testAccCheckDataflowJobDestroyProducer(t),
Steps: []resource.TestStep{
{
Config: testAccDataflowFlexTemplateJob_serviceAccount(bucket, job, accountId, zone),
Check: resource.ComposeTestCheckFunc(
testAccDataflowJobExists(t, "google_dataflow_flex_template_job.big_data"),
testAccDataflowFlexTemplateJobHasServiceAccount(t, "google_dataflow_flex_template_job.big_data", accountId, zone),
),
},
},
})
}

func testAccDataflowFlexTemplateJobHasServiceAccount(t *testing.T, res, expectedId, zone string) resource.TestCheckFunc {
return func(s *terraform.State) error {
instance, err := testAccDataflowFlexTemplateJobGetGeneratedInstance(t, s, res, zone)
if err != nil {
return fmt.Errorf("Error getting dataflow job instance: %s", err)
}
accounts := instance.ServiceAccounts
if len(accounts) != 1 {
return fmt.Errorf("Found multiple service accounts (%d) for dataflow job %q, expected 1", len(accounts), res)
}
actualId := strings.Split(accounts[0].Email, "@")[0]
if expectedId != actualId {
return fmt.Errorf("service account mismatch, expected account ID = %q, actual email = %q", expectedId, accounts[0].Email)
}
return nil
}
}

func testAccDataflowFlexTemplateJobGetGeneratedInstance(t *testing.T, s *terraform.State, res, zone string) (*compute.Instance, error) {
rs, ok := s.RootModule().Resources[res]
if !ok {
return nil, fmt.Errorf("resource %q not in state", res)
}
if rs.Primary.ID == "" {
return nil, fmt.Errorf("resource %q does not have an ID set", res)
}
filter := fmt.Sprintf("labels.goog-dataflow-job-id = %s", rs.Primary.ID)

config := googleProviderConfig(t)

var instance *compute.Instance

err := resource.Retry(1*time.Minute, func() *resource.RetryError {
instances, rerr := config.NewComputeClient(config.userAgent).Instances.
List(config.Project, zone).
Filter(filter).
MaxResults(2).
Do()
if rerr != nil {
return resource.NonRetryableError(rerr)
}
if len(instances.Items) == 0 {
return resource.RetryableError(fmt.Errorf("no instance found for dataflow job %q", rs.Primary.ID))
}
if len(instances.Items) > 1 {
return resource.NonRetryableError(fmt.Errorf("Wrong number of matching instances for dataflow job: %s, %d", rs.Primary.ID, len(instances.Items)))
}
instance = instances.Items[0]
if instance == nil {
return resource.NonRetryableError(fmt.Errorf("invalid instance"))
}
return nil
})
if err != nil {
return nil, err
}
return instance, nil
}

// note: this config creates a job that doesn't actually do anything
func testAccDataflowFlowFlexTemplateJob_basic(bucket, job string) string {
func testAccDataflowFlexTemplateJob_basic(bucket, job string) string {
return fmt.Sprintf(`
resource "google_storage_bucket" "temp" {
name = "%s"
force_destroy = true
}
resource "google_storage_bucket_object" "flex_template" {
name = "flex_template.json"
bucket = google_storage_bucket.temp.name
content = <<EOF
name = "flex_template.json"
bucket = google_storage_bucket.temp.name
content = <<EOF
{
"image": "my-image",
"metadata": {
Expand Down Expand Up @@ -87,3 +178,81 @@ resource "google_dataflow_flex_template_job" "big_data" {
}
`, bucket, job)
}

// note: this config creates a job that doesn't actually do anything
func testAccDataflowFlexTemplateJob_serviceAccount(bucket, job, accountId, zone string) string {
return fmt.Sprintf(`
resource "google_storage_bucket" "temp" {
name = "%s"
force_destroy = true
}
resource "google_service_account" "dataflow-sa" {
account_id = "%s"
display_name = "DataFlow Service Account"
}
resource "google_storage_bucket_iam_member" "dataflow-gcs" {
bucket = google_storage_bucket.temp.name
role = "roles/storage.objectAdmin"
member = "serviceAccount:${google_service_account.dataflow-sa.email}"
}
resource "google_project_iam_member" "dataflow-worker" {
role = "roles/dataflow.worker"
member = "serviceAccount:${google_service_account.dataflow-sa.email}"
}
resource "google_storage_bucket_object" "flex_template" {
name = "flex_template.json"
bucket = google_storage_bucket.temp.name
content = <<EOF
{
"image": "my-image",
"metadata": {
"description": "An Apache Beam streaming pipeline that reads JSON encoded messages from Pub/Sub, uses Beam SQL to transform the message data, and writes the results to a BigQuery",
"name": "Streaming Beam SQL",
"parameters": [
{
"helpText": "Pub/Sub subscription to read from.",
"label": "Pub/Sub input subscription.",
"name": "inputSubscription",
"regexes": [
"[-_.a-zA-Z0-9]+"
]
},
{
"helpText": "BigQuery table spec to write to, in the form 'project:dataset.table'.",
"is_optional": true,
"label": "BigQuery output table",
"name": "outputTable",
"regexes": [
"[^:]+:[^.]+[.].+"
]
}
]
},
"sdkInfo": {
"language": "JAVA"
}
}
EOF
}
resource "google_dataflow_flex_template_job" "big_data" {
name = "%s"
container_spec_gcs_path = "${google_storage_bucket.temp.url}/${google_storage_bucket_object.flex_template.name}"
on_delete = "cancel"
parameters = {
inputSubscription = "my-subscription"
outputTable = "my-project:my-dataset.my-table"
serviceAccount = google_service_account.dataflow-sa.email
zone = "%s"
}
depends_on = [
google_storage_bucket_iam_member.dataflow-gcs,
google_project_iam_member.dataflow-worker
]
}
`, bucket, accountId, job, zone)
}
13 changes: 8 additions & 5 deletions website/docs/r/dataflow_flex_template_job.html.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ resource "google_dataflow_flex_template_job" "big_data_job" {

## Note on "destroy" / "apply"
There are many types of Dataflow jobs. Some Dataflow jobs run constantly,
getting new data from (e.g.) a GCS bucket, and outputting data continuously.
getting new data from (e.g.) a GCS bucket, and outputting data continuously.
Some jobs process a set amount of data then terminate. All jobs can fail while
running due to programming errors or other issues. In this way, Dataflow jobs
are different from most other Terraform / Google resources.

The Dataflow resource is considered 'existing' while it is in a nonterminal
state. If it reaches a terminal state (e.g. 'FAILED', 'COMPLETE',
'CANCELLED'), it will be recreated on the next 'apply'. This is as expected for
'CANCELLED'), it will be recreated on the next 'apply'. This is as expected for
jobs which run continuously, but may surprise users who use this resource for
other kinds of Dataflow jobs.

Expand All @@ -60,22 +60,25 @@ Template.
- - -

* `parameters` - (Optional) Key/Value pairs to be passed to the Dataflow job (as
used in the template).
used in the template). Additional [pipeline options](https://cloud.google.com/dataflow/docs/guides/specifying-exec-params#setting-other-cloud-dataflow-pipeline-options)
such as `serviceAccount`, `workerMachineType`, etc can be specified here.

* `labels` - (Optional) User labels to be specified for the job. Keys and values
should follow the restrictions specified in the [labeling restrictions](https://cloud.google.com/compute/docs/labeling-resources#restrictions)
page. **Note**: This field is marked as deprecated in Terraform as the API does not currently
support adding labels.
support adding labels.
**NOTE**: Google-provided Dataflow templates often provide default labels
that begin with `goog-dataflow-provided`. Unless explicitly set in config, these
labels will be ignored to prevent diffs on re-apply.
labels will be ignored to prevent diffs on re-apply.

* `on_delete` - (Optional) One of "drain" or "cancel". Specifies behavior of
deletion during `terraform destroy`. See above note.

* `project` - (Optional) The project in which the resource belongs. If it is not
provided, the provider project is used.

* `region` - (Optional) The region in which the created job should run.

## Attributes Reference
In addition to the arguments listed above, the following computed attributes are exported:

Expand Down

0 comments on commit 27a3905

Please sign in to comment.