Channels
In Stackless, a channel object is used for bidirectional communication between tasklets. Channels do not store data themselves; they only pass Python objects from a sender to the next tasklet in the receiving queue.
By sending on a channel, a tasklet that is waiting to receive on that channel is resumed. If there is no waiting receiver, the sender is suspended (blocked) into the channel's queue. By receiving from a channel, a tasklet that is waiting to send on that channel is resumed. If there is no waiting sender, the receiver is suspended into the channel's queue.
Multiple tasklets sending or receiving on the same channel are tracked in the channel's queue. The channel's "balance" property reports how many tasklets are waiting in that queue. A positive value indicates a queue of tasklets waiting to send, a negative value indicates tasklets waiting to receive, and a value of zero indicates an idle channel.
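The queueing rules above can be modeled in a few lines of plain Python. This is a toy sketch only: `ToyChannel`, its `send`/`receive` signatures, and the tasklet names are hypothetical, it does no scheduling, and it is not how Stackless implements channels. It only tracks who would be blocked, so that the sign of the balance can be observed.

```python
from collections import deque


class ToyChannel(object):
    """Toy model of channel bookkeeping (hypothetical, not Stackless itself)."""

    def __init__(self):
        self.senders = deque()    # blocked senders as (name, value) pairs
        self.receivers = deque()  # blocked receivers as names

    @property
    def balance(self):
        # Positive: senders waiting. Negative: receivers waiting. Zero: idle.
        return len(self.senders) - len(self.receivers)

    def send(self, name, value):
        """Hand value to the next waiting receiver, or queue the sender."""
        if self.receivers:
            return (self.receivers.popleft(), value)  # receiver is resumed
        self.senders.append((name, value))            # sender blocks
        return None

    def receive(self, name):
        """Take a value from the next waiting sender, or queue the receiver."""
        if self.senders:
            return self.senders.popleft()             # sender is resumed
        self.receivers.append(name)                   # receiver blocks
        return None


ch = ToyChannel()
ch.send("t1", "foo")        # nobody is receiving, so t1 is queued
ch.send("t2", "bar")        # t2 queues up behind t1
print(ch.balance)           # 2
print(ch.receive("t3"))     # ('t1', 'foo')
print(ch.balance)           # 1
```

In this model the two queues are never both non-empty, which is why a single signed number is enough to describe the state of the channel.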
The following are basic channel examples. For more advanced derivations from channels, please see `ChannelExamples`:trac:.
The simplest way to create a Stackless channel is by calling the class stackless.channel:
import stackless
ch = stackless.channel()
Now that we have a channel, we have to use it for something. Let's say we want to send the value "foo" on our new channel:
import stackless
ch = stackless.channel()
ch.send("foo")
Note that sending on a channel that has nothing waiting to receive blocks the current tasklet. This includes the main tasklet, so if we ran the code as is we would receive an error.
This brings us to the next step, getting data from a channel. Getting data from a channel is almost exactly the same as sending it. Let's say we want to get a value from our channel and print it:
import stackless
ch = stackless.channel()
print ch.receive()
Again, the same caveat applies to receiving: since nothing is being sent on our channel, the receiving tasklet blocks and nothing will ever wake it.
To make this work, we need to use tasklets. For an explanation of them, see the tasklets page. For now it suffices to say that a function created as a tasklet runs independently, so a tasklet that blocks on the channel does not block the others and we no longer get errors:
import stackless
def Sending(channel):
    print "sending"
    channel.send("foo")
def Receiving(channel):
    print "receiving"
    print channel.receive()
ch = stackless.channel()
task = stackless.tasklet(Sending)(ch)
task2 = stackless.tasklet(Receiving)(ch)
stackless.run()
When this code is run we get the following result:
sending
receiving
foo
You can also send and receive sequences (iterators) over a channel:
import stackless
def SendingSequence(channel, sequence):
    print "sending"
    channel.send_sequence(sequence)
def ReceivingSequence(channel):
    for item in channel:
        print "receiving"
        print item
ch = stackless.channel()
task = stackless.tasklet(SendingSequence)(ch, ['a','b','c'])
task2 = stackless.tasklet(ReceivingSequence)(ch)
stackless.run()
When this code is run we get the following result:
sending
receiving
a
receiving
b
receiving
c
At this point, task2 is still blocked on the channel, waiting for more items. To let it know that the sequence is finished, you have to send a StopIteration exception across the channel:
import stackless
def SendingSequence(channel, sequence):
    print "sending"
    channel.send_sequence(sequence)
    # Raise StopIteration in the receiver to end its for loop.
    channel.send_exception(StopIteration)
def ReceivingSequence(channel):
    for item in channel:
        print "receiving"
        print item
    print "done receiving"
ch = stackless.channel()
task = stackless.tasklet(SendingSequence)(ch, ['a','b','c'])
task2 = stackless.tasklet(ReceivingSequence)(ch)
stackless.run()
And now it should output:
sending
receiving
a
receiving
b
receiving
c
done receiving
And that concludes sending and receiving via channels.
In Stackless, the balance of a channel is how many tasklets are waiting to send or receive on it. When we first create a channel, nothing is waiting on it, so the code:
import stackless
ch = stackless.channel()
print "Channel balance is", ch.balance
displays:
Channel balance is 0
Now if we borrow our sending tasklet from above:
import stackless
def Sending(channel):
    print "sending"
    channel.send("foo")
ch = stackless.channel()
task = stackless.tasklet(Sending)(ch)
stackless.run()
print "Channel balance is", ch.balance
We get the following result:
sending
Channel balance is 1
Which tells us that one tasklet is waiting to send.
If we use our receiving tasklet we will get the opposite result, so:
import stackless
def Receiving(channel):
    print "receiving"
    print channel.receive()
ch = stackless.channel()
task2 = stackless.tasklet(Receiving)(ch)
stackless.run()
print "Channel balance is", ch.balance
displays:
receiving
Channel balance is -1
Q: What topologies are supported by Channels --littldo, Thu, 15 Apr 2004 03:07:41 +0200
Is a channel a point-to-point connection or can other topologies be supported?
Race condition in extension code? --warnes, Sat, 03 Sep 2005 06:09:43 +0200
It looks to me like both Nonblocking Channel and Broadcast Channel contain a race condition if interrupted between checking the balance and performing the send/receive.
Race condition --rmtew, Fri, 09 Dec 2005 07:21:01 +0100
Perhaps, but uncertainties like that are why I always use the cooperative scheduling of Stackless, rather than the preemptive scheduling. In which case, there is no chance of a race condition.
A: What topologies are supported by Channels --rmtew, Fri, 09 Dec 2005 07:23:46 +0100
The most common use of channels in my experience is point-to-point, where a tasklet is stalled until another awakens it by sending it a result. But it is easy enough to build a queue on top of channels where multiple tasklets wait for a resource to load and then, when it is loaded, the channel is sent to until all waiting receivers are removed. Is this the sort of thing meant?
Q: receive() and execution queue --garo, Thu, 23 Mar 2006 11:01:24 +0100
Does calling receive() remove the calling tasklet from the execution/runnable queue if the receive() blocks?
A: receive() and execution queue --BBuco, Sat, 08 Apr 2006 21:35:28 +0200
Yes, the tasklet is removed until it is unblocked when something is sent through that channel.