Skip to content

Commit

Permalink
Fixed distributed test cases and added distributed merge test case
Browse files Browse the repository at this point in the history
  • Loading branch information
lvca committed Aug 4, 2015
1 parent d1e1730 commit 5e8d735
Show file tree
Hide file tree
Showing 6 changed files with 165 additions and 109 deletions.
Original file line number Diff line number Diff line change
@@ -1,32 +1,32 @@
/*
*
* * Copyright 2014 Orient Technologies LTD (info(at)orientechnologies.com)
* *
* * Licensed under the Apache License, Version 2.0 (the "License");
* * you may not use this file except in compliance with the License.
* * You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
* *
* * For more information: http://www.orientechnologies.com
*
*/
*
* * Copyright 2014 Orient Technologies LTD (info(at)orientechnologies.com)
* *
* * Licensed under the Apache License, Version 2.0 (the "License");
* * you may not use this file except in compliance with the License.
* * You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
* *
* * For more information: http://www.orientechnologies.com
*
*/
package com.orientechnologies.orient.core.sql.functions.coll;

import com.orientechnologies.orient.core.command.OCommandContext;
import com.orientechnologies.orient.core.db.record.OIdentifiable;
import com.orientechnologies.orient.core.record.impl.ODocument;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.orientechnologies.orient.core.command.OCommandContext;
import com.orientechnologies.orient.core.db.record.OIdentifiable;
import com.orientechnologies.orient.core.record.impl.ODocument;

/**
* This operator add an entry in a map. The entry is composed by a key and a value.
*
Expand Down Expand Up @@ -105,10 +105,10 @@ protected ODocument prepareResult(ODocument res) {
@SuppressWarnings("unchecked")
@Override
public Object mergeDistributedResult(List<Object> resultsToMerge) {
final Map<Long, Map<Object, Object>> chunks = new HashMap<Long, Map<Object, Object>>();
final Map<String, Map<Object, Object>> chunks = new HashMap<String, Map<Object, Object>>();
for (Object iParameter : resultsToMerge) {
final Map<String, Object> container = (Map<String, Object>) ((Map<Object, Object>) iParameter).get("doc");
chunks.put((Long) container.get("node"), (Map<Object, Object>) container.get("context"));
chunks.put((String) container.get("node"), (Map<Object, Object>) container.get("context"));
}
final Map<Object, Object> result = new HashMap<Object, Object>();
for (Map<Object, Object> chunk : chunks.values()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -82,14 +83,10 @@ public List<Object> getResult() {
@SuppressWarnings("unchecked")
@Override
public Object mergeDistributedResult(List<Object> resultsToMerge) {
final Map<Long, Collection<Object>> chunks = new HashMap<Long, Collection<Object>>();
final Collection<Object> result = new HashSet<Object>();
for (Object iParameter : resultsToMerge) {
final Map<String, Object> container = (Map<String, Object>) ((Collection<?>) iParameter).iterator().next();
chunks.put((Long) container.get("node"), (Collection<Object>) container.get("context"));
}
final Collection<Object> result = new ArrayList<Object>();
for (Collection<Object> chunk : chunks.values()) {
result.addAll(chunk);
result.addAll((Collection<?>) container.get("context"));
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
/*
*
* * Copyright 2014 Orient Technologies LTD (info(at)orientechnologies.com)
* *
* * Licensed under the Apache License, Version 2.0 (the "License");
* * you may not use this file except in compliance with the License.
* * You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
* *
* * For more information: http://www.orientechnologies.com
*
*/
*
* * Copyright 2014 Orient Technologies LTD (info(at)orientechnologies.com)
* *
* * Licensed under the Apache License, Version 2.0 (the "License");
* * you may not use this file except in compliance with the License.
* * You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
* *
* * For more information: http://www.orientechnologies.com
*
*/
package com.orientechnologies.orient.core.sql.functions.coll;

import com.orientechnologies.orient.core.command.OCommandContext;
Expand Down Expand Up @@ -49,10 +49,17 @@ public Object execute(Object iThis, final OIdentifiable iCurrentRecord, Object i
context = new HashMap<Object, Object>();

if (iParams.length == 1) {
if (iParams[0] instanceof Map<?, ?>)
if (iParams[0] == null)
return null;

if (iParams[0] instanceof Map<?, ?>) {
if (context == null)
// AGGREGATION MODE (STATEFULL)
context = new HashMap<Object, Object>();

// INSERT EVERY SINGLE COLLECTION ITEM
context.putAll((Map<Object, Object>) iParams[0]);
else
} else
throw new IllegalArgumentException("Map function: expected a map or pairs of parameters as key, value");
} else if (iParams.length % 2 != 0)
throw new IllegalArgumentException("Map function: expected a map or pairs of parameters as key, value");
Expand Down Expand Up @@ -88,7 +95,7 @@ public Map<Object, Object> getResult() {
return prepareResult(res);
}

protected Map<Object, Object> prepareResult(Map<Object, Object> res) {
protected Map<Object, Object> prepareResult(final Map<Object, Object> res) {
if (returnDistributedResult()) {
final Map<String, Object> doc = new HashMap<String, Object>();
doc.put("node", getDistributedStorageId());
Expand All @@ -101,15 +108,11 @@ protected Map<Object, Object> prepareResult(Map<Object, Object> res) {

@SuppressWarnings("unchecked")
@Override
public Object mergeDistributedResult(List<Object> resultsToMerge) {
final Map<Long, Map<Object, Object>> chunks = new HashMap<Long, Map<Object, Object>>();
public Object mergeDistributedResult(final List<Object> resultsToMerge) {
final Map<Object, Object> result = new HashMap<Object, Object>();
for (Object iParameter : resultsToMerge) {
final Map<String, Object> container = (Map<String, Object>) ((Map<Object, Object>) iParameter).get("doc");
chunks.put((Long) container.get("node"), (Map<Object, Object>) container.get("context"));
}
final Map<Object, Object> result = new HashMap<Object, Object>();
for (Map<Object, Object> chunk : chunks.values()) {
result.putAll(chunk);
result.putAll((Map<Object, Object>) container.get("context"));
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
/*
*
* * Copyright 2014 Orient Technologies LTD (info(at)orientechnologies.com)
* *
* * Licensed under the Apache License, Version 2.0 (the "License");
* * you may not use this file except in compliance with the License.
* * You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
* *
* * For more information: http://www.orientechnologies.com
*
*/
*
* * Copyright 2014 Orient Technologies LTD (info(at)orientechnologies.com)
* *
* * Licensed under the Apache License, Version 2.0 (the "License");
* * you may not use this file except in compliance with the License.
* * You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
* *
* * For more information: http://www.orientechnologies.com
*
*/
package com.orientechnologies.orient.core.sql.functions.coll;

import com.orientechnologies.common.collection.OMultiValue;
Expand Down Expand Up @@ -86,14 +86,10 @@ public Set<Object> getResult() {
@SuppressWarnings("unchecked")
@Override
public Object mergeDistributedResult(List<Object> resultsToMerge) {
final Map<Long, Collection<Object>> chunks = new HashMap<Long, Collection<Object>>();
final Collection<Object> result = new HashSet<Object>();
for (Object iParameter : resultsToMerge) {
final Map<String, Object> container = (Map<String, Object>) ((Collection<?>) iParameter).iterator().next();
chunks.put((Long) container.get("node"), (Collection<Object>) container.get("context"));
}
final Collection<Object> result = new HashSet<Object>();
for (Collection<Object> chunk : chunks.values()) {
result.addAll(chunk);
result.addAll((Collection<?>) container.get("context"));
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package com.orientechnologies.orient.server.distributed;

import com.orientechnologies.orient.core.db.document.ODatabaseDocumentTx;
import com.orientechnologies.orient.core.record.impl.ODocument;
import com.orientechnologies.orient.core.sql.OCommandSQL;
import com.tinkerpop.blueprints.impls.orient.OrientBaseGraph;
import junit.framework.Assert;
import org.junit.Test;

import java.util.Collections;

public class DistributedAggregateCollectionTest extends AbstractServerClusterTest {
private final static int SERVERS = 1;

@Override
public String getDatabaseName() {
return "DistributedAggregateCollectionTest";
}

@Test
public void test() throws Exception {
init(SERVERS);
prepare(false);
execute();
}

@Override
protected void onAfterDatabaseCreation(OrientBaseGraph db) {
db.command(new OCommandSQL("CREATE CLASS Item extends V")).execute();
db.command(new OCommandSQL("CREATE PROPERTY Item.name STRING")).execute();
db.command(new OCommandSQL("CREATE PROPERTY Item.map EMBEDDEDMAP")).execute();
}

@Override
protected void executeTest() throws Exception {

ODatabaseDocumentTx db = new ODatabaseDocumentTx("plocal:target/server0/databases/" + getDatabaseName());
db.open("admin", "admin");

try {
db.command(new OCommandSQL("INSERT into Item (name) values ('foo')")).execute();

Iterable<ODocument> result = db.command(new OCommandSQL("select set(name) as names from Item")).execute();
Assert.assertEquals(Collections.singleton("foo"), result.iterator().next().field("names"));

result = db.command(new OCommandSQL("select list(name) as names from Item")).execute();
Assert.assertEquals(Collections.singleton("foo"), result.iterator().next().field("names"));

db.command(new OCommandSQL("INSERT into Item (map) values ({'a':'b'}) return @this")).execute();

result = db.command(new OCommandSQL("select map(map) as names from Item")).execute();
Assert.assertEquals(Collections.singletonMap("a", "b"), result.iterator().next().field("names"));

} finally {
db.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -258,42 +258,44 @@ public Object command(final OCommandRequestText iCommand) {

final Map<String, Collection<String>> nodeClusterMap = dbCfg.getServerClusterMap(involvedClusters, localNodeName);

if (nodeClusterMap.size() == 1 && nodeClusterMap.keySet().iterator().next().equals(localNodeName))
final Map<String, Object> results;

if (nodeClusterMap.size() == 1 && nodeClusterMap.keySet().iterator().next().equals(localNodeName)) {
// LOCAL NODE, AVOID TO DISTRIBUTE IT
return ODistributedAbstractPlugin.runInDistributedMode(new Callable() {
result = ODistributedAbstractPlugin.runInDistributedMode(new Callable() {
@Override
public Object call() throws Exception {
return wrapped.command(iCommand);
}
});
results = new HashMap<String, Object>(1);
results.put(localNodeName, result);

// SELECT: SPLIT CLASSES/CLUSTER IF ANY
final Map<String, Object> results = executeOnServers(iCommand, involvedClusters, nodeClusterMap);

if (results.size() == 1)
// ONE RESULT ONLY: RETURN IT DIRECTLY
result = results.values().iterator().next();
else {
final OCommandExecutorSQLSelect select = exec instanceof OCommandExecutorSQLSelect ? (OCommandExecutorSQLSelect) exec
: null;

if (select != null && select.isAnyFunctionAggregates()) {
result = mergeResultByAggegation(select, results);
} else {
// MIX & FILTER RESULT SET AVOIDING DUPLICATES
// TODO: ONCE OPTIMIZED (SEE ABOVE) AVOID TO FILTER HERE
final Set<Object> set = new HashSet<Object>();
for (Map.Entry<String, Object> entry : ((Map<String, Object>) results).entrySet()) {
final Object nodeResult = entry.getValue();
if (nodeResult instanceof Collection)
set.addAll((Collection<?>) nodeResult);
else if (nodeResult instanceof Exception)
// RECEIVED EXCEPTION
throw (Exception) nodeResult;
}
result = new ArrayList<Object>(set);
} else {
// SELECT: SPLIT CLASSES/CLUSTER IF ANY
results = executeOnServers(iCommand, involvedClusters, nodeClusterMap);
}

final OCommandExecutorSQLSelect select = exec instanceof OCommandExecutorSQLSelect ? (OCommandExecutorSQLSelect) exec
: null;

if (select != null && select.isAnyFunctionAggregates()) {
result = mergeResultByAggegation(select, results);
} else {
// MIX & FILTER RESULT SET AVOIDING DUPLICATES
// TODO: ONCE OPTIMIZED (SEE ABOVE) AVOID TO FILTER HERE
final Set<Object> set = new HashSet<Object>();
for (Map.Entry<String, Object> entry : ((Map<String, Object>) results).entrySet()) {
final Object nodeResult = entry.getValue();
if (nodeResult instanceof Collection)
set.addAll((Collection<?>) nodeResult);
else if (nodeResult instanceof Exception)
// RECEIVED EXCEPTION
throw (Exception) nodeResult;
}
result = new ArrayList<Object>(set);
}

} else {
final OAbstractCommandTask task = iCommand instanceof OCommandScript ? new OScriptTask(iCommand) : new OSQLCommandTask(
iCommand, new HashSet<String>());
Expand Down

0 comments on commit 5e8d735

Please sign in to comment.