-
Notifications
You must be signed in to change notification settings - Fork 0
/
streams.py
executable file
·145 lines (129 loc) · 4.59 KB
/
streams.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
# Python-only implementation of streams.
#
# A Stream is a collection of ProcessingElements, each of which has a number of
# inputs and a single output. ProcessingElements are connected together to
# form streams that process data.
class ProcessingCode:
def __init__(this, code):
this.code = code
NO_OUTPUT = ProcessingCode(0)
CALL_ME_AGAIN = ProcessingCode(1)
class StreamAlreadyStartedException:
pass
class ProcessingElement:
"""
A single element of a processing stream. Extend these to form elements that
do things (see for example the simple Elements below).
Initialise your ProcessingElement with the desired number of inputs and
provide a definition of run() with the correct number of input parameters.
Make sure your run definition returns either a result, NO_OUTPUT (for no
result), or CALL_ME_AGAIN (which will call run() again with the next values
of the upstream streams).
Define functional to be true if the output of this element is constant for
constant inputs.
"""
def __init__(this, numInputs, functional=False):
this.inputs = [None] * numInputs
this.outputs = []
this.current = []
this.started = False
this.constant = False
def getCurrent(this, lid):
if this.current[lid] == -1:
return this.getNext(lid)
if this.outputs[this.current[lid]] is NO_OUTPUT:
return None
return this.outputs[this.current[lid]]
def getNext(this, lid):
# we call a function on this class called run. We provide
# each input as an argument, and we expect an output. Never return
# None! This will be flagged as an error. Instead use the values
# defined above.
this.current[lid] += 1
if len(this.outputs) == this.current[lid]:
result = CALL_ME_AGAIN
while result is CALL_ME_AGAIN:
result = this.run(*this.inputs)
this.outputs.append(result)
else:
result = this.outputs[this.current[lid]]
minOutput = min(this.current)
if minOutput > 0:
this.outputs = this.outputs[minOutput:]
this.started = True
this.current = map(lambda a: a - minOutput, this.current)
return result
def connectInput(this, input, inputSlot):
if input.started:
raise StreamAlreadyStartedException()
this.inputs[inputSlot] = input.registerOutput()
class ProcessingElementAsConstantOutput:
def __init__(this, value):
this.value = value
def getCurrent(this):
return this.value
def getNext(this):
return this.value
class ProcessingElementAsOutput:
def __init__(this, actualElement, lid):
this.actualElement = actualElement
this.lid = lid
def getCurrent(this):
"get the current value from this output. Do not call this without first calling getNext"
return this.actualElement.getCurrent(this.lid)
def getNext(this):
"get the next value from this output"
return this.actualElement.getNext(this.lid)
def registerOutput(this):
"""
stream.registerOutput() returns an iterator with getCurrent() and getNext()
methods. Each registered output behaves independently of the others.
"""
if this.constant:
if this.value is None:
this.value = ProcessingElementAsOutput(this, 0).getNext()
return ProcessingElementAsConstantOutput(this.value)
lid = len(this.current)
this.current.append(-1)
return this.ProcessingElementAsOutput(this, lid)
# Some very simple examples
class AddOffsetElement(ProcessingElement):
def __init__(this, offset):
ProcessingElement.__init__(this, 1)
this.offset = offset
def run(this, value):
return value.getNext() + this.offset
class AddTwoValuesElement(ProcessingElement):
def __init__(this):
ProcessingElement.__init__(this, 2)
def run(this, a, b):
return a.getNext() + b.getNext()
class ConstantElement(ProcessingElement):
def __init__(this, constant):
ProcessingElement.__init__(this, 0)
this.constantValue = constant
def run(this):
return this.constantValue
class IncrementingElement(ProcessingElement):
def __init__(this):
ProcessingElement.__init__(this, 0)
this.value = -1
def run(this):
this.value += 1
return this.value
# Some tests!
if __name__ == "__main__":
constant = ConstantElement(7)
addOffset = AddOffsetElement(4)
addOffset.connectInput(constant, 0)
iter = addOffset.registerOutput()
assert iter.getNext() == 11
assert iter.getNext() == 11
incr = IncrementingElement()
iter1 = incr.registerOutput()
iter2 = incr.registerOutput()
assert iter1.getNext() == 0
assert iter1.getNext() == 1
assert iter2.getNext() == 0
assert iter1.getNext() == 2
assert iter2.getNext() == 1