From bea055a7e711e5cca9cf9f1a795bcbbce389ff4a Mon Sep 17 00:00:00 2001
From: Serge Smertin
Date: Sat, 11 Dec 2021 16:07:23 +0100
Subject: [PATCH] Remove failed libraries on running clusters

Whenever a library fails to install on a running cluster, we now automatically
remove it, so that the clean state of managed libraries is properly maintained.
Without this fix, users had to go to the Clusters UI and manually remove the
library from the cluster on which it failed to install. Library installation
counts towards the CREATE and UPDATE timeouts of the `databricks_cluster`
resource.

Fixes #599
---
 CHANGELOG.md                      |  6 ++-
 clusters/resource_cluster.go      | 25 ++++++++---
 clusters/resource_cluster_test.go | 70 ++++++++++++++++++++++++++---
 libraries/libraries_api.go        | 74 ++++++++++++++++++++++++-------
 libraries/libraries_api_test.go   | 57 +++++++++++++++++++-----
 5 files changed, 188 insertions(+), 44 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 63cdaa8005..06fd5d0ed4 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,8 +2,10 @@

 ## 0.4.1

-* Fixed refresh of `library` blocks on a stopped `databricks_cluster` ([#952](https://github.com/databrickslabs/terraform-provider-databricks/issues/952))
-* Added `databricks_clusters` data resource to list all clusters in the workspace.
+* Fixed refresh of `library` blocks on a stopped `databricks_cluster` ([#952](https://github.com/databrickslabs/terraform-provider-databricks/issues/952)).
+* Added `databricks_clusters` data resource to list all clusters in the workspace ([#955](https://github.com/databrickslabs/terraform-provider-databricks/pull/955)).
+* Whenever a library fails to install on a running `databricks_cluster`, the provider now automatically removes it, so that the clean state of managed libraries is properly maintained. Without this fix, users had to go to the Clusters UI and manually remove the library from the cluster on which it failed to install. Library installation counts towards the CREATE and UPDATE timeouts of the `databricks_cluster` resource ([#599](https://github.com/databrickslabs/terraform-provider-databricks/issues/599)).
+* Added new experimental resources and increased test coverage.
## 0.4.0 diff --git a/clusters/resource_cluster.go b/clusters/resource_cluster.go index 6c1857720a..387771d716 100644 --- a/clusters/resource_cluster.go +++ b/clusters/resource_cluster.go @@ -122,6 +122,8 @@ func resourceClusterSchema() map[string]*schema.Schema { func resourceClusterCreate(ctx context.Context, d *schema.ResourceData, c *common.DatabricksClient) error { var cluster Cluster + start := time.Now() + timeout := d.Timeout(schema.TimeoutCreate) clusters := NewClustersAPI(ctx, c) err := common.DataToStructPointer(d, clusterSchema, &cluster) if err != nil { @@ -131,6 +133,7 @@ func resourceClusterCreate(ctx context.Context, d *schema.ResourceData, c *commo return err } cluster.ModifyRequestOnInstancePool() + // TODO: propagate d.Timeout(schema.TimeoutCreate) clusterInfo, err := clusters.Create(cluster) if err != nil { return err @@ -153,9 +156,12 @@ func resourceClusterCreate(ctx context.Context, d *schema.ResourceData, c *commo if err = libs.Install(libraryList); err != nil { return err } - // TODO: share the remainder of timeout from clusters.Create - timeout := d.Timeout(schema.TimeoutCreate) - _, err := libs.WaitForLibrariesInstalled(d.Id(), timeout, clusterInfo.IsRunningOrResizing()) + _, err := libs.WaitForLibrariesInstalled(libraries.Wait{ + ClusterID: d.Id(), + Timeout: timeout - time.Since(start), + IsRunning: clusterInfo.IsRunningOrResizing(), + IsRefresh: false, + }) if err != nil { return err } @@ -195,8 +201,12 @@ func resourceClusterRead(ctx context.Context, d *schema.ResourceData, c *common. } d.Set("url", c.FormatURL("#setting/clusters/", d.Id(), "/configuration")) librariesAPI := libraries.NewLibrariesAPI(ctx, c) - libsClusterStatus, err := librariesAPI.WaitForLibrariesInstalled(d.Id(), - d.Timeout(schema.TimeoutRead), clusterInfo.IsRunningOrResizing()) + libsClusterStatus, err := librariesAPI.WaitForLibrariesInstalled(libraries.Wait{ + ClusterID: d.Id(), + Timeout: d.Timeout(schema.TimeoutRead), + IsRunning: clusterInfo.IsRunningOrResizing(), + IsRefresh: true, + }) if err != nil { return err } @@ -284,9 +294,10 @@ func resourceClusterUpdate(ctx context.Context, d *schema.ResourceData, c *commo return err } } - // clusters.StartAndGetInfo() always returns a running cluster + // clusters.StartAndGetInfo() always returns a running cluster // or errors out, so we just know the cluster is active. 
- err = librariesAPI.UpdateLibraries(clusterID, libsToInstall, libsToUninstall, true) + err = librariesAPI.UpdateLibraries(clusterID, libsToInstall, libsToUninstall, + d.Timeout(schema.TimeoutUpdate)) if err != nil { return err } diff --git a/clusters/resource_cluster_test.go b/clusters/resource_cluster_test.go index b19dba8b69..d6af3d01f3 100644 --- a/clusters/resource_cluster_test.go +++ b/clusters/resource_cluster_test.go @@ -1210,8 +1210,8 @@ func TestReadOnStoppedClusterWithLibrariesDoesNotFail(t *testing.T) { Fixtures: []qa.HTTPFixture{ { Method: "GET", - Resource: "/api/2.0/clusters/get?cluster_id=foo", - Response: ClusterInfo { + Resource: "/api/2.0/clusters/get?cluster_id=foo", + Response: ClusterInfo{ State: ClusterStateTerminated, }, }, @@ -1220,10 +1220,10 @@ func TestReadOnStoppedClusterWithLibrariesDoesNotFail(t *testing.T) { Resource: "/api/2.0/clusters/events", }, { - Method: "GET", + Method: "GET", ReuseRequest: true, - Resource: "/api/2.0/libraries/cluster-status?cluster_id=foo", - Response: libraries.ClusterLibraryStatuses { + Resource: "/api/2.0/libraries/cluster-status?cluster_id=foo", + Response: libraries.ClusterLibraryStatuses{ ClusterID: "foo", LibraryStatuses: []libraries.LibraryStatus{ { @@ -1237,6 +1237,62 @@ func TestReadOnStoppedClusterWithLibrariesDoesNotFail(t *testing.T) { }, }, Read: true, - ID: "foo", + ID: "foo", }.ApplyNoError(t) -} \ No newline at end of file +} + +// https://github.com/databrickslabs/terraform-provider-databricks/issues/599 +func TestRefreshOnRunningClusterWithFailedLibraryUninstallsIt(t *testing.T) { + qa.ResourceFixture{ + Resource: ResourceCluster(), + Fixtures: []qa.HTTPFixture{ + { + Method: "GET", + Resource: "/api/2.0/clusters/get?cluster_id=foo", + Response: ClusterInfo{ + State: ClusterStateRunning, + }, + }, + { + Method: "POST", + Resource: "/api/2.0/clusters/events", + }, + { + Method: "GET", + Resource: "/api/2.0/libraries/cluster-status?cluster_id=foo", + Response: libraries.ClusterLibraryStatuses{ + ClusterID: "foo", + LibraryStatuses: []libraries.LibraryStatus{ + { + Status: "FAILED", + Messages: []string{"fails for the test"}, + Library: &libraries.Library{ + Jar: "foo.bar", + }, + }, + { + Status: "INSTALLED", + Library: &libraries.Library{ + Whl: "bar.whl", + }, + }, + }, + }, + }, + { + Method: "POST", + Resource: "/api/2.0/libraries/uninstall", + ExpectedRequest: libraries.ClusterLibraryList{ + ClusterID: "foo", + Libraries: []libraries.Library{ + { + Jar: "foo.bar", + }, + }, + }, + }, + }, + Read: true, + ID: "foo", + }.ApplyNoError(t) +} diff --git a/libraries/libraries_api.go b/libraries/libraries_api.go index 46a7dfea89..d940fc4c07 100644 --- a/libraries/libraries_api.go +++ b/libraries/libraries_api.go @@ -45,7 +45,14 @@ func (a LibrariesAPI) ClusterStatus(clusterID string) (cls ClusterLibraryStatuse return } -func (a LibrariesAPI) UpdateLibraries(clusterID string, add, remove ClusterLibraryList, isActive bool) error { +type Wait struct { + ClusterID string + Timeout time.Duration + IsRunning bool + IsRefresh bool +} + +func (a LibrariesAPI) UpdateLibraries(clusterID string, add, remove ClusterLibraryList, timeout time.Duration) error { if len(remove.Libraries) > 0 { err := a.Uninstall(remove) if err != nil { @@ -58,15 +65,19 @@ func (a LibrariesAPI) UpdateLibraries(clusterID string, add, remove ClusterLibra return err } } - // TODO: propagate timeout to method signature - _, err := a.WaitForLibrariesInstalled(clusterID, 30*time.Minute, isActive) + _, err := a.WaitForLibrariesInstalled(Wait{ + ClusterID: 
clusterID, + Timeout: timeout, + IsRunning: true, + IsRefresh: false, + }) return err } -func (a LibrariesAPI) WaitForLibrariesInstalled(clusterID string, timeout time.Duration, - isActive bool) (result *ClusterLibraryStatuses, err error) { - err = resource.RetryContext(a.context, timeout, func() *resource.RetryError { - libsClusterStatus, err := a.ClusterStatus(clusterID) +// clusterID string, timeout time.Duration, isActive bool, refresh bool +func (a LibrariesAPI) WaitForLibrariesInstalled(wait Wait) (result *ClusterLibraryStatuses, err error) { + err = resource.RetryContext(a.context, wait.Timeout, func() *resource.RetryError { + libsClusterStatus, err := a.ClusterStatus(wait.ClusterID) if common.IsMissing(err) { // eventual consistency error return resource.RetryableError(err) @@ -74,13 +85,13 @@ func (a LibrariesAPI) WaitForLibrariesInstalled(clusterID string, timeout time.D if err != nil { return resource.NonRetryableError(err) } - if !isActive { + if !wait.IsRunning { log.Printf("[INFO] Cluster %s is currently not running, so just returning list of %d libraries", - clusterID, len(libsClusterStatus.LibraryStatuses)) + wait.ClusterID, len(libsClusterStatus.LibraryStatuses)) result = &libsClusterStatus return nil } - retry, err := libsClusterStatus.IsRetryNeeded() + retry, err := libsClusterStatus.IsRetryNeeded(wait.IsRefresh) if retry { return resource.RetryableError(err) } @@ -90,6 +101,33 @@ func (a LibrariesAPI) WaitForLibrariesInstalled(clusterID string, timeout time.D result = &libsClusterStatus return nil }) + if err != nil { + return + } + if wait.IsRunning { + installed := []LibraryStatus{} + cleanup := ClusterLibraryList{ + ClusterID: wait.ClusterID, + Libraries: []Library{}, + } + // cleanup libraries that failed to install + for _, v := range result.LibraryStatuses { + if v.Status == "FAILED" { + log.Printf("[WARN] Removing failed library %s from %s", v.Library, wait.ClusterID) + cleanup.Libraries = append(cleanup.Libraries, *v.Library) + continue + } + installed = append(installed, v) + } + // and result contains only the libraries that were successfully installed + result.LibraryStatuses = installed + if len(cleanup.Libraries) > 0 { + err = a.Uninstall(cleanup) + if err != nil { + err = fmt.Errorf("cannot cleanup libraries: %w", err) + } + } + } return } @@ -243,10 +281,10 @@ func (cll *ClusterLibraryList) String() string { // LibraryStatus is the status on a given cluster when using the libraries status api type LibraryStatus struct { - Library *Library `json:"library,omitempty"` - Status string `json:"status,omitempty"` - IsLibraryInstalledOnAllClusters bool `json:"is_library_for_all_clusters,omitempty"` - Messages []string `json:"messages,omitempty"` + Library *Library `json:"library,omitempty"` + Status string `json:"status,omitempty"` + IsGlobal bool `json:"is_library_for_all_clusters,omitempty"` + Messages []string `json:"messages,omitempty"` } // ClusterLibraryStatuses A status will be available for all libraries installed on the cluster via the API or @@ -271,12 +309,12 @@ func (cls ClusterLibraryStatuses) ToLibraryList() ClusterLibraryList { // IsRetryNeeded returns first bool if there needs to be retry. // If there needs to be retry, error message will explain why. // If retry does not need to happen and error is not nil - it failed. 
-func (cls ClusterLibraryStatuses) IsRetryNeeded() (bool, error) { +func (cls ClusterLibraryStatuses) IsRetryNeeded(refresh bool) (bool, error) { pending := 0 ready := 0 errors := []string{} for _, lib := range cls.LibraryStatuses { - if lib.IsLibraryInstalledOnAllClusters { + if lib.IsGlobal { continue } switch lib.Status { @@ -301,6 +339,10 @@ func (cls ClusterLibraryStatuses) IsRetryNeeded() (bool, error) { ready++ //Some step in installation failed. More information can be found in the messages field. case "FAILED": + if refresh { + // we're reading library list on a running cluster and some of the libs failed to install + continue + } errors = append(errors, fmt.Sprintf("%s failed: %s", lib.Library, strings.Join(lib.Messages, ", "))) continue } diff --git a/libraries/libraries_api_test.go b/libraries/libraries_api_test.go index ad5b6ca15d..f8a978a5bc 100644 --- a/libraries/libraries_api_test.go +++ b/libraries/libraries_api_test.go @@ -59,28 +59,61 @@ func TestWaitForLibrariesInstalled(t *testing.T) { Whl: "b.whl", }, }, + { + Status: "INSTALLED", + Library: &Library{ + Jar: "a.jar", + }, + }, + }, + }, + }, + { + Method: "POST", + Resource: "/api/2.0/libraries/uninstall", + ExpectedRequest: ClusterLibraryList{ + ClusterID: "failed-wheel", + Libraries: []Library{ + { + Whl: "b.whl", + }, }, }, }, }, func(ctx context.Context, client *common.DatabricksClient) { libs := NewLibrariesAPI(ctx, client) - _, err := libs.WaitForLibrariesInstalled("missing", 50*time.Millisecond, true) + _, err := libs.WaitForLibrariesInstalled(Wait{ + "missing", 50 * time.Millisecond, true, false, + }) assert.EqualError(t, err, "missing") - _, err = libs.WaitForLibrariesInstalled("error", 50*time.Millisecond, true) + _, err = libs.WaitForLibrariesInstalled(Wait{ + "error", 50 * time.Millisecond, true, false, + }) assert.EqualError(t, err, "internal error") // cluster is not running - _, err = libs.WaitForLibrariesInstalled("still-installing", 50*time.Millisecond, false) + _, err = libs.WaitForLibrariesInstalled(Wait{ + "still-installing", 50 * time.Millisecond, false, false, + }) assert.NoError(t, err) // cluster is running - _, err = libs.WaitForLibrariesInstalled("still-installing", 50*time.Millisecond, true) + _, err = libs.WaitForLibrariesInstalled(Wait{ + "still-installing", 50 * time.Millisecond, true, false, + }) assert.EqualError(t, err, "0 libraries are ready, but there are still 1 pending") - _, err = libs.WaitForLibrariesInstalled("failed-wheel", 50*time.Millisecond, true) + _, err = libs.WaitForLibrariesInstalled(Wait{ + "failed-wheel", 50 * time.Millisecond, true, false, + }) assert.EqualError(t, err, "whl:b.whl failed: does not compute") + // uninstall b.whl and continue executing + _, err = libs.WaitForLibrariesInstalled(Wait{ + "failed-wheel", 50 * time.Millisecond, true, true, + }) + assert.NoError(t, err, "library should have been uninstalled and work proceeded") }) } @@ -127,7 +160,7 @@ func TestClusterLibraryStatuses_UpdateLibraries(t *testing.T) { Jar: "remove.jar", }, }, - }, true) + }, 1*time.Second) assert.NoError(t, err) }) } @@ -203,11 +236,11 @@ func TestClusterLibraryStatuses_NoNeedAllClusters(t *testing.T) { ClusterID: "abc", LibraryStatuses: []LibraryStatus{ { - IsLibraryInstalledOnAllClusters: true, - Status: "INSTALLING", + IsGlobal: true, + Status: "INSTALLING", }, }, - }.IsRetryNeeded() + }.IsRetryNeeded(false) require.NoError(t, err) assert.False(t, need) } @@ -229,7 +262,7 @@ func TestClusterLibraryStatuses_RetryingCodes(t *testing.T) { Status: "INSTALLING", }, }, - 
}.IsRetryNeeded() + }.IsRetryNeeded(false) require.Error(t, err) assert.Equal(t, "0 libraries are ready, but there are still 4 pending", err.Error()) assert.True(t, need) @@ -249,7 +282,7 @@ func TestClusterLibraryStatuses_ReadyStatuses(t *testing.T) { Status: "UNINSTALL_ON_RESTART", }, }, - }.IsRetryNeeded() + }.IsRetryNeeded(false) require.NoError(t, err) assert.False(t, need) } @@ -284,7 +317,7 @@ func TestClusterLibraryStatuses_Errors(t *testing.T) { Messages: []string{"b"}, }, }, - }.IsRetryNeeded() + }.IsRetryNeeded(false) require.Error(t, err) assert.Equal(t, "whl:a failed: b\nmvn:a.b.c failed: b\ncran:a failed: b", err.Error()) assert.False(t, need)
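
Reviewer note (not part of the patch): the commit description mentions that library installation counts towards the CREATE and UPDATE timeouts. Below is a minimal, self-contained Go sketch of that accounting, as resourceClusterCreate now does it via `timeout - time.Since(start)`; the `remainingTimeout` helper is a hypothetical name used only for illustration.

// Minimal sketch, not provider code: the CREATE timeout is measured from the
// start of resource creation, and only the remainder is handed to the library
// installation wait, so both steps share one user-configured timeout.
package main

import (
	"fmt"
	"time"
)

// remainingTimeout is a hypothetical helper: it returns what is left of the
// total resource timeout after the elapsed cluster-creation time.
func remainingTimeout(total time.Duration, start time.Time) time.Duration {
	left := total - time.Since(start)
	if left < 0 {
		return 0
	}
	return left
}

func main() {
	start := time.Now()
	// stands in for clusters.Create(...) and libs.Install(...)
	time.Sleep(20 * time.Millisecond)
	fmt.Println("time left for the library wait:",
		remainingTimeout(30*time.Minute, start).Round(time.Second))
}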
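
Reviewer note (not part of the patch): the core of the fix is the post-wait cleanup in WaitForLibrariesInstalled, where statuses on a running cluster are split into successfully installed libraries (kept in the result) and FAILED ones (sent to the uninstall endpoint). Here is a standalone sketch of that partitioning with simplified stand-in types; `partitionFailed` is a hypothetical helper name.

// Standalone sketch of the cleanup logic added to WaitForLibrariesInstalled:
// failed libraries are queued for uninstall, everything else stays installed.
// Types here are simplified stand-ins for the provider's libraries package.
package main

import "fmt"

type Library struct {
	Jar string
	Whl string
}

type LibraryStatus struct {
	Library *Library
	Status  string
}

// partitionFailed mirrors the loop in the patch: FAILED libraries are
// collected for uninstall, the rest are reported as installed.
func partitionFailed(statuses []LibraryStatus) (installed []LibraryStatus, toUninstall []Library) {
	for _, v := range statuses {
		if v.Status == "FAILED" {
			toUninstall = append(toUninstall, *v.Library)
			continue
		}
		installed = append(installed, v)
	}
	return
}

func main() {
	installed, toUninstall := partitionFailed([]LibraryStatus{
		{Status: "FAILED", Library: &Library{Jar: "foo.bar"}},
		{Status: "INSTALLED", Library: &Library{Whl: "bar.whl"}},
	})
	fmt.Printf("%d installed, %d to uninstall\n", len(installed), len(toUninstall))
}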
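
Reviewer note (not part of the patch): IsRetryNeeded gains a refresh flag so that a FAILED library does not error out a plain read/refresh (it gets uninstalled instead), while create and update still fail fast. A rough standalone sketch of that decision, again with hypothetical names and a reduced set of statuses:

// Sketch of the refresh-aware retry decision; shouldRetry is a hypothetical
// stand-in for ClusterLibraryStatuses.IsRetryNeeded.
package main

import "fmt"

type libStatus struct {
	name  string
	state string // e.g. PENDING, INSTALLING, INSTALLED, FAILED
}

// shouldRetry reports whether polling should continue, and returns an error
// on a terminal failure unless the caller is only refreshing state.
func shouldRetry(statuses []libStatus, refresh bool) (bool, error) {
	pending := 0
	failed := []string{}
	for _, s := range statuses {
		switch s.state {
		case "PENDING", "RESOLVING", "INSTALLING":
			pending++
		case "FAILED":
			if refresh {
				// on refresh the failure is tolerated here and the library
				// is removed by the cleanup step afterwards
				continue
			}
			failed = append(failed, s.name)
		}
	}
	if len(failed) > 0 {
		return false, fmt.Errorf("failed to install: %v", failed)
	}
	return pending > 0, nil
}

func main() {
	libs := []libStatus{{"foo.bar", "FAILED"}, {"bar.whl", "INSTALLED"}}
	retry, err := shouldRetry(libs, true)
	fmt.Println(retry, err) // false <nil>: refresh tolerates the failure
	_, err = shouldRetry(libs, false)
	fmt.Println(err) // failed to install: [foo.bar]
}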