This repository has been archived by the owner on Jun 23, 2022. It is now read-only.
forked from Xtra-Computing/briskstream
-
Notifications
You must be signed in to change notification settings - Fork 0
/
YSB.java
114 lines (95 loc) · 3.68 KB
/
YSB.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package applications.topology;
// import applications.bolts.comm.StringParserBolt;
// import applications.bolts.wc.SplitSentenceBolt;
// import applications.bolts.wc.WordCountBolt;
import applications.bolts.ysb.DeserializeBolt;
import applications.bolts.ysb.EventFilterBolt;
import applications.bolts.ysb.EventProjectionBolt;
import applications.bolts.ysb.YSBMemFileSpout;
import applications.bolts.ysb.YSBSink;
// import applications.constants.WordCountConstants;
// import applications.constants.WordCountConstants.Component;
// import applications.constants.WordCountConstants.Field;
// import applications.constants.YSBConstants;
// import applications.constants.YSBConstants.Component;
// import applications.constants.YSBConstants.Field;
import applications.util.Configuration;
import applications.sink.BaseSink;
import brisk.components.Topology;
import brisk.components.exception.InvalidIDException;
import brisk.components.grouping.FieldsGrouping;
import brisk.components.grouping.ShuffleGrouping;
import brisk.controller.input.scheduler.SequentialScheduler;
import brisk.execution.runtime.tuple.impl.Fields;
import brisk.topology.BasicTopology;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import brisk.components.operators.api.AbstractSpout;
// import static applications.constants.YSBConstants.PREFIX;
/**
 * Topology definition for the YSB application.
 *
 * <p>The topology is a linear pipeline in which every stage is connected to the
 * previous one with a {@link ShuffleGrouping}:
 *
 * <pre>
 * ysbSpout -> ysbDeserializeBolt -> ysbEventBolt -> ysbProjectionBolt -> ysbSink
 * </pre>
 *
 * <p>{@link #buildTopology()} uses the inherited {@code spout} and {@code sink}
 * fields as they are. Within this class they are only populated by
 * {@link #loadSpout()} and {@link #loadSink()}, which {@link #initialize()} calls.
 */
public class YSB extends BasicTopology {
    private static final String PREFIX = "ysb";
    private static final Logger LOG = LoggerFactory.getLogger(YSB.class);

    /** Name handed to the superclass constructor, regardless of the name the caller supplies. */
    private static final String TOPOLOGY_NAME = "[YSB]";

    // Component ids. Each id is used once to declare the component and once as
    // the upstream source of the next stage, so they are kept in one place.
    private static final String SPOUT_ID = "ysbSpout";
    private static final String DESERIALIZE_BOLT_ID = "ysbDeserializeBolt";
    private static final String EVENT_FILTER_BOLT_ID = "ysbEventBolt";
    private static final String PROJECTION_BOLT_ID = "ysbProjectionBolt";
    private static final String SINK_ID = "ysbSink";

    /** Parallelism argument passed to the builder for the spout. */
    private static final int SPOUT_PARALLELISM = 4;
    /** Parallelism argument passed to the builder for every bolt and for the sink. */
    private static final int OPERATOR_PARALLELISM = 8;

    /**
     * Creates the YSB topology.
     *
     * @param topologyName currently ignored: the superclass is always given the
     *                     fixed name {@code "[YSB]"}. NOTE(review): confirm whether
     *                     this is intentional before forwarding the argument.
     * @param config       configuration forwarded to the superclass
     */
    public YSB(String topologyName, Configuration config) {
        super(TOPOLOGY_NAME, config);
    }

    /**
     * Returns the configuration prefix of this application.
     *
     * @return the constant {@code "ysb"}
     */
    public static String getPrefix() {
        return PREFIX;
    }

    /**
     * Makes sure the spout and the sink exist by calling {@link #loadSpout()}
     * and {@link #loadSink()}.
     */
    public void initialize() {
        LOG.debug("[DBG] Initialize YSB topology");
        loadSpout();
        loadSink();
    }

    /**
     * Returns the spout of this topology, creating a {@link YSBMemFileSpout}
     * first if no spout has been set yet.
     *
     * @return the spout held in the {@code spout} field
     */
    @Override
    protected AbstractSpout loadSpout() {
        if (spout == null) {
            spout = new YSBMemFileSpout();
        }
        return (AbstractSpout) spout;
    }

    /**
     * Returns the sink of this topology, creating a {@link YSBSink} first if no
     * sink has been set yet.
     *
     * @return the sink held in the {@code sink} field
     */
    @Override
    protected BaseSink loadSink() {
        if (sink == null) {
            sink = new YSBSink();
        }
        return (BaseSink) sink;
    }

    /**
     * Wires spout, bolts and sink into a linear, shuffle-grouped pipeline,
     * installs a {@link SequentialScheduler} and creates the topology.
     *
     * <p>If the builder rejects a component id, the error is logged and the
     * topology is still created from whatever was registered up to that point.
     *
     * @return the topology produced by the builder
     */
    @Override
    public Topology buildTopology() {
        try {
            builder.setSpout(SPOUT_ID, spout, SPOUT_PARALLELISM);
            builder.setBolt(DESERIALIZE_BOLT_ID, new DeserializeBolt(), OPERATOR_PARALLELISM,
                    new ShuffleGrouping(SPOUT_ID));
            // A FieldsGrouping on "campaign_id" from the deserialize bolt was tried
            // for the next two stages and abandoned in favour of shuffle grouping.
            builder.setBolt(EVENT_FILTER_BOLT_ID, new EventFilterBolt(), OPERATOR_PARALLELISM,
                    new ShuffleGrouping(DESERIALIZE_BOLT_ID));
            builder.setBolt(PROJECTION_BOLT_ID, new EventProjectionBolt(), OPERATOR_PARALLELISM,
                    new ShuffleGrouping(EVENT_FILTER_BOLT_ID));
            builder.setSink(SINK_ID, sink, OPERATOR_PARALLELISM,
                    new ShuffleGrouping(PROJECTION_BOLT_ID));
        } catch (InvalidIDException e) {
            // Keep the original best-effort behaviour (no rethrow), but report the
            // failure through the class logger instead of printStackTrace().
            LOG.error("Invalid component id while building the YSB topology", e);
        }
        builder.setGlobalScheduler(new SequentialScheduler());
        return builder.createTopology();
    }

    /**
     * Returns the logger of this class.
     *
     * @return the class-level SLF4J logger
     */
    @Override
    public Logger getLogger() {
        return LOG;
    }

    /**
     * Returns the configuration prefix of this application.
     *
     * @return the constant {@code "ysb"}
     */
    @Override
    public String getConfigPrefix() {
        return PREFIX;
    }
}