fix(multi-tenancy): update lease Ns ID after restore (#7407)
- Fix a bunch of tests.
- Update lease namespace ID after restore.
NamanJain8 authored Feb 10, 2021
1 parent 66c701d commit cd81510
Showing 10 changed files with 154 additions and 122 deletions.
24 changes: 18 additions & 6 deletions ee/backup/run.go
@@ -220,14 +220,26 @@ func runRestoreCmd() error {
return err
}

// MaxLeaseUid can be zero if the backup was taken on an empty DB.
if result.MaxLeaseUid > 0 {
ctx, cancelUid := context.WithTimeout(context.Background(), time.Minute)
defer cancelUid()
if _, err = zc.AssignIds(ctx, &pb.Num{Val: result.MaxLeaseUid, Type: pb.Num_UID}); err != nil {
fmt.Printf("Failed to assign maxLeaseId %d in Zero: %v\n", result.MaxLeaseUid, err)
leaseID := func(val uint64, typ pb.NumLeaseType) error {
// MaxLeaseUid can be zero if the backup was taken on an empty DB.
if val == 0 {
return nil
}
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
if _, err = zc.AssignIds(ctx, &pb.Num{Val: val, Type: typ}); err != nil {
fmt.Printf("Failed to assign %s %d in Zero: %v\n",
pb.NumLeaseType_name[int32(typ)], val, err)
return err
}
return nil
}

if err := leaseID(result.MaxLeaseUid, pb.Num_UID); err != nil {
return errors.Wrapf(err, "cannot update max uid lease after restore.")
}
if err := leaseID(result.MaxLeaseNsId, pb.Num_NS_ID); err != nil {
return errors.Wrapf(err, "cannot update max namespace lease after restore.")
}
}

2 changes: 1 addition & 1 deletion schema/parse.go
@@ -333,7 +333,7 @@ func parseTypeDeclaration(it *lex.ItemIterator, ns uint64) (*pb.TypeUpdate, erro
for _, field := range fields {
if _, ok := fieldSet[field.GetPredicate()]; ok {
return nil, it.Item().Errorf("Duplicate fields with name: %s",
field.GetPredicate())
x.ParseAttr(field.GetPredicate()))
}

fieldSet[field.GetPredicate()] = struct{}{}
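The error-message tweak above, and the test updates in schema/parse_test.go below, both follow from how multi-tenancy encodes predicate names: predicates are stored with a namespace prefix, x.GalaxyAttr wraps a plain name in the default (galaxy) namespace, and x.ParseAttr strips the prefix again for user-facing output. A minimal sketch of that idea, assuming an 8-byte big-endian namespace prefix (the real helpers live in Dgraph's x package and may differ in detail):

package main

import (
	"encoding/binary"
	"fmt"
)

// Illustrative only: multi-tenant Dgraph stores every predicate with its
// namespace so tenants can reuse the same names. This sketch assumes an
// 8-byte big-endian namespace prefix; the real helpers are x.NamespaceAttr,
// x.GalaxyAttr and x.ParseAttr.

const galaxyNs uint64 = 0 // the default ("galaxy") namespace

// namespaceAttr prepends the namespace ID to a plain attribute name.
func namespaceAttr(ns uint64, attr string) string {
	buf := make([]byte, 8)
	binary.BigEndian.PutUint64(buf, ns)
	return string(buf) + attr
}

// galaxyAttr wraps a name in the default namespace, mirroring x.GalaxyAttr.
func galaxyAttr(attr string) string {
	return namespaceAttr(galaxyNs, attr)
}

// parseAttr strips the namespace prefix again, mirroring x.ParseAttr; this is
// why the duplicate-field error now unwraps the name before printing it.
func parseAttr(attr string) string {
	return attr[8:]
}

func main() {
	stored := galaxyAttr("name")
	fmt.Printf("stored: %d bytes, user-facing: %q\n", len(stored), parseAttr(stored))
}

This is also why the checkSchema expectations and TypeOf lookups in the tests below now wrap every predicate and type name with x.GalaxyAttr.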
88 changes: 44 additions & 44 deletions schema/parse_test.go
@@ -55,29 +55,29 @@ name: string .
func TestSchema(t *testing.T) {
require.NoError(t, ParseBytes([]byte(schemaVal), 1))
checkSchema(t, State().predicate, []nameType{
{"name", &pb.SchemaUpdate{
Predicate: "name",
{x.GalaxyAttr("name"), &pb.SchemaUpdate{
Predicate: x.GalaxyAttr("name"),
ValueType: pb.Posting_STRING,
}},
{"address", &pb.SchemaUpdate{
Predicate: "address",
{x.GalaxyAttr("address"), &pb.SchemaUpdate{
Predicate: x.GalaxyAttr("address"),
ValueType: pb.Posting_STRING,
}},
{"http://scalar.com/helloworld/", &pb.SchemaUpdate{
Predicate: "http://scalar.com/helloworld/",
{x.GalaxyAttr("http://scalar.com/helloworld/"), &pb.SchemaUpdate{
Predicate: x.GalaxyAttr("http://scalar.com/helloworld/"),
ValueType: pb.Posting_STRING,
}},
{"age", &pb.SchemaUpdate{
Predicate: "age",
{x.GalaxyAttr("age"), &pb.SchemaUpdate{
Predicate: x.GalaxyAttr("age"),
ValueType: pb.Posting_INT,
}},
})

typ, err := State().TypeOf("age")
typ, err := State().TypeOf(x.GalaxyAttr("age"))
require.NoError(t, err)
require.Equal(t, types.IntID, typ)

_, err = State().TypeOf("agea")
_, err = State().TypeOf(x.GalaxyAttr("agea"))
require.Error(t, err)
}

@@ -165,36 +165,36 @@ friend : [uid] @reverse @count .
func TestSchemaIndexCustom(t *testing.T) {
require.NoError(t, ParseBytes([]byte(schemaIndexVal5), 1))
checkSchema(t, State().predicate, []nameType{
{"name", &pb.SchemaUpdate{
Predicate: "name",
{x.GalaxyAttr("name"), &pb.SchemaUpdate{
Predicate: x.GalaxyAttr("name"),
ValueType: pb.Posting_STRING,
Tokenizer: []string{"exact"},
Directive: pb.SchemaUpdate_INDEX,
Count: true,
}},
{"address", &pb.SchemaUpdate{
Predicate: "address",
{x.GalaxyAttr("address"), &pb.SchemaUpdate{
Predicate: x.GalaxyAttr("address"),
ValueType: pb.Posting_STRING,
Tokenizer: []string{"term"},
Directive: pb.SchemaUpdate_INDEX,
}},
{"age", &pb.SchemaUpdate{
Predicate: "age",
{x.GalaxyAttr("age"), &pb.SchemaUpdate{
Predicate: x.GalaxyAttr("age"),
ValueType: pb.Posting_INT,
Tokenizer: []string{"int"},
Directive: pb.SchemaUpdate_INDEX,
}},
{"friend", &pb.SchemaUpdate{
{x.GalaxyAttr("friend"), &pb.SchemaUpdate{
ValueType: pb.Posting_UID,
Predicate: "friend",
Predicate: x.GalaxyAttr("friend"),
Directive: pb.SchemaUpdate_REVERSE,
Count: true,
List: true,
}},
})
require.True(t, State().IsIndexed(context.Background(), "name"))
require.False(t, State().IsReversed(context.Background(), "name"))
require.Equal(t, "int", State().Tokenizer(context.Background(), "age")[0].Name())
require.True(t, State().IsIndexed(context.Background(), x.GalaxyAttr("name")))
require.False(t, State().IsReversed(context.Background(), x.GalaxyAttr("name")))
require.Equal(t, "int", State().Tokenizer(context.Background(), x.GalaxyAttr("age"))[0].Name())
}

func TestParse(t *testing.T) {
@@ -277,21 +277,21 @@ func TestParseScalarList(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 3, len(result.Preds))
require.EqualValues(t, &pb.SchemaUpdate{
Predicate: "jobs",
Predicate: x.GalaxyAttr("jobs"),
ValueType: 9,
Directive: pb.SchemaUpdate_INDEX,
Tokenizer: []string{"term"},
List: true,
}, result.Preds[0])

require.EqualValues(t, &pb.SchemaUpdate{
Predicate: "occupations",
Predicate: x.GalaxyAttr("occupations"),
ValueType: 9,
List: true,
}, result.Preds[1])

require.EqualValues(t, &pb.SchemaUpdate{
Predicate: "graduation",
Predicate: x.GalaxyAttr("graduation"),
ValueType: 5,
List: true,
}, result.Preds[2])
@@ -334,7 +334,7 @@ func TestParseUidList(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 1, len(result.Preds))
require.EqualValues(t, &pb.SchemaUpdate{
Predicate: "friend",
Predicate: x.GalaxyAttr("friend"),
ValueType: 7,
List: true,
}, result.Preds[0])
@@ -348,7 +348,7 @@ func TestParseUidSingleValue(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 1, len(result.Preds))
require.EqualValues(t, &pb.SchemaUpdate{
Predicate: "friend",
Predicate: x.GalaxyAttr("friend"),
ValueType: 7,
List: false,
}, result.Preds[0])
@@ -378,7 +378,7 @@ func TestParseEmptyType(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 1, len(result.Types))
require.Equal(t, &pb.TypeUpdate{
TypeName: "Person",
TypeName: x.GalaxyAttr("Person"),
}, result.Types[0])

}
@@ -391,7 +391,7 @@ func TestParseTypeEOF(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 1, len(result.Types))
require.Equal(t, &pb.TypeUpdate{
TypeName: "Person",
TypeName: x.GalaxyAttr("Person"),
}, result.Types[0])

}
@@ -406,10 +406,10 @@ func TestParseSingleType(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 1, len(result.Types))
require.Equal(t, &pb.TypeUpdate{
TypeName: "Person",
TypeName: x.GalaxyAttr("Person"),
Fields: []*pb.SchemaUpdate{
{
Predicate: "name",
Predicate: x.GalaxyAttr("name"),
},
},
}, result.Types[0])
@@ -426,15 +426,15 @@ func TestParseCombinedSchemasAndTypes(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 1, len(result.Preds))
require.Equal(t, &pb.SchemaUpdate{
Predicate: "name",
Predicate: x.GalaxyAttr("name"),
ValueType: 9,
}, result.Preds[0])
require.Equal(t, 1, len(result.Types))
require.Equal(t, &pb.TypeUpdate{
TypeName: "Person",
TypeName: x.GalaxyAttr("Person"),
Fields: []*pb.SchemaUpdate{
{
Predicate: "name",
Predicate: x.GalaxyAttr("name"),
},
},
}, result.Types[0])
@@ -453,18 +453,18 @@ func TestParseMultipleTypes(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 2, len(result.Types))
require.Equal(t, &pb.TypeUpdate{
TypeName: "Person",
TypeName: x.GalaxyAttr("Person"),
Fields: []*pb.SchemaUpdate{
{
Predicate: "name",
Predicate: x.GalaxyAttr("name"),
},
},
}, result.Types[0])
require.Equal(t, &pb.TypeUpdate{
TypeName: "Animal",
TypeName: x.GalaxyAttr("Animal"),
Fields: []*pb.SchemaUpdate{
{
Predicate: "name",
Predicate: x.GalaxyAttr("name"),
},
},
}, result.Types[1])
@@ -494,16 +494,16 @@ func TestOldTypeFormat(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 1, len(result.Types))
require.Equal(t, &pb.TypeUpdate{
TypeName: "Person",
TypeName: x.GalaxyAttr("Person"),
Fields: []*pb.SchemaUpdate{
{
Predicate: "name",
Predicate: x.GalaxyAttr("name"),
},
{
Predicate: "address",
Predicate: x.GalaxyAttr("address"),
},
{
Predicate: "children",
Predicate: x.GalaxyAttr("children"),
},
},
}, result.Types[0])
@@ -520,13 +520,13 @@ func TestOldAndNewTypeFormat(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 1, len(result.Types))
require.Equal(t, &pb.TypeUpdate{
TypeName: "Person",
TypeName: x.GalaxyAttr("Person"),
Fields: []*pb.SchemaUpdate{
{
Predicate: "name",
Predicate: x.GalaxyAttr("name"),
},
{
Predicate: "address",
Predicate: x.GalaxyAttr("address"),
},
},
}, result.Types[0])
6 changes: 3 additions & 3 deletions worker/backup_handler.go
@@ -149,20 +149,20 @@ func NewUriHandler(uri *url.URL, creds *x.MinioCredentials) (UriHandler, error)
// A reader, the backup groupId, and a map whose keys are the predicates to restore
// are passed as arguments.
type loadFn func(reader io.Reader, groupId uint32, preds predicateSet,
dropOperations []*pb.DropOperation) (uint64, error)
dropOperations []*pb.DropOperation) (uint64, uint64, error)

// LoadBackup will scan location l for backup files in the given backup series and load them
// sequentially. Returns the maximum Since value on success, otherwise an error.
func LoadBackup(location, backupId string, backupNum uint64, creds *x.MinioCredentials,
fn loadFn) LoadResult {
uri, err := url.Parse(location)
if err != nil {
return LoadResult{0, 0, err}
return LoadResult{Err: err}
}

h := getHandler(uri.Scheme, creds)
if h == nil {
return LoadResult{0, 0, errors.Errorf("Unsupported URI: %v", uri)}
return LoadResult{Err: errors.Errorf("Unsupported URI: %v", uri)}
}

return h.Load(uri, backupId, backupNum, fn)
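The widened loadFn signature above reports two maxima instead of one, so a restore can advance both the UID lease and the namespace-ID lease in Zero. A small, self-contained sketch of that contract; Entry and the input slice are stand-ins for the real decoded backup stream, which loadFromBackup reads from a gzip reader:

package main

import (
	"errors"
	"fmt"
)

// Entry is a stand-in for one decoded backup record.
type Entry struct {
	Uid       uint64
	Namespace uint64
}

// load mirrors the widened loadFn contract: report the maximum UID and the
// maximum namespace ID seen, so the caller can re-lease both in Zero.
// Treating empty input as an error is purely to illustrate the error return.
func load(entries []Entry) (maxUid, maxNsId uint64, err error) {
	if len(entries) == 0 {
		return 0, 0, errors.New("nothing to load")
	}
	for _, e := range entries {
		if e.Uid > maxUid {
			maxUid = e.Uid
		}
		if e.Namespace > maxNsId {
			maxNsId = e.Namespace
		}
	}
	return maxUid, maxNsId, nil
}

func main() {
	maxUid, maxNsId, err := load([]Entry{{Uid: 11, Namespace: 0}, {Uid: 7, Namespace: 2}})
	fmt.Println(maxUid, maxNsId, err) // 11 2 <nil>
}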
2 changes: 2 additions & 0 deletions worker/backup_processor.go
@@ -82,6 +82,8 @@ type LoadResult struct {
// MaxLeaseUid is the max UID seen by the load operation. Needed to request zero
// for the proper number of UIDs.
MaxLeaseUid uint64
// MaxLeaseNsId is the max namespace ID seen by the load operation.
MaxLeaseNsId uint64
// The error, if any, of the load operation.
Err error
}
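Adding MaxLeaseNsId to LoadResult is also why the surrounding handlers switch from positional literals such as LoadResult{0, 0, err} to keyed ones: a three-value positional literal no longer compiles against a four-field struct, and keyed literals leave every unset field at its zero value. A tiny illustration with a mirrored struct (result is a local stand-in, not the real type):

package main

import (
	"errors"
	"fmt"
)

// result mirrors worker.LoadResult for illustration only.
type result struct {
	Version      uint64
	MaxLeaseUid  uint64
	MaxLeaseNsId uint64
	Err          error
}

func main() {
	// Keyed literals tolerate new fields; anything not mentioned stays zero.
	failed := result{Err: errors.New("cannot retrieve manifests")}
	loaded := result{Version: 42, MaxLeaseUid: 10000, MaxLeaseNsId: 3}
	fmt.Printf("%+v\n%+v\n", failed, loaded)

	// A positional literal would now have to spell out all four values in
	// order, e.g. result{42, 10000, 3, nil}, and silently changes meaning if
	// fields are ever reordered, which is why the diff drops that form.
}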
19 changes: 9 additions & 10 deletions worker/file_handler.go
@@ -168,14 +168,14 @@ func (h *fileHandler) GetManifests(uri *url.URL, backupId string,
func (h *fileHandler) Load(uri *url.URL, backupId string, backupNum uint64, fn loadFn) LoadResult {
manifests, err := h.GetManifests(uri, backupId, backupNum)
if err != nil {
return LoadResult{0, 0, errors.Wrapf(err, "cannot retrieve manifests")}
return LoadResult{Err: errors.Wrapf(err, "cannot retrieve manifests")}
}

// Process each manifest, first check that they are valid and then confirm the
// backup files for each group exist. Each group in manifest must have a backup file,
// otherwise this is a failure and the user must remedy.
var since uint64
var maxUid uint64
var maxUid, maxNsId uint64
for i, manifest := range manifests {
if manifest.Since == 0 || len(manifest.Groups) == 0 {
continue
@@ -186,26 +186,25 @@ func (h *fileHandler) Load(uri *url.URL, backupId string, backupNum uint64, fn l
file := filepath.Join(path, backupName(manifest.Since, gid))
fp, err := os.Open(file)
if err != nil {
return LoadResult{0, 0, errors.Wrapf(err, "Failed to open %q", file)}
return LoadResult{Err: errors.Wrapf(err, "Failed to open %q", file)}
}
defer fp.Close()

// Only restore the predicates that were assigned to this group at the time
// of the last backup.
predSet := manifests[len(manifests)-1].getPredsInGroup(gid)

groupMaxUid, err := fn(fp, gid, predSet, manifest.DropOperations)
groupMaxUid, groupMaxNsId, err := fn(fp, gid, predSet, manifest.DropOperations)
if err != nil {
return LoadResult{0, 0, err}
}
if groupMaxUid > maxUid {
maxUid = groupMaxUid
return LoadResult{Err: err}
}
maxUid = x.Max(maxUid, groupMaxUid)
maxNsId = x.Max(maxNsId, groupMaxNsId)
}
since = manifest.Since
}

return LoadResult{since, maxUid, nil}
return LoadResult{Version: since, MaxLeaseUid: maxUid, MaxLeaseNsId: maxNsId}
}

// Verify performs basic checks to decide whether the specified backup can be restored
@@ -314,7 +313,7 @@ func (h *fileHandler) ExportBackup(backupDir, exportDir, format string,
return 0, errors.Wrapf(err, "cannot open DB at %s", dir)
}
defer db.Close()
_, err = loadFromBackup(db, gzReader, 0, preds, nil)
_, _, err = loadFromBackup(db, gzReader, 0, preds, nil)
if err != nil {
return 0, errors.Wrapf(err, "cannot load backup")
}
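With a second running maximum to track, the Load loop above folds per-group values with x.Max instead of a separate if block for each. A small sketch of that folding pattern; the local maxU64 helper is assumed to behave like x.Max for uint64:

package main

import "fmt"

// maxU64 is assumed to behave like x.Max: the larger of two uint64 values.
func maxU64(a, b uint64) uint64 {
	if a > b {
		return a
	}
	return b
}

func main() {
	// Per-group maxima as a loadFn might report them, one pair per manifest group.
	groupMaxUids := []uint64{120, 45, 300}
	groupMaxNsIds := []uint64{1, 4, 2}

	var maxUid, maxNsId uint64
	for i := range groupMaxUids {
		// Fold each group's result into the running maxima, as Load does.
		maxUid = maxU64(maxUid, groupMaxUids[i])
		maxNsId = maxU64(maxNsId, groupMaxNsIds[i])
	}
	fmt.Println(maxUid, maxNsId) // 300 4
}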