Skip to content

Commit

Permalink
fix kq client
Browse files Browse the repository at this point in the history
  • Loading branch information
977231903@qq.com committed Mar 8, 2022
1 parent 349919c commit 7fae4d8
Show file tree
Hide file tree
Showing 8 changed files with 45 additions and 60 deletions.
10 changes: 9 additions & 1 deletion app/mqueue/cmd/rpc/etc/mqueue.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,14 @@ Redis:
Type: node
Pass: G62m50oigInC30sf

KqServerConf:
#支付成功回调通知
KqPaymentUpdatePayStatusConf:
Brokers:
- kafka:9092
Topic: payment-update-paystatus-topic

#修改订单状态后发送小程序通知用户
KqSendWxMiniTplMessageConf:
Brokers:
- kafka:9092
Topic: send-wx-mini-tpl-message
5 changes: 2 additions & 3 deletions app/mqueue/cmd/rpc/internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import "github.com/zeromicro/go-zero/zrpc"
type Config struct {
zrpc.RpcServerConf

KqServerConf struct {
Brokers []string
}
KqPaymentUpdatePayStatusConf KqConfig
KqSendWxMiniTplMessageConf KqConfig
}
6 changes: 6 additions & 0 deletions app/mqueue/cmd/rpc/internal/config/kqConfig.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package config

type KqConfig struct {
Brokers []string
Topic string
}
11 changes: 10 additions & 1 deletion app/mqueue/cmd/rpc/internal/logic/kqPaymenStatusUpdateLogic.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package logic

import (
"context"
"github.com/pkg/errors"
"looklook/common/xerr"
"encoding/json"

"looklook/app/mqueue/cmd/rpc/internal/svc"
"looklook/app/mqueue/cmd/rpc/pb"
Expand Down Expand Up @@ -32,7 +35,13 @@ func (l *KqPaymenStatusUpdateLogic) KqPaymenStatusUpdate(in *pb.KqPaymenStatusUp
PayStatus: in.PayStatus,
}

if err := l.svcCtx.KqueueClient.Push(kqueue.PAYMENT_UPDATE_PAYSTATUS, m); err != nil {
//2、序列化
body, err := json.Marshal(m)
if err != nil {
return nil, errors.Wrapf(xerr.NewErrMsg("kq kqPaymenStatusUpdateLogic task marshal error "), "kq kqPaymenStatusUpdateLogic task marshal error , v : %+v", m)
}

if err := l.svcCtx.KqueuePaymentUpdatePayStatusClient.Push(string(body)); err != nil {
return nil, err
}

Expand Down
11 changes: 10 additions & 1 deletion app/mqueue/cmd/rpc/internal/logic/sendWxMiniSubMessageLogic.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package logic

import (
"context"
"github.com/pkg/errors"
"looklook/common/xerr"
"encoding/json"

"looklook/app/mqueue/cmd/rpc/internal/svc"
"looklook/app/mqueue/cmd/rpc/pb"
Expand Down Expand Up @@ -34,7 +37,13 @@ func (l *SendWxMiniSubMessageLogic) SendWxMiniSubMessage(in *pb.SendWxMiniSubMes
Page: in.Page,
}

if err := l.svcCtx.KqueueClient.Push(kqueue.SEND_WX_MINI_TPL_MESSAGE, m); err != nil {
//2、序列化
body, err := json.Marshal(m)
if err != nil {
return nil, errors.Wrapf(xerr.NewErrMsg("kq sendWxMiniSubMessageLogic task marshal error "), "kq sendWxMiniSubMessageLogic task marshal error , v : %+v", m)
}

if err := l.svcCtx.KqueueSendWxMiniTplMessageClient.Push(string(body)); err != nil {
return nil, err
}

Expand Down
13 changes: 8 additions & 5 deletions app/mqueue/cmd/rpc/internal/svc/serviceContext.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,28 @@
package svc

import (
"looklook/app/mqueue/cmd/rpc/internal/config"
"looklook/common/kqueue"

"github.com/hibiken/asynq"
"github.com/zeromicro/go-queue/kq"
"looklook/app/mqueue/cmd/rpc/internal/config"
)

type ServiceContext struct {
Config config.Config

KqueueClient kqueue.KqueueClient
AsynqClient *asynq.Client

KqueuePaymentUpdatePayStatusClient *kq.Pusher
KqueueSendWxMiniTplMessageClient *kq.Pusher

}

func NewServiceContext(c config.Config) *ServiceContext {

svc := &ServiceContext{
Config: c,
KqueueClient: kqueue.NewKqueueSvcClient(c.KqServerConf.Brokers),
AsynqClient: asynq.NewClient(asynq.RedisClientOpt{Addr: c.Redis.Host, Password: c.Redis.Pass}),
KqueuePaymentUpdatePayStatusClient: kq.NewPusher(c.KqPaymentUpdatePayStatusConf.Brokers,c.KqPaymentUpdatePayStatusConf.Topic),
KqueueSendWxMiniTplMessageClient: kq.NewPusher(c.KqSendWxMiniTplMessageConf.Brokers,c.KqSendWxMiniTplMessageConf.Topic),
}

return svc
Expand Down
44 changes: 0 additions & 44 deletions common/kqueue/kqeueSvcClient.go

This file was deleted.

5 changes: 0 additions & 5 deletions common/kqueue/topic.go

This file was deleted.

0 comments on commit 7fae4d8

Please sign in to comment.