Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Dms task lifecycle #2305

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 136 additions & 1 deletion aws/resource_aws_dms_replication_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ func resourceAwsDmsReplicationTask() *schema.Resource {
ForceNew: true,
ValidateFunc: validateArn,
},
"handle_task_lifecycle": {
Type: schema.TypeBool,
Optional: true,
},
"table_mappings": {
Type: schema.TypeString,
Required: true,
Expand Down Expand Up @@ -138,7 +142,21 @@ func resourceAwsDmsReplicationTaskCreate(d *schema.ResourceData, meta interface{
return err
}

return resourceAwsDmsReplicationTaskRead(d, meta)
// so we can get replication task arn
err = resourceAwsDmsReplicationTaskRead(d, meta)
if err != nil {
return err
}

// start the task if required
if d.Get("handle_task_lifecycle").(bool) {
err = resourceAwsDmsReplicationTaskStart(d, meta)
if err != nil {
return err
}
}

return nil
}

func resourceAwsDmsReplicationTaskRead(d *schema.ResourceData, meta interface{}) error {
Expand Down Expand Up @@ -219,6 +237,14 @@ func resourceAwsDmsReplicationTaskUpdate(d *schema.ResourceData, meta interface{
if hasChanges {
log.Println("[DEBUG] DMS update replication task:", request)

// stop the task if required
if d.Get("handle_task_lifecycle").(bool) {
err := resourceAwsDmsReplicationTaskStop(d, meta)
if err != nil {
return err
}
}

_, err := conn.ModifyReplicationTask(request)
if err != nil {
return err
Expand All @@ -239,6 +265,14 @@ func resourceAwsDmsReplicationTaskUpdate(d *schema.ResourceData, meta interface{
return err
}

// resume the task if required
if d.Get("handle_task_lifecycle").(bool) {
err := resourceAwsDmsReplicationTaskStart(d, meta)
if err != nil {
return err
}
}

return resourceAwsDmsReplicationTaskRead(d, meta)
}

Expand All @@ -248,6 +282,14 @@ func resourceAwsDmsReplicationTaskUpdate(d *schema.ResourceData, meta interface{
func resourceAwsDmsReplicationTaskDelete(d *schema.ResourceData, meta interface{}) error {
conn := meta.(*AWSClient).dmsconn

// stop the task if required
if d.Get("handle_task_lifecycle").(bool) {
err := resourceAwsDmsReplicationTaskStop(d, meta)
if err != nil {
return err
}
}

request := &dms.DeleteReplicationTaskInput{
ReplicationTaskArn: aws.String(d.Get("replication_task_arn").(string)),
}
Expand Down Expand Up @@ -329,3 +371,96 @@ func resourceAwsDmsReplicationTaskStateRefreshFunc(
return v, *v.ReplicationTasks[0].Status, nil
}
}

func resourceAwsDmsReplicationTaskStop(d *schema.ResourceData, meta interface{}) error {
conn := meta.(*AWSClient).dmsconn

log.Println("[DEBUG] DMS Stopping replication task:", d.Get("replication_task_arn").(string))

request := &dms.StopReplicationTaskInput{
ReplicationTaskArn: aws.String(d.Get("replication_task_arn").(string)),
}
_, err := conn.StopReplicationTask(request)
if err != nil {
if dmserr, ok := err.(awserr.Error); ok {
if dmserr.Code() == "ResourceNotFoundFault" {
log.Printf("[DEBUG] DMS Replication Task %q Not Found", d.Id())
d.SetId("")
return nil
} else if dmserr.Code() != "InvalidResourceStateFault" {
return err
}
} else {
return err
}
}

stateConf := &resource.StateChangeConf{
Pending: []string{"stopping"},
Target: []string{"stopped", "failed", "ready"},
Refresh: resourceAwsDmsReplicationTaskStateRefreshFunc(d, meta),
Timeout: d.Timeout(schema.TimeoutCreate),
MinTimeout: 10 * time.Second,
Delay: 30 * time.Second, // Wait 30 secs before starting
}

// Wait, catching any errors
_, err = stateConf.WaitForState()
if err != nil {
return err
}

return nil
}

func resourceAwsDmsReplicationTaskStart(d *schema.ResourceData, meta interface{}) error {
conn := meta.(*AWSClient).dmsconn

log.Println("[DEBUG] DMS Resuming replication task:", d.Get("replication_task_arn").(string))

request := &dms.StartReplicationTaskInput{
ReplicationTaskArn: aws.String(d.Get("replication_task_arn").(string)),
StartReplicationTaskType: aws.String("resume-processing"),
}

// append start time if set
if d.Get("cdc_start_time").(string) != "" {
seconds, err := strconv.ParseInt(d.Get("cdc_start_time").(string), 10, 64)
if err != nil {
return fmt.Errorf("[ERROR] DMS update replication task. Invalid CRC Unix timestamp: %s", err)
}
request.CdcStartTime = aws.Time(time.Unix(seconds, 0))
}

_, err := conn.StartReplicationTask(request)
if err != nil {
if dmserr, ok := err.(awserr.Error); ok {
if dmserr.Code() == "ResourceNotFoundFault" {
log.Printf("[DEBUG] DMS Replication Task %q Not Found", d.Id())
d.SetId("")
return nil
} else if dmserr.Code() != "InvalidResourceStateFault" {
return err
}
} else {
return err
}
}

stateConf := &resource.StateChangeConf{
Pending: []string{"starting"},
Target: []string{"running", "failed", "errored"},
Refresh: resourceAwsDmsReplicationTaskStateRefreshFunc(d, meta),
Timeout: d.Timeout(schema.TimeoutCreate),
MinTimeout: 10 * time.Second,
Delay: 30 * time.Second, // Wait 30 secs before starting
}

// Wait, catching any errors
_, err = stateConf.WaitForState()
if err != nil {
return err
}

return nil
}
Loading