Skip to content

Commit

Permalink
feat(tweet): actor to fetch tweet at specified query with pagination
Browse files Browse the repository at this point in the history
  • Loading branch information
bdeneux committed Nov 25, 2022
1 parent a240a38 commit 64e2e6d
Show file tree
Hide file tree
Showing 5 changed files with 174 additions and 5 deletions.
24 changes: 24 additions & 0 deletions app/actor/tweet/response.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package tweet

type Response struct {
Data []struct {
Id string `json:"id"`
EditHistoryTweetIds []string `json:"edit_history_tweet_ids"`
AuthorId string `json:"author_id"`
Text string `json:"text"`
} `json:"data"`
Includes struct {
Users []struct {
Username string `json:"username"`
Name string `json:"name"`
Description string `json:"description"`
Id string `json:"id"`
} `json:"users"`
} `json:"includes"`
Meta struct {
NewestId string `json:"newest_id"`
OldestId string `json:"oldest_id"`
ResultCount int `json:"result_count"`
NextToken string `json:"next_token"`
} `json:"meta"`
}
127 changes: 127 additions & 0 deletions app/actor/tweet/search.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package tweet

import (
"encoding/json"
"fmt"
"net/http"
"net/url"
"time"

"okp4/nemeton-leaderboard/app/message"

"github.com/asynkron/protoactor-go/actor"
"github.com/asynkron/protoactor-go/scheduler"
"github.com/rs/zerolog/log"
)

type SearchActor struct {
Hashtag string
TweeterToken string
Client *http.Client
SinceId string
}

func NewSearchActor(eventStore *actor.PID, mongoURI, dbName, tweeterToken string) (*SearchActor, error) {
return &SearchActor{
Hashtag: "#NemetonOKP4 -is:retweet",
TweeterToken: tweeterToken,
Client: http.DefaultClient,
SinceId: "",
}, nil
}

func (a *SearchActor) Receive(ctx actor.Context) {
switch ctx.Message().(type) {
case *actor.Started:
log.Info().Msg("💬 Start tweeter search")
scheduler.NewTimerScheduler(ctx).SendRepeatedly(0, 10*time.Second, ctx.Self(), &message.SearchTweet{})
case *message.SearchTweet:
log.Info().Msg("🧙‍ Start looking for tweets")
a.searchTweets(a.SinceId, "", "")
case *actor.Stopping:
log.Info().Msg("🛑 Stop tweeter search")
}
}

// searchTweets
func (a *SearchActor) searchTweets(sinceId, nextToken, initialNewestId string) {
tweets, err := a.fetchTweets(sinceId, nextToken)
if err != nil {
log.Error().Err(err).Msg("❌ Failed fetch tweet from twitter API.")
return
}

if tweets.Data == nil {
log.Info().Msg("🐪 No new tweet found this time, try again later")
return
}

a.handleTweets(tweets)

if tweets.Meta.NextToken != "" {
// There is another page, request it.
newestId := tweets.Meta.NewestId
if initialNewestId != "" {
// Keep the initial newest id for save it after pagination
newestId = initialNewestId
}
log.Info().Str("latestTweetId", newestId).Msg("📃 Search tweet on next page")
a.searchTweets(sinceId, tweets.Meta.NextToken, newestId)
} else {
// No new page, save the next sinceId for next scheduled query.
if initialNewestId != "" {
a.SinceId = initialNewestId
} else {
a.SinceId = tweets.Meta.NewestId
}
log.Info().Str("latestTweetId", a.SinceId).Msg("📃 No new page on tweet search. Looking next time for new tweet.")
}
}

func (a *SearchActor) fetchTweets(sinceId, nextToken string) (*Response, error) {
u, err := url.Parse("https://api.twitter.com/2/tweets/search/recent")
if err != nil {
return nil, err
}
query := u.Query()
query.Add("query", a.Hashtag)
query.Add("expansions", "author_id")
query.Add("user.fields", "username")

if len(nextToken) > 0 {
query.Add("next_token", nextToken)
}
if len(sinceId) > 0 {
query.Add("since_id", sinceId)
}

u.RawQuery = query.Encode()

request, err := http.NewRequest(http.MethodGet, u.String(), nil)
if err != nil {
return nil, err
}

request.Header.Add("authorization", fmt.Sprintf("Bearer %s", a.TweeterToken))
r, err := a.Client.Do(request)
if err != nil {
return nil, err
}

defer r.Body.Close()

if r.StatusCode != 200 {
return nil, fmt.Errorf("wrong response from twitter api. Status code : %d", r.StatusCode)
}

var response Response
err = json.NewDecoder(r.Body).Decode(&response)
if err != nil {
return nil, err
}
return &response, nil
}

func (a *SearchActor) handleTweets(tweets *Response) {
// TODO: Parse response to register tweet event
}
3 changes: 3 additions & 0 deletions app/message/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,6 @@ type GetBlockResponse struct {

// SyncBlock used for ask synchronization to request new block on chain.
type SyncBlock struct{}

// SearchTweet message to ask actor to launch tweet research.
type SearchTweet struct{}
20 changes: 16 additions & 4 deletions app/system/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"okp4/nemeton-leaderboard/app/actor/event"
"okp4/nemeton-leaderboard/app/actor/graphql"
"okp4/nemeton-leaderboard/app/actor/synchronization"
"okp4/nemeton-leaderboard/app/actor/tweet"

"github.com/asynkron/protoactor-go/actor"
"github.com/rs/zerolog/log"
Expand All @@ -16,10 +17,10 @@ type App struct {
init *actor.PID
}

func Bootstrap(listenAddr, mongoURI, dbName, grpcAddr string, tls credentials.TransportCredentials) *App {
func Bootstrap(listenAddr, mongoURI, dbName, grpcAddr, tweeterToken string, tls credentials.TransportCredentials) *App {
initProps := actor.PropsFromFunc(func(ctx actor.Context) {
if _, ok := ctx.Message().(*actor.Started); ok {
boot(ctx, listenAddr, mongoURI, dbName, grpcAddr, tls)
boot(ctx, listenAddr, mongoURI, dbName, grpcAddr, tweeterToken, tls)
}
})

Expand All @@ -39,7 +40,7 @@ func (app *App) Stop() error {
return app.ctx.StopFuture(app.init).Wait()
}

func boot(ctx actor.Context, listenAddr, mongoURI, dbName, grpcAddr string, tls credentials.TransportCredentials) {
func boot(ctx actor.Context, listenAddr, mongoURI, dbName, grpcAddr, tweeterToken string, tls credentials.TransportCredentials) {
grpcClientProps := actor.PropsFromProducer(func() actor.Actor {
grpcClient, err := cosmos.NewGrpcClient(grpcAddr, tls)
if err != nil {
Expand All @@ -65,7 +66,18 @@ func boot(ctx actor.Context, listenAddr, mongoURI, dbName, grpcAddr string, tls
return sync
})
if _, err := ctx.SpawnNamed(blockSync, "blockSync"); err != nil {
log.Panic().Err(err).Msg("❌Could not create block sync actor")
log.Panic().Err(err).Msg("❌ Could not create block sync actor")
}

tweetProps := actor.PropsFromProducer(func() actor.Actor {
actor, err := tweet.NewSearchActor(eventStorePID, mongoURI, dbName, tweeterToken)
if err != nil {
log.Panic().Err(err).Msg("❌ Could not start tweet actor")
}
return actor
})
if _, err := ctx.SpawnNamed(tweetProps, "tweet"); err != nil {
log.Panic().Err(err).Msg("❌ Could not create tweet sync actor")
}

graphqlProps := actor.PropsFromProducer(func() actor.Actor {
Expand Down
5 changes: 4 additions & 1 deletion cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const (
FlagGrpcAddress = "grpc-address"
FlagNoTLS = "no-tls"
FlagTLSSkipVerify = "tls-skip-verify"
FlagTweeterToken = "tweeter-token"
)

var (
Expand All @@ -30,12 +31,13 @@ var (
grpcAddress string
noTLS bool
tlsSkipVerify bool
tweeterToken string

startCmd = &cobra.Command{
Use: "start",
Short: "Start the leaderboard service",
Run: func(cmd *cobra.Command, args []string) {
app := system.Bootstrap(graphqlAddr, mongoURI, dbName, grpcAddress, getTransportCredentials())
app := system.Bootstrap(graphqlAddr, mongoURI, dbName, grpcAddress, tweeterToken, getTransportCredentials())

kill := make(chan os.Signal, 1)
signal.Notify(kill, syscall.SIGINT, syscall.SIGTERM)
Expand All @@ -61,6 +63,7 @@ func init() {
FlagTLSSkipVerify,
false,
"Encryption with the GRPC endpoint but skip certificates verification")
startCmd.PersistentFlags().StringVar(&tweeterToken, FlagTweeterToken, "", "Set the tweeter bearer token")
}

func getTransportCredentials() credentials.TransportCredentials {
Expand Down

0 comments on commit 64e2e6d

Please sign in to comment.