Skip to content

Commit

Permalink
Update code to use new grpc API for Multiple Mutation
Browse files Browse the repository at this point in the history
  • Loading branch information
mangalaman93 committed Aug 20, 2019
1 parent e4b1ab7 commit 15a06a9
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 62 deletions.
2 changes: 1 addition & 1 deletion contrib/integration/bigdata/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func main() {
// schema items.
resp, err := c.NewTxn().Query(ctx, "schema {}")
x.Check(err)
if len(resp.Schema) < 5 {
if len(resp.Json) < 5 {
// Run each schema alter separately so that there is an even
// distribution among all groups.
for _, s := range schema() {
Expand Down
9 changes: 6 additions & 3 deletions contrib/integration/testtxn/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,12 +232,15 @@ func TestTxnRead5(t *testing.T) {
mu = &api.Mutation{}
mu.SetJson = []byte(fmt.Sprintf("{\"uid\": \"%s\", \"name\": \"Manish2\"}", uid))

mu.CommitNow = true
res, err := dc.Mutate(context.Background(), mu)
muReq := api.Request{
Mutations: []*api.Mutation{mu},
CommitNow: true,
}
res, err := dc.Query(context.Background(), &muReq)
if err != nil {
log.Fatalf("Error while running mutation: %v\n", err)
}
x.AssertTrue(res.Context.StartTs > 0)
x.AssertTrue(res.Txn.StartTs > 0)
resp, err = dc.Query(context.Background(), &req)
if err != nil {
log.Fatalf("Error while running query: %v\n", err)
Expand Down
27 changes: 14 additions & 13 deletions dgraph/cmd/alpha/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ func mutationHandler(w http.ResponseWriter, r *http.Request) {
// start parsing the query
parseStart := time.Now()

var mu *api.Mutation
var req *api.Request
contentType := r.Header.Get("Content-Type")
switch strings.ToLower(contentType) {
case "application/json":
Expand All @@ -312,15 +312,16 @@ func mutationHandler(w http.ResponseWriter, r *http.Request) {
return
}

mu = &api.Mutation{}
mu := &api.Mutation{}
req = &api.Request{Mutations: []*api.Mutation{mu}}
if setJSON, ok := ms["set"]; ok && setJSON != nil {
mu.SetJson = setJSON.bs
}
if delJSON, ok := ms["delete"]; ok && delJSON != nil {
mu.DeleteJson = delJSON.bs
}
if queryText, ok := ms["query"]; ok && queryText != nil {
mu.Query, err = strconv.Unquote(string(queryText.bs))
req.Query, err = strconv.Unquote(string(queryText.bs))
if err != nil {
x.SetStatus(w, x.ErrorInvalidRequest, err.Error())
return
Expand All @@ -336,7 +337,7 @@ func mutationHandler(w http.ResponseWriter, r *http.Request) {

case "application/rdf":
// Parse N-Quads.
mu, err = gql.ParseMutation(string(body))
req, err = gql.ParseMutation(string(body))
if err != nil {
x.SetStatus(w, x.ErrorInvalidRequest, err.Error())
return
Expand All @@ -351,26 +352,26 @@ func mutationHandler(w http.ResponseWriter, r *http.Request) {
// end of query parsing
parseEnd := time.Now()

mu.StartTs = startTs
mu.CommitNow = commitNow
req.StartTs = startTs
req.CommitNow = commitNow

ctx := attachAccessJwt(context.Background(), r)
resp, err := (&edgraph.Server{}).Mutate(ctx, mu)
resp, err := (&edgraph.Server{}).Query(ctx, req)
if err != nil {
x.SetStatusWithData(w, x.ErrorInvalidRequest, err.Error())
return
}

resp.Latency.ParsingNs = uint64(parseEnd.Sub(parseStart).Nanoseconds())
e := query.Extensions{
Txn: resp.Context,
Txn: resp.Txn,
Latency: resp.Latency,
}
sort.Strings(e.Txn.Keys)
sort.Strings(e.Txn.Preds)

// Don't send keys array which is part of txn context if its commit immediately.
if mu.CommitNow {
if req.CommitNow {
e.Txn.Keys = e.Txn.Keys[:0]
}

Expand Down Expand Up @@ -487,11 +488,11 @@ func handleCommit(startTs uint64, reqText []byte) (map[string]interface{}, error
return nil, err
}

resp := &api.Assigned{}
resp.Context = tc
resp.Context.CommitTs = cts
resp := &api.Response{}
resp.Txn = tc
resp.Txn.CommitTs = cts
e := query.Extensions{
Txn: resp.Context,
Txn: resp.Txn,
}
e.Txn.Keys = e.Txn.Keys[:0]
response := map[string]interface{}{}
Expand Down
19 changes: 11 additions & 8 deletions edgraph/access_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,9 @@ const queryUser = `

// authorizeUser queries the user with the given user id, and returns the associated uid,
// acl groups, and whether the password stored in DB matches the supplied password
func authorizeUser(ctx context.Context, userid string, password string) (*acl.User,
error) {
func authorizeUser(ctx context.Context, userid string, password string) (
*acl.User, error) {

queryVars := map[string]string{
"$userid": userid,
"$password": password,
Expand All @@ -270,7 +271,7 @@ func authorizeUser(ctx context.Context, userid string, password string) (*acl.Us
Vars: queryVars,
}

queryResp, err := (&Server{}).doQuery(ctx, &queryRequest)
queryResp, err := (&Server{}).doQuery(ctx, &queryRequest, false)
if err != nil {
glog.Errorf("Error while query user with id %s: %v", userid, err)
return nil, err
Expand Down Expand Up @@ -304,7 +305,7 @@ func RefreshAcls(closer *y.Closer) {

ctx := context.Background()
var err error
queryResp, err := (&Server{}).doQuery(ctx, &queryRequest)
queryResp, err := (&Server{}).doQuery(ctx, &queryRequest, false)
if err != nil {
return errors.Errorf("unable to retrieve acls: %v", err)
}
Expand Down Expand Up @@ -356,7 +357,7 @@ func ResetAcl() {
Vars: queryVars,
}

queryResp, err := (&Server{}).doQuery(ctx, &queryRequest)
queryResp, err := (&Server{}).doQuery(ctx, &queryRequest, false)
if err != nil {
return errors.Wrapf(err, "while querying user with id %s", x.GrootId)
}
Expand All @@ -373,13 +374,15 @@ func ResetAcl() {

// Insert Groot.
createUserNQuads := acl.CreateUserNQuads(x.GrootId, "password")
mu := &api.Mutation{
req := &api.Request{
StartTs: startTs,
CommitNow: true,
Set: createUserNQuads,
Mutations: []*api.Mutation{
&api.Mutation{Set: createUserNQuads},
},
}

if _, err := (&Server{}).doMutate(context.Background(), mu, false); err != nil {
if _, err := (&Server{}).doMutate(context.Background(), req, false); err != nil {
return err
}
glog.Infof("Successfully upserted the groot account")
Expand Down
84 changes: 47 additions & 37 deletions edgraph/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,24 +419,24 @@ func annotateStartTs(span *otrace.Span, ts uint64) {
span.Annotate([]otrace.Attribute{otrace.Int64Attribute("startTs", int64(ts))}, "")
}

// Mutate handles requests to perform mutations.
func (s *Server) Mutate(ctx context.Context, mu *api.Mutation) (*api.Assigned, error) {
return s.doMutate(ctx, mu, true)
}

func (s *Server) doMutate(ctx context.Context, mu *api.Mutation, authorize bool) (
resp *api.Assigned, rerr error) {
func (s *Server) doMutate(ctx context.Context, req *api.Request, authorize bool) (
resp *api.Response, rerr error) {

if ctx.Err() != nil {
return nil, ctx.Err()
}

if len(req.Mutations) != 1 {
return nil, errors.Errorf("Only 1 mutation per request is supported")
}
mu := req.Mutations[0]

if !isMutationAllowed(ctx) {
return resp, errors.Errorf("No mutations allowed.")
}

var parsingTime time.Duration
resp = &api.Assigned{}
resp = &api.Response{}

start := time.Now()
defer func() {
Expand Down Expand Up @@ -466,7 +466,7 @@ func (s *Server) doMutate(ctx context.Context, mu *api.Mutation, authorize bool)
}

ostats.Record(ctx, x.NumMutations.M(1))
if mu.Query != "" {
if req.Query != "" {
span.Annotatef(nil, "Got Mutation with Upsert Block: %s", mu)
}
if len(mu.SetJson) > 0 {
Expand All @@ -493,12 +493,12 @@ func (s *Server) doMutate(ctx context.Context, mu *api.Mutation, authorize bool)
return resp, errors.Errorf("Empty mutation")
}

if mu.StartTs == 0 {
mu.StartTs = State.getTimestamp(false)
if req.StartTs == 0 {
req.StartTs = State.getTimestamp(false)
}
annotateStartTs(span, mu.StartTs)
annotateStartTs(span, req.StartTs)

l, err := doQueryInUpsert(ctx, mu, gmu)
l, err := doQueryInUpsert(ctx, req, gmu)
if err != nil {
return resp, err
}
Expand All @@ -514,11 +514,11 @@ func (s *Server) doMutate(ctx context.Context, mu *api.Mutation, authorize bool)
return resp, err
}

m := &pb.Mutations{Edges: edges, StartTs: mu.StartTs}
m := &pb.Mutations{Edges: edges, StartTs: req.StartTs}
span.Annotatef(nil, "Applying mutations: %+v", m)
resp.Context, err = query.ApplyMutations(ctx, m)
span.Annotatef(nil, "Txn Context: %+v. Err=%v", resp.Context, err)
if !mu.CommitNow {
resp.Txn, err = query.ApplyMutations(ctx, m)
span.Annotatef(nil, "Txn Context: %+v. Err=%v", resp.Txn, err)
if !req.CommitNow {
if err == y.ErrConflict {
err = status.Error(codes.FailedPrecondition, err.Error())
}
Expand All @@ -531,12 +531,12 @@ func (s *Server) doMutate(ctx context.Context, mu *api.Mutation, authorize bool)
// ApplyMutations failed. We now want to abort the transaction,
// ignoring any error that might occur during the abort (the user would
// care more about the previous error).
if resp.Context == nil {
resp.Context = &api.TxnContext{StartTs: mu.StartTs}
if resp.Txn == nil {
resp.Txn = &api.TxnContext{StartTs: req.StartTs}
}

resp.Context.Aborted = true
_, _ = worker.CommitOverNetwork(ctx, resp.Context)
resp.Txn.Aborted = true
_, _ = worker.CommitOverNetwork(ctx, resp.Txn)

if err == y.ErrConflict {
// We have already aborted the transaction, so the error message should reflect that.
Expand All @@ -547,36 +547,41 @@ func (s *Server) doMutate(ctx context.Context, mu *api.Mutation, authorize bool)
}

span.Annotatef(nil, "Prewrites err: %v. Attempting to commit/abort immediately.", err)
ctxn := resp.Context
ctxn := resp.Txn
// zero would assign the CommitTs
cts, err := worker.CommitOverNetwork(ctx, ctxn)
span.Annotatef(nil, "Status of commit at ts: %d: %v", ctxn.StartTs, err)
if err != nil {
if err == y.ErrAborted {
err = status.Errorf(codes.Aborted, err.Error())
resp.Context.Aborted = true
resp.Txn.Aborted = true
}

return resp, err
}

// CommitNow was true, no need to send keys.
resp.Context.Keys = resp.Context.Keys[:0]
resp.Context.CommitTs = cts
resp.Txn.Keys = resp.Txn.Keys[:0]
resp.Txn.CommitTs = cts

return resp, nil
}

// doQueryInUpsert processes the query in upsert block.
func doQueryInUpsert(ctx context.Context, mu *api.Mutation, gmu *gql.Mutation) (
func doQueryInUpsert(ctx context.Context, req *api.Request, gmu *gql.Mutation) (
*query.Latency, error) {

l := &query.Latency{}
if mu.Query == "" {
if req.Query == "" {
return l, nil
}

upsertQuery := mu.Query
// if err := authorizeQuery(ctx, req); err != nil {
// return nil, err
// }

mu := req.Mutations[0]
upsertQuery := req.Query
needVars := findVars(gmu)
isCondUpsert := strings.TrimSpace(mu.Cond) != ""
varName := fmt.Sprintf("__dgraph%d__", rand.Int())
Expand All @@ -600,7 +605,7 @@ func doQueryInUpsert(ctx context.Context, mu *api.Mutation, gmu *gql.Mutation) (
// The variable __dgraph0__ will -
// * be empty if the condition is true
// * have 1 UID (the 0 UID) if the condition is false
upsertQuery = strings.TrimSuffix(strings.TrimSpace(mu.Query), "}")
upsertQuery = strings.TrimSuffix(strings.TrimSpace(req.Query), "}")
upsertQuery += varName + ` as var(func: uid(0)) ` + cond + `}`
needVars = append(needVars, varName)
}
Expand All @@ -618,7 +623,7 @@ func doQueryInUpsert(ctx context.Context, mu *api.Mutation, gmu *gql.Mutation) (
return nil, errors.Wrapf(err, "while validating query: %q", upsertQuery)
}

qr := query.Request{Latency: l, GqlQuery: &parsedReq, ReadTs: mu.StartTs}
qr := query.Request{Latency: l, GqlQuery: &parsedReq, ReadTs: req.StartTs}
if err := qr.ProcessQuery(ctx); err != nil {
return nil, errors.Wrapf(err, "while processing query: %q", upsertQuery)
}
Expand Down Expand Up @@ -751,24 +756,29 @@ func updateMutations(gmu *gql.Mutation, varToUID map[string][]string) {

// Query handles queries and returns the data.
func (s *Server) Query(ctx context.Context, req *api.Request) (*api.Response, error) {
if err := authorizeQuery(ctx, req); err != nil {
return nil, err
}
if glog.V(3) {
glog.Infof("Got a query: %+v", req)
if len(req.Mutations) > 0 {
return s.doMutate(ctx, req, true)
}

return s.doQuery(ctx, req)
return s.doQuery(ctx, req, true)
}

// This method is used to execute the query and return the response to the
// client as a protocol buffer message.
func (s *Server) doQuery(ctx context.Context, req *api.Request) (resp *api.Response, rerr error) {
func (s *Server) doQuery(ctx context.Context, req *api.Request, authorize bool) (
resp *api.Response, rerr error) {

if ctx.Err() != nil {
return nil, ctx.Err()
}
startTime := time.Now()

if authorize {
if err := authorizeQuery(ctx, req); err != nil {
return nil, err
}
}

var measurements []ostats.Measurement
ctx, span := otrace.StartSpan(ctx, methodQuery)
ctx = x.WithMethod(ctx, methodQuery)
Expand Down

0 comments on commit 15a06a9

Please sign in to comment.