Remove failed libraries on running clusters #956

Merged: 1 commit, Dec 12, 2021

Changes from all commits
CHANGELOG.md (6 changes: 4 additions & 2 deletions)
@@ -2,8 +2,10 @@

 ## 0.4.1

-* Fixed refresh of `library` blocks on a stopped `databricks_cluster` ([#952](https://github.com/databrickslabs/terraform-provider-databricks/issues/952))
-* Added `databricks_clusters` data resource to list all clusters in the workspace.
+* Fixed refresh of `library` blocks on a stopped `databricks_cluster` ([#952](https://github.com/databrickslabs/terraform-provider-databricks/issues/952)).
+* Added `databricks_clusters` data resource to list all clusters in the workspace ([#955](https://github.com/databrickslabs/terraform-provider-databricks/pull/955)).
+* Whenever a library fails to install on a running `databricks_cluster`, it is now removed automatically, so that a clean state of managed libraries is maintained. Without this fix, users had to open the Clusters UI and manually remove the failed library from the cluster. Library installation counts towards the CREATE and UPDATE timeouts of the `databricks_cluster` resource ([#599](https://github.com/databrickslabs/terraform-provider-databricks/issues/599)).
+* Added new experimental resources and increased test coverage.

 ## 0.4.0
clusters/resource_cluster.go (25 changes: 18 additions & 7 deletions)
@@ -122,6 +122,8 @@ func resourceClusterSchema() map[string]*schema.Schema {

 func resourceClusterCreate(ctx context.Context, d *schema.ResourceData, c *common.DatabricksClient) error {
 	var cluster Cluster
+	start := time.Now()
+	timeout := d.Timeout(schema.TimeoutCreate)
 	clusters := NewClustersAPI(ctx, c)
 	err := common.DataToStructPointer(d, clusterSchema, &cluster)
 	if err != nil {
@@ -131,6 +133,7 @@ func resourceClusterCreate(ctx context.Context, d *schema.ResourceData, c *common.DatabricksClient) error {
 		return err
 	}
 	cluster.ModifyRequestOnInstancePool()
+	// TODO: propagate d.Timeout(schema.TimeoutCreate)
 	clusterInfo, err := clusters.Create(cluster)
 	if err != nil {
 		return err
@@ -153,9 +156,12 @@ func resourceClusterCreate(ctx context.Context, d *schema.ResourceData, c *common.DatabricksClient) error {
 	if err = libs.Install(libraryList); err != nil {
 		return err
 	}
-	// TODO: share the remainder of timeout from clusters.Create
-	timeout := d.Timeout(schema.TimeoutCreate)
-	_, err := libs.WaitForLibrariesInstalled(d.Id(), timeout, clusterInfo.IsRunningOrResizing())
+	_, err := libs.WaitForLibrariesInstalled(libraries.Wait{
+		ClusterID: d.Id(),
+		Timeout:   timeout - time.Since(start),
+		IsRunning: clusterInfo.IsRunningOrResizing(),
+		IsRefresh: false,
+	})
 	if err != nil {
 		return err
 	}
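
The hunks above implement a shared deadline: `start` is captured before `clusters.Create`, and the later library wait receives `timeout - time.Since(start)`, i.e. whatever is left of the user-configured CREATE timeout rather than a separate 30-minute budget. A minimal, self-contained sketch of the pattern (the `waitBudget` helper, the durations, and the guard against an exhausted budget are illustrative, not provider code):

```go
package main

import (
	"fmt"
	"time"
)

// waitBudget returns what is left of a single user-configured timeout
// after earlier steps (such as cluster creation) have consumed part of it.
func waitBudget(timeout time.Duration, start time.Time) (time.Duration, error) {
	remaining := timeout - time.Since(start)
	if remaining <= 0 {
		return 0, fmt.Errorf("timeout exhausted before library installation")
	}
	return remaining, nil
}

func main() {
	start := time.Now()
	// ... cluster creation would run here, consuming part of the budget ...
	if left, err := waitBudget(20*time.Minute, start); err == nil {
		fmt.Printf("library wait may use up to %s\n", left)
	}
}
```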
@@ -195,8 +201,12 @@ func resourceClusterRead(ctx context.Context, d *schema.ResourceData, c *common.DatabricksClient) error {
 	}
 	d.Set("url", c.FormatURL("#setting/clusters/", d.Id(), "/configuration"))
 	librariesAPI := libraries.NewLibrariesAPI(ctx, c)
-	libsClusterStatus, err := librariesAPI.WaitForLibrariesInstalled(d.Id(),
-		d.Timeout(schema.TimeoutRead), clusterInfo.IsRunningOrResizing())
+	libsClusterStatus, err := librariesAPI.WaitForLibrariesInstalled(libraries.Wait{
+		ClusterID: d.Id(),
+		Timeout:   d.Timeout(schema.TimeoutRead),
+		IsRunning: clusterInfo.IsRunningOrResizing(),
+		IsRefresh: true,
+	})
 	if err != nil {
 		return err
 	}
@@ -284,9 +294,10 @@ func resourceClusterUpdate(ctx context.Context, d *schema.ResourceData, c *common.DatabricksClient) error {
 			return err
 		}
 	}
 	// clusters.StartAndGetInfo() always returns a running cluster
 	// or errors out, so we just know the cluster is active.
-	err = librariesAPI.UpdateLibraries(clusterID, libsToInstall, libsToUninstall, true)
+	err = librariesAPI.UpdateLibraries(clusterID, libsToInstall, libsToUninstall,
+		d.Timeout(schema.TimeoutUpdate))
 	if err != nil {
 		return err
 	}
clusters/resource_cluster_test.go (70 changes: 63 additions & 7 deletions)
@@ -1210,8 +1210,8 @@ func TestReadOnStoppedClusterWithLibrariesDoesNotFail(t *testing.T) {
 		Fixtures: []qa.HTTPFixture{
 			{
 				Method: "GET",
-				Resource: "/api/2.0/clusters/get?cluster_id=foo",
-				Response: ClusterInfo {
+				Resource: "/api/2.0/clusters/get?cluster_id=foo",
+				Response: ClusterInfo{
 					State: ClusterStateTerminated,
 				},
 			},
@@ -1220,10 +1220,10 @@
 				Resource: "/api/2.0/clusters/events",
 			},
 			{
-				Method: "GET",
+				Method:       "GET",
 				ReuseRequest: true,
-				Resource: "/api/2.0/libraries/cluster-status?cluster_id=foo",
-				Response: libraries.ClusterLibraryStatuses {
+				Resource:     "/api/2.0/libraries/cluster-status?cluster_id=foo",
+				Response:     libraries.ClusterLibraryStatuses{
 					ClusterID: "foo",
 					LibraryStatuses: []libraries.LibraryStatus{
 						{
@@ -1237,6 +1237,62 @@
 			},
 		},
 		Read: true,
-		ID: "foo",
+		ID:   "foo",
 	}.ApplyNoError(t)
 }
+
+// https://github.com/databrickslabs/terraform-provider-databricks/issues/599
+func TestRefreshOnRunningClusterWithFailedLibraryUninstallsIt(t *testing.T) {
+	qa.ResourceFixture{
+		Resource: ResourceCluster(),
+		Fixtures: []qa.HTTPFixture{
+			{
+				Method:   "GET",
+				Resource: "/api/2.0/clusters/get?cluster_id=foo",
+				Response: ClusterInfo{
+					State: ClusterStateRunning,
+				},
+			},
+			{
+				Method:   "POST",
+				Resource: "/api/2.0/clusters/events",
+			},
+			{
+				Method:   "GET",
+				Resource: "/api/2.0/libraries/cluster-status?cluster_id=foo",
+				Response: libraries.ClusterLibraryStatuses{
+					ClusterID: "foo",
+					LibraryStatuses: []libraries.LibraryStatus{
+						{
+							Status:   "FAILED",
+							Messages: []string{"fails for the test"},
+							Library: &libraries.Library{
+								Jar: "foo.bar",
+							},
+						},
+						{
+							Status: "INSTALLED",
+							Library: &libraries.Library{
+								Whl: "bar.whl",
+							},
+						},
+					},
+				},
+			},
+			{
+				Method:          "POST",
+				Resource:        "/api/2.0/libraries/uninstall",
+				ExpectedRequest: libraries.ClusterLibraryList{
+					ClusterID: "foo",
+					Libraries: []libraries.Library{
+						{
+							Jar: "foo.bar",
+						},
+					},
+				},
+			},
+		},
+		Read: true,
+		ID:   "foo",
+	}.ApplyNoError(t)
+}
libraries/libraries_api.go (74 changes: 58 additions & 16 deletions)
@@ -45,7 +45,14 @@ func (a LibrariesAPI) ClusterStatus(clusterID string) (cls ClusterLibraryStatuses, err error) {
 	return
 }

-func (a LibrariesAPI) UpdateLibraries(clusterID string, add, remove ClusterLibraryList, isActive bool) error {
+type Wait struct {
+	ClusterID string
+	Timeout   time.Duration
+	IsRunning bool
+	IsRefresh bool
+}
+
+func (a LibrariesAPI) UpdateLibraries(clusterID string, add, remove ClusterLibraryList, timeout time.Duration) error {
 	if len(remove.Libraries) > 0 {
 		err := a.Uninstall(remove)
 		if err != nil {
@@ -58,29 +65,33 @@ func (a LibrariesAPI) UpdateLibraries(clusterID string, add, remove ClusterLibraryList, isActive bool) error {
 			return err
 		}
 	}
-	// TODO: propagate timeout to method signature
-	_, err := a.WaitForLibrariesInstalled(clusterID, 30*time.Minute, isActive)
+	_, err := a.WaitForLibrariesInstalled(Wait{
+		ClusterID: clusterID,
+		Timeout:   timeout,
+		IsRunning: true,
+		IsRefresh: false,
+	})
 	return err
 }

-func (a LibrariesAPI) WaitForLibrariesInstalled(clusterID string, timeout time.Duration,
-	isActive bool) (result *ClusterLibraryStatuses, err error) {
-	err = resource.RetryContext(a.context, timeout, func() *resource.RetryError {
-		libsClusterStatus, err := a.ClusterStatus(clusterID)
+// clusterID string, timeout time.Duration, isActive bool, refresh bool
+func (a LibrariesAPI) WaitForLibrariesInstalled(wait Wait) (result *ClusterLibraryStatuses, err error) {
+	err = resource.RetryContext(a.context, wait.Timeout, func() *resource.RetryError {
+		libsClusterStatus, err := a.ClusterStatus(wait.ClusterID)
 		if common.IsMissing(err) {
 			// eventual consistency error
 			return resource.RetryableError(err)
 		}
 		if err != nil {
 			return resource.NonRetryableError(err)
 		}
-		if !isActive {
+		if !wait.IsRunning {
 			log.Printf("[INFO] Cluster %s is currently not running, so just returning list of %d libraries",
-				clusterID, len(libsClusterStatus.LibraryStatuses))
+				wait.ClusterID, len(libsClusterStatus.LibraryStatuses))
 			result = &libsClusterStatus
 			return nil
 		}
-		retry, err := libsClusterStatus.IsRetryNeeded()
+		retry, err := libsClusterStatus.IsRetryNeeded(wait.IsRefresh)
 		if retry {
 			return resource.RetryableError(err)
 		}
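
Collapsing three positional arguments into the `Wait` struct makes call sites self-describing and lets new flags like `IsRefresh` ride along without another signature change. A hedged usage sketch, assuming the module path `github.com/databrickslabs/terraform-provider-databricks` and an already-constructed `LibrariesAPI`; the cluster ID and timeout are placeholders:

```go
package example

import (
	"time"

	"github.com/databrickslabs/terraform-provider-databricks/libraries"
)

// refreshLibraries mirrors the resource read path above: IsRefresh tolerates
// FAILED libraries so the read can finish and the cleanup can uninstall them.
func refreshLibraries(libs libraries.LibrariesAPI, clusterID string) error {
	_, err := libs.WaitForLibrariesInstalled(libraries.Wait{
		ClusterID: clusterID,        // the provider passes d.Id() here
		Timeout:   10 * time.Minute, // illustrative; real callers pass d.Timeout(...)
		IsRunning: true,
		IsRefresh: true, // skip FAILED statuses instead of erroring out
	})
	return err
}
```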
@@ -90,6 +101,33 @@ func (a LibrariesAPI) WaitForLibrariesInstalled(clusterID string, timeout time.Duration,
 		result = &libsClusterStatus
 		return nil
 	})
+	if err != nil {
+		return
+	}
+	if wait.IsRunning {
+		installed := []LibraryStatus{}
+		cleanup := ClusterLibraryList{
+			ClusterID: wait.ClusterID,
+			Libraries: []Library{},
+		}
+		// cleanup libraries that failed to install
+		for _, v := range result.LibraryStatuses {
+			if v.Status == "FAILED" {
+				log.Printf("[WARN] Removing failed library %s from %s", v.Library, wait.ClusterID)
+				cleanup.Libraries = append(cleanup.Libraries, *v.Library)
+				continue
+			}
+			installed = append(installed, v)
+		}
+		// and result contains only the libraries that were successfully installed
+		result.LibraryStatuses = installed
+		if len(cleanup.Libraries) > 0 {
+			err = a.Uninstall(cleanup)
+			if err != nil {
+				err = fmt.Errorf("cannot cleanup libraries: %w", err)
+			}
+		}
+	}
 	return
 }

@@ -243,10 +281,10 @@ func (cll *ClusterLibraryList) String() string {

 // LibraryStatus is the status on a given cluster when using the libraries status api
 type LibraryStatus struct {
-	Library *Library `json:"library,omitempty"`
-	Status string `json:"status,omitempty"`
-	IsLibraryInstalledOnAllClusters bool `json:"is_library_for_all_clusters,omitempty"`
-	Messages []string `json:"messages,omitempty"`
+	Library  *Library `json:"library,omitempty"`
+	Status   string   `json:"status,omitempty"`
+	IsGlobal bool     `json:"is_library_for_all_clusters,omitempty"`
+	Messages []string `json:"messages,omitempty"`
 }

 // ClusterLibraryStatuses A status will be available for all libraries installed on the cluster via the API or

Review thread on the renamed `IsGlobal` field:

Contributor: should we start deprecating this flag? Because per the documentation: "This option does not install the library on clusters running Databricks Runtime 7.0 and above."

Author: It's not changeable by users anymore 🤷🏻‍♂️ but there still might be some workspaces with DBR 7.0 that have this.
@@ -271,12 +309,12 @@ func (cls ClusterLibraryStatuses) ToLibraryList() ClusterLibraryList {
 // IsRetryNeeded returns first bool if there needs to be retry.
 // If there needs to be retry, error message will explain why.
 // If retry does not need to happen and error is not nil - it failed.
-func (cls ClusterLibraryStatuses) IsRetryNeeded() (bool, error) {
+func (cls ClusterLibraryStatuses) IsRetryNeeded(refresh bool) (bool, error) {
 	pending := 0
 	ready := 0
 	errors := []string{}
 	for _, lib := range cls.LibraryStatuses {
-		if lib.IsLibraryInstalledOnAllClusters {
+		if lib.IsGlobal {
 			continue
 		}
 		switch lib.Status {
@@ -301,6 +339,10 @@
 			ready++
 		//Some step in installation failed. More information can be found in the messages field.
 		case "FAILED":
+			if refresh {
+				// we're reading library list on a running cluster and some of the libs failed to install
+				continue
+			}
 			errors = append(errors, fmt.Sprintf("%s failed: %s", lib.Library, strings.Join(lib.Messages, ", ")))
 			continue
 		}
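
The new `refresh` flag is what lets a read on a running cluster succeed even when some libraries are `FAILED`: those entries are skipped here and uninstalled later by `WaitForLibrariesInstalled`, while create and update still surface them as errors. A small sketch of that contract, under the same module-path assumption as above and relying on the behavior described in the comments rather than on code shown in this diff:

```go
package example

import "github.com/databrickslabs/terraform-provider-databricks/libraries"

// failedLibraryModes contrasts the two modes of IsRetryNeeded: with
// refresh=true a FAILED entry is skipped, with refresh=false it is
// reported as a hard error.
func failedLibraryModes() (refreshErr, createErr error) {
	cls := libraries.ClusterLibraryStatuses{
		ClusterID: "0123-456789-abcdef", // placeholder
		LibraryStatuses: []libraries.LibraryStatus{
			{Status: "FAILED", Library: &libraries.Library{Jar: "foo.bar"}},
		},
	}
	_, refreshErr = cls.IsRetryNeeded(true)  // expected nil: FAILED tolerated on refresh
	_, createErr = cls.IsRetryNeeded(false)  // expected non-nil: "... failed: ..." message
	return refreshErr, createErr
}
```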