diff --git a/hub/hub.go b/hub/hub.go
index 7ab491f..12a7986 100644
--- a/hub/hub.go
+++ b/hub/hub.go
@@ -12,84 +12,108 @@ import (
 // Hub - Abstraction between message publishers & subscribers,
 // works as a multiplexer ( or router )
 type Hub struct {
-    addr                       string
-    watcher                    *gaio.Watcher
-    pendingPublishers          map[net.Conn]bool
-    pendingNewSubscribers      map[net.Conn]bool
-    pendingExistingSubscribers map[net.Conn]bool
-    pendingUnsubscribers       map[net.Conn]bool
-    enqueuedReadLock           *sync.RWMutex
-    enqueuedRead               map[net.Conn]*enqueuedRead
-    connectedSubscribers       map[net.Conn]uint64
-    index                      uint64
-    subLock                    *sync.RWMutex
-    subscribers                map[string]map[uint64]net.Conn
-    revLock                    *sync.RWMutex
-    revSubscribers             map[uint64]map[string]bool
-    queueLock                  *sync.RWMutex
-    pendingQueue               []*ops.Msg
-    ping                       chan struct{}
-    evict                      chan uint64
-    Connected                  chan string
-    Disconnected               chan string
+    addr                     string
+    watchersLock             *sync.RWMutex
+    watchers                 map[uint]*watcher
+    watcherCount             uint
+    connectedSubscribers     map[net.Conn]*subInfo
+    connectedSubscribersLock *sync.RWMutex
+    index                    uint64
+    subLock                  *sync.RWMutex
+    subscribers              map[string]map[uint64]net.Conn
+    revLock                  *sync.RWMutex
+    revSubscribers           map[uint64]map[string]bool
+    queueLock                *sync.RWMutex
+    pendingQueue             []*ops.Msg
+    ping                     chan struct{}
+    evict                    chan uint64
+    Connected                chan string
+    Disconnected             chan string
 }
 
 func (h *Hub) Addr() string {
     return h.addr
 }
 
-type enqueuedRead struct {
-    yes bool
-    buf []byte
+type watcher struct {
+    eventLoop   *gaio.Watcher
+    ongoingRead map[net.Conn]*readState
+    lock        *sync.RWMutex
+}
+
+type readState struct {
+    opcode       ops.OP
+    envelopeRead bool
+    buf          []byte
+}
+
+type subInfo struct {
+    id        uint64
+    watcherId uint
 }
 
 // New - Creates a new instance of hub, ready to be used
 func New(ctx context.Context, addr string, cap uint64) (*Hub, error) {
-    watcher, err := gaio.NewWatcher()
-    if err != nil {
-        return nil, err
+    hub := Hub{
+        watcherCount:             2,
+        watchersLock:             &sync.RWMutex{},
+        watchers:                 make(map[uint]*watcher),
+        connectedSubscribers:     make(map[net.Conn]*subInfo),
+        connectedSubscribersLock: &sync.RWMutex{},
+        index:                    0,
+        subLock:                  &sync.RWMutex{},
+        subscribers:              make(map[string]map[uint64]net.Conn),
+        revLock:                  &sync.RWMutex{},
+        revSubscribers:           make(map[uint64]map[string]bool),
+        queueLock:                &sync.RWMutex{},
+        pendingQueue:             make([]*ops.Msg, 0, cap),
+        ping:                     make(chan struct{}, cap),
+        evict:                    make(chan uint64, cap),
+        Connected:                make(chan string, 1),
+        Disconnected:             make(chan string, 1),
     }
 
-    hub := Hub{
-        watcher:                    watcher,
-        pendingPublishers:          make(map[net.Conn]bool),
-        pendingNewSubscribers:      make(map[net.Conn]bool),
-        pendingExistingSubscribers: make(map[net.Conn]bool),
-        pendingUnsubscribers:       make(map[net.Conn]bool),
-        enqueuedReadLock:           &sync.RWMutex{},
-        enqueuedRead:               make(map[net.Conn]*enqueuedRead),
-        connectedSubscribers:       make(map[net.Conn]uint64),
-        index:                      0,
-        subLock:                    &sync.RWMutex{},
-        subscribers:                make(map[string]map[uint64]net.Conn),
-        revLock:                    &sync.RWMutex{},
-        revSubscribers:             make(map[uint64]map[string]bool),
-        queueLock:                  &sync.RWMutex{},
-        pendingQueue:               make([]*ops.Msg, 0, cap),
-        ping:                       make(chan struct{}, cap),
-        evict:                      make(chan uint64, cap),
-        Connected:                  make(chan string, 1),
-        Disconnected:               make(chan string, 1),
-    }
+    var runWatcher = make(chan struct{}, hub.watcherCount)
+    var i uint
+    hub.watchersLock.Lock()
+    for ; i < hub.watcherCount; i++ {
+        w, err := gaio.NewWatcher()
+        if err != nil {
+            return nil, err
+        }
+        hub.watchers[i] = &watcher{
+            eventLoop:   w,
+            ongoingRead: make(map[net.Conn]*readState),
+            lock:        &sync.RWMutex{},
+        }
+        func(id uint) {
+            go hub.watch(ctx, id, runWatcher)
+        }(i)
+    }
+    hub.watchersLock.Unlock()
+
+    startedOff := 0
+    for range runWatcher {
+        startedOff++
+        if startedOff >= int(hub.watcherCount) {
+            break
+        }
+    }
 
     var (
-        runListener = make(chan bool)
-        runWatcher  = make(chan struct{})
-        runProc     = make(chan struct{})
-        runEvict    = make(chan struct{})
+        runListener  = make(chan bool)
+        runEvictor   = make(chan struct{})
+        runProcessor = make(chan struct{})
     )
 
     go hub.listen(ctx, addr, runListener)
     if !<-runListener {
         return nil, ops.ErrListenerNotStarted
     }
-
-    go hub.watch(ctx, runWatcher)
-    go hub.process(ctx, runProc)
-    go hub.evictSubscribers(ctx, runEvict)
-    <-runWatcher
-    <-runProc
-    <-runEvict
+    go hub.process(ctx, runProcessor)
+    go hub.evictSubscribers(ctx, runEvictor)
+    <-runProcessor
+    <-runEvictor
 
     return &hub, nil
 }
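A note on the startup sequence in New above: it launches one event-loop goroutine per watcher, then blocks until each of them has signalled readiness over the buffered runWatcher channel. The same handshake pattern in a self-contained sketch (names here are illustrative, not taken from the repo):

    package main

    import (
        "fmt"
        "sync"
    )

    // worker signals readiness on `ready`, then does its actual job
    func worker(id int, ready chan<- struct{}, wg *sync.WaitGroup) {
        defer wg.Done()
        ready <- struct{}{} // never blocks: channel is buffered for all workers
        fmt.Printf("worker %d running\n", id)
    }

    func main() {
        const n = 2 // mirrors watcherCount in the diff

        ready := make(chan struct{}, n)
        var wg sync.WaitGroup

        for i := 0; i < n; i++ {
            wg.Add(1)
            go worker(i, ready, &wg)
        }

        // block until all n workers have signalled readiness,
        // just like the startedOff counting loop in New
        for started := 0; started < n; started++ {
            <-ready
        }
        fmt.Println("all workers started")

        wg.Wait()
    }

Buffering the channel to the worker count is what lets every worker signal without blocking, so a slow consumer can never stall a watcher's event loop before it even starts.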
diff --git a/hub/listener.go b/hub/listener.go
index 7119048..7f15d91 100644
--- a/hub/listener.go
+++ b/hub/listener.go
@@ -5,6 +5,8 @@ import (
     "fmt"
     "log"
     "net"
+
+    "github.com/itzmeanjan/pub0sub/ops"
 )
 
 func (h *Hub) listen(ctx context.Context, addr string, done chan bool) {
@@ -24,6 +26,7 @@ func (h *Hub) listen(ctx context.Context, addr string, done chan bool) {
 
     h.addr = lis.Addr().String()
     done <- true
+    var nextWatcher uint
 
     for {
         select {
@@ -42,12 +45,18 @@ func (h *Hub) listen(ctx context.Context, addr string, done chan bool) {
                 h.Connected <- fmt.Sprintf("%s://%s", conn.RemoteAddr().Network(), conn.RemoteAddr().String())
             }
 
-            h.enqueuedReadLock.Lock()
             buf := make([]byte, 5)
-            h.enqueuedRead[conn] = &enqueuedRead{yes: true, buf: buf}
-            h.enqueuedReadLock.Unlock()
+            nextWatcher = (nextWatcher + 1) % h.watcherCount
+
+            h.watchersLock.RLock()
+            watcher := h.watchers[nextWatcher]
+            h.watchersLock.RUnlock()
+
+            watcher.lock.Lock()
+            watcher.ongoingRead[conn] = &readState{buf: buf, opcode: ops.UNSUPPORTED}
+            watcher.lock.Unlock()
 
-            if err := h.watcher.Read(ctx, conn, buf); err != nil {
+            if err := watcher.eventLoop.Read(ctx, conn, buf); err != nil {
                 log.Printf("[pub0sub] Error : %s\n", err.Error())
                 return
             }
diff --git a/hub/processor.go b/hub/processor.go
index 8836956..dfbc342 100644
--- a/hub/processor.go
+++ b/hub/processor.go
@@ -63,10 +63,15 @@ func (h *Hub) writeMessage(ctx context.Context, op *ops.OP, msg *ops.Msg) {
             continue
         }
 
+        h.connectedSubscribersLock.RLock()
+
         for _, conn := range subs {
-            if err := h.watcher.Write(ctx, conn, buf.Bytes()); err != nil {
-                log.Printf("[pub0sub] Error : %s\n", err.Error())
-                continue
+            if subInfo, ok := h.connectedSubscribers[conn]; ok {
+                if err := h.watchers[subInfo.watcherId].eventLoop.Write(ctx, conn, buf.Bytes()); err != nil {
+                    log.Printf("[pub0sub] Error : %s\n", err.Error())
+                    continue
+                }
             }
         }
+        h.connectedSubscribersLock.RUnlock()
     }
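A note on the lock scoping in writeMessage above: the read lock on connectedSubscribersLock is taken and released once per topic pass rather than deferred inside the loop. Deferring RUnlock in a loop would stack read locks until the function returns, and Go's sync.RWMutex prohibits recursive read locking: if a read lock is held while a writer (say, handleNewSubscription calling Lock) is queued, a second RLock from the same goroutine can block forever. A self-contained sketch of that hazard (illustrative names, not repo code):

    package main

    import (
        "sync"
        "time"
    )

    func main() {
        var mu sync.RWMutex

        mu.RLock() // first read lock, analogous to one fan-out pass holding the lock

        go func() {
            mu.Lock() // a writer (e.g. a new subscription) queues behind the reader
            mu.Unlock()
        }()

        time.Sleep(100 * time.Millisecond) // give the writer time to queue (illustrative)

        // A second mu.RLock() here could block forever: queued writers
        // stall new readers, so recursive read locking is prohibited.
        // mu.RLock()

        mu.RUnlock()
    }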
diff --git a/hub/reader.go b/hub/reader.go
index ecacefd..cfe900b 100644
--- a/hub/reader.go
+++ b/hub/reader.go
@@ -10,7 +10,7 @@ import (
     "github.com/xtaci/gaio"
 )
 
-func (h *Hub) handleRead(ctx context.Context, result gaio.OpResult) error {
+func (h *Hub) handleRead(ctx context.Context, id uint, result gaio.OpResult) error {
     if result.Error != nil {
         return result.Error
     }
@@ -21,29 +21,28 @@ func (h *Hub) handleRead(ctx context.Context, result gaio.OpResult) error {
 
     data := result.Buffer[:result.Size]
 
-    // actual message body sent by client & body
-    // is ready for consumption
-    //
-    // Same scenario in {1, 2, 3, 4}, handling different opcodes
+    h.watchersLock.RLock()
+    watcher := h.watchers[id]
+    h.watchersLock.RUnlock()
 
-    // 1
-    if _, ok := h.pendingPublishers[result.Conn]; ok {
-        return h.handleMessagePublish(ctx, result.Conn, data[:])
-    }
+    watcher.lock.RLock()
+    defer watcher.lock.RUnlock()
 
-    // 2
-    if _, ok := h.pendingNewSubscribers[result.Conn]; ok {
-        return h.handleNewSubscription(ctx, result.Conn, data[:])
-    }
+    if v, ok := watcher.ongoingRead[result.Conn]; ok && v.envelopeRead {
+        switch v.opcode {
+        case ops.PUB_REQ:
+            return h.handleMessagePublish(ctx, id, result.Conn, data[:])
 
-    // 3
-    if _, ok := h.pendingExistingSubscribers[result.Conn]; ok {
-        return h.handleUpdateSubscription(ctx, result.Conn, data[:])
-    }
+        case ops.NEW_SUB_REQ:
+            return h.handleNewSubscription(ctx, id, result.Conn, data[:])
+
+        case ops.ADD_SUB_REQ:
+            return h.handleUpdateSubscription(ctx, id, result.Conn, data[:])
+
+        case ops.UNSUB_REQ:
+            return h.handleUnsubscription(ctx, id, result.Conn, data[:])
 
-    // 4
-    if _, ok := h.pendingUnsubscribers[result.Conn]; ok {
-        return h.handleUnsubscription(ctx, result.Conn, data[:])
+        }
     }
 
     // Envelope sent by client
@@ -58,33 +57,16 @@ func (h *Hub) handleRead(ctx context.Context, result gaio.OpResult) error {
         }
 
         // remembering when next time data is read from this
-        // connection, consider it as message payload read
+        // socket, consider it as message payload read
         // instead of message envelope read
-        switch op {
-        case ops.PUB_REQ:
-            h.pendingPublishers[result.Conn] = true
+        //
+        // Opcode also tells us how to decode the byte
+        // slice into a structured message
+        watcher.ongoingRead[result.Conn].envelopeRead = true
+        watcher.ongoingRead[result.Conn].opcode = op
 
-        case ops.NEW_SUB_REQ:
-            h.pendingNewSubscribers[result.Conn] = true
-
-        case ops.ADD_SUB_REQ:
-            h.pendingExistingSubscribers[result.Conn] = true
-
-        case ops.UNSUB_REQ:
-            h.pendingUnsubscribers[result.Conn] = true
-
-        }
-
-        h.enqueuedReadLock.RLock()
-        defer h.enqueuedReadLock.RUnlock()
-        if enqueued, ok := h.enqueuedRead[result.Conn]; ok && enqueued.yes {
-            enqueued.yes = false
-
-            buf := make([]byte, size)
-            return h.watcher.Read(ctx, result.Conn, buf)
-        }
-
-        return ops.ErrIllegalRead
+        buf := make([]byte, size)
+        return watcher.eventLoop.Read(ctx, result.Conn, buf)
 
     default:
         return ops.ErrIllegalRead
@@ -92,7 +74,7 @@ func (h *Hub) handleRead(ctx context.Context, result gaio.OpResult) error {
     }
 }
 
-func (h *Hub) handleMessagePublish(ctx context.Context, conn net.Conn, data []byte) error {
+func (h *Hub) handleMessagePublish(ctx context.Context, id uint, conn net.Conn, data []byte) error {
     // reading message from stream
     iStream := bytes.NewReader(data[:])
     msg := new(ops.Msg)
@@ -113,11 +95,12 @@ func (h *Hub) handleMessagePublish(ctx context.Context, conn net.Conn, data []byte) error {
         return err
     }
 
-    delete(h.pendingPublishers, conn)
-    return h.watcher.Write(ctx, conn, oStream.Bytes())
+    h.watchersLock.RLock()
+    defer h.watchersLock.RUnlock()
+    return h.watchers[id].eventLoop.Write(ctx, conn, oStream.Bytes())
 }
 
-func (h *Hub) handleNewSubscription(ctx context.Context, conn net.Conn, data []byte) error {
+func (h *Hub) handleNewSubscription(ctx context.Context, id uint, conn net.Conn, data []byte) error {
     // reading message from stream
     iStream := bytes.NewReader(data[:])
     msg := new(ops.NewSubscriptionRequest)
@@ -130,7 +113,9 @@ func (h *Hub) handleNewSubscription(ctx context.Context, conn net.Conn, data []byte) error {
     // keeping track of active subscriber, so that
     // when need can run eviction routine targeting
     // this subscriber ( unique id )
-    h.connectedSubscribers[conn] = subId
+    h.connectedSubscribersLock.Lock()
+    h.connectedSubscribers[conn] = &subInfo{id: subId, watcherId: id}
+    h.connectedSubscribersLock.Unlock()
 
     // writing message into stream
     oStream := new(bytes.Buffer)
@@ -143,11 +128,12 @@ func (h *Hub) handleNewSubscription(ctx context.Context, conn net.Conn, data []byte) error {
         return err
     }
 
-    delete(h.pendingNewSubscribers, conn)
-    return h.watcher.Write(ctx, conn, oStream.Bytes())
+    h.watchersLock.RLock()
+    defer h.watchersLock.RUnlock()
+    return h.watchers[id].eventLoop.Write(ctx, conn, oStream.Bytes())
 }
 
-func (h *Hub) handleUpdateSubscription(ctx context.Context, conn net.Conn, data []byte) error {
+func (h *Hub) handleUpdateSubscription(ctx context.Context, id uint, conn net.Conn, data []byte) error {
     // reading message from stream
     iStream := bytes.NewReader(data)
     msg := new(ops.AddSubscriptionRequest)
@@ -168,11 +154,12 @@ func (h *Hub) handleUpdateSubscription(ctx context.Context, conn net.Conn, data []byte) error {
         return err
     }
 
-    delete(h.pendingExistingSubscribers, conn)
-    return h.watcher.Write(ctx, conn, oStream.Bytes())
+    h.watchersLock.RLock()
+    defer h.watchersLock.RUnlock()
+    return h.watchers[id].eventLoop.Write(ctx, conn, oStream.Bytes())
 }
 
-func (h *Hub) handleUnsubscription(ctx context.Context, conn net.Conn, data []byte) error {
+func (h *Hub) handleUnsubscription(ctx context.Context, id uint, conn net.Conn, data []byte) error {
     // reading message from stream
     iStream := bytes.NewReader(data)
     msg := new(ops.UnsubcriptionRequest)
@@ -193,6 +180,7 @@ func (h *Hub) handleUnsubscription(ctx context.Context, conn net.Conn, data []byte) error {
         return err
     }
 
-    delete(h.pendingUnsubscribers, conn)
-    return h.watcher.Write(ctx, conn, oStream.Bytes())
+    h.watchersLock.RLock()
+    defer h.watchersLock.RUnlock()
+    return h.watchers[id].eventLoop.Write(ctx, conn, oStream.Bytes())
 }
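handleRead above drives a per-connection, two-phase state machine: first a fixed 5-byte envelope read that yields the opcode and payload size, then a payload read of exactly that many bytes. A standalone sketch of the framing, assuming a 1-byte opcode followed by a 4-byte big-endian length (the real wire layout is owned by the ops package, so treat this as illustration only):

    package main

    import (
        "bytes"
        "encoding/binary"
        "fmt"
    )

    type envelope struct {
        opcode  byte
        bodyLen uint32
    }

    // parseEnvelope decodes the fixed-size header that precedes every payload
    func parseEnvelope(buf []byte) (envelope, error) {
        if len(buf) != 5 {
            return envelope{}, fmt.Errorf("envelope must be 5 bytes, got %d", len(buf))
        }
        return envelope{
            opcode:  buf[0],
            bodyLen: binary.BigEndian.Uint32(buf[1:]),
        }, nil
    }

    func main() {
        // simulate a client frame: envelope followed by payload
        payload := []byte("hello topic")
        frame := new(bytes.Buffer)
        frame.WriteByte(1) // hypothetical opcode
        binary.Write(frame, binary.BigEndian, uint32(len(payload)))
        frame.Write(payload)

        env, err := parseEnvelope(frame.Next(5)) // phase 1: envelope read
        if err != nil {
            panic(err)
        }
        body := frame.Next(int(env.bodyLen)) // phase 2: payload read

        fmt.Printf("opcode=%d len=%d body=%q\n", env.opcode, env.bodyLen, body)
    }

Storing the opcode in readState between the two phases is what lets the payload completion be dispatched to the right handler without re-reading the envelope.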
diff --git a/hub/watcher.go b/hub/watcher.go
index 8c4d619..182c422 100644
--- a/hub/watcher.go
+++ b/hub/watcher.go
@@ -8,11 +8,15 @@ import (
     "github.com/xtaci/gaio"
 )
 
-func (h *Hub) watch(ctx context.Context, done chan struct{}) {
-    close(done)
+func (h *Hub) watch(ctx context.Context, id uint, done chan struct{}) {
+    // notifying that watcher started
+    done <- struct{}{}
+    h.watchersLock.RLock()
+    watcher := h.watchers[id]
+    h.watchersLock.RUnlock()
 
     defer func() {
-        if err := h.watcher.Close(); err != nil {
+        if err := watcher.eventLoop.Close(); err != nil {
             log.Printf("[pub0sub] Error : %s\n", err.Error())
         }
     }()
@@ -23,7 +27,7 @@ func (h *Hub) watch(ctx context.Context, done chan struct{}) {
             return
 
         default:
-            results, err := h.watcher.WaitIO()
+            results, err := watcher.eventLoop.WaitIO()
             if err != nil {
                 log.Printf("[pub0sub] Error : %s\n", err.Error())
                 return
@@ -33,46 +37,50 @@ func (h *Hub) watch(ctx context.Context, done chan struct{}) {
                 switch res.Operation {
                 case gaio.OpRead:
-                    if err := h.handleRead(ctx, res); err != nil {
+                    if err := h.handleRead(ctx, id, res); err != nil {
                         // best effort mechanism, don't ever block
                         if len(h.Disconnected) < cap(h.Disconnected) {
                             h.Disconnected <- fmt.Sprintf("%s://%s", res.Conn.RemoteAddr().Network(), res.Conn.RemoteAddr().String())
                         }
 
-                        if id, ok := h.connectedSubscribers[res.Conn]; ok {
-                            h.evict <- id
+                        h.connectedSubscribersLock.Lock()
+                        if subInfo, ok := h.connectedSubscribers[res.Conn]; ok {
+                            h.evict <- subInfo.id
                             delete(h.connectedSubscribers, res.Conn)
                         }
+                        h.connectedSubscribersLock.Unlock()
 
-                        h.enqueuedReadLock.Lock()
-                        delete(h.enqueuedRead, res.Conn)
-                        h.enqueuedReadLock.Unlock()
+                        watcher.lock.Lock()
+                        delete(watcher.ongoingRead, res.Conn)
+                        watcher.lock.Unlock()
 
-                        if err := h.watcher.Free(res.Conn); err != nil {
+                        if err := watcher.eventLoop.Free(res.Conn); err != nil {
                             log.Printf("[pub0sub] Error : %s\n", err.Error())
                         }
                     }
 
                 case gaio.OpWrite:
-                    if err := h.handleWrite(ctx, res); err != nil {
+                    if err := h.handleWrite(ctx, id, res); err != nil {
                         // best effort mechanism, don't ever block
                         if len(h.Disconnected) < cap(h.Disconnected) {
                             h.Disconnected <- fmt.Sprintf("%s://%s", res.Conn.RemoteAddr().Network(), res.Conn.RemoteAddr().String())
                         }
 
-                        if id, ok := h.connectedSubscribers[res.Conn]; ok {
-                            h.evict <- id
+                        h.connectedSubscribersLock.Lock()
+                        if subInfo, ok := h.connectedSubscribers[res.Conn]; ok {
+                            h.evict <- subInfo.id
                             delete(h.connectedSubscribers, res.Conn)
                         }
+                        h.connectedSubscribersLock.Unlock()
 
-                        h.enqueuedReadLock.Lock()
-                        delete(h.enqueuedRead, res.Conn)
-                        h.enqueuedReadLock.Unlock()
+                        watcher.lock.Lock()
+                        delete(watcher.ongoingRead, res.Conn)
+                        watcher.lock.Unlock()
 
-                        if err := h.watcher.Free(res.Conn); err != nil {
+                        if err := watcher.eventLoop.Free(res.Conn); err != nil {
                             log.Printf("[pub0sub] Error : %s\n", err.Error())
                         }
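Each watch loop above is a thin dispatcher over gaio's completion-style API: arm a Read, collect finished operations from WaitIO, branch on OpRead/OpWrite, and Free the connection when an operation errors. A minimal echo-server sketch built from those same calls (shutdown handling and per-connection state omitted; the address and buffer size are arbitrary choices, not from the repo):

    package main

    import (
        "log"
        "net"

        "github.com/xtaci/gaio"
    )

    func main() {
        w, err := gaio.NewWatcher()
        if err != nil {
            log.Fatal(err)
        }
        defer w.Close()

        ln, err := net.Listen("tcp", "127.0.0.1:9090")
        if err != nil {
            log.Fatal(err)
        }

        // acceptor: arm the first read for every new connection
        go func() {
            for {
                conn, err := ln.Accept()
                if err != nil {
                    return
                }
                w.Read(nil, conn, make([]byte, 128))
            }
        }()

        // completion loop: WaitIO blocks until armed operations finish
        for {
            results, err := w.WaitIO()
            if err != nil {
                log.Fatal(err)
            }
            for _, res := range results {
                if res.Error != nil {
                    w.Free(res.Conn) // drop broken connections
                    continue
                }
                switch res.Operation {
                case gaio.OpRead:
                    // echo back exactly what arrived
                    w.Write(nil, res.Conn, res.Buffer[:res.Size])
                case gaio.OpWrite:
                    // write finished, re-arm the next read
                    w.Read(nil, res.Conn, res.Buffer[:cap(res.Buffer)])
                }
            }
        }
    }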
diff --git a/hub/writer.go b/hub/writer.go
index 18a0012..9ca808a 100644
--- a/hub/writer.go
+++ b/hub/writer.go
@@ -3,20 +3,27 @@ package hub
 import (
     "context"
 
+    "github.com/itzmeanjan/pub0sub/ops"
     "github.com/xtaci/gaio"
 )
 
-func (h *Hub) handleWrite(ctx context.Context, result gaio.OpResult) error {
+func (h *Hub) handleWrite(ctx context.Context, id uint, result gaio.OpResult) error {
     if result.Error != nil {
         return result.Error
     }
 
-    h.enqueuedReadLock.RLock()
-    defer h.enqueuedReadLock.RUnlock()
+    h.watchersLock.RLock()
+    watcher := h.watchers[id]
+    h.watchersLock.RUnlock()
+
+    watcher.lock.RLock()
+    defer watcher.lock.RUnlock()
 
-    if enqueued, ok := h.enqueuedRead[result.Conn]; ok && !enqueued.yes {
-        enqueued.yes = true
-        return h.watcher.Read(ctx, result.Conn, enqueued.buf)
+    if ongoing, ok := watcher.ongoingRead[result.Conn]; ok && ongoing.envelopeRead {
+        ongoing.envelopeRead = false
+        ongoing.opcode = ops.UNSUPPORTED
+
+        return watcher.eventLoop.Read(ctx, result.Conn, ongoing.buf)
     }
 
     return nil
diff --git a/orderliness_test.go b/orderliness_test.go
index 3ca3ecc..cddf576 100644
--- a/orderliness_test.go
+++ b/orderliness_test.go
@@ -33,8 +33,8 @@ func TestDeliveryOrderliness(t *testing.T) {
     capacity := uint64(256)
     topic_1 := "topic_1"
     topics := []string{topic_1}
-    end := uint64(1_000_000)
-    delay := time.Duration(100) * time.Millisecond
+    end := uint64(10_000)
+    delay := 100 * time.Millisecond
 
     ctx, cancel := context.WithCancel(context.Background())
     h, err := hub.New(ctx, addr, capacity)
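For orientation, the exported surface touched by this patch can be exercised the same way the test does: construct a hub, then drain its Connected / Disconnected channels. A minimal sketch (the listen address and capacity are illustrative values, not from the repo):

    package main

    import (
        "context"
        "log"

        "github.com/itzmeanjan/pub0sub/hub"
    )

    func main() {
        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()

        // port 0 asks the OS for a free port; 256 bounds the internal queues
        h, err := hub.New(ctx, "127.0.0.1:0", 256)
        if err != nil {
            log.Fatal(err)
        }
        log.Printf("hub listening on %s", h.Addr())

        // Connected / Disconnected carry best-effort peer notifications,
        // so a consumer should keep draining them
        for {
            select {
            case peer := <-h.Connected:
                log.Printf("connected : %s", peer)
            case peer := <-h.Disconnected:
                log.Printf("disconnected : %s", peer)
            case <-ctx.Done():
                return
            }
        }
    }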