diff --git a/alicloud/resource_alicloud_alikafka_instance.go b/alicloud/resource_alicloud_alikafka_instance.go index 3cbc28d8de6c..ddeb7029a77e 100644 --- a/alicloud/resource_alicloud_alikafka_instance.go +++ b/alicloud/resource_alicloud_alikafka_instance.go @@ -3,6 +3,7 @@ package alicloud import ( "errors" "fmt" + "log" "strconv" "time" @@ -48,7 +49,7 @@ func resourceAliCloudAlikafkaInstance() *schema.Resource { "deploy_type": { Type: schema.TypeInt, Required: true, - ValidateFunc: validation.IntInSlice([]int{4, 5}), + ValidateFunc: IntInSlice([]int{4, 5}), }, "partition_num": { Type: schema.TypeInt, @@ -84,12 +85,12 @@ func resourceAliCloudAlikafkaInstance() *schema.Resource { Type: schema.TypeString, Optional: true, Computed: true, - ValidateFunc: validation.StringLenBetween(3, 64), + ValidateFunc: StringLenBetween(3, 64), }, "paid_type": { Type: schema.TypeString, Optional: true, - ValidateFunc: validation.StringInSlice([]string{string(common.PrePaid), string(common.PostPaid)}, false), + ValidateFunc: StringInSlice([]string{string(common.PrePaid), string(common.PostPaid)}, false), Default: PostPaid, }, "spec_type": { @@ -113,8 +114,8 @@ func resourceAliCloudAlikafkaInstance() *schema.Resource { "security_group": { Type: schema.TypeString, Optional: true, - Computed: true, ForceNew: true, + Computed: true, }, "service_version": { Type: schema.TypeString, @@ -136,14 +137,35 @@ func resourceAliCloudAlikafkaInstance() *schema.Resource { "vpc_id": { Type: schema.TypeString, Optional: true, - Computed: true, ForceNew: true, + Computed: true, }, "zone_id": { Type: schema.TypeString, Optional: true, - Computed: true, ForceNew: true, + Computed: true, + }, + "enable_auto_group": { + Type: schema.TypeBool, + Optional: true, + }, + "enable_auto_topic": { + Type: schema.TypeString, + Optional: true, + Computed: true, + ValidateFunc: StringInSlice([]string{"enable", "disable"}, false), + }, + "default_topic_partition_num": { + Type: schema.TypeInt, + Optional: true, + Computed: true, + }, + "vswitch_ids": { + Type: schema.TypeList, + Optional: true, + Computed: true, + Elem: &schema.Schema{Type: schema.TypeString}, }, "selected_zones": { Type: schema.TypeList, @@ -267,9 +289,11 @@ func resourceAliCloudAlikafkaInstanceCreate(d *schema.ResourceData, meta interfa } } + runtime := util.RuntimeOptions{} + runtime.SetAutoretry(true) wait := incrementalWait(3*time.Second, 3*time.Second) err = resource.Retry(client.GetRetryTimeout(d.Timeout(schema.TimeoutCreate)), func() *resource.RetryError { - createOrderResponse, err = conn.DoRequest(StringPointer(createOrderAction), nil, StringPointer("POST"), StringPointer("2019-09-16"), StringPointer("AK"), nil, createOrderReq, &util.RuntimeOptions{}) + createOrderResponse, err = conn.DoRequest(StringPointer(createOrderAction), nil, StringPointer("POST"), StringPointer("2019-09-16"), StringPointer("AK"), nil, createOrderReq, &runtime) if err != nil { if IsExpectedErrors(err, []string{ThrottlingUser, "ONS_SYSTEM_FLOW_CONTROL", "ONS_SYSTEM_ERROR"}) || NeedRetry(err) { wait() @@ -280,9 +304,11 @@ func resourceAliCloudAlikafkaInstanceCreate(d *schema.ResourceData, meta interfa return nil }) addDebug(createOrderAction, createOrderResponse, createOrderReq) + if err != nil { return WrapErrorf(err, DefaultErrorMsg, "alicloud_alikafka_instance", createOrderAction, AlibabaCloudSdkGoERROR) } + if fmt.Sprint(createOrderResponse["Success"]) == "false" { return WrapError(fmt.Errorf("%s failed, response: %v", createOrderAction, createOrderResponse)) } @@ -291,12 +317,17 @@ func resourceAliCloudAlikafkaInstanceCreate(d *schema.ResourceData, meta interfa if err != nil { return WrapError(err) } + d.SetId(fmt.Sprint(alikafkaInstanceVO["InstanceId"])) // 2. Start instance startInstanceAction := "StartInstance" startInstanceResponse := make(map[string]interface{}) startInstanceReq := make(map[string]interface{}) + startInstanceReq["RegionId"] = client.RegionId + startInstanceReq["InstanceId"] = alikafkaInstanceVO["InstanceId"] + startInstanceReq["VSwitchId"] = d.Get("vswitch_id") + if v, ok := d.GetOk("vpc_id"); ok { startInstanceReq["VpcId"] = v } @@ -305,50 +336,52 @@ func resourceAliCloudAlikafkaInstanceCreate(d *schema.ResourceData, meta interfa startInstanceReq["ZoneId"] = v } - if v, ok := d.GetOk("vswitch_id"); ok { - startInstanceReq["VSwitchId"] = v - } - - if (startInstanceReq["ZoneId"] == nil || startInstanceReq["VpcId"] == nil) && startInstanceReq["VSwitchId"] != nil { + if startInstanceReq["VpcId"] == nil { vsw, err := vpcService.DescribeVswitch(startInstanceReq["VSwitchId"].(string)) if err != nil { return WrapError(err) } + if v, ok := startInstanceReq["VpcId"].(string); !ok || v == "" { startInstanceReq["VpcId"] = vsw["VpcId"] } - if v, ok := startInstanceReq["ZoneId"].(string); !ok || v == "" { - startInstanceReq["ZoneId"] = vsw["ZoneId"] - } } - startInstanceReq["RegionId"] = client.RegionId - startInstanceReq["InstanceId"] = alikafkaInstanceVO["InstanceId"] + if v, ok := d.GetOk("vswitch_ids"); ok { + startInstanceReq["VSwitchIds"] = v + } + if _, ok := d.GetOkExists("eip_max"); ok { startInstanceReq["DeployModule"] = "eip" startInstanceReq["IsEipInner"] = true } + if v, ok := d.GetOk("name"); ok { startInstanceReq["Name"] = v } + if v, ok := d.GetOk("security_group"); ok { startInstanceReq["SecurityGroup"] = v } + if v, ok := d.GetOk("service_version"); ok { startInstanceReq["ServiceVersion"] = v } + if v, ok := d.GetOk("config"); ok { startInstanceReq["Config"] = v } + if v, ok := d.GetOk("kms_key_id"); ok { startInstanceReq["KMSKeyId"] = v } + if v, ok := d.GetOk("selected_zones"); ok { startInstanceReq["SelectedZones"] = formatSelectedZonesReq(v.([]interface{})) } err = resource.Retry(client.GetRetryTimeout(d.Timeout(schema.TimeoutCreate)), func() *resource.RetryError { - startInstanceResponse, err = conn.DoRequest(StringPointer(startInstanceAction), nil, StringPointer("POST"), StringPointer("2019-09-16"), StringPointer("AK"), nil, startInstanceReq, &util.RuntimeOptions{}) + startInstanceResponse, err = conn.DoRequest(StringPointer(startInstanceAction), nil, StringPointer("POST"), StringPointer("2019-09-16"), StringPointer("AK"), nil, startInstanceReq, &runtime) if err != nil { if IsExpectedErrors(err, []string{ThrottlingUser, "ONS_SYSTEM_FLOW_CONTROL"}) || NeedRetry(err) { wait() @@ -359,9 +392,11 @@ func resourceAliCloudAlikafkaInstanceCreate(d *schema.ResourceData, meta interfa return nil }) addDebug(startInstanceAction, startInstanceResponse, startInstanceReq) + if err != nil { return WrapErrorf(err, DefaultErrorMsg, "alicloud_alikafka_instance", startInstanceAction, AlibabaCloudSdkGoERROR) } + if fmt.Sprint(startInstanceResponse["Success"]) == "false" { return WrapError(fmt.Errorf("%s failed, response: %v", startInstanceAction, startInstanceResponse)) } @@ -378,10 +413,12 @@ func resourceAliCloudAlikafkaInstanceCreate(d *schema.ResourceData, meta interfa func resourceAliCloudAlikafkaInstanceRead(d *schema.ResourceData, meta interface{}) error { client := meta.(*connectivity.AliyunClient) alikafkaService := AlikafkaService{client} + object, err := alikafkaService.DescribeAliKafkaInstance(d.Id()) if err != nil { // Handle exceptions if !d.IsNewResource() && NotFoundError(err) { + log.Printf("[DEBUG] Resource alicloud_ecp_instance alikafkaService.DescribeAliKafkaInstance Failed!!! %s", err) d.SetId("") return nil } @@ -393,6 +430,7 @@ func resourceAliCloudAlikafkaInstanceRead(d *schema.ResourceData, meta interface d.Set("disk_size", object["DiskSize"]) d.Set("deploy_type", object["DeployType"]) d.Set("io_max", object["IoMax"]) + d.Set("io_max_spec", object["IoMaxSpec"]) d.Set("eip_max", object["EipMax"]) d.Set("resource_group_id", object["ResourceGroupId"]) d.Set("vpc_id", object["VpcId"]) @@ -411,6 +449,18 @@ func resourceAliCloudAlikafkaInstanceRead(d *schema.ResourceData, meta interface d.Set("service_version", object["UpgradeServiceDetailInfo"].(map[string]interface{})["Current2OpenSourceVersion"]) d.Set("config", object["AllConfig"]) d.Set("kms_key_id", object["KmsKeyId"]) + d.Set("enable_auto_group", object["AutoCreateGroupEnable"]) + d.Set("enable_auto_topic", convertAliKafkaAutoCreateTopicEnableResponse(object["AutoCreateTopicEnable"])) + d.Set("default_topic_partition_num", formatInt(object["DefaultPartitionNum"])) + + if vSwitchIds, ok := object["VSwitchIds"]; ok { + vSwitchIdsArg := vSwitchIds.(map[string]interface{}) + + if vSwitchIdsList, ok := vSwitchIdsArg["VSwitchIds"]; ok { + d.Set("vswitch_ids", vSwitchIdsList) + } + } + if fmt.Sprint(object["PaidType"]) == "0" { d.Set("paid_type", PrePaid) } @@ -435,8 +485,8 @@ func resourceAliCloudAlikafkaInstanceRead(d *schema.ResourceData, meta interface if err != nil { return WrapError(err) } + d.Set("tags", alikafkaService.tagsToMap(tags)) - d.Set("io_max_spec", object["IoMaxSpec"]) return nil } @@ -455,13 +505,8 @@ func resourceAliCloudAlikafkaInstanceUpdate(d *schema.ResourceData, meta interfa return WrapError(err) } - if d.IsNewResource() { - d.Partial(false) - return resourceAliCloudAlikafkaInstanceRead(d, meta) - } - // Process change instance name. - if d.HasChange("name") { + if !d.IsNewResource() && d.HasChange("name") { action := "ModifyInstanceName" request := map[string]interface{}{ "RegionId": client.RegionId, @@ -472,9 +517,11 @@ func resourceAliCloudAlikafkaInstanceUpdate(d *schema.ResourceData, meta interfa request["InstanceName"] = v } + runtime := util.RuntimeOptions{} + runtime.SetAutoretry(true) wait := incrementalWait(3*time.Second, 3*time.Second) err = resource.Retry(client.GetRetryTimeout(d.Timeout(schema.TimeoutUpdate)), func() *resource.RetryError { - response, err = conn.DoRequest(StringPointer(action), nil, StringPointer("POST"), StringPointer("2019-09-16"), StringPointer("AK"), nil, request, &util.RuntimeOptions{}) + response, err = conn.DoRequest(StringPointer(action), nil, StringPointer("POST"), StringPointer("2019-09-16"), StringPointer("AK"), nil, request, &runtime) if err != nil { if IsExpectedErrors(err, []string{ThrottlingUser, "ONS_SYSTEM_FLOW_CONTROL"}) || NeedRetry(err) { wait() @@ -485,17 +532,20 @@ func resourceAliCloudAlikafkaInstanceUpdate(d *schema.ResourceData, meta interfa return nil }) addDebug(action, response, request) + if err != nil { return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR) } + if fmt.Sprint(response["Success"]) == "false" { return WrapError(fmt.Errorf("%s failed, response: %v", action, response)) } + d.SetPartial("name") } // Process paid type change, note only support change from post to pre pay. - if d.HasChange("paid_type") { + if !d.IsNewResource() && d.HasChange("paid_type") { o, n := d.GetChange("paid_type") oldPaidType := o.(string) newPaidType := n.(string) @@ -514,9 +564,11 @@ func resourceAliCloudAlikafkaInstanceUpdate(d *schema.ResourceData, meta interfa "InstanceId": d.Id(), } + runtime := util.RuntimeOptions{} + runtime.SetAutoretry(true) wait := incrementalWait(3*time.Second, 3*time.Second) err = resource.Retry(client.GetRetryTimeout(d.Timeout(schema.TimeoutUpdate)), func() *resource.RetryError { - response, err = conn.DoRequest(StringPointer(action), nil, StringPointer("POST"), StringPointer("2019-09-16"), StringPointer("AK"), nil, request, &util.RuntimeOptions{}) + response, err = conn.DoRequest(StringPointer(action), nil, StringPointer("POST"), StringPointer("2019-09-16"), StringPointer("AK"), nil, request, &runtime) if err != nil { if IsExpectedErrors(err, []string{ThrottlingUser, "ONS_SYSTEM_FLOW_CONTROL"}) || NeedRetry(err) { wait() @@ -527,6 +579,7 @@ func resourceAliCloudAlikafkaInstanceUpdate(d *schema.ResourceData, meta interfa return nil }) addDebug(action, response, request) + if err != nil { return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR) } @@ -535,6 +588,7 @@ func resourceAliCloudAlikafkaInstanceUpdate(d *schema.ResourceData, meta interfa if _, err := stateConf.WaitForState(); err != nil { return WrapErrorf(err, IdMsg, d.Id()) } + } else { return WrapError(errors.New("paid type only support change from post pay to pre pay")) } @@ -548,16 +602,17 @@ func resourceAliCloudAlikafkaInstanceUpdate(d *schema.ResourceData, meta interfa "RegionId": client.RegionId, } // updating topic_quota only by updating partition_num - if d.HasChange("partition_num") { + if !d.IsNewResource() && d.HasChange("partition_num") { update = true } request["PartitionNum"] = d.Get("partition_num") - if d.HasChange("disk_size") { + + if !d.IsNewResource() && d.HasChange("disk_size") { update = true } request["DiskSize"] = d.Get("disk_size") - if d.HasChange("io_max") { + if !d.IsNewResource() && d.HasChange("io_max") { update = true if v, ok := d.GetOk("io_max"); ok { @@ -565,7 +620,7 @@ func resourceAliCloudAlikafkaInstanceUpdate(d *schema.ResourceData, meta interfa } } - if d.HasChange("io_max_spec") { + if !d.IsNewResource() && d.HasChange("io_max_spec") { update = true if v, ok := d.GetOk("io_max_spec"); ok { @@ -573,12 +628,12 @@ func resourceAliCloudAlikafkaInstanceUpdate(d *schema.ResourceData, meta interfa } } - if d.HasChange("spec_type") { + if !d.IsNewResource() && d.HasChange("spec_type") { update = true } request["SpecType"] = d.Get("spec_type") - if d.HasChange("deploy_type") { + if !d.IsNewResource() && d.HasChange("deploy_type") { update = true } if d.Get("deploy_type").(int) == 4 { @@ -586,19 +641,23 @@ func resourceAliCloudAlikafkaInstanceUpdate(d *schema.ResourceData, meta interfa } else { request["EipModel"] = false } - if d.HasChange("eip_max") { + if !d.IsNewResource() && d.HasChange("eip_max") { update = true } request["EipMax"] = d.Get("eip_max").(int) if update { action := "UpgradePostPayOrder" + if d.Get("paid_type").(string) == string(PrePaid) { action = "UpgradePrePayOrder" } + + runtime := util.RuntimeOptions{} + runtime.SetAutoretry(true) wait := incrementalWait(3*time.Second, 3*time.Second) err = resource.Retry(5*time.Minute, func() *resource.RetryError { - raw, err := conn.DoRequest(StringPointer(action), nil, StringPointer("POST"), StringPointer("2019-09-16"), StringPointer("AK"), nil, request, &util.RuntimeOptions{}) + response, err = conn.DoRequest(StringPointer(action), nil, StringPointer("POST"), StringPointer("2019-09-16"), StringPointer("AK"), nil, request, &runtime) if err != nil { if NeedRetry(err) || IsExpectedErrors(err, []string{"ONS_SYSTEM_FLOW_CONTROL"}) { wait() @@ -606,37 +665,37 @@ func resourceAliCloudAlikafkaInstanceUpdate(d *schema.ResourceData, meta interfa } return resource.NonRetryableError(err) } - addDebug(action, raw, request) return nil }) + addDebug(action, response, request) if err != nil { return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR) } - stateConf := BuildStateConf([]string{}, []string{"5"}, d.Timeout(schema.TimeoutUpdate), 5*time.Second, alikafkaService.AliKafkaInstanceStateRefreshFunc(d.Id(), "ServiceStatus", []string{})) - if _, err := stateConf.WaitForState(); err != nil { - return WrapErrorf(err, IdMsg, d.Id()) - } - - stateConf = BuildStateConf([]string{}, []string{fmt.Sprint(d.Get("disk_size"))}, d.Timeout(schema.TimeoutUpdate), 0*time.Second, alikafkaService.AliKafkaInstanceStateRefreshFunc(d.Id(), "DiskSize", []string{})) + stateConf := BuildStateConf([]string{}, []string{fmt.Sprint(d.Get("disk_size"))}, d.Timeout(schema.TimeoutUpdate), 5*time.Second, alikafkaService.AliKafkaInstanceStateRefreshFunc(d.Id(), "DiskSize", []string{})) if _, err := stateConf.WaitForState(); err != nil { return WrapErrorf(err, IdMsg, d.Id()) } if d.HasChange("io_max") { - stateConf = BuildStateConf([]string{}, []string{fmt.Sprint(d.Get("io_max"))}, d.Timeout(schema.TimeoutUpdate), 0*time.Second, alikafkaService.AliKafkaInstanceStateRefreshFunc(d.Id(), "IoMax", []string{})) + stateConf = BuildStateConf([]string{}, []string{fmt.Sprint(d.Get("io_max"))}, d.Timeout(schema.TimeoutUpdate), 5*time.Second, alikafkaService.AliKafkaInstanceStateRefreshFunc(d.Id(), "IoMax", []string{})) if _, err := stateConf.WaitForState(); err != nil { return WrapErrorf(err, IdMsg, d.Id()) } } - stateConf = BuildStateConf([]string{}, []string{fmt.Sprint(d.Get("eip_max"))}, d.Timeout(schema.TimeoutUpdate), 0*time.Second, alikafkaService.AliKafkaInstanceStateRefreshFunc(d.Id(), "EipMax", []string{})) + stateConf = BuildStateConf([]string{}, []string{fmt.Sprint(d.Get("eip_max"))}, d.Timeout(schema.TimeoutUpdate), 5*time.Second, alikafkaService.AliKafkaInstanceStateRefreshFunc(d.Id(), "EipMax", []string{})) + if _, err := stateConf.WaitForState(); err != nil { + return WrapErrorf(err, IdMsg, d.Id()) + } + + stateConf = BuildStateConf([]string{}, []string{fmt.Sprint(d.Get("spec_type"))}, d.Timeout(schema.TimeoutUpdate), 5*time.Second, alikafkaService.AliKafkaInstanceStateRefreshFunc(d.Id(), "SpecType", []string{})) if _, err := stateConf.WaitForState(); err != nil { return WrapErrorf(err, IdMsg, d.Id()) } - stateConf = BuildStateConf([]string{}, []string{fmt.Sprint(d.Get("spec_type"))}, d.Timeout(schema.TimeoutUpdate), 0*time.Second, alikafkaService.AliKafkaInstanceStateRefreshFunc(d.Id(), "SpecType", []string{})) + stateConf = BuildStateConf([]string{}, []string{"5"}, d.Timeout(schema.TimeoutUpdate), 5*time.Second, alikafkaService.AliKafkaInstanceStateRefreshFunc(d.Id(), "ServiceStatus", []string{})) if _, err := stateConf.WaitForState(); err != nil { return WrapErrorf(err, IdMsg, d.Id()) } @@ -649,7 +708,7 @@ func resourceAliCloudAlikafkaInstanceUpdate(d *schema.ResourceData, meta interfa d.SetPartial("eip_max") } - if d.HasChange("service_version") { + if !d.IsNewResource() && d.HasChange("service_version") { action := "UpgradeInstanceVersion" request := map[string]interface{}{ "InstanceId": d.Id(), @@ -660,9 +719,11 @@ func resourceAliCloudAlikafkaInstanceUpdate(d *schema.ResourceData, meta interfa request["TargetVersion"] = v } + runtime := util.RuntimeOptions{} + runtime.SetAutoretry(true) wait := incrementalWait(3*time.Second, 3*time.Second) err = resource.Retry(client.GetRetryTimeout(d.Timeout(schema.TimeoutUpdate)), func() *resource.RetryError { - response, err = conn.DoRequest(StringPointer(action), nil, StringPointer("POST"), StringPointer("2019-09-16"), StringPointer("AK"), nil, request, &util.RuntimeOptions{}) + response, err = conn.DoRequest(StringPointer(action), nil, StringPointer("POST"), StringPointer("2019-09-16"), StringPointer("AK"), nil, request, &runtime) if err != nil { if IsExpectedErrors(err, []string{ThrottlingUser, "ONS_SYSTEM_FLOW_CONTROL"}) || NeedRetry(err) { wait() @@ -677,9 +738,11 @@ func resourceAliCloudAlikafkaInstanceUpdate(d *schema.ResourceData, meta interfa return nil }) addDebug(action, response, request) + if err != nil { return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR) } + if fmt.Sprint(response["Success"]) == "false" { return WrapError(fmt.Errorf("%s failed, response: %v", action, response)) } @@ -694,7 +757,7 @@ func resourceAliCloudAlikafkaInstanceUpdate(d *schema.ResourceData, meta interfa d.SetPartial("service_version") } - if d.HasChange("config") { + if !d.IsNewResource() && d.HasChange("config") { action := "UpdateInstanceConfig" request := map[string]interface{}{ "RegionId": client.RegionId, @@ -705,9 +768,11 @@ func resourceAliCloudAlikafkaInstanceUpdate(d *schema.ResourceData, meta interfa request["Config"] = v } + runtime := util.RuntimeOptions{} + runtime.SetAutoretry(true) wait := incrementalWait(3*time.Second, 3*time.Second) err = resource.Retry(client.GetRetryTimeout(d.Timeout(schema.TimeoutUpdate)), func() *resource.RetryError { - response, err = conn.DoRequest(StringPointer(action), nil, StringPointer("POST"), StringPointer("2019-09-16"), StringPointer("AK"), nil, request, &util.RuntimeOptions{}) + response, err = conn.DoRequest(StringPointer(action), nil, StringPointer("POST"), StringPointer("2019-09-16"), StringPointer("AK"), nil, request, &runtime) if err != nil { if IsExpectedErrors(err, []string{ThrottlingUser, "ONS_SYSTEM_FLOW_CONTROL"}) || NeedRetry(err) { wait() @@ -718,9 +783,11 @@ func resourceAliCloudAlikafkaInstanceUpdate(d *schema.ResourceData, meta interfa return nil }) addDebug(action, response, request) + if err != nil { return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR) } + if fmt.Sprint(response["Success"]) == "false" { return WrapError(fmt.Errorf("%s failed, response: %v", action, response)) } @@ -730,6 +797,7 @@ func resourceAliCloudAlikafkaInstanceUpdate(d *schema.ResourceData, meta interfa if _, err := stateConf.WaitForState(); err != nil { return WrapErrorf(err, IdMsg, d.Id()) } + d.SetPartial("config") } @@ -739,7 +807,7 @@ func resourceAliCloudAlikafkaInstanceUpdate(d *schema.ResourceData, meta interfa "ResourceId": d.Id(), } - if d.HasChange("resource_group_id") { + if !d.IsNewResource() && d.HasChange("resource_group_id") { update = true } if v, ok := d.GetOk("resource_group_id"); ok { @@ -772,6 +840,142 @@ func resourceAliCloudAlikafkaInstanceUpdate(d *schema.ResourceData, meta interfa d.SetPartial("resource_group_id") } + update = false + enableAutoGroupCreationReq := map[string]interface{}{ + "RegionId": client.RegionId, + "InstanceId": d.Id(), + } + + if d.HasChange("enable_auto_group") { + update = true + + if v, ok := d.GetOkExists("enable_auto_group"); ok { + enableAutoGroupCreationReq["Enable"] = v + } + } + + if update { + action := "EnableAutoGroupCreation" + + runtime := util.RuntimeOptions{} + runtime.SetAutoretry(true) + wait := incrementalWait(3*time.Second, 3*time.Second) + err = resource.Retry(client.GetRetryTimeout(d.Timeout(schema.TimeoutUpdate)), func() *resource.RetryError { + response, err = conn.DoRequest(StringPointer(action), nil, StringPointer("POST"), StringPointer("2019-09-16"), StringPointer("AK"), nil, enableAutoGroupCreationReq, &runtime) + if err != nil { + if NeedRetry(err) { + wait() + return resource.RetryableError(err) + } + return resource.NonRetryableError(err) + } + return nil + }) + addDebug(action, response, enableAutoGroupCreationReq) + + if err != nil { + return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR) + } + + if fmt.Sprint(response["Success"]) == "false" { + return WrapError(fmt.Errorf("%s failed, response: %v", action, response)) + } + + d.SetPartial("enable_auto_group") + } + + update = false + enableAutoTopicCreationReq := map[string]interface{}{ + "RegionId": client.RegionId, + "InstanceId": d.Id(), + } + + if d.HasChange("enable_auto_topic") { + update = true + } + if v, ok := d.GetOk("enable_auto_topic"); ok { + enableAutoTopicCreationReq["Operate"] = v + } + + if update { + action := "EnableAutoTopicCreation" + + runtime := util.RuntimeOptions{} + runtime.SetAutoretry(true) + wait := incrementalWait(3*time.Second, 3*time.Second) + err = resource.Retry(client.GetRetryTimeout(d.Timeout(schema.TimeoutUpdate)), func() *resource.RetryError { + response, err = conn.DoRequest(StringPointer(action), nil, StringPointer("POST"), StringPointer("2019-09-16"), StringPointer("AK"), nil, enableAutoTopicCreationReq, &runtime) + if err != nil { + if NeedRetry(err) { + wait() + return resource.RetryableError(err) + } + return resource.NonRetryableError(err) + } + return nil + }) + addDebug(action, response, enableAutoTopicCreationReq) + + if err != nil { + return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR) + } + + if fmt.Sprint(response["Success"]) == "false" { + return WrapError(fmt.Errorf("%s failed, response: %v", action, response)) + } + + d.SetPartial("enable_auto_topic") + } + + update = false + updateTopicPartitionNumReq := map[string]interface{}{ + "RegionId": client.RegionId, + "Operate": "updatePartition", + "UpdatePartition": true, + "InstanceId": d.Id(), + } + + object, err := alikafkaService.DescribeAliKafkaInstance(d.Id()) + if err != nil { + return WrapError(err) + } + + defaultTopicPartitionNum, ok := d.GetOkExists("default_topic_partition_num") + if ok && fmt.Sprint(object["DefaultPartitionNum"]) != fmt.Sprint(defaultTopicPartitionNum) { + update = true + updateTopicPartitionNumReq["PartitionNum"] = defaultTopicPartitionNum + } + + if update { + action := "EnableAutoTopicCreation" + + runtime := util.RuntimeOptions{} + runtime.SetAutoretry(true) + wait := incrementalWait(3*time.Second, 3*time.Second) + err = resource.Retry(client.GetRetryTimeout(d.Timeout(schema.TimeoutUpdate)), func() *resource.RetryError { + response, err = conn.DoRequest(StringPointer(action), nil, StringPointer("POST"), StringPointer("2019-09-16"), StringPointer("AK"), nil, updateTopicPartitionNumReq, &runtime) + if err != nil { + if NeedRetry(err) { + wait() + return resource.RetryableError(err) + } + return resource.NonRetryableError(err) + } + return nil + }) + addDebug(action+" updateTopicPartitionNum", response, updateTopicPartitionNumReq) + + if err != nil { + return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR) + } + + if fmt.Sprint(response["Success"]) == "false" { + return WrapError(fmt.Errorf("%s failed, response: %v", action, response)) + } + + d.SetPartial("default_topic_partition_num") + } + d.Partial(false) return resourceAliCloudAlikafkaInstanceRead(d, meta) @@ -798,9 +1002,11 @@ func resourceAliCloudAlikafkaInstanceDelete(d *schema.ResourceData, meta interfa return nil } + runtime := util.RuntimeOptions{} + runtime.SetAutoretry(true) wait := incrementalWait(3*time.Second, 3*time.Second) err = resource.Retry(client.GetRetryTimeout(d.Timeout(schema.TimeoutDelete)), func() *resource.RetryError { - response, err = conn.DoRequest(StringPointer(action), nil, StringPointer("POST"), StringPointer("2019-09-16"), StringPointer("AK"), nil, request, &util.RuntimeOptions{}) + response, err = conn.DoRequest(StringPointer(action), nil, StringPointer("POST"), StringPointer("2019-09-16"), StringPointer("AK"), nil, request, &runtime) if err != nil { if IsExpectedErrors(err, []string{ThrottlingUser, "ONS_SYSTEM_FLOW_CONTROL"}) || NeedRetry(err) { wait() @@ -811,12 +1017,15 @@ func resourceAliCloudAlikafkaInstanceDelete(d *schema.ResourceData, meta interfa return nil }) addDebug(action, response, request) + if err != nil { return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR) } + if fmt.Sprint(response["Success"]) == "false" { return WrapError(fmt.Errorf("%s failed, response: %v", action, response)) } + stateConf := BuildStateConf([]string{}, []string{"15"}, d.Timeout(schema.TimeoutDelete), 5*time.Second, alikafkaService.AliKafkaInstanceStateRefreshFunc(d.Id(), "ServiceStatus", []string{})) if _, err := stateConf.WaitForState(); err != nil { return WrapErrorf(err, IdMsg, d.Id()) @@ -829,7 +1038,7 @@ func resourceAliCloudAlikafkaInstanceDelete(d *schema.ResourceData, meta interfa } err = resource.Retry(client.GetRetryTimeout(d.Timeout(schema.TimeoutDelete)), func() *resource.RetryError { - response, err = conn.DoRequest(StringPointer(action), nil, StringPointer("POST"), StringPointer("2019-09-16"), StringPointer("AK"), nil, request, &util.RuntimeOptions{}) + response, err = conn.DoRequest(StringPointer(action), nil, StringPointer("POST"), StringPointer("2019-09-16"), StringPointer("AK"), nil, request, &runtime) if err != nil { if NeedRetry(err) { wait() @@ -840,12 +1049,15 @@ func resourceAliCloudAlikafkaInstanceDelete(d *schema.ResourceData, meta interfa return nil }) addDebug(action, response, request) + if err != nil { return WrapErrorf(err, DefaultErrorMsg, d.Id(), action, AlibabaCloudSdkGoERROR) } + if fmt.Sprint(response["Success"]) == "false" { return WrapError(fmt.Errorf("%s failed, response: %v", action, response)) } + stateConf = BuildStateConf([]string{}, []string{}, d.Timeout(schema.TimeoutDelete), 5*time.Second, alikafkaService.AliKafkaInstanceStateRefreshFunc(d.Id(), "ServiceStatus", []string{})) if _, err := stateConf.WaitForState(); err != nil { return WrapErrorf(err, IdMsg, d.Id()) @@ -885,3 +1097,14 @@ func formatSelectedZonesReq(configured []interface{}) string { return result } + +func convertAliKafkaAutoCreateTopicEnableResponse(source interface{}) interface{} { + switch source { + case true: + return "enable" + case false: + return "disable" + } + + return source +} diff --git a/alicloud/resource_alicloud_alikafka_instance_test.go b/alicloud/resource_alicloud_alikafka_instance_test.go index 33898c267dac..5789b8cb94e1 100644 --- a/alicloud/resource_alicloud_alikafka_instance_test.go +++ b/alicloud/resource_alicloud_alikafka_instance_test.go @@ -127,7 +127,7 @@ func TestAccAliCloudAlikafkaInstance_basic(t *testing.T) { Steps: []resource.TestStep{ { Config: testAccConfig(map[string]interface{}{ - "name": "${var.name}", + "name": name, "topic_quota": "50", "disk_type": "1", "disk_size": "500", @@ -139,7 +139,7 @@ func TestAccAliCloudAlikafkaInstance_basic(t *testing.T) { }), Check: resource.ComposeTestCheckFunc( testAccCheck(map[string]string{ - "name": fmt.Sprintf("tf-testacc-alikafkainstancebasic%v", rand), + "name": name, "security_group": CHECKSET, "kms_key_id": CHECKSET, "partition_num": "0", @@ -226,6 +226,36 @@ func TestAccAliCloudAlikafkaInstance_basic(t *testing.T) { }), ), }, + { + Config: testAccConfig(map[string]interface{}{ + "enable_auto_group": "true", + }), + Check: resource.ComposeTestCheckFunc( + testAccCheck(map[string]string{ + "enable_auto_group": "true", + }), + ), + }, + { + Config: testAccConfig(map[string]interface{}{ + "enable_auto_topic": "enable", + }), + Check: resource.ComposeTestCheckFunc( + testAccCheck(map[string]string{ + "enable_auto_topic": "enable", + }), + ), + }, + { + Config: testAccConfig(map[string]interface{}{ + "default_topic_partition_num": "6", + }), + Check: resource.ComposeTestCheckFunc( + testAccCheck(map[string]string{ + "default_topic_partition_num": "6", + }), + ), + }, { Config: testAccConfig(map[string]interface{}{ "tags": map[string]string{ @@ -244,51 +274,27 @@ func TestAccAliCloudAlikafkaInstance_basic(t *testing.T) { { Config: testAccConfig(map[string]interface{}{ "tags": map[string]string{ - "Created": "TF", - "For": "acceptance test", - "Updated": "TF", + "Created": "TF-update", + "For": "Test-update", }, }), Check: resource.ComposeTestCheckFunc( testAccCheck(map[string]string{ - "tags.%": "3", - "tags.Created": "TF", - "tags.For": "acceptance test", - "tags.Updated": "TF", + "tags.%": "2", + "tags.Created": "TF-update", + "tags.For": "Test-update", }), ), }, - // suspend PrePaid testing { Config: testAccConfig(map[string]interface{}{ - "name": "${var.name}", - "partition_num": "2", - "disk_size": "1400", - "deploy_type": "4", - "io_max": "60", - "eip_max": "12", - "spec_type": "professional", - "service_version": "2.2.0", - //"config": `{\"enable.vpc_sasl_ssl\":\"false\",\"kafka.log.retention.hours\":\"96\",\"enable.acl\":\"false\",\"kafka.message.max.bytes\":\"1048576\"}`, "tags": REMOVEKEY, }), Check: resource.ComposeTestCheckFunc( testAccCheck(map[string]string{ - "name": fmt.Sprintf("tf-testacc-alikafkainstancebasic%v", rand), - "partition_num": "2", - "topic_quota": "1002", - "disk_size": "1400", - "deploy_type": "4", - "io_max": "60", - "eip_max": "12", - "paid_type": "PostPaid", - "spec_type": "professional", - "service_version": "2.2.0", - //"config": "{\"enable.vpc_sasl_ssl\":\"false\",\"kafka.log.retention.hours\":\"96\",\"enable.acl\":\"false\",\"kafka.message.max.bytes\":\"1048576\"}", "tags.%": REMOVEKEY, "tags.Created": REMOVEKEY, "tags.For": REMOVEKEY, - "tags.Updated": REMOVEKEY, }), ), }, @@ -319,7 +325,7 @@ func TestAccAliCloudAlikafkaInstance_convert(t *testing.T) { Steps: []resource.TestStep{ { Config: testAccConfig(map[string]interface{}{ - "name": "${var.name}", + "name": name, "partition_num": "50", "disk_type": "1", "disk_size": "500", @@ -400,7 +406,7 @@ func TestAccAliCloudAlikafkaInstance_prepaid(t *testing.T) { Steps: []resource.TestStep{ { Config: testAccConfig(map[string]interface{}{ - "name": "${var.name}", + "name": name, "partition_num": "50", "disk_type": "1", "disk_size": "500", @@ -479,46 +485,54 @@ func TestAccAliCloudAlikafkaInstance_VpcId(t *testing.T) { Steps: []resource.TestStep{ { Config: testAccConfig(map[string]interface{}{ - "name": name, - "topic_quota": "50", - "disk_type": "1", - "disk_size": "800", - "deploy_type": "4", - "eip_max": "3", - "io_max_spec": "alikafka.hw.2xlarge", - "vswitch_id": "${data.alicloud_vswitches.default.ids.0}", - "paid_type": "PostPaid", - "spec_type": "professional", - "service_version": "2.2.0", - "config": `{\"enable.vpc_sasl_ssl\":\"true\",\"kafka.log.retention.hours\":\"72\",\"enable.acl\":\"true\",\"kafka.message.max.bytes\":\"1048576\"}`, + "name": name, + "topic_quota": "50", + "disk_type": "1", + "disk_size": "800", + "deploy_type": "4", + "eip_max": "3", + "io_max_spec": "alikafka.hw.2xlarge", + "vswitch_id": "${data.alicloud_vswitches.default.ids.0}", + "paid_type": "PostPaid", + "spec_type": "professional", + "service_version": "2.2.0", + "enable_auto_group": "true", + "enable_auto_topic": "enable", + "default_topic_partition_num": "6", + "config": `{\"enable.vpc_sasl_ssl\":\"true\",\"kafka.log.retention.hours\":\"72\",\"enable.acl\":\"true\",\"kafka.message.max.bytes\":\"1048576\"}`, "tags": map[string]string{ "Created": "TF", "For": "acceptance test", }, "security_group": "${alicloud_security_group.default.id}", "vpc_id": "${data.alicloud_vpcs.default.ids.0}", - "selected_zones": []string{"zonea", "zoneb"}, + "vswitch_ids": []string{"${data.alicloud_vswitches.default.ids.0}", "${data.alicloud_vswitches.default.ids.1}"}, + "selected_zones": []string{"zoneb", "zonec"}, }), Check: resource.ComposeTestCheckFunc( testAccCheck(map[string]string{ - "name": name, - "partition_num": "0", - "topic_quota": "1000", - "disk_type": "1", - "disk_size": "800", - "deploy_type": "4", - "eip_max": "3", - "io_max_spec": "alikafka.hw.2xlarge", - "paid_type": "PostPaid", - "spec_type": "professional", - "service_version": "2.2.0", - "config": CHECKSET, - "tags.%": "2", - "tags.Created": "TF", - "tags.For": "acceptance test", - "ssl_endpoint": CHECKSET, - "ssl_domain_endpoint": CHECKSET, - "sasl_domain_endpoint": CHECKSET, + "name": name, + "partition_num": "0", + "topic_quota": "1000", + "disk_type": "1", + "disk_size": "800", + "deploy_type": "4", + "eip_max": "3", + "io_max_spec": "alikafka.hw.2xlarge", + "paid_type": "PostPaid", + "spec_type": "professional", + "service_version": "2.2.0", + "enable_auto_group": "true", + "enable_auto_topic": "enable", + "default_topic_partition_num": "6", + "config": CHECKSET, + "vswitch_ids.#": "2", + "tags.%": "2", + "tags.Created": "TF", + "tags.For": "acceptance test", + "ssl_endpoint": CHECKSET, + "ssl_domain_endpoint": CHECKSET, + "sasl_domain_endpoint": CHECKSET, }), ), }, diff --git a/alicloud/service_alicloud_alikafka.go b/alicloud/service_alicloud_alikafka.go index 025480028bd0..8ddd8f078ee1 100644 --- a/alicloud/service_alicloud_alikafka.go +++ b/alicloud/service_alicloud_alikafka.go @@ -1057,15 +1057,18 @@ func (s *AlikafkaService) DescribeAliKafkaInstanceAllowedIpAttachment(id string) func (s *AlikafkaService) DescribeAliKafkaInstance(id string) (object map[string]interface{}, err error) { var response map[string]interface{} + action := "GetInstanceList" + conn, err := s.client.NewAlikafkaClient() if err != nil { return nil, WrapError(err) } - action := "GetInstanceList" + request := map[string]interface{}{ "RegionId": s.client.RegionId, "InstanceId": []string{id}, } + idExist := false runtime := util.RuntimeOptions{} runtime.SetAutoretry(true) @@ -1082,27 +1085,33 @@ func (s *AlikafkaService) DescribeAliKafkaInstance(id string) (object map[string return nil }) addDebug(action, response, request) + if err != nil { return object, WrapErrorf(err, DefaultErrorMsg, id, action, AlibabaCloudSdkGoERROR) } + if fmt.Sprint(response["Success"]) == "false" { return object, WrapError(fmt.Errorf("%s failed, response: %v", action, response)) } - v, err := jsonpath.Get("$.InstanceList.InstanceVO", response) + + resp, err := jsonpath.Get("$.InstanceList.InstanceVO", response) if err != nil { return object, WrapErrorf(err, FailedGetAttributeMsg, id, "$.InstanceList.InstanceVO", response) } - if len(v.([]interface{})) < 1 { - return object, WrapErrorf(Error(GetNotFoundMessage("AliKafka", id)), NotFoundWithResponse, response) + + if v, ok := resp.([]interface{}); !ok || len(v) < 1 { + return object, WrapErrorf(Error(GetNotFoundMessage("AliKafka:Instance", id)), NotFoundWithResponse, response) } - for _, v := range v.([]interface{}) { + + for _, v := range resp.([]interface{}) { if fmt.Sprint(v.(map[string]interface{})["InstanceId"]) == id && fmt.Sprint(v.(map[string]interface{})["ServiceStatus"]) != "10" { idExist = true return v.(map[string]interface{}), nil } } + if !idExist { - return object, WrapErrorf(Error(GetNotFoundMessage("AliKafka", id)), NotFoundWithResponse, response) + return object, WrapErrorf(Error(GetNotFoundMessage("AliKafka:Instance", id)), NotFoundWithResponse, response) } return object, nil diff --git a/website/docs/r/alikafka_instance.html.markdown b/website/docs/r/alikafka_instance.html.markdown index 59709b3fd65d..3a614a94eabf 100644 --- a/website/docs/r/alikafka_instance.html.markdown +++ b/website/docs/r/alikafka_instance.html.markdown @@ -35,7 +35,7 @@ Basic Usage ```terraform variable "instance_name" { - default = "tf-example" + default = "terraform-example" } resource "random_integer" "default" { @@ -116,9 +116,17 @@ The following arguments are supported: * `tags` - (Optional, Available since v1.63.0) A mapping of tags to assign to the resource. * `vpc_id` - (Optional, ForceNew, Available since v1.185.0) The VPC ID of the instance. * `zone_id` - (Optional, ForceNew, Available since v1.185.0) The zone ID of the instance. The value can be in zone x or region id-x format. **NOTE**: When the available zone is insufficient, another availability zone may be deployed. -* `selected_zones` - (Optional, Available since v1.195.0) The zones among which you want to deploy the instance. - --> **NOTE:** Arguments io_max, disk_size, topic_quota, eip_max should follow the following constraints. +* `enable_auto_group` - (Optional, Bool, Available since v1.241.0) Specify whether to enable the flexible group creation feature. Default value: `false`. Valid values: + - `true`: Enables the flexible group creation feature. + - `false`: Disabled the flexible group creation feature. +* `enable_auto_topic` - (Optional, Available since v1.241.0) Specify whether to enable the automatic topic creation feature. Default value: `disable`. Valid values: + - `enable`: Enables the automatic topic creation feature. + - `disable`: Disabled the automatic topic creation feature. +* `default_topic_partition_num` - (Optional, Int, Available since v1.241.0) The number of partitions in a topic that is automatically created. +* `vswitch_ids` - (Optional, List, Available since v1.241.0) The IDs of the vSwitches with which the instance is associated. +* `selected_zones` - (Optional, List, Available since v1.195.0) The zones among which you want to deploy the instance. + +-> **NOTE:** Field `io_max`, `disk_size`, `topic_quota`, `eip_max` should follow the following constraints. | io_max | disk_size(min-max:lag) | topic_quota(min-max:lag) | eip_max(min-max:lag) | |------|-------------|:----:|:-----:|