Skip to content

Commit

Permalink
Add support for Conditional Upsert
Browse files Browse the repository at this point in the history
  • Loading branch information
mangalaman93 committed Aug 6, 2019
1 parent 313a0b1 commit 3f56738
Show file tree
Hide file tree
Showing 6 changed files with 364 additions and 51 deletions.
7 changes: 7 additions & 0 deletions dgraph/cmd/alpha/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,13 @@ func mutationHandler(w http.ResponseWriter, r *http.Request) {
return
}
}
if condText, ok := ms["cond"]; ok && condText != nil {
mu.Query, err = strconv.Unquote(string(condText.bs))
if err != nil {
x.SetStatus(w, x.ErrorInvalidRequest, err.Error())
return
}
}

case "application/rdf":
// Parse N-Quads.
Expand Down
93 changes: 68 additions & 25 deletions edgraph/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -576,43 +576,62 @@ func doQueryInUpsert(ctx context.Context, mu *api.Mutation, gmu *gql.Mutation) (
return l, nil
}

queryWithIf := mu.Query
if strings.TrimSpace(mu.Cond) != "" {
cond := strings.Replace(mu.Cond, "@if", "@filter", 1)
queryWithIf = mu.Query + `\n var(func: uid(0x01)) ` + cond + ` {_uid_ as uid}`
}

needVars := findVars(gmu)
needVars = append(needVars, "_uid_")
startParsingTime := time.Now()
parsedReq, err := gql.ParseWithNeedVars(gql.Request{
Str: mu.Query,
Str: queryWithIf,
Variables: make(map[string]string),
}, needVars)
l.Parsing += time.Since(startParsingTime)
if err != nil {
return nil, errors.Wrapf(err, "while parsing query: %q", mu.Query)
return nil, errors.Wrapf(err, "while parsing query: %q", queryWithIf)
}
if err := validateQuery(parsedReq.Query); err != nil {
return nil, errors.Wrapf(err, "while validating query: %q", mu.Query)
return nil, errors.Wrapf(err, "while validating query: %q", queryWithIf)
}

qr := query.Request{Latency: l, GqlQuery: &parsedReq, ReadTs: mu.StartTs}
if err := qr.ProcessQuery(ctx); err != nil {
return nil, errors.Wrapf(err, "while processing query: %q", mu.Query)
return nil, errors.Wrapf(err, "while processing query: %q", queryWithIf)
}

if len(qr.Vars) <= 0 {
return nil, errors.Errorf("upsert query block has no variables")
}

// TODO(Aman): allow multiple values for each variable.
// If a variable doesn't have any UID, we generate one ourselves later.
varToUID := make(map[string]string)
varToUID := make(map[string][]string)
for name, v := range qr.Vars {
if v.Uids == nil {
continue
}
if len(v.Uids.Uids) > 1 {
return nil, errors.Errorf("more than one values found for var (%s)", name)
} else if len(v.Uids.Uids) == 1 {
varToUID[name] = fmt.Sprintf("%d", v.Uids.Uids[0])

if len(v.Uids.Uids) > 0 {
uids := make([]string, len(v.Uids.Uids))
for i, u := range v.Uids.Uids {
uids[i] = fmt.Sprintf("%d", u)
}

varToUID[name] = uids
}
}

// Conditional mutation, we simply return in case condition is false
v, ok := qr.Vars["_uid_"]
isMut := ok && (len(v.Uids.Uids) == 1)
if !isMut {
gmu.Set = nil
gmu.Del = nil
return l, nil
}

updateMutations(gmu, varToUID)
return l, nil
}
Expand Down Expand Up @@ -649,39 +668,63 @@ func findVars(gmu *gql.Mutation) []string {
// updateMutations does following transformations:
// * uid(v) -> 0x123 -- If v is defined in query block
// * uid(v) -> _:uid(v) -- Otherwise
func updateMutations(gmu *gql.Mutation, varToUID map[string]string) {
getNewVal := func(s string) string {
func updateMutations(gmu *gql.Mutation, varToUID map[string][]string) {
getNewVal := func(s string) []string {
if strings.HasPrefix(s, "uid(") {
varName := s[4 : len(s)-1]
if uid, ok := varToUID[varName]; ok {
return uid
if uids, ok := varToUID[varName]; ok {
return uids
}

return "_:" + s
return []string{"_:" + s}
}

return s
return []string{s}
}

getNewNQuad := func(nq *api.NQuad, s, o string) *api.NQuad {
// The following copy is fine because we only modify Subject and ObjectId.
// The pointer values are not modified across different copies of NQuad.
n := *nq

n.Subject = s
n.ObjectId = o
return &n
}

// Remove the mutations from gmu.Del when no UID was found.
gmuDel := gmu.Del[:0]
gmuDel := make([]*api.NQuad, 0, len(gmu.Del))
for _, nq := range gmu.Del {
nq.Subject = getNewVal(nq.Subject)
nq.ObjectId = getNewVal(nq.ObjectId)

if !strings.HasPrefix(nq.Subject, "_:uid(") &&
!strings.HasPrefix(nq.ObjectId, "_:uid(") {
newSubs := getNewVal(nq.Subject)
newObs := getNewVal(nq.ObjectId)

for _, s := range newSubs {
for _, o := range newObs {
// Blank node has no meaning in case of deletion.
if strings.HasPrefix(s, "_:uid(") ||
strings.HasPrefix(o, "_:uid(") {
continue
}

gmuDel = append(gmuDel, nq)
gmuDel = append(gmuDel, getNewNQuad(nq, s, o))
}
}
}
gmu.Del = gmuDel

// Update the values in mutation block from the query block.
gmuSet := make([]*api.NQuad, 0, len(gmu.Set))
for _, nq := range gmu.Set {
nq.Subject = getNewVal(nq.Subject)
nq.ObjectId = getNewVal(nq.ObjectId)
newSubs := getNewVal(nq.Subject)
newObs := getNewVal(nq.ObjectId)

for _, s := range newSubs {
for _, o := range newObs {
gmuSet = append(gmuSet, getNewNQuad(nq, s, o))
}
}
}
gmu.Set = gmuSet
}

// Query handles queries and returns the data.
Expand Down
2 changes: 1 addition & 1 deletion gql/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,7 @@ func ParseWithNeedVars(r Request, needVars []string) (res Result, rerr error) {
return res, err
}

// Substitute all variables with corresponding values
// Substitute all graphql variables with corresponding values
if err := substituteVariables(qu, vmap); err != nil {
return res, err
}
Expand Down
34 changes: 25 additions & 9 deletions gql/parser_mutation.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ func ParseMutation(mutation string) (mu *api.Mutation, err error) {
item := it.Item()
switch item.Typ {
case itemUpsertBlock:
if mu, err = ParseUpsertBlock(it); err != nil {
if mu, err = parseUpsertBlock(it); err != nil {
return nil, err
}
case itemLeftCurl:
if mu, err = ParseMutationBlock(it); err != nil {
if mu, err = parseMutationBlock(it); err != nil {
return nil, err
}
default:
Expand All @@ -58,11 +58,11 @@ func ParseMutation(mutation string) (mu *api.Mutation, err error) {
return mu, nil
}

// ParseUpsertBlock parses the upsert block
func ParseUpsertBlock(it *lex.ItemIterator) (*api.Mutation, error) {
// parseUpsertBlock parses the upsert block
func parseUpsertBlock(it *lex.ItemIterator) (*api.Mutation, error) {
var mu *api.Mutation
var queryText string
var queryFound bool
var queryText, condText string
var queryFound, condFound bool

// ===>upsert<=== {...}
if !it.Next() {
Expand All @@ -86,6 +86,7 @@ func ParseUpsertBlock(it *lex.ItemIterator) (*api.Mutation, error) {
return nil, errors.Errorf("Query op not found in upsert block")
} else {
mu.Query = queryText
mu.Cond = condText
return mu, nil
}

Expand All @@ -109,8 +110,23 @@ func ParseUpsertBlock(it *lex.ItemIterator) (*api.Mutation, error) {
if !it.Next() {
return nil, errors.Errorf("Unexpected end of upsert block")
}

// upsert { mutation ===>@if(...)<=== {....} query{...}}
item = it.Item()
if item.Typ == itemUpsertBlockOpContent {
if condFound {
return nil, errors.Errorf("Multiple @if directive inside upsert block")
}
condFound = true
condText = item.Val
if !it.Next() {
return nil, errors.Errorf("Unexpected end of upsert block")
}
}

// upsert @if(...) ===>{<=== ....}
var err error
if mu, err = ParseMutationBlock(it); err != nil {
if mu, err = parseMutationBlock(it); err != nil {
return nil, err
}

Expand All @@ -133,8 +149,8 @@ func ParseUpsertBlock(it *lex.ItemIterator) (*api.Mutation, error) {
return nil, errors.Errorf("Invalid upsert block")
}

// ParseMutationBlock parses the mutation block
func ParseMutationBlock(it *lex.ItemIterator) (*api.Mutation, error) {
// parseMutationBlock parses the mutation block
func parseMutationBlock(it *lex.ItemIterator) (*api.Mutation, error) {
var mu api.Mutation

item := it.Item()
Expand Down
47 changes: 31 additions & 16 deletions gql/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,7 @@ func lexIdentifyBlock(l *lex.Lexer) lex.StateFn {
func lexNameBlock(l *lex.Lexer) lex.StateFn {
for {
// The caller already checked isNameBegin, and absorbed one rune.
r := l.Next()
if isNameSuffix(r) {
continue
}
l.Backup()
l.AcceptRun(isNameSuffix)
switch word := l.Input[l.Start:l.Pos]; word {
case "upsert":
l.Emit(itemUpsertBlock)
Expand Down Expand Up @@ -140,11 +136,7 @@ func lexUpsertBlock(l *lex.Lexer) lex.StateFn {
func lexNameUpsertOp(l *lex.Lexer) lex.StateFn {
for {
// The caller already checked isNameBegin, and absorbed one rune.
r := l.Next()
if isNameSuffix(r) {
continue
}
l.Backup()
l.AcceptRun(isNameSuffix)
word := l.Input[l.Start:l.Pos]
switch word {
case "query":
Expand All @@ -164,33 +156,56 @@ func lexNameUpsertOp(l *lex.Lexer) lex.StateFn {

// lexBlockContent lexes and absorbs the text inside a block (covered by braces).
func lexBlockContent(l *lex.Lexer) lex.StateFn {
return lexContent(l, leftCurl, rightCurl, lexUpsertBlock)
}

// lexIfContent lexes the whole of @if directive in a mutation block (covered by small brackets)
func lexIfContent(l *lex.Lexer) lex.StateFn {
if r := l.Next(); r != at {
return l.Errorf("Expected [@], found; [%#U]", r)
}

l.AcceptRun(isNameSuffix)
word := l.Input[l.Start:l.Pos]
if word != "@if" {
return l.Errorf("Expected @if, found [%v]", word)
}

return lexContent(l, '(', ')', lexInsideMutation)
}

func lexContent(l *lex.Lexer, leftRune, rightRune rune, returnTo lex.StateFn) lex.StateFn {
depth := 0
for {
switch l.Next() {
case lex.EOF:
return l.Errorf("Unclosed block (matching braces not found)")
return l.Errorf("Matching brackets not found")
case quote:
if err := l.LexQuotedString(); err != nil {
return l.Errorf(err.Error())
}
case leftCurl:
case leftRune:
depth++
case rightCurl:
case rightRune:
depth--
if depth < 0 {
return l.Errorf("Unopened } found")
return l.Errorf("Unopened %s found", rightRune)
} else if depth == 0 {
l.Emit(itemUpsertBlockOpContent)
return lexUpsertBlock
return returnTo
}
}
}

}

func lexInsideMutation(l *lex.Lexer) lex.StateFn {
l.Mode = lexInsideMutation
for {
switch r := l.Next(); {
case r == at:
l.Backup()
return lexIfContent
case r == rightCurl:
l.Depth--
l.Emit(itemRightCurl)
Expand Down Expand Up @@ -586,7 +601,7 @@ func lexOperationType(l *lex.Lexer) lex.StateFn {
l.Emit(itemOpType)
return lexInsideSchema
} else {
l.Errorf("Invalid operation type: %s", word)
return l.Errorf("Invalid operation type: %s", word)
}

return lexQuery
Expand Down
Loading

0 comments on commit 3f56738

Please sign in to comment.