From 7fae4d8f4931d4db98e7261f602a71c7cc204979 Mon Sep 17 00:00:00 2001 From: "977231903@qq.com" <> Date: Tue, 8 Mar 2022 21:59:00 +0800 Subject: [PATCH] fix kq client --- app/mqueue/cmd/rpc/etc/mqueue.yaml | 10 ++++- app/mqueue/cmd/rpc/internal/config/config.go | 5 +-- .../cmd/rpc/internal/config/kqConfig.go | 6 +++ .../logic/kqPaymenStatusUpdateLogic.go | 11 ++++- .../logic/sendWxMiniSubMessageLogic.go | 11 ++++- .../cmd/rpc/internal/svc/serviceContext.go | 13 +++--- common/kqueue/kqeueSvcClient.go | 44 ------------------- common/kqueue/topic.go | 5 --- 8 files changed, 45 insertions(+), 60 deletions(-) create mode 100644 app/mqueue/cmd/rpc/internal/config/kqConfig.go delete mode 100644 common/kqueue/kqeueSvcClient.go delete mode 100644 common/kqueue/topic.go diff --git a/app/mqueue/cmd/rpc/etc/mqueue.yaml b/app/mqueue/cmd/rpc/etc/mqueue.yaml index 923d530b..865b6679 100644 --- a/app/mqueue/cmd/rpc/etc/mqueue.yaml +++ b/app/mqueue/cmd/rpc/etc/mqueue.yaml @@ -24,6 +24,14 @@ Redis: Type: node Pass: G62m50oigInC30sf -KqServerConf: +#支付成功回调通知 +KqPaymentUpdatePayStatusConf: Brokers: - kafka:9092 + Topic: payment-update-paystatus-topic + +#修改订单状态后发送小程序通知用户 +KqSendWxMiniTplMessageConf: + Brokers: + - kafka:9092 + Topic: send-wx-mini-tpl-message diff --git a/app/mqueue/cmd/rpc/internal/config/config.go b/app/mqueue/cmd/rpc/internal/config/config.go index 0c96f59d..dea11a60 100755 --- a/app/mqueue/cmd/rpc/internal/config/config.go +++ b/app/mqueue/cmd/rpc/internal/config/config.go @@ -5,7 +5,6 @@ import "github.com/zeromicro/go-zero/zrpc" type Config struct { zrpc.RpcServerConf - KqServerConf struct { - Brokers []string - } + KqPaymentUpdatePayStatusConf KqConfig + KqSendWxMiniTplMessageConf KqConfig } diff --git a/app/mqueue/cmd/rpc/internal/config/kqConfig.go b/app/mqueue/cmd/rpc/internal/config/kqConfig.go new file mode 100644 index 00000000..edad2218 --- /dev/null +++ b/app/mqueue/cmd/rpc/internal/config/kqConfig.go @@ -0,0 +1,6 @@ +package config + +type KqConfig struct { + Brokers []string + Topic string +} diff --git a/app/mqueue/cmd/rpc/internal/logic/kqPaymenStatusUpdateLogic.go b/app/mqueue/cmd/rpc/internal/logic/kqPaymenStatusUpdateLogic.go index 263ba3ec..f2447460 100644 --- a/app/mqueue/cmd/rpc/internal/logic/kqPaymenStatusUpdateLogic.go +++ b/app/mqueue/cmd/rpc/internal/logic/kqPaymenStatusUpdateLogic.go @@ -2,6 +2,9 @@ package logic import ( "context" + "github.com/pkg/errors" + "looklook/common/xerr" + "encoding/json" "looklook/app/mqueue/cmd/rpc/internal/svc" "looklook/app/mqueue/cmd/rpc/pb" @@ -32,7 +35,13 @@ func (l *KqPaymenStatusUpdateLogic) KqPaymenStatusUpdate(in *pb.KqPaymenStatusUp PayStatus: in.PayStatus, } - if err := l.svcCtx.KqueueClient.Push(kqueue.PAYMENT_UPDATE_PAYSTATUS, m); err != nil { + //2、序列化 + body, err := json.Marshal(m) + if err != nil { + return nil, errors.Wrapf(xerr.NewErrMsg("kq kqPaymenStatusUpdateLogic task marshal error "), "kq kqPaymenStatusUpdateLogic task marshal error , v : %+v", m) + } + + if err := l.svcCtx.KqueuePaymentUpdatePayStatusClient.Push(string(body)); err != nil { return nil, err } diff --git a/app/mqueue/cmd/rpc/internal/logic/sendWxMiniSubMessageLogic.go b/app/mqueue/cmd/rpc/internal/logic/sendWxMiniSubMessageLogic.go index ebec981f..f36a0aa1 100644 --- a/app/mqueue/cmd/rpc/internal/logic/sendWxMiniSubMessageLogic.go +++ b/app/mqueue/cmd/rpc/internal/logic/sendWxMiniSubMessageLogic.go @@ -2,6 +2,9 @@ package logic import ( "context" + "github.com/pkg/errors" + "looklook/common/xerr" + "encoding/json" "looklook/app/mqueue/cmd/rpc/internal/svc" "looklook/app/mqueue/cmd/rpc/pb" @@ -34,7 +37,13 @@ func (l *SendWxMiniSubMessageLogic) SendWxMiniSubMessage(in *pb.SendWxMiniSubMes Page: in.Page, } - if err := l.svcCtx.KqueueClient.Push(kqueue.SEND_WX_MINI_TPL_MESSAGE, m); err != nil { + //2、序列化 + body, err := json.Marshal(m) + if err != nil { + return nil, errors.Wrapf(xerr.NewErrMsg("kq sendWxMiniSubMessageLogic task marshal error "), "kq sendWxMiniSubMessageLogic task marshal error , v : %+v", m) + } + + if err := l.svcCtx.KqueueSendWxMiniTplMessageClient.Push(string(body)); err != nil { return nil, err } diff --git a/app/mqueue/cmd/rpc/internal/svc/serviceContext.go b/app/mqueue/cmd/rpc/internal/svc/serviceContext.go index 844cb590..cb922a28 100644 --- a/app/mqueue/cmd/rpc/internal/svc/serviceContext.go +++ b/app/mqueue/cmd/rpc/internal/svc/serviceContext.go @@ -1,25 +1,28 @@ package svc import ( - "looklook/app/mqueue/cmd/rpc/internal/config" - "looklook/common/kqueue" - "github.com/hibiken/asynq" + "github.com/zeromicro/go-queue/kq" + "looklook/app/mqueue/cmd/rpc/internal/config" ) type ServiceContext struct { Config config.Config - KqueueClient kqueue.KqueueClient AsynqClient *asynq.Client + + KqueuePaymentUpdatePayStatusClient *kq.Pusher + KqueueSendWxMiniTplMessageClient *kq.Pusher + } func NewServiceContext(c config.Config) *ServiceContext { svc := &ServiceContext{ Config: c, - KqueueClient: kqueue.NewKqueueSvcClient(c.KqServerConf.Brokers), AsynqClient: asynq.NewClient(asynq.RedisClientOpt{Addr: c.Redis.Host, Password: c.Redis.Pass}), + KqueuePaymentUpdatePayStatusClient: kq.NewPusher(c.KqPaymentUpdatePayStatusConf.Brokers,c.KqPaymentUpdatePayStatusConf.Topic), + KqueueSendWxMiniTplMessageClient: kq.NewPusher(c.KqSendWxMiniTplMessageConf.Brokers,c.KqSendWxMiniTplMessageConf.Topic), } return svc diff --git a/common/kqueue/kqeueSvcClient.go b/common/kqueue/kqeueSvcClient.go deleted file mode 100644 index beff9aac..00000000 --- a/common/kqueue/kqeueSvcClient.go +++ /dev/null @@ -1,44 +0,0 @@ -package kqueue - -import ( - "encoding/json" - "looklook/common/xerr" - - "github.com/pkg/errors" - "github.com/zeromicro/go-queue/kq" -) - -type KqueueClient interface { - Push(topic string, v interface{}) error -} - -//封装的asynq业务客户端 -type kqueueSvcClient struct { - Brokers []string -} - -func NewKqueueSvcClient(brokers []string) KqueueClient { - return &kqueueSvcClient{ - Brokers: brokers, - } -} - -//添加 -func (l *kqueueSvcClient) Push(topic string, v interface{}) error { - - //1、初始化pusher - pusher := kq.NewPusher(l.Brokers, topic) - - //2、序列化 - body, err := json.Marshal(v) - if err != nil { - return errors.Wrapf(xerr.NewErrMsg("kq task marshal error "), "【pushKqMsgErrorMarshal】topic : %s , v : %+v", topic, v) - } - - //3、发送消息. - if err := pusher.Push(string(body)); err != nil { - return errors.Wrapf(xerr.NewErrMsg("kq pusher task push error"), "【pushKqMsgErrorPush】topic : %s , v : %+v", topic, v) - } - - return nil -} diff --git a/common/kqueue/topic.go b/common/kqueue/topic.go deleted file mode 100644 index 5405fc65..00000000 --- a/common/kqueue/topic.go +++ /dev/null @@ -1,5 +0,0 @@ -//kafka kq topic -package kqueue - -const PAYMENT_UPDATE_PAYSTATUS = "payment-update-paystatus-topic" //第三方支付回调更改支付状态通知 -const SEND_WX_MINI_TPL_MESSAGE = "send-wx-mini-tpl-message" //发送微信小程序模版消息