-
Notifications
You must be signed in to change notification settings - Fork 9.2k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
New Resource: aws_athena_database (#1922)
* Make files * wip * implement CRD methods * make docs * Reflect reviews * Reflect 2nd reviews
- Loading branch information
1 parent
e398f97
commit 76052ff
Showing
6 changed files
with
376 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,183 @@ | ||
package aws | ||
|
||
import ( | ||
"fmt" | ||
"strings" | ||
"time" | ||
|
||
"github.com/aws/aws-sdk-go/aws" | ||
"github.com/aws/aws-sdk-go/service/athena" | ||
"github.com/hashicorp/terraform/helper/resource" | ||
"github.com/hashicorp/terraform/helper/schema" | ||
) | ||
|
||
func resourceAwsAthenaDatabase() *schema.Resource { | ||
return &schema.Resource{ | ||
Create: resourceAwsAthenaDatabaseCreate, | ||
Read: resourceAwsAthenaDatabaseRead, | ||
Delete: resourceAwsAthenaDatabaseDelete, | ||
|
||
Schema: map[string]*schema.Schema{ | ||
"name": { | ||
Type: schema.TypeString, | ||
Required: true, | ||
ForceNew: true, | ||
}, | ||
"bucket": { | ||
Type: schema.TypeString, | ||
Required: true, | ||
ForceNew: true, | ||
}, | ||
}, | ||
} | ||
} | ||
|
||
func resourceAwsAthenaDatabaseCreate(d *schema.ResourceData, meta interface{}) error { | ||
conn := meta.(*AWSClient).athenaconn | ||
|
||
input := &athena.StartQueryExecutionInput{ | ||
QueryString: aws.String(fmt.Sprintf("create database %s;", d.Get("name").(string))), | ||
ResultConfiguration: &athena.ResultConfiguration{ | ||
OutputLocation: aws.String("s3://" + d.Get("bucket").(string)), | ||
}, | ||
} | ||
|
||
resp, err := conn.StartQueryExecution(input) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
if err := executeAndExpectNoRowsWhenCreate(*resp.QueryExecutionId, d, conn); err != nil { | ||
return err | ||
} | ||
d.SetId(d.Get("name").(string)) | ||
return resourceAwsAthenaDatabaseRead(d, meta) | ||
} | ||
|
||
func resourceAwsAthenaDatabaseRead(d *schema.ResourceData, meta interface{}) error { | ||
conn := meta.(*AWSClient).athenaconn | ||
|
||
bucket := d.Get("bucket").(string) | ||
input := &athena.StartQueryExecutionInput{ | ||
QueryString: aws.String(fmt.Sprint("show databases;")), | ||
ResultConfiguration: &athena.ResultConfiguration{ | ||
OutputLocation: aws.String("s3://" + bucket), | ||
}, | ||
} | ||
|
||
resp, err := conn.StartQueryExecution(input) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
if err := executeAndExpectMatchingRow(*resp.QueryExecutionId, d.Get("name").(string), conn); err != nil { | ||
return err | ||
} | ||
return nil | ||
} | ||
|
||
func resourceAwsAthenaDatabaseDelete(d *schema.ResourceData, meta interface{}) error { | ||
conn := meta.(*AWSClient).athenaconn | ||
|
||
bucket := d.Get("bucket").(string) | ||
input := &athena.StartQueryExecutionInput{ | ||
QueryString: aws.String(fmt.Sprintf("drop database %s;", d.Get("name").(string))), | ||
ResultConfiguration: &athena.ResultConfiguration{ | ||
OutputLocation: aws.String("s3://" + bucket), | ||
}, | ||
} | ||
|
||
resp, err := conn.StartQueryExecution(input) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
if err := executeAndExpectNoRowsWhenDrop(*resp.QueryExecutionId, d, conn); err != nil { | ||
return err | ||
} | ||
return nil | ||
} | ||
|
||
func executeAndExpectNoRowsWhenCreate(qeid string, d *schema.ResourceData, conn *athena.Athena) error { | ||
rs, err := queryExecutionResult(qeid, conn) | ||
if err != nil { | ||
return err | ||
} | ||
if len(rs.Rows) != 0 { | ||
return fmt.Errorf("[ERROR] Athena create database, unexpected query result: %s", flattenAthenaResultSet(rs)) | ||
} | ||
return nil | ||
} | ||
|
||
func executeAndExpectMatchingRow(qeid string, dbName string, conn *athena.Athena) error { | ||
rs, err := queryExecutionResult(qeid, conn) | ||
if err != nil { | ||
return err | ||
} | ||
for _, row := range rs.Rows { | ||
for _, datum := range row.Data { | ||
if *datum.VarCharValue == dbName { | ||
return nil | ||
} | ||
} | ||
} | ||
return fmt.Errorf("[ERROR] Athena not found database: %s, query result: %s", dbName, flattenAthenaResultSet(rs)) | ||
} | ||
|
||
func executeAndExpectNoRowsWhenDrop(qeid string, d *schema.ResourceData, conn *athena.Athena) error { | ||
rs, err := queryExecutionResult(qeid, conn) | ||
if err != nil { | ||
return err | ||
} | ||
if len(rs.Rows) != 0 { | ||
return fmt.Errorf("[ERROR] Athena drop database, unexpected query result: %s", flattenAthenaResultSet(rs)) | ||
} | ||
return nil | ||
} | ||
|
||
func queryExecutionResult(qeid string, conn *athena.Athena) (*athena.ResultSet, error) { | ||
executionStateConf := &resource.StateChangeConf{ | ||
Pending: []string{athena.QueryExecutionStateQueued, athena.QueryExecutionStateRunning}, | ||
Target: []string{athena.QueryExecutionStateSucceeded}, | ||
Refresh: queryExecutionStateRefreshFunc(qeid, conn), | ||
Timeout: 10 * time.Minute, | ||
Delay: 3 * time.Second, | ||
MinTimeout: 3 * time.Second, | ||
} | ||
_, err := executionStateConf.WaitForState() | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
qrinput := &athena.GetQueryResultsInput{ | ||
QueryExecutionId: aws.String(qeid), | ||
} | ||
resp, err := conn.GetQueryResults(qrinput) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return resp.ResultSet, nil | ||
} | ||
|
||
func queryExecutionStateRefreshFunc(qeid string, conn *athena.Athena) resource.StateRefreshFunc { | ||
return func() (interface{}, string, error) { | ||
input := &athena.GetQueryExecutionInput{ | ||
QueryExecutionId: aws.String(qeid), | ||
} | ||
out, err := conn.GetQueryExecution(input) | ||
if err != nil { | ||
return nil, "failed", err | ||
} | ||
return out, *out.QueryExecution.Status.State, nil | ||
} | ||
} | ||
|
||
func flattenAthenaResultSet(rs *athena.ResultSet) string { | ||
ss := make([]string, 0) | ||
for _, row := range rs.Rows { | ||
for _, datum := range row.Data { | ||
ss = append(ss, *datum.VarCharValue) | ||
} | ||
} | ||
return strings.Join(ss, "\n") | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,143 @@ | ||
package aws | ||
|
||
import ( | ||
"fmt" | ||
"testing" | ||
|
||
"github.com/aws/aws-sdk-go/aws" | ||
"github.com/aws/aws-sdk-go/service/athena" | ||
"github.com/aws/aws-sdk-go/service/s3" | ||
"github.com/hashicorp/terraform/helper/acctest" | ||
"github.com/hashicorp/terraform/helper/resource" | ||
"github.com/hashicorp/terraform/terraform" | ||
) | ||
|
||
func TestAccAWSAthenaDatabase_basic(t *testing.T) { | ||
rInt := acctest.RandInt() | ||
dbName := acctest.RandString(8) | ||
resource.Test(t, resource.TestCase{ | ||
PreCheck: func() { testAccPreCheck(t) }, | ||
Providers: testAccProviders, | ||
CheckDestroy: testAccCheckAWSAthenaDatabaseDestroy, | ||
Steps: []resource.TestStep{ | ||
{ | ||
Config: testAccAthenaDatabaseConfig(rInt, dbName), | ||
Check: resource.ComposeTestCheckFunc( | ||
testAccCheckAWSAthenaDatabaseExists("aws_athena_database.hoge"), | ||
), | ||
}, | ||
}, | ||
}) | ||
} | ||
|
||
// StartQueryExecution requires OutputLocation but terraform destroy deleted S3 bucket as well. | ||
// So temporary S3 bucket as OutputLocation is created to confirm whether the database is acctually deleted. | ||
func testAccCheckAWSAthenaDatabaseDestroy(s *terraform.State) error { | ||
athenaconn := testAccProvider.Meta().(*AWSClient).athenaconn | ||
s3conn := testAccProvider.Meta().(*AWSClient).s3conn | ||
for _, rs := range s.RootModule().Resources { | ||
if rs.Type != "aws_athena_database" { | ||
continue | ||
} | ||
|
||
rInt := acctest.RandInt() | ||
bucketName := fmt.Sprintf("tf-athena-db-%s-%d", rs.Primary.Attributes["name"], rInt) | ||
_, err := s3conn.CreateBucket(&s3.CreateBucketInput{ | ||
Bucket: aws.String(bucketName), | ||
}) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
input := &athena.StartQueryExecutionInput{ | ||
QueryString: aws.String(fmt.Sprint("show databases;")), | ||
ResultConfiguration: &athena.ResultConfiguration{ | ||
OutputLocation: aws.String("s3://" + bucketName), | ||
}, | ||
} | ||
|
||
resp, err := athenaconn.StartQueryExecution(input) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
ers, err := queryExecutionResult(*resp.QueryExecutionId, athenaconn) | ||
if err != nil { | ||
return err | ||
} | ||
found := false | ||
dbName := rs.Primary.Attributes["name"] | ||
for _, row := range ers.Rows { | ||
for _, datum := range row.Data { | ||
if *datum.VarCharValue == dbName { | ||
found = true | ||
} | ||
} | ||
} | ||
if found { | ||
return fmt.Errorf("[DELETE ERROR] Athena failed to drop database: %s", dbName) | ||
} | ||
|
||
loresp, err := s3conn.ListObjectsV2( | ||
&s3.ListObjectsV2Input{ | ||
Bucket: aws.String(bucketName), | ||
}, | ||
) | ||
if err != nil { | ||
return fmt.Errorf("[DELETE ERROR] S3 Bucket list Objects err: %s", err) | ||
} | ||
|
||
objectsToDelete := make([]*s3.ObjectIdentifier, 0) | ||
|
||
if len(loresp.Contents) != 0 { | ||
for _, v := range loresp.Contents { | ||
objectsToDelete = append(objectsToDelete, &s3.ObjectIdentifier{ | ||
Key: v.Key, | ||
}) | ||
} | ||
} | ||
|
||
_, err = s3conn.DeleteObjects(&s3.DeleteObjectsInput{ | ||
Bucket: aws.String(bucketName), | ||
Delete: &s3.Delete{ | ||
Objects: objectsToDelete, | ||
}, | ||
}) | ||
if err != nil { | ||
return fmt.Errorf("[DELETE ERROR] S3 Bucket delete Objects err: %s", err) | ||
} | ||
|
||
_, err = s3conn.DeleteBucket(&s3.DeleteBucketInput{ | ||
Bucket: aws.String(bucketName), | ||
}) | ||
if err != nil { | ||
return fmt.Errorf("[DELETE ERROR] S3 Bucket delete Bucket err: %s", err) | ||
} | ||
|
||
} | ||
return nil | ||
} | ||
|
||
func testAccCheckAWSAthenaDatabaseExists(name string) resource.TestCheckFunc { | ||
return func(s *terraform.State) error { | ||
_, ok := s.RootModule().Resources[name] | ||
if !ok { | ||
return fmt.Errorf("Not found: %s, %v", name, s.RootModule().Resources) | ||
} | ||
return nil | ||
} | ||
} | ||
|
||
func testAccAthenaDatabaseConfig(randInt int, dbName string) string { | ||
return fmt.Sprintf(` | ||
resource "aws_s3_bucket" "hoge" { | ||
bucket = "tf-athena-db-%s-%d" | ||
force_destroy = true | ||
} | ||
resource "aws_athena_database" "hoge" { | ||
name = "%s" | ||
bucket = "${aws_s3_bucket.hoge.bucket}" | ||
} | ||
`, dbName, randInt, dbName) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.