diff --git a/stackexchange/nats/stackexchange_nats.go b/stackexchange/nats/stackexchange_nats.go index 63811cb..6b47695 100644 --- a/stackexchange/nats/stackexchange_nats.go +++ b/stackexchange/nats/stackexchange_nats.go @@ -300,10 +300,6 @@ func (exc *StackExchange) Publish(msg neffos.Message) bool { // Ask implements server Ask for nats. It blocks. func (exc *StackExchange) Ask(ctx context.Context, msg neffos.Message, token string) (response neffos.Message, err error) { - if !msg.IsWait(false) { - return response, neffos.ErrInvalidPayload - } - // for some reason we can't use the exc.publisher.Subscribe, // so create a new connection for subscription which will be terminated on message receive or timeout. subConn, err := exc.opts.Connect() @@ -328,13 +324,11 @@ func (exc *StackExchange) Ask(ctx context.Context, msg neffos.Message, token str return response, neffos.ErrWrite } - for { - select { - case <-ctx.Done(): - return response, ctx.Err() - case response = <-ch: - return response, response.Err - } + select { + case <-ctx.Done(): + return response, ctx.Err() + case response = <-ch: + return response, response.Err } } diff --git a/stackexchange/redis/stackexchange_redis.go b/stackexchange/redis/stackexchange_redis.go index f7954a2..7d8e0a1 100644 --- a/stackexchange/redis/stackexchange_redis.go +++ b/stackexchange/redis/stackexchange_redis.go @@ -227,27 +227,46 @@ func (exc *StackExchange) OnConnect(c *neffos.Conn) error { func (exc *StackExchange) Publish(msg neffos.Message) bool { // channel := exc.getMessageChannel(c.ID(), msg) channel := exc.getChannel(msg.Namespace, msg.Room, msg.To) - b := msg.Serialize() // neffos.Debugf("[%s] publish to channel [%s] the data [%s]\n", msg.FromExplicit, channel, string(b)) + err := exc.publish(channel, msg.Serialize()) + return err == nil +} + +func (exc *StackExchange) publish(channel string, b []byte) error { cmd := radix.FlatCmd(nil, "PUBLISH", channel, b) - err := exc.pool.Do(cmd) + return exc.pool.Do(cmd) +} + +// Ask implements the server Ask feature for redis. It blocks until response. +func (exc *StackExchange) Ask(ctx context.Context, msg neffos.Message, token string) (response neffos.Message, err error) { + sub := radix.PersistentPubSub("", "", exc.connFunc) + msgCh := make(chan radix.PubSubMessage) + err = sub.Subscribe(msgCh, token) if err != nil { - return false + return } + defer sub.Close() - return true -} + if !exc.Publish(msg) { + return response, neffos.ErrWrite + } + + select { + case <-ctx.Done(): + err = ctx.Err() + case redisMsg := <-msgCh: + response = neffos.DeserializeMessage(nil, redisMsg.Message, false, false) + err = response.Err + } -// Ask TODO. -// Ask will implement the server Ask feature for redis. It will block until response. -func (exc *StackExchange) Ask(ctx context.Context, msg neffos.Message, token string) (neffos.Message, error) { - panic("Not Implemented Yet") // check tomorrow... I am too tired now. + return } -// NotifyAsk TODO. +// NotifyAsk notifies and unblocks a "msg" subscriber, called on a server connection's read when expects a result. func (exc *StackExchange) NotifyAsk(msg neffos.Message, token string) error { - panic("Not Implemented Yet") + msg.ClearWait() + return exc.publish(token, msg.Serialize()) } // Subscribe subscribes to a specific namespace,