Skip to content

Commit

Permalink
Merge branch 'master' into naman/drop-prefix-nb
Browse files Browse the repository at this point in the history
  • Loading branch information
NamanJain8 authored May 4, 2021
2 parents c872aa4 + 6b188f2 commit 82d4b4a
Show file tree
Hide file tree
Showing 36 changed files with 1,726 additions and 1,070 deletions.
42 changes: 0 additions & 42 deletions dgraph/cmd/alpha/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,6 @@ func getAdminMux() *http.ServeMux {
http.MethodPut: true,
http.MethodPost: true,
}, adminAuthHandler(http.HandlerFunc(drainingHandler))))
adminMux.Handle("/admin/export", allowedMethodsHandler(allowedMethods{http.MethodGet: true},
adminAuthHandler(http.HandlerFunc(exportHandler))))
adminMux.Handle("/admin/config/cache_mb", allowedMethodsHandler(allowedMethods{
http.MethodGet: true,
http.MethodPut: true,
Expand Down Expand Up @@ -160,46 +158,6 @@ func shutDownHandler(w http.ResponseWriter, r *http.Request) {
x.Check2(w.Write([]byte(`{"code": "Success", "message": "Server is shutting down"}`)))
}

func exportHandler(w http.ResponseWriter, r *http.Request) {
if err := r.ParseForm(); err != nil {
x.SetHttpStatus(w, http.StatusBadRequest, "Parse of export request failed.")
return
}

format := worker.DefaultExportFormat
if vals, ok := r.Form["format"]; ok {
if len(vals) > 1 {
x.SetHttpStatus(w, http.StatusBadRequest,
"Only one export format may be specified.")
return
}
format = worker.NormalizeExportFormat(vals[0])
if format == "" {
x.SetHttpStatus(w, http.StatusBadRequest, "Invalid export format.")
return
}
}

gqlReq := &schema.Request{
Query: `
mutation export($format: String) {
export(input: {format: $format}) {
response {
code
}
}
}`,
Variables: map[string]interface{}{},
}

if resp := resolveWithAdminServer(gqlReq, r, adminServer); len(resp.Errors) != 0 {
x.SetStatus(w, resp.Errors[0].Message, "Export failed.")
return
}
w.Header().Set("Content-Type", "application/json")
x.Check2(w.Write([]byte(`{"code": "Success", "message": "Export completed."}`)))
}

func memoryLimitHandler(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case http.MethodGet:
Expand Down
66 changes: 0 additions & 66 deletions dgraph/cmd/alpha/admin_backup.go

This file was deleted.

3 changes: 3 additions & 0 deletions dgraph/cmd/alpha/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,8 @@ they form a Raft group and provide synchronous replication.
"The SASL username for Kafka.").
Flag("sasl-password",
"The SASL password for Kafka.").
Flag("sasl-mechanism",
"The SASL mechanism for Kafka (PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512)").
Flag("ca-cert",
"The path to CA cert file for TLS encryption.").
Flag("client-cert",
Expand Down Expand Up @@ -734,6 +736,7 @@ func run() {
glog.Infof("worker.Config: %+v", worker.Config)

worker.InitServerState()
worker.InitTasks()

if Alpha.Conf.GetBool("expose_trace") {
// TODO: Remove this once we get rid of event logs.
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ require (
github.com/stretchr/testify v1.6.1
github.com/tinylib/msgp v1.1.5 // indirect
github.com/twpayne/go-geom v1.0.5
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
go.etcd.io/etcd v0.0.0-20190228193606-a943ad0ee4c9
go.opencensus.io v0.22.5
go.uber.org/zap v1.16.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -604,7 +604,9 @@ github.com/vektah/dataloaden v0.2.1-0.20190515034641-a19b9a6e7c9e/go.mod h1:/HUd
github.com/vektah/gqlparser/v2 v2.1.0/go.mod h1:SyUiHgLATUR8BiYURfTirrTcGpcE+4XkV2se04Px1Ms=
github.com/willf/bitset v1.1.10 h1:NotGKqX0KwQ72NUzqrjZq5ipPNDQex9lo3WpaS8L2sc=
github.com/willf/bitset v1.1.10/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ=
Expand Down
48 changes: 38 additions & 10 deletions graphql/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ const (
"""
The UInt64 scalar type represents an unsigned 64‐bit numeric non‐fractional value.
UInt64 can represent values in range [0,(2^64 - 1)].
"""
"""
scalar UInt64
"""
The DateTime scalar type represents date and time as a string in RFC3339 format.
For example: "1985-04-12T23:20:50.52Z" represents 20 minutes and 50.52 seconds after the 23rd hour of April 12th, 1985 in UTC.
Expand Down Expand Up @@ -243,14 +243,19 @@ const (
anonymous: Boolean
}
input TaskInput {
id: String!
}
type Response {
code: String
message: String
}
type ExportPayload {
response: Response
exportedFiles: [String]
response: Response
taskId: String
}
type DrainingPayload {
Expand All @@ -261,6 +266,26 @@ const (
response: Response
}
type TaskPayload {
kind: TaskKind
status: TaskStatus
lastUpdated: DateTime
}
enum TaskStatus {
Queued
Running
Failed
Success
Unknown
}
enum TaskKind {
Backup
Export
Unknown
}
input ConfigInput {
"""
Estimated memory the caches can take. Actual usage by the process would be
Expand Down Expand Up @@ -367,6 +392,7 @@ const (
health: [NodeState]
state: MembershipState
config: Config
task(input: TaskInput!): TaskPayload
` + adminQueries + `
}
Expand Down Expand Up @@ -718,7 +744,6 @@ func newAdminResolver(
}

func newAdminResolverFactory() resolve.ResolverFactory {

adminMutationResolvers := map[string]resolve.MutationResolverFunc{
"addNamespace": resolveAddNamespace,
"backup": resolveBackup,
Expand Down Expand Up @@ -751,18 +776,21 @@ func newAdminResolverFactory() resolve.ResolverFactory {
WithQueryResolver("listBackups", func(q schema.Query) resolve.QueryResolver {
return resolve.QueryResolverFunc(resolveListBackups)
}).
WithMutationResolver("updateGQLSchema", func(m schema.Mutation) resolve.MutationResolver {
return resolve.MutationResolverFunc(
func(ctx context.Context, m schema.Mutation) (*resolve.Resolved, bool) {
return &resolve.Resolved{Err: errors.Errorf(errMsgServerNotReady), Field: m},
false
})
WithQueryResolver("task", func(q schema.Query) resolve.QueryResolver {
return resolve.QueryResolverFunc(resolveTask)
}).
WithQueryResolver("getGQLSchema", func(q schema.Query) resolve.QueryResolver {
return resolve.QueryResolverFunc(
func(ctx context.Context, query schema.Query) *resolve.Resolved {
return &resolve.Resolved{Err: errors.Errorf(errMsgServerNotReady), Field: q}
})
}).
WithMutationResolver("updateGQLSchema", func(m schema.Mutation) resolve.MutationResolver {
return resolve.MutationResolverFunc(
func(ctx context.Context, m schema.Mutation) (*resolve.Resolved, bool) {
return &resolve.Resolved{Err: errors.Errorf(errMsgServerNotReady), Field: m},
false
})
})
for gqlMut, resolver := range adminMutationResolvers {
// gotta force go to evaluate the right function at each loop iteration
Expand Down
21 changes: 17 additions & 4 deletions graphql/admin/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package admin
import (
"context"
"encoding/json"
"fmt"

"github.com/dgraph-io/dgraph/graphql/resolve"
"github.com/dgraph-io/dgraph/graphql/schema"
Expand All @@ -34,27 +35,39 @@ type backupInput struct {

func resolveBackup(ctx context.Context, m schema.Mutation) (*resolve.Resolved, bool) {
glog.Info("Got backup request")
if !worker.EnterpriseEnabled() {
err := fmt.Errorf("you must enable enterprise features first. " +
"Supply the appropriate license file to Dgraph Zero using the HTTP endpoint.")
return resolve.EmptyResult(m, err), false
}

input, err := getBackupInput(m)
if err != nil {
return resolve.EmptyResult(m, err), false
}
if input.Destination == "" {
err := fmt.Errorf("you must specify a 'destination' value")
return resolve.EmptyResult(m, err), false
}

err = worker.ProcessBackupRequest(context.Background(), &pb.BackupRequest{
req := &pb.BackupRequest{
Destination: input.Destination,
AccessKey: input.AccessKey,
SecretKey: input.SecretKey,
SessionToken: input.SessionToken,
Anonymous: input.Anonymous,
}, input.ForceFull)

}
taskId, err := worker.Tasks.Enqueue(req)
if err != nil {
return resolve.EmptyResult(m, err), false
}

msg := fmt.Sprintf("Backup queued with ID %s", taskId)
data := response("Success", msg)
data["taskId"] = taskId
return resolve.DataResult(
m,
map[string]interface{}{m.Name(): response("Success", "Backup completed.")},
map[string]interface{}{m.Name(): data},
nil,
), true
}
Expand Down
1 change: 1 addition & 0 deletions graphql/admin/endpoints_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ const adminTypes = `
type BackupPayload {
response: Response
taskId: String
}
input RestoreInput {
Expand Down
24 changes: 8 additions & 16 deletions graphql/admin/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package admin
import (
"context"
"encoding/json"
"fmt"
"math"

"github.com/dgraph-io/dgraph/graphql/resolve"
Expand All @@ -39,7 +40,6 @@ type exportInput struct {
}

func resolveExport(ctx context.Context, m schema.Mutation) (*resolve.Resolved, bool) {

glog.Info("Got export request through GraphQL admin API")

input, err := getExportInput(m)
Expand Down Expand Up @@ -83,38 +83,30 @@ func resolveExport(ctx context.Context, m schema.Mutation) (*resolve.Resolved, b
return resolve.EmptyResult(m, err), false
}

files, err := worker.ExportOverNetwork(ctx, &pb.ExportRequest{
req := &pb.ExportRequest{
Format: format,
Namespace: exportNs,
Destination: input.Destination,
AccessKey: input.AccessKey,
SecretKey: input.SecretKey,
SessionToken: input.SessionToken,
Anonymous: input.Anonymous,
})
}
taskId, err := worker.Tasks.Enqueue(req)
if err != nil {
return resolve.EmptyResult(m, err), false
}

responseData := response("Success", "Export completed.")
responseData["exportedFiles"] = toInterfaceSlice(files)

msg := fmt.Sprintf("Export queued with ID %s", taskId)
data := response("Success", msg)
data["taskId"] = taskId
return resolve.DataResult(
m,
map[string]interface{}{m.Name(): responseData},
map[string]interface{}{m.Name(): data},
nil,
), true
}

// toInterfaceSlice converts []string to []interface{}
func toInterfaceSlice(in []string) []interface{} {
out := make([]interface{}, 0, len(in))
for _, s := range in {
out = append(out, s)
}
return out
}

func getExportInput(m schema.Mutation) (*exportInput, error) {
inputArg := m.ArgValue(schema.InputArgName)
inputByts, err := json.Marshal(inputArg)
Expand Down
Loading

0 comments on commit 82d4b4a

Please sign in to comment.