Merge pull request #21467 from dominik-lekse/f-aws-glue-crawler-s3-event-notifications

r/aws_glue_crawler Add support for S3 event notifications
ewbankkit authored Oct 25, 2021
2 parents bcbb6e3 + 8d89f67 commit 4a4f77b
Showing 4 changed files with 245 additions and 4 deletions.
3 changes: 3 additions & 0 deletions .changelog/21467.txt
@@ -0,0 +1,3 @@
```release-note:enhancement
resource/aws_glue_crawler: Add `dlq_event_queue_arn` and `event_queue_arn` arguments to the `s3_target` configuration block
```
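
A minimal configuration exercising the new arguments might look like the following sketch; resource names and the bucket path are hypothetical, and the `CRAWL_EVENT_MODE` recrawl policy mirrors the acceptance tests in this commit:

```terraform
# Sketch only: names below are hypothetical, not taken from this commit.
resource "aws_sqs_queue" "events" {
  name = "example-glue-events"
}

resource "aws_sqs_queue" "events_dlq" {
  name = "example-glue-events-dlq"
}

resource "aws_glue_crawler" "example" {
  name          = "example"
  database_name = aws_glue_catalog_database.example.name
  role          = aws_iam_role.example.arn

  s3_target {
    path                = "s3://example-bucket"
    event_queue_arn     = aws_sqs_queue.events.arn     # queue receiving S3 event notifications
    dlq_event_queue_arn = aws_sqs_queue.events_dlq.arn # dead-letter queue
  }

  # S3 event crawls run in CRAWL_EVENT_MODE, as in the acceptance tests below.
  recrawl_policy {
    recrawl_behavior = "CRAWL_EVENT_MODE"
  }
}
```
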
37 changes: 34 additions & 3 deletions internal/service/glue/crawler.go
@@ -119,15 +119,25 @@ func ResourceCrawler() *schema.Resource {
Type: schema.TypeString,
Optional: true,
},
"path": {
Type: schema.TypeString,
Required: true,
"dlq_event_queue_arn": {
Type: schema.TypeString,
Optional: true,
ValidateFunc: verify.ValidARN,
},
"event_queue_arn": {
Type: schema.TypeString,
Optional: true,
ValidateFunc: verify.ValidARN,
},
"exclusions": {
Type: schema.TypeList,
Optional: true,
Elem: &schema.Schema{Type: schema.TypeString},
},
"path": {
Type: schema.TypeString,
Required: true,
},
"sample_size": {
Type: schema.TypeInt,
Optional: true,
@@ -304,6 +314,11 @@ func resourceCrawlerCreate(d *schema.ResourceData, meta interface{}) error {
return resource.RetryableError(err)
}

// InvalidInputException: SQS queue arn:aws:sqs:us-west-2:*******:tf-acc-test-4317277351691904203 does not exist or the role provided does not have access to it.
if tfawserr.ErrMessageContains(err, glue.ErrCodeInvalidInputException, "SQS queue") && tfawserr.ErrMessageContains(err, glue.ErrCodeInvalidInputException, "does not exist or the role provided does not have access to it") {
return resource.RetryableError(err)
}

return resource.NonRetryableError(err)
}
return nil
@@ -524,6 +539,14 @@ func expandGlueS3Target(cfg map[string]interface{}) *glue.S3Target {
target.SampleSize = aws.Int64(int64(v.(int)))
}

if v, ok := cfg["event_queue_arn"]; ok {
target.EventQueueArn = aws.String(v.(string))
}

if v, ok := cfg["dlq_event_queue_arn"]; ok {
target.DlqEventQueueArn = aws.String(v.(string))
}

return target
}

@@ -625,6 +648,11 @@ func resourceCrawlerUpdate(d *schema.ResourceData, meta interface{}) error {
return resource.RetryableError(err)
}

// InvalidInputException: SQS queue arn:aws:sqs:us-west-2:*******:tf-acc-test-4317277351691904203 does not exist or the role provided does not have access to it.
if tfawserr.ErrMessageContains(err, glue.ErrCodeInvalidInputException, "SQS queue") && tfawserr.ErrMessageContains(err, glue.ErrCodeInvalidInputException, "does not exist or the role provided does not have access to it") {
return resource.RetryableError(err)
}

return resource.NonRetryableError(err)
}
return nil
@@ -768,6 +796,9 @@ func flattenGlueS3Targets(s3Targets []*glue.S3Target) []map[string]interface{} {
attrs["sample_size"] = aws.Int64Value(s3Target.SampleSize)
}

attrs["event_queue_arn"] = aws.StringValue(s3Target.EventQueueArn)
attrs["dlq_event_queue_arn"] = aws.StringValue(s3Target.DlqEventQueueArn)

result = append(result, attrs)
}
return result
205 changes: 205 additions & 0 deletions internal/service/glue/crawler_test.go
@@ -678,6 +678,67 @@ func TestAccGlueCrawler_S3Target_exclusions(t *testing.T) {
})
}

func TestAccGlueCrawler_S3Target_eventqueue(t *testing.T) {
var crawler glue.Crawler
rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix)
resourceName := "aws_glue_crawler.test"

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { acctest.PreCheck(t) },
ErrorCheck: acctest.ErrorCheck(t, glue.EndpointsID),
Providers: acctest.Providers,
CheckDestroy: testAccCheckCrawlerDestroy,
Steps: []resource.TestStep{
{
Config: testAccGlueCrawlerConfig_S3Target_EventQueue(rName),
Check: resource.ComposeTestCheckFunc(
testAccCheckCrawlerExists(resourceName, &crawler),
acctest.CheckResourceAttrRegionalARN(resourceName, "arn", "glue", fmt.Sprintf("crawler/%s", rName)),
resource.TestCheckResourceAttr(resourceName, "s3_target.#", "1"),
acctest.CheckResourceAttrRegionalARN(resourceName, "s3_target.0.event_queue_arn", "sqs", rName),
resource.TestCheckResourceAttr(resourceName, "recrawl_policy.0.recrawl_behavior", "CRAWL_EVENT_MODE"),
),
},
{
ResourceName: resourceName,
ImportState: true,
ImportStateVerify: true,
},
},
})
}

func TestAccGlueCrawler_S3Target_dlqeventqueue(t *testing.T) {
var crawler glue.Crawler
rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix)
resourceName := "aws_glue_crawler.test"

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { acctest.PreCheck(t) },
ErrorCheck: acctest.ErrorCheck(t, glue.EndpointsID),
Providers: acctest.Providers,
CheckDestroy: testAccCheckCrawlerDestroy,
Steps: []resource.TestStep{
{
Config: testAccGlueCrawlerConfig_S3Target_DlqEventQueue(rName),
Check: resource.ComposeTestCheckFunc(
testAccCheckCrawlerExists(resourceName, &crawler),
acctest.CheckResourceAttrRegionalARN(resourceName, "arn", "glue", fmt.Sprintf("crawler/%s", rName)),
resource.TestCheckResourceAttr(resourceName, "s3_target.#", "1"),
acctest.CheckResourceAttrRegionalARN(resourceName, "s3_target.0.event_queue_arn", "sqs", rName),
acctest.CheckResourceAttrRegionalARN(resourceName, "s3_target.0.dlq_event_queue_arn", "sqs", fmt.Sprintf("%sdlq", rName)),
resource.TestCheckResourceAttr(resourceName, "recrawl_policy.0.recrawl_behavior", "CRAWL_EVENT_MODE"),
),
},
{
ResourceName: resourceName,
ImportState: true,
ImportStateVerify: true,
},
},
})
}

func TestAccGlueCrawler_S3Target_multiple(t *testing.T) {
var crawler glue.Crawler
rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix)
@@ -2099,6 +2160,150 @@ resource "aws_glue_crawler" "test" {
`, rName, exclusion1, exclusion2)
}

func testAccGlueCrawlerConfig_S3Target_EventQueue(rName string) string {
return testAccGlueCrawlerConfig_Base(rName) + fmt.Sprintf(`
resource "aws_glue_catalog_database" "test" {
name = %[1]q
}
resource "aws_s3_bucket" "test" {
bucket = %[1]q
force_destroy = true
}
resource "aws_sqs_queue" "test" {
name = %[1]q
visibility_timeout_seconds = 3600
}
resource "aws_iam_role_policy" "test_sqs" {
role = aws_iam_role.test.name
policy = data.aws_iam_policy_document.role_test_sqs.json
}
data "aws_iam_policy_document" "role_test_sqs" {
statement {
effect = "Allow"
actions = [
"sqs:DeleteMessage",
"sqs:GetQueueUrl",
"sqs:ListDeadLetterSourceQueues",
"sqs:DeleteMessageBatch",
"sqs:ReceiveMessage",
"sqs:GetQueueAttributes",
"sqs:ListQueueTags",
"sqs:SetQueueAttributes",
"sqs:PurgeQueue",
]
resources = [
aws_sqs_queue.test.arn,
]
}
}
resource "aws_glue_crawler" "test" {
depends_on = [
aws_iam_role_policy_attachment.test-AWSGlueServiceRole,
aws_iam_role_policy.test_sqs,
]
database_name = aws_glue_catalog_database.test.name
name = %[1]q
role = aws_iam_role.test.name
s3_target {
path = "s3://${aws_s3_bucket.test.bucket}"
event_queue_arn = aws_sqs_queue.test.arn
}
recrawl_policy {
recrawl_behavior = "CRAWL_EVENT_MODE"
}
}
`, rName)
}

func testAccGlueCrawlerConfig_S3Target_DlqEventQueue(rName string) string {
return testAccGlueCrawlerConfig_Base(rName) + fmt.Sprintf(`
resource "aws_glue_catalog_database" "test" {
name = %[1]q
}
resource "aws_s3_bucket" "test" {
bucket = %[1]q
force_destroy = true
}
resource "aws_sqs_queue" "test" {
name = %[1]q
visibility_timeout_seconds = 3600
}
resource "aws_sqs_queue" "test_dlq" {
name = "%[1]sdlq"
visibility_timeout_seconds = 3600
}
resource "aws_iam_role_policy" "test_sqs" {
role = aws_iam_role.test.name
policy = data.aws_iam_policy_document.role_test_sqs.json
}
data "aws_iam_policy_document" "role_test_sqs" {
statement {
effect = "Allow"
actions = [
"sqs:DeleteMessage",
"sqs:GetQueueUrl",
"sqs:ListDeadLetterSourceQueues",
"sqs:DeleteMessageBatch",
"sqs:ReceiveMessage",
"sqs:GetQueueAttributes",
"sqs:ListQueueTags",
"sqs:SetQueueAttributes",
"sqs:PurgeQueue",
]
resources = [
aws_sqs_queue.test_dlq.arn,
aws_sqs_queue.test.arn,
]
}
}
resource "aws_glue_crawler" "test" {
depends_on = [
aws_iam_role_policy_attachment.test-AWSGlueServiceRole,
aws_iam_role_policy.test_sqs,
]
database_name = aws_glue_catalog_database.test.name
name = %[1]q
role = aws_iam_role.test.name
s3_target {
path = "s3://${aws_s3_bucket.test.bucket}"
event_queue_arn = aws_sqs_queue.test.arn
dlq_event_queue_arn = aws_sqs_queue.test_dlq.arn
}
recrawl_policy {
recrawl_behavior = "CRAWL_EVENT_MODE"
}
}
`, rName)
}

func testAccGlueCrawlerConfig_S3Target_Multiple(rName, path1, path2 string) string {
return testAccGlueCrawlerConfig_Base(rName) + fmt.Sprintf(`
resource "aws_glue_catalog_database" "test" {
4 changes: 3 additions & 1 deletion website/docs/r/glue_crawler.html.markdown
@@ -168,7 +168,9 @@ The following arguments are supported:
* `path` - (Required) The path to the Amazon S3 target.
* `connection_name` - (Optional) The name of a connection which allows crawler to access data in S3 within a VPC.
* `exclusions` - (Optional) A list of glob patterns used to exclude from the crawl.
* `sample_size` - (Optional) Sets the number of files in each leaf folder to be crawled when crawling sample files in a dataset. If not set, all the files are crawled. A valid value is an integer between 1 and 249.
* `event_queue_arn` - (Optional) The ARN of the SQS queue to receive S3 notifications from.
* `dlq_event_queue_arn` - (Optional) The ARN of the dead-letter SQS queue. The crawler role must have access to both queues (see the policy sketch below).
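
The crawler's role also needs permission to consume from both queues; the acceptance tests in this commit attach a policy along these lines (a sketch, with hypothetical resource addresses):

```terraform
# Sketch based on the acceptance-test policy; the queue references are hypothetical.
data "aws_iam_policy_document" "crawler_sqs" {
  statement {
    effect = "Allow"
    actions = [
      "sqs:DeleteMessage",
      "sqs:DeleteMessageBatch",
      "sqs:GetQueueAttributes",
      "sqs:GetQueueUrl",
      "sqs:ListDeadLetterSourceQueues",
      "sqs:ListQueueTags",
      "sqs:PurgeQueue",
      "sqs:ReceiveMessage",
      "sqs:SetQueueAttributes",
    ]
    resources = [
      aws_sqs_queue.events.arn,
      aws_sqs_queue.events_dlq.arn,
    ]
  }
}

resource "aws_iam_role_policy" "crawler_sqs" {
  role   = aws_iam_role.example.name
  policy = data.aws_iam_policy_document.crawler_sqs.json
}
```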

### Catalog Target
