channel inputs outputs
Go channels can be used to feed data into Glow and to read data out of Glow.
The channels have few restrictions. They can be buffered or unbuffered, and the element type can be any user-defined struct made of simple types or nested structs.
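As a small illustration of the channel shapes described above, the sketch below declares an unbuffered and a buffered channel of a user-defined struct. The `Point` and `Segment` types and the `makeChannels` helper are hypothetical names chosen for this example, and no Glow API is involved.

```go
package main

import "fmt"

// Point is a hypothetical element type made of simple fields.
type Point struct {
	X, Y float64
}

// Segment is a hypothetical element type that nests other structs.
type Segment struct {
	Name     string
	From, To Point
}

// makeChannels returns one unbuffered and one buffered channel of Segment.
func makeChannels(size int) (chan Segment, chan Segment) {
	return make(chan Segment), make(chan Segment, size)
}

func main() {
	unbuffered, buffered := makeChannels(4)
	// Capacity is the only visible difference between the two channels.
	fmt.Println(cap(unbuffered), cap(buffered))
}
```

Either channel could then be passed to the flow. A buffered channel simply lets the driver run a few elements ahead before a send blocks.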
Data is sent between the driver program and the distributed Glow agents, so when the driver program and the Glow cluster are in different regions, think twice before sending large amounts of data through these channels.
Channels are a powerful way to dynamically control a flow, or multiple flows.
For example, a web server front end can send inputs to the Glow cluster, receive the outputs from the cluster, and send the results back to the user.
Or we can conditionally loop the flow until the outputs satisfy some specific condition.
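The looping pattern can be sketched with plain Go channels. In the example below, the goroutine `runFlow` is only a stand-in for a Glow flow (it halves each input), and `loopUntil` is a hypothetical driver-side helper. Neither is part of the Glow API.

```go
package main

import "fmt"

// runFlow is a stand-in for a Glow flow: it reads ints from in,
// halves each one, and writes the result to out.
func runFlow(in <-chan int, out chan<- int) {
	for v := range in {
		out <- v / 2
	}
	close(out)
}

// loopUntil feeds start into the stand-in flow and keeps sending each
// output back in as the next input until done reports true.
// It returns the final value and the number of rounds performed.
func loopUntil(start int, done func(int) bool) (int, int) {
	in := make(chan int)
	out := make(chan int)
	go runFlow(in, out)

	v, rounds := start, 0
	for !done(v) {
		in <- v
		v = <-out
		rounds++
	}
	close(in) // lets the stand-in flow finish
	return v, rounds
}

func main() {
	v, rounds := loopUntil(100, func(x int) bool { return x < 10 })
	fmt.Println(v, rounds)
}
```

The driver decides after every round whether to continue, which is the kind of control that is difficult to express inside a static flow definition.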
Since the driver program needs to handle inputs and outputs, and the task executors do not need that part of the code, follow the recommended Glow Code Structure: put the flow code into func init() and add a call to flow.Ready(), so that the code is initialized the same way in both the driver program and the executor program.
The input channel is flexible and has few restrictions.
If you need to process a large amount of data, you can send just the data locations and let the Map() function fetch the data itself.
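A minimal sketch of the "send locations, not data" idea follows, assuming the locations are local file paths. `countBytes` is a hypothetical function of the kind that could be handed to Map(), and `writeTemp` exists only to make the example self-contained. The channel here is consumed by a plain loop instead of a Glow flow.

```go
package main

import (
	"fmt"
	"os"
)

// countBytes receives a location, loads the data itself, and returns
// a (location, size) pair. A size of -1 signals a failed read.
func countBytes(path string) (string, int) {
	data, err := os.ReadFile(path)
	if err != nil {
		return path, -1
	}
	return path, len(data)
}

// writeTemp creates a temporary file holding content and returns its
// path together with a cleanup function that removes the file.
func writeTemp(content string) (string, func(), error) {
	f, err := os.CreateTemp("", "glow-sketch-*")
	if err != nil {
		return "", nil, err
	}
	cleanup := func() { os.Remove(f.Name()) }
	if _, err := f.WriteString(content); err != nil {
		f.Close()
		cleanup()
		return "", nil, err
	}
	if err := f.Close(); err != nil {
		cleanup()
		return "", nil, err
	}
	return f.Name(), cleanup, nil
}

func main() {
	path, cleanup, err := writeTemp("hello")
	if err != nil {
		fmt.Println("setup failed:", err)
		return
	}
	defer cleanup()

	// Only the short path string travels through the channel.
	locations := make(chan string, 1)
	locations <- path
	close(locations)

	for p := range locations {
		_, n := countBytes(p)
		fmt.Println(n)
	}
}
```

For this to work in a real cluster, the locations would have to be reachable from the agents, for example paths on shared storage.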
The output channel type is a little different, because a channel can carry only one element type.
For simple cases, if the dataset contains only values of type CustomizedValue, the output channel type can be just chan CustomizedValue.
For other cases, the output channel element type can be an envelope struct containing all fields in the dataset. For example, if the dataset has [CustomizedKey, CustomizedValue], the output channel type should be chan struct {CustomizedKey, CustomizedValue}.
| Dataset | Example Dataset Type | Output Channel Type |
|---|---|---|
| Only has value | ExampleValue | chan ExampleValue |
| key ~ value | [ExampleKey, ExampleValue] | chan struct {ExampleKey, ExampleValue} |
| Join() result | [ExampleKey, LeftValue, RightValue] | chan struct {ExampleKey, LeftValue, RightValue} |
| CoGroup() result | [ExampleKey, []LeftValue, []RightValue] | chan struct {ExampleKey, []LeftValue, []RightValue} |
| GroupByKey() result | [ExampleKey, []ExampleValue] | chan struct {ExampleKey, []ExampleValue} |
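// ch is the input channel: the driver program writes ints into it.
ch := make(chan int)
f1 := flow.New()
source := f1.Channel(ch)

// Each Map() emits a (key, value) pair, so both datasets are keyed by t.
left := source.Map(func(t int) (int, int) {
	return t, t * 2
})
right := source.Map(func(t int) (int, int) {
	return t, t * 3
})

// Join() yields [key, leftValue, rightValue], so the envelope struct has three int fields.
outChannel := make(chan struct {
	X, Y, Z int
})
left.Join(right).AddOutput(outChannel)

// A dataset that holds only values can use a plain channel of that value type.
outInt := make(chan int)
source.AddOutput(outInt)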
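For datasets with mixed field types, the envelope struct can be given descriptive field names. The sketch below uses only plain Go channels. The `JoinRow` and `CoGroupRow` types and their field names are hypothetical. It is an assumption here, not something this page states, that Glow fills envelope fields by position and accepts a named struct type the same way it accepts an anonymous one.

```go
package main

import "fmt"

// JoinRow mirrors a Join() result of the shape [key, left, right].
// Fields are exported and listed in the same order as the dataset columns.
type JoinRow struct {
	Key   string
	Left  int
	Right float64
}

// CoGroupRow mirrors a CoGroup() result of the shape [key, []left, []right].
type CoGroupRow struct {
	Key    string
	Lefts  []int
	Rights []float64
}

func main() {
	joined := make(chan JoinRow, 1)
	grouped := make(chan CoGroupRow, 1)

	// The sends below stand in for what a flow would write to its outputs.
	joined <- JoinRow{Key: "a", Left: 1, Right: 2.5}
	grouped <- CoGroupRow{Key: "a", Lefts: []int{1, 2}, Rights: []float64{2.5}}
	close(joined)
	close(grouped)

	for row := range joined {
		fmt.Println(row.Key, row.Left, row.Right)
	}
	for row := range grouped {
		fmt.Println(row.Key, len(row.Lefts), len(row.Rights))
	}
}
```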