Skip to content

Commit

Permalink
Work on channels service event processing. (Broken)
Browse files Browse the repository at this point in the history
  • Loading branch information
patternspandemic committed Aug 23, 2018
1 parent 20a0941 commit 90685be
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 16 deletions.
11 changes: 5 additions & 6 deletions examples/basic/main.pony
Original file line number Diff line number Diff line change
Expand Up @@ -39,22 +39,21 @@ actor Main is Reactor[None]

fun ref init() =>
/* Non-registered Welcomer */
let welcomer = Welcomer(system, None, env.out)
welcomer << "Ponylang"
welcomer << "Reactors"
// let welcomer = Welcomer(system, None, env.out)
// welcomer << "Ponylang"
// welcomer << "Reactors"

/* Registered Welcomer * /
/* Registered Welcomer */
let conn = open[(ChannelReservation | None)]()
channels() << ChannelReserve(conn.channel, "welcomer")
conn.events.on_event({
(res: (ChannelReservation | None), hint: OptionalEventHint) =>
match res
| let cr: ChannelReservation =>
let welcomer = Welcomer(system, cr, env.out)
welcomer << "Ponylang"
welcomer << "Reserved Ponylang"
| None =>
env.out.print("Denied 'welcomer' reservation")
end
conn.seal()
})
*/
7 changes: 5 additions & 2 deletions reactors/reactor.pony
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ class ReactorState[T: Any #share]


interface tag ReactorKind
// TODO: ReactorKind.name - Integrate with reserved name of a reactor?
fun tag name(): String
be _supplant_channels_service(channels_channel: Channel[ChannelsEvent] val)

Expand Down Expand Up @@ -103,7 +104,7 @@ trait tag Reactor[E: Any #share] is ReactorKind
=>
""" Open another connector for use by this reactor. """
let channel_tag: ChannelTag = ChannelTag
// Create a partially applied version of the `_muxed_sink` with the
// Create a partially applied version of `_muxed_sink` with the
// `channel_tag` uniquely identifying this connector's channel.
let pa_muxed: {(C)} val = recover val this~_muxed_sink[C](channel_tag) end
// Build the connector.
Expand Down Expand Up @@ -186,8 +187,10 @@ trait tag Reactor[E: Any #share] is ReactorKind
rs.channels_service = channels_channel
rs.received_channels_channel = true
if rs.register_main_channel then
match main().reservation
// match main().reservation
match rs.reservation
| let cr: ChannelReservation =>
Debug.out("Sent ChannelRegister for " + name() + "'s main channel'")
channels() << ChannelRegister(cr, main().channel)
end
end
Expand Down
36 changes: 28 additions & 8 deletions reactors/services.pony
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,12 @@ type ChannelsEvent is
class val ChannelReservation
""""""
let reserved_key: (String, String)
new val create(
reactor_name': String,
channel_name': String = "main")
new val create(key: (String, String))
// reactor_name': String,
// channel_name': String = "main")
=>
reserved_key = (reactor_name', channel_name')
// reserved_key = (reactor_name', channel_name')
reserved_key = key

// TODO: Channels service
//- Give it the responsibility to lazily create services on demand. If any reactor awaits a channel that describes a reserved standard or custom? service, instantiate that reactor service and provide it. (Replaces ReactorSystemProxy, system() call with regular channel requests.) The Channels channel should be preemptively provided to all ReactorState, given its importance, perhaps via Promise from the ReactorSystem.
Expand Down Expand Up @@ -131,6 +132,24 @@ actor Channels is (Service & Reactor[ChannelsEvent])
fun ref reactor_state(): ReactorState[ChannelsEvent] => _reactor_state
fun tag _is_channels_service(): Bool => true

fun ref _reserve_channel(ev_reserve: ChannelReserve) =>
// Build a tuple key out of the reactor, channel names for which the
// channel should be mapped to.
let key = (ev_reserve.reactor_name, ev_reserve.channel_name)
if _channel_map.contains(key) then
// A channel or reservation is already mapped. Deny the requested reserve.
ev_reserve.reply_channel << None
end
/* else
// The mapping is available.
let reservation = ChannelReservation(key)
// Map the key to the reservation until a ChannelRegister event attempts
// to register a channel with the reservation as its authority to do so.
_channel_map(key) = reservation
// Reply with the reservation
ev_reserve.reply_channel << reservation
end
*/
fun ref init() =>
match _system
| let system: ReactorSystem tag =>
Expand All @@ -149,13 +168,14 @@ actor Channels is (Service & Reactor[ChannelsEvent])
// - Then will likely need to reply through the event itself, only it knows chan type?
// i.e. get.reply(_channel_map((get.reactor_name,get.channel_name))?) which will cast subtype to `E`
// TODO: Channels event handling - delegate to funs
let pa_reserve_channel = this~_reserve_channel()
main().events.on_event({ref
(event: ChannelsEvent, hint: OptionalEventHint) =>
match event
| let reserve: ChannelReserve => None //reserve_channel(reserve)
| let register: ChannelRegister => None //register_channel(register)
| let get: ChannelGet[(Any val | Any tag)] => None //get_channel(get)
| let await: ChannelAwait[(Any val | Any tag)] => None //await_channel(await)
| let ev_reserve: ChannelReserve => pa_reserve_channel(ev_reserve)
| let ev_register: ChannelRegister => None //register_channel(register)
| let ev_get: ChannelGet[(Any val | Any tag)] => None //get_channel(get)
| let ev_await: ChannelAwait[(Any val | Any tag)] => None //await_channel(await)
// | let get: ChannelGet[Any val] => None //get_channel(get)
// | let await: ChannelAwait[Any val] => None //await_channel
end
Expand Down

0 comments on commit 90685be

Please sign in to comment.