Skip to content

Commit

Permalink
feat: Changed to use watch to wait_for
Browse files Browse the repository at this point in the history
Signed-off-by: Steve Hipwell <steve.hipwell@gmail.com>
  • Loading branch information
stevehipwell committed Sep 2, 2024
1 parent fdf34c7 commit b4cfec5
Showing 1 changed file with 76 additions and 43 deletions.
119 changes: 76 additions & 43 deletions kubernetes/resource_kubectl_manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (

"github.com/alekc/terraform-provider-kubectl/yaml"
"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
validate2 "github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation"
"github.com/mitchellh/mapstructure"
"github.com/thedevsaddam/gojsonq/v2"
Expand Down Expand Up @@ -660,16 +659,27 @@ func resourceKubectlManifestApply(ctx context.Context, d *schema.ResourceData, m

if v, ok := d.GetOk("wait_for"); ok {
timeout := d.Timeout(schema.TimeoutCreate)

waitFor := types.WaitFor{}
if err := mapstructure.Decode((v.([]interface{}))[0], &waitFor); err != nil {
return fmt.Errorf("cannot decode wait for conditions %v", err)
}
if len(waitFor.Field) == 0 && len(waitFor.Condition) == 0 {
return fmt.Errorf("at least one of `field` or `condition` must be provided in `wait_for` block")
}

rawResponse, err := restClient.ResourceInterface.List(ctx, meta_v1.ListOptions{FieldSelector: fields.OneTermEqualSelector("metadata.name", manifest.GetName()).String()})
if err != nil {
return err
}

resourceVersion, _, err := unstructured.NestedString(rawResponse.Object, "metadata", "resourceVersion")
if err != nil {
return err
}

log.Printf("[INFO] %v waiting for wait conditions for %vmin", manifest, timeout.Minutes())
err = resource.RetryContext(ctx, timeout,
waitForConditions(ctx, restClient, waitFor.Field, waitFor.Condition, manifest.GetNamespace(), manifest.GetName()))
err = waitForConditions(ctx, restClient, waitFor.Field, waitFor.Condition, manifest.GetName(), resourceVersion, timeout)
if err != nil {
return err
}
Expand Down Expand Up @@ -1185,54 +1195,77 @@ func waitForApiService(ctx context.Context, provider *KubeProvider, name string,
return nil
}

func waitForConditions(ctx context.Context, provider *RestClientResult, fields []types.WaitForField, conditions []types.WaitForStatusCondition, ns, name string) resource.RetryFunc {
return func() *resource.RetryError {
rawResponse, err := provider.ResourceInterface.Get(ctx, name, meta_v1.GetOptions{})
if err != nil {
return resource.NonRetryableError(err)
}
func waitForConditions(ctx context.Context, restClient *RestClientResult, waitFields []types.WaitForField, waitConditions []types.WaitForStatusCondition, name string, resourceVersion string, timeout time.Duration) error {
timeoutSeconds := int64(timeout.Seconds())

//convert to json and create a json query object from it
yamlJson, err := rawResponse.MarshalJSON()
if err != nil {
return resource.NonRetryableError(err)
}
gq := gojsonq.New().FromString(string(yamlJson))
for _, c := range fields {
//find the key
v := gq.Reset().Find(c.Key)
if v == nil {
return resource.RetryableError(fmt.Errorf("key %s was not found in the resource %s", c.Key, name))
}
// for the sake of comparison, we will convert everything to a string
stringVal := fmt.Sprintf("%v", v)
switch c.ValueType {
case "regex":
matched, err := regexp.Match(c.Value, []byte(stringVal))
switch {
case err != nil:
return resource.NonRetryableError(fmt.Errorf(
"invalid regex `%s`. error was %v", c.Value, err))
case !matched:
return resource.RetryableError(fmt.Errorf("%s key %s did not match regex %s. Value was %s", name, c.Key, c.Value, stringVal))
watcher, err := restClient.ResourceInterface.Watch(ctx, meta_v1.ListOptions{Watch: true, TimeoutSeconds: &timeoutSeconds, FieldSelector: fields.OneTermEqualSelector("metadata.name", name).String(), ResourceVersion: resourceVersion})
if err != nil {
return err
}

defer watcher.Stop()

done := false
for !done {
select {
case event := <-watcher.ResultChan():
if event.Type == watch.Modified {
rawResponse, ok := event.Object.(*meta_v1_unstruct.Unstructured)
if !ok {
return fmt.Errorf("%s could not cast resource to unstructured", name)
}

case "eq", "":
if stringVal != c.Value {
return resource.RetryableError(fmt.Errorf("%s key %s value was not equal to expected. Got %s, want %s", name, c.Key, stringVal, c.Value))
yamlJson, err := rawResponse.MarshalJSON()
if err != nil {
return err
}
}
}

for _, c := range conditions {
//find the conditions by status and type
v := gq.Reset().From("status.conditions").Where("type", "=", c.Type).Where("status", "=", c.Status)
if v == nil {
return resource.RetryableError(fmt.Errorf("key %s was not found in the resource %s", c.Status, name))
gq := gojsonq.New().FromString(string(yamlJson))

for _, c := range waitConditions {
// Find the conditions by status and type
v := gq.Reset().From("status.conditions").Where("type", "=", c.Type).Where("status", "=", c.Status)
if v == nil {
continue
}
}

for _, c := range waitFields {
// Find the key
v := gq.Reset().Find(c.Key)
if v == nil {
continue
}

// For the sake of comparison we will convert everything to a string
stringVal := fmt.Sprintf("%v", v)
switch c.ValueType {
case "regex":
matched, err := regexp.Match(c.Value, []byte(stringVal))
if err != nil {
return err
}

if !matched {
continue
}

case "eq", "":
if stringVal != c.Value {
continue
}
}
}

done = true
}

case <-ctx.Done():
return fmt.Errorf("%s failed to wait for resource", name)
}
return nil
}

return nil
}

// Takes the result of flatmap.Expand for an array of strings
Expand Down

0 comments on commit b4cfec5

Please sign in to comment.