Skip to content

Commit

Permalink
add example code
Browse files Browse the repository at this point in the history
  • Loading branch information
wangtianthu committed Aug 24, 2016
1 parent 2f24e01 commit 7695c4e
Show file tree
Hide file tree
Showing 11 changed files with 468 additions and 2 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ GSYMS
.project
.settings
_build/
build/
dist
target
.buildcache
Expand Down
5 changes: 4 additions & 1 deletion src/main/java/com/twitter/nodes/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.List;
Expand Down Expand Up @@ -348,7 +349,9 @@ public final Node addSinkNodes(Node... nodes) {
* This is only used to generate DOT graph.
*/
public final Set<? extends Enum> getOptionalDependencies() {
Preconditions.checkArgument(!dependentNodesByName.isEmpty(), "This node has no dependency");
if (dependentNodesByName.isEmpty()) {
return Collections.emptySet();
}
Enum firstEnum = dependentNodesByName.keySet().iterator().next();
return getOptionalDependenciesForClass(firstEnum.getClass());
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/twitter/nodes/ServiceNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public String getServiceName() {
* NOTE: Called at the evaluation time.
*/
@VisibleForTesting
Service<Req, Resp> getService() {
protected Service<Req, Resp> getService() {
return service;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/**
* Copyright 2016 Twitter, Inc.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.twitter.nodes.examples.search;

import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.twitter.util.Future;
import com.twitter.util.Promise;

/**
* A class to wrap an immediate value into a Future that delays its materialization by some random
* time. This is just used to simulate the asynchronous computing environment. In the real server,
* this delay could be caused by calls to external services, and computation in a different thread.
*/
public class DelayedResponse {
private final ExecutorService executor = Executors.newScheduledThreadPool(5);
private Random random = new Random();

private static final DelayedResponse DELAY = new DelayedResponse();
public static DelayedResponse get() {
return DELAY;
}

public final <T> Future<T> delay(final T value) {
final Promise<T> promise = new Promise<T>();
executor.submit(() -> {
try {
Thread.sleep(500 + random.nextInt(500)); // somewhere between 0.5 ~ 1.0 second
promise.become(Future.value(value));
} catch (InterruptedException e) {
Thread.interrupted();
}
});
return promise;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/**
* Copyright 2016 Twitter, Inc.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.twitter.nodes.examples.search;

import java.io.File;
import java.io.IOException;
import java.util.logging.Logger;

import com.google.common.io.Files;

import com.twitter.nodes.Node;
import com.twitter.nodes.utils.DebugLevel;
import com.twitter.nodes.utils.DebugManager;
import com.twitter.nodes.utils.DebugMessageBuilder;
import com.twitter.util.Await;
import com.twitter.util.Future;

/**
* An example search engine server implemented with Nodes.
*/
public class SearchExampleMain {
private static final Logger LOG = Logger.getLogger(SearchExampleMain.class.getSimpleName());

public static void main(String[] args) {
// Enable debugging, this is both for the collection of debug info and also for enabling the
// DOT visualization.
DebugManager.update(new DebugMessageBuilder(DebugLevel.DEBUG_DETAILED));

// Create the input and the graph
SearchRequest request = new SearchRequest("query", 10, 777L);
SearchGraph searchGraph = new SearchGraph(Node.value(request));

// Actually start execution
Future<SearchResponse> responseFuture = searchGraph.responseNode.apply();

// Wait for response and print results
try {
SearchResponse response = Await.result(responseFuture);
System.out.println("Search response: " + response);
} catch (Exception e) {
LOG.warning("Exception thrown while waiting for results: " + e);
}

// Print the debug information
System.out.println("Debug output:\n" + DebugManager.getDebugMessage());

// Produce dependency graph visualization
try {
Files.write(searchGraph.toDotGraph().getBytes(), new File("graph.dot"));
} catch (IOException e) {
LOG.warning("Cannot write to local file");
}
}
}
180 changes: 180 additions & 0 deletions src/main/java/com/twitter/nodes/examples/search/SearchGraph.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
/**
* Copyright 2016 Twitter, Inc.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.twitter.nodes.examples.search;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

import com.twitter.finagle.Service;
import com.twitter.nodes.Node;
import com.twitter.nodes.OptionalDep;
import com.twitter.nodes.ServiceNode;
import com.twitter.nodes.Subgraph;
import com.twitter.util.Future;

/**
* The main graph for search workflow.
*/
public class SearchGraph extends Subgraph {
// The exposed output node
public final Node<SearchResponse> responseNode;

/**
* Construct a search graph using the request node as input.
*/
public SearchGraph(Node<SearchRequest> requestNode) {
// look up user score using UserScoreServiceNode, this step is independent from search
Node<Double> userScoreNode =
Node.ifThenElse(
requestNode.map("hasUserId", r -> r.getUserId() > 0),
new UserScoreServiceNode(
requestNode.map("getUserId", SearchRequest::getUserId)),
Node.value(0.0, "defaultUserScore"));

// Search the index and find result ids for the given query
Node<List<Long>> resultIdsNode = Node.build(
SearchIndexNode.class,
SearchIndexNode.D.QUERY, requestNode.map("getQuery", SearchRequest::getQuery),
SearchIndexNode.D.NUM_RESULTS, requestNode.map("getNumResults",
r -> r.getNumResults() > 0 ? r.getNumResults() : 10));

// Hydrate the ids returned from the index
Node<Map<Long, String>> hydrationMapNode = Node.build(
HydrationNode.class,
HydrationNode.D.ID_LIST, resultIdsNode,
HydrationNode.D.PREFIX, Node.value("cool_prefix", "newPrefix"));

// Combined the hydration information and the user score to create the final response
this.responseNode = Node.build(
BuildResponseNode.class,
BuildResponseNode.D.USER_SCORE, userScoreNode,
BuildResponseNode.D.RESULT_ID_LIST, resultIdsNode,
BuildResponseNode.D.HYDRATION_MAP, hydrationMapNode)
.when(requestNode.map("hasQuery", r -> !r.getQuery().isEmpty()));

markExposedNodes();
}

// ------------- A bunch of mock service-like nodes ----------------

/**
* A mock search index node, only return ids for results
*/
public static class SearchIndexNode extends Node<List<Long>> {
public enum D {
QUERY,
NUM_RESULTS
}

@Override
protected Future<List<Long>> evaluate() throws Exception {
String query = getDep(D.QUERY);
int numResult = getDep(D.NUM_RESULTS);
List<Long> results = new ArrayList<>(numResult);
for (int i = 0; i < numResult; ++i) {
results.add((long) query.hashCode() + i);
}
return DelayedResponse.get().delay(results);
}
}

/**
* A mock hydration node, just create a map of strings keyed by long ids.
*/
public static class HydrationNode extends Node<Map<Long, String>> {
public enum D {
ID_LIST,
@OptionalDep PREFIX
}

@Override
protected Future<Map<Long, String>> evaluate() throws Exception {
List<Long> idList = getDep(D.ID_LIST);
String prefix = getDep(D.PREFIX, "default_text");
Map<Long, String> map = Maps.newHashMap();
for (long id : idList) {
if (id % 3 == 0) {
continue; // skip some entries
}
map.put(id, prefix + ":" + id);
}
return DelayedResponse.get().delay(map);
}
}

/**
* A node to build responses
*/
public static class BuildResponseNode extends Node<SearchResponse> {
public enum D {
USER_SCORE,
RESULT_ID_LIST,
HYDRATION_MAP
}

@Override
protected Future<SearchResponse> evaluate() throws Exception {
Double userScore = getDep(D.USER_SCORE);
List<Long> idList = getDep(D.RESULT_ID_LIST);
Map<Long, String> hydrationMap = getDep(D.HYDRATION_MAP);

// Create a list of results following the original order, but only keeping those
// successfully hydrated.
List<SearchResult> results = Lists.newArrayList();
for (long id : idList) {
String text = hydrationMap.get(id);
if (text != null) {
results.add(new SearchResult(id, text));
} else {
debugDetailed("Unable to create result with id=%d", id);
}
}

SearchResponse response = new SearchResponse(results, userScore);
return DelayedResponse.get().delay(response);
}
}

/**
* Another mocked service to get user score, a double value based on the user id. This shows
* one possible usage of the ServiceNode.
*/
public static class UserScoreServiceNode extends ServiceNode<Long, Double> {
public UserScoreServiceNode(Node<Long> user) {
super(ImmutableList.of(user));
}

@Override
protected Long buildRequest() {
return getDep(DefaultDependencyEnum.DEP0); // get the first dependency
}

@Override
protected Service<Long, Double> getService() {
return new Service<Long, Double>() {
@Override
public Future<Double> apply(Long request) {
return DelayedResponse.get().delay(request.longValue() / (request.longValue() + 100.0));
}
};
}
}
}
43 changes: 43 additions & 0 deletions src/main/java/com/twitter/nodes/examples/search/SearchRequest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/**
* Copyright 2016 Twitter, Inc.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.twitter.nodes.examples.search;

/**
* Search Request
*/
public class SearchRequest {
private final String query;
private final int numResults;
private final long userId;

public SearchRequest(String query, int numResults, long userId) {
this.query = query;
this.numResults = numResults;
this.userId = userId;
}

public String getQuery() {
return query;
}

public int getNumResults() {
return numResults;
}

public long getUserId() {
return userId;
}
}
Loading

0 comments on commit 7695c4e

Please sign in to comment.