Skip to content

Commit

Permalink
Merge pull request #58 from weni-ai/tweaks-msg-delivery-client-conn
Browse files Browse the repository at this point in the history
Tweaks msg delivery client conn
  • Loading branch information
rasoro authored May 17, 2024
2 parents 633c508 + 8d41276 commit 64f4fa9
Show file tree
Hide file tree
Showing 14 changed files with 113 additions and 112 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,4 @@ jobs:
if: success()
uses: codecov/codecov-action@v3
with:
fail_ci_if_error: true
fail_ci_if_error: false
17 changes: 2 additions & 15 deletions api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/ilhasoft/wwcs/pkg/history"
"github.com/ilhasoft/wwcs/pkg/metric"
"github.com/ilhasoft/wwcs/pkg/queue"
"github.com/ilhasoft/wwcs/pkg/tasks"
"github.com/ilhasoft/wwcs/pkg/websocket"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -77,14 +76,6 @@ func main() {

queueConn := queue.OpenConnection(queueConfig.Tag, rdb, nil)
defer queueConn.Close()
qout := queueConn.OpenQueue("outgoing")
qoutRetry := queueConn.OpenQueue("outgoing-retry")
qoutRetry.SetPrefetchLimit(queueConfig.ConsumerPrefetchLimit)
qoutRetry.SetPollDuration(time.Duration(queueConfig.RetryPollDuration) * time.Millisecond)
qout.SetPushQueue(qoutRetry)

outQueueConsumer := queue.NewConsumer(qout)
outRetryQueueConsumer := queue.NewConsumer(qoutRetry)

metrics, err := metric.NewPrometheusService()
if err != nil {
Expand All @@ -94,14 +85,10 @@ func main() {
mdb := db.NewDB()
histories := history.NewService(history.NewRepo(mdb, config.Get().DB.ContextTimeout))

clientM := websocket.NewClientManager(rdb)
clientM := websocket.NewClientManager(rdb, int(queueConfig.ClientTTL))

app := websocket.NewApp(websocket.NewPool(), qout, rdb, metrics, histories, clientM)
app := websocket.NewApp(websocket.NewPool(), rdb, metrics, histories, clientM, queueConn)
app.StartConnectionsHeartbeat()

outQueueConsumer.StartConsuming(5, tasks.NewTasks(app).SendMsgToExternalService)
outRetryQueueConsumer.StartConsuming(5, tasks.NewTasks(app).SendMsgToExternalService)

websocket.SetupRoutes(app)

queueConn.NewCleaner()
Expand Down
9 changes: 6 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,15 @@ type RedisQueue struct {
Timeout int64 `default:"15" env:"WWC_REDIS_TIMEOUT"`
MaxRetries int64 `default:"3" env:"WWC_REDIS_MAX_RETRIES"`
RetentionLimit int64 `default:"12" env:"WWC_REDIS_QUEUE_RETENTION_LIMIT"`
ClientTTL int64 `default:"12" env:"WWC_REDIS_CLIENT_TTL"`
HealthcheckTimeout int64 `default:"10" env:"WWC_REDIS_HEALTHCHECK_TIMEOUT"`
}

type DB struct {
Name string `default:"weni-webchat" env:"WWC_DB_NAME"`
URI string `default:"mongodb://admin:admin@localhost:27017/" env:"WWC_DB_URI"`
ContextTimeout time.Duration `default:"15" env:"WWC_DB_CONTEXT_TIMEOUT"`
Name string `default:"weni-webchat" env:"WWC_DB_NAME"`
URI string `default:"mongodb://admin:admin@localhost:27017/" env:"WWC_DB_URI"`
ContextTimeout time.Duration `default:"15" env:"WWC_DB_CONTEXT_TIMEOUT"`
HealthcheckTimeout int64 `default:"15" env:"WWC_DB_HEALTHCHECK_TIMEOUT"`
}

// Get all configs from env vars or config file
Expand Down
18 changes: 12 additions & 6 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,15 @@ var ttDefaultConfigs = Configuration{
Timeout: 15,
MaxRetries: 3,
RetentionLimit: 12,
ClientTTL: 12,
HealthcheckTimeout: 10,
},
SessionTypeToStore: "remote",
DB: DB{
Name: "weni-webchat",
URI: "mongodb://admin:admin@localhost:27017/",
ContextTimeout: 15,
Name: "weni-webchat",
URI: "mongodb://admin:admin@localhost:27017/",
ContextTimeout: 15,
HealthcheckTimeout: 15,
},
}

Expand All @@ -58,12 +61,15 @@ var ttEnvConfigs = Configuration{
Timeout: 15,
MaxRetries: 3,
RetentionLimit: 12,
ClientTTL: 12,
HealthcheckTimeout: 10,
},
SessionTypeToStore: "remote",
DB: DB{
Name: "webchat-db",
URI: "mongodb://4DM1N:P455W0RD@localhost:27017",
ContextTimeout: 15,
Name: "webchat-db",
URI: "mongodb://4DM1N:P455W0RD@localhost:27017",
ContextTimeout: 15,
HealthcheckTimeout: 15,
},
}

Expand Down
3 changes: 1 addition & 2 deletions pkg/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
log "github.com/sirupsen/logrus"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/readpref"
)

func NewDB() *mongo.Database {
Expand All @@ -23,7 +22,7 @@ func NewDB() *mongo.Database {
panic(err.Error())
}

if err := connection.Ping(ctx, readpref.Primary()); err != nil {
if err := connection.Ping(ctx, nil); err != nil {
log.Error("fail to ping MongoDB", err.Error())
panic(err.Error())
} else {
Expand Down
38 changes: 19 additions & 19 deletions pkg/websocket/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,37 +13,37 @@ import (

// App encapsulates application with resources.
type App struct {
ClientPool *ClientPool
OutgoingQueue queue.Queue
RDB *redis.Client
Metrics *metric.Service
Histories history.Service
ClientManager ClientManager
ClientPool *ClientPool
RDB *redis.Client
Metrics *metric.Service
Histories history.Service
ClientManager ClientManager
QueueConnectionManager queue.Connection
}

// Create new App instance.
func NewApp(pool *ClientPool, oq queue.Queue, rdb *redis.Client, metrics *metric.Service, histories history.Service, clientM ClientManager) *App {
func NewApp(pool *ClientPool, rdb *redis.Client, metrics *metric.Service, histories history.Service, clientM ClientManager, qconnM queue.Connection) *App {
return &App{
ClientPool: pool,
OutgoingQueue: oq,
RDB: rdb,
Metrics: metrics,
Histories: histories,
ClientManager: clientM,
ClientPool: pool,
RDB: rdb,
Metrics: metrics,
Histories: histories,
ClientManager: clientM,
QueueConnectionManager: qconnM,
}
}

func (a *App) StartConnectionsHeartbeat() error {
go func() {
for range time.Tick(time.Second * ClientTTL / 2) {
clients := a.ClientPool.GetClients()
for range time.Tick(time.Second * time.Duration(a.ClientManager.DefaultClientTTL()) / 2) {
clientsKeys := a.ClientPool.GetClientsKeys()
pipe := a.RDB.Pipeline()
for ck := range clients {
for _, ck := range clientsKeys {
clientConnectionKey := ClientConnectionKeyPrefix + ck
pipe.Expire(context.Background(), clientConnectionKey, time.Second*ClientTTL)
pipe.Expire(context.Background(), clientConnectionKey, time.Second*time.Duration(a.ClientManager.DefaultClientTTL()))
}
if len(clients) > 0 {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*ClientTTL/2)
if len(clientsKeys) > 0 {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(a.ClientManager.DefaultClientTTL()))
_, err := pipe.Exec(ctx)
if err != nil {
log.Error(err)
Expand Down
36 changes: 12 additions & 24 deletions pkg/websocket/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,13 +291,6 @@ func (c *Client) startQueueConsuming() error {
c.Conn.Close()
return
}

if c.Histories != nil {
err := c.SaveHistory(DirectionIn, incomingPayload.Message)
if err != nil {
log.Error(err)
}
}
})
return nil
}
Expand Down Expand Up @@ -448,24 +441,10 @@ func (c *Client) Redirect(payload OutgoingPayload, to postJSON, app *App) error
return nil
}

body, err := to(c.Callback, presenter)
_, err = to(c.Callback, presenter)
if err != nil {
if body == nil {
return err
}
if app.OutgoingQueue != nil {
sJob := OutgoingJob{
URL: c.Callback,
Payload: presenter,
}
sjm, err := json.Marshal(sJob)
if err != nil {
return err
}
if err = app.OutgoingQueue.PublishEX(queue.KeysExpiration, string(sjm)); err != nil {
return err
}
}
log.Error(err)
return err
}
if messageType == "text" || messageType == "image" || messageType == "video" || messageType == "audio" || messageType == "file" && app != nil {
if app.Metrics != nil {
Expand All @@ -484,6 +463,7 @@ func (c *Client) Redirect(payload OutgoingPayload, to postJSON, app *App) error
err := c.SaveHistory(DirectionOut, presenter.Message)
if err != nil {
log.Error(err)
return err
}
}
}
Expand All @@ -508,6 +488,14 @@ func (c *Client) SaveHistory(direction Direction, msg Message) error {
return c.Histories.Save(hmsg)
}

func SaveHistory(historyService history.Service, clientID string, direction Direction, msg Message, channelUUID string) error {
if channelUUID == "" {
return errors.New("contact channelUUID is empty")
}
hmsg := NewHistoryMessagePayload(direction, clientID, channelUUID, msg)
return historyService.Save(hmsg)
}

func (c *Client) setupClientInfo(payload OutgoingPayload) error {
c.ID = payload.From
c.Callback = payload.Callback
Expand Down
20 changes: 10 additions & 10 deletions pkg/websocket/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ var ttParsePayload = []struct {
func TestParsePayload(t *testing.T) {
rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379", DB: 3})
defer rdb.FlushAll(context.TODO())
cm := NewClientManager(rdb)
app := NewApp(NewPool(), nil, rdb, nil, nil, cm)
cm := NewClientManager(rdb, 4)
app := NewApp(NewPool(), rdb, nil, nil, cm, nil)
client, ws, s := newTestClient(t)
defer client.Conn.Close()
defer ws.Close()
Expand Down Expand Up @@ -111,8 +111,8 @@ var ttCloseSession = []struct {
func TestCloseSession(t *testing.T) {
rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379", DB: 3})
defer rdb.FlushAll(context.TODO())
cm := NewClientManager(rdb)
app := NewApp(NewPool(), nil, rdb, nil, nil, cm)
cm := NewClientManager(rdb, 4)
app := NewApp(NewPool(), rdb, nil, nil, cm, nil)
conn := NewOpenConnection(t)

client := &Client{
Expand Down Expand Up @@ -212,8 +212,8 @@ var ttClientRegister = []struct {
func TestClientRegister(t *testing.T) {
rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379", DB: 3})
defer rdb.FlushAll(context.TODO())
cm := NewClientManager(rdb)
app := NewApp(NewPool(), nil, rdb, nil, nil, cm)
cm := NewClientManager(rdb, 4)
app := NewApp(NewPool(), rdb, nil, nil, cm, nil)
var poolSize int

client, ws, s := newTestClient(t)
Expand Down Expand Up @@ -430,8 +430,8 @@ func toTest(url string, data interface{}) ([]byte, error) {
func TestRedirect(t *testing.T) {
rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379", DB: 3})
defer rdb.FlushAll(context.TODO())
cm := NewClientManager(rdb)
app := NewApp(NewPool(), nil, rdb, nil, nil, cm)
cm := NewClientManager(rdb, 4)
app := NewApp(NewPool(), rdb, nil, nil, cm, nil)
c, ws, s := newTestClient(t)
defer c.Conn.Close()
defer ws.Close()
Expand Down Expand Up @@ -618,8 +618,8 @@ var tcGetHistory = []struct {
func TestGetHistory(t *testing.T) {
rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379", DB: 3})
defer rdb.FlushAll(context.TODO())
cm := NewClientManager(rdb)
_ = NewApp(NewPool(), nil, rdb, nil, nil, cm)
cm := NewClientManager(rdb, 4)
_ = NewApp(NewPool(), rdb, nil, nil, cm, nil)
client, ws, s := newTestClient(t)
defer client.Conn.Close()
defer ws.Close()
Expand Down
30 changes: 16 additions & 14 deletions pkg/websocket/clientmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ import (
const (
// ClientConnectionKeyPrefix is the prefix of every key in redis for client connection, key example "client:foo_123"
ClientConnectionKeyPrefix = "client:"
// ClientTTL is the TTL expiration for client connection
ClientTTL = 4
)

// ConnectedClient represents the struct of a client connected with the main infos
Expand All @@ -35,21 +33,23 @@ type ClientManager interface {
GetConnectedClient(string) (*ConnectedClient, error)
AddConnectedClient(ConnectedClient) error
RemoveConnectedClient(string) error
UpdateTTL(string, int) (bool, error)
UpdateClientTTL(string, int) (bool, error)
DefaultClientTTL() int
}

type clientManager struct {
rdb *redis.Client
rdb *redis.Client
clientTTL int
}

// NewClientManager return a instance of a client manager that uses redis client for persistence
func NewClientManager(redis *redis.Client) ClientManager {
return &clientManager{rdb: redis}
func NewClientManager(redis *redis.Client, clientTTL int) ClientManager {
return &clientManager{rdb: redis, clientTTL: clientTTL}
}

// GetConnectedClient returns the ConnectedClient searched by its clientID
func (m *clientManager) GetConnectedClient(clientID string) (*ConnectedClient, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(m.clientTTL))
defer cancel()
result, err := m.rdb.Get(ctx, ClientConnectionKeyPrefix+clientID).Result()
if err == redis.Nil {
Expand All @@ -67,7 +67,7 @@ func (m *clientManager) GetConnectedClient(clientID string) (*ConnectedClient, e

// GetConnectedClients returns a slice of connected clients keys
func (m *clientManager) GetConnectedClients() ([]string, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(m.clientTTL))
defer cancel()
ccs, _, err := m.rdb.Scan(ctx, 0, ClientConnectionKeyPrefix+"*", 0).Result()
if err != nil {
Expand All @@ -78,9 +78,9 @@ func (m *clientManager) GetConnectedClients() ([]string, error) {

// AddConnectedClient add a client connection from ConnectedClient with a ttl defined by clientTTL constant
func (m *clientManager) AddConnectedClient(client ConnectedClient) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(m.clientTTL))
defer cancel()
_, err := m.rdb.Set(ctx, ClientConnectionKeyPrefix+client.ID, client, time.Second*ClientTTL).Result()
_, err := m.rdb.Set(ctx, ClientConnectionKeyPrefix+client.ID, client, time.Second*time.Duration(m.clientTTL)).Result()
if err != nil {
return err
}
Expand All @@ -89,7 +89,7 @@ func (m *clientManager) AddConnectedClient(client ConnectedClient) error {

// RemoveConnectedClient removes the connected client by its clientID
func (m *clientManager) RemoveConnectedClient(clientID string) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(m.clientTTL))
defer cancel()
_, err := m.rdb.Del(ctx, ClientConnectionKeyPrefix+clientID).Result()
if err != nil {
Expand All @@ -98,9 +98,11 @@ func (m *clientManager) RemoveConnectedClient(clientID string) error {
return nil
}

// UpdateTTL updates key expiration
func (m *clientManager) UpdateTTL(clientID string, expiration int) (bool, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
// UpdateClientTTL updates key expiration
func (m *clientManager) UpdateClientTTL(clientID string, expiration int) (bool, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(m.clientTTL))
defer cancel()
return m.rdb.Expire(ctx, clientID, time.Second*time.Duration(expiration)).Result()
}

func (m *clientManager) DefaultClientTTL() int { return m.clientTTL }
4 changes: 2 additions & 2 deletions pkg/websocket/clientmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func TestClientManager(t *testing.T) {
rdbOptions, err := redis.ParseURL(config.Get().RedisQueue.URL)
assert.NoError(t, err)
rdb := redis.NewClient(rdbOptions)
cm := NewClientManager(rdb)
cm := NewClientManager(rdb, 4)

newClientID := "foo_id_123"
newClient := ConnectedClient{ID: newClientID}
Expand Down Expand Up @@ -46,7 +46,7 @@ func TestClientManager(t *testing.T) {

err = cm.AddConnectedClient(newClient)
assert.NoError(t, err)
time.Sleep(time.Second * ClientTTL)
time.Sleep(time.Second * time.Duration(cm.DefaultClientTTL()))

client, err = cm.GetConnectedClient(newClient.ID)
assert.NoError(t, err)
Expand Down
Loading

0 comments on commit 64f4fa9

Please sign in to comment.