Skip to content

Commit

Permalink
fix(backup): make drop data namespace aware
Browse files Browse the repository at this point in the history
cherry-picks:
  * 260a789
  * 7981a4d
  • Loading branch information
NamanJain8 authored and mangalaman93 committed Jan 3, 2023
1 parent 992ef57 commit 165f5a6
Show file tree
Hide file tree
Showing 10 changed files with 162 additions and 69 deletions.
1 change: 1 addition & 0 deletions edgraph/access.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build oss
// +build oss

/*
Expand Down
53 changes: 29 additions & 24 deletions edgraph/access_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,34 +437,39 @@ func InitializeAcl(closer *z.Closer) {
return
}

upsertGuardianAndGroot := func(ns uint64) {
for closer.Ctx().Err() == nil {
ctx, cancel := context.WithTimeout(closer.Ctx(), time.Minute)
defer cancel()
ctx = x.AttachNamespace(ctx, ns)
if err := upsertGuardian(ctx); err != nil {
glog.Infof("Unable to upsert the guardian group. Error: %v", err)
time.Sleep(100 * time.Millisecond)
continue
}
break
}
for ns := range schema.State().Namespaces() {
upsertGuardianAndGroot(closer, ns)
}
}

for closer.Ctx().Err() == nil {
ctx, cancel := context.WithTimeout(closer.Ctx(), time.Minute)
defer cancel()
ctx = x.AttachNamespace(ctx, ns)
if err := upsertGroot(ctx, "password"); err != nil {
glog.Infof("Unable to upsert the groot account. Error: %v", err)
time.Sleep(100 * time.Millisecond)
continue
}
break
// Note: The handling of closer should be done by caller.
func upsertGuardianAndGroot(closer *z.Closer, ns uint64) {
if len(worker.Config.HmacSecret) == 0 {
// The acl feature is not turned on.
return
}
for closer.Ctx().Err() == nil {
ctx, cancel := context.WithTimeout(closer.Ctx(), time.Minute)
defer cancel()
ctx = x.AttachNamespace(ctx, ns)
if err := upsertGuardian(ctx); err != nil {
glog.Infof("Unable to upsert the guardian group. Error: %v", err)
time.Sleep(100 * time.Millisecond)
continue
}
break
}

for ns := range schema.State().Namespaces() {
upsertGuardianAndGroot(ns)
for closer.Ctx().Err() == nil {
ctx, cancel := context.WithTimeout(closer.Ctx(), time.Minute)
defer cancel()
ctx = x.AttachNamespace(ctx, ns)
if err := upsertGroot(ctx, "password"); err != nil {
glog.Infof("Unable to upsert the groot account. Error: %v", err)
time.Sleep(100 * time.Millisecond)
continue
}
break
}
}

Expand Down
11 changes: 2 additions & 9 deletions edgraph/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,14 +429,6 @@ func (s *Server) Alter(ctx context.Context, op *api.Operation) (*api.Payload, er
}

if op.DropOp == api.Operation_DATA {
if x.Config.BlockClusterWideDrop {
glog.V(2).Info("Blocked drop-data because it is not permitted.")
return empty, errors.New("Drop data operation is not permitted.")
}
if err := AuthGuardianOfTheGalaxy(ctx); err != nil {
return empty, errors.Wrapf(err, "Drop data can only be called by the guardian of the"+
" galaxy")
}
if len(op.DropValue) > 0 {
return empty, errors.Errorf("If DropOp is set to DATA, DropValue must be empty")
}
Expand All @@ -448,13 +440,14 @@ func (s *Server) Alter(ctx context.Context, op *api.Operation) (*api.Payload, er
}

m.DropOp = pb.Mutations_DATA
m.DropValue = fmt.Sprintf("%#x", namespace)
_, err = query.ApplyMutations(ctx, m)
if err != nil {
return empty, err
}

// insert a helper record for backup & restore, indicating that drop_data was done
err = InsertDropRecord(ctx, "DROP_DATA;")
err = InsertDropRecord(ctx, fmt.Sprintf("DROP_DATA;%#x", namespace))
if err != nil {
return empty, err
}
Expand Down
10 changes: 7 additions & 3 deletions posting/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package posting
import (
"bytes"
"context"
"encoding/binary"
"encoding/hex"
"fmt"
"io/ioutil"
Expand Down Expand Up @@ -1235,9 +1236,12 @@ func DeleteAll() error {
return pstore.DropAll()
}

// DeleteData deletes all data but leaves types and schema intact.
func DeleteData() error {
return pstore.DropPrefix([]byte{x.DefaultPrefix})
// DeleteData deletes all data for the namespace but leaves types and schema intact.
func DeleteData(ns uint64) error {
prefix := make([]byte, 9)
prefix[0] = x.DefaultPrefix
binary.BigEndian.PutUint64(prefix[1:], ns)
return pstore.DropPrefix(prefix)
}

// DeletePredicate deletes all entries and indices for a given predicate.
Expand Down
18 changes: 18 additions & 0 deletions posting/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package posting

import (
"context"
"encoding/hex"
"math"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -248,6 +249,23 @@ func (o *oracle) ResetTxns() {
o.pendingTxns = make(map[uint64]*Txn)
}

// ResetTxnForNs deletes all the pending transactions for a given namespace.
func (o *oracle) ResetTxnsForNs(ns uint64) {
txns := o.IterateTxns(func(key []byte) bool {
pk, err := x.Parse(key)
if err != nil {
glog.Errorf("error %v while parsing key %v", err, hex.EncodeToString(key))
return false
}
return x.ParseNamespace(pk.Attr) == ns
})
o.Lock()
defer o.Unlock()
for _, txn := range txns {
delete(o.pendingTxns, txn)
}
}

func (o *oracle) GetTxn(startTs uint64) *Txn {
o.RLock()
defer o.RUnlock()
Expand Down
30 changes: 24 additions & 6 deletions systest/backup/multi-tenancy/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,17 @@ func TestBackupMultiTenancy(t *testing.T) {
dg := testutil.DgClientWithLogin(t, "groot", "password", x.GalaxyNamespace)
testutil.DropAll(t, dg)

galaxyCreds := &testutil.LoginParams{UserID: "groot", Passwd: "password", Namespace: x.GalaxyNamespace}
galaxyCreds := &testutil.LoginParams{
UserID: "groot", Passwd: "password", Namespace: x.GalaxyNamespace}
galaxyToken := testutil.Login(t, galaxyCreds)

// Create a new namespace
ns, err := testutil.CreateNamespaceWithRetry(t, galaxyToken)
ns1, err := testutil.CreateNamespaceWithRetry(t, galaxyToken)
require.NoError(t, err)
dg1 := testutil.DgClientWithLogin(t, "groot", "password", ns)
ns2, err := testutil.CreateNamespaceWithRetry(t, galaxyToken)
require.NoError(t, err)
dg1 := testutil.DgClientWithLogin(t, "groot", "password", ns1)
dg2 := testutil.DgClientWithLogin(t, "groot", "password", ns2)

addSchema := func(dg *dgo.Dgraph) {
// Add schema and types.
Expand All @@ -69,6 +73,7 @@ func TestBackupMultiTenancy(t *testing.T) {

addSchema(dg)
addSchema(dg1)
addSchema(dg2)

addData := func(dg *dgo.Dgraph, name string) *api.Response {
var buf bytes.Buffer
Expand Down Expand Up @@ -96,7 +101,8 @@ func TestBackupMultiTenancy(t *testing.T) {

original := make(map[uint64]*api.Response)
original[x.GalaxyNamespace] = addData(dg, "galaxy")
original[ns] = addData(dg1, "ns")
original[ns1] = addData(dg1, "ns1")
original[ns2] = addData(dg2, "ns2")

// Setup test directories.
common.DirSetup(t)
Expand All @@ -111,11 +117,23 @@ func TestBackupMultiTenancy(t *testing.T) {
expectedResponse := `{ "q": [{ "count": 5 }]}`
testutil.VerifyQueryResponse(t, dg, query, expectedResponse)
testutil.VerifyQueryResponse(t, dg1, query, expectedResponse)
testutil.VerifyQueryResponse(t, dg2, query, expectedResponse)

// Call drop data from namespace ns2.
require.NoError(t, dg2.Alter(ctx, &api.Operation{DropOp: api.Operation_DATA}))
// Send backup request.
_ = runBackup(t, galaxyToken, 6, 2)
testutil.DropAll(t, dg)
sendRestoreRequest(t, alphaBackupDir, galaxyToken.AccessJwt)
testutil.WaitForRestore(t, dg)
testutil.VerifyQueryResponse(t, dg, query, expectedResponse)
testutil.VerifyQueryResponse(t, dg1, query, expectedResponse)
testutil.VerifyQueryResponse(t, dg2, query, `{ "q": [{ "count": 0 }]}`)

// After deleting a namespace in incremental backup, we should not be able to get the data from
// banned namespace.
require.NoError(t, testutil.DeleteNamespace(t, galaxyToken, ns))
_ = runBackup(t, galaxyToken, 6, 2)
require.NoError(t, testutil.DeleteNamespace(t, galaxyToken, ns1))
_ = runBackup(t, galaxyToken, 9, 3)
testutil.DropAll(t, dg)
sendRestoreRequest(t, alphaBackupDir, galaxyToken.AccessJwt)
testutil.WaitForRestore(t, dg)
Expand Down
4 changes: 3 additions & 1 deletion worker/backup_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -669,8 +669,9 @@ func checkAndGetDropOp(key []byte, l *posting.List, readTs uint64) (*pb.DropOper
}
// A dgraph.drop.op record can have values in only one of the following formats:
// * DROP_ALL;
// * DROP_DATA;
// * DROP_DATA;ns
// * DROP_ATTR;attrName
// * DROP_NS;ns
// So, accordingly construct the *pb.DropOperation.
dropOp := &pb.DropOperation{}
dropInfo := strings.Split(string(val), ";")
Expand All @@ -682,6 +683,7 @@ func checkAndGetDropOp(key []byte, l *posting.List, readTs uint64) (*pb.DropOper
dropOp.DropOp = pb.DropOperation_ALL
case "DROP_DATA":
dropOp.DropOp = pb.DropOperation_DATA
dropOp.DropValue = dropInfo[1] // contains namespace.
case "DROP_ATTR":
dropOp.DropOp = pb.DropOperation_ATTR
dropOp.DropValue = dropInfo[1]
Expand Down
49 changes: 39 additions & 10 deletions worker/cdc_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"encoding/binary"
"encoding/json"
"math"
"strconv"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -113,6 +114,19 @@ func (cdc *CDC) resetPendingEvents() {
cdc.pendingTxnEvents = make(map[uint64][]CDCEvent)
}

func (cdc *CDC) resetPendingEventsForNs(ns uint64) {
if cdc == nil {
return
}
cdc.Lock()
defer cdc.Unlock()
for ts, events := range cdc.pendingTxnEvents {
if len(events) > 0 && binary.BigEndian.Uint64(events[0].Meta.Namespace) == ns {
delete(cdc.pendingTxnEvents, ts)
}
}
}

func (cdc *CDC) hasPending(attr string) bool {
if cdc == nil {
return false
Expand Down Expand Up @@ -243,9 +257,15 @@ func (cdc *CDC) processCDCEvents() {
switch {
case proposal.Mutations.DropOp != pb.Mutations_NONE: // this means its a drop operation
// if there is DROP ALL or DROP DATA operation, clear pending events also.
if proposal.Mutations.DropOp == pb.Mutations_ALL ||
proposal.Mutations.DropOp == pb.Mutations_DATA {
if proposal.Mutations.DropOp == pb.Mutations_ALL {
cdc.resetPendingEvents()
} else if proposal.Mutations.DropOp == pb.Mutations_DATA {
ns, err := strconv.ParseUint(proposal.Mutations.DropValue, 0, 64)
if err != nil {
glog.Warningf("CDC: parsing namespace failed with error %v. Ignoring.", err)
return
}
cdc.resetPendingEventsForNs(ns)
}
if err := sendToSink(events, proposal.Mutations.StartTs); err != nil {
rerr = errors.Wrapf(err, "unable to send messages to sink")
Expand Down Expand Up @@ -393,15 +413,24 @@ func toCDCEvent(index uint64, mutation *pb.Mutations) []CDCEvent {
}

// If drop operation
// todo (aman): right now drop all and data operations are still cluster wide.
// Fix these once we have namespace specific operations.
if mutation.DropOp != pb.Mutations_NONE {
ns := make([]byte, 8)
binary.BigEndian.PutUint64(ns, x.GalaxyNamespace)
namespace := make([]byte, 8)
var t string
if mutation.DropOp == pb.Mutations_TYPE {
// drop type are namespace specific.
ns, t = x.ParseNamespaceBytes(mutation.DropValue)
switch mutation.DropOp {
case pb.Mutations_ALL:
// Drop all is cluster wide.
binary.BigEndian.PutUint64(namespace, x.GalaxyNamespace)
case pb.Mutations_DATA:
ns, err := strconv.ParseUint(mutation.DropValue, 0, 64)
if err != nil {
glog.Warningf("CDC: parsing namespace failed with error %v. Ignoring.", err)
return nil
}
binary.BigEndian.PutUint64(namespace, ns)
case pb.Mutations_TYPE:
namespace, t = x.ParseNamespaceBytes(mutation.DropValue)
default:
glog.Error("CDC: got unhandled drop operation")
}

return []CDCEvent{
Expand All @@ -413,7 +442,7 @@ func toCDCEvent(index uint64, mutation *pb.Mutations) []CDCEvent {
},
Meta: &EventMeta{
RaftIndex: index,
Namespace: ns,
Namespace: namespace,
},
},
}
Expand Down
14 changes: 10 additions & 4 deletions worker/draft.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"fmt"
"math"
"sort"
"strconv"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -351,14 +352,19 @@ func (n *node) applyMutations(ctx context.Context, proposal *pb.Proposal) (rerr
span := otrace.FromContext(ctx)

if proposal.Mutations.DropOp == pb.Mutations_DATA {
ns, err := strconv.ParseUint(proposal.Mutations.DropValue, 0, 64)
if err != nil {
return err
}
// Ensures nothing get written to disk due to commit proposals.
posting.Oracle().ResetTxns()
if err := posting.DeleteData(); err != nil {
posting.Oracle().ResetTxnsForNs(ns)
if err := posting.DeleteData(ns); err != nil {
return err
}

// Clear entire cache.
posting.ResetCache()
// TODO: Revisit this when we work on posting cache. Clear entire cache.
// We don't want to drop entire cache, just due to one namespace.
// posting.ResetCache()
return nil
}

Expand Down
Loading

0 comments on commit 165f5a6

Please sign in to comment.