Skip to content

Commit

Permalink
feat: replace twitter call from week in olas skill
Browse files Browse the repository at this point in the history
  • Loading branch information
dvilelaf committed Jun 14, 2024
1 parent 8354b61 commit 5196f9e
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 162 deletions.
229 changes: 73 additions & 156 deletions packages/valory/skills/olas_week_abci/behaviours.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,18 @@
from datetime import date, datetime, timedelta
from typing import Dict, Generator, List, Optional, Set, Tuple, Type, cast

from aea.protocols.base import Message
from twitter_text import parse_tweet

from packages.valory.connections.openai.connection import (
PUBLIC_ID as LLM_CONNECTION_PUBLIC_ID,
)
from packages.valory.connections.tweepy.connection import (
PUBLIC_ID as TWEEPY_CONNECTION_PUBLIC_ID,
)
from packages.valory.protocols.llm.message import LlmMessage
from packages.valory.protocols.srr.dialogues import SrrDialogue, SrrDialogues
from packages.valory.protocols.srr.message import SrrMessage
from packages.valory.skills.abstract_round_abci.base import AbstractRound
from packages.valory.skills.abstract_round_abci.behaviours import (
AbstractRoundBehaviour,
Expand All @@ -57,7 +63,7 @@
from packages.valory.skills.olas_week_abci.prompts import tweet_summarizer_prompt
from packages.valory.skills.olas_week_abci.rounds import (
ERROR_API_LIMITS,
ERROR_GENERIC,
ERROR_TWEEPY_CONNECTION,
Event,
OlasWeekDecisionMakingRound,
OlasWeekEvaluationRound,
Expand All @@ -83,6 +89,7 @@
LINK_REGEX = r"https?:\/\/(?:www\.)?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b(?:[-a-zA-Z0-9()@:%_\+.~#?&\/=]*)"
HIGHLIGHT_LINK_REGEX = rf"☴.*\n{LINK_REGEX}\n"
WEEK_IN_OLAS_REGEX = r".*Week \d+ in Olas.*"
AUTONOLAS_TWITTER_ID = "1450081635559428107"


def extract_headers(header_str: str) -> dict:
Expand Down Expand Up @@ -204,6 +211,38 @@ def _check_twitter_limits(self) -> Tuple:
# Window has not expired and we have not reached the max number of tweets
return False, number_of_tweets_pulled_today, last_tweet_pull_window_reset

def _do_connection_request(
self,
message: Message,
dialogue: Message,
timeout: Optional[float] = None,
) -> Generator[None, None, Message]:
"""Do a request and wait the response, asynchronously."""

self.context.outbox.put_message(message=message)
request_nonce = self._get_request_nonce_from_dialogue(dialogue) # type: ignore
cast(Requests, self.context.requests).request_id_to_callback[
request_nonce
] = self.get_callback_request()
response = yield from self.wait_for_message(timeout=timeout)
return response

def _call_tweepy(
self,
**kwargs,
) -> Generator[None, None, Dict]:
"""Send a request message from the skill context."""
srr_dialogues = cast(SrrDialogues, self.context.srr_dialogues)
srr_message, srr_dialogue = srr_dialogues.create(
counterparty=str(TWEEPY_CONNECTION_PUBLIC_ID),
performative=SrrMessage.Performative.REQUEST,
payload=json.dumps(**kwargs),
)
srr_message = cast(SrrMessage, srr_message)
srr_dialogue = cast(SrrDialogue, srr_dialogue)
response = yield from self._do_connection_request(srr_message, srr_dialogue) # type: ignore
return json.loads(response.payload) # type: ignore


class OlasWeekRandomnessBehaviour(RandomnessBehaviour):
"""Retrieve randomness."""
Expand Down Expand Up @@ -439,7 +478,7 @@ def _sender_act(self) -> Generator:

else:
# Get tweets from Twitter
payload_data = yield from self._get_tweets(
payload_data = yield from self._get_week_tweets(
number_of_tweets_pulled_today=number_of_tweets_pulled_today
)

Expand All @@ -455,15 +494,13 @@ def _sender_act(self) -> Generator:

self.set_done()

def _get_tweets(
def _get_week_tweets(
self,
number_of_tweets_pulled_today: int,
) -> Generator[None, None, Dict]:
"""Get Tweets"""

api_base = self.params.twitter_api_base
api_endpoint = self.params.twitter_tweets_endpoint
"""Get last week's tweets from Twitter"""

# Checl the tweet allowance
number_of_tweets_remaining_today = (
self.params.max_tweet_pulls_allowed - number_of_tweets_pulled_today
)
Expand All @@ -487,138 +524,42 @@ def _get_tweets(

start_time_str = start_time.strftime("%Y-%m-%dT%H:%M:%SZ")

# Build the args
api_args = self.params.twitter_tweets_args.replace(
"{start_time}", start_time_str
)
api_args = api_args.replace(
"{max_results}", str(number_of_tweets_remaining_today)
)
api_url = api_base + api_endpoint + api_args
headers = dict(Authorization=f"Bearer {self.params.twitter_api_bearer_token}")
self.context.logger.info(f"Searching @autonolas tweets since {start_time_str}")

self.context.logger.info(
f"Retrieving tweets from Twitter API [{api_url}]\nBearer token {self.params.twitter_api_bearer_token[:5]}*******{self.params.twitter_api_bearer_token[-5:]}"
# Call Tweepy conection
response = yield from self._call_tweepy(
action="get_users_tweets",
kwargs={
"id": AUTONOLAS_TWITTER_ID,
"start_time": start_time_str,
},
)
# Check response
if "error" in response:
return {
"tweets": None,
"error": ERROR_TWEEPY_CONNECTION,
"latest_mention_tweet_id": None,
"number_of_tweets_pulled_today": number_of_tweets_pulled_today,
"sleep_until": self.synchronized_data.sleep_until,
}

tweets = {}
next_token = None

# Pagination loop: we read a max of <twitter_max_pages> pages each period
# Each page contains 100 tweets. The default value for twitter_max_pages is 10
for _ in range(self.params.twitter_max_pages):
self.context.logger.info(
f"Retrieving a new page. max_pages={self.params.twitter_max_pages}"
)
url = api_url
# Add the pagination token if it exists
if next_token:
url += f"&pagination_token={next_token}"

# Make the request
response = yield from self.get_http_response(
method="GET", url=url, headers=headers
)

# Check response status
if response.status_code != 200:
header_dict = extract_headers(response.headers)

remaining, limit, reset_ts = [
header_dict.get(header, "?")
for header in [
"x-rate-limit-remaining",
"x-rate-limit-limit",
"x-rate-limit-reset",
]
]
reset = (
datetime.fromtimestamp(int(reset_ts)).strftime("%Y-%m-%d %H:%M:%S")
if reset_ts != "?"
else None
)

self.context.logger.error(
f"Error retrieving tweets from Twitter [{response.status_code}]: {response.body}"
f"API limits: {remaining}/{limit}. Window reset: {reset}"
)

return {
"tweets": None,
"error": ERROR_API_LIMITS
if response.status_code == HTTP_TOO_MANY_REQUESTS
else ERROR_GENERIC,
"number_of_tweets_pulled_today": number_of_tweets_pulled_today,
"sleep_until": reset_ts
if response.status_code == HTTP_TOO_MANY_REQUESTS
else self.synchronized_data.sleep_until,
}

api_data = json.loads(response.body)

# Check the meta field
if "meta" not in api_data:
self.context.logger.error(
f"Twitter API response does not contain the required 'meta' field: {api_data!r}"
)
return {
"tweets": None,
"error": ERROR_GENERIC,
"number_of_tweets_pulled_today": number_of_tweets_pulled_today,
"sleep_until": None, # we reset this on a successful request
}

# Check if there are no more results
if (
"result_count" in api_data["meta"]
and int(api_data["meta"]["result_count"]) == 0
):
break

# Check that the data exists
if "data" not in api_data or "newest_id" not in api_data["meta"]:
self.context.logger.error(
f"Twitter API response does not contain the required 'meta' field: {api_data!r}"
)
return {
"tweets": None,
"error": ERROR_GENERIC,
"number_of_tweets_pulled_today": number_of_tweets_pulled_today,
"sleep_until": None, # we reset this on a successful request
}

if "includes" not in api_data or "users" not in api_data["includes"]:
self.context.logger.error(
f"Twitter API response does not contain the required 'includes/users' field: {api_data!r}"
)
return {
"tweets": None,
"error": ERROR_GENERIC,
"number_of_tweets_pulled_today": number_of_tweets_pulled_today,
"sleep_until": None, # we reset this on a successful request
}

# Add the retrieved tweets
for tweet in api_data["data"]:
tweets[tweet["id"]] = tweet

# Set the author handle
for user in api_data["includes"]["users"]:
if user["id"] == tweet["author_id"]:
tweets[tweet["id"]]["username"] = user["username"]
break
number_of_tweets_pulled_today += 1

if "next_token" in api_data["meta"]:
next_token = api_data["meta"]["next_token"]
continue
# Process tweets
tweets = {t["id"]: t for t in response["tweets"]}
retrieved_tweets = len(response["tweets"])
number_of_tweets_pulled_today += retrieved_tweets
latest_tweet_id = response["tweets"][
0
].id # tweepy sorts by most recent first by default

break
self.context.logger.info(
f"Got {retrieved_tweets} new hashtag tweets until tweet_id={latest_tweet_id}: {tweets.keys()}"
)

self.context.logger.info(f"Got {len(tweets)} new tweets")

return {
"tweets": list(tweets.values()),
"tweets": tweets,
"number_of_tweets_pulled_today": number_of_tweets_pulled_today,
"sleep_until": None, # we reset this on a successful request
}
Expand Down Expand Up @@ -720,7 +661,7 @@ def evaluate_summary(
)
request_llm_message = cast(LlmMessage, request_llm_message)
llm_dialogue = cast(LlmDialogue, llm_dialogue)
llm_response_message = yield from self._do_request(
llm_response_message = yield from self._do_connection_request(
request_llm_message, llm_dialogue
)
data = llm_response_message.value
Expand All @@ -730,30 +671,6 @@ def evaluate_summary(
self.context.logger.info(f"Parsed summary: {summary}")
return summary

def _do_request(
self,
llm_message: LlmMessage,
llm_dialogue: LlmDialogue,
timeout: Optional[float] = None,
) -> Generator[None, None, LlmMessage]:
"""
Do a request and wait the response, asynchronously.
:param llm_message: The request message
:param llm_dialogue: the HTTP dialogue associated to the request
:param timeout: seconds to wait for the reply.
:yield: LLMMessage object
:return: the response message
"""
self.context.outbox.put_message(message=llm_message)
request_nonce = self._get_request_nonce_from_dialogue(llm_dialogue)
cast(Requests, self.context.requests).request_id_to_callback[
request_nonce
] = self.get_callback_request()
# notify caller by propagating potential timeout exception.
response = yield from self.wait_for_message(timeout=timeout)
return response


class OlasWeekRoundBehaviour(AbstractRoundBehaviour):
"""OlasWeekRoundBehaviour"""
Expand Down
2 changes: 1 addition & 1 deletion packages/valory/skills/olas_week_abci/rounds.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@


MAX_API_RETRIES = 2
ERROR_GENERIC = "generic"
ERROR_API_LIMITS = "too many requests"
ERROR_TWEEPY_CONNECTION = "tweepy connection"


class Event(Enum):
Expand Down
5 changes: 0 additions & 5 deletions packages/valory/skills/olas_week_abci/skill.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,6 @@ models:
tendermint_p2p_url: localhost:26656
tendermint_url: http://localhost:26657
tx_timeout: 10.0
twitter_api_base: https://api.twitter.com/
twitter_api_bearer_token: <default_bearer_token>
twitter_tweets_endpoint: 2/users/1450081635559428107/tweets?
twitter_tweets_args: tweet.fields=author_id,created_at,conversation_id&user.fields=name&expansions=author_id&max_results=50&start_time={start_time}
twitter_max_pages: 1
max_tweet_pulls_allowed: 80
openai_call_window_size: 3600.0
openai_calls_allowed_in_window: 100
Expand Down

0 comments on commit 5196f9e

Please sign in to comment.