diff --git a/edgraph/access.go b/edgraph/access.go index 23855d5c173..cf54976421c 100644 --- a/edgraph/access.go +++ b/edgraph/access.go @@ -1,3 +1,4 @@ +//go:build oss // +build oss /* diff --git a/edgraph/access_ee.go b/edgraph/access_ee.go index 46b2e3068a4..bedfbc74664 100644 --- a/edgraph/access_ee.go +++ b/edgraph/access_ee.go @@ -437,34 +437,39 @@ func InitializeAcl(closer *z.Closer) { return } - upsertGuardianAndGroot := func(ns uint64) { - for closer.Ctx().Err() == nil { - ctx, cancel := context.WithTimeout(closer.Ctx(), time.Minute) - defer cancel() - ctx = x.AttachNamespace(ctx, ns) - if err := upsertGuardian(ctx); err != nil { - glog.Infof("Unable to upsert the guardian group. Error: %v", err) - time.Sleep(100 * time.Millisecond) - continue - } - break - } + for ns := range schema.State().Namespaces() { + upsertGuardianAndGroot(closer, ns) + } +} - for closer.Ctx().Err() == nil { - ctx, cancel := context.WithTimeout(closer.Ctx(), time.Minute) - defer cancel() - ctx = x.AttachNamespace(ctx, ns) - if err := upsertGroot(ctx, "password"); err != nil { - glog.Infof("Unable to upsert the groot account. Error: %v", err) - time.Sleep(100 * time.Millisecond) - continue - } - break +// Note: The handling of closer should be done by caller. +func upsertGuardianAndGroot(closer *z.Closer, ns uint64) { + if len(worker.Config.HmacSecret) == 0 { + // The acl feature is not turned on. + return + } + for closer.Ctx().Err() == nil { + ctx, cancel := context.WithTimeout(closer.Ctx(), time.Minute) + defer cancel() + ctx = x.AttachNamespace(ctx, ns) + if err := upsertGuardian(ctx); err != nil { + glog.Infof("Unable to upsert the guardian group. Error: %v", err) + time.Sleep(100 * time.Millisecond) + continue } + break } - for ns := range schema.State().Namespaces() { - upsertGuardianAndGroot(ns) + for closer.Ctx().Err() == nil { + ctx, cancel := context.WithTimeout(closer.Ctx(), time.Minute) + defer cancel() + ctx = x.AttachNamespace(ctx, ns) + if err := upsertGroot(ctx, "password"); err != nil { + glog.Infof("Unable to upsert the groot account. Error: %v", err) + time.Sleep(100 * time.Millisecond) + continue + } + break } } diff --git a/edgraph/server.go b/edgraph/server.go index d6b1f5b988c..31e6a14827b 100644 --- a/edgraph/server.go +++ b/edgraph/server.go @@ -429,14 +429,6 @@ func (s *Server) Alter(ctx context.Context, op *api.Operation) (*api.Payload, er } if op.DropOp == api.Operation_DATA { - if x.Config.BlockClusterWideDrop { - glog.V(2).Info("Blocked drop-data because it is not permitted.") - return empty, errors.New("Drop data operation is not permitted.") - } - if err := AuthGuardianOfTheGalaxy(ctx); err != nil { - return empty, errors.Wrapf(err, "Drop data can only be called by the guardian of the"+ - " galaxy") - } if len(op.DropValue) > 0 { return empty, errors.Errorf("If DropOp is set to DATA, DropValue must be empty") } @@ -448,13 +440,14 @@ func (s *Server) Alter(ctx context.Context, op *api.Operation) (*api.Payload, er } m.DropOp = pb.Mutations_DATA + m.DropValue = fmt.Sprintf("%#x", namespace) _, err = query.ApplyMutations(ctx, m) if err != nil { return empty, err } // insert a helper record for backup & restore, indicating that drop_data was done - err = InsertDropRecord(ctx, "DROP_DATA;") + err = InsertDropRecord(ctx, fmt.Sprintf("DROP_DATA;%#x", namespace)) if err != nil { return empty, err } diff --git a/posting/index.go b/posting/index.go index d259a74c920..ee310b5d30f 100644 --- a/posting/index.go +++ b/posting/index.go @@ -19,6 +19,7 @@ package posting import ( "bytes" "context" + "encoding/binary" "encoding/hex" "fmt" "io/ioutil" @@ -1235,9 +1236,12 @@ func DeleteAll() error { return pstore.DropAll() } -// DeleteData deletes all data but leaves types and schema intact. -func DeleteData() error { - return pstore.DropPrefix([]byte{x.DefaultPrefix}) +// DeleteData deletes all data for the namespace but leaves types and schema intact. +func DeleteData(ns uint64) error { + prefix := make([]byte, 9) + prefix[0] = x.DefaultPrefix + binary.BigEndian.PutUint64(prefix[1:], ns) + return pstore.DropPrefix(prefix) } // DeletePredicate deletes all entries and indices for a given predicate. diff --git a/posting/oracle.go b/posting/oracle.go index 31b914a0f93..54036c352c8 100644 --- a/posting/oracle.go +++ b/posting/oracle.go @@ -18,6 +18,7 @@ package posting import ( "context" + "encoding/hex" "math" "sync" "sync/atomic" @@ -248,6 +249,23 @@ func (o *oracle) ResetTxns() { o.pendingTxns = make(map[uint64]*Txn) } +// ResetTxnForNs deletes all the pending transactions for a given namespace. +func (o *oracle) ResetTxnsForNs(ns uint64) { + txns := o.IterateTxns(func(key []byte) bool { + pk, err := x.Parse(key) + if err != nil { + glog.Errorf("error %v while parsing key %v", err, hex.EncodeToString(key)) + return false + } + return x.ParseNamespace(pk.Attr) == ns + }) + o.Lock() + defer o.Unlock() + for _, txn := range txns { + delete(o.pendingTxns, txn) + } +} + func (o *oracle) GetTxn(startTs uint64) *Txn { o.RLock() defer o.RUnlock() diff --git a/systest/backup/multi-tenancy/backup_test.go b/systest/backup/multi-tenancy/backup_test.go index a17d769be9e..aee4cf9ef50 100644 --- a/systest/backup/multi-tenancy/backup_test.go +++ b/systest/backup/multi-tenancy/backup_test.go @@ -50,13 +50,17 @@ func TestBackupMultiTenancy(t *testing.T) { dg := testutil.DgClientWithLogin(t, "groot", "password", x.GalaxyNamespace) testutil.DropAll(t, dg) - galaxyCreds := &testutil.LoginParams{UserID: "groot", Passwd: "password", Namespace: x.GalaxyNamespace} + galaxyCreds := &testutil.LoginParams{ + UserID: "groot", Passwd: "password", Namespace: x.GalaxyNamespace} galaxyToken := testutil.Login(t, galaxyCreds) // Create a new namespace - ns, err := testutil.CreateNamespaceWithRetry(t, galaxyToken) + ns1, err := testutil.CreateNamespaceWithRetry(t, galaxyToken) require.NoError(t, err) - dg1 := testutil.DgClientWithLogin(t, "groot", "password", ns) + ns2, err := testutil.CreateNamespaceWithRetry(t, galaxyToken) + require.NoError(t, err) + dg1 := testutil.DgClientWithLogin(t, "groot", "password", ns1) + dg2 := testutil.DgClientWithLogin(t, "groot", "password", ns2) addSchema := func(dg *dgo.Dgraph) { // Add schema and types. @@ -69,6 +73,7 @@ func TestBackupMultiTenancy(t *testing.T) { addSchema(dg) addSchema(dg1) + addSchema(dg2) addData := func(dg *dgo.Dgraph, name string) *api.Response { var buf bytes.Buffer @@ -96,7 +101,8 @@ func TestBackupMultiTenancy(t *testing.T) { original := make(map[uint64]*api.Response) original[x.GalaxyNamespace] = addData(dg, "galaxy") - original[ns] = addData(dg1, "ns") + original[ns1] = addData(dg1, "ns1") + original[ns2] = addData(dg2, "ns2") // Setup test directories. common.DirSetup(t) @@ -111,11 +117,23 @@ func TestBackupMultiTenancy(t *testing.T) { expectedResponse := `{ "q": [{ "count": 5 }]}` testutil.VerifyQueryResponse(t, dg, query, expectedResponse) testutil.VerifyQueryResponse(t, dg1, query, expectedResponse) + testutil.VerifyQueryResponse(t, dg2, query, expectedResponse) + + // Call drop data from namespace ns2. + require.NoError(t, dg2.Alter(ctx, &api.Operation{DropOp: api.Operation_DATA})) + // Send backup request. + _ = runBackup(t, galaxyToken, 6, 2) + testutil.DropAll(t, dg) + sendRestoreRequest(t, alphaBackupDir, galaxyToken.AccessJwt) + testutil.WaitForRestore(t, dg) + testutil.VerifyQueryResponse(t, dg, query, expectedResponse) + testutil.VerifyQueryResponse(t, dg1, query, expectedResponse) + testutil.VerifyQueryResponse(t, dg2, query, `{ "q": [{ "count": 0 }]}`) // After deleting a namespace in incremental backup, we should not be able to get the data from // banned namespace. - require.NoError(t, testutil.DeleteNamespace(t, galaxyToken, ns)) - _ = runBackup(t, galaxyToken, 6, 2) + require.NoError(t, testutil.DeleteNamespace(t, galaxyToken, ns1)) + _ = runBackup(t, galaxyToken, 9, 3) testutil.DropAll(t, dg) sendRestoreRequest(t, alphaBackupDir, galaxyToken.AccessJwt) testutil.WaitForRestore(t, dg) diff --git a/worker/backup_ee.go b/worker/backup_ee.go index 2ddcd5c0010..73b3f7e6be2 100644 --- a/worker/backup_ee.go +++ b/worker/backup_ee.go @@ -669,8 +669,9 @@ func checkAndGetDropOp(key []byte, l *posting.List, readTs uint64) (*pb.DropOper } // A dgraph.drop.op record can have values in only one of the following formats: // * DROP_ALL; - // * DROP_DATA; + // * DROP_DATA;ns // * DROP_ATTR;attrName + // * DROP_NS;ns // So, accordingly construct the *pb.DropOperation. dropOp := &pb.DropOperation{} dropInfo := strings.Split(string(val), ";") @@ -682,6 +683,7 @@ func checkAndGetDropOp(key []byte, l *posting.List, readTs uint64) (*pb.DropOper dropOp.DropOp = pb.DropOperation_ALL case "DROP_DATA": dropOp.DropOp = pb.DropOperation_DATA + dropOp.DropValue = dropInfo[1] // contains namespace. case "DROP_ATTR": dropOp.DropOp = pb.DropOperation_ATTR dropOp.DropValue = dropInfo[1] diff --git a/worker/cdc_ee.go b/worker/cdc_ee.go index 9eb3a00dcec..92858009b14 100644 --- a/worker/cdc_ee.go +++ b/worker/cdc_ee.go @@ -18,6 +18,7 @@ import ( "encoding/binary" "encoding/json" "math" + "strconv" "strings" "sync" "sync/atomic" @@ -113,6 +114,19 @@ func (cdc *CDC) resetPendingEvents() { cdc.pendingTxnEvents = make(map[uint64][]CDCEvent) } +func (cdc *CDC) resetPendingEventsForNs(ns uint64) { + if cdc == nil { + return + } + cdc.Lock() + defer cdc.Unlock() + for ts, events := range cdc.pendingTxnEvents { + if len(events) > 0 && binary.BigEndian.Uint64(events[0].Meta.Namespace) == ns { + delete(cdc.pendingTxnEvents, ts) + } + } +} + func (cdc *CDC) hasPending(attr string) bool { if cdc == nil { return false @@ -243,9 +257,15 @@ func (cdc *CDC) processCDCEvents() { switch { case proposal.Mutations.DropOp != pb.Mutations_NONE: // this means its a drop operation // if there is DROP ALL or DROP DATA operation, clear pending events also. - if proposal.Mutations.DropOp == pb.Mutations_ALL || - proposal.Mutations.DropOp == pb.Mutations_DATA { + if proposal.Mutations.DropOp == pb.Mutations_ALL { cdc.resetPendingEvents() + } else if proposal.Mutations.DropOp == pb.Mutations_DATA { + ns, err := strconv.ParseUint(proposal.Mutations.DropValue, 0, 64) + if err != nil { + glog.Warningf("CDC: parsing namespace failed with error %v. Ignoring.", err) + return + } + cdc.resetPendingEventsForNs(ns) } if err := sendToSink(events, proposal.Mutations.StartTs); err != nil { rerr = errors.Wrapf(err, "unable to send messages to sink") @@ -393,15 +413,24 @@ func toCDCEvent(index uint64, mutation *pb.Mutations) []CDCEvent { } // If drop operation - // todo (aman): right now drop all and data operations are still cluster wide. - // Fix these once we have namespace specific operations. if mutation.DropOp != pb.Mutations_NONE { - ns := make([]byte, 8) - binary.BigEndian.PutUint64(ns, x.GalaxyNamespace) + namespace := make([]byte, 8) var t string - if mutation.DropOp == pb.Mutations_TYPE { - // drop type are namespace specific. - ns, t = x.ParseNamespaceBytes(mutation.DropValue) + switch mutation.DropOp { + case pb.Mutations_ALL: + // Drop all is cluster wide. + binary.BigEndian.PutUint64(namespace, x.GalaxyNamespace) + case pb.Mutations_DATA: + ns, err := strconv.ParseUint(mutation.DropValue, 0, 64) + if err != nil { + glog.Warningf("CDC: parsing namespace failed with error %v. Ignoring.", err) + return nil + } + binary.BigEndian.PutUint64(namespace, ns) + case pb.Mutations_TYPE: + namespace, t = x.ParseNamespaceBytes(mutation.DropValue) + default: + glog.Error("CDC: got unhandled drop operation") } return []CDCEvent{ @@ -413,7 +442,7 @@ func toCDCEvent(index uint64, mutation *pb.Mutations) []CDCEvent { }, Meta: &EventMeta{ RaftIndex: index, - Namespace: ns, + Namespace: namespace, }, }, } diff --git a/worker/draft.go b/worker/draft.go index 8fa9455f71d..e102997486d 100644 --- a/worker/draft.go +++ b/worker/draft.go @@ -24,6 +24,7 @@ import ( "fmt" "math" "sort" + "strconv" "sync" "sync/atomic" "time" @@ -351,14 +352,19 @@ func (n *node) applyMutations(ctx context.Context, proposal *pb.Proposal) (rerr span := otrace.FromContext(ctx) if proposal.Mutations.DropOp == pb.Mutations_DATA { + ns, err := strconv.ParseUint(proposal.Mutations.DropValue, 0, 64) + if err != nil { + return err + } // Ensures nothing get written to disk due to commit proposals. - posting.Oracle().ResetTxns() - if err := posting.DeleteData(); err != nil { + posting.Oracle().ResetTxnsForNs(ns) + if err := posting.DeleteData(ns); err != nil { return err } - // Clear entire cache. - posting.ResetCache() + // TODO: Revisit this when we work on posting cache. Clear entire cache. + // We don't want to drop entire cache, just due to one namespace. + // posting.ResetCache() return nil } diff --git a/worker/restore_map.go b/worker/restore_map.go index ba813884076..0ab51e4915e 100644 --- a/worker/restore_map.go +++ b/worker/restore_map.go @@ -106,12 +106,12 @@ func (br *backupReader) WithCompression(comp string) *backupReader { } type loadBackupInput struct { - restoreTs uint64 - preds predicateSet - dropOperations []*pb.DropOperation - isOld bool - keepSchema bool - compression string + restoreTs uint64 + preds predicateSet + dropNs map[uint64]struct{} + isOld bool + keepSchema bool + compression string } type listReq struct { @@ -326,6 +326,9 @@ func (m *mapper) processReqCh(ctx context.Context) error { switch kv.GetUserMeta()[0] { case posting.BitEmptyPosting, posting.BitCompletePosting, posting.BitDeltaPosting: + if _, ok := in.dropNs[ns]; ok { + return nil + } backupPl := &pb.BackupPostingList{} if err := backupPl.Unmarshal(kv.Value); err != nil { return errors.Wrapf(err, "while reading backup posting list") @@ -608,6 +611,7 @@ func RunMapper(req *pb.RestoreRequest, mapDir string) (*mapResult, error) { dropAll := false dropAttr := make(map[string]struct{}) + dropNs := make(map[uint64]struct{}) // manifests are ordered as: latest..full for i, manifest := range manifests { @@ -641,11 +645,15 @@ func RunMapper(req *pb.RestoreRequest, mapDir string) (*mapResult, error) { delete(predSet, p) } } + localDropNs := make(map[uint64]struct{}) + for ns := range dropNs { + localDropNs[ns] = struct{}{} + } in := &loadBackupInput{ - preds: predSet, - dropOperations: manifest.DropOperations, - isOld: manifest.Version == 0, - restoreTs: req.RestoreTs, + preds: predSet, + dropNs: localDropNs, + isOld: manifest.Version == 0, + restoreTs: req.RestoreTs, // Only map the schema keys corresponding to the latest backup. keepSchema: i == 0, compression: manifest.Compression, @@ -665,7 +673,17 @@ func RunMapper(req *pb.RestoreRequest, mapDir string) (*mapResult, error) { case pb.DropOperation_ALL: dropAll = true case pb.DropOperation_DATA: - dropAll = true + var ns uint64 + if manifest.Version == 0 { + ns = x.GalaxyNamespace + } else { + var err error + ns, err = strconv.ParseUint(op.DropValue, 0, 64) + if err != nil { + return nil, errors.Wrap(err, "Map phase failed to parse namespace") + } + } + dropNs[ns] = struct{}{} case pb.DropOperation_ATTR: p := op.DropValue if manifest.Version == 0 { @@ -678,7 +696,6 @@ func RunMapper(req *pb.RestoreRequest, mapDir string) (*mapResult, error) { continue } // If there is a drop namespace, we just ban the namespace in the pstore. - // TODO: We probably need to propose ban request. ns, err := strconv.ParseUint(op.DropValue, 0, 64) if err != nil { return nil, errors.Wrapf(err, "Map phase failed to parse namespace")