Skip to content

Commit

Permalink
combined caveat and namespaces methods
Browse files Browse the repository at this point in the history
Signed-off-by: Kartikay <kartikay_2101ce32@iitp.ac.in>
  • Loading branch information
kartikaysaxena committed Feb 22, 2025
1 parent bdc6eeb commit b465181
Showing 1 changed file with 75 additions and 120 deletions.
195 changes: 75 additions & 120 deletions internal/datastore/mysql/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,16 +177,9 @@ func (mds *Datastore) loadChanges(
return nil, 0, err
}

// Load namespace changes for the revision range.
if options.Content&datastore.WatchSchema == datastore.WatchSchema {
if err := mds.loadNamespaceChanges(ctx, afterRevision, newRevision, stagedChanges); err != nil {
return nil, 0, err
}
}

// Load caveat changes for the revision range.
if options.Content&datastore.WatchSchema == datastore.WatchSchema {
if err := mds.loadCaveatChanges(ctx, afterRevision, newRevision, stagedChanges); err != nil {
// Load namespace and caveat changes for the revision range
if err := mds.loadSchemaChanges(ctx, afterRevision, newRevision, stagedChanges); err != nil {
return nil, 0, err
}
}
Expand Down Expand Up @@ -287,127 +280,89 @@ func (mds *Datastore) loadRelationshipChanges(ctx context.Context, afterRevision
return
}

func (mds *Datastore) loadNamespaceChanges(ctx context.Context, afterRevision uint64, newRevision uint64, stagedChanges *common.Changes[revisions.TransactionIDRevision, uint64]) (err error) {
sql, args, err := mds.QueryChangedNamespacesQuery.Where(sq.Or{
sq.And{
sq.Gt{colCreatedTxn: afterRevision},
sq.LtOrEq{colCreatedTxn: newRevision},
},
sq.And{
sq.Gt{colDeletedTxn: afterRevision},
sq.LtOrEq{colDeletedTxn: newRevision},
},
}).ToSql()
if err != nil {
return fmt.Errorf("unable to prepare changes SQL: %w", err)
}

rows, err := mds.db.QueryContext(ctx, sql, args...)
if err != nil {
if errors.Is(err, context.Canceled) {
err = datastore.NewWatchCanceledErr()
func (mds *Datastore) loadSchemaChanges(ctx context.Context, afterRevision uint64, newRevision uint64, stagedChanges *common.Changes[revisions.TransactionIDRevision, uint64]) error {
for _, schemaType := range []struct {
query sq.SelectBuilder
objectType string
}{
{mds.QueryChangedNamespacesQuery, "namespace"},
{mds.QueryChangedCaveatsQuery, "caveat"},
} {
sql, args, err := schemaType.query.Where(sq.Or{
sq.And{
sq.Gt{colCreatedTxn: afterRevision},
sq.LtOrEq{colCreatedTxn: newRevision},
},
sq.And{
sq.Gt{colDeletedTxn: afterRevision},
sq.LtOrEq{colDeletedTxn: newRevision},
},
}).ToSql()
if err != nil {
return fmt.Errorf("unable to prepare %s changes SQL: %w", schemaType.objectType, err)
}
return
}
defer common.LogOnError(ctx, rows.Close)

for rows.Next() {
var createdTxn uint64
var deletedTxn uint64
var config []byte

err = rows.Scan(
&config,
&createdTxn,
&deletedTxn,
)
rows, err := mds.db.QueryContext(ctx, sql, args...)
if err != nil {
return
}
loaded := &core.NamespaceDefinition{}
if err := loaded.UnmarshalVT(config); err != nil {
return fmt.Errorf("unable to parse changed namespace: %w", err)
if errors.Is(err, context.Canceled) {
err = datastore.NewWatchCanceledErr()
}
return err
}

if createdTxn > afterRevision && createdTxn <= newRevision {
if err = stagedChanges.AddChangedDefinition(ctx, revisions.NewForTransactionID(createdTxn), loaded); err != nil {
return
for rows.Next() {
var (
name string
config []byte
createdTxn uint64
deletedTxn uint64
)
var loaded datastore.SchemaDefinition

switch schemaType.objectType {
case "namespace":
err = rows.Scan(&config, &createdTxn, &deletedTxn)
if err != nil {
return fmt.Errorf("unable to parse changed namespace: %w", err)
}
def := &core.NamespaceDefinition{}
if err := def.UnmarshalVT(config); err != nil {
return fmt.Errorf("unable to parse changed namespace: %w", err)
}
loaded = def
case "caveat":
err = rows.Scan(&name, &config, &createdTxn, &deletedTxn)
if err != nil {
return fmt.Errorf("unable to parse changed caveat: %w", err)
}
def := &core.CaveatDefinition{}
if err := def.UnmarshalVT(config); err != nil {
return fmt.Errorf(errUnableToReadConfig, err)
}
loaded = def
}
}
if deletedTxn > afterRevision && deletedTxn <= newRevision {
if err = stagedChanges.AddDeletedNamespace(ctx, revisions.NewForTransactionID(deletedTxn), loaded.Name); err != nil {
return
if createdTxn > afterRevision && createdTxn <= newRevision {
if err = stagedChanges.AddChangedDefinition(ctx, revisions.NewForTransactionID(createdTxn), loaded); err != nil {
return err
}
}
}
}

if err = rows.Err(); err != nil {
return fmt.Errorf("unable to load changes: %w", err)
}

return
}

func (mds *Datastore) loadCaveatChanges(ctx context.Context, afterRevision uint64, newRevision uint64, stagedChanges *common.Changes[revisions.TransactionIDRevision, uint64]) (err error) {
sql, args, err := mds.QueryChangedCaveatsQuery.Where(sq.Or{
sq.And{
sq.Gt{colCreatedTxn: afterRevision},
sq.LtOrEq{colCreatedTxn: newRevision},
},
sq.And{
sq.Gt{colDeletedTxn: afterRevision},
sq.LtOrEq{colDeletedTxn: newRevision},
},
}).ToSql()
if err != nil {
return fmt.Errorf("unable to prepare changes SQL: %w", err)
}

rows, err := mds.db.QueryContext(ctx, sql, args...)
if err != nil {
if errors.Is(err, context.Canceled) {
err = datastore.NewWatchCanceledErr()
}
return
}

defer common.LogOnError(ctx, rows.Close)

for rows.Next() {
var createdTxn uint64
var deletedTxn uint64
var config []byte
var name string

err = rows.Scan(
&name,
&config,
&createdTxn,
&deletedTxn,
)
if err != nil {
return fmt.Errorf("unable to parse changed caveat: %w", err)
}
loaded := &core.CaveatDefinition{}
if err := loaded.UnmarshalVT(config); err != nil {
return fmt.Errorf(errUnableToReadConfig, err)
}

if createdTxn > afterRevision && createdTxn <= newRevision {
if err = stagedChanges.AddChangedDefinition(ctx, revisions.NewForTransactionID(createdTxn), loaded); err != nil {
return
}
}
if deletedTxn > afterRevision && deletedTxn <= newRevision {
if err = stagedChanges.AddDeletedCaveat(ctx, revisions.NewForTransactionID(deletedTxn), loaded.Name); err != nil {
return
if deletedTxn > afterRevision && deletedTxn <= newRevision {
if schemaType.objectType == "namespace" {
if err = stagedChanges.AddDeletedNamespace(ctx, revisions.NewForTransactionID(deletedTxn), loaded.GetName()); err != nil {
return err
}
} else if schemaType.objectType == "caveat" {
if err = stagedChanges.AddDeletedCaveat(ctx, revisions.NewForTransactionID(deletedTxn), loaded.GetName()); err != nil {
return err
}
}
}
}
}

if err = rows.Err(); err != nil {
return fmt.Errorf("unable to load changes: %w", err)
if err = rows.Err(); err != nil {
return fmt.Errorf("unable to load %s changes: %w", schemaType.objectType, err)
}
}

return
return nil
}

0 comments on commit b465181

Please sign in to comment.