From bee9bbf4f1725493ea44397217657c006e467151 Mon Sep 17 00:00:00 2001 From: Pawan Rawal Date: Fri, 31 Jan 2020 17:23:03 +0530 Subject: [PATCH 01/11] Rearrange backup code to prepare calling it from graphql admin API. --- dgraph/cmd/alpha/admin_backup.go | 50 +++++++++--------- graphql/admin/admin.go | 26 ++++++++- graphql/admin/backup.go | 90 ++++++++++++++++++++++++++++++++ 3 files changed, 141 insertions(+), 25 deletions(-) create mode 100644 graphql/admin/backup.go diff --git a/dgraph/cmd/alpha/admin_backup.go b/dgraph/cmd/alpha/admin_backup.go index 5aa223c8f6e..09ed32d7d2e 100644 --- a/dgraph/cmd/alpha/admin_backup.go +++ b/dgraph/cmd/alpha/admin_backup.go @@ -44,14 +44,24 @@ func backupHandler(w http.ResponseWriter, r *http.Request) { }) { return } - if !worker.EnterpriseEnabled() { - x.SetStatus(w, "You must enable enterprise features first. "+ - "Supply the appropriate license file to Dgraph Zero using the HTTP endpoint.", - "Backup failed.") - return + + destination := r.FormValue("destination") + accessKey := r.FormValue("access_key") + secretKey := r.FormValue("secret_key") + sessionToken := r.FormValue("session_token") + anonymous := r.FormValue("anonymous") == "true" + forceFull := r.FormValue("force_full") == "true" + + req := pb.BackupRequest{ + Destination: destination, + UnixTs: time.Now().UTC().Format("20060102.150405.000"), + AccessKey: accessKey, + SecretKey: secretKey, + SessionToken: sessionToken, + Anonymous: anonymous, } - if err := processHttpBackupRequest(context.Background(), r); err != nil { + if err := processBackupRequest(context.Background(), req, forceFull); err != nil { x.SetStatus(w, err.Error(), "Backup failed.") return } @@ -60,17 +70,16 @@ func backupHandler(w http.ResponseWriter, r *http.Request) { x.Check2(w.Write([]byte(`{"code": "Success", "message": "Backup completed."}`))) } -func processHttpBackupRequest(ctx context.Context, r *http.Request) error { - destination := r.FormValue("destination") - if destination == "" { - return errors.Errorf("You must specify a 'destination' value") +func processBackupRequest(ctx context.Context, req pb.BackupRequest, forceFull bool) error { + if !worker.EnterpriseEnabled() { + return errors.Errorf("You must enable enterprise features first. "+ + "Supply the appropriate license file to Dgraph Zero using the HTTP endpoint.", + "Backup failed.") } - accessKey := r.FormValue("access_key") - secretKey := r.FormValue("secret_key") - sessionToken := r.FormValue("session_token") - anonymous := r.FormValue("anonymous") == "true" - forceFull := r.FormValue("force_full") == "true" + if req.Destination == "" { + return errors.Errorf("You must specify a 'destination' value") + } if err := x.HealthCheck(); err != nil { glog.Errorf("Backup canceled, not ready to accept requests: %s", err) @@ -83,15 +92,8 @@ func processHttpBackupRequest(ctx context.Context, r *http.Request) error { return err } - req := pb.BackupRequest{ - ReadTs: ts.ReadOnly, - Destination: destination, - UnixTs: time.Now().UTC().Format("20060102.150405.000"), - AccessKey: accessKey, - SecretKey: secretKey, - SessionToken: sessionToken, - Anonymous: anonymous, - } + req.ReadTs = ts.ReadOnly + req.UnixTs = time.Now().UTC().Format("20060102.150405.000") // Read the manifests to get the right timestamp from which to start the backup. uri, err := url.Parse(req.Destination) diff --git a/graphql/admin/admin.go b/graphql/admin/admin.go index f1e63a26389..8d0fafd26bc 100644 --- a/graphql/admin/admin.go +++ b/graphql/admin/admin.go @@ -54,7 +54,7 @@ const ( graphqlAdminSchema = ` type GQLSchema @dgraph(type: "dgraph.graphql") { id: ID! - schema: String! @dgraph(type: "dgraph.graphql.schema") + schema: String! @dgraph(type: "dgraph.graphql.schema") generatedSchema: String! } @@ -85,6 +85,20 @@ const ( schema: String! } + input BackupInput { + destination: String! + accessKey: String + secretKey: String + sessionToken: String + anonymous: Boolean + forceFull: Boolean + } + + type BackupPayload { + code: String + message: String + } + type Query { getGQLSchema: GQLSchema health: Health @@ -92,6 +106,7 @@ const ( type Mutation { updateGQLSchema(input: UpdateGQLSchemaInput!) : UpdateGQLSchemaPayload + backup(input: BackupInput!) : BackupPayload } ` ) @@ -257,6 +272,15 @@ func newAdminResolverFactory() resolve.ResolverFactory { return &resolve.Resolved{Err: errors.Errorf(errMsgServerNotReady)} }) }). + WithMutationResolver("backup", func(m schema.Mutation) resolve.MutationResolver { + backup := &backupResolver{} + + return resolve.NewMutationResolver( + backup, + backup, + backup, + resolve.StdQueryCompletion()) + }). WithSchemaIntrospection() return rf diff --git a/graphql/admin/backup.go b/graphql/admin/backup.go new file mode 100644 index 00000000000..48bf67ccb11 --- /dev/null +++ b/graphql/admin/backup.go @@ -0,0 +1,90 @@ +/* + * Copyright 2019 Dgraph Labs, Inc. and Contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package admin + +import ( + "context" + "encoding/json" + "fmt" + + dgoapi "github.com/dgraph-io/dgo/v2/protos/api" + "github.com/dgraph-io/dgraph/gql" + "github.com/dgraph-io/dgraph/graphql/schema" + "github.com/golang/glog" +) + +type backupResolver struct { + // admin *adminServer + + // mutation schema.Mutation +} + +type backupInput struct { + Destination string + AccessKey string + SecretKey string + SessionToken string + Anonymous bool + ForceFull bool +} + +func (br *backupResolver) Rewrite( + m schema.Mutation) (*gql.GraphQuery, []*dgoapi.Mutation, error) { + + glog.Info("Got backup request") + + input, err := getBackupInput(m) + if err != nil { + return nil, nil, err + } + fmt.Printf("input: %+v\n", input) + + // Return error if request fails here. + return nil, nil, nil +} + +func (br *backupResolver) FromMutationResult( + mutation schema.Mutation, + assigned map[string]string, + result map[string]interface{}) (*gql.GraphQuery, error) { + + return nil, nil +} + +func (br *backupResolver) Mutate( + ctx context.Context, + query *gql.GraphQuery, + mutations []*dgoapi.Mutation) (map[string]string, map[string]interface{}, error) { + + return nil, nil, nil +} + +func (br *backupResolver) Query(ctx context.Context, query *gql.GraphQuery) ([]byte, error) { + return nil, nil +} + +func getBackupInput(m schema.Mutation) (*backupInput, error) { + inputArg := m.ArgValue(schema.InputArgName) + inputByts, err := json.Marshal(inputArg) + if err != nil { + return nil, schema.GQLWrapf(err, "couldn't get input argument") + } + + var input backupInput + err = json.Unmarshal(inputByts, &input) + return &input, schema.GQLWrapf(err, "couldn't get input argument") +} From 26309f6acdc03d53677236f444336e7d103cedc8 Mon Sep 17 00:00:00 2001 From: Pawan Rawal Date: Fri, 31 Jan 2020 17:45:36 +0530 Subject: [PATCH 02/11] Move ProcessBackupRequest to worker so that it can be called by both graphql/admin and ee package --- dgraph/cmd/alpha/admin_backup.go | 106 +------------------------------ ee/backup/backup.go | 1 + graphql/admin/backup.go | 19 ++++-- worker/backup_ee.go | 100 +++++++++++++++++++++++++++++ 4 files changed, 114 insertions(+), 112 deletions(-) diff --git a/dgraph/cmd/alpha/admin_backup.go b/dgraph/cmd/alpha/admin_backup.go index 09ed32d7d2e..501e58664a6 100644 --- a/dgraph/cmd/alpha/admin_backup.go +++ b/dgraph/cmd/alpha/admin_backup.go @@ -21,16 +21,10 @@ package alpha import ( "context" "net/http" - "net/url" - "time" - "github.com/dgraph-io/dgraph/ee/backup" "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/worker" "github.com/dgraph-io/dgraph/x" - - "github.com/golang/glog" - "github.com/pkg/errors" ) func init() { @@ -54,14 +48,13 @@ func backupHandler(w http.ResponseWriter, r *http.Request) { req := pb.BackupRequest{ Destination: destination, - UnixTs: time.Now().UTC().Format("20060102.150405.000"), AccessKey: accessKey, SecretKey: secretKey, SessionToken: sessionToken, Anonymous: anonymous, } - if err := processBackupRequest(context.Background(), req, forceFull); err != nil { + if err := worker.ProcessBackupRequest(context.Background(), req, forceFull); err != nil { x.SetStatus(w, err.Error(), "Backup failed.") return } @@ -69,100 +62,3 @@ func backupHandler(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") x.Check2(w.Write([]byte(`{"code": "Success", "message": "Backup completed."}`))) } - -func processBackupRequest(ctx context.Context, req pb.BackupRequest, forceFull bool) error { - if !worker.EnterpriseEnabled() { - return errors.Errorf("You must enable enterprise features first. "+ - "Supply the appropriate license file to Dgraph Zero using the HTTP endpoint.", - "Backup failed.") - } - - if req.Destination == "" { - return errors.Errorf("You must specify a 'destination' value") - } - - if err := x.HealthCheck(); err != nil { - glog.Errorf("Backup canceled, not ready to accept requests: %s", err) - return err - } - - ts, err := worker.Timestamps(ctx, &pb.Num{ReadOnly: true}) - if err != nil { - glog.Errorf("Unable to retrieve readonly timestamp for backup: %s", err) - return err - } - - req.ReadTs = ts.ReadOnly - req.UnixTs = time.Now().UTC().Format("20060102.150405.000") - - // Read the manifests to get the right timestamp from which to start the backup. - uri, err := url.Parse(req.Destination) - if err != nil { - return err - } - handler, err := backup.NewUriHandler(uri, backup.GetCredentialsFromRequest(&req)) - if err != nil { - return err - } - latestManifest, err := handler.GetLatestManifest(uri) - if err != nil { - return err - } - req.SinceTs = latestManifest.Since - if forceFull { - req.SinceTs = 0 - } - - // Update the membership state to get the latest mapping of groups to predicates. - if err := worker.UpdateMembershipState(ctx); err != nil { - return err - } - - // Get the current membership state and parse it for easier processing. - state := worker.GetMembershipState() - var groups []uint32 - predMap := make(map[uint32][]string) - for gid, group := range state.Groups { - groups = append(groups, gid) - predMap[gid] = make([]string, 0) - for pred := range group.Tablets { - predMap[gid] = append(predMap[gid], pred) - } - } - - glog.Infof("Created backup request: %s. Groups=%v\n", &req, groups) - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - errCh := make(chan error, len(state.Groups)) - for _, gid := range groups { - req := req - req.GroupId = gid - req.Predicates = predMap[gid] - go func(req *pb.BackupRequest) { - _, err := worker.BackupGroup(ctx, req) - errCh <- err - }(&req) - } - - for range groups { - if err := <-errCh; err != nil { - glog.Errorf("Error received during backup: %v", err) - return err - } - } - - m := backup.Manifest{Since: req.ReadTs, Groups: predMap} - if req.SinceTs == 0 { - m.Type = "full" - m.BackupId = x.GetRandomName(1) - m.BackupNum = 1 - } else { - m.Type = "incremental" - m.BackupId = latestManifest.BackupId - m.BackupNum = latestManifest.BackupNum + 1 - } - - bp := &backup.Processor{Request: &req} - return bp.CompleteBackup(ctx, &m) -} diff --git a/ee/backup/backup.go b/ee/backup/backup.go index 8f93799744e..7de92b484ab 100644 --- a/ee/backup/backup.go +++ b/ee/backup/backup.go @@ -297,3 +297,4 @@ func writeKVList(list *bpb.KVList, w io.Writer) error { _, err = w.Write(buf) return err } + diff --git a/graphql/admin/backup.go b/graphql/admin/backup.go index 48bf67ccb11..0172490923f 100644 --- a/graphql/admin/backup.go +++ b/graphql/admin/backup.go @@ -24,14 +24,12 @@ import ( dgoapi "github.com/dgraph-io/dgo/v2/protos/api" "github.com/dgraph-io/dgraph/gql" "github.com/dgraph-io/dgraph/graphql/schema" + "github.com/dgraph-io/dgraph/protos/pb" + "github.com/dgraph-io/dgraph/worker" "github.com/golang/glog" ) -type backupResolver struct { - // admin *adminServer - - // mutation schema.Mutation -} +type backupResolver struct{} type backupInput struct { Destination string @@ -53,8 +51,15 @@ func (br *backupResolver) Rewrite( } fmt.Printf("input: %+v\n", input) - // Return error if request fails here. - return nil, nil, nil + err = worker.ProcessBackupRequest(context.Background(), pb.BackupRequest{ + Destination: input.Destination, + AccessKey: input.AccessKey, + SecretKey: input.SecretKey, + SessionToken: input.SessionToken, + Anonymous: input.Anonymous, + }, input.ForceFull) + + return nil, nil, err } func (br *backupResolver) FromMutationResult( diff --git a/worker/backup_ee.go b/worker/backup_ee.go index de0442c9306..a77178efd8d 100644 --- a/worker/backup_ee.go +++ b/worker/backup_ee.go @@ -14,10 +14,13 @@ package worker import ( "context" + "net/url" + "time" "github.com/dgraph-io/dgraph/ee/backup" "github.com/dgraph-io/dgraph/posting" "github.com/dgraph-io/dgraph/protos/pb" + "github.com/dgraph-io/dgraph/x" "github.com/golang/glog" "github.com/pkg/errors" @@ -70,3 +73,100 @@ func BackupGroup(ctx context.Context, in *pb.BackupRequest) (*pb.Status, error) return res, nil } + +func ProcessRequest(ctx context.Context, req pb.BackupRequest, forceFull bool) error { + if !EnterpriseEnabled() { + return errors.Errorf("You must enable enterprise features first. "+ + "Supply the appropriate license file to Dgraph Zero using the HTTP endpoint.", + "Backup failed.") + } + + if req.Destination == "" { + return errors.Errorf("You must specify a 'destination' value") + } + + if err := x.HealthCheck(); err != nil { + glog.Errorf("Backup canceled, not ready to accept requests: %s", err) + return err + } + + ts, err := Timestamps(ctx, &pb.Num{ReadOnly: true}) + if err != nil { + glog.Errorf("Unable to retrieve readonly timestamp for backup: %s", err) + return err + } + + req.ReadTs = ts.ReadOnly + req.UnixTs = time.Now().UTC().Format("20060102.150405.000") + + // Read the manifests to get the right timestamp from which to start the backup. + uri, err := url.Parse(req.Destination) + if err != nil { + return err + } + handler, err := backup.NewUriHandler(uri, backup.GetCredentialsFromRequest(&req)) + if err != nil { + return err + } + latestManifest, err := handler.GetLatestManifest(uri) + if err != nil { + return err + } + req.SinceTs = latestManifest.Since + if forceFull { + req.SinceTs = 0 + } + + // Update the membership state to get the latest mapping of groups to predicates. + if err := UpdateMembershipState(ctx); err != nil { + return err + } + + // Get the current membership state and parse it for easier processing. + state := GetMembershipState() + var groups []uint32 + predMap := make(map[uint32][]string) + for gid, group := range state.Groups { + groups = append(groups, gid) + predMap[gid] = make([]string, 0) + for pred := range group.Tablets { + predMap[gid] = append(predMap[gid], pred) + } + } + + glog.Infof("Created backup request: %s. Groups=%v\n", &req, groups) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + errCh := make(chan error, len(state.Groups)) + for _, gid := range groups { + req := req + req.GroupId = gid + req.Predicates = predMap[gid] + go func(req *pb.BackupRequest) { + _, err := BackupGroup(ctx, req) + errCh <- err + }(&req) + } + + for range groups { + if err := <-errCh; err != nil { + glog.Errorf("Error received during backup: %v", err) + return err + } + } + + m := backup.Manifest{Since: req.ReadTs, Groups: predMap} + if req.SinceTs == 0 { + m.Type = "full" + m.BackupId = x.GetRandomName(1) + m.BackupNum = 1 + } else { + m.Type = "incremental" + m.BackupId = latestManifest.BackupId + m.BackupNum = latestManifest.BackupNum + 1 + } + + bp := &backup.Processor{Request: &req} + return bp.CompleteBackup(ctx, &m) +} From d34c0201f23b158d7d80733a8871f0fc42622064 Mon Sep 17 00:00:00 2001 From: Pawan Rawal Date: Fri, 31 Jan 2020 19:19:25 +0530 Subject: [PATCH 03/11] It works and sends the correct response in case of success. --- graphql/admin/admin.go | 8 ++++++-- graphql/admin/backup.go | 35 ++++++++++++++++++++++++++++++----- worker/backup_ee.go | 2 +- 3 files changed, 37 insertions(+), 8 deletions(-) diff --git a/graphql/admin/admin.go b/graphql/admin/admin.go index 8d0fafd26bc..8635fd4b933 100644 --- a/graphql/admin/admin.go +++ b/graphql/admin/admin.go @@ -94,11 +94,15 @@ const ( forceFull: Boolean } - type BackupPayload { + type Response { code: String message: String } + type BackupPayload { + response: Response + } + type Query { getGQLSchema: GQLSchema health: Health @@ -279,7 +283,7 @@ func newAdminResolverFactory() resolve.ResolverFactory { backup, backup, backup, - resolve.StdQueryCompletion()) + resolve.StdMutationCompletion(m.ResponseName())) }). WithSchemaIntrospection() diff --git a/graphql/admin/backup.go b/graphql/admin/backup.go index 0172490923f..039d7a862a2 100644 --- a/graphql/admin/backup.go +++ b/graphql/admin/backup.go @@ -17,19 +17,22 @@ package admin import ( + "bytes" "context" "encoding/json" - "fmt" dgoapi "github.com/dgraph-io/dgo/v2/protos/api" "github.com/dgraph-io/dgraph/gql" "github.com/dgraph-io/dgraph/graphql/schema" "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/worker" + "github.com/dgraph-io/dgraph/x" "github.com/golang/glog" ) -type backupResolver struct{} +type backupResolver struct { + mutation schema.Mutation +} type backupInput struct { Destination string @@ -42,14 +45,13 @@ type backupInput struct { func (br *backupResolver) Rewrite( m schema.Mutation) (*gql.GraphQuery, []*dgoapi.Mutation, error) { - glog.Info("Got backup request") + br.mutation = m input, err := getBackupInput(m) if err != nil { return nil, nil, err } - fmt.Printf("input: %+v\n", input) err = worker.ProcessBackupRequest(context.Background(), pb.BackupRequest{ Destination: input.Destination, @@ -79,7 +81,30 @@ func (br *backupResolver) Mutate( } func (br *backupResolver) Query(ctx context.Context, query *gql.GraphQuery) ([]byte, error) { - return nil, nil + var buf bytes.Buffer + + x.Check2(buf.WriteString(`{ "`)) + x.Check2(buf.WriteString(br.mutation.SelectionSet()[0].ResponseName() + `": [{`)) + + for i, sel := range br.mutation.SelectionSet()[0].SelectionSet() { + var val string + switch sel.Name() { + case "code": + val = "Success" + case "message": + val = "Backup completed." + } + if i != 0 { + x.Check2(buf.WriteString(",")) + } + x.Check2(buf.WriteString(`"`)) + x.Check2(buf.WriteString(sel.ResponseName())) + x.Check2(buf.WriteString(`":`)) + x.Check2(buf.WriteString(`"` + val + `"`)) + } + x.Check2(buf.WriteString("}]}")) + + return buf.Bytes(), nil } func getBackupInput(m schema.Mutation) (*backupInput, error) { diff --git a/worker/backup_ee.go b/worker/backup_ee.go index a77178efd8d..62da28a5080 100644 --- a/worker/backup_ee.go +++ b/worker/backup_ee.go @@ -74,7 +74,7 @@ func BackupGroup(ctx context.Context, in *pb.BackupRequest) (*pb.Status, error) return res, nil } -func ProcessRequest(ctx context.Context, req pb.BackupRequest, forceFull bool) error { +func ProcessBackupRequest(ctx context.Context, req pb.BackupRequest, forceFull bool) error { if !EnterpriseEnabled() { return errors.Errorf("You must enable enterprise features first. "+ "Supply the appropriate license file to Dgraph Zero using the HTTP endpoint.", From b2efa3706ab321dcbf9e46aae80ac94bab1ce909 Mon Sep 17 00:00:00 2001 From: Pawan Rawal Date: Fri, 31 Jan 2020 19:25:50 +0530 Subject: [PATCH 04/11] Add a dummy ProcessBackupRequest for oss binary. --- worker/backup.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/worker/backup.go b/worker/backup.go index 51c1ef64b18..3cd318c1fb6 100644 --- a/worker/backup.go +++ b/worker/backup.go @@ -31,3 +31,8 @@ func (w *grpcWorker) Backup(ctx context.Context, req *pb.BackupRequest) (*pb.Sta glog.Warningf("Backup failed: %v", x.ErrNotSupported) return nil, x.ErrNotSupported } + +func ProcessBackupRequest(ctx context.Context, req pb.BackupRequest, forceFull bool) error { + glog.Warningf("Backup failed: %v", x.ErrNotSupported) + return x.ErrNotSupported +} From 6c4473ffa643c655ba7aaef9d2ec26ebc232a16c Mon Sep 17 00:00:00 2001 From: Pawan Rawal Date: Fri, 31 Jan 2020 19:38:19 +0530 Subject: [PATCH 05/11] Run gofmt on backup.go --- ee/backup/backup.go | 1 - 1 file changed, 1 deletion(-) diff --git a/ee/backup/backup.go b/ee/backup/backup.go index 7de92b484ab..8f93799744e 100644 --- a/ee/backup/backup.go +++ b/ee/backup/backup.go @@ -297,4 +297,3 @@ func writeKVList(list *bpb.KVList, w io.Writer) error { _, err = w.Write(buf) return err } - From 085246f9f72fe0826d6e35f7c922285411044e9a Mon Sep 17 00:00:00 2001 From: Pawan Rawal Date: Fri, 31 Jan 2020 19:46:59 +0530 Subject: [PATCH 06/11] Pass a pointer around --- dgraph/cmd/alpha/admin_backup.go | 2 +- graphql/admin/backup.go | 2 +- worker/backup.go | 2 +- worker/backup_ee.go | 8 ++++---- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/dgraph/cmd/alpha/admin_backup.go b/dgraph/cmd/alpha/admin_backup.go index 501e58664a6..8919e0940da 100644 --- a/dgraph/cmd/alpha/admin_backup.go +++ b/dgraph/cmd/alpha/admin_backup.go @@ -54,7 +54,7 @@ func backupHandler(w http.ResponseWriter, r *http.Request) { Anonymous: anonymous, } - if err := worker.ProcessBackupRequest(context.Background(), req, forceFull); err != nil { + if err := worker.ProcessBackupRequest(context.Background(), &req, forceFull); err != nil { x.SetStatus(w, err.Error(), "Backup failed.") return } diff --git a/graphql/admin/backup.go b/graphql/admin/backup.go index 039d7a862a2..2759ff23ee8 100644 --- a/graphql/admin/backup.go +++ b/graphql/admin/backup.go @@ -53,7 +53,7 @@ func (br *backupResolver) Rewrite( return nil, nil, err } - err = worker.ProcessBackupRequest(context.Background(), pb.BackupRequest{ + err = worker.ProcessBackupRequest(context.Background(), &pb.BackupRequest{ Destination: input.Destination, AccessKey: input.AccessKey, SecretKey: input.SecretKey, diff --git a/worker/backup.go b/worker/backup.go index 3cd318c1fb6..d6ec5d0ab31 100644 --- a/worker/backup.go +++ b/worker/backup.go @@ -32,7 +32,7 @@ func (w *grpcWorker) Backup(ctx context.Context, req *pb.BackupRequest) (*pb.Sta return nil, x.ErrNotSupported } -func ProcessBackupRequest(ctx context.Context, req pb.BackupRequest, forceFull bool) error { +func ProcessBackupRequest(ctx context.Context, req *pb.BackupRequest, forceFull bool) error { glog.Warningf("Backup failed: %v", x.ErrNotSupported) return x.ErrNotSupported } diff --git a/worker/backup_ee.go b/worker/backup_ee.go index 62da28a5080..0173b6aa916 100644 --- a/worker/backup_ee.go +++ b/worker/backup_ee.go @@ -74,7 +74,7 @@ func BackupGroup(ctx context.Context, in *pb.BackupRequest) (*pb.Status, error) return res, nil } -func ProcessBackupRequest(ctx context.Context, req pb.BackupRequest, forceFull bool) error { +func ProcessBackupRequest(ctx context.Context, req *pb.BackupRequest, forceFull bool) error { if !EnterpriseEnabled() { return errors.Errorf("You must enable enterprise features first. "+ "Supply the appropriate license file to Dgraph Zero using the HTTP endpoint.", @@ -104,7 +104,7 @@ func ProcessBackupRequest(ctx context.Context, req pb.BackupRequest, forceFull b if err != nil { return err } - handler, err := backup.NewUriHandler(uri, backup.GetCredentialsFromRequest(&req)) + handler, err := backup.NewUriHandler(uri, backup.GetCredentialsFromRequest(req)) if err != nil { return err } @@ -146,7 +146,7 @@ func ProcessBackupRequest(ctx context.Context, req pb.BackupRequest, forceFull b go func(req *pb.BackupRequest) { _, err := BackupGroup(ctx, req) errCh <- err - }(&req) + }(req) } for range groups { @@ -167,6 +167,6 @@ func ProcessBackupRequest(ctx context.Context, req pb.BackupRequest, forceFull b m.BackupNum = latestManifest.BackupNum + 1 } - bp := &backup.Processor{Request: &req} + bp := &backup.Processor{Request: req} return bp.CompleteBackup(ctx, &m) } From 9edc429f351e869e8b422b05736e04faf7123255 Mon Sep 17 00:00:00 2001 From: Pawan Rawal Date: Fri, 31 Jan 2020 19:48:41 +0530 Subject: [PATCH 07/11] Fix another golancibot issue. --- worker/backup_ee.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/worker/backup_ee.go b/worker/backup_ee.go index 0173b6aa916..acf59ab6321 100644 --- a/worker/backup_ee.go +++ b/worker/backup_ee.go @@ -76,9 +76,8 @@ func BackupGroup(ctx context.Context, in *pb.BackupRequest) (*pb.Status, error) func ProcessBackupRequest(ctx context.Context, req *pb.BackupRequest, forceFull bool) error { if !EnterpriseEnabled() { - return errors.Errorf("You must enable enterprise features first. "+ - "Supply the appropriate license file to Dgraph Zero using the HTTP endpoint.", - "Backup failed.") + return errors.New("You must enable enterprise features first. " + + "Supply the appropriate license file to Dgraph Zero using the HTTP endpoint.") } if req.Destination == "" { From 149e16507ba73a9ef1d24d7d23a7b2c9119924e0 Mon Sep 17 00:00:00 2001 From: Pawan Rawal Date: Fri, 31 Jan 2020 19:52:16 +0530 Subject: [PATCH 08/11] Fix a log. --- worker/backup_ee.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/worker/backup_ee.go b/worker/backup_ee.go index acf59ab6321..251cfeee55f 100644 --- a/worker/backup_ee.go +++ b/worker/backup_ee.go @@ -133,7 +133,7 @@ func ProcessBackupRequest(ctx context.Context, req *pb.BackupRequest, forceFull } } - glog.Infof("Created backup request: %s. Groups=%v\n", &req, groups) + glog.Infof("Created backup request: %s. Groups=%v\n", req, groups) ctx, cancel := context.WithCancel(ctx) defer cancel() From 6a69feec9990144923b3efd92d635535d3c52fd4 Mon Sep 17 00:00:00 2001 From: Pawan Rawal Date: Mon, 3 Feb 2020 16:35:47 +0530 Subject: [PATCH 09/11] Fix race condition while sending backup request to different groups. --- ee/backup/tests/minio-large/backup_test.go | 6 ++++-- worker/backup_ee.go | 9 +++++---- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/ee/backup/tests/minio-large/backup_test.go b/ee/backup/tests/minio-large/backup_test.go index 6492fb3a936..6b81a19d6e0 100644 --- a/ee/backup/tests/minio-large/backup_test.go +++ b/ee/backup/tests/minio-large/backup_test.go @@ -207,7 +207,8 @@ func copyToLocalFs(t *testing.T) { for object := range objectCh1 { require.NoError(t, object.Err) dstDir := backupDir + "/" + object.Key - os.MkdirAll(dstDir, os.ModePerm) + err := os.MkdirAll(dstDir, os.ModePerm) + require.NoError(t, err) // Get all the files in that folder and lsCh2 := make(chan struct{}) @@ -215,7 +216,8 @@ func copyToLocalFs(t *testing.T) { for object := range objectCh2 { require.NoError(t, object.Err) dstFile := backupDir + "/" + object.Key - mc.FGetObject(bucketName, object.Key, dstFile, minio.GetObjectOptions{}) + err := mc.FGetObject(bucketName, object.Key, dstFile, minio.GetObjectOptions{}) + require.NoError(t, err) } close(lsCh2) } diff --git a/worker/backup_ee.go b/worker/backup_ee.go index 251cfeee55f..3cbaad3b1b1 100644 --- a/worker/backup_ee.go +++ b/worker/backup_ee.go @@ -23,6 +23,7 @@ import ( "github.com/dgraph-io/dgraph/x" "github.com/golang/glog" + "github.com/golang/protobuf/proto" "github.com/pkg/errors" ) @@ -139,13 +140,13 @@ func ProcessBackupRequest(ctx context.Context, req *pb.BackupRequest, forceFull errCh := make(chan error, len(state.Groups)) for _, gid := range groups { - req := req - req.GroupId = gid - req.Predicates = predMap[gid] + br := proto.Clone(req).(*pb.BackupRequest) + br.GroupId = gid + br.Predicates = predMap[gid] go func(req *pb.BackupRequest) { _, err := BackupGroup(ctx, req) errCh <- err - }(req) + }(br) } for range groups { From 7ad48e447dcff78d1c8f6af2ff564fdbae2527f6 Mon Sep 17 00:00:00 2001 From: Pawan Rawal Date: Mon, 3 Feb 2020 18:18:52 +0530 Subject: [PATCH 10/11] Fix copyright year --- graphql/admin/backup.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/graphql/admin/backup.go b/graphql/admin/backup.go index 2759ff23ee8..8b877e58ee0 100644 --- a/graphql/admin/backup.go +++ b/graphql/admin/backup.go @@ -1,5 +1,5 @@ /* - * Copyright 2019 Dgraph Labs, Inc. and Contributors + * Copyright 2020 Dgraph Labs, Inc. and Contributors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. From 7598585bfdcd7ffe2c43a9a804588934c2bc51c2 Mon Sep 17 00:00:00 2001 From: Pawan Rawal Date: Thu, 6 Feb 2020 16:00:48 +0530 Subject: [PATCH 11/11] Address comments by Michael. --- ee/backup/tests/minio-large/backup_test.go | 3 +-- graphql/admin/admin.go | 2 ++ worker/backup_ee.go | 4 ++-- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/ee/backup/tests/minio-large/backup_test.go b/ee/backup/tests/minio-large/backup_test.go index 6b81a19d6e0..d5eecdf3a42 100644 --- a/ee/backup/tests/minio-large/backup_test.go +++ b/ee/backup/tests/minio-large/backup_test.go @@ -207,8 +207,7 @@ func copyToLocalFs(t *testing.T) { for object := range objectCh1 { require.NoError(t, object.Err) dstDir := backupDir + "/" + object.Key - err := os.MkdirAll(dstDir, os.ModePerm) - require.NoError(t, err) + require.NoError(t, os.MkdirAll(dstDir, os.ModePerm)) // Get all the files in that folder and lsCh2 := make(chan struct{}) diff --git a/graphql/admin/admin.go b/graphql/admin/admin.go index 8635fd4b933..7e899646f61 100644 --- a/graphql/admin/admin.go +++ b/graphql/admin/admin.go @@ -279,6 +279,8 @@ func newAdminResolverFactory() resolve.ResolverFactory { WithMutationResolver("backup", func(m schema.Mutation) resolve.MutationResolver { backup := &backupResolver{} + // backup implements the mutation rewriter, executor and query executor hence its passed + // thrice here. return resolve.NewMutationResolver( backup, backup, diff --git a/worker/backup_ee.go b/worker/backup_ee.go index 3cbaad3b1b1..8c87eadfe3d 100644 --- a/worker/backup_ee.go +++ b/worker/backup_ee.go @@ -77,12 +77,12 @@ func BackupGroup(ctx context.Context, in *pb.BackupRequest) (*pb.Status, error) func ProcessBackupRequest(ctx context.Context, req *pb.BackupRequest, forceFull bool) error { if !EnterpriseEnabled() { - return errors.New("You must enable enterprise features first. " + + return errors.New("you must enable enterprise features first. " + "Supply the appropriate license file to Dgraph Zero using the HTTP endpoint.") } if req.Destination == "" { - return errors.Errorf("You must specify a 'destination' value") + return errors.Errorf("you must specify a 'destination' value") } if err := x.HealthCheck(); err != nil {