From b4651811c9ee6a0353964f5c41b1fea24f1ebb06 Mon Sep 17 00:00:00 2001 From: Kartikay Date: Sun, 16 Feb 2025 14:10:44 +0530 Subject: [PATCH] combined caveat and namespaces methods Signed-off-by: Kartikay --- internal/datastore/mysql/watch.go | 195 ++++++++++++------------------ 1 file changed, 75 insertions(+), 120 deletions(-) diff --git a/internal/datastore/mysql/watch.go b/internal/datastore/mysql/watch.go index 729ad53273..719c57bace 100644 --- a/internal/datastore/mysql/watch.go +++ b/internal/datastore/mysql/watch.go @@ -177,16 +177,9 @@ func (mds *Datastore) loadChanges( return nil, 0, err } - // Load namespace changes for the revision range. if options.Content&datastore.WatchSchema == datastore.WatchSchema { - if err := mds.loadNamespaceChanges(ctx, afterRevision, newRevision, stagedChanges); err != nil { - return nil, 0, err - } - } - - // Load caveat changes for the revision range. - if options.Content&datastore.WatchSchema == datastore.WatchSchema { - if err := mds.loadCaveatChanges(ctx, afterRevision, newRevision, stagedChanges); err != nil { + // Load namespace and caveat changes for the revision range + if err := mds.loadSchemaChanges(ctx, afterRevision, newRevision, stagedChanges); err != nil { return nil, 0, err } } @@ -287,127 +280,89 @@ func (mds *Datastore) loadRelationshipChanges(ctx context.Context, afterRevision return } -func (mds *Datastore) loadNamespaceChanges(ctx context.Context, afterRevision uint64, newRevision uint64, stagedChanges *common.Changes[revisions.TransactionIDRevision, uint64]) (err error) { - sql, args, err := mds.QueryChangedNamespacesQuery.Where(sq.Or{ - sq.And{ - sq.Gt{colCreatedTxn: afterRevision}, - sq.LtOrEq{colCreatedTxn: newRevision}, - }, - sq.And{ - sq.Gt{colDeletedTxn: afterRevision}, - sq.LtOrEq{colDeletedTxn: newRevision}, - }, - }).ToSql() - if err != nil { - return fmt.Errorf("unable to prepare changes SQL: %w", err) - } - - rows, err := mds.db.QueryContext(ctx, sql, args...) - if err != nil { - if errors.Is(err, context.Canceled) { - err = datastore.NewWatchCanceledErr() +func (mds *Datastore) loadSchemaChanges(ctx context.Context, afterRevision uint64, newRevision uint64, stagedChanges *common.Changes[revisions.TransactionIDRevision, uint64]) error { + for _, schemaType := range []struct { + query sq.SelectBuilder + objectType string + }{ + {mds.QueryChangedNamespacesQuery, "namespace"}, + {mds.QueryChangedCaveatsQuery, "caveat"}, + } { + sql, args, err := schemaType.query.Where(sq.Or{ + sq.And{ + sq.Gt{colCreatedTxn: afterRevision}, + sq.LtOrEq{colCreatedTxn: newRevision}, + }, + sq.And{ + sq.Gt{colDeletedTxn: afterRevision}, + sq.LtOrEq{colDeletedTxn: newRevision}, + }, + }).ToSql() + if err != nil { + return fmt.Errorf("unable to prepare %s changes SQL: %w", schemaType.objectType, err) } - return - } - defer common.LogOnError(ctx, rows.Close) - for rows.Next() { - var createdTxn uint64 - var deletedTxn uint64 - var config []byte - - err = rows.Scan( - &config, - &createdTxn, - &deletedTxn, - ) + rows, err := mds.db.QueryContext(ctx, sql, args...) if err != nil { - return - } - loaded := &core.NamespaceDefinition{} - if err := loaded.UnmarshalVT(config); err != nil { - return fmt.Errorf("unable to parse changed namespace: %w", err) + if errors.Is(err, context.Canceled) { + err = datastore.NewWatchCanceledErr() + } + return err } - if createdTxn > afterRevision && createdTxn <= newRevision { - if err = stagedChanges.AddChangedDefinition(ctx, revisions.NewForTransactionID(createdTxn), loaded); err != nil { - return + for rows.Next() { + var ( + name string + config []byte + createdTxn uint64 + deletedTxn uint64 + ) + var loaded datastore.SchemaDefinition + + switch schemaType.objectType { + case "namespace": + err = rows.Scan(&config, &createdTxn, &deletedTxn) + if err != nil { + return fmt.Errorf("unable to parse changed namespace: %w", err) + } + def := &core.NamespaceDefinition{} + if err := def.UnmarshalVT(config); err != nil { + return fmt.Errorf("unable to parse changed namespace: %w", err) + } + loaded = def + case "caveat": + err = rows.Scan(&name, &config, &createdTxn, &deletedTxn) + if err != nil { + return fmt.Errorf("unable to parse changed caveat: %w", err) + } + def := &core.CaveatDefinition{} + if err := def.UnmarshalVT(config); err != nil { + return fmt.Errorf(errUnableToReadConfig, err) + } + loaded = def } - } - if deletedTxn > afterRevision && deletedTxn <= newRevision { - if err = stagedChanges.AddDeletedNamespace(ctx, revisions.NewForTransactionID(deletedTxn), loaded.Name); err != nil { - return + if createdTxn > afterRevision && createdTxn <= newRevision { + if err = stagedChanges.AddChangedDefinition(ctx, revisions.NewForTransactionID(createdTxn), loaded); err != nil { + return err + } } - } - } - - if err = rows.Err(); err != nil { - return fmt.Errorf("unable to load changes: %w", err) - } - - return -} - -func (mds *Datastore) loadCaveatChanges(ctx context.Context, afterRevision uint64, newRevision uint64, stagedChanges *common.Changes[revisions.TransactionIDRevision, uint64]) (err error) { - sql, args, err := mds.QueryChangedCaveatsQuery.Where(sq.Or{ - sq.And{ - sq.Gt{colCreatedTxn: afterRevision}, - sq.LtOrEq{colCreatedTxn: newRevision}, - }, - sq.And{ - sq.Gt{colDeletedTxn: afterRevision}, - sq.LtOrEq{colDeletedTxn: newRevision}, - }, - }).ToSql() - if err != nil { - return fmt.Errorf("unable to prepare changes SQL: %w", err) - } - rows, err := mds.db.QueryContext(ctx, sql, args...) - if err != nil { - if errors.Is(err, context.Canceled) { - err = datastore.NewWatchCanceledErr() - } - return - } - - defer common.LogOnError(ctx, rows.Close) - - for rows.Next() { - var createdTxn uint64 - var deletedTxn uint64 - var config []byte - var name string - - err = rows.Scan( - &name, - &config, - &createdTxn, - &deletedTxn, - ) - if err != nil { - return fmt.Errorf("unable to parse changed caveat: %w", err) - } - loaded := &core.CaveatDefinition{} - if err := loaded.UnmarshalVT(config); err != nil { - return fmt.Errorf(errUnableToReadConfig, err) - } - - if createdTxn > afterRevision && createdTxn <= newRevision { - if err = stagedChanges.AddChangedDefinition(ctx, revisions.NewForTransactionID(createdTxn), loaded); err != nil { - return - } - } - if deletedTxn > afterRevision && deletedTxn <= newRevision { - if err = stagedChanges.AddDeletedCaveat(ctx, revisions.NewForTransactionID(deletedTxn), loaded.Name); err != nil { - return + if deletedTxn > afterRevision && deletedTxn <= newRevision { + if schemaType.objectType == "namespace" { + if err = stagedChanges.AddDeletedNamespace(ctx, revisions.NewForTransactionID(deletedTxn), loaded.GetName()); err != nil { + return err + } + } else if schemaType.objectType == "caveat" { + if err = stagedChanges.AddDeletedCaveat(ctx, revisions.NewForTransactionID(deletedTxn), loaded.GetName()); err != nil { + return err + } + } } } - } - if err = rows.Err(); err != nil { - return fmt.Errorf("unable to load changes: %w", err) + if err = rows.Err(); err != nil { + return fmt.Errorf("unable to load %s changes: %w", schemaType.objectType, err) + } } - - return + return nil }