From 9f3c0493ddd16852e7e7fbe790b178b56e2d0cbb Mon Sep 17 00:00:00 2001 From: Edward McFarlane Date: Mon, 23 Sep 2024 17:09:38 -0400 Subject: [PATCH] Single threaded LSP model --- private/buf/buflsp/buflsp.go | 34 +++--- private/buf/buflsp/file.go | 77 +----------- private/buf/buflsp/file_manager.go | 2 - private/buf/buflsp/mutex.go | 182 ----------------------------- private/buf/buflsp/server.go | 17 +-- private/buf/buflsp/symbol.go | 21 +--- 6 files changed, 23 insertions(+), 310 deletions(-) delete mode 100644 private/buf/buflsp/mutex.go diff --git a/private/buf/buflsp/buflsp.go b/private/buf/buflsp/buflsp.go index cd46a0063a..f2827934cd 100644 --- a/private/buf/buflsp/buflsp.go +++ b/private/buf/buflsp/buflsp.go @@ -20,7 +20,7 @@ package buflsp import ( "context" "fmt" - "sync/atomic" + "sync" "github.com/bufbuild/buf/private/buf/bufctl" "github.com/bufbuild/buf/private/bufpkg/bufcheck" @@ -81,7 +81,7 @@ func Serve( } lsp.fileManager = newFileManager(lsp) off := protocol.TraceOff - lsp.traceValue.Store(&off) + lsp.traceValue = &off conn.Go(ctx, lsp.newHandler()) return conn, nil @@ -97,6 +97,9 @@ func Serve( type lsp struct { conn jsonrpc2.Conn client protocol.Client + // lock is used to ensure that requests are processed serially. + // This simplifies the implementation of the server. + lock sync.Mutex logger *zap.Logger tracer tracing.Tracer @@ -105,22 +108,18 @@ type lsp struct { rootBucket storage.ReadBucket fileManager *fileManager - // These are atomics, because they are read often and written to - // almost never, but potentially concurrently. Having them side-by-side - // is fine; they are almost never written to so false sharing is not a - // concern. - initParams atomic.Pointer[protocol.InitializeParams] - traceValue atomic.Pointer[protocol.TraceValue] + // InitializeParams is set once, and never changed. + initParams *protocol.InitializeParams + traceValue *protocol.TraceValue } // init performs *actual* initialization of the server. This is called by Initialize(). // // It may only be called once for a given server. func (l *lsp) init(params *protocol.InitializeParams) error { - if l.initParams.Load() != nil { - return fmt.Errorf("called the %q method more than once", protocol.MethodInitialize) + if l.initParams == nil { + l.initParams = params } - l.initParams.Store(params) // TODO: set up logging. We need to forward everything from server.logger through to // the client, if tracing is turned on. The right way to do this is with an extra @@ -166,22 +165,23 @@ func (l *lsp) newHandler() jsonrpc2.Handler { ) defer span.End() + replier := l.adaptReplier(reply, req) + + // Process requests serially. + l.lock.Lock() + defer l.lock.Unlock() + l.logger.Debug( "processing request", zap.String("method", req.Method()), zap.ByteString("params", req.Params()), ) - ctx = withRequestID(ctx) - - replier := l.adaptReplier(reply, req) - // Verify that the server has been initialized if this isn't the initialization // request. - if req.Method() != protocol.MethodInitialize && l.initParams.Load() == nil { + if req.Method() != protocol.MethodInitialize && l.initParams == nil { return replier(ctx, nil, fmt.Errorf("the first call to the server must be the %q method", protocol.MethodInitialize)) } - return actual(ctx, replier, req) } } diff --git a/private/buf/buflsp/file.go b/private/buf/buflsp/file.go index f50bf7f7a6..65a8ef00d7 100644 --- a/private/buf/buflsp/file.go +++ b/private/buf/buflsp/file.go @@ -51,32 +51,10 @@ const descriptorPath = "google/protobuf/descriptor.proto" // // Mutating a file is thread-safe. type file struct { - // lsp and uri are not protected by file.lock; they are immutable after - // file creation! + // lsp and uri are immutable after file creation! lsp *lsp uri protocol.URI - // All variables after this lock variables are protected by file.lock. - // - // NOTE: this package must NEVER attempt to acquire a lock on a file while - // holding a lock on another file. This guarantees that any concurrent operations - // on distinct files can always make forward progress, even if the information they - // have is incomplete. This trades off up-to-date accuracy for responsiveness. - // - // For example, suppose g1 locks a.proto, and then attempts to lock b.proto - // because it followed a pointer in importMap. However, in the meantime, g2 - // has acquired b.proto's lock already, and attempts to acquire a lock to a.proto, - // again because of a pointer in importMap. This will deadlock, and it will - // deadlock in such a way that will be undetectable to the Go scheduler, so the - // LSP will hang forever. - // - // This seems like a contrived scenario, but it can happen if a user creates two - // mutually-recursive Protobuf files. Although this is not permitted by Protobuf, - // the LSP must handle this invalid state gracefully. - // - // This is enforced by mutex.go. - lock mutex - text string version int32 hasText bool // Whether this file has ever had text read into it. @@ -115,24 +93,11 @@ func (f *file) Package() []string { func (f *file) Reset(ctx context.Context) { f.lsp.logger.Sugar().Debugf("resetting file %v", f.uri) - // Lock and unlock to acquire the import map. - // This map is never mutated after being created, so we only - // need to read the pointer. - // - // We need to lock and unlock because Close() will call Reset() on other - // files, and this will deadlock if cyclic imports exist. - f.lock.Lock(ctx) - imports := f.imports - f.lock.Unlock(ctx) - // Close all imported files while file.mu is not held. - for _, imported := range imports { + for _, imported := range f.imports { imported.Close(ctx) } - f.lock.Lock(ctx) - defer f.lock.Unlock(ctx) - f.fileNode = nil f.packageNode = nil f.diagnostics = nil @@ -155,8 +120,6 @@ func (f *file) Close(ctx context.Context) { // If it has been read from disk before, or has received updates from the LSP client, this // function returns nil. func (f *file) ReadFromDisk(ctx context.Context) (err error) { - f.lock.Lock(ctx) - defer f.lock.Unlock(ctx) if f.hasText { return nil } @@ -176,9 +139,6 @@ func (f *file) ReadFromDisk(ctx context.Context) (err error) { func (f *file) Update(ctx context.Context, version int32, text string) { f.Reset(ctx) - f.lock.Lock(ctx) - defer f.lock.Unlock(ctx) - f.lsp.logger.Sugar().Infof("new file version: %v, %v -> %v", f.uri, f.version, version) f.version = version f.text = text @@ -218,8 +178,6 @@ func (f *file) Refresh(ctx context.Context) { // // Returns whether a reparse was necessary. func (f *file) RefreshAST(ctx context.Context) bool { - f.lock.Lock(ctx) - defer f.lock.Unlock(ctx) if f.fileNode != nil { return false } @@ -260,9 +218,6 @@ func (f *file) PublishDiagnostics(ctx context.Context) { tracing.WithAttributes(attribute.String("uri", string(f.uri)))) defer span.End() - f.lock.Lock(ctx) - defer f.lock.Unlock(ctx) - if f.diagnostics == nil { return } @@ -310,10 +265,8 @@ func (f *file) FindModule(ctx context.Context) { defer file.Close() } - f.lock.Lock(ctx) f.workspace = workspace f.module = module - f.lock.Unlock(ctx) } // IndexImports finds URIs for all of the files imported by this file. @@ -322,9 +275,6 @@ func (f *file) IndexImports(ctx context.Context) { tracing.WithAttributes(attribute.String("uri", string(f.uri)))) defer span.End() - unlock := f.lock.Lock(ctx) - defer unlock() - if f.fileNode == nil || f.imports != nil { return } @@ -390,11 +340,7 @@ func (f *file) IndexImports(ctx context.Context) { // FIXME: This algorithm is not correct: it does not account for `import public`. - // Drop the lock after copying the pointer to the imports map. This - // particular map will not be mutated further, and since we're going to grab the lock of - // other files, we need to drop the currently held lock. fileImports := f.imports - unlock() for _, file := range fileImports { if err := file.ReadFromDisk(ctx); err != nil { @@ -417,10 +363,8 @@ func (f *file) IndexImports(ctx context.Context) { // // This operation requires IndexImports(). func (f *file) BuildImage(ctx context.Context) { - f.lock.Lock(ctx) importable := f.importable fileInfo := f.fileInfo - f.lock.Unlock(ctx) if importable == nil || fileInfo == nil { return @@ -455,9 +399,7 @@ func (f *file) BuildImage(ctx context.Context) { compiled, err := compiler.Compile(ctx, fileInfo.Path()) if err != nil { - f.lock.Lock(ctx) f.diagnostics = report.diagnostics - f.lock.Unlock(ctx) } if compiled[0] == nil { return @@ -535,9 +477,7 @@ func (f *file) BuildImage(ctx context.Context) { return } - f.lock.Lock(ctx) f.image = image - f.lock.Unlock(ctx) } // RunLints runs linting on this file. Returns whether any lints failed. @@ -549,11 +489,9 @@ func (f *file) RunLints(ctx context.Context) bool { return false } - f.lock.Lock(ctx) workspace := f.workspace module := f.module image := f.image - f.lock.Unlock(ctx) if module == nil || image == nil { f.lsp.logger.Sugar().Warnf("could not find image for %q", f.uri) @@ -584,8 +522,6 @@ func (f *file) RunLints(ctx context.Context) bool { f.lsp.logger.Sugar().Warnf("lint generated %d error(s) for %s", len(annotations.FileAnnotations()), f.uri) - f.lock.Lock(ctx) - f.lock.Unlock(ctx) for _, annotation := range annotations.FileAnnotations() { f.lsp.logger.Sugar().Info(annotation.FileInfo().Path(), " ", annotation.FileInfo().ExternalPath()) @@ -616,9 +552,6 @@ func (f *file) IndexSymbols(ctx context.Context) { tracing.WithAttributes(attribute.String("uri", string(f.uri)))) defer span.End() - unlock := f.lock.Lock(ctx) - defer unlock() - // Throw away all the old symbols. Unlike other indexing functions, we rebuild // symbols unconditionally. f.symbols = nil @@ -635,9 +568,8 @@ func (f *file) IndexSymbols(ctx context.Context) { return diff }) - // Now we can drop the lock and search for cross-file references. + // Now we can drop the search for cross-file references. symbols := f.symbols - unlock() for _, symbol := range symbols { symbol.ResolveCrossFile(ctx) } @@ -649,9 +581,6 @@ func (f *file) IndexSymbols(ctx context.Context) { // // Returns nil if no symbol is found. func (f *file) SymbolAt(ctx context.Context, cursor protocol.Position) *symbol { - f.lock.Lock(ctx) - defer f.lock.Unlock(ctx) - // Binary search for the symbol whose start is before or equal to cursor. idx, found := slices.BinarySearchFunc(f.symbols, cursor, func(sym *symbol, cursor protocol.Position) int { return comparePositions(sym.Range().Start, cursor) diff --git a/private/buf/buflsp/file_manager.go b/private/buf/buflsp/file_manager.go index 079ea42fbb..c8ecfa39b1 100644 --- a/private/buf/buflsp/file_manager.go +++ b/private/buf/buflsp/file_manager.go @@ -28,7 +28,6 @@ import ( type fileManager struct { lsp *lsp table refcount.Map[protocol.URI, file] - pool mutexPool } // newFiles creates a new file manager. @@ -44,7 +43,6 @@ func (fm *fileManager) Open(ctx context.Context, uri protocol.URI) *file { if !found { file.lsp = fm.lsp file.uri = uri - file.lock = fm.pool.NewMutex() } return file diff --git a/private/buf/buflsp/mutex.go b/private/buf/buflsp/mutex.go deleted file mode 100644 index 7188bf0992..0000000000 --- a/private/buf/buflsp/mutex.go +++ /dev/null @@ -1,182 +0,0 @@ -// Copyright 2020-2024 Buf Technologies, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// This file defines various concurrency helpers. - -package buflsp - -import ( - "context" - "fmt" - "sync" - "sync/atomic" -) - -const poison = ^uint64(0) - -var nextRequestID atomic.Uint64 - -// withReentrancy assigns a unique request ID to the given context, which can be retrieved -// with with getRequestID. -func withRequestID(ctx context.Context) context.Context { - // This will always be unique. It is impossible to increment a uint64 and wrap around before - // the heat death of the universe. - id := nextRequestID.Add(1) - // We need to give the context package a unique identifier for the request; it can be - // any value. The address of the global we mint new IDs from is actually great for this, - // because users can't access it outside of this package, nor can they extract it out - // of the context itself. - return context.WithValue(ctx, &nextRequestID, id) -} - -// getRequestID returns the request ID for this context, or 0 if ctx is nil or has no -// such ID. -func getRequestID(ctx context.Context) uint64 { - if ctx == nil { - return 0 - } - id, ok := ctx.Value(&nextRequestID).(uint64) - if !ok { - return 0 - } - - // Make sure we don't return 0. This is the only place where the id is actually - // witnessed so doing +1 won't affect anything. - return id + 1 -} - -// mutexPool represents a group of reentrant muteces that cannot be acquired simultaneously. -// -// A zero mutexPool is ready to use. -type mutexPool struct { - lock sync.Mutex - held map[uint64]*mutex -} - -// NewMutex creates a new mutex in this pool. -func (mp *mutexPool) NewMutex() mutex { - return mutex{pool: mp} -} - -// check checks what id is either not holding a lock, or is holding the given -// map, depending on whether isUnlock is set. -func (mp *mutexPool) check(id uint64, mu *mutex, isUnlock bool) { - if mp == nil { - return - } - - mp.lock.Lock() - defer mp.lock.Unlock() - - if mp.held == nil { - mp.held = make(map[uint64]*mutex) - } - - if isUnlock { - if held := mp.held[id]; held != mu { - panic(fmt.Sprintf("buflsp/mutex.go: attempted to unlock incorrect non-reentrant lock: %p -> %p", held, mu)) - } - - delete(mp.held, id) - } else { - if held := mp.held[id]; held != nil { - panic(fmt.Sprintf("buflsp/mutex.go: attempted to acquire two non-reentrant locks at once: %p -> %p", mu, held)) - } - - mp.held[id] = mu - } -} - -// mutex is a sync.Mutex with some extra features. -// -// The main feature is reentrancy-checking. Within the LSP, we need to lock-protect many structures, -// and it is very easy to deadlock if the same request tries to lock something multiple times. -// To achieve this, Lock() takes a context, which must be modified by withRequestID(). -type mutex struct { - lock sync.Mutex - // This is the id of the context currently holding the lock. - who atomic.Uint64 - pool *mutexPool -} - -// Lock attempts to acquire this mutex or blocks. -// -// Unlike [sync.Mutex.Lock], this takes a Context. If that context was updated with withRequestID, -// this function will panic when attempting to lock the mutex while it is already held by a -// goroutine using this same context. -// -// NOTE: to Lock() and Unlock() with the same context DO NOT synchronize with each other. For example, -// attempting to lock this mutex from two different goroutines with the same context will -// result in undefined behavior. -// -// Also unlike [sync.Mutex.Lock], it returns an idempotent unlocker function. This can be used like -// defer mu.Lock()(). Note that only the outer function call is deferred: this is part of the -// definition of defer. See https://go.dev/play/p/RJNKRcoQRo1. This unlocker can also be used to -// defer unlocking but also unlock before the function returns. -// -// The returned unlocker is not thread-safe. -func (mu *mutex) Lock(ctx context.Context) (unlocker func()) { - var unlocked bool - unlocker = func() { - if unlocked { - return - } - mu.Unlock(ctx) - unlocked = true - } - - id := getRequestID(ctx) - - if mu.who.Load() == id && id > 0 { - // We seem to have tried to lock this lock twice. Panic, and poison the lock. - mu.who.Store(poison) - panic("buflsp/mutex.go: non-reentrant lock locked twice by the same request") - } - - mu.pool.check(id, mu, false) - - // Ok, we're definitely not holding a lock, so we can block until we acquire the lock. - mu.lock.Lock() - mu.storeWho(id) - - return unlocker -} - -// Unlock releases this mutex. -// -// Unlock must be called with the same context that locked it, otherwise this function panics. -func (mu *mutex) Unlock(ctx context.Context) { - id := getRequestID(ctx) - if mu.who.Load() != id { - panic("buflsp/mutex.go: lock was locked by one request and unlocked by another") - } - - mu.storeWho(0) - - mu.pool.check(id, mu, true) - mu.lock.Unlock() -} - -func (mu *mutex) storeWho(id uint64) { - for { - // This has to be a CAS loop to avoid races with a poisoning p. - old := mu.who.Load() - if old == poison { - panic("buflsp/mutex.go: non-reentrant lock locked twice by the same request") - } - if mu.who.CompareAndSwap(old, id) { - break - } - } -} diff --git a/private/buf/buflsp/server.go b/private/buf/buflsp/server.go index ada3cea0a7..73d5add5a6 100644 --- a/private/buf/buflsp/server.go +++ b/private/buf/buflsp/server.go @@ -22,7 +22,6 @@ import ( "fmt" "runtime/debug" "strings" - "time" "github.com/bufbuild/buf/private/buf/bufformat" "github.com/bufbuild/protocompile/ast" @@ -143,7 +142,7 @@ func (s *server) SetTrace( ctx context.Context, params *protocol.SetTraceParams, ) error { - s.lsp.traceValue.Store(¶ms.Value) + s.lsp.traceValue = ¶ms.Value return nil } @@ -208,8 +207,6 @@ func (s *server) Formatting( // Currently we have no way to honor any of the parameters. _ = params - file.lock.Lock(ctx) - defer file.lock.Unlock(ctx) if file.fileNode == nil { return nil, nil } @@ -322,17 +319,7 @@ func (s *server) SemanticTokensFull( progress.Begin(ctx, "Processing Tokens") defer progress.Done(ctx) - var symbols []*symbol - for { - file.lock.Lock(ctx) - symbols = file.symbols - file.lock.Unlock(ctx) - if symbols != nil { - break - } - time.Sleep(1 * time.Millisecond) - } - + symbols := file.symbols var ( encoded []uint32 prevLine, prevCol uint32 diff --git a/private/buf/buflsp/symbol.go b/private/buf/buflsp/symbol.go index ca2f8f6d73..63a7db4dba 100644 --- a/private/buf/buflsp/symbol.go +++ b/private/buf/buflsp/symbol.go @@ -116,8 +116,6 @@ func (s *symbol) Definition(ctx context.Context) (*symbol, ast.Node) { return nil, nil } - kind.file.lock.Lock(ctx) - defer kind.file.lock.Unlock(ctx) for _, symbol := range kind.file.symbols { def, ok := symbol.kind.(*definition) if ok && slices.Equal(kind.path, def.path) { @@ -268,22 +266,12 @@ func (s *symbol) ResolveCrossFile(ctx context.Context) { return } - // Make a copy of the import table pointer and then drop the lock, - // since searching inside of the imports will need to acquire other - // fileManager' locks. - s.file.lock.Lock(ctx) descriptorProto := s.file.imports[descriptorPath] - s.file.lock.Unlock(ctx) - if descriptorProto == nil { return } // Look for a symbol with this exact path in descriptor proto. - - descriptorProto.lock.Lock(ctx) - defer descriptorProto.lock.Unlock(ctx) - var fieldSymbol *symbol for _, symbol := range descriptorProto.symbols { if def, ok := symbol.kind.(*definition); ok && slices.Equal(def.path, fieldPath) { @@ -300,13 +288,7 @@ func (s *symbol) ResolveCrossFile(ctx context.Context) { return } - // Make a copy of the import table pointer and then drop the lock, - // since searching inside of the imports will need to acquire other - // fileManager' locks. - s.file.lock.Lock(ctx) imports := s.file.imports - s.file.lock.Unlock(ctx) - if imports == nil { // Hopeless. We'll have to try again once we have imports! return @@ -756,8 +738,7 @@ func (w *symbolWalker) newRef(name ast.IdentValueNode) *symbol { } } - // NOTE: cross-file resolution happens elsewhere, after we have walked the whole - // ast and dropped this file's lock. + // NOTE: cross-file resolution happens elsewhere. // If we couldn't resolve the symbol, symbol.definedIn will be nil. // However, for hover, it's necessary to still remember the components.