Skip to content

Commit

Permalink
Add support for Conditional Upsert
Browse files Browse the repository at this point in the history
  • Loading branch information
mangalaman93 committed Aug 7, 2019
1 parent 5c9fa2e commit f44ea95
Show file tree
Hide file tree
Showing 6 changed files with 358 additions and 53 deletions.
7 changes: 7 additions & 0 deletions dgraph/cmd/alpha/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,13 @@ func mutationHandler(w http.ResponseWriter, r *http.Request) {
return
}
}
if condText, ok := ms["cond"]; ok && condText != nil {
mu.Query, err = strconv.Unquote(string(condText.bs))
if err != nil {
x.SetStatus(w, x.ErrorInvalidRequest, err.Error())
return
}
}

case "application/rdf":
// Parse N-Quads.
Expand Down
68 changes: 66 additions & 2 deletions dgraph/cmd/alpha/upsert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@ import (
"sync"
"testing"

"github.com/dgraph-io/dgraph/testutil"

"github.com/dgraph-io/dgo/y"
"github.com/dgraph-io/dgraph/testutil"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -794,3 +793,68 @@ upsert {
_, _, _, err := mutationWithTs(m, "application/rdf", false, true, true, 0)
require.NoError(t, err)
}

func TestConditionalUpsertExample0(t *testing.T) {
require.NoError(t, dropAll())
require.NoError(t, alterSchema(`email: string @index(exact) .`))

// Mutation with wrong name
m1 := `
upsert {
mutation {
set {
uid(v) <name> "Wrong" .
uid(v) <email> "ashish@dgraph.io" .
}
}
query {
me(func: eq(email, "ashish@dgraph.io")) {
v as uid
}
}
}`
keys, preds, _, err := mutationWithTs(m1, "application/rdf", false, true, true, 0)
require.NoError(t, err)
require.True(t, len(keys) == 0)
require.True(t, contains(preds, "email"))
require.True(t, contains(preds, "name"))

// query should return the wrong name
q1 := `
{
q(func: has(email)) {
uid
name
email
}
}`
res, _, err := queryWithTs(q1, "application/graphql+-", "", 0)
require.NoError(t, err)
require.Contains(t, res, "Wrong")

// mutation with correct name
m2 := `
upsert {
mutation @if(eq(len(v), 1)) {
set {
uid(v) <name> "Ashish" .
}
}
query {
me(func: eq(email, "ashish@dgraph.io")) {
v as uid
}
}
}`
keys, preds, _, err = mutationWithTs(m2, "application/rdf", false, true, true, 0)
require.NoError(t, err)
require.True(t, len(keys) == 0)
require.True(t, contains(preds, "name"))

// query should return correct name
res, _, err = queryWithTs(q1, "application/graphql+-", "", 0)
require.NoError(t, err)
require.Contains(t, res, "Ashish")
}
105 changes: 79 additions & 26 deletions edgraph/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -571,45 +571,74 @@ func (s *Server) doMutate(ctx context.Context, mu *api.Mutation, authorize bool)
func doQueryInUpsert(ctx context.Context, mu *api.Mutation, gmu *gql.Mutation) (
*query.Latency, error) {

isThisUpsert := mu.Query != ""
isThisCondUpsert := strings.TrimSpace(mu.Cond) != ""

l := &query.Latency{}
if mu.Query == "" {
if !isThisUpsert {
return l, nil
}

queryWithIf := mu.Query
needVars := findVars(gmu)
if isThisCondUpsert {
// currently we do not have support for @if directly.
cond := strings.Replace(mu.Cond, "@if", "@filter", 1)

// remove the trailing closing brace and add the query to
// find out whether the if condition is true or false.
queryWithIf = strings.TrimSuffix(strings.TrimSpace(mu.Query), "}") +
` var(func: uid(0x01)) ` + cond + ` { __uid__ as uid }` + `}`
needVars = append(needVars, "__uid__")
}

startParsingTime := time.Now()
parsedReq, err := gql.ParseWithNeedVars(gql.Request{
Str: mu.Query,
Str: queryWithIf,
Variables: make(map[string]string),
}, needVars)
l.Parsing += time.Since(startParsingTime)
if err != nil {
return nil, errors.Wrapf(err, "while parsing query: %q", mu.Query)
return nil, errors.Wrapf(err, "while parsing query: %q", queryWithIf)
}
if err := validateQuery(parsedReq.Query); err != nil {
return nil, errors.Wrapf(err, "while validating query: %q", mu.Query)
return nil, errors.Wrapf(err, "while validating query: %q", queryWithIf)
}

qr := query.Request{Latency: l, GqlQuery: &parsedReq, ReadTs: mu.StartTs}
if err := qr.ProcessQuery(ctx); err != nil {
return nil, errors.Wrapf(err, "while processing query: %q", mu.Query)
return nil, errors.Wrapf(err, "while processing query: %q", queryWithIf)
}

if len(qr.Vars) <= 0 {
return nil, errors.Errorf("upsert query block has no variables")
}

// TODO(Aman): allow multiple values for each variable.
// If a variable doesn't have any UID, we generate one ourselves later.
varToUID := make(map[string]string)
varToUID := make(map[string][]string)
for name, v := range qr.Vars {
if v.Uids == nil {
continue
}
if len(v.Uids.Uids) > 1 {
return nil, errors.Errorf("more than one values found for var (%s)", name)
} else if len(v.Uids.Uids) == 1 {
varToUID[name] = fmt.Sprintf("%d", v.Uids.Uids[0])

if len(v.Uids.Uids) > 0 {
uids := make([]string, len(v.Uids.Uids))
for i, u := range v.Uids.Uids {
uids[i] = fmt.Sprintf("%d", u)
}

varToUID[name] = uids
}
}

// Conditional mutation, we simply return in case condition is false
if isThisCondUpsert {
v, ok := qr.Vars["__uid__"]
isMut := ok && (len(v.Uids.Uids) == 1)
if !isMut {
gmu.Set = nil
gmu.Del = nil
return l, nil
}
}

Expand Down Expand Up @@ -649,39 +678,63 @@ func findVars(gmu *gql.Mutation) []string {
// updateMutations does following transformations:
// * uid(v) -> 0x123 -- If v is defined in query block
// * uid(v) -> _:uid(v) -- Otherwise
func updateMutations(gmu *gql.Mutation, varToUID map[string]string) {
getNewVal := func(s string) string {
func updateMutations(gmu *gql.Mutation, varToUID map[string][]string) {
getNewVal := func(s string) []string {
if strings.HasPrefix(s, "uid(") {
varName := s[4 : len(s)-1]
if uid, ok := varToUID[varName]; ok {
return uid
if uids, ok := varToUID[varName]; ok {
return uids
}

return "_:" + s
return []string{"_:" + s}
}

return s
return []string{s}
}

getNewNQuad := func(nq *api.NQuad, s, o string) *api.NQuad {
// The following copy is fine because we only modify Subject and ObjectId.
// The pointer values are not modified across different copies of NQuad.
n := *nq

n.Subject = s
n.ObjectId = o
return &n
}

// Remove the mutations from gmu.Del when no UID was found.
gmuDel := gmu.Del[:0]
gmuDel := make([]*api.NQuad, 0, len(gmu.Del))
for _, nq := range gmu.Del {
nq.Subject = getNewVal(nq.Subject)
nq.ObjectId = getNewVal(nq.ObjectId)

if !strings.HasPrefix(nq.Subject, "_:uid(") &&
!strings.HasPrefix(nq.ObjectId, "_:uid(") {
newSubs := getNewVal(nq.Subject)
newObs := getNewVal(nq.ObjectId)

for _, s := range newSubs {
for _, o := range newObs {
// Blank node has no meaning in case of deletion.
if strings.HasPrefix(s, "_:uid(") ||
strings.HasPrefix(o, "_:uid(") {
continue
}

gmuDel = append(gmuDel, nq)
gmuDel = append(gmuDel, getNewNQuad(nq, s, o))
}
}
}
gmu.Del = gmuDel

// Update the values in mutation block from the query block.
gmuSet := make([]*api.NQuad, 0, len(gmu.Set))
for _, nq := range gmu.Set {
nq.Subject = getNewVal(nq.Subject)
nq.ObjectId = getNewVal(nq.ObjectId)
newSubs := getNewVal(nq.Subject)
newObs := getNewVal(nq.ObjectId)

for _, s := range newSubs {
for _, o := range newObs {
gmuSet = append(gmuSet, getNewNQuad(nq, s, o))
}
}
}
gmu.Set = gmuSet
}

// Query handles queries and returns the data.
Expand Down
34 changes: 25 additions & 9 deletions gql/parser_mutation.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ func ParseMutation(mutation string) (mu *api.Mutation, err error) {
item := it.Item()
switch item.Typ {
case itemUpsertBlock:
if mu, err = ParseUpsertBlock(it); err != nil {
if mu, err = parseUpsertBlock(it); err != nil {
return nil, err
}
case itemLeftCurl:
if mu, err = ParseMutationBlock(it); err != nil {
if mu, err = parseMutationBlock(it); err != nil {
return nil, err
}
default:
Expand All @@ -58,11 +58,11 @@ func ParseMutation(mutation string) (mu *api.Mutation, err error) {
return mu, nil
}

// ParseUpsertBlock parses the upsert block
func ParseUpsertBlock(it *lex.ItemIterator) (*api.Mutation, error) {
// parseUpsertBlock parses the upsert block
func parseUpsertBlock(it *lex.ItemIterator) (*api.Mutation, error) {
var mu *api.Mutation
var queryText string
var queryFound bool
var queryText, condText string
var queryFound, condFound bool

// ===>upsert<=== {...}
if !it.Next() {
Expand All @@ -86,6 +86,7 @@ func ParseUpsertBlock(it *lex.ItemIterator) (*api.Mutation, error) {
return nil, errors.Errorf("Query op not found in upsert block")
} else {
mu.Query = queryText
mu.Cond = condText
return mu, nil
}

Expand All @@ -109,8 +110,23 @@ func ParseUpsertBlock(it *lex.ItemIterator) (*api.Mutation, error) {
if !it.Next() {
return nil, errors.Errorf("Unexpected end of upsert block")
}

// upsert { mutation ===>@if(...)<=== {....} query{...}}
item = it.Item()
if item.Typ == itemUpsertBlockOpContent {
if condFound {
return nil, errors.Errorf("Multiple @if directive inside upsert block")
}
condFound = true
condText = item.Val
if !it.Next() {
return nil, errors.Errorf("Unexpected end of upsert block")
}
}

// upsert @if(...) ===>{<=== ....}
var err error
if mu, err = ParseMutationBlock(it); err != nil {
if mu, err = parseMutationBlock(it); err != nil {
return nil, err
}

Expand All @@ -133,8 +149,8 @@ func ParseUpsertBlock(it *lex.ItemIterator) (*api.Mutation, error) {
return nil, errors.Errorf("Invalid upsert block")
}

// ParseMutationBlock parses the mutation block
func ParseMutationBlock(it *lex.ItemIterator) (*api.Mutation, error) {
// parseMutationBlock parses the mutation block
func parseMutationBlock(it *lex.ItemIterator) (*api.Mutation, error) {
var mu api.Mutation

item := it.Item()
Expand Down
Loading

0 comments on commit f44ea95

Please sign in to comment.