Skip to content

Commit

Permalink
Set error messages on Function CRDs
Browse files Browse the repository at this point in the history
Unify CRD function tasks and views status update
handler to also include an optional error message
to make it easier to diagnose operational failures.
  • Loading branch information
jessejlt committed Oct 30, 2024
1 parent 27b3d77 commit 4e518cd
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 39 deletions.
2 changes: 1 addition & 1 deletion cmd/ingestor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ func newUploaders(endpoints []string, storageDir string, concurrentUploads int,
ConcurrentUploads: concurrentUploads,
DefaultMapping: defaultMapping,
SampleType: sampleType,
ViewStore: fnStore,
FnStore: fnStore,
}))
}

Expand Down
32 changes: 9 additions & 23 deletions ingestor/adx/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/Azure/azure-kusto-go/kusto/kql"
"github.com/Azure/azure-kusto-go/kusto/unsafe"
"github.com/cespare/xxhash"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

type SampleType int
Expand All @@ -36,14 +35,9 @@ type mgmt interface {
Mgmt(ctx context.Context, db string, query kusto.Statement, options ...kusto.MgmtOption) (*kusto.RowIterator, error)
}

type ViewStore interface {
View(database, table string) (*v1.Function, bool)
UpdateStatus(ctx context.Context, fn *v1.Function) error
}

type Syncer struct {
KustoCli mgmt
ViewStore ViewStore
KustoCli mgmt
fnStore FunctionStore

database string

Expand Down Expand Up @@ -71,10 +65,10 @@ type Table struct {
TableName string `kusto:"TableName"`
}

func NewSyncer(kustoCli mgmt, database string, defaultMapping schema.SchemaMapping, st SampleType, vs ViewStore) *Syncer {
func NewSyncer(kustoCli mgmt, database string, defaultMapping schema.SchemaMapping, st SampleType, fns FunctionStore) *Syncer {
return &Syncer{
KustoCli: kustoCli,
ViewStore: vs,
KustoCli: kustoCli,
fnStore: fns,

database: database,
defaultMapping: defaultMapping,
Expand Down Expand Up @@ -280,22 +274,14 @@ func (s *Syncer) EnsureMapping(table string, mapping schema.SchemaMapping) (stri

// EnsureView will create or update a KQL View for the specified Table if one exists.
func (s *Syncer) EnsureView(ctx context.Context, table string) error {
view, ok := s.ViewStore.View(s.database, table)
view, ok := s.fnStore.View(s.database, table)
if !ok {
if logger.IsDebug() {
logger.Debugf("No view found for %s.%s", s.database, table)
}
return nil
}

updateStatusFn := func(status v1.FunctionStatusEnum) {
view.Status.LastTimeReconciled = metav1.Time{Time: time.Now()}
view.Status.Status = status
if err := s.ViewStore.UpdateStatus(ctx, view); err != nil {
logger.Errorf("Failed to update status for view %s.%s v%s: %v", s.database, view.Name, view.ResourceVersion, err)
}
}

s.mu.Lock()
defer s.mu.Unlock()

Expand All @@ -317,19 +303,19 @@ func (s *Syncer) EnsureView(ctx context.Context, table string) error {
stmt := kql.New("").AddUnsafe(view.Spec.Body)
if _, err := s.KustoCli.Mgmt(ctx, s.database, stmt); err != nil {
if !errors.Retry(err) {
updateStatusFn(v1.PermanentFailure)
updateKQLFunctionStatus(ctx, s.fnStore, view, v1.PermanentFailure, err)
logger.Errorf("Permanent failure to create view %s.%s: %v", s.database, table, err)
// We want to fall through here so that we can cache this object, there's no need
// to retry creating it. If it's updated, we'll detect the change in the cached
// object and try again after invalidating the cache.
} else {
updateStatusFn(v1.Failed)
updateKQLFunctionStatus(ctx, s.fnStore, view, v1.Failed, err)
logger.Warnf("Transient failure to create view %s.%s: %v", s.database, table, err)
return nil
}
}

updateStatusFn(v1.Success)
updateKQLFunctionStatus(ctx, s.fnStore, view, v1.Success, nil)
s.views[s.database+table] = view

return nil
Expand Down
31 changes: 20 additions & 11 deletions ingestor/adx/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ func (t *DropUnusedTablesTask) loadTableDetails(ctx context.Context) ([]TableDet

type FunctionStore interface {
Functions() []*v1.Function
View(database, table string) (*v1.Function, bool)
UpdateStatus(ctx context.Context, fn *v1.Function) error
}

Expand All @@ -123,14 +124,6 @@ func NewSyncFunctionsTask(store FunctionStore, kustoCli StatementExecutor) *Sync
}
}

func (t *SyncFunctionsTask) updateStatus(fn *v1.Function, status v1.FunctionStatusEnum) {
fn.Status.LastTimeReconciled = metav1.Time{Time: time.Now()}
fn.Status.Status = status
if err := t.store.UpdateStatus(context.Background(), fn); err != nil {
logger.Errorf("Failed to update status for function %s.%s: %v", fn.Spec.Database, fn.Name, err)
}
}

func (t *SyncFunctionsTask) Run(ctx context.Context) error {
t.mu.Lock()
defer t.mu.Unlock()
Expand All @@ -156,23 +149,39 @@ func (t *SyncFunctionsTask) Run(ctx context.Context) error {
stmt := kql.New("").AddUnsafe(function.Spec.Body)
if _, err := t.kustoCli.Mgmt(ctx, stmt); err != nil {
if !errors.Retry(err) {
t.updateStatus(function, v1.PermanentFailure)
updateKQLFunctionStatus(ctx, t.store, function, v1.PermanentFailure, err)
logger.Errorf("Permanent failure to create function %s.%s: %v", function.Spec.Database, function.Name, err)
// We want to fall through here so that we can cache this object, there's no need
// to retry creating it. If it's updated, we'll detect the change in the cached
// object and try again after invalidating the cache.
t.cache[cacheKey] = function
continue
} else {
t.updateStatus(function, v1.Failed)
updateKQLFunctionStatus(ctx, t.store, function, v1.Failed, err)
logger.Warnf("Transient failure to create function %s.%s: %v", function.Spec.Database, function.Name, err)
continue
}
}

logger.Infof("Successfully created function %s.%s", function.Spec.Database, function.Name)
t.updateStatus(function, v1.Success)
updateKQLFunctionStatus(ctx, t.store, function, v1.Success, nil)
}

return nil
}

func updateKQLFunctionStatus(ctx context.Context, store FunctionStore, fn *v1.Function, status v1.FunctionStatusEnum, err error) error {
fn.Status.LastTimeReconciled = metav1.Time{Time: time.Now()}
fn.Status.Status = status
if err != nil {
errMsg := err.Error()
if len(errMsg) > 256 {
errMsg = errMsg[:256]
}
fn.Status.Error = errMsg
}
if err := store.UpdateStatus(ctx, fn); err != nil {
logger.Errorf("Failed to update status for function %s.%s: %v", fn.Spec.Database, fn.Name, err)
}
return nil
}
4 changes: 2 additions & 2 deletions ingestor/adx/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,11 @@ type UploaderOpts struct {
Dimensions []string
DefaultMapping adxschema.SchemaMapping
SampleType SampleType
ViewStore ViewStore
FnStore FunctionStore
}

func NewUploader(kustoCli ingest.QueryClient, opts UploaderOpts) *uploader {
syncer := NewSyncer(kustoCli, opts.Database, opts.DefaultMapping, opts.SampleType, opts.ViewStore)
syncer := NewSyncer(kustoCli, opts.Database, opts.DefaultMapping, opts.SampleType, opts.FnStore)

return &uploader{
KustoCli: kustoCli,
Expand Down
10 changes: 8 additions & 2 deletions ingestor/storage/kql_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@ func (f *Functions) UpdateStatus(ctx context.Context, fn *v1.Function) error {
return f.client.Status().Update(ctx, fn)
}

func (f *Functions) Delete(ctx context.Context, fn *v1.Function) error {
if f.client == nil {
return fmt.Errorf("no client provided")
}

return nil
}

func (f *Functions) View(database, table string) (*v1.Function, bool) {
f.mu.RLock()
defer f.mu.RUnlock()
Expand Down Expand Up @@ -109,8 +117,6 @@ func (f *Functions) Receive(ctx context.Context, list client.ObjectList) error {
// TODO (jesthom): Once the parser is in place separate out the Views from the Functions.
functions.Items = append(functions.Items, *function.DeepCopy())
}
// TODO (jesthom): Do we want to identify those Views that we know about but
// are no longer in the system and should therefore be deleted?

f.mu.Lock()
f.views = views
Expand Down

0 comments on commit 4e518cd

Please sign in to comment.