Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(multi-tenancy): update lease Ns ID after restore #7407

Merged
merged 8 commits into from
Feb 10, 2021
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 18 additions & 6 deletions ee/backup/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,14 +220,26 @@ func runRestoreCmd() error {
return err
}

// MaxLeaseUid can be zero if the backup was taken on an empty DB.
if result.MaxLeaseUid > 0 {
ctx, cancelUid := context.WithTimeout(context.Background(), time.Minute)
defer cancelUid()
if _, err = zc.AssignIds(ctx, &pb.Num{Val: result.MaxLeaseUid, Type: pb.Num_UID}); err != nil {
fmt.Printf("Failed to assign maxLeaseId %d in Zero: %v\n", result.MaxLeaseUid, err)
leaseID := func(val uint64, typ pb.NumLeaseType) error {
// MaxLeaseUid can be zero if the backup was taken on an empty DB.
if val == 0 {
return nil
}
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
if _, err = zc.AssignIds(ctx, &pb.Num{Val: val, Type: typ}); err != nil {
fmt.Printf("Failed to assign %s %d in Zero: %v\n",
pb.NumLeaseType_name[int32(typ)], val, err)
return err
}
return nil
}

if err := leaseID(result.MaxLeaseUid, pb.Num_UID); err != nil {
return err
}
if err := leaseID(result.MaxLeaseNsId, pb.Num_NS_ID); err != nil {
return err
}
}

Expand Down
2 changes: 1 addition & 1 deletion schema/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ func parseTypeDeclaration(it *lex.ItemIterator, ns uint64) (*pb.TypeUpdate, erro
for _, field := range fields {
if _, ok := fieldSet[field.GetPredicate()]; ok {
return nil, it.Item().Errorf("Duplicate fields with name: %s",
field.GetPredicate())
x.ParseAttr(field.GetPredicate()))
}

fieldSet[field.GetPredicate()] = struct{}{}
Expand Down
88 changes: 44 additions & 44 deletions schema/parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,29 +55,29 @@ name: string .
func TestSchema(t *testing.T) {
require.NoError(t, ParseBytes([]byte(schemaVal), 1))
checkSchema(t, State().predicate, []nameType{
{"name", &pb.SchemaUpdate{
Predicate: "name",
{x.GalaxyAttr("name"), &pb.SchemaUpdate{
Predicate: x.GalaxyAttr("name"),
ValueType: pb.Posting_STRING,
}},
{"address", &pb.SchemaUpdate{
Predicate: "address",
{x.GalaxyAttr("address"), &pb.SchemaUpdate{
Predicate: x.GalaxyAttr("address"),
ValueType: pb.Posting_STRING,
}},
{"http://scalar.com/helloworld/", &pb.SchemaUpdate{
Predicate: "http://scalar.com/helloworld/",
{x.GalaxyAttr("http://scalar.com/helloworld/"), &pb.SchemaUpdate{
Predicate: x.GalaxyAttr("http://scalar.com/helloworld/"),
ValueType: pb.Posting_STRING,
}},
{"age", &pb.SchemaUpdate{
Predicate: "age",
{x.GalaxyAttr("age"), &pb.SchemaUpdate{
Predicate: x.GalaxyAttr("age"),
ValueType: pb.Posting_INT,
}},
})

typ, err := State().TypeOf("age")
typ, err := State().TypeOf(x.GalaxyAttr("age"))
require.NoError(t, err)
require.Equal(t, types.IntID, typ)

_, err = State().TypeOf("agea")
_, err = State().TypeOf(x.GalaxyAttr("agea"))
require.Error(t, err)
}

Expand Down Expand Up @@ -165,36 +165,36 @@ friend : [uid] @reverse @count .
func TestSchemaIndexCustom(t *testing.T) {
require.NoError(t, ParseBytes([]byte(schemaIndexVal5), 1))
checkSchema(t, State().predicate, []nameType{
{"name", &pb.SchemaUpdate{
Predicate: "name",
{x.GalaxyAttr("name"), &pb.SchemaUpdate{
Predicate: x.GalaxyAttr("name"),
ValueType: pb.Posting_STRING,
Tokenizer: []string{"exact"},
Directive: pb.SchemaUpdate_INDEX,
Count: true,
}},
{"address", &pb.SchemaUpdate{
Predicate: "address",
{x.GalaxyAttr("address"), &pb.SchemaUpdate{
Predicate: x.GalaxyAttr("address"),
ValueType: pb.Posting_STRING,
Tokenizer: []string{"term"},
Directive: pb.SchemaUpdate_INDEX,
}},
{"age", &pb.SchemaUpdate{
Predicate: "age",
{x.GalaxyAttr("age"), &pb.SchemaUpdate{
Predicate: x.GalaxyAttr("age"),
ValueType: pb.Posting_INT,
Tokenizer: []string{"int"},
Directive: pb.SchemaUpdate_INDEX,
}},
{"friend", &pb.SchemaUpdate{
{x.GalaxyAttr("friend"), &pb.SchemaUpdate{
ValueType: pb.Posting_UID,
Predicate: "friend",
Predicate: x.GalaxyAttr("friend"),
Directive: pb.SchemaUpdate_REVERSE,
Count: true,
List: true,
}},
})
require.True(t, State().IsIndexed(context.Background(), "name"))
require.False(t, State().IsReversed(context.Background(), "name"))
require.Equal(t, "int", State().Tokenizer(context.Background(), "age")[0].Name())
require.True(t, State().IsIndexed(context.Background(), x.GalaxyAttr("name")))
require.False(t, State().IsReversed(context.Background(), x.GalaxyAttr("name")))
require.Equal(t, "int", State().Tokenizer(context.Background(), x.GalaxyAttr("age"))[0].Name())
}

func TestParse(t *testing.T) {
Expand Down Expand Up @@ -277,21 +277,21 @@ func TestParseScalarList(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 3, len(result.Preds))
require.EqualValues(t, &pb.SchemaUpdate{
Predicate: "jobs",
Predicate: x.GalaxyAttr("jobs"),
ValueType: 9,
Directive: pb.SchemaUpdate_INDEX,
Tokenizer: []string{"term"},
List: true,
}, result.Preds[0])

require.EqualValues(t, &pb.SchemaUpdate{
Predicate: "occupations",
Predicate: x.GalaxyAttr("occupations"),
ValueType: 9,
List: true,
}, result.Preds[1])

require.EqualValues(t, &pb.SchemaUpdate{
Predicate: "graduation",
Predicate: x.GalaxyAttr("graduation"),
ValueType: 5,
List: true,
}, result.Preds[2])
Expand Down Expand Up @@ -334,7 +334,7 @@ func TestParseUidList(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 1, len(result.Preds))
require.EqualValues(t, &pb.SchemaUpdate{
Predicate: "friend",
Predicate: x.GalaxyAttr("friend"),
ValueType: 7,
List: true,
}, result.Preds[0])
Expand All @@ -348,7 +348,7 @@ func TestParseUidSingleValue(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 1, len(result.Preds))
require.EqualValues(t, &pb.SchemaUpdate{
Predicate: "friend",
Predicate: x.GalaxyAttr("friend"),
ValueType: 7,
List: false,
}, result.Preds[0])
Expand Down Expand Up @@ -378,7 +378,7 @@ func TestParseEmptyType(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 1, len(result.Types))
require.Equal(t, &pb.TypeUpdate{
TypeName: "Person",
TypeName: x.GalaxyAttr("Person"),
}, result.Types[0])

}
Expand All @@ -391,7 +391,7 @@ func TestParseTypeEOF(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 1, len(result.Types))
require.Equal(t, &pb.TypeUpdate{
TypeName: "Person",
TypeName: x.GalaxyAttr("Person"),
}, result.Types[0])

}
Expand All @@ -406,10 +406,10 @@ func TestParseSingleType(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 1, len(result.Types))
require.Equal(t, &pb.TypeUpdate{
TypeName: "Person",
TypeName: x.GalaxyAttr("Person"),
Fields: []*pb.SchemaUpdate{
{
Predicate: "name",
Predicate: x.GalaxyAttr("name"),
},
},
}, result.Types[0])
Expand All @@ -426,15 +426,15 @@ func TestParseCombinedSchemasAndTypes(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 1, len(result.Preds))
require.Equal(t, &pb.SchemaUpdate{
Predicate: "name",
Predicate: x.GalaxyAttr("name"),
ValueType: 9,
}, result.Preds[0])
require.Equal(t, 1, len(result.Types))
require.Equal(t, &pb.TypeUpdate{
TypeName: "Person",
TypeName: x.GalaxyAttr("Person"),
Fields: []*pb.SchemaUpdate{
{
Predicate: "name",
Predicate: x.GalaxyAttr("name"),
},
},
}, result.Types[0])
Expand All @@ -453,18 +453,18 @@ func TestParseMultipleTypes(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 2, len(result.Types))
require.Equal(t, &pb.TypeUpdate{
TypeName: "Person",
TypeName: x.GalaxyAttr("Person"),
Fields: []*pb.SchemaUpdate{
{
Predicate: "name",
Predicate: x.GalaxyAttr("name"),
},
},
}, result.Types[0])
require.Equal(t, &pb.TypeUpdate{
TypeName: "Animal",
TypeName: x.GalaxyAttr("Animal"),
Fields: []*pb.SchemaUpdate{
{
Predicate: "name",
Predicate: x.GalaxyAttr("name"),
},
},
}, result.Types[1])
Expand Down Expand Up @@ -494,16 +494,16 @@ func TestOldTypeFormat(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 1, len(result.Types))
require.Equal(t, &pb.TypeUpdate{
TypeName: "Person",
TypeName: x.GalaxyAttr("Person"),
Fields: []*pb.SchemaUpdate{
{
Predicate: "name",
Predicate: x.GalaxyAttr("name"),
},
{
Predicate: "address",
Predicate: x.GalaxyAttr("address"),
},
{
Predicate: "children",
Predicate: x.GalaxyAttr("children"),
},
},
}, result.Types[0])
Expand All @@ -520,13 +520,13 @@ func TestOldAndNewTypeFormat(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 1, len(result.Types))
require.Equal(t, &pb.TypeUpdate{
TypeName: "Person",
TypeName: x.GalaxyAttr("Person"),
Fields: []*pb.SchemaUpdate{
{
Predicate: "name",
Predicate: x.GalaxyAttr("name"),
},
{
Predicate: "address",
Predicate: x.GalaxyAttr("address"),
},
},
}, result.Types[0])
Expand Down
6 changes: 3 additions & 3 deletions worker/backup_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,20 +149,20 @@ func NewUriHandler(uri *url.URL, creds *x.MinioCredentials) (UriHandler, error)
// A reader, the backup groupId, and a map whose keys are the predicates to restore
// are passed as arguments.
type loadFn func(reader io.Reader, groupId uint32, preds predicateSet,
dropOperations []*pb.DropOperation) (uint64, error)
dropOperations []*pb.DropOperation) (uint64, uint64, error)

// LoadBackup will scan location l for backup files in the given backup series and load them
// sequentially. Returns the maximum Since value on success, otherwise an error.
func LoadBackup(location, backupId string, backupNum uint64, creds *x.MinioCredentials,
fn loadFn) LoadResult {
uri, err := url.Parse(location)
if err != nil {
return LoadResult{0, 0, err}
return LoadResult{Err: err}
}

h := getHandler(uri.Scheme, creds)
if h == nil {
return LoadResult{0, 0, errors.Errorf("Unsupported URI: %v", uri)}
return LoadResult{Err: errors.Errorf("Unsupported URI: %v", uri)}
}

return h.Load(uri, backupId, backupNum, fn)
Expand Down
2 changes: 2 additions & 0 deletions worker/backup_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ type LoadResult struct {
// MaxLeaseUid is the max UID seen by the load operation. Needed to request zero
// for the proper number of UIDs.
MaxLeaseUid uint64
// MaxLeaseNsId is the max namespace ID seen by the load operation.
MaxLeaseNsId uint64
// The error, if any, of the load operation.
Err error
}
Expand Down
17 changes: 10 additions & 7 deletions worker/file_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,14 +168,14 @@ func (h *fileHandler) GetManifests(uri *url.URL, backupId string,
func (h *fileHandler) Load(uri *url.URL, backupId string, backupNum uint64, fn loadFn) LoadResult {
manifests, err := h.GetManifests(uri, backupId, backupNum)
if err != nil {
return LoadResult{0, 0, errors.Wrapf(err, "cannot retrieve manifests")}
return LoadResult{Err: errors.Wrapf(err, "cannot retrieve manifests")}
}

// Process each manifest, first check that they are valid and then confirm the
// backup files for each group exist. Each group in manifest must have a backup file,
// otherwise this is a failure and the user must remedy.
var since uint64
var maxUid uint64
var maxUid, maxNsId uint64
for i, manifest := range manifests {
if manifest.Since == 0 || len(manifest.Groups) == 0 {
continue
Expand All @@ -186,26 +186,29 @@ func (h *fileHandler) Load(uri *url.URL, backupId string, backupNum uint64, fn l
file := filepath.Join(path, backupName(manifest.Since, gid))
fp, err := os.Open(file)
if err != nil {
return LoadResult{0, 0, errors.Wrapf(err, "Failed to open %q", file)}
return LoadResult{Err: errors.Wrapf(err, "Failed to open %q", file)}
}
defer fp.Close()

// Only restore the predicates that were assigned to this group at the time
// of the last backup.
predSet := manifests[len(manifests)-1].getPredsInGroup(gid)

groupMaxUid, err := fn(fp, gid, predSet, manifest.DropOperations)
groupMaxUid, groupMaxNsId, err := fn(fp, gid, predSet, manifest.DropOperations)
if err != nil {
return LoadResult{0, 0, err}
return LoadResult{Err: err}
}
if groupMaxUid > maxUid {
maxUid = groupMaxUid
}
if groupMaxNsId > maxNsId {
maxNsId = groupMaxNsId
}
}
since = manifest.Since
}

return LoadResult{since, maxUid, nil}
return LoadResult{Version: since, MaxLeaseUid: maxUid, MaxLeaseNsId: maxNsId}
}

// Verify performs basic checks to decide whether the specified backup can be restored
Expand Down Expand Up @@ -314,7 +317,7 @@ func (h *fileHandler) ExportBackup(backupDir, exportDir, format string,
return 0, errors.Wrapf(err, "cannot open DB at %s", dir)
}
defer db.Close()
_, err = loadFromBackup(db, gzReader, 0, preds, nil)
_, _, err = loadFromBackup(db, gzReader, 0, preds, nil)
if err != nil {
return 0, errors.Wrapf(err, "cannot load backup")
}
Expand Down
Loading