Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GraphQL Admin API: Support Backup operation #4706

Merged
merged 11 commits into from
Feb 7, 2020
112 changes: 5 additions & 107 deletions dgraph/cmd/alpha/admin_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,10 @@ package alpha
import (
"context"
"net/http"
"net/url"
"time"

"github.com/dgraph-io/dgraph/ee/backup"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/worker"
"github.com/dgraph-io/dgraph/x"

"github.com/golang/glog"
"github.com/pkg/errors"
)

func init() {
Expand All @@ -44,123 +38,27 @@ func backupHandler(w http.ResponseWriter, r *http.Request) {
}) {
return
}
if !worker.EnterpriseEnabled() {
x.SetStatus(w, "You must enable enterprise features first. "+
"Supply the appropriate license file to Dgraph Zero using the HTTP endpoint.",
"Backup failed.")
return
}

if err := processHttpBackupRequest(context.Background(), r); err != nil {
x.SetStatus(w, err.Error(), "Backup failed.")
return
}

w.Header().Set("Content-Type", "application/json")
x.Check2(w.Write([]byte(`{"code": "Success", "message": "Backup completed."}`)))
}

func processHttpBackupRequest(ctx context.Context, r *http.Request) error {
destination := r.FormValue("destination")
if destination == "" {
return errors.Errorf("You must specify a 'destination' value")
}

accessKey := r.FormValue("access_key")
secretKey := r.FormValue("secret_key")
sessionToken := r.FormValue("session_token")
anonymous := r.FormValue("anonymous") == "true"
forceFull := r.FormValue("force_full") == "true"

if err := x.HealthCheck(); err != nil {
glog.Errorf("Backup canceled, not ready to accept requests: %s", err)
return err
}

ts, err := worker.Timestamps(ctx, &pb.Num{ReadOnly: true})
if err != nil {
glog.Errorf("Unable to retrieve readonly timestamp for backup: %s", err)
return err
}

req := pb.BackupRequest{
ReadTs: ts.ReadOnly,
Destination: destination,
UnixTs: time.Now().UTC().Format("20060102.150405.000"),
AccessKey: accessKey,
SecretKey: secretKey,
SessionToken: sessionToken,
Anonymous: anonymous,
}

// Read the manifests to get the right timestamp from which to start the backup.
uri, err := url.Parse(req.Destination)
if err != nil {
return err
}
handler, err := backup.NewUriHandler(uri, backup.GetCredentialsFromRequest(&req))
if err != nil {
return err
}
latestManifest, err := handler.GetLatestManifest(uri)
if err != nil {
return err
}
req.SinceTs = latestManifest.Since
if forceFull {
req.SinceTs = 0
}

// Update the membership state to get the latest mapping of groups to predicates.
if err := worker.UpdateMembershipState(ctx); err != nil {
return err
}

// Get the current membership state and parse it for easier processing.
state := worker.GetMembershipState()
var groups []uint32
predMap := make(map[uint32][]string)
for gid, group := range state.Groups {
groups = append(groups, gid)
predMap[gid] = make([]string, 0)
for pred := range group.Tablets {
predMap[gid] = append(predMap[gid], pred)
}
}

glog.Infof("Created backup request: %s. Groups=%v\n", &req, groups)
ctx, cancel := context.WithCancel(ctx)
defer cancel()

errCh := make(chan error, len(state.Groups))
for _, gid := range groups {
req := req
req.GroupId = gid
req.Predicates = predMap[gid]
go func(req *pb.BackupRequest) {
_, err := worker.BackupGroup(ctx, req)
errCh <- err
}(&req)
}

for range groups {
if err := <-errCh; err != nil {
glog.Errorf("Error received during backup: %v", err)
return err
}
}

m := backup.Manifest{Since: req.ReadTs, Groups: predMap}
if req.SinceTs == 0 {
m.Type = "full"
m.BackupId = x.GetRandomName(1)
m.BackupNum = 1
} else {
m.Type = "incremental"
m.BackupId = latestManifest.BackupId
m.BackupNum = latestManifest.BackupNum + 1
if err := worker.ProcessBackupRequest(context.Background(), &req, forceFull); err != nil {
x.SetStatus(w, err.Error(), "Backup failed.")
return
}

bp := &backup.Processor{Request: &req}
return bp.CompleteBackup(ctx, &m)
w.Header().Set("Content-Type", "application/json")
x.Check2(w.Write([]byte(`{"code": "Success", "message": "Backup completed."}`)))
}
30 changes: 29 additions & 1 deletion graphql/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ const (
graphqlAdminSchema = `
type GQLSchema @dgraph(type: "dgraph.graphql") {
id: ID!
schema: String! @dgraph(type: "dgraph.graphql.schema")
schema: String! @dgraph(type: "dgraph.graphql.schema")
generatedSchema: String!
}

Expand Down Expand Up @@ -85,13 +85,32 @@ const (
schema: String!
}

input BackupInput {
destination: String!
accessKey: String
secretKey: String
sessionToken: String
anonymous: Boolean
forceFull: Boolean
}

type Response {
code: String
message: String
}

type BackupPayload {
response: Response
}

type Query {
getGQLSchema: GQLSchema
health: Health
}

type Mutation {
updateGQLSchema(input: UpdateGQLSchemaInput!) : UpdateGQLSchemaPayload
backup(input: BackupInput!) : BackupPayload
}
`
)
Expand Down Expand Up @@ -257,6 +276,15 @@ func newAdminResolverFactory() resolve.ResolverFactory {
return &resolve.Resolved{Err: errors.Errorf(errMsgServerNotReady)}
})
}).
WithMutationResolver("backup", func(m schema.Mutation) resolve.MutationResolver {
backup := &backupResolver{}

return resolve.NewMutationResolver(
backup,
backup,
backup,
resolve.StdMutationCompletion(m.ResponseName()))
}).
WithSchemaIntrospection()

return rf
Expand Down
120 changes: 120 additions & 0 deletions graphql/admin/backup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Copyright 2019 Dgraph Labs, Inc. and Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package admin

import (
"bytes"
"context"
"encoding/json"

dgoapi "github.com/dgraph-io/dgo/v2/protos/api"
"github.com/dgraph-io/dgraph/gql"
"github.com/dgraph-io/dgraph/graphql/schema"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/worker"
"github.com/dgraph-io/dgraph/x"
"github.com/golang/glog"
)

type backupResolver struct {
mutation schema.Mutation
}

type backupInput struct {
Destination string
AccessKey string
SecretKey string
SessionToken string
Anonymous bool
ForceFull bool
}

func (br *backupResolver) Rewrite(
m schema.Mutation) (*gql.GraphQuery, []*dgoapi.Mutation, error) {
glog.Info("Got backup request")

br.mutation = m
input, err := getBackupInput(m)
if err != nil {
return nil, nil, err
}

err = worker.ProcessBackupRequest(context.Background(), &pb.BackupRequest{
Destination: input.Destination,
AccessKey: input.AccessKey,
SecretKey: input.SecretKey,
SessionToken: input.SessionToken,
Anonymous: input.Anonymous,
}, input.ForceFull)

return nil, nil, err
}

func (br *backupResolver) FromMutationResult(
mutation schema.Mutation,
assigned map[string]string,
result map[string]interface{}) (*gql.GraphQuery, error) {

return nil, nil
}

func (br *backupResolver) Mutate(
ctx context.Context,
query *gql.GraphQuery,
mutations []*dgoapi.Mutation) (map[string]string, map[string]interface{}, error) {

return nil, nil, nil
}

func (br *backupResolver) Query(ctx context.Context, query *gql.GraphQuery) ([]byte, error) {
var buf bytes.Buffer

x.Check2(buf.WriteString(`{ "`))
x.Check2(buf.WriteString(br.mutation.SelectionSet()[0].ResponseName() + `": [{`))

for i, sel := range br.mutation.SelectionSet()[0].SelectionSet() {
var val string
switch sel.Name() {
case "code":
val = "Success"
case "message":
val = "Backup completed."
}
if i != 0 {
x.Check2(buf.WriteString(","))
}
x.Check2(buf.WriteString(`"`))
x.Check2(buf.WriteString(sel.ResponseName()))
x.Check2(buf.WriteString(`":`))
x.Check2(buf.WriteString(`"` + val + `"`))
}
x.Check2(buf.WriteString("}]}"))

return buf.Bytes(), nil
}

func getBackupInput(m schema.Mutation) (*backupInput, error) {
inputArg := m.ArgValue(schema.InputArgName)
inputByts, err := json.Marshal(inputArg)
if err != nil {
return nil, schema.GQLWrapf(err, "couldn't get input argument")
}

var input backupInput
err = json.Unmarshal(inputByts, &input)
return &input, schema.GQLWrapf(err, "couldn't get input argument")
}
5 changes: 5 additions & 0 deletions worker/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,8 @@ func (w *grpcWorker) Backup(ctx context.Context, req *pb.BackupRequest) (*pb.Sta
glog.Warningf("Backup failed: %v", x.ErrNotSupported)
return nil, x.ErrNotSupported
}

func ProcessBackupRequest(ctx context.Context, req *pb.BackupRequest, forceFull bool) error {
glog.Warningf("Backup failed: %v", x.ErrNotSupported)
return x.ErrNotSupported
}
Loading