diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/query/EdgesQueryIterator.java b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/query/EdgesQueryIterator.java new file mode 100644 index 0000000000..4ab9a8859a --- /dev/null +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/query/EdgesQueryIterator.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hugegraph.backend.query; + +import java.util.Iterator; +import java.util.List; + +import org.apache.hugegraph.backend.id.Id; +import org.apache.hugegraph.backend.tx.GraphTransaction; +import org.apache.hugegraph.type.define.Directions; + +public class EdgesQueryIterator implements Iterator { + + private final List labels; + private final Directions directions; + private final long limit; + private final Iterator sources; + + public EdgesQueryIterator(Iterator sources, + Directions directions, + List labels, + long limit) { + this.sources = sources; + this.labels = labels; + this.directions = directions; + // Traverse NO_LIMIT ε’Œ Query.NO_LIMIT 不同 + this.limit = limit < 0 ? Query.NO_LIMIT : limit; + } + + @Override + public boolean hasNext() { + return sources.hasNext(); + } + + @Override + public Query next() { + Id sourceId = this.sources.next(); + ConditionQuery query = GraphTransaction.constructEdgesQuery(sourceId, + this.directions, + this.labels); + if (this.limit != Query.NO_LIMIT) { + query.limit(this.limit); + query.capacity(this.limit); + } else { + query.capacity(Query.NO_CAPACITY); + } + return query; + } +} diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java b/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java index 524a1f7593..0ad96f443c 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java @@ -26,16 +26,17 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import org.apache.hugegraph.HugeException; +import org.apache.hugegraph.HugeGraphParams; +import org.apache.hugegraph.concurrent.PausableScheduledThreadPool; import org.apache.hugegraph.type.define.NodeRole; -import org.apache.hugegraph.util.*; import org.apache.hugegraph.util.Consumers; +import org.apache.hugegraph.util.E; +import org.apache.hugegraph.util.ExecutorUtil; import org.apache.hugegraph.util.LockUtil; +import org.apache.hugegraph.util.Log; import org.slf4j.Logger; -import org.apache.hugegraph.HugeException; -import org.apache.hugegraph.HugeGraphParams; -import org.apache.hugegraph.concurrent.PausableScheduledThreadPool; - public final class TaskManager { private static final Logger LOG = Log.logger(TaskManager.class); @@ -48,7 +49,7 @@ public final class TaskManager { public static final String TASK_SCHEDULER = "task-scheduler-%d"; protected static final long SCHEDULE_PERIOD = 1000L; // unit ms - + private static final long TX_CLOSE_TIMEOUT = 30L; // unit s private static final int THREADS = 4; private static final TaskManager MANAGER = new TaskManager(THREADS); @@ -134,7 +135,7 @@ private void closeTaskTx(HugeGraphParams graph) { graph.closeTx(); } else { Consumers.executeOncePerThread(this.taskExecutor, totalThreads, - graph::closeTx); + graph::closeTx, TX_CLOSE_TIMEOUT); } } catch (Exception e) { throw new HugeException("Exception when closing task tx", e); diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/HugeTraverser.java b/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/HugeTraverser.java index f5415d9c51..194576e857 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/HugeTraverser.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/HugeTraverser.java @@ -17,6 +17,8 @@ package org.apache.hugegraph.traversal.algorithm; +import java.io.Closeable; +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -37,6 +39,7 @@ import org.apache.hugegraph.backend.id.Id; import org.apache.hugegraph.backend.query.Aggregate; import org.apache.hugegraph.backend.query.ConditionQuery; +import org.apache.hugegraph.backend.query.EdgesQueryIterator; import org.apache.hugegraph.backend.query.Query; import org.apache.hugegraph.backend.query.QueryResults; import org.apache.hugegraph.backend.tx.GraphTransaction; @@ -66,6 +69,7 @@ import org.apache.hugegraph.util.collection.ObjectIntMapping; import org.apache.hugegraph.util.collection.ObjectIntMappingFactory; import org.apache.tinkerpop.gremlin.structure.Edge; +import org.apache.tinkerpop.gremlin.structure.util.CloseableIterator; import org.slf4j.Logger; import com.google.common.collect.ImmutableList; @@ -465,6 +469,13 @@ private Iterator edgesOfVertex(Id source, EdgeStep edgeStep, return edgeStep.skipSuperNodeIfNeeded(edges); } + public EdgesIterator edgesOfVertices(Iterator sources, + Directions dir, + List labelIds, + long degree) { + return new EdgesIterator(new EdgesQueryIterator(sources, dir, labelIds, degree)); + } + public Iterator edgesOfVertex(Id source, Steps steps) { List edgeLabels = steps.edgeLabels(); ConditionQuery cq = GraphTransaction.constructEdgesQuery( @@ -474,6 +485,11 @@ public Iterator edgesOfVertex(Id source, Steps steps) { cq.limit(steps.limit()); } + if (steps.isEdgeEmpty()) { + Iterator edges = this.graph().edges(cq); + return edgesOfVertexStep(edges, steps); + } + Map edgeConditions = getFilterQueryConditions(steps.edgeSteps(), HugeType.EDGE); @@ -1004,4 +1020,33 @@ public Set getEdges(Iterator vertexIter) { return edges; } } + + public class EdgesIterator implements Iterator>, Closeable { + + private final Iterator> currentIter; + + public EdgesIterator(EdgesQueryIterator queries) { + List> iteratorList = new ArrayList<>(); + while (queries.hasNext()) { + Iterator edges = graph.edges(queries.next()); + iteratorList.add(edges); + } + this.currentIter = iteratorList.iterator(); + } + + @Override + public boolean hasNext() { + return this.currentIter.hasNext(); + } + + @Override + public Iterator next() { + return this.currentIter.next(); + } + + @Override + public void close() throws IOException { + CloseableIterator.closeIterator(currentIter); + } + } } diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/KneighborTraverser.java b/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/KneighborTraverser.java index 9f16f480b2..565d0af5f6 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/KneighborTraverser.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/KneighborTraverser.java @@ -17,11 +17,11 @@ package org.apache.hugegraph.traversal.algorithm; -import java.util.Iterator; import java.util.Set; import java.util.function.Consumer; import org.apache.hugegraph.HugeGraph; +import org.apache.hugegraph.backend.id.EdgeId; import org.apache.hugegraph.backend.id.Id; import org.apache.hugegraph.structure.HugeEdge; import org.apache.hugegraph.traversal.algorithm.records.KneighborRecords; @@ -48,25 +48,27 @@ public Set kneighbor(Id sourceV, Directions dir, Id labelId = this.getEdgeLabelId(label); - Set latest = newSet(); - Set all = newSet(); + KneighborRecords records = new KneighborRecords(true, sourceV, true); - latest.add(sourceV); - this.vertexIterCounter.addAndGet(1L); + Consumer consumer = edgeId -> { + if (this.reachLimit(limit, records.size())) { + return; + } + records.addPath(edgeId.ownerVertexId(), edgeId.otherVertexId()); + }; while (depth-- > 0) { - long remaining = limit == NO_LIMIT ? NO_LIMIT : limit - all.size(); - latest = this.adjacentVertices(sourceV, latest, dir, labelId, - all, degree, remaining); - all.addAll(latest); - this.vertexIterCounter.addAndGet(1L); - this.edgeIterCounter.addAndGet(latest.size()); - if (reachLimit(limit, all.size())) { + records.startOneLayer(true); + traverseIdsByBfs(records.keys(), dir, labelId, degree, NO_LIMIT, consumer); + records.finishOneLayer(); + if (reachLimit(limit, records.size())) { break; } } - return all; + this.vertexIterCounter.addAndGet(records.size()); + + return records.idsBySet(limit); } public KneighborRecords customizedKneighbor(Id source, Steps steps, @@ -76,33 +78,29 @@ public KneighborRecords customizedKneighbor(Id source, Steps steps, checkPositive(maxDepth, "k-neighbor max_depth"); checkLimit(limit); - boolean concurrent = maxDepth >= this.concurrentDepth(); - - KneighborRecords records = new KneighborRecords(concurrent, + KneighborRecords records = new KneighborRecords(true, source, true); - Consumer consumer = v -> { + Consumer consumer = edge -> { if (this.reachLimit(limit, records.size())) { return; } - Iterator edges = edgesOfVertex(v, steps); - this.vertexIterCounter.addAndGet(1L); - while (!this.reachLimit(limit, records.size()) && edges.hasNext()) { - HugeEdge edge = (HugeEdge) edges.next(); - Id target = edge.id().otherVertexId(); - records.addPath(v, target); - - records.edgeResults().addEdge(v, target, edge); - - this.edgeIterCounter.addAndGet(1L); - } + EdgeId edgeId = ((HugeEdge) edge).id(); + records.addPath(edgeId.ownerVertexId(), edgeId.otherVertexId()); + records.edgeResults().addEdge(edgeId.ownerVertexId(), edgeId.otherVertexId(), edge); }; while (maxDepth-- > 0) { records.startOneLayer(true); - traverseIds(records.keys(), consumer, concurrent); + traverseIdsByBfs(records.keys(), steps, NO_LIMIT, consumer); records.finishOneLayer(); + if (this.reachLimit(limit, records.size())) { + break; + } } + + this.vertexIterCounter.addAndGet(records.size()); + return records; } diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/KoutTraverser.java b/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/KoutTraverser.java index 9924c766c5..c683694c14 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/KoutTraverser.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/KoutTraverser.java @@ -18,12 +18,15 @@ package org.apache.hugegraph.traversal.algorithm; import java.util.Iterator; +import java.util.List; import java.util.Set; import java.util.function.Consumer; import org.apache.hugegraph.HugeException; import org.apache.hugegraph.HugeGraph; +import org.apache.hugegraph.backend.id.EdgeId; import org.apache.hugegraph.backend.id.Id; +import org.apache.hugegraph.backend.query.Query; import org.apache.hugegraph.structure.HugeEdge; import org.apache.hugegraph.traversal.algorithm.records.KoutRecords; import org.apache.hugegraph.traversal.algorithm.steps.Steps; @@ -57,34 +60,45 @@ public Set kout(Id sourceV, Directions dir, String label, Id labelId = this.getEdgeLabelId(label); - Set latest = newIdSet(); - latest.add(sourceV); + Set sources = newIdSet(); + Set neighbors = newIdSet(); + Set visited = nearest ? newIdSet() : null; - Set all = newIdSet(); - all.add(sourceV); + neighbors.add(sourceV); + + ConcurrentVerticesConsumer consumer; + + long remaining = capacity == NO_LIMIT ? NO_LIMIT : capacity - 1; - long remaining = capacity == NO_LIMIT ? - NO_LIMIT : capacity - latest.size(); - this.vertexIterCounter.addAndGet(1L); while (depth-- > 0) { // Just get limit nodes in last layer if limit < remaining capacity if (depth == 0 && limit != NO_LIMIT && (limit < remaining || remaining == NO_LIMIT)) { remaining = limit; } - if (nearest) { - latest = this.adjacentVertices(sourceV, latest, dir, labelId, - all, degree, remaining); - all.addAll(latest); - } else { - latest = this.adjacentVertices(sourceV, latest, dir, labelId, - null, degree, remaining); + + if (visited != null) { + visited.addAll(neighbors); } - this.vertexIterCounter.addAndGet(1L); - this.edgeIterCounter.addAndGet(latest.size()); + + // swap sources and neighbors + Set tmp = neighbors; + neighbors = sources; + sources = tmp; + + // start + consumer = new ConcurrentVerticesConsumer(sourceV, visited, remaining, neighbors); + + this.vertexIterCounter.addAndGet(sources.size()); + this.edgeIterCounter.addAndGet(neighbors.size()); + + traverseIdsByBfs(sources.iterator(), dir, labelId, degree, capacity, consumer); + + sources.clear(); + if (capacity != NO_LIMIT) { // Update 'remaining' value to record remaining capacity - remaining -= latest.size(); + remaining -= neighbors.size(); if (remaining <= 0 && depth > 0) { throw new HugeException( @@ -94,7 +108,7 @@ public Set kout(Id sourceV, Directions dir, String label, } } - return latest; + return neighbors; } public KoutRecords customizedKout(Id source, Steps steps, @@ -107,33 +121,25 @@ public KoutRecords customizedKout(Id source, Steps steps, checkLimit(limit); long[] depth = new long[1]; depth[0] = maxDepth; - boolean concurrent = maxDepth >= this.concurrentDepth(); - KoutRecords records = new KoutRecords(concurrent, source, nearest, 0); + KoutRecords records = new KoutRecords(true, source, nearest, 0); - Consumer consumer = v -> { + Consumer consumer = edge -> { if (this.reachLimit(limit, depth[0], records.size())) { return; } - Iterator edges = edgesOfVertex(v, steps); - this.vertexIterCounter.addAndGet(1L); - while (!this.reachLimit(limit, depth[0], records.size()) && - edges.hasNext()) { - HugeEdge edge = (HugeEdge) edges.next(); - Id target = edge.id().otherVertexId(); - records.addPath(v, target); - this.checkCapacity(capacity, records.accessed(), depth[0]); - - records.edgeResults().addEdge(v, target, edge); - - this.edgeIterCounter.addAndGet(1L); - } + EdgeId edgeId = ((HugeEdge) edge).id(); + records.addPath(edgeId.ownerVertexId(), edgeId.otherVertexId()); + records.edgeResults().addEdge(edgeId.ownerVertexId(), edgeId.otherVertexId(), edge); }; while (depth[0]-- > 0) { + List sources = records.ids(Query.NO_LIMIT); records.startOneLayer(true); - this.traverseIds(records.keys(), consumer, concurrent); + traverseIdsByBfs(sources.iterator(), steps, capacity, consumer); + this.vertexIterCounter.addAndGet(sources.size()); records.finishOneLayer(); + checkCapacity(capacity, records.accessed(), depth[0]); } return records; } diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java b/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java index b05de24228..c05d8f89f4 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java @@ -17,24 +17,36 @@ package org.apache.hugegraph.traversal.algorithm; +import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; -import com.google.common.base.Objects; import org.apache.commons.lang3.tuple.Pair; import org.apache.hugegraph.HugeGraph; +import org.apache.hugegraph.backend.id.EdgeId; import org.apache.hugegraph.backend.id.Id; +import org.apache.hugegraph.backend.query.EdgesQueryIterator; import org.apache.hugegraph.config.CoreOptions; +import org.apache.hugegraph.iterator.FilterIterator; +import org.apache.hugegraph.iterator.MapperIterator; +import org.apache.hugegraph.structure.HugeEdge; +import org.apache.hugegraph.traversal.algorithm.steps.Steps; +import org.apache.hugegraph.type.define.Directions; import org.apache.hugegraph.util.Consumers; +import org.apache.tinkerpop.gremlin.structure.Edge; import org.apache.tinkerpop.gremlin.structure.Element; import org.apache.tinkerpop.gremlin.structure.Property; import org.apache.tinkerpop.gremlin.structure.Vertex; import org.apache.tinkerpop.gremlin.structure.util.CloseableIterator; -import org.apache.hugegraph.iterator.FilterIterator; +import com.google.common.base.Objects; public abstract class OltpTraverser extends HugeTraverser implements AutoCloseable { @@ -75,7 +87,7 @@ public static void destroy() { protected long traversePairs(Iterator> pairs, Consumer> consumer) { - return this.traverse(pairs, consumer, "traverse-pairs"); + return this.traverseByOne(pairs, consumer, "traverse-pairs"); } protected long traverseIds(Iterator ids, Consumer consumer, @@ -93,18 +105,19 @@ protected long traverseIds(Iterator ids, Consumer consumer, } protected long traverseIds(Iterator ids, Consumer consumer) { - return this.traverse(ids, consumer, "traverse-ids"); + return this.traverseByOne(ids, consumer, "traverse-ids"); } - protected long traverse(Iterator iterator, Consumer consumer, - String name) { + protected long traverseByOne(Iterator iterator, + Consumer consumer, + String taskName) { if (!iterator.hasNext()) { return 0L; } Consumers consumers = new Consumers<>(executors.getExecutor(), consumer, null); - consumers.start(name); + consumers.start(taskName); long total = 0L; try { while (iterator.hasNext()) { @@ -129,11 +142,101 @@ protected long traverse(Iterator iterator, Consumer consumer, return total; } + protected void traverseIdsByBfs(Iterator vertices, + Directions dir, + Id label, + long degree, + long capacity, + Consumer consumer) { + List labels = label == null ? Collections.emptyList() : + Collections.singletonList(label); + OneStepEdgeIterConsumer edgeIterConsumer = new OneStepEdgeIterConsumer(consumer, capacity); + + EdgesIterator edgeIter = edgesOfVertices(vertices, dir, labels, degree); + + // parallel out-of-order execution + this.traverseByBatch(edgeIter, edgeIterConsumer, "traverse-bfs-step", 1); + } + + protected void traverseIdsByBfs(Iterator vertices, + Steps steps, + long capacity, + Consumer consumer) { + StepsEdgeIterConsumer edgeIterConsumer = + new StepsEdgeIterConsumer(consumer, capacity, steps); + + EdgesQueryIterator queryIterator = new EdgesQueryIterator(vertices, + steps.direction(), + steps.edgeLabels(), + steps.degree()); + + // get Iterator> from Iterator + EdgesIterator edgeIter = new EdgesIterator(queryIterator); + + // parallel out-of-order execution + this.traverseByBatch(edgeIter, edgeIterConsumer, "traverse-bfs-steps", 1); + } + + protected long traverseByBatch(Iterator> sources, + Consumer> consumer, + String taskName, int concurrentWorkers) { + if (!sources.hasNext()) { + return 0L; + } + AtomicBoolean done = new AtomicBoolean(false); + Consumers> consumers = null; + try { + consumers = buildConsumers(consumer, concurrentWorkers, done, + executors.getExecutor()); + return startConsumers(sources, taskName, done, consumers); + } finally { + assert consumers != null; + executors.returnExecutor(consumers.executor()); + } + } + + private long startConsumers(Iterator> sources, + String taskName, + AtomicBoolean done, + Consumers> consumers) { + long total = 0L; + try { + consumers.start(taskName); + while (sources.hasNext() && !done.get()) { + total++; + Iterator v = sources.next(); + consumers.provide(v); + } + } catch (Consumers.StopExecution e) { + // pass + } catch (Throwable e) { + throw Consumers.wrapException(e); + } finally { + try { + consumers.await(); + } catch (Throwable e) { + throw Consumers.wrapException(e); + } finally { + CloseableIterator.closeIterator(sources); + } + } + return total; + } + + private Consumers> buildConsumers(Consumer> consumer, + int queueSizePerWorker, + AtomicBoolean done, + ExecutorService executor) { + return new Consumers<>(executor, + consumer, + null, + e -> done.set(true), + queueSizePerWorker); + } + protected Iterator filter(Iterator vertices, String key, Object value) { - return new FilterIterator<>(vertices, vertex -> { - return match(vertex, key, value); - }); + return new FilterIterator<>(vertices, vertex -> match(vertex, key, value)); } protected boolean match(Element elem, String key, Object value) { @@ -175,4 +278,104 @@ public List getValues(K key) { return values; } } + + public static class ConcurrentVerticesConsumer implements Consumer { + + private final Id sourceV; + private final Set excluded; + private final Set neighbors; + private final long limit; + private final AtomicInteger count; + + public ConcurrentVerticesConsumer(Id sourceV, Set excluded, long limit, + Set neighbors) { + this.sourceV = sourceV; + this.excluded = excluded; + this.limit = limit; + this.neighbors = neighbors; + this.count = new AtomicInteger(0); + } + + @Override + public void accept(EdgeId edgeId) { + if (this.limit != NO_LIMIT && count.get() >= this.limit) { + throw new Consumers.StopExecution("reach limit"); + } + + Id targetV = edgeId.otherVertexId(); + if (this.sourceV.equals(targetV)) { + return; + } + + if (this.excluded != null && this.excluded.contains(targetV)) { + return; + } + + if (this.neighbors.add(targetV)) { + if (this.limit != NO_LIMIT) { + this.count.getAndIncrement(); + } + } + } + } + + public abstract class EdgesConsumer implements Consumer> { + + private final Consumer consumer; + private final long capacity; + + public EdgesConsumer(Consumer consumer, long capacity) { + this.consumer = consumer; + this.capacity = capacity; + } + + protected abstract Iterator prepare(Iterator iter); + + @Override + public void accept(Iterator edgeIter) { + Iterator ids = prepare(edgeIter); + long counter = 0; + while (ids.hasNext()) { + if (Thread.currentThread().isInterrupted()) { + LOG.warn("Consumer is Interrupted"); + break; + } + counter++; + this.consumer.accept(ids.next()); + } + long total = edgeIterCounter.addAndGet(counter); + // traverse by batch & improve performance + if (this.capacity != NO_LIMIT && total >= this.capacity) { + throw new Consumers.StopExecution("reach capacity"); + } + } + } + + public class OneStepEdgeIterConsumer extends EdgesConsumer { + + public OneStepEdgeIterConsumer(Consumer consumer, long capacity) { + super(consumer, capacity); + } + + @Override + protected Iterator prepare(Iterator edgeIter) { + return new MapperIterator<>(edgeIter, (e) -> ((HugeEdge) e).id()); + } + } + + public class StepsEdgeIterConsumer extends EdgesConsumer { + + private final Steps steps; + + public StepsEdgeIterConsumer(Consumer consumer, long capacity, + Steps steps) { + super(consumer, capacity); + this.steps = steps; + } + + @Override + protected Iterator prepare(Iterator edgeIter) { + return edgesOfVertexStep(edgeIter, this.steps); + } + } } diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/records/KneighborRecords.java b/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/records/KneighborRecords.java index 7e04a286c3..649b1c2116 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/records/KneighborRecords.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/records/KneighborRecords.java @@ -19,7 +19,9 @@ import static org.apache.hugegraph.traversal.algorithm.HugeTraverser.NO_LIMIT; +import java.util.Collection; import java.util.List; +import java.util.Set; import java.util.Stack; import org.apache.hugegraph.backend.id.Id; @@ -45,6 +47,17 @@ public int size() { @Override public List ids(long limit) { List ids = CollectionFactory.newList(CollectionType.EC); + this.getRecords(limit, ids); + return ids; + } + + public Set idsBySet(long limit) { + Set ids = CollectionFactory.newSet(CollectionType.EC); + this.getRecords(limit, ids); + return ids; + } + + private void getRecords(long limit, Collection ids) { Stack records = this.records(); // Not include record(i=0) to ignore source vertex for (int i = 1; i < records.size(); i++) { @@ -54,7 +67,6 @@ public List ids(long limit) { limit--; } } - return ids; } @Override diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/steps/Steps.java b/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/steps/Steps.java index d1a9238be1..c2a1a7e1e1 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/steps/Steps.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/steps/Steps.java @@ -138,6 +138,10 @@ public List edgeLabels() { return new ArrayList<>(this.edgeSteps.keySet()); } + public boolean isEdgeEmpty() { + return this.edgeSteps.isEmpty(); + } + public boolean isVertexEmpty() { return this.vertexSteps.isEmpty(); } diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java b/hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java index 00689e0c5e..06e678fd98 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java @@ -27,16 +27,16 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; -import org.apache.hugegraph.config.CoreOptions; -import org.slf4j.Logger; - import org.apache.hugegraph.HugeException; +import org.apache.hugegraph.config.CoreOptions; import org.apache.hugegraph.task.TaskManager.ContextCallable; +import org.slf4j.Logger; public final class Consumers { @@ -46,16 +46,16 @@ public final class Consumers { private static final Logger LOG = Log.logger(Consumers.class); + private final V QUEUE_END = (V) new Object(); private final ExecutorService executor; private final Consumer consumer; - private final Runnable done; - + private final Runnable doneHandle; + private final Consumer exceptionHandle; private final int workers; + private final List runningFutures; private final int queueSize; private final CountDownLatch latch; private final BlockingQueue queue; - - private volatile boolean ending = false; private volatile Throwable exception = null; public Consumers(ExecutorService executor, Consumer consumer) { @@ -63,23 +63,40 @@ public Consumers(ExecutorService executor, Consumer consumer) { } public Consumers(ExecutorService executor, - Consumer consumer, Runnable done) { + Consumer consumer, Runnable doneHandle) { + this(executor, consumer, doneHandle, QUEUE_WORKER_SIZE); + } + + public Consumers(ExecutorService executor, + Consumer consumer, + Runnable doneHandle, + int queueSizePerWorker) { + this(executor, consumer, doneHandle, null, queueSizePerWorker); + } + + public Consumers(ExecutorService executor, + Consumer consumer, + Runnable doneHandle, + Consumer exceptionHandle, + int queueSizePerWorker) { this.executor = executor; this.consumer = consumer; - this.done = done; + this.doneHandle = doneHandle; + this.exceptionHandle = exceptionHandle; int workers = THREADS; if (this.executor instanceof ThreadPoolExecutor) { workers = ((ThreadPoolExecutor) this.executor).getCorePoolSize(); } this.workers = workers; - this.queueSize = QUEUE_WORKER_SIZE * workers; + + this.runningFutures = new ArrayList<>(workers); + this.queueSize = queueSizePerWorker * workers + 1; this.latch = new CountDownLatch(workers); this.queue = new ArrayBlockingQueue<>(this.queueSize); } public void start(String name) { - this.ending = false; this.exception = null; if (this.executor == null) { return; @@ -87,7 +104,8 @@ public void start(String name) { LOG.info("Starting {} workers[{}] with queue size {}...", this.workers, name, this.queueSize); for (int i = 0; i < this.workers; i++) { - this.executor.submit(new ContextCallable<>(this::runAndDone)); + this.runningFutures.add( + this.executor.submit(new ContextCallable<>(this::runAndDone))); } } @@ -95,11 +113,15 @@ private Void runAndDone() { try { this.run(); } catch (Throwable e) { - // Only the first exception of one thread can be stored - this.exception = e; - if (!(e instanceof StopExecution)) { + if (e instanceof StopExecution) { + this.queue.clear(); + putQueueEnd(); + } else { + // Only the first exception to one thread can be stored + this.exception = e; LOG.error("Error when running task", e); } + exceptionHandle(e); } finally { this.done(); this.latch.countDown(); @@ -109,11 +131,7 @@ private Void runAndDone() { private void run() { LOG.debug("Start to work..."); - while (!this.ending) { - this.consume(); - } - assert this.ending; - while (this.consume()){ + while (this.consume()) { // ignore } @@ -121,14 +139,18 @@ private void run() { } private boolean consume() { - V elem; - try { - elem = this.queue.poll(CONSUMER_WAKE_PERIOD, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - // ignore - return true; + V elem = null; + while (elem == null) { + try { + elem = this.queue.poll(CONSUMER_WAKE_PERIOD, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + // ignore + return false; + } } - if (elem == null) { + + if (elem == QUEUE_END) { + putQueueEnd(); return false; } // do job @@ -136,13 +158,29 @@ private boolean consume() { return true; } + private void exceptionHandle(Throwable e) { + if (this.exceptionHandle == null) { + return; + } + + try { + this.exceptionHandle.accept(e); + } catch (Throwable ex) { + if (this.exception == null) { + this.exception = ex; + } else { + LOG.warn("Error while calling exceptionHandle()", ex); + } + } + } + private void done() { - if (this.done == null) { + if (this.doneHandle == null) { return; } try { - this.done.run(); + this.doneHandle.run(); } catch (Throwable e) { if (this.exception == null) { this.exception = e; @@ -169,6 +207,16 @@ public void provide(V v) throws Throwable { } else { try { this.queue.put(v); + } catch (InterruptedException e) { + LOG.warn("Interrupt while queuing QUEUE_END", e); + } + } + } + + private void putQueueEnd() { + if (this.executor != null) { + try { + this.queue.put(QUEUE_END); } catch (InterruptedException e) { LOG.warn("Interrupted while enqueue", e); } @@ -176,15 +224,18 @@ public void provide(V v) throws Throwable { } public void await() throws Throwable { - this.ending = true; if (this.executor == null) { // call done() directly if without thread pool this.done(); } else { try { + putQueueEnd(); this.latch.await(); } catch (InterruptedException e) { String error = "Interrupted while waiting for consumers"; + for (Future f : this.runningFutures) { + f.cancel(true); + } this.exception = new HugeException(error, e); LOG.warn(error, e); } @@ -201,7 +252,8 @@ public ExecutorService executor() { public static void executeOncePerThread(ExecutorService executor, int totalThreads, - Runnable callback) + Runnable callback, + long invokeTimeout) throws InterruptedException { // Ensure callback execute at least once for every thread final Map threadsTimes = new ConcurrentHashMap<>(); @@ -230,7 +282,7 @@ public static void executeOncePerThread(ExecutorService executor, for (int i = 0; i < totalThreads; i++) { tasks.add(task); } - executor.invokeAll(tasks); + executor.invokeAll(tasks, invokeTimeout, TimeUnit.SECONDS); } public static ExecutorService newThreadPool(String prefix, int workers) { @@ -290,13 +342,21 @@ public synchronized ExecutorService getExecutor() { public synchronized void returnExecutor(ExecutorService executor) { E.checkNotNull(executor, "executor"); if (!this.executors.offer(executor)) { - executor.shutdown(); + try { + executor.shutdown(); + } catch (Exception e) { + LOG.warn("close ExecutorService with error:", e); + } } } public synchronized void destroy() { for (ExecutorService executor : this.executors) { - executor.shutdown(); + try { + executor.shutdownNow(); + } catch (Exception e) { + LOG.warn("close ExecutorService with error:", e); + } } this.executors.clear(); } diff --git a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStore.java b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStore.java index 2dba5fa766..283baa622a 100644 --- a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStore.java +++ b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStore.java @@ -44,9 +44,6 @@ import java.util.stream.Collectors; import org.apache.commons.io.FileUtils; -import org.rocksdb.RocksDBException; -import org.slf4j.Logger; - import org.apache.hugegraph.HugeException; import org.apache.hugegraph.backend.BackendException; import org.apache.hugegraph.backend.id.Id; @@ -69,6 +66,9 @@ import org.apache.hugegraph.util.ExecutorUtil; import org.apache.hugegraph.util.InsertionOrderUtil; import org.apache.hugegraph.util.Log; +import org.rocksdb.RocksDBException; +import org.slf4j.Logger; + import com.google.common.collect.ImmutableList; public abstract class RocksDBStore extends AbstractBackendStore { @@ -93,7 +93,8 @@ public abstract class RocksDBStore extends AbstractBackendStore