Skip to content

Commit

Permalink
fix(schema-update): Start opIndexing only when index creation is requ…
Browse files Browse the repository at this point in the history
…ired. (#7845)

We were starting opIndexing while schema update irrespective of whether index 
building is required or not. This caused Dgraph to be unable to create namespaces
while backup is running, despite the fact that namespace creation doesn't need 
any indexing. This PR fixes this issue.
  • Loading branch information
ahsanbarkati authored May 24, 2021
1 parent df1300f commit 5595f9c
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 14 deletions.
3 changes: 0 additions & 3 deletions edgraph/multi_tenancy_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,6 @@ func (s *Server) CreateNamespace(ctx context.Context, passwd string) (uint64, er
return 0, err
}

if err = worker.WaitForIndexing(ctx, true); err != nil {
return 0, errors.Wrap(err, "Creating namespace, got error: ")
}
err = x.RetryUntilSuccess(10, 100*time.Millisecond, func() error {
return createGuardianAndGroot(ctx, ids.StartId, passwd)
})
Expand Down
7 changes: 7 additions & 0 deletions worker/draft.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,13 @@ func (n *node) waitForTask(id op) {
closer.Wait()
}

func (n *node) isRunningTask(id op) bool {
n.opsLock.Lock()
_, ok := n.ops[id]
n.opsLock.Unlock()
return ok
}

func (n *node) stopAllTasks() {
defer n.closer.Done() // CLOSER:1
<-n.closer.HasBeenClosed()
Expand Down
27 changes: 16 additions & 11 deletions worker/mutation.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,13 +176,6 @@ func runSchemaMutation(ctx context.Context, updates []*pb.SchemaUpdate, startTs
}
}

// Ensure that rollup is not running.
closer, err := gr.Node.startTask(opIndexing)
if err != nil {
return err
}
defer stopIndexing(closer)

buildIndexesHelper := func(update *pb.SchemaUpdate, rebuild posting.IndexRebuild) error {
wrtCtx := schema.GetWriteContext(context.Background())
if err := rebuild.BuildIndexes(wrtCtx); err != nil {
Expand Down Expand Up @@ -214,9 +207,9 @@ func runSchemaMutation(ctx context.Context, updates []*pb.SchemaUpdate, startTs
// "Too many open files" error.
throttle := y.NewThrottle(maxOpenFileLimit / 8)

buildIndexes := func(update *pb.SchemaUpdate, rebuild posting.IndexRebuild) {
buildIndexes := func(update *pb.SchemaUpdate, rebuild posting.IndexRebuild, c *z.Closer) {
// In case background indexing is running, we should call it here again.
defer stopIndexing(closer)
defer stopIndexing(c)

// We should only start building indexes once this function has returned.
// This is in order to ensure that we do not call DropPrefix for one predicate
Expand All @@ -233,6 +226,7 @@ func runSchemaMutation(ctx context.Context, updates []*pb.SchemaUpdate, startTs
throttle.Done(nil)
}

var closer *z.Closer
for _, su := range updates {
if tablet, err := groups().Tablet(su.Predicate); err != nil {
return err
Expand All @@ -251,6 +245,17 @@ func runSchemaMutation(ctx context.Context, updates []*pb.SchemaUpdate, startTs
OldSchema: &old,
CurrentSchema: su,
}
shouldRebuild := ok && rebuild.NeedIndexRebuild()

// Start opIndexing task only if schema update needs to build the indexes.
if shouldRebuild && !gr.Node.isRunningTask(opIndexing) {
closer, err = gr.Node.startTask(opIndexing)
if err != nil {
return err
}
defer stopIndexing(closer)
}

querySchema := rebuild.GetQuerySchema()
// Sets the schema only in memory. The schema is written to
// disk only after schema mutations are successful.
Expand All @@ -273,8 +278,8 @@ func runSchemaMutation(ctx context.Context, updates []*pb.SchemaUpdate, startTs
return err
}

if ok && rebuild.NeedIndexRebuild() {
go buildIndexes(su, rebuild)
if shouldRebuild {
go buildIndexes(su, rebuild, closer)
} else if err := updateSchema(su); err != nil {
return err
}
Expand Down

0 comments on commit 5595f9c

Please sign in to comment.