Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Single threaded LSP concurrency model #3341

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 17 additions & 17 deletions private/buf/buflsp/buflsp.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package buflsp
import (
"context"
"fmt"
"sync/atomic"
"sync"

"github.com/bufbuild/buf/private/buf/bufctl"
"github.com/bufbuild/buf/private/bufpkg/bufcheck"
Expand Down Expand Up @@ -81,7 +81,7 @@ func Serve(
}
lsp.fileManager = newFileManager(lsp)
off := protocol.TraceOff
lsp.traceValue.Store(&off)
lsp.traceValue = &off

conn.Go(ctx, lsp.newHandler())
return conn, nil
Expand All @@ -97,6 +97,9 @@ func Serve(
type lsp struct {
conn jsonrpc2.Conn
client protocol.Client
// lock is used to ensure that requests are processed serially.
// This simplifies the implementation of the server.
lock sync.Mutex

logger *zap.Logger
tracer tracing.Tracer
Expand All @@ -105,22 +108,18 @@ type lsp struct {
rootBucket storage.ReadBucket
fileManager *fileManager

// These are atomics, because they are read often and written to
// almost never, but potentially concurrently. Having them side-by-side
// is fine; they are almost never written to so false sharing is not a
// concern.
initParams atomic.Pointer[protocol.InitializeParams]
traceValue atomic.Pointer[protocol.TraceValue]
// InitializeParams is set once, and never changed.
initParams *protocol.InitializeParams
traceValue *protocol.TraceValue
}

// init performs *actual* initialization of the server. This is called by Initialize().
//
// It may only be called once for a given server.
func (l *lsp) init(params *protocol.InitializeParams) error {
if l.initParams.Load() != nil {
return fmt.Errorf("called the %q method more than once", protocol.MethodInitialize)
if l.initParams == nil {
l.initParams = params
}
l.initParams.Store(params)

// TODO: set up logging. We need to forward everything from server.logger through to
// the client, if tracing is turned on. The right way to do this is with an extra
Expand Down Expand Up @@ -166,22 +165,23 @@ func (l *lsp) newHandler() jsonrpc2.Handler {
)
defer span.End()

replier := l.adaptReplier(reply, req)

// Process requests serially.
l.lock.Lock()
defer l.lock.Unlock()

l.logger.Debug(
"processing request",
zap.String("method", req.Method()),
zap.ByteString("params", req.Params()),
)

ctx = withRequestID(ctx)

replier := l.adaptReplier(reply, req)

// Verify that the server has been initialized if this isn't the initialization
// request.
if req.Method() != protocol.MethodInitialize && l.initParams.Load() == nil {
if req.Method() != protocol.MethodInitialize && l.initParams == nil {
return replier(ctx, nil, fmt.Errorf("the first call to the server must be the %q method", protocol.MethodInitialize))
}

return actual(ctx, replier, req)
}
}
Expand Down
77 changes: 3 additions & 74 deletions private/buf/buflsp/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,32 +51,10 @@ const descriptorPath = "google/protobuf/descriptor.proto"
//
// Mutating a file is thread-safe.
type file struct {
// lsp and uri are not protected by file.lock; they are immutable after
// file creation!
// lsp and uri are immutable after file creation!
lsp *lsp
uri protocol.URI

// All variables after this lock variables are protected by file.lock.
//
// NOTE: this package must NEVER attempt to acquire a lock on a file while
// holding a lock on another file. This guarantees that any concurrent operations
// on distinct files can always make forward progress, even if the information they
// have is incomplete. This trades off up-to-date accuracy for responsiveness.
//
// For example, suppose g1 locks a.proto, and then attempts to lock b.proto
// because it followed a pointer in importMap. However, in the meantime, g2
// has acquired b.proto's lock already, and attempts to acquire a lock to a.proto,
// again because of a pointer in importMap. This will deadlock, and it will
// deadlock in such a way that will be undetectable to the Go scheduler, so the
// LSP will hang forever.
//
// This seems like a contrived scenario, but it can happen if a user creates two
// mutually-recursive Protobuf files. Although this is not permitted by Protobuf,
// the LSP must handle this invalid state gracefully.
//
// This is enforced by mutex.go.
lock mutex

text string
version int32
hasText bool // Whether this file has ever had text read into it.
Expand Down Expand Up @@ -115,24 +93,11 @@ func (f *file) Package() []string {
func (f *file) Reset(ctx context.Context) {
f.lsp.logger.Sugar().Debugf("resetting file %v", f.uri)

// Lock and unlock to acquire the import map.
// This map is never mutated after being created, so we only
// need to read the pointer.
//
// We need to lock and unlock because Close() will call Reset() on other
// files, and this will deadlock if cyclic imports exist.
f.lock.Lock(ctx)
imports := f.imports
f.lock.Unlock(ctx)

// Close all imported files while file.mu is not held.
for _, imported := range imports {
for _, imported := range f.imports {
imported.Close(ctx)
}

f.lock.Lock(ctx)
defer f.lock.Unlock(ctx)

f.fileNode = nil
f.packageNode = nil
f.diagnostics = nil
Expand All @@ -155,8 +120,6 @@ func (f *file) Close(ctx context.Context) {
// If it has been read from disk before, or has received updates from the LSP client, this
// function returns nil.
func (f *file) ReadFromDisk(ctx context.Context) (err error) {
f.lock.Lock(ctx)
defer f.lock.Unlock(ctx)
if f.hasText {
return nil
}
Expand All @@ -176,9 +139,6 @@ func (f *file) ReadFromDisk(ctx context.Context) (err error) {
func (f *file) Update(ctx context.Context, version int32, text string) {
f.Reset(ctx)

f.lock.Lock(ctx)
defer f.lock.Unlock(ctx)

f.lsp.logger.Sugar().Infof("new file version: %v, %v -> %v", f.uri, f.version, version)
f.version = version
f.text = text
Expand Down Expand Up @@ -218,8 +178,6 @@ func (f *file) Refresh(ctx context.Context) {
//
// Returns whether a reparse was necessary.
func (f *file) RefreshAST(ctx context.Context) bool {
f.lock.Lock(ctx)
defer f.lock.Unlock(ctx)
if f.fileNode != nil {
return false
}
Expand Down Expand Up @@ -260,9 +218,6 @@ func (f *file) PublishDiagnostics(ctx context.Context) {
tracing.WithAttributes(attribute.String("uri", string(f.uri))))
defer span.End()

f.lock.Lock(ctx)
defer f.lock.Unlock(ctx)

if f.diagnostics == nil {
return
}
Expand Down Expand Up @@ -310,10 +265,8 @@ func (f *file) FindModule(ctx context.Context) {
defer file.Close()
}

f.lock.Lock(ctx)
f.workspace = workspace
f.module = module
f.lock.Unlock(ctx)
}

// IndexImports finds URIs for all of the files imported by this file.
Expand All @@ -322,9 +275,6 @@ func (f *file) IndexImports(ctx context.Context) {
tracing.WithAttributes(attribute.String("uri", string(f.uri))))
defer span.End()

unlock := f.lock.Lock(ctx)
defer unlock()

if f.fileNode == nil || f.imports != nil {
return
}
Expand Down Expand Up @@ -390,11 +340,7 @@ func (f *file) IndexImports(ctx context.Context) {

// FIXME: This algorithm is not correct: it does not account for `import public`.

// Drop the lock after copying the pointer to the imports map. This
// particular map will not be mutated further, and since we're going to grab the lock of
// other files, we need to drop the currently held lock.
fileImports := f.imports
unlock()

for _, file := range fileImports {
if err := file.ReadFromDisk(ctx); err != nil {
Expand All @@ -417,10 +363,8 @@ func (f *file) IndexImports(ctx context.Context) {
//
// This operation requires IndexImports().
func (f *file) BuildImage(ctx context.Context) {
f.lock.Lock(ctx)
importable := f.importable
fileInfo := f.fileInfo
f.lock.Unlock(ctx)

if importable == nil || fileInfo == nil {
return
Expand Down Expand Up @@ -455,9 +399,7 @@ func (f *file) BuildImage(ctx context.Context) {

compiled, err := compiler.Compile(ctx, fileInfo.Path())
if err != nil {
f.lock.Lock(ctx)
f.diagnostics = report.diagnostics
f.lock.Unlock(ctx)
}
if compiled[0] == nil {
return
Expand Down Expand Up @@ -535,9 +477,7 @@ func (f *file) BuildImage(ctx context.Context) {
return
}

f.lock.Lock(ctx)
f.image = image
f.lock.Unlock(ctx)
}

// RunLints runs linting on this file. Returns whether any lints failed.
Expand All @@ -549,11 +489,9 @@ func (f *file) RunLints(ctx context.Context) bool {
return false
}

f.lock.Lock(ctx)
workspace := f.workspace
module := f.module
image := f.image
f.lock.Unlock(ctx)

if module == nil || image == nil {
f.lsp.logger.Sugar().Warnf("could not find image for %q", f.uri)
Expand Down Expand Up @@ -584,8 +522,6 @@ func (f *file) RunLints(ctx context.Context) bool {

f.lsp.logger.Sugar().Warnf("lint generated %d error(s) for %s", len(annotations.FileAnnotations()), f.uri)

f.lock.Lock(ctx)
f.lock.Unlock(ctx)
for _, annotation := range annotations.FileAnnotations() {
f.lsp.logger.Sugar().Info(annotation.FileInfo().Path(), " ", annotation.FileInfo().ExternalPath())

Expand Down Expand Up @@ -616,9 +552,6 @@ func (f *file) IndexSymbols(ctx context.Context) {
tracing.WithAttributes(attribute.String("uri", string(f.uri))))
defer span.End()

unlock := f.lock.Lock(ctx)
defer unlock()

// Throw away all the old symbols. Unlike other indexing functions, we rebuild
// symbols unconditionally.
f.symbols = nil
Expand All @@ -635,9 +568,8 @@ func (f *file) IndexSymbols(ctx context.Context) {
return diff
})

// Now we can drop the lock and search for cross-file references.
// Now we can drop the search for cross-file references.
symbols := f.symbols
unlock()
for _, symbol := range symbols {
symbol.ResolveCrossFile(ctx)
}
Expand All @@ -649,9 +581,6 @@ func (f *file) IndexSymbols(ctx context.Context) {
//
// Returns nil if no symbol is found.
func (f *file) SymbolAt(ctx context.Context, cursor protocol.Position) *symbol {
f.lock.Lock(ctx)
defer f.lock.Unlock(ctx)

// Binary search for the symbol whose start is before or equal to cursor.
idx, found := slices.BinarySearchFunc(f.symbols, cursor, func(sym *symbol, cursor protocol.Position) int {
return comparePositions(sym.Range().Start, cursor)
Expand Down
2 changes: 0 additions & 2 deletions private/buf/buflsp/file_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
type fileManager struct {
lsp *lsp
table refcount.Map[protocol.URI, file]
pool mutexPool
}

// newFiles creates a new file manager.
Expand All @@ -44,7 +43,6 @@ func (fm *fileManager) Open(ctx context.Context, uri protocol.URI) *file {
if !found {
file.lsp = fm.lsp
file.uri = uri
file.lock = fm.pool.NewMutex()
}

return file
Expand Down
Loading
Loading