Skip to content
This repository has been archived by the owner on Apr 22, 2020. It is now read-only.

Commit

Permalink
[WIP] Degree Centrality (#830)
Browse files Browse the repository at this point in the history
* uncomment test

* For Cypher loading we will always treat the graph as outgoing, and let the user handle the actual direction in the query

* deduplicate nodes in Cypher loading land

* deduplicate nodes in Graph View land

* tests for new custom counting

* degree centrality proc

* Integration test

* Integration test for weighted

* tests for outgoing

* use the proper threading approach

* use the proper threading approach for weighted

* Partition Degree Centrality computation

* weighted degree in the same class

* remove unused code

* put the direction logic into the loader

* handle Cypher loading

* use the sum strategy for accumulating weight degrees

* use the correct duplicates strategy in these tests
  • Loading branch information
mneedham authored Mar 4, 2019
1 parent febe378 commit e7cbde8
Show file tree
Hide file tree
Showing 28 changed files with 1,889 additions and 192 deletions.
230 changes: 230 additions & 0 deletions algo/src/main/java/org/neo4j/graphalgo/DegreeCentralityProc.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
/**
* Copyright (c) 2017 "Neo4j, Inc." <http://neo4j.com>
*
* This file is part of Neo4j Graph Algorithms <http://github.com/neo4j-contrib/neo4j-graph-algorithms>.
*
* Neo4j Graph Algorithms is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.graphalgo;

import org.neo4j.graphalgo.api.Graph;
import org.neo4j.graphalgo.api.GraphFactory;
import org.neo4j.graphalgo.api.HugeGraph;
import org.neo4j.graphalgo.core.GraphLoader;
import org.neo4j.graphalgo.core.ProcedureConfiguration;
import org.neo4j.graphalgo.core.ProcedureConstants;
import org.neo4j.graphalgo.core.utils.Pools;
import org.neo4j.graphalgo.core.utils.ProgressTimer;
import org.neo4j.graphalgo.core.utils.TerminationFlag;
import org.neo4j.graphalgo.core.utils.paged.AllocationTracker;
import org.neo4j.graphalgo.core.write.Exporter;
import org.neo4j.graphalgo.impl.Algorithm;
import org.neo4j.graphalgo.impl.degree.DegreeCentrality;
import org.neo4j.graphalgo.impl.results.CentralityResult;
import org.neo4j.graphalgo.impl.pagerank.DegreeCentralityAlgorithm;
import org.neo4j.graphalgo.results.DegreeCentralityScore;
import org.neo4j.graphdb.Direction;
import org.neo4j.kernel.api.KernelTransaction;
import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.logging.Log;
import org.neo4j.procedure.*;

import java.util.Map;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
import java.util.stream.Stream;

import static org.neo4j.graphalgo.core.ProcedureConstants.CYPHER_QUERY;

public final class DegreeCentralityProc {

public static final String DEFAULT_SCORE_PROPERTY = "degree";
public static final String CONFIG_WEIGHT_KEY = "weightProperty";

@Context
public GraphDatabaseAPI api;

@Context
public Log log;

@Context
public KernelTransaction transaction;

@Procedure(value = "algo.degree", mode = Mode.WRITE)
@Description("CALL algo.degree(label:String, relationship:String, " +
"{ weightProperty: null, write: true, writeProperty:'degree', concurrency:4}) " +
"YIELD nodes, iterations, loadMillis, computeMillis, writeMillis, dampingFactor, write, writeProperty" +
" - calculates page rank and potentially writes back")
public Stream<DegreeCentralityScore.Stats> degree(
@Name(value = "label", defaultValue = "") String label,
@Name(value = "relationship", defaultValue = "") String relationship,
@Name(value = "config", defaultValue = "{}") Map<String, Object> config) {

ProcedureConfiguration configuration = ProcedureConfiguration.create(config);
final String weightPropertyKey = configuration.getString(CONFIG_WEIGHT_KEY, null);

DegreeCentralityScore.Stats.Builder statsBuilder = new DegreeCentralityScore.Stats.Builder();
AllocationTracker tracker = AllocationTracker.create();
Direction direction = getDirection(configuration);
final Graph graph = load(label, relationship, tracker, configuration.getGraphImpl(), statsBuilder, configuration, weightPropertyKey, direction);

if(graph.nodeCount() == 0) {
graph.release();
return Stream.of(statsBuilder.build());
}

TerminationFlag terminationFlag = TerminationFlag.wrap(transaction);
CentralityResult scores = evaluate(graph, tracker, terminationFlag, configuration, statsBuilder, weightPropertyKey, direction);

logMemoryUsage(tracker);

write(graph, terminationFlag, scores, configuration, statsBuilder);

return Stream.of(statsBuilder.build());
}

private Direction getDirection(ProcedureConfiguration configuration) {
String graphName = configuration.getGraphName(ProcedureConstants.DEFAULT_GRAPH_IMPL);
Direction direction = configuration.getDirection(Direction.INCOMING);
return CYPHER_QUERY.equals(graphName) ? Direction.OUTGOING : direction;
}

@Procedure(value = "algo.degree.stream", mode = Mode.READ)
@Description("CALL algo.degree.stream(label:String, relationship:String, " +
"{weightProperty: null, concurrency:4}) " +
"YIELD node, score - calculates page rank and streams results")
public Stream<DegreeCentralityScore> degreeStream(
@Name(value = "label", defaultValue = "") String label,
@Name(value = "relationship", defaultValue = "") String relationship,
@Name(value = "config", defaultValue = "{}") Map<String, Object> config) {

ProcedureConfiguration configuration = ProcedureConfiguration.create(config);

final String weightPropertyKey = configuration.getString(CONFIG_WEIGHT_KEY, null);

DegreeCentralityScore.Stats.Builder statsBuilder = new DegreeCentralityScore.Stats.Builder();
Direction direction = getDirection(configuration);
AllocationTracker tracker = AllocationTracker.create();
final Graph graph = load(label, relationship, tracker, configuration.getGraphImpl(), statsBuilder, configuration, weightPropertyKey, direction);

if(graph.nodeCount() == 0) {
graph.release();
return Stream.empty();
}

TerminationFlag terminationFlag = TerminationFlag.wrap(transaction);
CentralityResult scores = evaluate(graph, tracker, terminationFlag, configuration, statsBuilder, weightPropertyKey, direction);

logMemoryUsage(tracker);

if (graph instanceof HugeGraph) {
HugeGraph hugeGraph = (HugeGraph) graph;
return LongStream.range(0, hugeGraph.nodeCount())
.mapToObj(i -> {
final long nodeId = hugeGraph.toOriginalNodeId(i);
return new DegreeCentralityScore(
nodeId,
scores.score(i)
);
});
}

return IntStream.range(0, Math.toIntExact(graph.nodeCount()))
.mapToObj(i -> {
final long nodeId = graph.toOriginalNodeId(i);
return new DegreeCentralityScore(
nodeId,
scores.score(i)
);
});
}

private void logMemoryUsage(AllocationTracker tracker) {
log.info("Degree Centrality: overall memory usage: %s", tracker.getUsageString());
}

private Graph load(
String label,
String relationship,
AllocationTracker tracker,
Class<? extends GraphFactory> graphFactory,
DegreeCentralityScore.Stats.Builder statsBuilder,
ProcedureConfiguration configuration,
String weightPropertyKey, Direction direction) {
GraphLoader graphLoader = new GraphLoader(api, Pools.DEFAULT)
.init(log, label, relationship, configuration)
.withAllocationTracker(tracker)
.withOptionalRelationshipWeightsFromProperty(weightPropertyKey, configuration.getWeightPropertyDefaultValue(0.0));

graphLoader.direction(direction);

try (ProgressTimer timer = statsBuilder.timeLoad()) {
Graph graph = graphLoader.load(graphFactory);
statsBuilder.withNodes(graph.nodeCount());
return graph;
}
}

private CentralityResult evaluate(
Graph graph,
AllocationTracker tracker,
TerminationFlag terminationFlag,
ProcedureConfiguration configuration,
DegreeCentralityScore.Stats.Builder statsBuilder,
String weightPropertyKey, Direction direction) {

final int concurrency = configuration.getConcurrency(Pools.getNoThreadsInDefaultPool());

if (direction == Direction.BOTH) {
direction = Direction.OUTGOING;
}

DegreeCentralityAlgorithm algo = new DegreeCentrality(graph, Pools.DEFAULT, concurrency, direction, weightPropertyKey != null);
statsBuilder.timeEval(algo::compute);
Algorithm<?> algorithm = algo.algorithm();
algorithm.withTerminationFlag(terminationFlag);

final CentralityResult pageRank = algo.result();
algo.algorithm().release();
graph.release();
return pageRank;
}

private void write(
Graph graph,
TerminationFlag terminationFlag,
CentralityResult result,
ProcedureConfiguration configuration,
final DegreeCentralityScore.Stats.Builder statsBuilder) {
if (configuration.isWriteFlag(true)) {
log.debug("Writing results");
String propertyName = configuration.getWriteProperty(DEFAULT_SCORE_PROPERTY);
try (ProgressTimer timer = statsBuilder.timeWrite()) {
Exporter exporter = Exporter
.of(api, graph)
.withLog(log)
.parallel(Pools.DEFAULT, configuration.getConcurrency(), terminationFlag)
.build();
result.export(propertyName, exporter);
}
statsBuilder
.withWrite(true)
.withProperty(propertyName);
} else {
statsBuilder.withWrite(false);
}
}


}
127 changes: 0 additions & 127 deletions algo/src/main/java/org/neo4j/graphalgo/impl/DegreeCentrality.java

This file was deleted.

Loading

0 comments on commit e7cbde8

Please sign in to comment.