diff --git a/brokerapi/broker/brokerfakes/fake_storage.go b/brokerapi/broker/brokerfakes/fake_storage.go index f0de811b3..cd6ce9688 100644 --- a/brokerapi/broker/brokerfakes/fake_storage.go +++ b/brokerapi/broker/brokerfakes/fake_storage.go @@ -197,6 +197,17 @@ type FakeStorage struct { result1 storage.TerraformDeployment result2 error } + RemoveLockFileStub func(string) error + removeLockFileMutex sync.RWMutex + removeLockFileArgsForCall []struct { + arg1 string + } + removeLockFileReturns struct { + result1 error + } + removeLockFileReturnsOnCall map[int]struct { + result1 error + } StoreBindRequestDetailsStub func(storage.BindRequestDetails) error storeBindRequestDetailsMutex sync.RWMutex storeBindRequestDetailsArgsForCall []struct { @@ -242,6 +253,17 @@ type FakeStorage struct { storeTerraformDeploymentReturnsOnCall map[int]struct { result1 error } + WriteLockFileStub func(string) error + writeLockFileMutex sync.RWMutex + writeLockFileArgsForCall []struct { + arg1 string + } + writeLockFileReturns struct { + result1 error + } + writeLockFileReturnsOnCall map[int]struct { + result1 error + } invocations map[string][][]interface{} invocationsMutex sync.RWMutex } @@ -1193,6 +1215,67 @@ func (fake *FakeStorage) GetTerraformDeploymentReturnsOnCall(i int, result1 stor }{result1, result2} } +func (fake *FakeStorage) RemoveLockFile(arg1 string) error { + fake.removeLockFileMutex.Lock() + ret, specificReturn := fake.removeLockFileReturnsOnCall[len(fake.removeLockFileArgsForCall)] + fake.removeLockFileArgsForCall = append(fake.removeLockFileArgsForCall, struct { + arg1 string + }{arg1}) + stub := fake.RemoveLockFileStub + fakeReturns := fake.removeLockFileReturns + fake.recordInvocation("RemoveLockFile", []interface{}{arg1}) + fake.removeLockFileMutex.Unlock() + if stub != nil { + return stub(arg1) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeStorage) RemoveLockFileCallCount() int { + fake.removeLockFileMutex.RLock() + defer fake.removeLockFileMutex.RUnlock() + return len(fake.removeLockFileArgsForCall) +} + +func (fake *FakeStorage) RemoveLockFileCalls(stub func(string) error) { + fake.removeLockFileMutex.Lock() + defer fake.removeLockFileMutex.Unlock() + fake.RemoveLockFileStub = stub +} + +func (fake *FakeStorage) RemoveLockFileArgsForCall(i int) string { + fake.removeLockFileMutex.RLock() + defer fake.removeLockFileMutex.RUnlock() + argsForCall := fake.removeLockFileArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *FakeStorage) RemoveLockFileReturns(result1 error) { + fake.removeLockFileMutex.Lock() + defer fake.removeLockFileMutex.Unlock() + fake.RemoveLockFileStub = nil + fake.removeLockFileReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeStorage) RemoveLockFileReturnsOnCall(i int, result1 error) { + fake.removeLockFileMutex.Lock() + defer fake.removeLockFileMutex.Unlock() + fake.RemoveLockFileStub = nil + if fake.removeLockFileReturnsOnCall == nil { + fake.removeLockFileReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.removeLockFileReturnsOnCall[i] = struct { + result1 error + }{result1} +} + func (fake *FakeStorage) StoreBindRequestDetails(arg1 storage.BindRequestDetails) error { fake.storeBindRequestDetailsMutex.Lock() ret, specificReturn := fake.storeBindRequestDetailsReturnsOnCall[len(fake.storeBindRequestDetailsArgsForCall)] @@ -1438,6 +1521,67 @@ func (fake *FakeStorage) StoreTerraformDeploymentReturnsOnCall(i int, result1 er }{result1} } +func (fake *FakeStorage) 
WriteLockFile(arg1 string) error { + fake.writeLockFileMutex.Lock() + ret, specificReturn := fake.writeLockFileReturnsOnCall[len(fake.writeLockFileArgsForCall)] + fake.writeLockFileArgsForCall = append(fake.writeLockFileArgsForCall, struct { + arg1 string + }{arg1}) + stub := fake.WriteLockFileStub + fakeReturns := fake.writeLockFileReturns + fake.recordInvocation("WriteLockFile", []interface{}{arg1}) + fake.writeLockFileMutex.Unlock() + if stub != nil { + return stub(arg1) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeStorage) WriteLockFileCallCount() int { + fake.writeLockFileMutex.RLock() + defer fake.writeLockFileMutex.RUnlock() + return len(fake.writeLockFileArgsForCall) +} + +func (fake *FakeStorage) WriteLockFileCalls(stub func(string) error) { + fake.writeLockFileMutex.Lock() + defer fake.writeLockFileMutex.Unlock() + fake.WriteLockFileStub = stub +} + +func (fake *FakeStorage) WriteLockFileArgsForCall(i int) string { + fake.writeLockFileMutex.RLock() + defer fake.writeLockFileMutex.RUnlock() + argsForCall := fake.writeLockFileArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *FakeStorage) WriteLockFileReturns(result1 error) { + fake.writeLockFileMutex.Lock() + defer fake.writeLockFileMutex.Unlock() + fake.WriteLockFileStub = nil + fake.writeLockFileReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeStorage) WriteLockFileReturnsOnCall(i int, result1 error) { + fake.writeLockFileMutex.Lock() + defer fake.writeLockFileMutex.Unlock() + fake.WriteLockFileStub = nil + if fake.writeLockFileReturnsOnCall == nil { + fake.writeLockFileReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.writeLockFileReturnsOnCall[i] = struct { + result1 error + }{result1} +} + func (fake *FakeStorage) Invocations() map[string][][]interface{} { fake.invocationsMutex.RLock() defer fake.invocationsMutex.RUnlock() @@ -1471,6 +1615,8 @@ func (fake *FakeStorage) Invocations() map[string][][]interface{} { defer fake.getServiceInstanceDetailsMutex.RUnlock() fake.getTerraformDeploymentMutex.RLock() defer fake.getTerraformDeploymentMutex.RUnlock() + fake.removeLockFileMutex.RLock() + defer fake.removeLockFileMutex.RUnlock() fake.storeBindRequestDetailsMutex.RLock() defer fake.storeBindRequestDetailsMutex.RUnlock() fake.storeProvisionRequestDetailsMutex.RLock() @@ -1479,6 +1625,8 @@ func (fake *FakeStorage) Invocations() map[string][][]interface{} { defer fake.storeServiceInstanceDetailsMutex.RUnlock() fake.storeTerraformDeploymentMutex.RLock() defer fake.storeTerraformDeploymentMutex.RUnlock() + fake.writeLockFileMutex.RLock() + defer fake.writeLockFileMutex.RUnlock() copiedInvocations := map[string][][]interface{}{} for key, value := range fake.invocations { copiedInvocations[key] = value diff --git a/cmd/serve.go b/cmd/serve.go index b69135ccb..6d07274c0 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -22,7 +22,11 @@ import ( "io" "log/slog" "net/http" + "os" + "os/signal" "strings" + "syscall" + "time" "code.cloudfoundry.org/lager/v3" osbapiBroker "github.com/cloudfoundry/cloud-service-broker/v2/brokerapi/broker" @@ -55,6 +59,8 @@ const ( tlsKeyProp = "api.tlsKey" encryptionPasswords = "db.encryption.passwords" encryptionEnabled = "db.encryption.enabled" + + shutdownTimeout = time.Hour ) var cfCompatibilityToggle = toggles.Features.Toggle("enable-cf-sharing", false, `Set all services to have the Sharable flag so they can be shared @@ -102,7 +108,13 @@ func serve() { logger.Fatal("Error initializing service broker 
config", err) } var serviceBroker domain.ServiceBroker - serviceBroker, err = osbapiBroker.New(cfg, storage.New(db, encryptor), logger) + csbStore := storage.New(db, encryptor) + err = csbStore.RecoverInProgressOperations(logger) + if err != nil { + logger.Fatal("Error recovering in-progress operations", err) + } + + serviceBroker, err = osbapiBroker.New(cfg, csbStore, logger) if err != nil { logger.Fatal("Error initializing service broker", err) } @@ -133,7 +145,9 @@ func serve() { if err != nil { logger.Error("failed to get database connection", err) } - startServer(cfg.Registry, sqldb, brokerAPI, storage.New(db, encryptor), credentials) + httpServer := startServer(cfg.Registry, sqldb, brokerAPI, csbStore, credentials) + + listenForShutdownSignal(httpServer, logger, csbStore) } func serveDocs() { @@ -188,7 +202,7 @@ func setupDBEncryption(db *gorm.DB, logger lager.Logger) storage.Encryptor { return config.Encryptor } -func startServer(registry pakBroker.BrokerRegistry, db *sql.DB, brokerapi http.Handler, store *storage.Storage, credentials brokerapi.BrokerCredentials) { +func startServer(registry pakBroker.BrokerRegistry, db *sql.DB, brokerapi http.Handler, store *storage.Storage, credentials brokerapi.BrokerCredentials) *http.Server { logger := utils.NewLogger("cloud-service-broker") docsHandler := server.DocsHandler(registry) @@ -225,24 +239,44 @@ func startServer(registry pakBroker.BrokerRegistry, db *sql.DB, brokerapi http.H Addr: fmt.Sprintf("%s:%s", host, port), Handler: router, } - var err error - if tlsCertCaBundleFilePath != "" && tlsKeyFilePath != "" { - err = httpServer.ListenAndServeTLS(tlsCertCaBundleFilePath, tlsKeyFilePath) - } else { - err = httpServer.ListenAndServe() - } - // when the server is receiving a signal, we probably do not want to panic. 
- if err != http.ErrServerClosed { - logger.Fatal("Failed to start broker", err) - } + go func() { + var err error + if tlsCertCaBundleFilePath != "" && tlsKeyFilePath != "" { + err = httpServer.ListenAndServeTLS(tlsCertCaBundleFilePath, tlsKeyFilePath) + } else { + err = httpServer.ListenAndServe() + } + if err == http.ErrServerClosed { + logger.Info("shutting down csb") + } else { + logger.Fatal("Failed to start broker", err) + } + }() + return httpServer } -func labelName(label string) string { - switch label { - case "": - return "none" +func listenForShutdownSignal(httpServer *http.Server, logger lager.Logger, store *storage.Storage) { + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGTERM) + + signalReceived := <-sigChan + + switch signalReceived { + + case syscall.SIGTERM: + shutdownCtx, shutdownRelease := context.WithTimeout(context.Background(), shutdownTimeout) + if err := httpServer.Shutdown(shutdownCtx); err != nil { + logger.Fatal("shutdown error: %v", err) + } + logger.Info("received SIGTERM, server is shutting down gracefully allowing for in flight work to finish") + defer shutdownRelease() + for store.LockFilesExist() { + logger.Info("draining csb in progress") + time.Sleep(time.Second * 1) + } + logger.Info("draining complete") default: - return label + logger.Info(fmt.Sprintf("csb does not handle the %s interrupt signal", signalReceived)) } } @@ -288,3 +322,12 @@ func importStateHandler(store *storage.Storage) http.Handler { } }) } + +func labelName(label string) string { + switch label { + case "": + return "none" + default: + return label + } +} diff --git a/dbservice/dbservice.go b/dbservice/dbservice.go index 10b1a8faa..3aaf70502 100644 --- a/dbservice/dbservice.go +++ b/dbservice/dbservice.go @@ -43,9 +43,6 @@ func NewWithMigrations(logger lager.Logger) *gorm.DB { if err := RunMigrations(db); err != nil { panic(fmt.Sprintf("Error migrating database: %s", err)) } - if err := recoverInProgressOperations(db, logger); err != nil { - panic(fmt.Sprintf("Error recovering in-progress operations: %s", err)) - } }) return db } diff --git a/dbservice/recover_in_progress_operations.go b/dbservice/recover_in_progress_operations.go deleted file mode 100644 index 9b9c1a549..000000000 --- a/dbservice/recover_in_progress_operations.go +++ /dev/null @@ -1,24 +0,0 @@ -package dbservice - -import ( - "code.cloudfoundry.org/lager/v3" - "github.com/cloudfoundry/cloud-service-broker/v2/dbservice/models" - "gorm.io/gorm" -) - -func recoverInProgressOperations(db *gorm.DB, logger lager.Logger) error { - logger = logger.Session("recover-in-progress-operations") - - var terraformDeploymentBatch []models.TerraformDeployment - result := db.Where("last_operation_state = ?", "in progress").FindInBatches(&terraformDeploymentBatch, 100, func(tx *gorm.DB, batchNumber int) error { - for i := range terraformDeploymentBatch { - terraformDeploymentBatch[i].LastOperationState = "failed" - terraformDeploymentBatch[i].LastOperationMessage = "the broker restarted while the operation was in progress" - logger.Info("mark-as-failed", lager.Data{"workspace_id": terraformDeploymentBatch[i].ID}) - } - - return tx.Save(&terraformDeploymentBatch).Error - }) - - return result.Error -} diff --git a/dbservice/recover_in_progress_operations_test.go b/dbservice/recover_in_progress_operations_test.go deleted file mode 100644 index c757e94ea..000000000 --- a/dbservice/recover_in_progress_operations_test.go +++ /dev/null @@ -1,60 +0,0 @@ -package dbservice - -import ( - 
"code.cloudfoundry.org/lager/v3/lagertest" - "github.com/cloudfoundry/cloud-service-broker/v2/dbservice/models" - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" - "gorm.io/driver/sqlite" - "gorm.io/gorm" -) - -var _ = Describe("RecoverInProgressOperations()", func() { - It("recovers the expected operations", func() { - const ( - recoverID = "fake-id-to-recover" - okID = "fake-id-that-does-not-need-to-be-recovered" - ) - - // Setup - db, err := gorm.Open(sqlite.Open(":memory:"), nil) - Expect(err).NotTo(HaveOccurred()) - Expect(db.Migrator().CreateTable(&models.TerraformDeployment{})).To(Succeed()) - - Expect(db.Create(&models.TerraformDeployment{ - ID: recoverID, - LastOperationType: "fake-type", - LastOperationState: "in progress", - LastOperationMessage: "fake-type in progress", - }).Error).To(Succeed()) - Expect(db.Create(&models.TerraformDeployment{ - ID: okID, - LastOperationType: "fake-type", - LastOperationState: "succeeded", - LastOperationMessage: "fake-type succeeded", - }).Error).To(Succeed()) - - // Call the function - logger := lagertest.NewTestLogger("test") - recoverInProgressOperations(db, logger) - - // Behaviors - By("marking the in-progress operation as failed") - var r1 models.TerraformDeployment - Expect(db.Where("id = ?", recoverID).First(&r1).Error).To(Succeed()) - Expect(r1.LastOperationState).To(Equal("failed")) - Expect(r1.LastOperationMessage).To(Equal("the broker restarted while the operation was in progress")) - - By("no updating other operations") - var r2 models.TerraformDeployment - Expect(db.Where("id = ?", okID).First(&r2).Error).To(Succeed()) - Expect(r2.LastOperationState).To(Equal("succeeded")) - Expect(r2.LastOperationMessage).To(Equal("fake-type succeeded")) - - By("logging the expected message") - Expect(logger.Buffer().Contents()).To(SatisfyAll( - ContainSubstring(`"message":"test.recover-in-progress-operations.mark-as-failed"`), - ContainSubstring(`"workspace_id":"fake-id-to-recover"`), - )) - }) -}) diff --git a/integrationtest/fixtures/termination-recovery/fake-uuid-provision.tf b/integrationtest/fixtures/termination-recovery/fake-uuid-provision.tf index db12d4a33..ba683a089 100644 --- a/integrationtest/fixtures/termination-recovery/fake-uuid-provision.tf +++ b/integrationtest/fixtures/termination-recovery/fake-uuid-provision.tf @@ -3,9 +3,16 @@ terraform { random = { source = "registry.terraform.io/hashicorp/random" } + null = { + source = "registry.terraform.io/hashicorp/null" + } + } +} +resource "null_resource" "sleeper" { + provisioner "local-exec" { + command = "sleep 10" } } - resource "random_uuid" "random" {} -output provision_output { value = random_uuid.random.result } \ No newline at end of file +output provision_output { value = random_uuid.random.result } diff --git a/integrationtest/fixtures/termination-recovery/manifest.yml b/integrationtest/fixtures/termination-recovery/manifest.yml index 306bb39ab..b5758ffe9 100644 --- a/integrationtest/fixtures/termination-recovery/manifest.yml +++ b/integrationtest/fixtures/termination-recovery/manifest.yml @@ -14,5 +14,7 @@ terraform_binaries: source: https://github.com/opentofu/opentofu/archive/refs/tags/v1.6.0.zip - name: terraform-provider-random version: 3.1.0 +- name: terraform-provider-null + version: 3.2.2 service_definitions: - fake-uuid-service.yml diff --git a/integrationtest/import_state_test.go b/integrationtest/import_state_test.go index f7e453c4c..36ab237d7 100644 --- a/integrationtest/import_state_test.go +++ b/integrationtest/import_state_test.go @@ -31,7 +31,7 @@ 
var _ = Describe("Import State", func() { broker = must(testdrive.StartBroker(csb, brokerpak, database)) DeferCleanup(func() { - Expect(broker.Stop()).To(Succeed()) + Expect(broker.Terminate()).To(Succeed()) cleanup(brokerpak) }) }) diff --git a/integrationtest/integrationtest_suite_test.go b/integrationtest/integrationtest_suite_test.go index cae7ec363..280c9d235 100644 --- a/integrationtest/integrationtest_suite_test.go +++ b/integrationtest/integrationtest_suite_test.go @@ -30,7 +30,7 @@ var ( var _ = SynchronizedBeforeSuite( func() []byte { // -gcflags enabled "gops", but had to be removed as this doesn't compile with Go 1.19 - //path, err := Build("github.com/cloudfoundry/cloud-service-broker", `-gcflags="all=-N -l"`) + // path, err := Build("github.com/cloudfoundry/cloud-service-broker", `-gcflags="all=-N -l"`) path := must(Build("github.com/cloudfoundry/cloud-service-broker/v2")) return []byte(path) }, @@ -45,8 +45,18 @@ var _ = SynchronizedBeforeSuite( ) var _ = SynchronizedAfterSuite( - func() {}, - func() { CleanupBuildArtifacts() }, + func() { + }, + func() { + CleanupBuildArtifacts() + files, err := filepath.Glob("/tmp/brokerpak*") + Expect(err).ToNot(HaveOccurred()) + for _, f := range files { + if err := os.RemoveAll(f); err != nil { + Expect(err).ToNot(HaveOccurred()) + } + } + }, ) var _ = BeforeEach(func() { diff --git a/integrationtest/termination_recovery_test.go b/integrationtest/termination_recovery_test.go index 79c547c70..b8b1d64c3 100644 --- a/integrationtest/termination_recovery_test.go +++ b/integrationtest/termination_recovery_test.go @@ -3,6 +3,8 @@ package integrationtest_test import ( "fmt" "net/http" + "os" + "time" "github.com/cloudfoundry/cloud-service-broker/v2/integrationtest/packer" "github.com/cloudfoundry/cloud-service-broker/v2/internal/testdrive" @@ -31,142 +33,232 @@ var _ = Describe("Recovery From Broker Termination", func() { stdout = NewBuffer() stderr = NewBuffer() - broker = must(testdrive.StartBroker(csb, brokerpak, database, testdrive.WithOutputs(stdout, stderr))) + }) - DeferCleanup(func() { - Expect(broker.Stop()).To(Succeed()) - cleanup(brokerpak) + Describe("running csb on a VM", func() { + Describe("when a vm broker properly drains", func() { + BeforeEach(func() { + broker = must(testdrive.StartBroker(csb, brokerpak, database, testdrive.WithOutputs(stdout, stderr))) + + DeferCleanup(func() { + Expect(broker.Terminate()).To(Succeed()) + }) + }) + + It("can finish the in flight operation", func() { + By("starting to provision") + instanceGUID := uuid.NewString() + response := broker.Client.Provision(instanceGUID, serviceOfferingGUID, servicePlanGUID, uuid.NewString(), nil) + + Expect(response.Error).NotTo(HaveOccurred()) + Expect(response.StatusCode).To(Equal(http.StatusAccepted)) + Eventually(stdout, time.Second*5).Should(Say(`tofu","apply","-auto-approve"`)) + By("gracefully stopping the broker") + // Stop seems to be blocking, so run it in a routine so we can check that the broker actually rejects requests until it's fully stopped. + go func() { + defer GinkgoRecover() + Expect(broker.Stop()).To(Succeed()) + }() + + By("logging a message") + Eventually(stdout).Should(Say("received SIGTERM")) + Eventually(stdout).Should(Say("draining csb in progress")) + + By("ensuring that the broker rejects requests") + Expect(broker.Client.LastOperation(instanceGUID, uuid.NewString()).Error).To(HaveOccurred()) + + // Fun stuff, do not optimize this with a SatisfyAll().. 
The relevant part of the docs is: + // When Say succeeds, it fast forwards the gbytes.Buffer's read cursor to just after the successful match. + // meaning that if the lines below are only partially matched on the first attempt, no further attempt can succeed because we + // have already fast-forwarded past the location of that first match. + + Eventually(stdout, time.Second*20).Should(Say(fmt.Sprintf("successfully stored state for tf:%s:", instanceGUID))) + Eventually(stdout, time.Second*20).Should(Say("draining complete")) + Consistently(stderr, time.Second*20).ShouldNot(Say("shutdown error")) + + broker = must(testdrive.StartBroker(csb, brokerpak, database, testdrive.WithOutputs(stdout, stderr))) + + By("checking that the resource finished successfully") + response = broker.Client.LastOperation(instanceGUID, uuid.NewString()) + Expect(string(response.ResponseBody)).To(ContainSubstring(`{"state":"succeeded","description":"provision succeeded"}`)) + Expect(response.Error).NotTo(HaveOccurred()) + Expect(response.StatusCode).To(Equal(http.StatusOK)) + + By("ensuring SI can be successfully deleted") + si := testdrive.ServiceInstance{GUID: instanceGUID, ServiceOfferingGUID: serviceOfferingGUID, ServicePlanGUID: servicePlanGUID} + Expect(broker.Deprovision(si)).To(Succeed()) + }) + }) + Describe("when a vm broker did not properly drain", func() { + var dirDefault string + BeforeEach(func() { + By("ensuring that the expected lockdir exists") + + dirDefault, _ = os.MkdirTemp("/tmp/", "lockfiles") + broker = must(testdrive.StartBroker(csb, brokerpak, database, testdrive.WithOutputs(stdout, stderr), testdrive.WithEnv(fmt.Sprintf("CSB_LOCKFILE_DIR=%s", dirDefault)))) + }) + + It("fails service instances that have a lockfile on start", func() { + instanceGUID := uuid.NewString() + response := broker.Client.Provision(instanceGUID, serviceOfferingGUID, servicePlanGUID, uuid.NewString(), nil) + Expect(response.Error).NotTo(HaveOccurred()) + Expect(response.StatusCode).To(Equal(http.StatusAccepted)) + + Eventually(stdout, time.Second*5).Should(Say(`tofu","apply","-auto-approve"`)) + By("forcefully stopping the broker") + // Terminate seems to be blocking, so run it in a routine. 
+ go func() { + defer GinkgoRecover() + Expect(broker.Terminate()).To(Succeed()) + }() + + broker = must(testdrive.StartBroker(csb, brokerpak, database, testdrive.WithOutputs(stdout, stderr), testdrive.WithEnv(fmt.Sprintf("CSB_LOCKFILE_DIR=%s", dirDefault)))) + lastOperation, err := broker.LastOperation(instanceGUID) + Expect(err).NotTo(HaveOccurred()) + Expect(lastOperation.Description).To(Equal("the broker restarted while the operation was in progress")) + Expect(lastOperation.State).To(BeEquivalentTo("failed")) + }) }) }) - It("can recover from a terminated create", func() { - By("starting to provision") - instanceGUID := uuid.NewString() - response := broker.Client.Provision(instanceGUID, serviceOfferingGUID, servicePlanGUID, uuid.NewString(), nil) - Expect(response.Error).NotTo(HaveOccurred()) - Expect(response.StatusCode).To(Equal(http.StatusAccepted)) - - By("terminating and restarting the broker") - Expect(broker.Stop()).To(Succeed()) - broker = must(testdrive.StartBroker(csb, brokerpak, database, testdrive.WithOutputs(stdout, stderr))) - - By("reporting that an operation failed") - lastOperation, err := broker.LastOperation(instanceGUID) - Expect(err).NotTo(HaveOccurred()) - Expect(lastOperation.Description).To(Equal("the broker restarted while the operation was in progress")) - Expect(lastOperation.State).To(BeEquivalentTo("failed")) - - By("logging a message") - ws := fmt.Sprintf(`"workspace_id":"tf:%s:"`, instanceGUID) - Expect(string(stdout.Contents())).To(SatisfyAll(ContainSubstring("recover-in-progress-operations.mark-as-failed"), ContainSubstring(ws))) - - // OSBAPI requires that HTTP 409 (Conflict) is returned - By("refusing to allow a duplicate instance") - response = broker.Client.Provision(instanceGUID, serviceOfferingGUID, servicePlanGUID, uuid.NewString(), nil) - Expect(response.Error).NotTo(HaveOccurred()) - Expect(response.StatusCode).To(Equal(http.StatusConflict)) - - By("allowing the instance to be cleaned up") - response = broker.Client.Deprovision(instanceGUID, serviceOfferingGUID, servicePlanGUID, uuid.NewString()) - Expect(response.Error).NotTo(HaveOccurred()) - Expect(response.StatusCode).To(Equal(http.StatusOK)) - }) + Describe("running csb as a CF app", func() { + BeforeEach(func() { + broker = must(testdrive.StartBroker(csb, brokerpak, database, testdrive.WithOutputs(stdout, stderr), testdrive.WithEnv("CF_INSTANCE_GUID=dcfa061e-c0e3-4237-a805-734578347393"))) - It("can recover from a terminated update", func() { - By("successfully provisioning a service instance") - instance, err := broker.Provision(serviceOfferingGUID, servicePlanGUID) - Expect(err).NotTo(HaveOccurred()) + DeferCleanup(func() { + Expect(broker.Terminate()).To(Succeed()) + }) + }) - By("starting to update") - response := broker.Client.Update(instance.GUID, serviceOfferingGUID, servicePlanGUID, uuid.NewString(), nil, domain.PreviousValues{}, nil) - Expect(response.Error).NotTo(HaveOccurred()) - Expect(response.StatusCode).To(Equal(http.StatusAccepted)) + It("can recover from a terminated create", func() { + By("starting to provision") + instanceGUID := uuid.NewString() + response := broker.Client.Provision(instanceGUID, serviceOfferingGUID, servicePlanGUID, uuid.NewString(), nil) + Expect(response.Error).NotTo(HaveOccurred()) + Expect(response.StatusCode).To(Equal(http.StatusAccepted)) + + By("terminating and restarting the broker") + Expect(broker.Terminate()).To(Succeed()) + broker = must(testdrive.StartBroker(csb, brokerpak, database, testdrive.WithOutputs(stdout, stderr), 
testdrive.WithEnv("CF_INSTANCE_GUID=dcfa061e-c0e3-4237-a805-734578347393"))) + + By("reporting that an operation failed") + lastOperation, err := broker.LastOperation(instanceGUID) + Expect(err).NotTo(HaveOccurred()) + Expect(lastOperation.Description).To(Equal("the broker restarted while the operation was in progress")) + Expect(lastOperation.State).To(BeEquivalentTo("failed")) + + By("logging a message") + ws := fmt.Sprintf(`"workspace_id":"tf:%s:"`, instanceGUID) + Expect(string(stdout.Contents())).To(SatisfyAll(ContainSubstring("recover-in-progress-operations.mark-as-failed"), ContainSubstring(ws))) + + // OSBAPI requires that HTTP 409 (Conflict) is returned + By("refusing to allow a duplicate instance") + response = broker.Client.Provision(instanceGUID, serviceOfferingGUID, servicePlanGUID, uuid.NewString(), nil) + Expect(response.Error).NotTo(HaveOccurred()) + Expect(response.StatusCode).To(Equal(http.StatusConflict)) + + By("allowing the instance to be cleaned up") + response = broker.Client.Deprovision(instanceGUID, serviceOfferingGUID, servicePlanGUID, uuid.NewString()) + Expect(response.Error).NotTo(HaveOccurred()) + Expect(response.StatusCode).To(Equal(http.StatusOK)) + }) - By("terminating and restarting the broker") - Expect(broker.Stop()).To(Succeed()) - broker = must(testdrive.StartBroker(csb, brokerpak, database, testdrive.WithOutputs(stdout, stderr))) + It("can recover from a terminated update", func() { + By("successfully provisioning a service instance") + instance, err := broker.Provision(serviceOfferingGUID, servicePlanGUID) + Expect(err).NotTo(HaveOccurred()) - By("reporting that an operation failed") - lastOperation, err := broker.LastOperation(instance.GUID) - Expect(err).NotTo(HaveOccurred()) - Expect(lastOperation.Description).To(Equal("the broker restarted while the operation was in progress")) - Expect(lastOperation.State).To(BeEquivalentTo("failed")) + By("starting to update") + response := broker.Client.Update(instance.GUID, serviceOfferingGUID, servicePlanGUID, uuid.NewString(), nil, domain.PreviousValues{}, nil) + Expect(response.Error).NotTo(HaveOccurred()) + Expect(response.StatusCode).To(Equal(http.StatusAccepted)) - By("logging a message") - ws := fmt.Sprintf(`"workspace_id":"tf:%s:"`, instance.GUID) - Expect(string(stdout.Contents())).To(SatisfyAll(ContainSubstring("recover-in-progress-operations.mark-as-failed"), ContainSubstring(ws))) + By("terminating and restarting the broker") + Expect(broker.Terminate()).To(Succeed()) + broker = must(testdrive.StartBroker(csb, brokerpak, database, testdrive.WithOutputs(stdout, stderr), testdrive.WithEnv("CF_INSTANCE_GUID=dcfa061e-c0e3-4237-a805-734578347393"))) - By("allowing the operation to be restarted") - Expect(broker.UpdateService(instance)).To(Succeed()) - }) + By("reporting that an operation failed") + lastOperation, err := broker.LastOperation(instance.GUID) + Expect(err).NotTo(HaveOccurred()) + Expect(lastOperation.Description).To(Equal("the broker restarted while the operation was in progress")) + Expect(lastOperation.State).To(BeEquivalentTo("failed")) + + By("logging a message") + ws := fmt.Sprintf(`"workspace_id":"tf:%s:"`, instance.GUID) + Expect(string(stdout.Contents())).To(SatisfyAll(ContainSubstring("recover-in-progress-operations.mark-as-failed"), ContainSubstring(ws))) - It("can recover from a terminated delete", func() { - By("successfully provisioning a service instance") - instance, err := broker.Provision(serviceOfferingGUID, servicePlanGUID) - Expect(err).NotTo(HaveOccurred()) + 
By("allowing the operation to be restarted") + Expect(broker.UpdateService(instance)).To(Succeed()) + }) - By("starting to delete") - response := broker.Client.Deprovision(instance.GUID, serviceOfferingGUID, servicePlanGUID, uuid.NewString()) - Expect(response.Error).NotTo(HaveOccurred()) - Expect(response.StatusCode).To(Equal(http.StatusAccepted)) + It("can recover from a terminated delete", func() { + By("successfully provisioning a service instance") + instance, err := broker.Provision(serviceOfferingGUID, servicePlanGUID) + Expect(err).NotTo(HaveOccurred()) - By("terminating and restarting the broker") - Expect(broker.Stop()).To(Succeed()) - broker = must(testdrive.StartBroker(csb, brokerpak, database, testdrive.WithOutputs(stdout, stderr))) + By("starting to delete") + response := broker.Client.Deprovision(instance.GUID, serviceOfferingGUID, servicePlanGUID, uuid.NewString()) + Expect(response.Error).NotTo(HaveOccurred()) + Expect(response.StatusCode).To(Equal(http.StatusAccepted)) - By("reporting that an operation failed") - lastOperation, err := broker.LastOperation(instance.GUID) - Expect(err).NotTo(HaveOccurred()) - Expect(lastOperation.Description).To(Equal("the broker restarted while the operation was in progress")) - Expect(lastOperation.State).To(BeEquivalentTo("failed")) + By("terminating and restarting the broker") + Expect(broker.Terminate()).To(Succeed()) + broker = must(testdrive.StartBroker(csb, brokerpak, database, testdrive.WithOutputs(stdout, stderr), testdrive.WithEnv("CF_INSTANCE_GUID=dcfa061e-c0e3-4237-a805-734578347393"))) - By("logging a message") - ws := fmt.Sprintf(`"workspace_id":"tf:%s:"`, instance.GUID) - Expect(string(stdout.Contents())).To(SatisfyAll(ContainSubstring("recover-in-progress-operations.mark-as-failed"), ContainSubstring(ws))) + By("reporting that an operation failed") + lastOperation, err := broker.LastOperation(instance.GUID) + Expect(err).NotTo(HaveOccurred()) + Expect(lastOperation.Description).To(Equal("the broker restarted while the operation was in progress")) + Expect(lastOperation.State).To(BeEquivalentTo("failed")) - By("allowing the operation to be restarted") - Expect(broker.Deprovision(instance)).To(Succeed()) - }) + By("logging a message") + ws := fmt.Sprintf(`"workspace_id":"tf:%s:"`, instance.GUID) + Expect(string(stdout.Contents())).To(SatisfyAll(ContainSubstring("recover-in-progress-operations.mark-as-failed"), ContainSubstring(ws))) - It("can recover from a terminated bind", func() { - By("successfully provisioning a service instance") - instance, err := broker.Provision(serviceOfferingGUID, servicePlanGUID) - Expect(err).NotTo(HaveOccurred()) + By("allowing the operation to be restarted") + Expect(broker.Deprovision(instance)).To(Succeed()) + }) - By("starting to bind") - bindingGUID := uuid.NewString() - go broker.CreateBinding(instance, testdrive.WithBindingGUID(bindingGUID)) + It("can recover from a terminated bind", func() { + By("successfully provisioning a service instance") + instance, err := broker.Provision(serviceOfferingGUID, servicePlanGUID) + Expect(err).NotTo(HaveOccurred()) - Eventually(stdout).Should(Say(fmt.Sprintf(`"cloud-service-broker.Binding".*"binding_id":"%s"`, bindingGUID))) + By("starting to bind") + bindingGUID := uuid.NewString() + go broker.CreateBinding(instance, testdrive.WithBindingGUID(bindingGUID)) - By("terminating and restarting the broker") - Expect(broker.Stop()).To(Succeed()) - broker = must(testdrive.StartBroker(csb, brokerpak, database, testdrive.WithOutputs(stdout, stderr))) + 
Eventually(stdout).Should(Say(fmt.Sprintf(`"cloud-service-broker.Binding".*"binding_id":"%s"`, bindingGUID))) - By("allowing the operation to be restarted") - _, err = broker.CreateBinding(instance, testdrive.WithBindingGUID(bindingGUID)) - Expect(err).NotTo(HaveOccurred()) - }) + By("terminating and restarting the broker") + Expect(broker.Terminate()).To(Succeed()) + broker = must(testdrive.StartBroker(csb, brokerpak, database, testdrive.WithOutputs(stdout, stderr), testdrive.WithEnv("CF_INSTANCE_GUID=dcfa061e-c0e3-4237-a805-734578347393"))) - It("can recover from a terminated unbind", func() { - By("successfully provisioning a service instance and binding") - instance, err := broker.Provision(serviceOfferingGUID, servicePlanGUID) - Expect(err).NotTo(HaveOccurred()) + By("allowing the operation to be restarted") + _, err = broker.CreateBinding(instance, testdrive.WithBindingGUID(bindingGUID)) + Expect(err).NotTo(HaveOccurred()) + }) + + It("can recover from a terminated unbind", func() { + By("successfully provisioning a service instance and binding") + instance, err := broker.Provision(serviceOfferingGUID, servicePlanGUID) + Expect(err).NotTo(HaveOccurred()) - bindingGUID := uuid.NewString() - _, err = broker.CreateBinding(instance, testdrive.WithBindingGUID(bindingGUID)) - Expect(err).NotTo(HaveOccurred()) + bindingGUID := uuid.NewString() + _, err = broker.CreateBinding(instance, testdrive.WithBindingGUID(bindingGUID)) + Expect(err).NotTo(HaveOccurred()) - By("starting to unbind") - go broker.DeleteBinding(instance, bindingGUID) + By("starting to unbind") + go broker.DeleteBinding(instance, bindingGUID) - Eventually(stdout).Should(Say(fmt.Sprintf(`"cloud-service-broker.Unbinding".*"binding_id":"%s"`, bindingGUID))) + Eventually(stdout).Should(Say(fmt.Sprintf(`"cloud-service-broker.Unbinding".*"binding_id":"%s"`, bindingGUID))) - By("terminating and restarting the broker") - Expect(broker.Stop()).To(Succeed()) - broker = must(testdrive.StartBroker(csb, brokerpak, database, testdrive.WithOutputs(stdout, stderr))) + By("terminating and restarting the broker") + Expect(broker.Terminate()).To(Succeed()) + broker = must(testdrive.StartBroker(csb, brokerpak, database, testdrive.WithOutputs(stdout, stderr), testdrive.WithEnv("CF_INSTANCE_GUID=dcfa061e-c0e3-4237-a805-734578347393"))) - By("allowing the operation to be restarted") - Expect(broker.DeleteBinding(instance, bindingGUID)).To(Succeed()) + By("allowing the operation to be restarted") + Expect(broker.DeleteBinding(instance, bindingGUID)).To(Succeed()) + }) }) }) diff --git a/internal/storage/recover_in_progress_operations.go b/internal/storage/recover_in_progress_operations.go new file mode 100644 index 000000000..e532fe48c --- /dev/null +++ b/internal/storage/recover_in_progress_operations.go @@ -0,0 +1,67 @@ +package storage + +import ( + "os" + + "code.cloudfoundry.org/lager/v3" + "github.com/cloudfoundry/cloud-service-broker/v2/dbservice/models" + "gorm.io/gorm" +) + +const FailedMessage = "the broker restarted while the operation was in progress" + +func (s *Storage) RecoverInProgressOperations(logger lager.Logger) error { + logger = logger.Session("recover-in-progress-operations") + + if runningAsCFApp() { + return s.markAllInProgressOperationsAsFailed(logger) + } else { + return s.markAllOperationsWithLockFilesAsFailed(logger) + } + +} + +func runningAsCFApp() bool { + return os.Getenv("CF_INSTANCE_GUID") != "" +} + +func (s *Storage) markAllInProgressOperationsAsFailed(logger lager.Logger) error { + logger.Info("checking all in 
in progress operations from DB") + var terraformDeploymentBatch []models.TerraformDeployment + result := s.db.Where("last_operation_state = ?", "in progress").FindInBatches(&terraformDeploymentBatch, 100, func(tx *gorm.DB, batchNumber int) error { + for i := range terraformDeploymentBatch { + terraformDeploymentBatch[i].LastOperationState = "failed" + terraformDeploymentBatch[i].LastOperationMessage = FailedMessage + logger.Info("mark-as-failed", lager.Data{"workspace_id": terraformDeploymentBatch[i].ID}) + } + + return tx.Save(&terraformDeploymentBatch).Error + }) + + return result.Error +} + +func (s *Storage) markAllOperationsWithLockFilesAsFailed(logger lager.Logger) error { + logger.Info("checking all in in progress operations from lockfiles") + + deploymentIds, err := s.GetLockedDeploymentIds() + if err != nil { + return err + } + + for _, id := range deploymentIds { + var receiver models.TerraformDeployment + if err := s.db.Where("id = ?", id).First(&receiver).Error; err != nil { + return err + } + receiver.LastOperationState = "failed" + receiver.LastOperationMessage = FailedMessage + + err := s.db.Save(receiver).Error + if err != nil { + return err + } + logger.Info("mark-as-failed", lager.Data{"workspace_id": id}) + } + return err +} diff --git a/internal/storage/recover_in_progress_operations_test.go b/internal/storage/recover_in_progress_operations_test.go new file mode 100644 index 000000000..9ad4fa919 --- /dev/null +++ b/internal/storage/recover_in_progress_operations_test.go @@ -0,0 +1,98 @@ +package storage_test + +import ( + "os" + + "code.cloudfoundry.org/lager/v3/lagertest" + "github.com/cloudfoundry/cloud-service-broker/v2/dbservice/models" + "github.com/cloudfoundry/cloud-service-broker/v2/internal/storage" + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" +) + +const ( + recoverID = "fake-id-to-recover" + okID = "fake-id-that-does-not-need-to-be-recovered" +) + +var _ = Describe("RecoverInProgressOperations()", func() { + BeforeEach(func() { + // Setup + Expect(db.Create(&models.TerraformDeployment{ + ID: recoverID, + LastOperationType: "fake-type", + LastOperationState: "in progress", + LastOperationMessage: "fake-type in progress", + }).Error).To(Succeed()) + Expect(db.Create(&models.TerraformDeployment{ + ID: okID, + LastOperationType: "fake-type", + LastOperationState: "succeeded", + LastOperationMessage: "fake-type succeeded", + }).Error).To(Succeed()) + var rowCount int64 + db.Model(&models.TerraformDeployment{}).Count(&rowCount) + Expect(rowCount).To(BeNumerically("==", 2)) + + logger = lagertest.NewTestLogger("test") + store = storage.New(db, encryptor) + }) + + When("running as a cf app", func() { + It("recovers the expected operations", func() { + os.Setenv("CF_INSTANCE_GUID", "something") // The presence of this variable means we are running as an App + defer os.Unsetenv("CF_INSTANCE_GUID") + + // Call the function + Expect(store.RecoverInProgressOperations(logger)).To(Succeed()) + + // Behaviors + By("marking the in-progress operation as failed") + var r1 models.TerraformDeployment + Expect(db.Where("id = ?", recoverID).First(&r1).Error).To(Succeed()) + Expect(r1.LastOperationState).To(Equal("failed")) + Expect(r1.LastOperationMessage).To(Equal("the broker restarted while the operation was in progress")) + + By("no updating other operations") + var r2 models.TerraformDeployment + Expect(db.Where("id = ?", okID).First(&r2).Error).To(Succeed()) + Expect(r2.LastOperationState).To(Equal("succeeded")) + Expect(r2.LastOperationMessage).To(Equal("fake-type succeeded")) + + By("logging the expected message") + Expect(logger.Buffer().Contents()).To(SatisfyAll( + ContainSubstring(`"message":"test.recover-in-progress-operations.mark-as-failed"`), + ContainSubstring(`"workspace_id":"fake-id-to-recover"`), + )) + }) + }) + + When("running on a VM", func() { + It("recovers the expected operations", func() { + // When running on a VM there will be a lockfile and record in the db + Expect(store.WriteLockFile(recoverID)).To(Succeed()) + + // Call the function + Expect(store.RecoverInProgressOperations(logger)).To(Succeed()) + + // Behaviors + By("marking the in-progress operation as failed") + var r1 models.TerraformDeployment + Expect(db.Where("id = ?", recoverID).First(&r1).Error).To(Succeed()) + Expect(r1.LastOperationState).To(Equal("failed")) + Expect(r1.LastOperationMessage).To(Equal("the broker restarted while the operation was in progress")) + + By("no updating other operations") + var r2 models.TerraformDeployment + Expect(db.Where("id = ?", okID).First(&r2).Error).To(Succeed()) + Expect(r2.LastOperationState).To(Equal("succeeded")) + Expect(r2.LastOperationMessage).To(Equal("fake-type succeeded")) + + By("logging the expected message") + Expect(logger.Buffer().Contents()).To(SatisfyAll( + ContainSubstring(`"message":"test.recover-in-progress-operations.mark-as-failed"`), + ContainSubstring(`"workspace_id":"fake-id-to-recover"`), + )) + }) + }) +}) diff --git a/internal/storage/storage.go b/internal/storage/storage.go index 96a992208..335c90a18 100644 --- a/internal/storage/storage.go +++ b/internal/storage/storage.go @@ -1,17 +1,31 @@ // Package storage implements a Database Access Object (DAO) package storage -import "gorm.io/gorm" +import ( + "os" + + "gorm.io/gorm" +) type Storage struct { - db *gorm.DB - 
encryptor Encryptor + db *gorm.DB + encryptor Encryptor + lockFileDir string } func New(db *gorm.DB, encryptor Encryptor) *Storage { + // The VM-based HA deployment requires a drain mechanism, and lockfiles are a simple solution. + // Not every environment will opt for a VM-based deployment, so detect whether the lockfile + // directory is present. + + dirDefault := os.Getenv("CSB_LOCKFILE_DIR") + if _, err := os.Stat(dirDefault); err != nil { + dirDefault, _ = os.MkdirTemp("/tmp/", "lockfiles") + } return &Storage{ - db: db, - encryptor: encryptor, + db: db, + encryptor: encryptor, + lockFileDir: dirDefault, } } diff --git a/internal/storage/storage_suite_test.go b/internal/storage/storage_suite_test.go index c02209966..31728a8c0 100644 --- a/internal/storage/storage_suite_test.go +++ b/internal/storage/storage_suite_test.go @@ -5,6 +5,7 @@ import ( "strings" "testing" + "code.cloudfoundry.org/lager/v3/lagertest" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "gorm.io/driver/sqlite" @@ -19,6 +20,7 @@ var ( db *gorm.DB encryptor *storagefakes.FakeEncryptor store *storage.Storage + logger *lagertest.TestLogger ) func TestStorage(t *testing.T) { diff --git a/internal/storage/terraform_deployment.go b/internal/storage/terraform_deployment.go index 9d6e022aa..c6e8735c6 100644 --- a/internal/storage/terraform_deployment.go +++ b/internal/storage/terraform_deployment.go @@ -2,6 +2,9 @@ package storage import ( "fmt" + "os" + "path/filepath" + "strings" "time" "github.com/hashicorp/go-version" @@ -156,3 +159,33 @@ func (s *Storage) loadTerraformDeploymentIfExists(id string, receiver any) error return s.db.Where("id = ?", id).First(receiver).Error } + +func (s *Storage) LockFilesExist() bool { + entries, _ := os.ReadDir(s.lockFileDir) + return len(entries) != 0 +} + +func (s *Storage) WriteLockFile(deploymentID string) error { + return os.WriteFile(filepath.Join(s.lockFileDir, fileNameFromDeploymentID(deploymentID)), []byte{}, 0o644) +} + +func (s *Storage) RemoveLockFile(deploymentID string) error { + return os.Remove(filepath.Join(s.lockFileDir, fileNameFromDeploymentID(deploymentID))) +} + +func (s *Storage) GetLockedDeploymentIds() ([]string, error) { + entries, err := os.ReadDir(s.lockFileDir) + var names []string + for _, entry := range entries { + names = append(names, deploymentIDFromFileName(entry.Name())) + } + return names, err +} + +func fileNameFromDeploymentID(deploymentID string) string { + return strings.ReplaceAll(deploymentID, ":", "_") +} + +func deploymentIDFromFileName(fileName string) string { + return strings.ReplaceAll(fileName, "_", ":") +} diff --git a/internal/storage/terraform_deployment_test.go b/internal/storage/terraform_deployment_test.go index 1dfe0ada9..86bd3a8fc 100644 --- a/internal/storage/terraform_deployment_test.go +++ b/internal/storage/terraform_deployment_test.go @@ -206,10 +206,54 @@ var _ = Describe("TerraformDeployments", func() { Expect(store.DeleteTerraformDeployment("not-there")).NotTo(HaveOccurred()) }) }) + + Describe("LockFilesExist", func() { + It("reports correct status", func() { + Expect(store.WriteLockFile("1234")).To(Succeed()) + Expect(store.WriteLockFile("5678")).To(Succeed()) + + Expect(store.LockFilesExist()).To(BeTrue()) + + Expect(store.RemoveLockFile("1234")).To(Succeed()) + + Expect(store.LockFilesExist()).To(BeTrue()) + + Expect(store.RemoveLockFile("5678")).To(Succeed()) + + Expect(store.LockFilesExist()).To(BeFalse()) + }) + }) + + Describe("GetLockedDeploymentIds", func() { + It("returns correct names", func() { + 
names, err := store.GetLockedDeploymentIds() + Expect(err).NotTo(HaveOccurred()) + Expect(names).To(BeEmpty()) + + Expect(store.WriteLockFile("tf:1234:")).To(Succeed()) + Expect(store.WriteLockFile("tf:5678:9123")).To(Succeed()) + + names, err = store.GetLockedDeploymentIds() + Expect(err).NotTo(HaveOccurred()) + Expect(names).To(ContainElements("tf:1234:", "tf:5678:9123")) + + Expect(store.RemoveLockFile("tf:1234:")).To(Succeed()) + + names, err = store.GetLockedDeploymentIds() + Expect(err).NotTo(HaveOccurred()) + Expect(names).To(ContainElements("tf:5678:9123")) + Expect(names).ToNot(ContainElements("tf:1234:")) + + Expect(store.RemoveLockFile("tf:5678:9123")).To(Succeed()) + + names, err = store.GetLockedDeploymentIds() + Expect(err).NotTo(HaveOccurred()) + Expect(names).To(BeEmpty()) + }) + }) }) func addFakeTerraformDeployments() { - Expect(db.Create(&models.TerraformDeployment{ ID: "fake-id-1", Workspace: fakeWorkspace("fake-1", "1.2.3"), diff --git a/internal/testdrive/broker.go b/internal/testdrive/broker.go index 416d70deb..f2259483b 100644 --- a/internal/testdrive/broker.go +++ b/internal/testdrive/broker.go @@ -23,6 +23,15 @@ func (b *Broker) Stop() error { case b == nil, b.runner == nil: return nil default: - return b.runner.stop() + return b.runner.gracefullStop() + } +} + +func (b *Broker) Terminate() error { + switch { + case b == nil, b.runner == nil: + return nil + default: + return b.runner.forceStop() } } diff --git a/internal/testdrive/broker_start.go b/internal/testdrive/broker_start.go index 7831b189f..d71986553 100644 --- a/internal/testdrive/broker_start.go +++ b/internal/testdrive/broker_start.go @@ -115,7 +115,7 @@ func StartBroker(csbPath, bpk, db string, opts ...StartBrokerOption) (*Broker, e case err == nil && response.StatusCode == http.StatusOK: return &broker, nil case time.Since(start) > time.Minute: - if err := broker.runner.stop(); err != nil { + if err := broker.runner.forceStop(); err != nil { return nil, err } return nil, fmt.Errorf("timed out after %s waiting for broker to start: %s\n%s", time.Since(start), stdout.String(), stderr.String()) diff --git a/internal/testdrive/runner.go b/internal/testdrive/runner.go index 1d4736e3b..68aa89481 100644 --- a/internal/testdrive/runner.go +++ b/internal/testdrive/runner.go @@ -2,6 +2,7 @@ package testdrive import ( "os/exec" + "syscall" "time" ) @@ -26,12 +27,29 @@ func (r *runner) error(err error) *runner { return r } -func (r *runner) stop() error { +func (r *runner) gracefullStop() error { if r.exited { return nil } if r.cmd != nil && r.cmd.Process != nil { - if err := r.cmd.Process.Kill(); err != nil { + if err := r.cmd.Process.Signal(syscall.SIGTERM); err != nil { + return err + } + } + + for !r.exited { + time.Sleep(time.Millisecond) + } + + return nil +} + +func (r *runner) forceStop() error { + if r.exited { + return nil + } + if r.cmd != nil && r.cmd.Process != nil { + if err := r.cmd.Process.Signal(syscall.SIGKILL); err != nil { return err } } diff --git a/pkg/broker/brokerfakes/fake_service_provider_storage.go b/pkg/broker/brokerfakes/fake_service_provider_storage.go index 726eb81e4..c194ea7dd 100644 --- a/pkg/broker/brokerfakes/fake_service_provider_storage.go +++ b/pkg/broker/brokerfakes/fake_service_provider_storage.go @@ -59,6 +59,17 @@ type FakeServiceProviderStorage struct { result1 storage.TerraformDeployment result2 error } + RemoveLockFileStub func(string) error + removeLockFileMutex sync.RWMutex + removeLockFileArgsForCall []struct { + arg1 string + } + removeLockFileReturns struct { + 
result1 error + } + removeLockFileReturnsOnCall map[int]struct { + result1 error + } StoreTerraformDeploymentStub func(storage.TerraformDeployment) error storeTerraformDeploymentMutex sync.RWMutex storeTerraformDeploymentArgsForCall []struct { @@ -70,6 +81,17 @@ type FakeServiceProviderStorage struct { storeTerraformDeploymentReturnsOnCall map[int]struct { result1 error } + WriteLockFileStub func(string) error + writeLockFileMutex sync.RWMutex + writeLockFileArgsForCall []struct { + arg1 string + } + writeLockFileReturns struct { + result1 error + } + writeLockFileReturnsOnCall map[int]struct { + result1 error + } invocations map[string][][]interface{} invocationsMutex sync.RWMutex } @@ -327,6 +349,67 @@ func (fake *FakeServiceProviderStorage) GetTerraformDeploymentReturnsOnCall(i in }{result1, result2} } +func (fake *FakeServiceProviderStorage) RemoveLockFile(arg1 string) error { + fake.removeLockFileMutex.Lock() + ret, specificReturn := fake.removeLockFileReturnsOnCall[len(fake.removeLockFileArgsForCall)] + fake.removeLockFileArgsForCall = append(fake.removeLockFileArgsForCall, struct { + arg1 string + }{arg1}) + stub := fake.RemoveLockFileStub + fakeReturns := fake.removeLockFileReturns + fake.recordInvocation("RemoveLockFile", []interface{}{arg1}) + fake.removeLockFileMutex.Unlock() + if stub != nil { + return stub(arg1) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeServiceProviderStorage) RemoveLockFileCallCount() int { + fake.removeLockFileMutex.RLock() + defer fake.removeLockFileMutex.RUnlock() + return len(fake.removeLockFileArgsForCall) +} + +func (fake *FakeServiceProviderStorage) RemoveLockFileCalls(stub func(string) error) { + fake.removeLockFileMutex.Lock() + defer fake.removeLockFileMutex.Unlock() + fake.RemoveLockFileStub = stub +} + +func (fake *FakeServiceProviderStorage) RemoveLockFileArgsForCall(i int) string { + fake.removeLockFileMutex.RLock() + defer fake.removeLockFileMutex.RUnlock() + argsForCall := fake.removeLockFileArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *FakeServiceProviderStorage) RemoveLockFileReturns(result1 error) { + fake.removeLockFileMutex.Lock() + defer fake.removeLockFileMutex.Unlock() + fake.RemoveLockFileStub = nil + fake.removeLockFileReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeServiceProviderStorage) RemoveLockFileReturnsOnCall(i int, result1 error) { + fake.removeLockFileMutex.Lock() + defer fake.removeLockFileMutex.Unlock() + fake.RemoveLockFileStub = nil + if fake.removeLockFileReturnsOnCall == nil { + fake.removeLockFileReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.removeLockFileReturnsOnCall[i] = struct { + result1 error + }{result1} +} + func (fake *FakeServiceProviderStorage) StoreTerraformDeployment(arg1 storage.TerraformDeployment) error { fake.storeTerraformDeploymentMutex.Lock() ret, specificReturn := fake.storeTerraformDeploymentReturnsOnCall[len(fake.storeTerraformDeploymentArgsForCall)] @@ -388,6 +471,67 @@ func (fake *FakeServiceProviderStorage) StoreTerraformDeploymentReturnsOnCall(i }{result1} } +func (fake *FakeServiceProviderStorage) WriteLockFile(arg1 string) error { + fake.writeLockFileMutex.Lock() + ret, specificReturn := fake.writeLockFileReturnsOnCall[len(fake.writeLockFileArgsForCall)] + fake.writeLockFileArgsForCall = append(fake.writeLockFileArgsForCall, struct { + arg1 string + }{arg1}) + stub := fake.WriteLockFileStub + fakeReturns := fake.writeLockFileReturns + 
fake.recordInvocation("WriteLockFile", []interface{}{arg1}) + fake.writeLockFileMutex.Unlock() + if stub != nil { + return stub(arg1) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeServiceProviderStorage) WriteLockFileCallCount() int { + fake.writeLockFileMutex.RLock() + defer fake.writeLockFileMutex.RUnlock() + return len(fake.writeLockFileArgsForCall) +} + +func (fake *FakeServiceProviderStorage) WriteLockFileCalls(stub func(string) error) { + fake.writeLockFileMutex.Lock() + defer fake.writeLockFileMutex.Unlock() + fake.WriteLockFileStub = stub +} + +func (fake *FakeServiceProviderStorage) WriteLockFileArgsForCall(i int) string { + fake.writeLockFileMutex.RLock() + defer fake.writeLockFileMutex.RUnlock() + argsForCall := fake.writeLockFileArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *FakeServiceProviderStorage) WriteLockFileReturns(result1 error) { + fake.writeLockFileMutex.Lock() + defer fake.writeLockFileMutex.Unlock() + fake.WriteLockFileStub = nil + fake.writeLockFileReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeServiceProviderStorage) WriteLockFileReturnsOnCall(i int, result1 error) { + fake.writeLockFileMutex.Lock() + defer fake.writeLockFileMutex.Unlock() + fake.WriteLockFileStub = nil + if fake.writeLockFileReturnsOnCall == nil { + fake.writeLockFileReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.writeLockFileReturnsOnCall[i] = struct { + result1 error + }{result1} +} + func (fake *FakeServiceProviderStorage) Invocations() map[string][][]interface{} { fake.invocationsMutex.RLock() defer fake.invocationsMutex.RUnlock() @@ -399,8 +543,12 @@ func (fake *FakeServiceProviderStorage) Invocations() map[string][][]interface{} defer fake.getServiceBindingIDsForServiceInstanceMutex.RUnlock() fake.getTerraformDeploymentMutex.RLock() defer fake.getTerraformDeploymentMutex.RUnlock() + fake.removeLockFileMutex.RLock() + defer fake.removeLockFileMutex.RUnlock() fake.storeTerraformDeploymentMutex.RLock() defer fake.storeTerraformDeploymentMutex.RUnlock() + fake.writeLockFileMutex.RLock() + defer fake.writeLockFileMutex.RUnlock() copiedInvocations := map[string][][]interface{}{} for key, value := range fake.invocations { copiedInvocations[key] = value diff --git a/pkg/broker/service_provider.go b/pkg/broker/service_provider.go index 1cd6ad535..f28cc0758 100644 --- a/pkg/broker/service_provider.go +++ b/pkg/broker/service_provider.go @@ -77,4 +77,6 @@ type ServiceProviderStorage interface { DeleteTerraformDeployment(id string) error ExistsTerraformDeployment(id string) (bool, error) GetServiceBindingIDsForServiceInstance(serviceInstanceID string) ([]string, error) + WriteLockFile(guid string) error + RemoveLockFile(guid string) error } diff --git a/pkg/providers/tf/deployment_manager.go b/pkg/providers/tf/deployment_manager.go index 188581ec5..3a3f6c19b 100644 --- a/pkg/providers/tf/deployment_manager.go +++ b/pkg/providers/tf/deployment_manager.go @@ -51,7 +51,7 @@ func (d *DeploymentManager) MarkOperationStarted(deployment *storage.TerraformDe return err } - return nil + return d.store.WriteLockFile(deployment.ID) } func (d *DeploymentManager) MarkOperationFinished(deployment *storage.TerraformDeployment, err error) error { @@ -74,8 +74,16 @@ func (d *DeploymentManager) MarkOperationFinished(deployment *storage.TerraformD }) } - - return d.store.StoreTerraformDeployment(*deployment) + err = d.store.StoreTerraformDeployment(*deployment) + if err != nil { + 
d.logger.Error("store-state", err, lager.Data{ + "deploymentID": deployment.ID, + "message": deployment.LastOperationMessage, + }) + } else { + d.logger.Info(fmt.Sprintf("successfully stored state for %s", deployment.ID)) + } + return d.store.RemoveLockFile(deployment.ID) } func (d *DeploymentManager) OperationStatus(deploymentID string) (bool, string, error) { diff --git a/pkg/providers/tf/deployment_manager_test.go b/pkg/providers/tf/deployment_manager_test.go index 0e04c3b20..30c8d5eb5 100644 --- a/pkg/providers/tf/deployment_manager_test.go +++ b/pkg/providers/tf/deployment_manager_test.go @@ -205,7 +205,8 @@ var _ = Describe("DeploymentManager", func() { storedDeployment := fakeStore.StoreTerraformDeploymentArgsForCall(0) Expect(storedDeployment.LastOperationState).To(Equal("succeeded")) Expect(storedDeployment.LastOperationMessage).To(Equal("provision succeeded: apply completed successfully")) - Expect(fakeLogger.Logs()).To(BeEmpty()) + Expect(fakeLogger.Logs()).To(HaveLen(1)) + Expect(fakeLogger.Logs()[0].Message).To(Equal("broker.successfully stored state for deploymentID")) }) }) @@ -222,12 +223,12 @@ var _ = Describe("DeploymentManager", func() { Expect(storedDeployment.LastOperationType).To(Equal(existingDeployment.LastOperationType)) Expect(storedDeployment.LastOperationState).To(Equal("failed")) Expect(storedDeployment.LastOperationMessage).To(Equal("provision failed: operation failed dramatically")) - Expect(fakeLogger.Logs()).To(HaveLen(1)) + Expect(fakeLogger.Logs()).To(HaveLen(2)) Expect(fakeLogger.Logs()[0].Message).To(ContainSubstring("operation-failed")) Expect(fakeLogger.Logs()[0].Data).To(HaveKeyWithValue("error", Equal("operation failed dramatically"))) Expect(fakeLogger.Logs()[0].Data).To(HaveKeyWithValue("message", Equal("provision failed: operation failed dramatically"))) Expect(fakeLogger.Logs()[0].Data).To(HaveKeyWithValue("deploymentID", Equal(existingDeployment.ID))) - + Expect(fakeLogger.Logs()[1].Message).To(Equal("broker.successfully stored state for deploymentID")) }) }) }) diff --git a/pkg/providers/tf/provider.go b/pkg/providers/tf/provider.go index 6c9828995..6e2eaeae7 100644 --- a/pkg/providers/tf/provider.go +++ b/pkg/providers/tf/provider.go @@ -96,7 +96,10 @@ func (provider *TerraformProvider) create(ctx context.Context, vars *varcontext. } else { err = provider.DefaultInvoker().Apply(ctx, newWorkspace) } - _ = provider.MarkOperationFinished(&deployment, err) + err = provider.MarkOperationFinished(&deployment, err) + if err != nil { + provider.logger.Error("MarkOperationFinished", err) + } }() return tfID, nil