Skip to content

Commit

Permalink
Implement CREATE EDGE UPSERT
Browse files Browse the repository at this point in the history
Resolves: #4436
  • Loading branch information
luigidellaquila committed May 8, 2018
1 parent e65169c commit 1a3fd10
Show file tree
Hide file tree
Showing 6 changed files with 2,452 additions and 2,149 deletions.
1 change: 1 addition & 0 deletions core/src/main/grammar/OrientSQL.jjt
Original file line number Diff line number Diff line change
Expand Up @@ -1902,6 +1902,7 @@ OCreateEdgeStatement CreateEdgeStatement():
<CREATE>
<EDGE>
[ jjtThis.targetClass = Identifier() [<CLUSTER> jjtThis.targetClusterName = Identifier()]]
[ <UPSERT> { jjtThis.upsert = true; } ]
<FROM>
(
jjtThis.leftExpression = Expression()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.orientechnologies.orient.core.db.record.OIdentifiable;
import com.orientechnologies.orient.core.exception.OCommandExecutionException;
import com.orientechnologies.orient.core.id.ORID;
import com.orientechnologies.orient.core.index.OIndex;
import com.orientechnologies.orient.core.record.OEdge;
import com.orientechnologies.orient.core.record.OElement;
import com.orientechnologies.orient.core.record.OVertex;
Expand All @@ -20,6 +21,7 @@ public class CreateEdgesStep extends AbstractExecutionStep {

private final OIdentifier targetClass;
private final OIdentifier targetCluster;
private final String uniqueIndexName;
private final OIdentifier fromAlias;
private final OIdentifier toAlias;
private final Number wait;
Expand All @@ -30,22 +32,26 @@ public class CreateEdgesStep extends AbstractExecutionStep {
Iterator fromIter;
Iterator toIterator;
OVertex currentFrom;
List toList = new ArrayList<>();
OVertex currentTo;
boolean finished = false;
List toList = new ArrayList<>();
private OIndex<?> uniqueIndex;

private boolean inited = false;

private long cost = 0;

public CreateEdgesStep(OIdentifier targetClass, OIdentifier targetClusterName, OIdentifier fromAlias, OIdentifier toAlias,
Number wait, Number retry, OBatch batch, OCommandContext ctx, boolean profilingEnabled) {
public CreateEdgesStep(OIdentifier targetClass, OIdentifier targetClusterName, String uniqueIndex, OIdentifier fromAlias,
OIdentifier toAlias, Number wait, Number retry, OBatch batch, OCommandContext ctx, boolean profilingEnabled) {
super(ctx, profilingEnabled);
this.targetClass = targetClass;
this.targetCluster = targetClusterName;
this.uniqueIndexName = uniqueIndex;
this.fromAlias = fromAlias;
this.toAlias = toAlias;
this.wait = wait;
this.retry = retry;
this.batch = batch;

}

@Override
Expand All @@ -57,43 +63,38 @@ public OResultSet syncPull(OCommandContext ctx, int nRecords) throws OTimeoutExc

@Override
public boolean hasNext() {
return (currentBatch < nRecords && (toIterator.hasNext() || (toList.size() > 0 && fromIter.hasNext())));
if (currentTo == null) {
loadNextFromTo();
}
return (currentBatch < nRecords && currentTo != null && !finished);
}

@Override
public OResult next() {
if (!toIterator.hasNext()) {
toIterator = toList.iterator();
if (!fromIter.hasNext()) {
throw new IllegalStateException();
}
currentFrom = fromIter.hasNext() ? asVertex(fromIter.next()) : null;
if (currentTo == null) {
loadNextFromTo();
}
if (currentBatch < nRecords && (toIterator.hasNext() || (toList.size() > 0 && fromIter.hasNext()))) {
long begin = profilingEnabled ? System.nanoTime() : 0;
try {

if (currentFrom == null) {
throw new OCommandExecutionException("Invalid FROM vertex for edge");
if (finished || currentBatch >= nRecords) {
throw new IllegalStateException();
}
if (currentTo == null) {
throw new OCommandExecutionException("Invalid TO vertex for edge");
}

Object obj = toIterator.next();
long begin = profilingEnabled ? System.nanoTime() : 0;
try {
OVertex currentTo = asVertex(obj);
if (currentTo == null) {
throw new OCommandExecutionException("Invalid TO vertex for edge");
}

OEdge edge = currentFrom.addEdge(currentTo, targetClass.getStringValue());

OUpdatableResult result = new OUpdatableResult(edge);
result.setElement(edge);
currentBatch++;
return result;
} finally {
if(profilingEnabled){cost += (System.nanoTime() - begin);}
OEdge edge = currentFrom.addEdge(currentTo, targetClass.getStringValue());

OUpdatableResult result = new OUpdatableResult(edge);
result.setElement(edge);
currentTo = null;
currentBatch++;
return result;
} finally {
if (profilingEnabled) {
cost += (System.nanoTime() - begin);
}
} else {
throw new IllegalStateException();
}
}

Expand Down Expand Up @@ -136,8 +137,8 @@ private void init() {
}

fromIter = (Iterator) fromValues;
if(fromIter instanceof OResultSet){
try{
if (fromIter instanceof OResultSet) {
try {
((OResultSet) fromIter).reset();
} catch (Exception ignore) {
}
Expand All @@ -150,15 +151,75 @@ private void init() {
}

toIterator = toList.iterator();
if(toIter instanceof OResultSet){
try{
if (toIter instanceof OResultSet) {
try {
((OResultSet) toIter).reset();
} catch (Exception ignore) {
}
}

currentFrom = fromIter != null && fromIter.hasNext() ? asVertex(fromIter.next()) : null;

if (uniqueIndexName != null) {
uniqueIndex = ctx.getDatabase().getMetadata().getIndexManager().getIndex(uniqueIndexName);
if (uniqueIndex == null) {
throw new OCommandExecutionException("Index not found for upsert: " + uniqueIndexName);
}

}

}

protected void loadNextFromTo() {
long begin = profilingEnabled ? System.nanoTime() : 0;
try {
while (true) {
this.currentTo = null;
if (!toIterator.hasNext()) {
toIterator = toList.iterator();
if (!fromIter.hasNext()) {
finished = true;
return;
}
currentFrom = fromIter.hasNext() ? asVertex(fromIter.next()) : null;
}
if (toIterator.hasNext() || (toList.size() > 0 && fromIter.hasNext())) {
if (currentFrom == null) {
throw new OCommandExecutionException("Invalid FROM vertex for edge");
}

Object obj = toIterator.next();

currentTo = asVertex(obj);
if (currentTo == null) {
throw new OCommandExecutionException("Invalid TO vertex for edge");
}

if (isUpsert() && edgeAlreadyExists(currentFrom, currentTo)) {
currentTo = null;
continue;
}
return;

} else {
this.currentTo = null;
return;
}
}
} finally {
if (profilingEnabled) {
cost += (System.nanoTime() - begin);
}
}
}

private boolean edgeAlreadyExists(OVertex currentFrom, OVertex currentTo) {
Object key = uniqueIndex.getDefinition().createValue(currentFrom.getIdentity(), currentTo.getIdentity());
return uniqueIndex.get(key) != null;
}

private boolean isUpsert() {
return uniqueIndex != null;
}

private OVertex asVertex(Object currentFrom) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.orientechnologies.orient.core.command.OCommandContext;
import com.orientechnologies.orient.core.db.ODatabase;
import com.orientechnologies.orient.core.exception.OCommandExecutionException;
import com.orientechnologies.orient.core.metadata.schema.OClass;
import com.orientechnologies.orient.core.sql.parser.*;

Expand All @@ -18,17 +19,19 @@ public class OCreateEdgeExecutionPlanner {
protected OExpression leftExpression;
protected OExpression rightExpression;

protected boolean upsert = false;

protected OInsertBody body;
protected Number retry;
protected Number wait;
protected OBatch batch;


public OCreateEdgeExecutionPlanner(OCreateEdgeStatement statement) {
this.targetClass = statement.getTargetClass() == null ? null : statement.getTargetClass().copy();
this.targetClusterName = statement.getTargetClusterName() == null ? null : statement.getTargetClusterName().copy();
this.leftExpression = statement.getLeftExpression() == null ? null : statement.getLeftExpression().copy();
this.rightExpression = statement.getRightExpression() == null ? null : statement.getRightExpression().copy();
this.upsert = statement.isUpsert();
this.body = statement.getBody() == null ? null : statement.getBody().copy();
this.retry = statement.getRetry();
this.wait = statement.getWait();
Expand Down Expand Up @@ -59,16 +62,34 @@ public OInsertExecutionPlan createExecutionPlan(OCommandContext ctx, boolean ena
handleGlobalLet(result, new OIdentifier("$__ORIENT_CREATE_EDGE_fromV"), leftExpression, ctx, enableProfiling);
handleGlobalLet(result, new OIdentifier("$__ORIENT_CREATE_EDGE_toV"), rightExpression, ctx, enableProfiling);

result.chain(new CreateEdgesStep(targetClass, targetClusterName, new OIdentifier("$__ORIENT_CREATE_EDGE_fromV"),
new OIdentifier("$__ORIENT_CREATE_EDGE_toV"), wait, retry, batch, ctx, enableProfiling));
String uniqueIndexName = null;
if (upsert) {
OClass clazz = ctx.getDatabase().getMetadata().getSchema().getClass(targetClass.getStringValue());
if (clazz == null) {
throw new OCommandExecutionException("Class " + targetClass + " not found in the db schema");
}
uniqueIndexName = clazz.getIndexes().stream().filter(x -> x.isUnique()).filter(
x -> x.getDefinition().getFields().size() == 2 && x.getDefinition().getFields().contains("out") && x.getDefinition()
.getFields().contains("in")).map(x -> x.getName()).findFirst().orElse(null);

if (uniqueIndexName == null) {
throw new OCommandExecutionException(
"Cannot perform an UPSERT on " + targetClass + " edge class: no unique index present on out/in");
}
}

result.chain(
new CreateEdgesStep(targetClass, targetClusterName, uniqueIndexName, new OIdentifier("$__ORIENT_CREATE_EDGE_fromV"),
new OIdentifier("$__ORIENT_CREATE_EDGE_toV"), wait, retry, batch, ctx, enableProfiling));

handleSetFields(result, body, ctx, enableProfiling);
handleSave(result, targetClusterName, ctx, enableProfiling);
//TODO implement batch, wait and retry
return result;
}

private void handleGlobalLet(OInsertExecutionPlan result, OIdentifier name, OExpression expression, OCommandContext ctx, boolean profilingEnabled) {
private void handleGlobalLet(OInsertExecutionPlan result, OIdentifier name, OExpression expression, OCommandContext ctx,
boolean profilingEnabled) {
result.chain(new GlobalLetExpressionStep(name, expression, ctx, profilingEnabled));
}

Expand All @@ -78,7 +99,8 @@ private void handleCheckType(OInsertExecutionPlan result, OCommandContext ctx, b
}
}

private void handleSave(OInsertExecutionPlan result, OIdentifier targetClusterName, OCommandContext ctx, boolean profilingEnabled) {
private void handleSave(OInsertExecutionPlan result, OIdentifier targetClusterName, OCommandContext ctx,
boolean profilingEnabled) {
result.chain(new SaveElementStep(ctx, targetClusterName, profilingEnabled));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ public class OCreateEdgeStatement extends OStatement {
protected OIdentifier targetClass;
protected OIdentifier targetClusterName;

protected boolean upsert = false;

protected OExpression leftExpression;

protected OExpression rightExpression;
Expand Down Expand Up @@ -81,6 +83,9 @@ public void toString(Map<Object, Object> params, StringBuilder builder) {
targetClusterName.toString(params, builder);
}
}
if(upsert){
builder.append(" UPSERT");
}
builder.append(" FROM ");
leftExpression.toString(params, builder);

Expand Down Expand Up @@ -115,6 +120,8 @@ public void toString(Map<Object, Object> params, StringBuilder builder) {
result.targetClass = targetClass==null?null:targetClass.copy();
result.targetClusterName = targetClusterName==null?null:targetClusterName.copy();

result.upsert = this.upsert;

result.leftExpression = leftExpression==null?null:leftExpression.copy();

result.rightExpression = rightExpression==null?null:rightExpression.copy();
Expand All @@ -126,14 +133,17 @@ public void toString(Map<Object, Object> params, StringBuilder builder) {
return result;
}

@Override public boolean equals(Object o) {
@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;

OCreateEdgeStatement that = (OCreateEdgeStatement) o;

if (upsert != that.upsert)
return false;
if (targetClass != null ? !targetClass.equals(that.targetClass) : that.targetClass != null)
return false;
if (targetClusterName != null ? !targetClusterName.equals(that.targetClusterName) : that.targetClusterName != null)
Expand All @@ -148,15 +158,14 @@ public void toString(Map<Object, Object> params, StringBuilder builder) {
return false;
if (wait != null ? !wait.equals(that.wait) : that.wait != null)
return false;
if (batch != null ? !batch.equals(that.batch) : that.batch != null)
return false;

return true;
return batch != null ? batch.equals(that.batch) : that.batch == null;
}

@Override public int hashCode() {
@Override
public int hashCode() {
int result = targetClass != null ? targetClass.hashCode() : 0;
result = 31 * result + (targetClusterName != null ? targetClusterName.hashCode() : 0);
result = 31 * result + (upsert ? 1 : 0);
result = 31 * result + (leftExpression != null ? leftExpression.hashCode() : 0);
result = 31 * result + (rightExpression != null ? rightExpression.hashCode() : 0);
result = 31 * result + (body != null ? body.hashCode() : 0);
Expand Down Expand Up @@ -229,5 +238,9 @@ public OBatch getBatch() {
public void setBatch(OBatch batch) {
this.batch = batch;
}

public boolean isUpsert() {
return upsert;
}
}
/* JavaCC - OriginalChecksum=2d3dc5693940ffa520146f8f7f505128 (do not edit this line) */
Loading

0 comments on commit 1a3fd10

Please sign in to comment.