Skip to content

Commit

Permalink
update targets
Browse files Browse the repository at this point in the history
  • Loading branch information
kubemq committed Jun 28, 2021
1 parent 1c7343d commit 2c37a5c
Show file tree
Hide file tree
Showing 15 changed files with 180 additions and 1,029 deletions.
4 changes: 1 addition & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ require (
github.com/kr/pty v1.1.8 // indirect
github.com/kubemq-hub/builder v0.7.2
github.com/kubemq-hub/ibmmq-sdk v0.3.8
github.com/kubemq-io/kubemq-go v1.7.0
github.com/kubemq-io/kubemq-go v1.7.2
github.com/labstack/echo/v4 v4.1.17
github.com/lib/pq v1.9.0
github.com/minio/minio-go/v7 v7.0.8
Expand All @@ -75,5 +75,3 @@ require (
)

replace github.com/Azure/azure-service-bus-go => github.com/Azure/azure-service-bus-go v0.10.3

//replace github.com/kubemq-hub/builder => ../builder
19 changes: 8 additions & 11 deletions sources/queue/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Kubemq Queue Source
# Kubemq Queue Stream Source

Kubemq Queue source provides a queue subscriber for processing source queues.
Kubemq Queue source provides a queue subscriber for processing messages from a queue

## Prerequisites
The following are required to run queue source connector:
Expand All @@ -15,16 +15,14 @@ Queue source connector configuration properties:

| Properties Key | Required | Description | Example |
|:---------------|:---------|:-------------------------------------------------------|:------------|
| address | yes | kubemq server address (gRPC interface) | kubemq-cluster:50000 |
| address | yes | kubemq server address (gRPC interface) | kubemq-cluster:50000 |
| client_id | no | set client id | "client_id" |
| auth_token | no | set authentication token | jwt token |
| channel | yes | set channel to subscribe | |
| sources | no | set how many concurrent sources to subscribe | 1 |
| max_requeue | yes | set how many times to requeue the requests due to target error| "0" no requeue |
| response_channel | no | set send target response to channel | "response.channel" |
| batch_size | no | set how many messages to pull from queue | "1" |
| wait_timeout | no | set how long to wait for messages to arrive in seconds | "60" |

| batch_size | no | set how many messages to pull from queue | "1" |
| wait_timeout | no | set how long to wait for messages to arrive in seconds | "5" |


Example:
Expand All @@ -33,18 +31,17 @@ Example:
bindings:
- name: kubemq-queue-elastic-search
source:
kind: kubemq.queue
kind: kubemq.queue-stream
name: kubemq-queue
properties:
address: "kubemq-cluster:50000"
client_id: "kubemq-queue-elastic-search-connector"
auth_token: ""
channel: "queue.elastic-search"
response_channel: "queue.response.elastic"
sources: "1"
response_channel: "queue.response.elastic"
batch_size: "1"
wait_timeout: "60"
max_requeue: 0
wait_timeout: "5"
target:
kind: stores.elastic-search
name: target-elastic-search
Expand Down
167 changes: 83 additions & 84 deletions sources/queue/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,42 +2,52 @@ package queue

import (
"context"
"github.com/kubemq-hub/builder/connector/common"
"github.com/kubemq-hub/kubemq-targets/middleware"
"github.com/kubemq-hub/kubemq-targets/types"
"github.com/kubemq-io/kubemq-go"
"time"

"errors"
"fmt"
"github.com/kubemq-hub/builder/connector/common"
"github.com/kubemq-hub/kubemq-targets/config"
"github.com/kubemq-hub/kubemq-targets/middleware"
"github.com/kubemq-hub/kubemq-targets/pkg/logger"
"github.com/kubemq-hub/kubemq-targets/types"
"github.com/kubemq-io/kubemq-go/queues_stream"
"time"
)

var (
errInvalidTarget = errors.New("invalid controller received, cannot be null")
)

const (
retriesInterval = 1 * time.Second
)

type Client struct {
opts options
clients []*kubemq.Client
log *logger.Logger
target middleware.Middleware
isStopped bool
requeueCache *requeue
opts options
log *logger.Logger
target middleware.Middleware
isStopped bool
}

func (c *Client) getQueuesClient(ctx context.Context, id int) (*queues_stream.QueuesStreamClient, error) {
return queues_stream.NewQueuesStreamClient(ctx,
queues_stream.WithAddress(c.opts.host, c.opts.port),
queues_stream.WithClientId(c.opts.clientId),
queues_stream.WithCheckConnection(true),
queues_stream.WithAutoReconnect(true),
queues_stream.WithAuthToken(c.opts.authToken),
queues_stream.WithConnectionNotificationFunc(
func(msg string) {
c.log.Infof(fmt.Sprintf("connection: %d, %s", id, msg))
}),
)

}
func New() *Client {
return &Client{}

}
func (c *Client) Connector() *common.Connector {
return Connector()
}
func (c *Client) onError(err error) {
c.log.Error(err.Error())
}
func (c *Client) Init(ctx context.Context, cfg config.Spec, log *logger.Logger) error {
c.log = log
if c.log == nil {
Expand All @@ -48,23 +58,6 @@ func (c *Client) Init(ctx context.Context, cfg config.Spec, log *logger.Logger)
if err != nil {
return err
}
c.requeueCache = newRequeue(c.opts.maxRequeue)
for i := 0; i < c.opts.sources; i++ {
clientId := c.opts.clientId
if c.opts.sources > 1 {
clientId = fmt.Sprintf("%s-%d", clientId, i)
}
client, err := kubemq.NewClient(ctx,
kubemq.WithAddress(c.opts.host, c.opts.port),
kubemq.WithClientId(clientId),
kubemq.WithTransportType(kubemq.TransportTypeGRPC),
kubemq.WithAuthToken(c.opts.authToken),
kubemq.WithCheckConnection(true))
if err != nil {
return err
}
c.clients = append(c.clients, client)
}

return nil
}
Expand All @@ -75,36 +68,28 @@ func (c *Client) Start(ctx context.Context, target middleware.Middleware) error
} else {
c.target = target
}
for i := 0; i < len(c.clients); i++ {
go c.run(ctx, c.clients[i])
for i := 0; i < c.opts.sources; i++ {
client, err := c.getQueuesClient(ctx, i+1)
if err != nil {
return err
}
go c.run(ctx, client)
}
return nil
}

func (c *Client) run(ctx context.Context, client *kubemq.Client) {
func (c *Client) run(ctx context.Context, client *queues_stream.QueuesStreamClient) {
defer func() {
_ = client.Close()
}()
for {
if c.isStopped {
return
}
queueMessages, err := c.getQueueMessages(ctx, client)
err := c.processQueueMessage(ctx, client)
if err != nil {
c.log.Error(err.Error())
time.Sleep(retriesInterval)
continue
}
for _, message := range queueMessages {
resp, err := c.processQueueMessage(ctx, message, client)
if err != nil {
c.log.Error(err.Error())
time.Sleep(time.Second)
continue
}
if resp != nil && c.opts.responseChannel != "" {
_, errSend := client.SetQueueMessage(resp.ToQueueMessage()).SetChannel(c.opts.responseChannel).Send(ctx)
if errSend != nil {
c.log.Errorf("error sending response to a queue, %s", errSend.Error())
}
}
time.Sleep(time.Second)
}
select {
case <-ctx.Done():
Expand All @@ -114,46 +99,60 @@ func (c *Client) run(ctx context.Context, client *kubemq.Client) {
}
}
}
func (c *Client) getQueueMessages(ctx context.Context, client *kubemq.Client) ([]*kubemq.QueueMessage, error) {
receiveResult, err := client.NewReceiveQueueMessagesRequest().
func (c *Client) processQueueMessage(ctx context.Context, client *queues_stream.QueuesStreamClient) error {
pr := queues_stream.NewPollRequest().
SetChannel(c.opts.channel).
SetMaxNumberOfMessages(c.opts.batchSize).
SetWaitTimeSeconds(c.opts.waitTimeout).
Send(ctx)
if err != nil {
return nil, err
}
return receiveResult.Messages, nil
}

func (c *Client) processQueueMessage(ctx context.Context, msg *kubemq.QueueMessage, client *kubemq.Client) (*types.Response, error) {
req, err := types.ParseRequest(msg.Body)
SetMaxItems(c.opts.batchSize).
SetWaitTimeout(c.opts.waitTimeout * 1000).
SetAutoAck(false).
SetOnErrorFunc(c.onError)
pollResp, err := client.Poll(ctx, pr)
if err != nil {
return types.NewResponse().SetError(fmt.Errorf("invalid request format, %w", err)), nil
return err
}
resp, err := c.target.Do(ctx, req)
if err == nil {
c.requeueCache.remove(msg.MessageID)
return resp, nil
if !pollResp.HasMessages() {
return nil
}
if c.requeueCache.isRequeue(msg.MessageID) {
_, requeueErr := client.SetQueueMessage(msg).Send(ctx)
if requeueErr != nil {
c.requeueCache.remove(msg.MessageID)
c.log.Errorf("message id %s wasn't requeue due to an error , %s", msg.MessageID, requeueErr.Error())
return types.NewResponse().SetError(err), nil

for _, message := range pollResp.Messages {
req, err := types.ParseRequest(message.Body)
if err != nil {
_ = message.Ack()
return fmt.Errorf("invalid request format, %w", err)
}
resp, err := c.target.Do(ctx, req)
if err != nil {
if message.Policy.MaxReceiveCount != message.Attributes.ReceiveCount {
return message.NAck()
}
if c.opts.responseChannel != "" {
errResp := types.NewResponse().SetError(err)
_, errSend := client.Send(ctx, errResp.ToQueueStreamMessage().SetChannel(c.opts.responseChannel))
if errSend != nil {
c.log.Errorf("error sending response to a queue, %s", errSend.Error())
}
}
return nil
}

err = message.Ack()
if err != nil {
return err
}

if resp != nil {
if c.opts.responseChannel != "" {
_, errSend := client.Send(ctx, resp.ToQueueStreamMessage().SetChannel(c.opts.responseChannel))
if errSend != nil {
c.log.Errorf("error sending response to a queue, %s", errSend.Error())
}
}
}
c.log.Infof("message id %s, requeued back to channel", msg.MessageID)
return nil, nil
} else {
return types.NewResponse().SetError(err), nil
}
return nil
}

func (c *Client) Stop() error {
c.isStopped = true
for _, client := range c.clients {
_ = client.Close()
}
return nil
}
Loading

0 comments on commit 2c37a5c

Please sign in to comment.