From 2aec10cd335d403ec89bf9d55068c0cadad95c5f Mon Sep 17 00:00:00 2001 From: Radek Simko Date: Tue, 5 Jul 2022 10:32:22 +0100 Subject: [PATCH 1/5] avoid extra function arguments --- internal/indexer/indexer.go | 26 +++++++++++++------------- internal/indexer/walker.go | 2 +- internal/indexer/watcher.go | 2 +- 3 files changed, 15 insertions(+), 15 deletions(-) diff --git a/internal/indexer/indexer.go b/internal/indexer/indexer.go index 76e3cca0..8f477012 100644 --- a/internal/indexer/indexer.go +++ b/internal/indexer/indexer.go @@ -52,13 +52,13 @@ type Collector interface { CollectJobId(jobId job.ID) } -func decodeInstalledModuleCalls(fs ReadOnlyFS, modStore *state.ModuleStore, schemaReader state.SchemaReader, modPath string) job.DeferFunc { +func (idx *Indexer) decodeInstalledModuleCalls(modHandle document.DirHandle) job.DeferFunc { return func(ctx context.Context, opErr error) (jobIds job.IDs) { if opErr != nil { return } - moduleCalls, err := modStore.ModuleCalls(modPath) + moduleCalls, err := idx.modStore.ModuleCalls(modHandle.Path()) if err != nil { return } @@ -73,7 +73,7 @@ func decodeInstalledModuleCalls(fs ReadOnlyFS, modStore *state.ModuleStore, sche if err != nil || !fi.IsDir() { continue } - modStore.Add(mc.Path) + idx.modStore.Add(mc.Path) mcHandle := document.DirHandleFromPath(mc.Path) // copy path for queued jobs below @@ -82,7 +82,7 @@ func decodeInstalledModuleCalls(fs ReadOnlyFS, modStore *state.ModuleStore, sche id, err := jobStore.EnqueueJob(job.Job{ Dir: mcHandle, Func: func(ctx context.Context) error { - return module.ParseModuleConfiguration(fs, modStore, mcPath) + return module.ParseModuleConfiguration(idx.fs, idx.modStore, mcPath) }, Type: op.OpTypeParseModuleConfiguration.String(), Defer: func(ctx context.Context, jobErr error) (ids job.IDs) { @@ -90,7 +90,7 @@ func decodeInstalledModuleCalls(fs ReadOnlyFS, modStore *state.ModuleStore, sche Dir: mcHandle, Type: op.OpTypeLoadModuleMetadata.String(), Func: func(ctx context.Context) error { - return module.LoadModuleMetadata(modStore, mcPath) + return module.LoadModuleMetadata(idx.modStore, mcPath) }, }) if err != nil { @@ -98,7 +98,7 @@ func decodeInstalledModuleCalls(fs ReadOnlyFS, modStore *state.ModuleStore, sche } ids = append(ids, id) - rIds := collectReferences(ctx, mcHandle, modStore, schemaReader) + rIds := idx.collectReferences(ctx, mcHandle) ids = append(ids, rIds...) return @@ -112,14 +112,14 @@ func decodeInstalledModuleCalls(fs ReadOnlyFS, modStore *state.ModuleStore, sche id, err = jobStore.EnqueueJob(job.Job{ Dir: mcHandle, Func: func(ctx context.Context) error { - return module.ParseVariables(fs, modStore, mcPath) + return module.ParseVariables(idx.fs, idx.modStore, mcPath) }, Type: op.OpTypeParseVariables.String(), Defer: func(ctx context.Context, jobErr error) (ids job.IDs) { id, err = jobStore.EnqueueJob(job.Job{ Dir: mcHandle, Func: func(ctx context.Context) error { - return module.DecodeVarsReferences(ctx, modStore, schemaReader, mcPath) + return module.DecodeVarsReferences(ctx, idx.modStore, idx.schemaStore, mcPath) }, Type: op.OpTypeDecodeVarsReferences.String(), }) @@ -140,16 +140,16 @@ func decodeInstalledModuleCalls(fs ReadOnlyFS, modStore *state.ModuleStore, sche } } -func collectReferences(ctx context.Context, dirHandle document.DirHandle, modStore *state.ModuleStore, schemaReader state.SchemaReader) (ids job.IDs) { +func (idx *Indexer) collectReferences(ctx context.Context, modHandle document.DirHandle) (ids job.IDs) { jobStore, err := job.JobStoreFromContext(ctx) if err != nil { return } id, err := jobStore.EnqueueJob(job.Job{ - Dir: dirHandle, + Dir: modHandle, Func: func(ctx context.Context) error { - return module.DecodeReferenceTargets(ctx, modStore, schemaReader, dirHandle.Path()) + return module.DecodeReferenceTargets(ctx, idx.modStore, idx.schemaStore, modHandle.Path()) }, Type: op.OpTypeDecodeReferenceTargets.String(), }) @@ -159,9 +159,9 @@ func collectReferences(ctx context.Context, dirHandle document.DirHandle, modSto ids = append(ids, id) id, err = jobStore.EnqueueJob(job.Job{ - Dir: dirHandle, + Dir: modHandle, Func: func(ctx context.Context) error { - return module.DecodeReferenceOrigins(ctx, modStore, schemaReader, dirHandle.Path()) + return module.DecodeReferenceOrigins(ctx, idx.modStore, idx.schemaStore, modHandle.Path()) }, Type: op.OpTypeDecodeReferenceOrigins.String(), }) diff --git a/internal/indexer/walker.go b/internal/indexer/walker.go index b2da9947..fa06a954 100644 --- a/internal/indexer/walker.go +++ b/internal/indexer/walker.go @@ -92,7 +92,7 @@ func (idx *Indexer) WalkedModule(ctx context.Context, modHandle document.DirHand return module.ParseModuleManifest(idx.fs, idx.modStore, modHandle.Path()) }, Type: op.OpTypeParseModuleManifest.String(), - Defer: decodeInstalledModuleCalls(idx.fs, idx.modStore, idx.schemaStore, modHandle.Path()), + Defer: idx.decodeInstalledModuleCalls(modHandle), }) if err != nil { return ids, err diff --git a/internal/indexer/watcher.go b/internal/indexer/watcher.go index 69202043..fc5f620e 100644 --- a/internal/indexer/watcher.go +++ b/internal/indexer/watcher.go @@ -19,7 +19,7 @@ func (idx *Indexer) ModuleManifestChanged(ctx context.Context, modHandle documen return module.ParseModuleManifest(idx.fs, idx.modStore, modHandle.Path()) }, Type: op.OpTypeParseModuleManifest.String(), - Defer: decodeInstalledModuleCalls(idx.fs, idx.modStore, idx.schemaStore, modHandle.Path()), + Defer: idx.decodeInstalledModuleCalls(modHandle), }) if err != nil { return ids, err From 3900c028bf19bc06ae6fba5d17db27cda33558f3 Mon Sep 17 00:00:00 2001 From: Radek Simko Date: Tue, 5 Jul 2022 11:22:19 +0100 Subject: [PATCH 2/5] indexer: extract module call-related jobs --- internal/indexer/indexer.go | 126 ------------------------------- internal/indexer/module_calls.go | 122 ++++++++++++++++++++++++++++++ 2 files changed, 122 insertions(+), 126 deletions(-) create mode 100644 internal/indexer/module_calls.go diff --git a/internal/indexer/indexer.go b/internal/indexer/indexer.go index 8f477012..337e197b 100644 --- a/internal/indexer/indexer.go +++ b/internal/indexer/indexer.go @@ -1,18 +1,13 @@ package indexer import ( - "context" "io/ioutil" "log" - "os" - "github.com/hashicorp/terraform-ls/internal/document" "github.com/hashicorp/terraform-ls/internal/job" "github.com/hashicorp/terraform-ls/internal/registry" "github.com/hashicorp/terraform-ls/internal/state" "github.com/hashicorp/terraform-ls/internal/terraform/exec" - "github.com/hashicorp/terraform-ls/internal/terraform/module" - op "github.com/hashicorp/terraform-ls/internal/terraform/module/operation" ) type Indexer struct { @@ -51,124 +46,3 @@ func (idx *Indexer) SetLogger(logger *log.Logger) { type Collector interface { CollectJobId(jobId job.ID) } - -func (idx *Indexer) decodeInstalledModuleCalls(modHandle document.DirHandle) job.DeferFunc { - return func(ctx context.Context, opErr error) (jobIds job.IDs) { - if opErr != nil { - return - } - - moduleCalls, err := idx.modStore.ModuleCalls(modHandle.Path()) - if err != nil { - return - } - - jobStore, err := job.JobStoreFromContext(ctx) - if err != nil { - return - } - - for _, mc := range moduleCalls.Installed { - fi, err := os.Stat(mc.Path) - if err != nil || !fi.IsDir() { - continue - } - idx.modStore.Add(mc.Path) - - mcHandle := document.DirHandleFromPath(mc.Path) - // copy path for queued jobs below - mcPath := mc.Path - - id, err := jobStore.EnqueueJob(job.Job{ - Dir: mcHandle, - Func: func(ctx context.Context) error { - return module.ParseModuleConfiguration(idx.fs, idx.modStore, mcPath) - }, - Type: op.OpTypeParseModuleConfiguration.String(), - Defer: func(ctx context.Context, jobErr error) (ids job.IDs) { - id, err := jobStore.EnqueueJob(job.Job{ - Dir: mcHandle, - Type: op.OpTypeLoadModuleMetadata.String(), - Func: func(ctx context.Context) error { - return module.LoadModuleMetadata(idx.modStore, mcPath) - }, - }) - if err != nil { - return - } - ids = append(ids, id) - - rIds := idx.collectReferences(ctx, mcHandle) - ids = append(ids, rIds...) - - return - }, - }) - if err != nil { - return - } - jobIds = append(jobIds, id) - - id, err = jobStore.EnqueueJob(job.Job{ - Dir: mcHandle, - Func: func(ctx context.Context) error { - return module.ParseVariables(idx.fs, idx.modStore, mcPath) - }, - Type: op.OpTypeParseVariables.String(), - Defer: func(ctx context.Context, jobErr error) (ids job.IDs) { - id, err = jobStore.EnqueueJob(job.Job{ - Dir: mcHandle, - Func: func(ctx context.Context) error { - return module.DecodeVarsReferences(ctx, idx.modStore, idx.schemaStore, mcPath) - }, - Type: op.OpTypeDecodeVarsReferences.String(), - }) - if err != nil { - return - } - ids = append(ids, id) - return - }, - }) - if err != nil { - return - } - jobIds = append(jobIds, id) - } - - return - } -} - -func (idx *Indexer) collectReferences(ctx context.Context, modHandle document.DirHandle) (ids job.IDs) { - jobStore, err := job.JobStoreFromContext(ctx) - if err != nil { - return - } - - id, err := jobStore.EnqueueJob(job.Job{ - Dir: modHandle, - Func: func(ctx context.Context) error { - return module.DecodeReferenceTargets(ctx, idx.modStore, idx.schemaStore, modHandle.Path()) - }, - Type: op.OpTypeDecodeReferenceTargets.String(), - }) - if err != nil { - return - } - ids = append(ids, id) - - id, err = jobStore.EnqueueJob(job.Job{ - Dir: modHandle, - Func: func(ctx context.Context) error { - return module.DecodeReferenceOrigins(ctx, idx.modStore, idx.schemaStore, modHandle.Path()) - }, - Type: op.OpTypeDecodeReferenceOrigins.String(), - }) - if err != nil { - return - } - ids = append(ids, id) - - return -} diff --git a/internal/indexer/module_calls.go b/internal/indexer/module_calls.go new file mode 100644 index 00000000..11100dbc --- /dev/null +++ b/internal/indexer/module_calls.go @@ -0,0 +1,122 @@ +package indexer + +import ( + "context" + "os" + + "github.com/hashicorp/terraform-ls/internal/document" + "github.com/hashicorp/terraform-ls/internal/job" + "github.com/hashicorp/terraform-ls/internal/terraform/module" + op "github.com/hashicorp/terraform-ls/internal/terraform/module/operation" +) + +func (idx *Indexer) decodeInstalledModuleCalls(modHandle document.DirHandle) job.DeferFunc { + return func(ctx context.Context, opErr error) (jobIds job.IDs) { + if opErr != nil { + return + } + + moduleCalls, err := idx.modStore.ModuleCalls(modHandle.Path()) + if err != nil { + return + } + + for _, mc := range moduleCalls.Installed { + fi, err := os.Stat(mc.Path) + if err != nil || !fi.IsDir() { + continue + } + idx.modStore.Add(mc.Path) + + mcHandle := document.DirHandleFromPath(mc.Path) + // copy path for queued jobs below + mcPath := mc.Path + + id, err := idx.jobStore.EnqueueJob(job.Job{ + Dir: mcHandle, + Func: func(ctx context.Context) error { + return module.ParseModuleConfiguration(idx.fs, idx.modStore, mcPath) + }, + Type: op.OpTypeParseModuleConfiguration.String(), + Defer: func(ctx context.Context, jobErr error) (ids job.IDs) { + id, err := idx.jobStore.EnqueueJob(job.Job{ + Dir: mcHandle, + Type: op.OpTypeLoadModuleMetadata.String(), + Func: func(ctx context.Context) error { + return module.LoadModuleMetadata(idx.modStore, mcPath) + }, + }) + if err != nil { + return + } + ids = append(ids, id) + + rIds := idx.collectReferences(ctx, mcHandle) + ids = append(ids, rIds...) + + return + }, + }) + if err != nil { + return + } + jobIds = append(jobIds, id) + + id, err = idx.jobStore.EnqueueJob(job.Job{ + Dir: mcHandle, + Func: func(ctx context.Context) error { + return module.ParseVariables(idx.fs, idx.modStore, mcPath) + }, + Type: op.OpTypeParseVariables.String(), + Defer: func(ctx context.Context, jobErr error) (ids job.IDs) { + id, err = idx.jobStore.EnqueueJob(job.Job{ + Dir: mcHandle, + Func: func(ctx context.Context) error { + return module.DecodeVarsReferences(ctx, idx.modStore, idx.schemaStore, mcPath) + }, + Type: op.OpTypeDecodeVarsReferences.String(), + }) + if err != nil { + return + } + ids = append(ids, id) + return + }, + }) + if err != nil { + return + } + jobIds = append(jobIds, id) + } + + return + } +} + +func (idx *Indexer) collectReferences(ctx context.Context, modHandle document.DirHandle) (ids job.IDs) { + id, err := idx.jobStore.EnqueueJob(job.Job{ + Dir: modHandle, + Func: func(ctx context.Context) error { + return module.DecodeReferenceTargets(ctx, idx.modStore, idx.schemaStore, modHandle.Path()) + }, + Type: op.OpTypeDecodeReferenceTargets.String(), + }) + if err != nil { + return + } + ids = append(ids, id) + + id, err = idx.jobStore.EnqueueJob(job.Job{ + Dir: modHandle, + Func: func(ctx context.Context) error { + return module.DecodeReferenceOrigins(ctx, idx.modStore, idx.schemaStore, modHandle.Path()) + }, + Type: op.OpTypeDecodeReferenceOrigins.String(), + }) + if err != nil { + return + } + ids = append(ids, id) + + return +} From a55f0347ee77e3f517fefb382312c8d0a2e7d312 Mon Sep 17 00:00:00 2001 From: Radek Simko Date: Tue, 5 Jul 2022 11:20:36 +0100 Subject: [PATCH 3/5] indexer: improve/cleanup error handling - capture and log any errors from Deferred jobs - capture and log any errors from other places which were previously skipped on error (via multierror) --- internal/indexer/document_change.go | 28 ++++++----- internal/indexer/module_calls.go | 70 +++++++++++++++++++--------- internal/indexer/walker.go | 61 +++++++++++------------- internal/job/job.go | 2 +- internal/scheduler/scheduler.go | 5 +- internal/scheduler/scheduler_test.go | 11 +++-- internal/state/jobs_test.go | 14 ++++-- 7 files changed, 108 insertions(+), 83 deletions(-) diff --git a/internal/indexer/document_change.go b/internal/indexer/document_change.go index 87bbf942..af4bd37f 100644 --- a/internal/indexer/document_change.go +++ b/internal/indexer/document_change.go @@ -18,12 +18,8 @@ func (idx *Indexer) DocumentChanged(modHandle document.DirHandle) (job.IDs, erro return module.ParseModuleConfiguration(idx.fs, idx.modStore, modHandle.Path()) }, Type: op.OpTypeParseModuleConfiguration.String(), - Defer: func(ctx context.Context, jobErr error) job.IDs { - ids, err := idx.decodeModule(ctx, modHandle) - if err != nil { - idx.logger.Printf("error: %s", err) - } - return ids + Defer: func(ctx context.Context, jobErr error) (job.IDs, error) { + return idx.decodeModule(ctx, modHandle) }, }) if err != nil { @@ -37,7 +33,8 @@ func (idx *Indexer) DocumentChanged(modHandle document.DirHandle) (job.IDs, erro return module.ParseVariables(idx.fs, idx.modStore, modHandle.Path()) }, Type: op.OpTypeParseVariables.String(), - Defer: func(ctx context.Context, jobErr error) (ids job.IDs) { + Defer: func(ctx context.Context, jobErr error) (job.IDs, error) { + ids := make(job.IDs, 0) id, err := idx.jobStore.EnqueueJob(job.Job{ Dir: modHandle, Func: func(ctx context.Context) error { @@ -46,10 +43,10 @@ func (idx *Indexer) DocumentChanged(modHandle document.DirHandle) (job.IDs, erro Type: op.OpTypeDecodeVarsReferences.String(), }) if err != nil { - return + return ids, err } ids = append(ids, id) - return + return ids, nil }, }) if err != nil { @@ -69,7 +66,8 @@ func (idx *Indexer) decodeModule(ctx context.Context, modHandle document.DirHand return module.LoadModuleMetadata(idx.modStore, modHandle.Path()) }, Type: op.OpTypeLoadModuleMetadata.String(), - Defer: func(ctx context.Context, jobErr error) (ids job.IDs) { + Defer: func(ctx context.Context, jobErr error) (job.IDs, error) { + ids := make(job.IDs, 0) id, err := idx.jobStore.EnqueueJob(job.Job{ Dir: modHandle, Func: func(ctx context.Context) error { @@ -78,7 +76,7 @@ func (idx *Indexer) decodeModule(ctx context.Context, modHandle document.DirHand Type: op.OpTypeDecodeReferenceTargets.String(), }) if err != nil { - return + return ids, err } ids = append(ids, id) @@ -90,7 +88,7 @@ func (idx *Indexer) decodeModule(ctx context.Context, modHandle document.DirHand Type: op.OpTypeDecodeReferenceOrigins.String(), }) if err != nil { - return + return ids, err } ids = append(ids, id) @@ -104,11 +102,11 @@ func (idx *Indexer) decodeModule(ctx context.Context, modHandle document.DirHand Type: op.OpTypeGetModuleDataFromRegistry.String(), }) if err != nil { - return + return ids, err } - ids = append(ids, id) - return + ids = append(ids, id) + return ids, nil }, }) if err != nil { diff --git a/internal/indexer/module_calls.go b/internal/indexer/module_calls.go index 11100dbc..31315600 100644 --- a/internal/indexer/module_calls.go +++ b/internal/indexer/module_calls.go @@ -4,6 +4,7 @@ import ( "context" "os" + "github.com/hashicorp/go-multierror" "github.com/hashicorp/terraform-ls/internal/document" "github.com/hashicorp/terraform-ls/internal/job" "github.com/hashicorp/terraform-ls/internal/terraform/module" @@ -11,22 +12,30 @@ import ( ) func (idx *Indexer) decodeInstalledModuleCalls(modHandle document.DirHandle) job.DeferFunc { - return func(ctx context.Context, opErr error) (jobIds job.IDs) { + return func(ctx context.Context, opErr error) (job.IDs, error) { + jobIds := make(job.IDs, 0) if opErr != nil { - return + return jobIds, opErr } moduleCalls, err := idx.modStore.ModuleCalls(modHandle.Path()) if err != nil { - return + return jobIds, err } + var errs *multierror.Error + for _, mc := range moduleCalls.Installed { fi, err := os.Stat(mc.Path) if err != nil || !fi.IsDir() { + multierror.Append(errs, err) + continue + } + err = idx.modStore.Add(mc.Path) + if err != nil { + multierror.Append(errs, err) continue } - idx.modStore.Add(mc.Path) mcHandle := document.DirHandleFromPath(mc.Path) // copy path for queued jobs below @@ -38,7 +47,10 @@ func (idx *Indexer) decodeInstalledModuleCalls(modHandle document.DirHandle) job return module.ParseModuleConfiguration(idx.fs, idx.modStore, mcPath) }, Type: op.OpTypeParseModuleConfiguration.String(), - Defer: func(ctx context.Context, jobErr error) (ids job.IDs) { + Defer: func(ctx context.Context, jobErr error) (job.IDs, error) { + ids := make(job.IDs, 0) + var errs *multierror.Error + id, err := idx.jobStore.EnqueueJob(job.Job{ Dir: mcHandle, Type: op.OpTypeLoadModuleMetadata.String(), @@ -47,18 +59,24 @@ func (idx *Indexer) decodeInstalledModuleCalls(modHandle document.DirHandle) job }, }) if err != nil { - return + errs = multierror.Append(errs, err) + } else { + ids = append(ids, id) } - ids = append(ids, id) - rIds := idx.collectReferences(ctx, mcHandle) - ids = append(ids, rIds...) + rIds, err := idx.collectReferences(ctx, mcHandle) + if err != nil { + errs = multierror.Append(errs, err) + } else { + ids = append(ids, rIds...) + } - return + return ids, errs.ErrorOrNil() }, }) if err != nil { - return + multierror.Append(errs, err) + continue } jobIds = append(jobIds, id) @@ -68,7 +86,8 @@ func (idx *Indexer) decodeInstalledModuleCalls(modHandle document.DirHandle) job return module.ParseVariables(idx.fs, idx.modStore, mcPath) }, Type: op.OpTypeParseVariables.String(), - Defer: func(ctx context.Context, jobErr error) (ids job.IDs) { + Defer: func(ctx context.Context, jobErr error) (job.IDs, error) { + ids := make(job.IDs, 0) id, err = idx.jobStore.EnqueueJob(job.Job{ Dir: mcHandle, Func: func(ctx context.Context) error { @@ -77,23 +96,28 @@ func (idx *Indexer) decodeInstalledModuleCalls(modHandle document.DirHandle) job Type: op.OpTypeDecodeVarsReferences.String(), }) if err != nil { - return + return ids, err } ids = append(ids, id) - return + return ids, err }, }) if err != nil { - return + multierror.Append(errs, err) + continue } jobIds = append(jobIds, id) } - return + return jobIds, errs.ErrorOrNil() } } -func (idx *Indexer) collectReferences(ctx context.Context, modHandle document.DirHandle) (ids job.IDs) { +func (idx *Indexer) collectReferences(ctx context.Context, modHandle document.DirHandle) (job.IDs, error) { + ids := make(job.IDs, 0) + + var errs *multierror.Error + id, err := idx.jobStore.EnqueueJob(job.Job{ Dir: modHandle, Func: func(ctx context.Context) error { @@ -102,9 +126,10 @@ func (idx *Indexer) collectReferences(ctx context.Context, modHandle document.Di Type: op.OpTypeDecodeReferenceTargets.String(), }) if err != nil { - return + errs = multierror.Append(errs, err) + } else { + ids = append(ids, id) } - ids = append(ids, id) id, err = idx.jobStore.EnqueueJob(job.Job{ Dir: modHandle, @@ -114,9 +139,10 @@ func (idx *Indexer) collectReferences(ctx context.Context, modHandle document.Di Type: op.OpTypeDecodeReferenceOrigins.String(), }) if err != nil { - return + errs = multierror.Append(errs, err) + } else { + ids = append(ids, id) } - ids = append(ids, id) - return + return ids, errs.ErrorOrNil() } diff --git a/internal/indexer/walker.go b/internal/indexer/walker.go index fa06a954..21617250 100644 --- a/internal/indexer/walker.go +++ b/internal/indexer/walker.go @@ -3,6 +3,7 @@ package indexer import ( "context" + "github.com/hashicorp/go-multierror" "github.com/hashicorp/terraform-ls/internal/document" "github.com/hashicorp/terraform-ls/internal/job" "github.com/hashicorp/terraform-ls/internal/terraform/datadir" @@ -13,6 +14,7 @@ import ( func (idx *Indexer) WalkedModule(ctx context.Context, modHandle document.DirHandle) (job.IDs, error) { ids := make(job.IDs, 0) + var errs *multierror.Error id, err := idx.jobStore.EnqueueJob(job.Job{ Dir: modHandle, @@ -22,9 +24,10 @@ func (idx *Indexer) WalkedModule(ctx context.Context, modHandle document.DirHand Type: op.OpTypeParseModuleConfiguration.String(), }) if err != nil { - return ids, err + errs = multierror.Append(errs, err) + } else { + ids = append(ids, id) } - ids = append(ids, id) id, err = idx.jobStore.EnqueueJob(job.Job{ Dir: modHandle, @@ -32,7 +35,9 @@ func (idx *Indexer) WalkedModule(ctx context.Context, modHandle document.DirHand return module.ParseVariables(idx.fs, idx.modStore, modHandle.Path()) }, Type: op.OpTypeParseVariables.String(), - Defer: func(ctx context.Context, jobErr error) (ids job.IDs) { + Defer: func(ctx context.Context, jobErr error) (job.IDs, error) { + ids := make(job.IDs, 0) + id, err := idx.jobStore.EnqueueJob(job.Job{ Dir: modHandle, Func: func(ctx context.Context) error { @@ -41,16 +46,17 @@ func (idx *Indexer) WalkedModule(ctx context.Context, modHandle document.DirHand Type: op.OpTypeDecodeVarsReferences.String(), }) if err != nil { - return + return ids, err } ids = append(ids, id) - return + return ids, err }, }) if err != nil { - return ids, err + errs = multierror.Append(errs, err) + } else { + ids = append(ids, id) } - ids = append(ids, id) id, err = idx.jobStore.EnqueueJob(job.Job{ Dir: modHandle, @@ -61,9 +67,10 @@ func (idx *Indexer) WalkedModule(ctx context.Context, modHandle document.DirHand Type: op.OpTypeGetTerraformVersion.String(), }) if err != nil { - return ids, err + errs = multierror.Append(errs, err) + } else { + ids = append(ids, id) } - ids = append(ids, id) dataDir := datadir.WalkDataDirOfModule(idx.fs, modHandle.Path()) idx.logger.Printf("parsed datadir: %#v", dataDir) @@ -78,9 +85,10 @@ func (idx *Indexer) WalkedModule(ctx context.Context, modHandle document.DirHand Type: op.OpTypeObtainSchema.String(), }) if err != nil { - return ids, err + errs = multierror.Append(errs, err) + } else { + ids = append(ids, id) } - ids = append(ids, id) } if dataDir.ModuleManifestPath != "" { @@ -95,7 +103,9 @@ func (idx *Indexer) WalkedModule(ctx context.Context, modHandle document.DirHand Defer: idx.decodeInstalledModuleCalls(modHandle), }) if err != nil { - return ids, err + errs = multierror.Append(errs, err) + } else { + ids = append(ids, id) } // Here we wait for all module calls to be processed to @@ -105,29 +115,12 @@ func (idx *Indexer) WalkedModule(ctx context.Context, modHandle document.DirHand idx.jobStore.WaitForJobs(ctx, id) } - id, err = idx.jobStore.EnqueueJob(job.Job{ - Dir: modHandle, - Func: func(ctx context.Context) error { - return module.DecodeReferenceTargets(ctx, idx.modStore, idx.schemaStore, modHandle.Path()) - }, - Type: op.OpTypeDecodeReferenceTargets.String(), - }) - if err != nil { - return ids, err - } - ids = append(ids, id) - - id, err = idx.jobStore.EnqueueJob(job.Job{ - Dir: modHandle, - Func: func(ctx context.Context) error { - return module.DecodeReferenceOrigins(ctx, idx.modStore, idx.schemaStore, modHandle.Path()) - }, - Type: op.OpTypeDecodeReferenceOrigins.String(), - }) + rIds, err := idx.collectReferences(ctx, modHandle) if err != nil { - return ids, err + errs = multierror.Append(errs, err) + } else { + ids = append(ids, rIds...) } - ids = append(ids, id) - return ids, nil + return ids, errs.ErrorOrNil() } diff --git a/internal/job/job.go b/internal/job/job.go index 487ae267..52847021 100644 --- a/internal/job/job.go +++ b/internal/job/job.go @@ -32,7 +32,7 @@ type Job struct { // DeferFunc represents a deferred function scheduling more jobs // based on jobErr (any error returned from the main job). // Newly queued job IDs should be returned to allow for synchronization. -type DeferFunc func(ctx context.Context, jobErr error) IDs +type DeferFunc func(ctx context.Context, jobErr error) (IDs, error) func (job Job) Copy() Job { return Job{ diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index 387949cb..67ab66b9 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -70,7 +70,10 @@ func (s *Scheduler) eval(ctx context.Context) { deferredJobIds := make(job.IDs, 0) if nextJob.Defer != nil { deferCtx := job.WithJobStore(ctx, s.jobStorage) - deferredJobIds = nextJob.Defer(deferCtx, jobErr) + deferredJobIds, err = nextJob.Defer(deferCtx, jobErr) + if err != nil { + s.logger.Printf("deferred job failed: %s", err) + } } err = s.jobStorage.FinishJob(id, jobErr, deferredJobIds...) diff --git a/internal/scheduler/scheduler_test.go b/internal/scheduler/scheduler_test.go index 3986f675..32c15b20 100644 --- a/internal/scheduler/scheduler_test.go +++ b/internal/scheduler/scheduler_test.go @@ -260,11 +260,12 @@ func TestScheduler_defer(t *testing.T) { }, Dir: document.DirHandleFromPath(dirPath), Type: "test-type", - Defer: func(ctx context.Context, jobErr error) (ids job.IDs) { + Defer: func(ctx context.Context, jobErr error) (job.IDs, error) { + ids := make(job.IDs, 0) je, err := job.JobStoreFromContext(ctx) if err != nil { log.Fatal(err) - return nil + return ids, err } id1, err := je.EnqueueJob(job.Job{ @@ -277,7 +278,7 @@ func TestScheduler_defer(t *testing.T) { }) if err != nil { log.Fatal(err) - return nil + return ids, err } ids = append(ids, id1) @@ -291,11 +292,11 @@ func TestScheduler_defer(t *testing.T) { }) if err != nil { log.Fatal(err) - return nil + return ids, err } ids = append(ids, id2) - return + return ids, nil }, }) if err != nil { diff --git a/internal/state/jobs_test.go b/internal/state/jobs_test.go index 516b2c1a..a76f168f 100644 --- a/internal/state/jobs_test.go +++ b/internal/state/jobs_test.go @@ -638,10 +638,11 @@ func TestJobStore_FinishJob_defer(t *testing.T) { t.Fatal(err) } - defer1Func := func(ctx context.Context, jobErr error) (ids job.IDs) { + defer1Func := func(ctx context.Context, jobErr error) (job.IDs, error) { + ids := make(job.IDs, 0) jobStore, err := job.JobStoreFromContext(ctx) if err != nil { - return nil + return ids, err } id, err := jobStore.EnqueueJob(job.Job{ @@ -652,10 +653,10 @@ func TestJobStore_FinishJob_defer(t *testing.T) { Type: "test-type", }) if err != nil { - return nil + return ids, err } ids = append(ids, id) - return + return ids, err } id1, err := ss.JobStore.EnqueueJob(job.Job{ @@ -673,7 +674,10 @@ func TestJobStore_FinishJob_defer(t *testing.T) { ctx := context.Background() ctx = job.WithJobStore(ctx, ss.JobStore) // execute deferred func, which is what scheduler would do - deferredIds := defer1Func(ctx, nil) + deferredIds, err := defer1Func(ctx, nil) + if err != nil { + t.Fatal(err) + } err = ss.JobStore.FinishJob(id1, nil, deferredIds...) if err != nil { From b6fdc5d5d5f5ab3bb378507f5d281d5d7e74444e Mon Sep 17 00:00:00 2001 From: Radek Simko Date: Tue, 5 Jul 2022 15:07:33 +0100 Subject: [PATCH 4/5] walker: fix error collection --- internal/walker/walker_collector.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/walker/walker_collector.go b/internal/walker/walker_collector.go index a1f8b15c..d998b60a 100644 --- a/internal/walker/walker_collector.go +++ b/internal/walker/walker_collector.go @@ -26,7 +26,7 @@ func NewWalkerCollector() *WalkerCollector { func (wc *WalkerCollector) CollectError(err error) { wc.errorsMu.Lock() defer wc.errorsMu.Unlock() - multierror.Append(wc.errors, err) + wc.errors = multierror.Append(wc.errors, err) } func (wc *WalkerCollector) ErrorOrNil() error { From 59eea8a16bd1045499e37cdf68ce4e17854a13f4 Mon Sep 17 00:00:00 2001 From: Radek Simko Date: Tue, 5 Jul 2022 15:17:38 +0100 Subject: [PATCH 5/5] scheduler/state: avoid passing JobStore via ctx This (hack) is no longer necessary since all indexing logic is now self-contained within one package. --- internal/job/job_store.go | 15 --------------- internal/scheduler/scheduler.go | 3 +-- internal/scheduler/scheduler_test.go | 6 +----- internal/state/jobs_test.go | 6 +----- 4 files changed, 3 insertions(+), 27 deletions(-) diff --git a/internal/job/job_store.go b/internal/job/job_store.go index faee5a0b..8073d0e2 100644 --- a/internal/job/job_store.go +++ b/internal/job/job_store.go @@ -2,24 +2,9 @@ package job import ( "context" - "fmt" ) type JobStore interface { EnqueueJob(newJob Job) (ID, error) WaitForJobs(ctx context.Context, ids ...ID) error } - -type jobStoreCtxKey struct{} - -func WithJobStore(ctx context.Context, js JobStore) context.Context { - return context.WithValue(ctx, jobStoreCtxKey{}, js) -} - -func JobStoreFromContext(ctx context.Context) (JobStore, error) { - js, ok := ctx.Value(jobStoreCtxKey{}).(JobStore) - if !ok { - return nil, fmt.Errorf("not found JobStore in context") - } - return js, nil -} diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index 67ab66b9..902f6b4f 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -69,8 +69,7 @@ func (s *Scheduler) eval(ctx context.Context) { deferredJobIds := make(job.IDs, 0) if nextJob.Defer != nil { - deferCtx := job.WithJobStore(ctx, s.jobStorage) - deferredJobIds, err = nextJob.Defer(deferCtx, jobErr) + deferredJobIds, err = nextJob.Defer(ctx, jobErr) if err != nil { s.logger.Printf("deferred job failed: %s", err) } diff --git a/internal/scheduler/scheduler_test.go b/internal/scheduler/scheduler_test.go index 32c15b20..5de053b6 100644 --- a/internal/scheduler/scheduler_test.go +++ b/internal/scheduler/scheduler_test.go @@ -262,11 +262,7 @@ func TestScheduler_defer(t *testing.T) { Type: "test-type", Defer: func(ctx context.Context, jobErr error) (job.IDs, error) { ids := make(job.IDs, 0) - je, err := job.JobStoreFromContext(ctx) - if err != nil { - log.Fatal(err) - return ids, err - } + je := ss.JobStore id1, err := je.EnqueueJob(job.Job{ Dir: document.DirHandleFromPath(dirPath), diff --git a/internal/state/jobs_test.go b/internal/state/jobs_test.go index a76f168f..42468688 100644 --- a/internal/state/jobs_test.go +++ b/internal/state/jobs_test.go @@ -640,10 +640,7 @@ func TestJobStore_FinishJob_defer(t *testing.T) { defer1Func := func(ctx context.Context, jobErr error) (job.IDs, error) { ids := make(job.IDs, 0) - jobStore, err := job.JobStoreFromContext(ctx) - if err != nil { - return ids, err - } + jobStore := ss.JobStore id, err := jobStore.EnqueueJob(job.Job{ Func: func(ctx context.Context) error { @@ -672,7 +669,6 @@ func TestJobStore_FinishJob_defer(t *testing.T) { } ctx := context.Background() - ctx = job.WithJobStore(ctx, ss.JobStore) // execute deferred func, which is what scheduler would do deferredIds, err := defer1Func(ctx, nil) if err != nil {