Skip to content

Commit

Permalink
release/v21.03-slash: asynchronous tasks (#7781)
Browse files Browse the repository at this point in the history
* Revert dd183aa
* Revert 4418a7a
* Cherry pick cfdf7a5
* Cherry pick 60bec16
* Cherry pick 9e1337b
* Cherry pick 6b188f2
  • Loading branch information
ajeetdsouza authored May 5, 2021
1 parent 10c49f7 commit 7e2e860
Show file tree
Hide file tree
Showing 42 changed files with 2,119 additions and 1,227 deletions.
42 changes: 0 additions & 42 deletions dgraph/cmd/alpha/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,6 @@ func getAdminMux() *http.ServeMux {
http.MethodPut: true,
http.MethodPost: true,
}, adminAuthHandler(http.HandlerFunc(drainingHandler))))
adminMux.Handle("/admin/export", allowedMethodsHandler(allowedMethods{http.MethodGet: true},
adminAuthHandler(http.HandlerFunc(exportHandler))))
adminMux.Handle("/admin/config/cache_mb", allowedMethodsHandler(allowedMethods{
http.MethodGet: true,
http.MethodPut: true,
Expand Down Expand Up @@ -160,46 +158,6 @@ func shutDownHandler(w http.ResponseWriter, r *http.Request) {
x.Check2(w.Write([]byte(`{"code": "Success", "message": "Server is shutting down"}`)))
}

func exportHandler(w http.ResponseWriter, r *http.Request) {
if err := r.ParseForm(); err != nil {
x.SetHttpStatus(w, http.StatusBadRequest, "Parse of export request failed.")
return
}

format := worker.DefaultExportFormat
if vals, ok := r.Form["format"]; ok {
if len(vals) > 1 {
x.SetHttpStatus(w, http.StatusBadRequest,
"Only one export format may be specified.")
return
}
format = worker.NormalizeExportFormat(vals[0])
if format == "" {
x.SetHttpStatus(w, http.StatusBadRequest, "Invalid export format.")
return
}
}

gqlReq := &schema.Request{
Query: `
mutation export($format: String) {
export(input: {format: $format}) {
response {
code
}
}
}`,
Variables: map[string]interface{}{},
}

if resp := resolveWithAdminServer(gqlReq, r, adminServer); len(resp.Errors) != 0 {
x.SetStatus(w, resp.Errors[0].Message, "Export failed.")
return
}
w.Header().Set("Content-Type", "application/json")
x.Check2(w.Write([]byte(`{"code": "Success", "message": "Export completed."}`)))
}

func memoryLimitHandler(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case http.MethodGet:
Expand Down
68 changes: 0 additions & 68 deletions dgraph/cmd/alpha/admin_backup.go

This file was deleted.

1 change: 1 addition & 0 deletions dgraph/cmd/alpha/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -749,6 +749,7 @@ func run() {
glog.Infof("worker.Config: %+v", worker.Config)

worker.InitServerState()
worker.InitTasks()

if Alpha.Conf.GetBool("expose_trace") {
// TODO: Remove this once we get rid of event logs.
Expand Down
6 changes: 3 additions & 3 deletions dgraph/cmd/bulk/mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@ type shardState struct {

func newMapperBuffer(opt *options) *z.Buffer {
sz := float64(opt.MapBufSize) * 1.1
buf, err := z.NewBufferWithDir(int(sz), 2*int(opt.MapBufSize), z.UseMmap,
filepath.Join(opt.TmpDir, bufferDir), "Mapper.Buffer")
tmpDir := filepath.Join(opt.TmpDir, bufferDir)
buf, err := z.NewBufferTmp(tmpDir, int(sz))
x.Check(err)
return buf
return buf.WithMaxSize(2 * int(opt.MapBufSize))
}

func newMapper(st *state) *mapper {
Expand Down
8 changes: 3 additions & 5 deletions dgraph/cmd/bulk/reduce.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,11 +434,9 @@ func bufferStats(cbuf *z.Buffer) {
}

func getBuf(dir string) *z.Buffer {
cbuf, err := z.NewBufferWithDir(64<<20, 64<<30, z.UseCalloc,
filepath.Join(dir, bufferDir), "Reducer.GetBuf")
x.Check(err)
cbuf.AutoMmapAfter(1 << 30)
return cbuf
return z.NewBuffer(64<<20, "Reducer.GetBuf").
WithAutoMmap(1<<30, filepath.Join(dir, bufferDir)).
WithMaxSize(64 << 30)
}

func (r *reducer) reduce(partitionKeys [][]byte, mapItrs []*mapIterator, ci *countIndexer) {
Expand Down
5 changes: 3 additions & 2 deletions dgraph/cmd/zero/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,16 @@ func (o *Oracle) Init() {
o.commits = make(map[uint64]uint64)
// Remove the older btree file, before creating NewTree, as it may contain stale data leading
// to wrong results.
o.keyCommit = z.NewTree()
o.keyCommit = z.NewTree("oracle")
o.subscribers = make(map[int]chan pb.OracleDelta)
o.updates = make(chan *pb.OracleDelta, 100000) // Keeping 1 second worth of updates.
o.doneUntil.Init(nil)
go o.sendDeltasToSubscribers()
}

// oracle close releases the memory associated with btree used for keycommit.
// close releases the memory associated with btree used for keycommit.
func (o *Oracle) close() {
o.keyCommit.Close()
}

func (o *Oracle) updateStartTxnTs(ts uint64) {
Expand Down
2 changes: 1 addition & 1 deletion dgraph/cmd/zero/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ instances to achieve high-availability.
Head("Audit options").
Flag("output",
`[stdout, /path/to/dir] This specifies where audit logs should be output to.
"stdout" is for standard output. You can also specify the directory where audit logs
"stdout" is for standard output. You can also specify the directory where audit logs
will be saved. When stdout is specified as output other fields will be ignored.`).
Flag("compress",
"Enables the compression of old audit logs.").
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ require (
github.com/dgraph-io/gqlgen v0.13.2
github.com/dgraph-io/gqlparser/v2 v2.2.0
github.com/dgraph-io/graphql-transport-ws v0.0.0-20210223074046-e5b8b80bb4ed
github.com/dgraph-io/ristretto v0.0.4-0.20210310100713-a4346e5d1f90
github.com/dgraph-io/ristretto v0.0.4-0.20210504190834-0bf2acd73aa3
github.com/dgraph-io/simdjson-go v0.3.0
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/dgrijalva/jwt-go/v4 v4.0.0-preview1
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ github.com/cenkalti/backoff v2.1.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QH
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd h1:qMd81Ts1T2OTKmB4acZcyKaMtRnY5Y44NuXGX2GFJ1w=
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI=
Expand Down Expand Up @@ -129,8 +131,8 @@ github.com/dgraph-io/gqlparser/v2 v2.2.0/go.mod h1:MYS4jppjyx8b9tuUtjV7jU1UFZK6P
github.com/dgraph-io/graphql-transport-ws v0.0.0-20210223074046-e5b8b80bb4ed h1:pgGMBoTtFhR+xkyzINaToLYRurHn+6pxMYffIGmmEPc=
github.com/dgraph-io/graphql-transport-ws v0.0.0-20210223074046-e5b8b80bb4ed/go.mod h1:7z3c/5w0sMYYZF5bHsrh8IH4fKwG5O5Y70cPH1ZLLRQ=
github.com/dgraph-io/ristretto v0.0.4-0.20210309073149-3836124cdc5a/go.mod h1:MIonLggsKgZLUSt414ExgwNtlOL5MuEoAJP514mwGe8=
github.com/dgraph-io/ristretto v0.0.4-0.20210310100713-a4346e5d1f90 h1:arWVlUO9NhZ/2vWprIqpe825GISUPpgJhU/b0ep3j/M=
github.com/dgraph-io/ristretto v0.0.4-0.20210310100713-a4346e5d1f90/go.mod h1:MIonLggsKgZLUSt414ExgwNtlOL5MuEoAJP514mwGe8=
github.com/dgraph-io/ristretto v0.0.4-0.20210504190834-0bf2acd73aa3 h1:jU/wpYsEL+8JPLf/QcjkQKI5g0dOjSuwcMjkThxt5x0=
github.com/dgraph-io/ristretto v0.0.4-0.20210504190834-0bf2acd73aa3/go.mod h1:fux0lOrBhrVCJd3lcTHsIJhq1T2rokOu6v9Vcb3Q9ug=
github.com/dgraph-io/simdjson-go v0.3.0 h1:h71LO7vR4LHMPUhuoGN8bqGm1VNfGOlAG8BI6iDUKw0=
github.com/dgraph-io/simdjson-go v0.3.0/go.mod h1:Otpysdjaxj9OGaJusn4pgQV7OFh2bELuHANq0I78uvY=
github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
Expand Down
48 changes: 38 additions & 10 deletions graphql/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ const (
"""
The UInt64 scalar type represents an unsigned 64‐bit numeric non‐fractional value.
UInt64 can represent values in range [0,(2^64 - 1)].
"""
"""
scalar UInt64
"""
The DateTime scalar type represents date and time as a string in RFC3339 format.
For example: "1985-04-12T23:20:50.52Z" represents 20 minutes and 50.52 seconds after the 23rd hour of April 12th, 1985 in UTC.
Expand Down Expand Up @@ -243,14 +243,19 @@ const (
anonymous: Boolean
}
input TaskInput {
id: String!
}
type Response {
code: String
message: String
}
type ExportPayload {
response: Response
exportedFiles: [String]
response: Response
taskId: String
}
type DrainingPayload {
Expand All @@ -261,6 +266,26 @@ const (
response: Response
}
type TaskPayload {
kind: TaskKind
status: TaskStatus
lastUpdated: DateTime
}
enum TaskStatus {
Queued
Running
Failed
Success
Unknown
}
enum TaskKind {
Backup
Export
Unknown
}
input ConfigInput {
"""
Estimated memory the caches can take. Actual usage by the process would be
Expand Down Expand Up @@ -367,6 +392,7 @@ const (
health: [NodeState]
state: MembershipState
config: Config
task(input: TaskInput!): TaskPayload
` + adminQueries + `
}
Expand Down Expand Up @@ -718,7 +744,6 @@ func newAdminResolver(
}

func newAdminResolverFactory() resolve.ResolverFactory {

adminMutationResolvers := map[string]resolve.MutationResolverFunc{
"addNamespace": resolveAddNamespace,
"backup": resolveBackup,
Expand Down Expand Up @@ -751,18 +776,21 @@ func newAdminResolverFactory() resolve.ResolverFactory {
WithQueryResolver("listBackups", func(q schema.Query) resolve.QueryResolver {
return resolve.QueryResolverFunc(resolveListBackups)
}).
WithMutationResolver("updateGQLSchema", func(m schema.Mutation) resolve.MutationResolver {
return resolve.MutationResolverFunc(
func(ctx context.Context, m schema.Mutation) (*resolve.Resolved, bool) {
return &resolve.Resolved{Err: errors.Errorf(errMsgServerNotReady), Field: m},
false
})
WithQueryResolver("task", func(q schema.Query) resolve.QueryResolver {
return resolve.QueryResolverFunc(resolveTask)
}).
WithQueryResolver("getGQLSchema", func(q schema.Query) resolve.QueryResolver {
return resolve.QueryResolverFunc(
func(ctx context.Context, query schema.Query) *resolve.Resolved {
return &resolve.Resolved{Err: errors.Errorf(errMsgServerNotReady), Field: q}
})
}).
WithMutationResolver("updateGQLSchema", func(m schema.Mutation) resolve.MutationResolver {
return resolve.MutationResolverFunc(
func(ctx context.Context, m schema.Mutation) (*resolve.Resolved, bool) {
return &resolve.Resolved{Err: errors.Errorf(errMsgServerNotReady), Field: m},
false
})
})
for gqlMut, resolver := range adminMutationResolvers {
// gotta force go to evaluate the right function at each loop iteration
Expand Down
23 changes: 17 additions & 6 deletions graphql/admin/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package admin
import (
"context"
"encoding/json"
"fmt"

"github.com/dgraph-io/dgraph/graphql/resolve"
"github.com/dgraph-io/dgraph/graphql/schema"
Expand All @@ -34,29 +35,39 @@ type backupInput struct {

func resolveBackup(ctx context.Context, m schema.Mutation) (*resolve.Resolved, bool) {
glog.Info("Got backup request")
if !worker.EnterpriseEnabled() {
err := fmt.Errorf("you must enable enterprise features first. " +
"Supply the appropriate license file to Dgraph Zero using the HTTP endpoint.")
return resolve.EmptyResult(m, err), false
}

input, err := getBackupInput(m)
if err != nil {
return resolve.EmptyResult(m, err), false
}
if input.Destination == "" {
err := fmt.Errorf("you must specify a 'destination' value")
return resolve.EmptyResult(m, err), false
}

err = worker.ProcessBackupRequest(context.Background(), &pb.BackupRequest{
req := &pb.BackupRequest{
Destination: input.Destination,
AccessKey: input.AccessKey,
SecretKey: input.SecretKey,
SessionToken: input.SessionToken,
Anonymous: input.Anonymous,
}, input.ForceFull)

}
taskId, err := worker.Tasks.Enqueue(req)
if err != nil {
return resolve.EmptyResult(m, err), false
}

msg := fmt.Sprintf("Backup queued with ID %#x", taskId)
data := response("Success", msg)
data["taskId"] = fmt.Sprintf("%#x", taskId)
return resolve.DataResult(
m,
map[string]interface{}{
m.Name(): response("Success", "Backup queued successfully, please check /health for status."),
},
map[string]interface{}{m.Name(): data},
nil,
), true
}
Expand Down
1 change: 1 addition & 0 deletions graphql/admin/endpoints_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ const adminTypes = `
type BackupPayload {
response: Response
taskId: String
}
input RestoreInput {
Expand Down
Loading

0 comments on commit 7e2e860

Please sign in to comment.