Skip to content

Commit

Permalink
feat: consume messages from channels
Browse files Browse the repository at this point in the history
  • Loading branch information
akurilov committed Dec 27, 2023
1 parent 2b71d07 commit 6755719
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 2 deletions.
10 changes: 8 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,11 @@ func main() {
GroupId: groupId,
Log: log,
}
chanPostHandler := messages.ChanPostHandler{
ClientAwk: clientAwk,
GroupId: groupId,
Log: log,
}
callbackHandlers := map[string]service.ArgHandlerFunc{
subscriptions.CmdDescription: subscriptions.DescriptionHandlerFunc(clientAwk, groupId),
subscriptions.CmdExtend: subExtHandler.RequestExtensionDaysCount,
Expand Down Expand Up @@ -272,8 +277,9 @@ func main() {
chat := tgCtx.Chat()
switch chat.Type {
case telebot.ChatChannel:
log.Info(fmt.Sprintf("Started in the public channel %+v", chat))
err = tgCtx.Send("Wow, I'm in a public channel!")
err = chanPostHandler.Publish(tgCtx)
case telebot.ChatChannelPrivate:
err = chanPostHandler.Publish(tgCtx)
case telebot.ChatGroup:
err = subListHandlerFunc(tgCtx)
case telebot.ChatSuperGroup:
Expand Down
60 changes: 60 additions & 0 deletions service/messages/channel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package messages

import (
"context"
"errors"
"fmt"
"github.com/awakari/bot-telegram/service"
"github.com/awakari/client-sdk-go/api"
"github.com/awakari/client-sdk-go/api/grpc/limits"
"github.com/awakari/client-sdk-go/model"
"github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb"
"github.com/google/uuid"
"google.golang.org/grpc/metadata"
"gopkg.in/telebot.v3"
"log/slog"
)

type ChanPostHandler struct {
ClientAwk api.Client
GroupId string
Log *slog.Logger
}

func (cp ChanPostHandler) Publish(tgCtx telebot.Context) (err error) {
groupIdCtx := metadata.AppendToOutgoingContext(context.TODO(), service.KeyGroupId, cp.GroupId)
chanUserName := fmt.Sprintf("@%s", tgCtx.Chat().Username)
w, err := cp.ClientAwk.OpenMessagesWriter(groupIdCtx, chanUserName)
evt := pb.CloudEvent{
Id: uuid.NewString(),
Source: chanUserName,
SpecVersion: attrValSpecVersion,
Type: "com.github.awakari.bot-telegram.v1",
}
if err == nil {
defer w.Close()
err = toCloudEvent(tgCtx.Message(), tgCtx.Text(), &evt)
}
if err == nil {
err = cp.publish(tgCtx, w, &evt)
}
return
}

func (cp ChanPostHandler) publish(tgCtx telebot.Context, w model.Writer[*pb.CloudEvent], evt *pb.CloudEvent) (err error) {
var ackCount uint32
ackCount, err = w.WriteBatch([]*pb.CloudEvent{evt})
switch {
case ackCount == 0 && errors.Is(err, limits.ErrReached):
cp.Log.Warn("Message daily publishing limit reached.")
case ackCount == 1:
err = tgCtx.Send(fmt.Sprintf(msgFmtPublished, evt.Id), telebot.ModeHTML)
}
if err == nil {
switch ackCount {
case 0:
err = tgCtx.Send(msgBusy)
}
}
return
}

0 comments on commit 6755719

Please sign in to comment.