Skip to content

Commit

Permalink
Merge pull request #590 from libotony/subscribe-txpool
Browse files Browse the repository at this point in the history
add pending tx subscription API
  • Loading branch information
libotony committed Jul 10, 2023
2 parents fa1359d + 572416c commit 6b1c28d
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 20 deletions.
2 changes: 1 addition & 1 deletion api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func New(
Mount(router, "/debug")
node.New(nw).
Mount(router, "/node")
subs := subscriptions.New(repo, origins, backtraceLimit)
subs := subscriptions.New(repo, origins, backtraceLimit, txPool)
subs.Mount(router, "/subscriptions")

if pprofOn {
Expand Down
70 changes: 70 additions & 0 deletions api/subscriptions/pending_tx.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright (c) 2023 The VeChainThor developers

// Distributed under the GNU Lesser General Public License v3.0 software license, see the accompanying
// file LICENSE or <https://www.gnu.org/licenses/lgpl-3.0.html>
package subscriptions

import (
"sync"

"github.com/vechain/thor/tx"
"github.com/vechain/thor/txpool"
)

type pendingTx struct {
txPool *txpool.TxPool
listeners map[chan *tx.Transaction]struct{}
mu sync.RWMutex
}

func newPendingTx(txPool *txpool.TxPool) *pendingTx {
p := &pendingTx{
txPool: txPool,
listeners: make(map[chan *tx.Transaction]struct{}),
}

return p
}

func (p *pendingTx) Subscribe(ch chan *tx.Transaction) {
p.mu.Lock()
defer p.mu.Unlock()

p.listeners[ch] = struct{}{}
}

func (p *pendingTx) Unsubscribe(ch chan *tx.Transaction) {
p.mu.Lock()
defer p.mu.Unlock()

delete(p.listeners, ch)
}

func (p *pendingTx) DispatchLoop(done <-chan struct{}) {
txCh := make(chan *txpool.TxEvent)
sub := p.txPool.SubscribeTxEvent(txCh)
defer sub.Unsubscribe()

for {
select {
case txEv := <-txCh:
if txEv.Executable == nil || !*txEv.Executable {
continue
}
p.mu.RLock()
func() {
for lsn := range p.listeners {
select {
case lsn <- txEv.Tx:
case <-done:
return
default: // broadcast in a non-blocking manner, so there's no guarantee that all subscriber receives it
}
}
}()
p.mu.RUnlock()
case <-done:
return
}
}
}
101 changes: 84 additions & 17 deletions api/subscriptions/subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,17 @@ import (
"github.com/vechain/thor/block"
"github.com/vechain/thor/chain"
"github.com/vechain/thor/thor"
"github.com/vechain/thor/tx"
"github.com/vechain/thor/txpool"
)

const txQueueSize = 20

type Subscriptions struct {
backtraceLimit uint32
repo *chain.Repository
upgrader *websocket.Upgrader
pendingTx *pendingTx
done chan struct{}
wg sync.WaitGroup
}
Expand All @@ -43,8 +48,8 @@ const (
pingPeriod = (pongWait * 7) / 10
)

func New(repo *chain.Repository, allowedOrigins []string, backtraceLimit uint32) *Subscriptions {
return &Subscriptions{
func New(repo *chain.Repository, allowedOrigins []string, backtraceLimit uint32, txpool *txpool.TxPool) *Subscriptions {
sub := &Subscriptions{
backtraceLimit: backtraceLimit,
repo: repo,
upgrader: &websocket.Upgrader{
Expand All @@ -62,8 +67,17 @@ func New(repo *chain.Repository, allowedOrigins []string, backtraceLimit uint32)
return false
},
},
done: make(chan struct{}),
pendingTx: newPendingTx(txpool),
done: make(chan struct{}),
}

sub.wg.Add(1)
go func() {
defer sub.wg.Done()

sub.pendingTx.DispatchLoop(sub.done)
}()
return sub
}

func (s *Subscriptions) handleBlockReader(w http.ResponseWriter, req *http.Request) (*blockReader, error) {
Expand Down Expand Up @@ -188,33 +202,63 @@ func (s *Subscriptions) handleSubject(w http.ResponseWriter, req *http.Request)
return utils.HTTPError(errors.New("not found"), http.StatusNotFound)
}

conn, err := s.upgrader.Upgrade(w, req, nil)
conn, closed, err := s.setupConn(w, req)
// since the conn is hijacked here, no error should be returned in lines below
if err != nil {
log.Debug("upgrade to websocket", "err", err)
return nil
}

err = s.pipe(conn, reader, closed)
s.closeConn(conn, err)
return nil
}

func (s *Subscriptions) handlePendingTransactions(w http.ResponseWriter, req *http.Request) error {
s.wg.Add(1)
defer s.wg.Done()

conn, closed, err := s.setupConn(w, req)
// since the conn is hijacked here, no error should be returned in lines below
if err != nil {
log.Debug("upgrade to websocket", "err", err)
return nil
}
defer s.closeConn(conn, err)

pingTicker := time.NewTicker(pingPeriod)
defer pingTicker.Stop()

txCh := make(chan *tx.Transaction, txQueueSize)
s.pendingTx.Subscribe(txCh)
defer func() {
if err := conn.Close(); err != nil {
log.Debug("close websocket", "err", err)
}
s.pendingTx.Unsubscribe(txCh)
close(txCh)
}()

var closeMsg []byte
if err := s.pipe(conn, reader); err != nil {
closeMsg = websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error())
} else {
closeMsg = websocket.FormatCloseMessage(websocket.CloseGoingAway, "")
for {
select {
case tx := <-txCh:
err = conn.WriteJSON(&PendingTxIDMessage{ID: tx.ID()})
if err != nil {
return nil
}
case <-s.done:
return nil
case <-closed:
return nil
case <-pingTicker.C:
conn.WriteMessage(websocket.PingMessage, nil)
}
}
}

if err := conn.WriteMessage(websocket.CloseMessage, closeMsg); err != nil {
log.Debug("write close message", "err", err)
func (s *Subscriptions) setupConn(w http.ResponseWriter, req *http.Request) (*websocket.Conn, chan struct{}, error) {
conn, err := s.upgrader.Upgrade(w, req, nil)
if err != nil {
return nil, nil, err
}
return nil
}

func (s *Subscriptions) pipe(conn *websocket.Conn, reader msgReader) error {
closed := make(chan struct{})
// start read loop to handle close event
s.wg.Add(1)
Expand All @@ -233,6 +277,28 @@ func (s *Subscriptions) pipe(conn *websocket.Conn, reader msgReader) error {
}
}
}()

return conn, closed, nil
}

func (s *Subscriptions) closeConn(conn *websocket.Conn, err error) {
var closeMsg []byte
if err != nil {
closeMsg = websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error())
} else {
closeMsg = websocket.FormatCloseMessage(websocket.CloseGoingAway, "")
}

if err := conn.WriteMessage(websocket.CloseMessage, closeMsg); err != nil {
log.Debug("write close message", "err", err)
}

if err := conn.Close(); err != nil {
log.Debug("close websocket", "err", err)
}
}

func (s *Subscriptions) pipe(conn *websocket.Conn, reader msgReader, closed chan struct{}) error {
ticker := s.repo.NewTicker()
pingTicker := time.NewTicker(pingPeriod)
defer pingTicker.Stop()
Expand Down Expand Up @@ -316,4 +382,5 @@ func (s *Subscriptions) Mount(root *mux.Router, pathPrefix string) {
sub := root.PathPrefix(pathPrefix).Subrouter()

sub.Path("/{subject}").Methods("Get").HandlerFunc(utils.WrapHandlerFunc(s.handleSubject))
sub.Path("/txpool/pending").Methods("Get").HandlerFunc(utils.WrapHandlerFunc(s.handlePendingTransactions))
}
4 changes: 4 additions & 0 deletions api/subscriptions/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,3 +219,7 @@ type Beat2Message struct {
K uint8 `json:"k"`
Obsolete bool `json:"obsolete"`
}

type PendingTxIDMessage struct {
ID thor.Bytes32 `json:"id"`
}
3 changes: 1 addition & 2 deletions api/utils/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,7 @@ func WrapHandlerFunc(f HandlerFunc) http.HandlerFunc {

// content types
const (
JSONContentType = "application/json; charset=utf-8"
OctetStreamContentType = "application/octet-stream"
JSONContentType = "application/json; charset=utf-8"
)

// ParseJSON parse a JSON object using strict mode.
Expand Down

0 comments on commit 6b1c28d

Please sign in to comment.