Skip to content

Commit

Permalink
Added update processor
Browse files Browse the repository at this point in the history
  • Loading branch information
mymmrac committed Jul 15, 2022
1 parent ef0cbdc commit c978f97
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 2 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ More examples can be seen here:
- [Custom predicates for handlers](examples/handler_custom/main.go)
- [Specific handlers](examples/handler_specific/main.go)
- [Predicate as middleware](examples/middleware_with_predicates/main.go)
- [Update processor](examples/update_processor/main.go)

</details>

Expand Down
53 changes: 53 additions & 0 deletions examples/update_processor/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package main

import (
"fmt"
"os"
"sync/atomic"

"github.com/mymmrac/telego"
tu "github.com/mymmrac/telego/telegoutil"
)

func main() {
botToken := os.Getenv("TOKEN")

// Create Bot
bot, err := telego.NewBot(botToken)
if err != nil {
fmt.Println(err)
os.Exit(1)
}

// Get updates channel
updates, _ := bot.UpdatesViaLongPulling(nil)

// Stop reviving updates from updates channel
defer bot.StopLongPulling()

fmt.Println("Listening for updates...")

count := int64(0)

// Process updates for something (here to count them)
processedUpdates := tu.UpdateProcessor(updates, 100, func(update telego.Update) telego.Update {
atomic.AddInt64(&count, 1)

currentCount := atomic.LoadInt64(&count)
fmt.Println("Update count:", currentCount)

// Stop bot when processed 3 updates
if currentCount >= 3 {
bot.StopLongPulling()
}

return update
})

// Log update IDs
for update := range processedUpdates {
fmt.Println("Update ID:", update.UpdateID)
}

fmt.Println("Bye")
}
22 changes: 20 additions & 2 deletions telegoutil/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ package telegoutil
import (
"io"

"github.com/mymmrac/telego/telegoapi"
"github.com/mymmrac/telego"
ta "github.com/mymmrac/telego/telegoapi"
)

// namedReaderImpl represents simplest implementation of telegoapi.NamedReader
Expand All @@ -21,9 +22,26 @@ func (r namedReaderImpl) Name() string {
}

// NameReader "names" io.Reader and returns valid telegoapi.NamedReader
func NameReader(reader io.Reader, name string) telegoapi.NamedReader {
func NameReader(reader io.Reader, name string) ta.NamedReader {
return namedReaderImpl{
reader: reader,
name: name,
}
}

// UpdateProcessor allows you to process updates and still use updates chan. New updates chan will be closed when
// original chan is closed.
// Note: telego.Update contains pointers so by modifying update you may modify original update.
func UpdateProcessor(updates <-chan telego.Update, buffer uint,
processor func(update telego.Update) telego.Update) <-chan telego.Update {
processedUpdates := make(chan telego.Update, buffer)

go func() {
defer close(processedUpdates)
for update := range updates {
processedUpdates <- processor(update) // TODO: Pass copy of telego.Update
}
}()

return processedUpdates
}
33 changes: 33 additions & 0 deletions telegoutil/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@ package telegoutil
import (
"io/ioutil"
"strings"
"sync"
"testing"

"github.com/stretchr/testify/assert"

"github.com/mymmrac/telego"
)

func TestNameReader(t *testing.T) {
Expand All @@ -19,3 +22,33 @@ func TestNameReader(t *testing.T) {
assert.Equal(t, text1, string(data))
assert.Equal(t, text2, nr.Name())
}

func TestUpdateProcessor(t *testing.T) {
updates := make(chan telego.Update)

wg := sync.WaitGroup{}

processedUpdates := UpdateProcessor(updates, 10, func(update telego.Update) telego.Update {
wg.Done()
update.UpdateID *= 10
return update
})

const updateCount = 2
wg.Add(updateCount)

updates <- telego.Update{UpdateID: 1}
updates <- telego.Update{UpdateID: 2}

wg.Wait()

count := 0
for update := range processedUpdates {
count++
assert.True(t, update.UpdateID%10 == 0)

if count == updateCount {
close(updates)
}
}
}

0 comments on commit c978f97

Please sign in to comment.