forked from mlartz/rflow
NOTES
RFlow Manager
  Components
    Input Ports
    Output Ports
  Connections
    Input Ports
    Output Ports
rflow <config file>
  Figure out a work directory
  Make sure that it has the right subdirectories (can be overridden)
    run tmp logs schemas components
  Set up logging to logs/rflow.log
  Load all schemas
  Verify that all components are installed
  Initialize components
  Start components running and make sure that they "daemonize" correctly
    - place pid files in deployment's run directory
  Configure components via zmq
  Daemonize self
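
The work-directory step above could look roughly like the following. This is only a sketch: the helper name prepare_work_directory and the shape of the overrides hash are assumptions made for illustration, and only the five subdirectory names come from these notes.

```ruby
require 'fileutils'
require 'tmpdir'

# Subdirectory names taken from the notes above.
DEFAULT_SUBDIRECTORIES = %w[run tmp logs schemas components].freeze

# Hypothetical helper: creates any missing subdirectories under work_dir and
# returns a hash of name => absolute path. `overrides` maps a subdirectory
# name to an alternate path (relative to work_dir, or absolute).
def prepare_work_directory(work_dir, overrides = {})
  DEFAULT_SUBDIRECTORIES.each_with_object({}) do |name, paths|
    path = File.expand_path(overrides.fetch(name, name), work_dir)
    FileUtils.mkdir_p(path)
    paths[name] = path
  end
end

# Usage: build the layout in a throwaway directory, overriding the log location.
Dir.mktmpdir('rflow') do |dir|
  paths = prepare_work_directory(dir, 'logs' => 'var/log')
  puts paths['logs']
end
```

Returning the resolved paths keeps later steps (logging to logs/rflow.log, pid files in run) from having to know whether a directory was overridden.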
class Component
  # Class-level declarations; each takes a port name or an array of names
  def self.input_port(port_name)
  end

  def self.output_port(port_name)
  end

  attr_accessor :state

  def initialize(config, run_directory)
  end

  def run
  end

  def configure
  end
end

class PassThrough < Component
  input_port [:in]
  input_port :another_in
  output_port :out
  output_port :another_out

  def initialize(config, run_directory)
    # This will initialize the ports
    super
    # Do stuff to initialize component
  end
end
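
One possible way to make the input_port / output_port declarations actually record something is sketched here. The input_ports and output_ports readers, the per-class storage, and the initial :started state are assumptions for the example, not decisions recorded in these notes.

```ruby
class Component
  class << self
    # Port names declared on this class. Stored per class and not inherited,
    # to keep the sketch short.
    def input_ports
      @input_ports ||= []
    end

    def output_ports
      @output_ports ||= []
    end

    # Accept a single name or an array of names, matching both call styles
    # used by PassThrough.
    def input_port(names)
      input_ports.concat(Array(names))
    end

    def output_port(names)
      output_ports.concat(Array(names))
    end
  end

  attr_accessor :state
  attr_reader :config, :run_directory

  def initialize(config, run_directory)
    @config = config
    @run_directory = run_directory
    @state = :started # assumed initial state
  end
end

class PassThrough < Component
  input_port [:in]
  input_port :another_in
  output_port :out
  output_port :another_out
end

p PassThrough.input_ports  # [:in, :another_in]
p PassThrough.output_ports # [:out, :another_out]
```

Keeping the declarations at class level means the manager can inspect a component's ports before instantiating it.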
Computation Requirements:
  Initial startup with:
    - management bus connection information
    - group and instance UUID
    - beacon interval
    - run directory, containing
      - PID files
      - log dir + logs
      - computation-specific configuration (conf dir)
  Needs to process the following messages from mgmt bus:
    - CONFIGURE (ports)
    - RUN
    - SHUTDOWN
  Needs to send the following messages to mgmt bus:
    - LOG
    - BEACON (state machine of the below submessages)
      - STARTED
      - CONFIGURED
      - RUNNING
      - STOPPING
      - STOPPED
      - ERROR
  On startup:
    - listen to mgmt bus
    - publish BEACON + state to mgmt bus every (beacon interval) seconds (default to 1 sec)
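
The BEACON state machine could be sketched as below. The state names and the CONFIGURE / RUN / SHUTDOWN messages come from the lists above; the transition table, the class name BeaconState, and the beacon string format are assumptions, since the notes do not say which transitions are legal or how a beacon is encoded.

```ruby
class BeaconState
  # Assumed transition table: which states may follow each state.
  TRANSITIONS = {
    started:    [:configured, :stopping, :error],
    configured: [:running, :stopping, :error],
    running:    [:stopping, :error],
    stopping:   [:stopped, :error],
    stopped:    [],
    error:      []
  }.freeze

  attr_reader :state

  def initialize
    @state = :started
  end

  def transition_to(new_state)
    unless TRANSITIONS.fetch(@state).include?(new_state)
      raise ArgumentError, "cannot go from #{@state} to #{new_state}"
    end
    @state = new_state
  end

  # Map an incoming management-bus message to the state it should produce.
  def handle(message)
    case message
    when 'CONFIGURE' then transition_to(:configured)
    when 'RUN'       then transition_to(:running)
    when 'SHUTDOWN'  then transition_to(:stopping)
    else raise ArgumentError, "unknown message #{message}"
    end
  end

  # Text of the beacon that would be published every beacon interval.
  def beacon
    "BEACON #{@state.to_s.upcase}"
  end
end

machine = BeaconState.new
%w[CONFIGURE RUN SHUTDOWN].each do |message|
  machine.handle(message)
  puts machine.beacon
end
```

Rejecting illegal transitions in one place means a misordered RUN before CONFIGURE shows up as an error rather than a silently wrong beacon.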
External Computations:
- Given (out-of-band) startup info (mgmt bus, UUIDs, beacon interval)
-
RFlow
- Will need a DB for config
- Initial startup will need to resolve all remaining outstanding items (ports, UUIDs, etc) and store in config DB
- MVC, Mongrel2-like?
Translate
- Need to add <associated type="objtype" name="myname"> where name attr can be used in later XML templates
----------------
Plugins:
An externally defined plugin needs access to all current data types, and must be able to define its own and tell the system about them.
- necessary to tell system?
- need a protocol for defining schema transfer
- each message has attached schema
lib/rflow/message.rb
RFlow::Config
RFlow::Management
- Somewhere for external people to register new computations with running system
- computation says that it's running and asks for Connection configuration
- how will it specify where in the workflow it wants to run????
RFlow::Message (complete on-the-wire Avro message format)
data_type, provenance, external_ids, empty, data (see below)
RFlow::Data::(various message data blocks)
RFlow::Computation
uuid, name, class, input_ports, output_ports
RFlow::Connection
encapsulates link knowledge and provides an API for sending and receiving
each computation will have one for each port
each computation will call into the connection to send (possibly via a Port object) and receive
RFlow::Connection::AMQP
will manage connections to an AMQP server
RFlow::Connection::ZMQ
computation_a.output_port -> (connection.incoming -> connection.outgoing) -> computation_b.input_port
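
The output_port -> connection -> input_port path can be tried out in a single process before any AMQP or ZMQ code exists. In this sketch InMemoryConnection, Port, send_message, and receive_message are all hypothetical names, and a plain Ruby Queue stands in for the real transport.

```ruby
# In-process stand-in for a connection: the upstream output port writes to it,
# the downstream input port reads from it.
class InMemoryConnection
  def initialize
    @queue = Queue.new
  end

  def send_message(message)
    @queue << message
  end

  # Non-blocking receive; returns nil when nothing is waiting.
  def receive_message
    @queue.pop(true)
  rescue ThreadError
    nil
  end
end

# A port only knows its connections; the computation talks to the port.
class Port
  attr_reader :name

  def initialize(name)
    @name = name
    @connections = []
  end

  def connect(connection)
    @connections << connection
    self
  end

  def send_message(message)
    @connections.each { |connection| connection.send_message(message) }
  end

  # Return the first waiting message from any attached connection, or nil.
  def receive_message
    @connections.each do |connection|
      message = connection.receive_message
      return message if message
    end
    nil
  end
end

connection = InMemoryConnection.new
output_port = Port.new(:out).connect(connection) # computation_a side
input_port = Port.new(:in).connect(connection)   # computation_b side

output_port.send_message('hello')
puts input_port.receive_message
```

Because the ports depend only on send_message and receive_message, an AMQP- or ZMQ-backed connection could be substituted later without touching the computations.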
AMQP::Topic - responsible for setting up a topic -> queue binding
  r.incoming = amqp connection, channel, vhost, login, password, topic
  r.outgoing = amqp connection, channel, vhost, login, password, queue name
  behavior -> n x m, "round-robin" among the connected outgoing
  incoming behavior will need to set topic/key, uses the data type in the RFlow::Message
ZMQ::PubSub - device-less, responsible for assigning ip/port and assigning one client to bind the port
  r.incoming = zmq connection string (tcp://ip:port), type pub
  r.outgoing = zmq connection string (tcp://ip:port), type sub
  behavior -> n x m, broadcast sending
ZMQ::PushPull - device-less, responsible for assigning ip/port and assigning one client to bind the port
  r.incoming = zmq connection string (tcp://ip:port), type push
  r.outgoing = zmq connection string (tcp://ip:port), type pull
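
The two delivery behaviors named above ("round-robin" among the outgoing side versus broadcast sending) differ only in how many receivers see each message. The sketch below shows that difference with plain arrays standing in for receivers; it uses no AMQP or ZMQ library, and the class names are made up for the example.

```ruby
# Round-robin: each message goes to exactly one receiver, taking turns.
class RoundRobinDelivery
  def initialize(receivers)
    @receivers = receivers
    @next_index = 0
  end

  def deliver(message)
    receiver = @receivers[@next_index]
    @next_index = (@next_index + 1) % @receivers.size
    receiver << message
  end
end

# Broadcast: every receiver gets a copy of every message.
class BroadcastDelivery
  def initialize(receivers)
    @receivers = receivers
  end

  def deliver(message)
    @receivers.each { |receiver| receiver << message }
  end
end

round_robin_receivers = [[], []]
round_robin = RoundRobinDelivery.new(round_robin_receivers)
%w[a b c].each { |message| round_robin.deliver(message) }
p round_robin_receivers # [["a", "c"], ["b"]]

broadcast_receivers = [[], []]
broadcast = BroadcastDelivery.new(broadcast_receivers)
%w[a b c].each { |message| broadcast.deliver(message) }
p broadcast_receivers # [["a", "b", "c"], ["a", "b", "c"]]
```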
Startup
  RFlow.run is the management process for the workflow

  computations = config.computations.map do |c|
    instantiate_computation(c)
    # Check for errors here, which would be evident if a computation couldn't be found/created
    # Just creating single process ruby objects here to check for errors
  end

  computations.each do |c|
    c.configure # with what????
    # Still single ruby process to set and deconflict all the configuration parameters
  end

  computations.each do |c|
    c.run
  end

  listen_for_management_events_from_old_computations
  listen_for_new_computation_registration
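
A runnable version of the three startup phases (instantiate, configure, run) might look like this. Everything beyond the phase ordering is an assumption: SketchComputation, REGISTRY, the spec hash layout, and passing per-computation settings to configure are placeholders, and the open "with what????" question is not answered here.

```ruby
# Hypothetical stand-in for a real computation.
class SketchComputation
  attr_reader :name, :state

  def initialize(name)
    @name = name
    @state = :started
  end

  def configure(settings)
    @settings = settings
    @state = :configured
  end

  def run
    @state = :running
  end
end

# Assumed lookup table from configured class name to Ruby class.
REGISTRY = { 'SketchComputation' => SketchComputation }.freeze

def instantiate_computation(spec)
  klass = REGISTRY.fetch(spec[:class]) do
    raise ArgumentError, "unknown computation class #{spec[:class]}"
  end
  klass.new(spec[:name])
end

def start_workflow(specs)
  # Phase 1: instantiate everything first, so a missing class fails
  # before any computation has been configured or started.
  computations = specs.map { |spec| instantiate_computation(spec) }

  # Phase 2: configure, still inside the single management process.
  computations.zip(specs).each do |computation, spec|
    computation.configure(spec.fetch(:settings, {}))
  end

  # Phase 3: start every computation.
  computations.each(&:run)
  computations
end

specs = [
  { name: 'reader', class: 'SketchComputation', settings: { path: 'in' } },
  { name: 'writer', class: 'SketchComputation' }
]
start_workflow(specs).each { |c| puts "#{c.name}: #{c.state}" }
```

Splitting the phases this way means nothing is running yet when an instantiation or configuration error is detected, which matches the error-checking comments in the outline above.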