From 4603edca80894b4270cf5fea2d61cf1533025e03 Mon Sep 17 00:00:00 2001 From: Radek Simko Date: Fri, 13 Mar 2020 19:01:43 +0000 Subject: [PATCH] terraform/schema: Implement watcher for invalidating cached schema Closes #16 --- go.mod | 2 + go.sum | 6 + .../terraform/lang/provider_block_test.go | 7 +- internal/terraform/schema/schema_storage.go | 138 ++++++++++++++--- internal/terraform/schema/storage_mock.go | 41 +++++ internal/terraform/schema/watcher.go | 141 ++++++++++++++++++ langserver/handlers/handlers.go | 23 ++- langserver/handlers/initialize.go | 12 +- langserver/langserver.go | 4 +- langserver/srvctl/server_controller.go | 3 +- 10 files changed, 341 insertions(+), 36 deletions(-) create mode 100644 internal/terraform/schema/storage_mock.go create mode 100644 internal/terraform/schema/watcher.go diff --git a/go.mod b/go.mod index 2983cd225..159f57783 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.13 require ( github.com/apparentlymart/go-textseg v1.0.0 github.com/creachadair/jrpc2 v0.6.1 + github.com/fsnotify/fsnotify v1.4.9 github.com/google/go-cmp v0.4.0 github.com/hashicorp/go-version v1.2.0 github.com/hashicorp/hcl/v2 v2.3.0 @@ -12,6 +13,7 @@ require ( github.com/mitchellh/cli v1.0.0 github.com/sourcegraph/go-lsp v0.0.0-20200117082640-b19bb38222e2 github.com/zclconf/go-cty v1.2.1 + golang.org/x/sys v0.0.0-20200302150141-5c8b2ff67527 // indirect golang.org/x/text v0.3.2 ) diff --git a/go.sum b/go.sum index 462449d94..9e4647e29 100644 --- a/go.sum +++ b/go.sum @@ -18,6 +18,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/fatih/color v1.7.0 h1:DkWD4oS2D8LGGgTQ6IvwJJXSL5Vp2ffcQg58nFV38Ys= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= +github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= +github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/go-test/deep v1.0.3 h1:ZrJSEWsXzPOxaZnFteGEfooLba+ju3FYIbOrS+rQd68= github.com/go-test/deep v1.0.3/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA= github.com/golang/protobuf v1.1.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -74,6 +76,10 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190502175342-a43fa875dd82 h1:vsphBvatvfbhlb4PO1BYSr9dzugGxJ/SQHoNufZJq1w= golang.org/x/sys v0.0.0-20190502175342-a43fa875dd82/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9 h1:L2auWcuQIvxz9xSEqzESnV/QN/gNRXNApHi3fYwl2w0= +golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200302150141-5c8b2ff67527 h1:uYVVQ9WP/Ds2ROhcaGPeIdVq0RIXVLwsHlnvJ+cT1So= +golang.org/x/sys v0.0.0-20200302150141-5c8b2ff67527/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= diff --git a/internal/terraform/lang/provider_block_test.go b/internal/terraform/lang/provider_block_test.go index f97ac5ac9..e3122c05a 100644 --- a/internal/terraform/lang/provider_block_test.go +++ b/internal/terraform/lang/provider_block_test.go @@ -288,12 +288,7 @@ func TestProviderBlock_CompletionItemsAtPos(t *testing.T) { pf.InitializeCapabilities(*caps) } - var sr schema.Reader - if tc.ps != nil { - sr = schema.MockStorage(tc.ps) - } else { - sr = schema.MockStorage(&tfjson.ProviderSchemas{}) - } + sr := schema.MockStorage(tc.ps) pf.schemaReader = sr p, err := pf.New(block) diff --git a/internal/terraform/schema/schema_storage.go b/internal/terraform/schema/schema_storage.go index 55ad9c692..0ed9c9541 100644 --- a/internal/terraform/schema/schema_storage.go +++ b/internal/terraform/schema/schema_storage.go @@ -4,6 +4,7 @@ import ( "fmt" "io/ioutil" "log" + "sync" "time" "github.com/hashicorp/go-version" @@ -16,33 +17,63 @@ type Reader interface { } type Writer interface { - ObtainSchemasForDir(*exec.Executor, string) error - ProviderConfigSchema(name string) (*tfjson.Schema, error) + ObtainSchemasForWorkspace(*exec.Executor, string) error + AddWorkspaceForWatching(string) error + StartWatching(*exec.Executor) error } -type storage struct { - ps *tfjson.ProviderSchemas +type Storage struct { + ps *tfjson.ProviderSchemas + w watcher + watching bool logger *log.Logger + + // mu ensures atomic reading and obtaining of schemas + // as the process of obtaining it may not be thread-safe + mu sync.RWMutex + + // sync makes operations synchronous which makes testing easier + sync bool } -func NewStorage() *storage { - return &storage{ - logger: log.New(ioutil.Discard, "", 0), +var defaultLogger = log.New(ioutil.Discard, "", 0) + +func NewStorage() *Storage { + return &Storage{ + logger: defaultLogger, } } -func (s *storage) SetLogger(logger *log.Logger) { +func (s *Storage) SetLogger(logger *log.Logger) { s.logger = logger } -func MockStorage(ps *tfjson.ProviderSchemas) *storage { - s := NewStorage() - s.ps = ps - return s +// ObtainSchemasForWorkspace will (by default) asynchronously obtain schema via tf +// and store it for later consumption via Reader methods +func (s *Storage) ObtainSchemasForWorkspace(tf *exec.Executor, dir string) error { + if s.sync { + return s.obtainSchemasForWorkspace(tf, dir) + } + + // This routine is not cancellable in itself + // but the time-consuming part is done by exec.Executor + // which is cancellable via its own context + go func() { + err := s.obtainSchemasForWorkspace(tf, dir) + if err != nil { + s.logger.Println("error obtaining schemas:", err) + } + }() + + return nil } -func (c *storage) ObtainSchemasForDir(tf *exec.Executor, dir string) error { +func (s *Storage) obtainSchemasForWorkspace(tf *exec.Executor, dir string) error { + s.logger.Printf("Obtaining lock before retrieving schema for %q ...", dir) + s.mu.Lock() + defer s.mu.Unlock() + // Checking the version here may be excessive // TODO: Find a way to centralize this tfVersions, err := version.NewConstraint(">= 0.12.0") @@ -56,20 +87,24 @@ func (c *storage) ObtainSchemasForDir(tf *exec.Executor, dir string) error { tf.SetWorkdir(dir) - c.logger.Printf("Obtaining schemas for %q ...", dir) + s.logger.Printf("Retrieving schemas for %q ...", dir) start := time.Now() ps, err := tf.ProviderSchemas() if err != nil { - return fmt.Errorf("unable to get schemas: %s", err) + return fmt.Errorf("Unable to retrieve schemas: %s", err) } - c.ps = ps - c.logger.Printf("Schemas retrieved in %s", time.Since(start)) - + s.ps = ps + s.logger.Printf("Schemas retrieved in %s", time.Since(start)) return nil } -func (c *storage) ProviderConfigSchema(name string) (*tfjson.Schema, error) { - schema, ok := c.ps.Schemas[name] +func (s *Storage) ProviderConfigSchema(name string) (*tfjson.Schema, error) { + s.logger.Printf("Obtaining lock before reading %q provider schema", name) + s.mu.RLock() + defer s.mu.RUnlock() + + s.logger.Printf("Reading %q provider schema", name) + schema, ok := s.ps.Schemas[name] if !ok { return nil, &SchemaUnavailableErr{"provider", name} } @@ -80,3 +115,66 @@ func (c *storage) ProviderConfigSchema(name string) (*tfjson.Schema, error) { return schema.ConfigSchema, nil } + +// watcher creates a new Watcher instance +// if one doesn't exist yet or returns an existing one +func (s *Storage) watcher() (watcher, error) { + if s.w != nil { + return s.w, nil + } + + w, err := NewWatcher() + if err != nil { + return nil, err + } + w.SetLogger(s.logger) + + s.w = w + return s.w, nil +} + +// StartWatching starts to watch for plugin changes in dirs that were added +// via AddWorkspaceForWatching until StopWatching() is called +func (s *Storage) StartWatching(tf *exec.Executor) error { + if s.watching { + return fmt.Errorf("watching already in progress") + } + w, err := s.watcher() + if err != nil { + return err + } + + go w.OnPluginChange(func(ww *watchedWorkspace) error { + s.obtainSchemasForWorkspace(tf, ww.dir) + return nil + }) + s.watching = true + + s.logger.Printf("Watching for plugin changes ...") + + return nil +} + +func (s *Storage) StopWatching() error { + if s.w == nil { + return nil + } + s.logger.Println("Stopping watcher ...") + err := s.w.Close() + if err == nil { + s.watching = false + } + + return err +} + +func (s *Storage) AddWorkspaceForWatching(dir string) error { + w, err := s.watcher() + if err != nil { + return err + } + + s.logger.Printf("Adding workspace for watching: %q", dir) + + return w.AddWorkspace(dir) +} diff --git a/internal/terraform/schema/storage_mock.go b/internal/terraform/schema/storage_mock.go new file mode 100644 index 000000000..15e60e2ee --- /dev/null +++ b/internal/terraform/schema/storage_mock.go @@ -0,0 +1,41 @@ +package schema + +import ( + "log" + + "github.com/fsnotify/fsnotify" + tfjson "github.com/hashicorp/terraform-json" +) + +func MockStorage(ps *tfjson.ProviderSchemas) *Storage { + s := NewStorage() + if ps == nil { + ps = &tfjson.ProviderSchemas{} + } + s.ps = ps + s.sync = true + s.w = &MockWatcher{} + return s +} + +type MockWatcher struct{} + +func (w *MockWatcher) AddWorkspace(string) error { + return nil +} + +func (w *MockWatcher) Close() error { + return nil +} + +func (w *MockWatcher) Events() chan fsnotify.Event { + return nil +} + +func (w *MockWatcher) Errors() chan error { + return nil +} + +func (w *MockWatcher) OnPluginChange(func(*watchedWorkspace) error) {} + +func (w *MockWatcher) SetLogger(*log.Logger) {} diff --git a/internal/terraform/schema/watcher.go b/internal/terraform/schema/watcher.go new file mode 100644 index 000000000..e7d38ec09 --- /dev/null +++ b/internal/terraform/schema/watcher.go @@ -0,0 +1,141 @@ +package schema + +import ( + "crypto/sha256" + "fmt" + "io" + "log" + "os" + "path/filepath" + "runtime" + + "github.com/fsnotify/fsnotify" +) + +type watcher interface { + AddWorkspace(string) error + Close() error + Events() chan fsnotify.Event + Errors() chan error + OnPluginChange(func(*watchedWorkspace) error) + SetLogger(*log.Logger) +} + +// Watcher is just a thin wrapper around native fsnotify.Watcher +// to make it swappable for MockWatcher via interface +// +// This is necessary until https://github.com/fsnotify/fsnotify/issues/104 +// is addressed (or until there are *methods* for getting error and Event channels) +type Watcher struct { + w *fsnotify.Watcher + files map[string]*watchedWorkspace + logger *log.Logger +} + +type watchedWorkspace struct { + pluginsLockFileHash string + dir string +} + +func NewWatcher() (*Watcher, error) { + w, err := fsnotify.NewWatcher() + if err != nil { + return nil, err + } + return &Watcher{ + w: w, + files: make(map[string]*watchedWorkspace, 0), + logger: defaultLogger, + }, nil +} + +func (w *Watcher) SetLogger(logger *log.Logger) { + w.logger = logger +} + +func (w *Watcher) AddWorkspace(dir string) error { + lockPath := lockFilePath(dir) + w.logger.Printf("Adding %q for watching...", lockPath) + + hash, err := fileHashSum(lockPath) + if err != nil { + return fmt.Errorf("unable to calculate hash: %w", err) + } + + w.files[lockPath] = &watchedWorkspace{ + pluginsLockFileHash: string(hash), + dir: dir, + } + + return w.w.Add(lockPath) +} + +func lockFilePath(dir string) string { + return filepath.Join(dir, + ".terraform", + "plugins", + runtime.GOOS+"_"+runtime.GOARCH, + "lock.json") +} + +func fileHashSum(path string) ([]byte, error) { + f, err := os.Open(path) + if err != nil { + return nil, err + } + defer f.Close() + + h := sha256.New() + _, err = io.Copy(h, f) + if err != nil { + return nil, err + } + + return h.Sum(nil), nil +} + +func (w *Watcher) Close() error { + return w.w.Close() +} + +func (w *Watcher) Events() chan fsnotify.Event { + return w.w.Events +} + +func (w *Watcher) Errors() chan error { + return w.w.Errors +} + +func (w *Watcher) OnPluginChange(f func(*watchedWorkspace) error) { + for { + select { + case event, ok := <-w.Events(): + if !ok { + return + } + + if event.Op&fsnotify.Write == fsnotify.Write { + hash, err := fileHashSum(event.Name) + if err != nil { + w.logger.Println("unable to calculate hash:", err) + } + newHash := string(hash) + existingHash := w.files[event.Name].pluginsLockFileHash + + if newHash != existingHash { + w.files[event.Name].pluginsLockFileHash = newHash + + err = f(w.files[event.Name]) + if err != nil { + w.logger.Println("error when executing on change:", err) + } + } + } + case err, ok := <-w.Errors(): + if !ok { + return + } + w.logger.Println("watch error:", err) + } + } +} diff --git a/langserver/handlers/handlers.go b/langserver/handlers/handlers.go index bc9b795d7..1f6d659ca 100644 --- a/langserver/handlers/handlers.go +++ b/langserver/handlers/handlers.go @@ -23,6 +23,7 @@ type handlerProvider struct { logger *log.Logger srvCtl srvctl.ServerController + ss *schema.Storage executorFunc func(ctx context.Context, execPath string) *exec.Executor } @@ -32,6 +33,7 @@ func New() *handlerProvider { return &handlerProvider{ logger: defaultLogger, executorFunc: exec.NewExecutor, + ss: schema.NewStorage(), } } @@ -41,6 +43,7 @@ func NewMock(mid exec.MockItemDispenser) *handlerProvider { executorFunc: func(ctx context.Context, execPath string) *exec.Executor { return exec.MockExecutor(mid) }, + ss: schema.MockStorage(nil), } } @@ -50,15 +53,14 @@ func (hp *handlerProvider) SetLogger(logger *log.Logger) { // Handlers builds out the jrpc2.Map according to the LSP protocol // and passes related dependencies to handlers via context -func (hp *handlerProvider) Handlers(ctl srvctl.ServerController) jrpc2.Assigner { +func (hp *handlerProvider) Handlers(srvCtx context.Context, ctl srvctl.ServerController) jrpc2.Assigner { hp.srvCtl = ctl fs := filesystem.NewFilesystem() fs.SetLogger(hp.logger) lh := LogHandler(hp.logger) cc := &lsp.ClientCapabilities{} tfVersion := "0.0.0" - ss := schema.NewStorage() - ss.SetLogger(hp.logger) + hp.ss.SetLogger(hp.logger) m := map[string]rpch.Func{ "initialize": func(ctx context.Context, req *jrpc2.Request) (interface{}, error) { @@ -73,12 +75,16 @@ func (hp *handlerProvider) Handlers(ctl srvctl.ServerController) jrpc2.Assigner if err != nil { return nil, err } - tf := hp.executorFunc(ctx, tfPath) + + // We intentionally pass server context here to make executor cancellable + // on server shutdown, rather than response delivery or request cancellation + // as some operations may run in isolated goroutines + tf := hp.executorFunc(srvCtx, tfPath) tf.SetLogger(hp.logger) ctx = lsctx.WithTerraformExecutor(tf, ctx) ctx = lsctx.WithTerraformVersionSetter(&tfVersion, ctx) - ctx = lsctx.WithTerraformSchemaWriter(ss, ctx) + ctx = lsctx.WithTerraformSchemaWriter(hp.ss, ctx) return handle(ctx, req, Initialize) }, @@ -124,7 +130,7 @@ func (hp *handlerProvider) Handlers(ctl srvctl.ServerController) jrpc2.Assigner ctx = lsctx.WithFilesystem(fs, ctx) // TODO: Read-only FS ctx = lsctx.WithClientCapabilities(cc, ctx) ctx = lsctx.WithTerraformVersion(tfVersion, ctx) - ctx = lsctx.WithTerraformSchemaReader(ss, ctx) + ctx = lsctx.WithTerraformSchemaReader(hp.ss, ctx) return handle(ctx, req, lh.TextDocumentComplete) }, @@ -144,6 +150,11 @@ func (hp *handlerProvider) Handlers(ctl srvctl.ServerController) jrpc2.Assigner return nil, err } + err = hp.ss.StopWatching() + if err != nil { + return nil, err + } + return handle(ctx, req, Shutdown) }, "$/cancelRequest": func(ctx context.Context, req *jrpc2.Request) (interface{}, error) { diff --git a/langserver/handlers/initialize.go b/langserver/handlers/initialize.go index 6dba38073..1a8e71d6d 100644 --- a/langserver/handlers/initialize.go +++ b/langserver/handlers/initialize.go @@ -63,7 +63,17 @@ func Initialize(ctx context.Context, params lsp.InitializeParams) (lsp.Initializ return serverCaps, err } - err = ss.ObtainSchemasForDir(tf, rootURI) + err = ss.ObtainSchemasForWorkspace(tf, rootURI) + if err != nil { + return serverCaps, err + } + + err = ss.AddWorkspaceForWatching(rootURI) + if err != nil { + return serverCaps, err + } + + err = ss.StartWatching(tf) if err != nil { return serverCaps, err } diff --git a/langserver/langserver.go b/langserver/langserver.go index c99be696b..8de98afa2 100644 --- a/langserver/langserver.go +++ b/langserver/langserver.go @@ -58,7 +58,7 @@ func (ls *langServer) start(reader io.Reader, writer io.WriteCloser) *jrpc2.Serv ch := channel.LSP(reader, writer) - return jrpc2.NewServer(ls.hp.Handlers(ls.srvCtl), ls.srvOptions).Start(ch) + return jrpc2.NewServer(ls.hp.Handlers(ls.ctx, ls.srvCtl), ls.srvOptions).Start(ch) } func (ls *langServer) StartAndWait(reader io.Reader, writer io.WriteCloser) { @@ -96,7 +96,7 @@ func (ls *langServer) StartTCP(address string) error { go func() { ls.logger.Println("Starting loop server ...") - err = rpcServer.Loop(lst, ls.hp.Handlers(ls.srvCtl), &rpcServer.LoopOptions{ + err = rpcServer.Loop(lst, ls.hp.Handlers(ls.ctx, ls.srvCtl), &rpcServer.LoopOptions{ Framing: channel.LSP, ServerOptions: ls.srvOptions, }) diff --git a/langserver/srvctl/server_controller.go b/langserver/srvctl/server_controller.go index 42e488a95..e7d4430bf 100644 --- a/langserver/srvctl/server_controller.go +++ b/langserver/srvctl/server_controller.go @@ -2,6 +2,7 @@ package srvctl import ( "log" + "context" "github.com/creachadair/jrpc2" ) @@ -16,6 +17,6 @@ type ServerController interface { } type HandlerProvider interface { - Handlers(ServerController) jrpc2.Assigner + Handlers(context.Context, ServerController) jrpc2.Assigner SetLogger(*log.Logger) }