diff --git a/docs/twikit.rst b/docs/twikit.rst index b22eb7b7..126c1ee6 100644 --- a/docs/twikit.rst +++ b/docs/twikit.rst @@ -131,6 +131,15 @@ Geo :show-inheritance: :member-order: bysource +Capsolver +------------------- + +.. automodule:: twikit._captcha.capsolver + :members: + :undoc-members: + :show-inheritance: + :member-order: bysource + Utils ------------------- diff --git a/docs/twikit.twikit_async.rst b/docs/twikit.twikit_async.rst index e81962b3..0012bacd 100644 --- a/docs/twikit.twikit_async.rst +++ b/docs/twikit.twikit_async.rst @@ -125,7 +125,16 @@ Notification Geo ------------------- -.. automodule:: twikit.geo +.. automodule:: twikit.twikit_async.geo + :members: + :undoc-members: + :show-inheritance: + :member-order: bysource + +Capsolver +------------------- + +.. automodule:: twikit.twikit_async._captcha.capsolver :members: :undoc-members: :show-inheritance: diff --git a/requirements.txt b/requirements.txt index 1fc2b326..9025831c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,3 @@ httpx -fake_useragent -filetype \ No newline at end of file +filetype +beautifulsoup4 \ No newline at end of file diff --git a/setup.py b/setup.py index c41f96ef..dd9a520e 100644 --- a/setup.py +++ b/setup.py @@ -13,8 +13,8 @@ version=version, install_requires=[ 'httpx', - 'fake_useragent', - 'filetype' + 'filetype', + 'beautifulsoup4' ], python_requires='>=3.10', description='Twitter API wrapper for python with **no API key required**.', diff --git a/twikit/__init__.py b/twikit/__init__.py index 1a6a226c..65f48de8 100644 --- a/twikit/__init__.py +++ b/twikit/__init__.py @@ -7,8 +7,9 @@ A Python library for interacting with the Twitter API. """ -__version__ = '1.6.4' +__version__ = '1.7.0' +from ._captcha import Capsolver from .bookmark import BookmarkFolder from .client import Client from .community import (Community, CommunityCreator, CommunityMember, diff --git a/twikit/_captcha/__init__.py b/twikit/_captcha/__init__.py new file mode 100644 index 00000000..85f95d91 --- /dev/null +++ b/twikit/_captcha/__init__.py @@ -0,0 +1,2 @@ +from .base import CaptchaSolver +from .capsolver import Capsolver diff --git a/twikit/_captcha/base.py b/twikit/_captcha/base.py new file mode 100644 index 00000000..b00a6216 --- /dev/null +++ b/twikit/_captcha/base.py @@ -0,0 +1,113 @@ +from __future__ import annotations + +import re +from typing import TYPE_CHECKING, NamedTuple + +from bs4 import BeautifulSoup +from httpx import Response + +from twikit.utils import urlencode + +if TYPE_CHECKING: + from ..client import Client + + +class UnlockHTML(NamedTuple): + authenticity_token: str + assignment_token: str + needs_unlock: bool + start_button: bool + finish_button: bool + delete_button: bool + blob: str + + +class CaptchaSolver: + client: Client + max_attempts: int + + CAPTCHA_URL = 'https://twitter.com/account/access' + CAPTCHA_SITE_KEY = '0152B4EB-D2DC-460A-89A1-629838B529C9' + + def get_unlock_html(self) -> tuple[Response, UnlockHTML]: + headers = { + 'X-Twitter-Client-Language': 'en-US', + 'User-Agent': self.client._user_agent, + 'Upgrade-Insecure-Requests': '1' + } + _, response = self.client.get( + self.CAPTCHA_URL, headers=headers + ) + return response, parse_unlock_html(response.text) + + def ui_metrix(self) -> str: + js, _ = self.client.get( + 'https://twitter.com/i/js_inst?c_name=ui_metrics' + ) + return re.findall(r'return ({.*?});', js, re.DOTALL)[0] + + def confirm_unlock( + self, + authenticity_token: str, + assignment_token: str, + verification_string: str = None, + ui_metrics: bool = False + ) -> tuple[Response, UnlockHTML]: + data = { + 'authenticity_token': authenticity_token, + 'assignment_token': assignment_token, + 'lang': 'en', + 'flow': '', + } + params = {} + if verification_string: + data['verification_string'] = verification_string + data['language_code'] = 'en' + params['lang'] = 'en' + if ui_metrics: + data['ui_metrics'] = self.ui_metrix() + data = urlencode(data) + headers = { + 'Content-Type': 'application/x-www-form-urlencoded', + 'Upgrade-Insecure-Requests': '1', + 'Referer': self.CAPTCHA_URL + } + _, response = self.client.post( + self.CAPTCHA_URL, params=params, data=data, headers=headers + ) + return response, parse_unlock_html(response.text) + + +def parse_unlock_html(html: str) -> UnlockHTML: + soup = BeautifulSoup(html, 'lxml') + + authenticity_token = None + authenticity_token_element = soup.find( + 'input', {'name': 'authenticity_token'} + ) + if authenticity_token_element is not None: + authenticity_token: str = authenticity_token_element.get('value') + + assignment_token = None + assignment_token_element = soup.find('input', {'name': 'assignment_token'}) + if assignment_token_element is not None: + assignment_token = assignment_token_element.get('value') + + verification_string = soup.find('input', id='verification_string') + needs_unlock = bool(verification_string) + start_button = bool(soup.find('input', value='Start')) + finish_button = bool(soup.find('input', value='Continue to X')) + delete_button = bool(soup.find('input', value='Delete')) + + iframe = soup.find(id='arkose_iframe') + blob = re.findall(r'data=(.+)', iframe['src'])[0] if iframe else None + + return UnlockHTML( + authenticity_token, + assignment_token, + needs_unlock, + start_button, + finish_button, + delete_button, + blob + ) diff --git a/twikit/_captcha/capsolver.py b/twikit/_captcha/capsolver.py new file mode 100644 index 00000000..3d3a0a6e --- /dev/null +++ b/twikit/_captcha/capsolver.py @@ -0,0 +1,93 @@ +from __future__ import annotations + +from time import sleep + +import httpx + +from .base import CaptchaSolver + + +class Capsolver(CaptchaSolver): + """ + You can automatically unlock the account by passing the `captcha_solver` + argument when initialising the :class:`.Client`. + + First, visit https://capsolver.com and obtain your Capsolver API key. + Next, pass the Capsolver instance to the client as shown in the example. + + .. code-block:: python + + from twikit.twikit_async import Capsolver, Client + solver = Capsolver( + api_key='your_api_key', + max_attempts=10 + ) + client = Client(captcha_solver=solver) + + Parameters + ---------- + api_key : :class:`str` + Capsolver API key. + max_attempts : :class:`int`, default=3 + The maximum number of attempts to solve the captcha. + get_result_interval : :class:`float`, default=1.0 + + use_blob_data : :class:`bool`, default=False + """ + def __init__( + self, + api_key: str, + max_attempts: int = 3, + get_result_interval: float = 1.0, + use_blob_data: bool = False + ) -> None: + self.api_key = api_key + self.get_result_interval = get_result_interval + self.max_attempts = max_attempts + self.use_blob_data = use_blob_data + + def create_task(self, task_data: dict) -> dict: + data = { + 'clientKey': self.api_key, + 'task': task_data + } + response = httpx.post( + 'https://api.capsolver.com/createTask', + json=data, + headers={'content-type': 'application/json'} + ).json() + return response + + def get_task_result(self, task_id: str) -> dict: + data = { + 'clientKey': self.api_key, + 'taskId': task_id + } + response = httpx.post( + 'https://api.capsolver.com/getTaskResult', + json=data, + headers={'content-type': 'application/json'} + ).json() + return response + + def solve_funcaptcha(self, blob: str) -> dict: + if self.client.proxy is None: + captcha_type = 'FunCaptchaTaskProxyLess' + else: + captcha_type = 'FunCaptchaTask' + + task_data = { + 'type': captcha_type, + 'websiteURL': 'https://iframe.arkoselabs.com', + 'websitePublicKey': self.CAPTCHA_SITE_KEY, + 'funcaptchaApiJSSubdomain': 'https://client-api.arkoselabs.com', + } + if self.use_blob_data: + task_data['data'] = '{"blob":"%s"}' % blob + task_data['userAgent'] = self.client._user_agent + task = self.create_task(task_data) + while True: + sleep(self.get_result_interval) + result = self.get_task_result(task['taskId']) + if result['status'] in ('ready', 'failed'): + return result diff --git a/twikit/client.py b/twikit/client.py index 41eab8f9..a595c773 100644 --- a/twikit/client.py +++ b/twikit/client.py @@ -5,26 +5,34 @@ import time import warnings from functools import partial -from typing import Generator, Literal +from typing import Any, Generator, Literal import filetype -from fake_useragent import UserAgent +import httpx from httpx import Response from .bookmark import BookmarkFolder +from ._captcha import Capsolver from .community import Community, CommunityMember from .errors import ( + AccountSuspended, + BadRequest, CouldNotTweet, + Forbidden, InvalidMedia, + NotFound, + RequestTimeout, + ServerError, TweetNotAvailable, TwitterException, + TooManyRequests, + Unauthorized, UserNotFound, UserUnavailable, raise_exceptions_from_response ) from .geo import Place, _places_from_response from .group import Group, GroupMessage -from .http import HTTPClient from .list import List from .message import Message from .notification import Notification @@ -55,10 +63,134 @@ ) -class Client: +class BaseClient: + """:meta private:""" + + def __init__( + self, + language: str | None = None, + proxy: str | None = None, + captcha_solver: Capsolver | None = None, + **kwargs + ) -> None: + if 'proxies' in kwargs: + message = ( + "The 'proxies' argument is now deprecated. Use 'proxy' " + "instead. https://github.com/encode/httpx/pull/2879" + ) + warnings.warn(message) + + self.http = httpx.Client(proxy=proxy, **kwargs) + self.language = language + self.proxy = proxy + self.captcha_solver = captcha_solver + if captcha_solver is not None: + captcha_solver.client = self + + self._token = TOKEN + self._user_id = None + self._user_agent = ('Mozilla/5.0 (Windows NT 10.0; Win64; x64) ' + 'AppleWebKit/537.36 (KHTML, like Gecko) ' + 'Chrome/122.0.0.0 Safari/537.36') + self._act_as = None + + def request( + self, + method: str, + url: str, + auto_unlock: bool = True, + raise_exception: bool = True, + **kwargs + ) -> tuple[dict | Any, httpx.Response]: + cookies_backup = self.get_cookies().copy() + response = self.http.request(method, url, **kwargs) + self._remove_duplicate_ct0_cookie() + + try: + response_data = response.json() + except json.decoder.JSONDecodeError: + response_data = response.text + + if isinstance(response_data, dict) and 'errors' in response_data: + error_code = response_data['errors'][0]['code'] + error_message = response_data['errors'][0].get('message') + if error_code == 37: + # Account suspended + raise AccountSuspended(error_message) + + if error_code == 326: + # Account unlocking + if self.captcha_solver is None: + raise TwitterException( + 'Your account is locked. Visit ' + 'https://twitter.com/account/access to unlock it.' + ) + if auto_unlock: + self.unlock() + self.set_cookies(cookies_backup, clear_cookies=True) + response = self.http.request(method, url, **kwargs) + self._remove_duplicate_ct0_cookie() + try: + response_data = response.json() + except json.decoder.JSONDecodeError: + response_data = response.text + + status_code = response.status_code + + if status_code >= 400 and raise_exception: + message = f'status: {status_code}, message: "{response.text}"' + if status_code == 400: + raise BadRequest(message, headers=response.headers) + elif status_code == 401: + raise Unauthorized(message, headers=response.headers) + elif status_code == 403: + raise Forbidden(message, headers=response.headers) + elif status_code == 404: + raise NotFound(message, headers=response.headers) + elif status_code == 408: + raise RequestTimeout(message, headers=response.headers) + elif status_code == 429: + raise TooManyRequests(message, headers=response.headers) + elif 500 <= status_code < 600: + raise ServerError(message, headers=response.headers) + else: + raise TwitterException(message, headers=response.headers) + + return response_data, response + + def get(self, url, **kwargs) -> tuple[dict | Any, httpx.Response]: + return self.request('GET', url, **kwargs) + + def post(self, url, **kwargs) -> tuple[dict | Any, httpx.Response]: + return self.request('POST', url, **kwargs) + + def stream(self, *args, **kwargs): + response = self.http.stream(*args, **kwargs) + return response + + def _remove_duplicate_ct0_cookie(self) -> None: + cookies = {} + for cookie in self.http.cookies.jar: + if 'ct0' in cookies and cookie.name == 'ct0': + continue + cookies[cookie.name] = cookie.value + self.http.cookies = list(cookies.items()) + + +class Client(BaseClient): """ A client for interacting with the Twitter API. + Parameters + ---------- + language : :class:`str` | None, default=None + The language code to use in API requests. + proxy : :class:`str` | None, default=None + The proxy server URL to use for request + (e.g., 'http://0.0.0.0:0000'). + captcha_solver : :class:`.Capsolver` | None, default=None + See :class:`.Capsolver`. + Examples -------- >>> client = Client(language='en-US') @@ -69,27 +201,15 @@ class Client: ... password='00000000' ... ) """ - - def __init__( - self, language: str | None = None, - proxies: dict | None = None, **kwargs - ) -> None: - self._token = TOKEN - self.language = language - self.http = HTTPClient(proxies=proxies, **kwargs) - self._user_id = None - self._user_agent = UserAgent().random.strip() - self._act_as = None - def _get_guest_token(self) -> str: headers = self._base_headers headers.pop('X-Twitter-Active-User') headers.pop('X-Twitter-Auth-Type') - response = self.http.post( + response, _ = self.post( Endpoint.GUEST_TOKEN, headers=headers, data={} - ).json() + ) guest_token = response['guest_token'] return guest_token @@ -111,7 +231,7 @@ def _base_headers(self) -> dict[str, str]: headers['Accept-Language'] = self.language headers['X-Twitter-Client-Language'] = self.language - csrf_token = self._get_csrf_token() + csrf_token = self.http.cookies.get('ct0') if csrf_token is not None: headers['X-Csrf-Token'] = csrf_token if self._act_as is not None: @@ -119,17 +239,59 @@ def _base_headers(self) -> dict[str, str]: headers['X-Contributor-Version'] = '1' return headers - def _get_csrf_token(self) -> str: - """ - Retrieves the Cross-Site Request Forgery (CSRF) token from the - current session's cookies. + def unlock(self) -> None: + if self.captcha_solver is None: + raise ValueError('Captcha solver is not provided.') - Returns - ------- - str - The CSRF token as a string. - """ - return self.http.client.cookies.get('ct0') + response, html = self.captcha_solver.get_unlock_html() + + if html.delete_button: + response, html = self.captcha_solver.confirm_unlock( + html.authenticity_token, + html.assignment_token, + ui_metrics=True + ) + + if html.start_button or html.finish_button: + response, html = self.captcha_solver.confirm_unlock( + html.authenticity_token, + html.assignment_token, + ui_metrics=True + ) + + cookies_backup = self.get_cookies().copy() + max_unlock_attempts = self.captcha_solver.max_attempts + attempt = 0 + while attempt < max_unlock_attempts: + attempt += 1 + + if html.authenticity_token is None: + response, html = self.captcha_solver.get_unlock_html() + + result = self.captcha_solver.solve_funcaptcha(html.blob) + if result['errorId'] == 1: + continue + + self.set_cookies(cookies_backup, clear_cookies=True) + response, html = self.captcha_solver.confirm_unlock( + html.authenticity_token, + html.assignment_token, + result['solution']['token'], + ) + + if html.finish_button: + response, html = self.captcha_solver.confirm_unlock( + html.authenticity_token, + html.assignment_token, + ui_metrics=True + ) + finished = ( + response.next_request is not None and + response.next_request.url.path == '/' + ) + if finished: + return + raise Exception('could not unlock the account.') def login( self, @@ -166,7 +328,7 @@ def login( ... password='00000000' ... ) """ - self.http.client.cookies.clear() + self.http.cookies.clear() guest_token = self._get_guest_token() headers = self._base_headers | { 'x-guest-token': guest_token @@ -250,7 +412,7 @@ def logout(self) -> Response: """ Logs out of the currently logged-in account. """ - response = self.http.post( + _, response = self.http.post( Endpoint.LOGOUT, headers=self._base_headers ) @@ -262,10 +424,10 @@ def user_id(self) -> str: """ if self._user_id is not None: return self._user_id - response = self.http.get( + response, _ = self.get( Endpoint.SETTINGS, headers=self._base_headers - ).json() + ) screen_name = response['screen_name'] self._user_id = self.get_user_by_screen_name(screen_name).id return self._user_id @@ -292,7 +454,7 @@ def get_cookies(self) -> dict: .load_cookies .save_cookies """ - return dict(self.http.client.cookies) + return dict(self.http.cookies) def save_cookies(self, path: str) -> None: """ @@ -340,8 +502,8 @@ def set_cookies(self, cookies: dict, clear_cookies: bool = False) -> None: .save_cookies """ if clear_cookies: - self.http.client.cookies.clear() - self.http.client.cookies.update(cookies) + self.http.cookies.clear() + self.http.cookies.update(cookies) def load_cookies(self, path: str) -> None: """ @@ -400,11 +562,11 @@ def _search( 'variables': variables, 'features': FEATURES }) - response = self.http.get( + response, _ = self.get( Endpoint.SEARCH_TIMELINE, params=params, headers=self._base_headers - ).json() + ) return response @@ -591,11 +753,11 @@ def get_similar_tweets(self, tweet_id: str) -> list[Tweet]: 'variables': {'tweet_id': tweet_id}, 'features': SIMILAR_POSTS_FEATURES }) - response = self.http.get( + response, _ = self.get( Endpoint.SIMILAR_POSTS, params=params, headers=self._base_headers - ).json() + ) items_ = find_dict(response, 'entries') results = [] if not items_: @@ -710,11 +872,11 @@ def upload_media( } if media_category is not None: params['media_category'] = media_category - response = self.http.post( + response, _ = self.post( endpoint, params=params, headers=self._base_headers - ).json() + ) media_id = response['media_id'] # =========== APPEND ============ segment_index = 0 @@ -738,7 +900,7 @@ def upload_media( 'application/octet-stream', ) } - self.http.post( + self.post( endpoint, params=params, headers=headers, @@ -754,7 +916,7 @@ def upload_media( 'command': 'FINALIZE', 'media_id': media_id, } - self.http.post( + self.post( endpoint, params=params, headers=self._base_headers, @@ -797,11 +959,11 @@ def check_media_status( endpoint = Endpoint.UPLOAD_MEDIA_2 else: endpoint = Endpoint.UPLOAD_MEDIA - response = self.http.get( + response, _ = self.get( endpoint, params=params, headers=self._base_headers - ).json() + ) return response def create_media_metadata( @@ -843,21 +1005,12 @@ def create_media_metadata( data['alt_text'] = {'text': alt_text} if sensitive_warning is not None: data['sensitive_media_warning'] = sensitive_warning - return self.http.post( + _, response = self.post( Endpoint.CREATE_MEDIA_METADATA, json=data, headers=self._base_headers ) - - def get_media(self, url: str) -> bytes: - """Retrieves media bytes. - - Parameters - ---------- - url : str - Media URL - """ - return self.http.get(url, headers=self._base_headers).content + return response def create_poll( self, @@ -904,11 +1057,11 @@ def create_poll( headers = self._base_headers | { 'content-type': 'application/x-www-form-urlencoded' } - response = self.http.post( + response, _ = self.post( Endpoint.CREATE_CARD, data=data, headers=headers, - ).json() + ) return response['card_uri'] @@ -948,11 +1101,11 @@ def vote( headers = self._base_headers | { 'content-type': 'application/x-www-form-urlencoded' } - response = self.http.post( + response, _ = self.post( Endpoint.VOTE, data=data, headers=headers - ).json() + ) card_data = { 'rest_id': response['card']['url'], @@ -1112,11 +1265,11 @@ def create_tweet( 'queryId': get_query_id(Endpoint.CREATE_TWEET), 'features': features, } - response = self.http.post( + response, _ = self.post( endpoint, json=data, headers=self._base_headers, - ).json() + ) _result = find_dict(response, 'result') if not _result: @@ -1183,11 +1336,11 @@ def create_scheduled_tweet( 'variables': variables, 'queryId': get_query_id(Endpoint.CREATE_SCHEDULED_TWEET), } - response = self.http.post( + response, _ = self.post( Endpoint.CREATE_SCHEDULED_TWEET, json=data, headers=self._base_headers, - ).json() + ) return response['data']['tweet']['rest_id'] def delete_tweet(self, tweet_id: str) -> Response: @@ -1215,7 +1368,7 @@ def delete_tweet(self, tweet_id: str) -> Response: }, 'queryId': get_query_id(Endpoint.DELETE_TWEET) } - response = self.http.post( + _, response = self.post( Endpoint.DELETE_TWEET, json=data, headers=self._base_headers @@ -1253,11 +1406,11 @@ def get_user_by_screen_name(self, screen_name: str) -> User: 'features': USER_FEATURES, 'fieldToggles': {'withAuxiliaryUserLabels': False} }) - response = self.http.get( + response, _ = self.get( Endpoint.USER_BY_SCREEN_NAME, params=params, headers=self._base_headers - ).json() + ) if 'user' not in response['data']: raise UserNotFound('The user does not exist.') @@ -1297,11 +1450,11 @@ def get_user_by_id(self, user_id: str) -> User: 'variables': variables, 'features': USER_FEATURES }) - response = self.http.get( + response, _ = self.get( Endpoint.USER_BY_REST_ID, params=params, headers=self._base_headers - ).json() + ) if 'result' not in response['data']['user']: raise TwitterException(f'Invalid user id: {user_id}') user_data = response['data']['user']['result'] @@ -1345,11 +1498,11 @@ def reverse_geocode( if v is None: params.pop(k) - response = self.http.get( + response, _ = self.get( Endpoint.REVERSE_GEOCODE, params=params, headers=self._base_headers - ).json() + ) return _places_from_response(self, response) def search_geo( @@ -1396,11 +1549,11 @@ def search_geo( if v is None: params.pop(k) - response = self.http.get( + response, _ = self.get( Endpoint.SEARCH_GEO, params=params, headers=self._base_headers - ).json() + ) return _places_from_response(self, response) def get_place(self, id: str) -> Place: @@ -1414,10 +1567,10 @@ def get_place(self, id: str) -> Place: ------- :class:`.Place` """ - response = self.http.get( + response, _ = self.get( Endpoint.PLACE_BY_ID.format(id), headers=self._base_headers - ).json() + ) return Place(self, response) def _get_tweet_detail(self, tweet_id: str, cursor: str | None): @@ -1438,11 +1591,11 @@ def _get_tweet_detail(self, tweet_id: str, cursor: str | None): 'features': FEATURES, 'fieldToggles': {'withAuxiliaryUserLabels': False} }) - response = self.http.get( + response, _ = self.get( Endpoint.TWEET_DETAIL, params=params, headers=self._base_headers - ).json() + ) return response def _get_more_replies(self, tweet_id: str, cursor: str) -> Result[Tweet]: @@ -1595,11 +1748,11 @@ def get_scheduled_tweets(self) -> list[ScheduledTweet]: params = flatten_params({ 'variables': {'ascending': True} }) - response = self.http.get( + response, _ = self.get( Endpoint.FETCH_SCHEDULED_TWEETS, params=params, headers=self._base_headers - ).json() + ) tweets = find_dict(response, 'scheduled_tweet_list')[0] return [ScheduledTweet(self, tweet) for tweet in tweets] @@ -1623,7 +1776,7 @@ def delete_scheduled_tweet(self, tweet_id: str) -> Response: }, 'queryId': get_query_id(Endpoint.DELETE_SCHEDULED_TWEET) } - response = self.http.post( + _, response = self.post( Endpoint.DELETE_SCHEDULED_TWEET, json=data, headers=self._base_headers @@ -1647,11 +1800,11 @@ def _get_tweet_engagements( 'variables': variables, 'features': FEATURES }) - response = self.http.get( + response, _ = self.get( endpoint, params=params, headers=self._base_headers - ).json() + ) items_ = find_dict(response, 'entries') if not items_: return Result([]) @@ -1779,11 +1932,11 @@ def get_community_note(self, note_id: str) -> CommunityNote: 'variables': {'note_id': note_id}, 'features': COMMUNITY_NOTE_FEATURES }) - response = self.http.get( + response, _ = self.get( Endpoint.FETCH_COMMUNITY_NOTE, params=params, headers=self._base_headers - ).json() + ) note_data = response['data']['birdwatch_note_by_rest_id'] if 'data_v1' not in note_data: raise TwitterException(f'Invalid note id: {note_id}') @@ -1874,11 +2027,11 @@ def get_user_tweets( 'Likes': Endpoint.USER_LIKES, }[tweet_type] - response = self.http.get( + response, _ = self.get( endpoint, params=params, headers=self._base_headers - ).json() + ) instructions_ = find_dict(response, 'instructions') if not instructions_: @@ -1989,11 +2142,11 @@ def get_timeline( 'queryId': get_query_id(Endpoint.HOME_TIMELINE), 'features': FEATURES, } - response = self.http.post( + response, _ = self.post( Endpoint.HOME_TIMELINE, json=data, headers=self._base_headers - ).json() + ) items = find_dict(response, 'entries')[0] next_cursor = items[-1]['content']['value'] @@ -2070,11 +2223,11 @@ def get_latest_timeline( 'queryId': get_query_id(Endpoint.HOME_LATEST_TIMELINE), 'features': FEATURES, } - response = self.http.post( + response, _ = self.post( Endpoint.HOME_LATEST_TIMELINE, json=data, headers=self._base_headers - ).json() + ) items = find_dict(response, 'entries')[0] next_cursor = items[-1]['content']['value'] @@ -2122,7 +2275,7 @@ def favorite_tweet(self, tweet_id: str) -> Response: 'variables': {'tweet_id': tweet_id}, 'queryId': get_query_id(Endpoint.FAVORITE_TWEET) } - response = self.http.post( + _, response = self.post( Endpoint.FAVORITE_TWEET, json=data, headers=self._base_headers @@ -2156,7 +2309,7 @@ def unfavorite_tweet(self, tweet_id: str) -> Response: 'variables': {'tweet_id': tweet_id}, 'queryId': get_query_id(Endpoint.UNFAVORITE_TWEET) } - response = self.http.post( + _, response = self.post( Endpoint.UNFAVORITE_TWEET, json=data, headers=self._base_headers @@ -2190,7 +2343,7 @@ def retweet(self, tweet_id: str) -> Response: 'variables': {'tweet_id': tweet_id, 'dark_request': False}, 'queryId': get_query_id(Endpoint.CREATE_RETWEET) } - response = self.http.post( + _, response = self.post( Endpoint.CREATE_RETWEET, json=data, headers=self._base_headers @@ -2224,7 +2377,7 @@ def delete_retweet(self, tweet_id: str) -> Response: 'variables': {'source_tweet_id': tweet_id,'dark_request': False}, 'queryId': get_query_id(Endpoint.DELETE_RETWEET) } - response = self.http.post( + _, response = self.post( Endpoint.DELETE_RETWEET, json=data, headers=self._base_headers @@ -2265,7 +2418,7 @@ def bookmark_tweet( 'variables': variables, 'queryId': get_query_id(Endpoint.CREATE_BOOKMARK) } - response = self.http.post( + _, response = self.post( endpoint, json=data, headers=self._base_headers @@ -2299,7 +2452,7 @@ def delete_bookmark(self, tweet_id: str) -> Response: 'variables': {'tweet_id': tweet_id}, 'queryId': get_query_id(Endpoint.DELETE_BOOKMARK) } - response = self.http.post( + _, response = self.post( Endpoint.DELETE_BOOKMARK, json=data, headers=self._base_headers @@ -2360,11 +2513,11 @@ def get_bookmarks( 'variables': variables, 'features': features }) - response = self.http.get( + response, _ = self.get( endpoint, params=params, headers=self._base_headers - ).json() + ) items_ = find_dict(response, 'entries') if not items_: @@ -2413,7 +2566,7 @@ def delete_all_bookmarks(self) -> Response: 'variables': {}, 'queryId': get_query_id(Endpoint.BOOKMARKS_ALL_DELETE) } - response = self.http.post( + _, response = self.post( Endpoint.BOOKMARKS_ALL_DELETE, json=data, headers=self._base_headers @@ -2442,11 +2595,11 @@ def get_bookmark_folders( if cursor is not None: variables['cursor'] = cursor params = flatten_params({'variables': variables}) - response = self.http.get( + response, _ = self.get( Endpoint.BOOKMARK_FOLDERS, params=params, headers=self._base_headers - ).json() + ) slice = find_dict(response, 'bookmark_collections_slice')[0] results = [] @@ -2496,11 +2649,11 @@ def edit_bookmark_folder( 'variables': variables, 'queryId': get_query_id(Endpoint.EDIT_BOOKMARK_FOLDER) } - response = self.http.post( + response, _ = self.post( Endpoint.EDIT_BOOKMARK_FOLDER, json=data, headers=self._base_headers - ).json() + ) return BookmarkFolder( self, response['data']['bookmark_collection_update'] ) @@ -2526,7 +2679,7 @@ def delete_bookmark_folder(self, folder_id: str) -> Response: 'variables': variables, 'queryId': get_query_id(Endpoint.DELETE_BOOKMARK_FOLDER) } - response = self.http.post( + _, response = self.post( Endpoint.DELETE_BOOKMARK_FOLDER, json=data, headers=self._base_headers @@ -2553,11 +2706,11 @@ def create_bookmark_folder(self, name: str) -> BookmarkFolder: 'variables': variables, 'queryId': get_query_id(Endpoint.CREATE_BOOKMARK_FOLDER) } - response = self.http.post( + response, _ = self.post( Endpoint.CREATE_BOOKMARK_FOLDER, json=data, headers=self._base_headers - ).json() + ) return BookmarkFolder( self, response['data']['bookmark_collection_create'] ) @@ -2603,7 +2756,7 @@ def follow_user(self, user_id: str) -> Response: headers = self._base_headers | { 'content-type': 'application/x-www-form-urlencoded' } - response = self.http.post( + _, response = self.post( Endpoint.CREATE_FRIENDSHIPS, data=data, headers=headers @@ -2651,7 +2804,7 @@ def unfollow_user(self, user_id: str) -> Response: headers = self._base_headers | { 'content-type': 'application/x-www-form-urlencoded' } - response = self.http.post( + _, response = self.post( Endpoint.DESTROY_FRIENDSHIPS, data=data, headers=headers @@ -2679,7 +2832,7 @@ def block_user(self, user_id: str) -> Response: data = urlencode({'user_id': user_id}) headers = self._base_headers headers['content-type'] = 'application/x-www-form-urlencoded' - response = self.http.post( + _, response = self.post( Endpoint.BLOCK_USER, data=data, headers=headers @@ -2707,7 +2860,7 @@ def unblock_user(self, user_id: str) -> Response: data = urlencode({'user_id': user_id}) headers = self._base_headers headers['content-type'] = 'application/x-www-form-urlencoded' - response = self.http.post( + _, response = self.post( Endpoint.UNBLOCK_USER, data=data, headers=headers @@ -2735,7 +2888,7 @@ def mute_user(self, user_id: str) -> Response: data = urlencode({'user_id': user_id}) headers = self._base_headers headers['content-type'] = 'application/x-www-form-urlencoded' - response = self.http.post( + _, response = self.post( Endpoint.MUTE_USER, data=data, headers=headers @@ -2763,7 +2916,7 @@ def unmute_user(self, user_id: str) -> Response: data = urlencode({'user_id': user_id}) headers = self._base_headers headers['content-type'] = 'application/x-www-form-urlencoded' - response = self.http.post( + _, response = self.post( Endpoint.UNMUTE_USER, data=data, headers=headers @@ -2825,11 +2978,11 @@ def get_trends( } if additional_request_params is not None: params |= additional_request_params - response = self.http.get( + response, _ = self.get( Endpoint.TREND, params=params, headers=self._base_headers - ).json() + ) entry_id_prefix = 'trends' if category == 'trending' else 'Guide' entries = [ @@ -2863,10 +3016,10 @@ def get_available_locations(self) -> list[Location]: ------- list[:class:`.Location`] """ - response = self.http.get( + response, _ = self.get( Endpoint.AVAILABLE_LOCATIONS, headers=self._base_headers - ).json() + ) return [Location(self, data) for data in response] def get_place_trends(self, woeid: int) -> PlaceTrends: @@ -2875,11 +3028,11 @@ def get_place_trends(self, woeid: int) -> PlaceTrends: You can get available woeid using :attr:`.Client.get_available_locations`. """ - response = self.http.get( + response, _ = self.get( Endpoint.PLACE_TRENDS, params={'id': woeid}, headers=self._base_headers - ).json() + ) trend_data = response[0] trends = [PlaceTrend(self, data) for data in trend_data['trends']] trend_data['trends'] = trends @@ -2906,11 +3059,11 @@ def _get_user_friendship( 'variables': variables, 'features': FEATURES }) - response = self.http.get( + response, _ = self.get( endpoint, params=params, headers=self._base_headers - ).json() + ) items = find_dict(response, 'entries')[0] results = [] @@ -2951,11 +3104,11 @@ def _get_user_friendship_2( if cursor is not None: params['cursor'] = cursor - response = self.http.get( + response, _ = self.get( endpoint, params=params, headers=self._base_headers - ).json() + ) users = response['users'] results = [] @@ -3150,11 +3303,11 @@ def _get_friendship_ids( if cursor is not None: params['cursor'] = cursor - response = self.http.get( + response, _ = self.get( endpoint, params=params, headers=self._base_headers - ).json() + ) previous_cursor = response['previous_cursor'] next_cursor = response['next_cursor'] @@ -3256,11 +3409,12 @@ def _send_dm( if reply_to is not None: data['reply_to_dm_id'] = reply_to - return self.http.post( + response, _ = self.post( Endpoint.SEND_DM, json=data, headers=self._base_headers - ).json() + ) + return response def _get_dm_history( self, @@ -3276,11 +3430,12 @@ def _get_dm_history( if max_id is not None: params['max_id'] = max_id - return self.http.get( + response, _ = self.get( Endpoint.CONVERSATION.format(conversation_id), params=params, headers=self._base_headers - ).json() + ) + return response def send_dm( self, @@ -3376,7 +3531,7 @@ def add_reaction_to_message( 'variables': variables, 'queryId': get_query_id(Endpoint.MESSAGE_ADD_REACTION) } - response = self.http.post( + _, response = self.post( Endpoint.MESSAGE_ADD_REACTION, json=data, headers=self._base_headers @@ -3422,7 +3577,7 @@ def remove_reaction_from_message( 'variables': variables, 'queryId': get_query_id(Endpoint.MESSAGE_REMOVE_REACTION) } - response = self.http.post( + _, response = self.post( Endpoint.MESSAGE_REMOVE_REACTION, json=data, headers=self._base_headers @@ -3454,7 +3609,7 @@ def delete_dm(self, message_id: str) -> Response: }, 'queryId': get_query_id(Endpoint.DELETE_DM) } - response = self.http.post( + _, response = self.post( Endpoint.DELETE_DM, json=data, headers=self._base_headers @@ -3653,11 +3808,11 @@ def get_group(self, group_id: str) -> Group: 'context': 'FETCH_DM_CONVERSATION_HISTORY', 'include_conversation_info': True, } - response = self.http.get( + response, _ = self.get( Endpoint.CONVERSATION.format(group_id), params=params, headers=self._base_headers - ).json() + ) return Group(self, group_id, response) def add_members_to_group( @@ -3690,7 +3845,7 @@ def add_members_to_group( }, 'queryId': get_query_id(Endpoint.ADD_MEMBER_TO_GROUP) } - response = self.http.post( + _, response = self.post( Endpoint.ADD_MEMBER_TO_GROUP, json=data, headers=self._base_headers @@ -3717,7 +3872,7 @@ def change_group_name(self, group_id: str, name: str) -> Response: }) headers = self._base_headers headers['content-type'] = 'application/x-www-form-urlencoded' - response = self.http.post( + _, response = self.post( Endpoint.CHANGE_GROUP_NAME.format(group_id), data=data, headers=headers @@ -3764,11 +3919,11 @@ def create_list( 'features': LIST_FEATURES, 'queryId': get_query_id(Endpoint.CREATE_LIST) } - response = self.http.post( + response, _ = self.post( Endpoint.CREATE_LIST, json=data, headers=self._base_headers - ).json() + ) list_info = find_dict(response, 'list')[0] return List(self, list_info) @@ -3803,7 +3958,7 @@ def edit_list_banner(self, list_id: str, media_id: str) -> Response: 'features': LIST_FEATURES, 'queryId': get_query_id(Endpoint.EDIT_LIST_BANNER) } - response = self.http.post( + _, response = self.post( Endpoint.EDIT_LIST_BANNER, json=data, headers=self._base_headers @@ -3830,7 +3985,7 @@ def delete_list_banner(self, list_id: str) -> Response: 'features': LIST_FEATURES, 'queryId': get_query_id(Endpoint.DELETE_LIST_BANNER) } - response = self.http.post( + _, response = self.post( Endpoint.DELETE_LIST_BANNER, json=data, headers=self._base_headers @@ -3884,11 +4039,11 @@ def edit_list( 'features': LIST_FEATURES, 'queryId': get_query_id(Endpoint.UPDATE_LIST) } - response = self.http.post( + response, _ = self.post( Endpoint.UPDATE_LIST, json=data, headers=self._base_headers - ).json() + ) list_info = find_dict(response, 'list')[0] return List(self, list_info) @@ -3921,7 +4076,7 @@ def add_list_member(self, list_id: str, user_id: str) -> Response: 'features': LIST_FEATURES, 'queryId': get_query_id(Endpoint.LIST_ADD_MEMBER) } - response = self.http.post( + _, response = self.post( Endpoint.LIST_ADD_MEMBER, json=data, headers=self._base_headers @@ -3957,7 +4112,7 @@ def remove_list_member(self, list_id: str, user_id: str) -> Response: 'features': LIST_FEATURES, 'queryId': get_query_id(Endpoint.LIST_REMOVE_MEMBER) } - response = self.http.post( + _, response = self.post( Endpoint.LIST_REMOVE_MEMBER, json=data, headers=self._base_headers @@ -4000,11 +4155,11 @@ def get_lists( 'variables': variables, 'features': FEATURES }) - response = self.http.get( + response, _ = self.get( Endpoint.LIST_MANAGEMENT, params=params, headers=self._base_headers - ).json() + ) entries = find_dict(response, 'entries')[0] items = find_dict(entries, 'items') @@ -4044,11 +4199,11 @@ def get_list(self, list_id: str) -> List: 'variables': {'listId': list_id}, 'features': LIST_FEATURES }) - response = self.http.get( + response, _ = self.get( Endpoint.LIST_BY_REST_ID, params=params, headers=self._base_headers - ).json() + ) list_info = find_dict(response, 'list')[0] return List(self, list_info) @@ -4100,11 +4255,11 @@ def get_list_tweets( 'variables': variables, 'features': FEATURES }) - response = self.http.get( + response, _ = self.get( Endpoint.LIST_LATEST_TWEETS, params=params, headers=self._base_headers - ).json() + ) items = find_dict(response, 'entries')[0] next_cursor = items[-1]['content']['value'] @@ -4140,11 +4295,11 @@ def _get_list_users( 'variables': variables, 'features': FEATURES }) - response = self.http.get( + response, _ = self.get( endpoint, params=params, headers=self._base_headers - ).json() + ) items = find_dict(response, 'entries')[0] results = [] @@ -4334,11 +4489,11 @@ def get_notifications( if cursor is not None: params['cursor'] = cursor - response = self.http.get( + response, _ = self.get( endpoint, params=params, headers=self._base_headers - ).json() + ) global_objects = response['globalObjects'] users = { @@ -4424,11 +4579,11 @@ def search_community( params = flatten_params({ 'variables': variables }) - response = self.http.get( + response, _ = self.get( Endpoint.SEARCH_COMMUNITY, params=params, headers=self._base_headers - ).json() + ) items = find_dict(response, 'items_results')[0] communities = [] @@ -4468,11 +4623,11 @@ def get_community(self, community_id: str) -> Community: 'c9s_superc9s_indication_enabled':False } }) - response = self.http.get( + response, _ = self.get( Endpoint.GET_COMMUNITY, params=params, headers=self._base_headers - ).json() + ) community_data = find_dict(response, 'result')[0] return Community(self, community_data) @@ -4535,11 +4690,11 @@ def get_community_tweets( 'variables': variables, 'features': COMMUNITY_TWEETS_FEATURES }) - response = self.http.get( + response, _ = self.get( endpoint, params=params, headers=self._base_headers - ).json() + ) entries = find_dict(response, 'entries')[0] if tweet_type == 'Media': @@ -4611,11 +4766,11 @@ def get_communities_timeline( 'variables': variables, 'features': COMMUNITY_TWEETS_FEATURES }) - response = self.http.get( + response, _ = self.get( Endpoint.COMMUNITIES_TIMELINE, params=params, headers=self._base_headers - ).json() + ) items = find_dict(response, 'entries')[0] tweets = [] for item in items: @@ -4667,11 +4822,11 @@ def join_community(self, community_id: str) -> Community: 'features': JOIN_COMMUNITY_FEATURES, 'queryId': get_query_id(Endpoint.JOIN_COMMUNITY) } - response = self.http.post( + response, _ = self.post( Endpoint.JOIN_COMMUNITY, json=data, headers=self._base_headers - ).json() + ) community_data = response['data']['community_join'] community_data['rest_id'] = community_data['id_str'] return Community(self, community_data) @@ -4697,11 +4852,11 @@ def leave_community(self, community_id: str) -> Community: 'features': JOIN_COMMUNITY_FEATURES, 'queryId': get_query_id(Endpoint.LEAVE_COMMUNITY) } - response = self.http.post( + response, _ = self.post( Endpoint.LEAVE_COMMUNITY, json=data, headers=self._base_headers - ).json() + ) community_data = response['data']['community_leave'] community_data['rest_id'] = community_data['id_str'] return Community(self, community_data) @@ -4732,11 +4887,11 @@ def request_to_join_community( 'features': JOIN_COMMUNITY_FEATURES, 'queryId': get_query_id(Endpoint.REQUEST_TO_JOIN_COMMUNITY) } - response = self.http.post( + response, _ = self.post( Endpoint.REQUEST_TO_JOIN_COMMUNITY, json=data, headers=self._base_headers - ).json() + ) community_data = find_dict(response, 'result')[0] community_data['rest_id'] = community_data['id_str'] return Community(self, community_data) @@ -4759,11 +4914,11 @@ def _get_community_users( 'responsive_web_graphql_timeline_navigation_enabled': True } }) - response = self.http.get( + response, _ = self.get( endpoint, params=params, headers=self._base_headers - ).json() + ) items = find_dict(response, 'items_results')[0] users = [] @@ -4878,11 +5033,11 @@ def search_community_tweet( 'variables': variables, 'features': COMMUNITY_TWEETS_FEATURES }) - response = self.http.get( + response, _ = self.get( Endpoint.SEARCH_COMMUNITY_TWEET, params=params, headers=self._base_headers - ).json() + ) items = find_dict(response, 'entries')[0] tweets = [] @@ -4912,10 +5067,10 @@ def _stream(self, topics: set[str]) -> Generator[tuple[str, Payload]]: headers.pop('content-type') params = {'topics': ','.join(topics)} - with self.http.stream( + with self.stream( 'GET', Endpoint.EVENTS, params=params, timeout=None ) as response: - self.http._remove_duplicate_ct0_cookie() + self._remove_duplicate_ct0_cookie() for line in response.iter_lines(): try: data = json.loads(line) @@ -5017,9 +5172,9 @@ def _update_subscriptions( headers = self._base_headers headers['content-type'] = 'application/x-www-form-urlencoded' headers['LivePipeline-Session'] = session.id - response = self.http.post( + response, _ = self.post( Endpoint.UPDATE_SUBSCRIPTIONS, data=data, headers=headers - ).json() + ) session.topics |= subscribe session.topics -= unsubscribe diff --git a/twikit/errors.py b/twikit/errors.py index 076f869e..e1c91ad6 100644 --- a/twikit/errors.py +++ b/twikit/errors.py @@ -84,6 +84,11 @@ class UserUnavailable(TwitterException): Exception raised when a user is unavailable. """ +class AccountSuspended(TwitterException): + """ + Exception raised when the account is suspended. + """ + ERROR_CODE_TO_EXCEPTION: dict[int, TwitterException] = { 187: DuplicateTweet, 324: InvalidMedia diff --git a/twikit/http.py b/twikit/http.py deleted file mode 100644 index 8e1bfbdb..00000000 --- a/twikit/http.py +++ /dev/null @@ -1,66 +0,0 @@ -import httpx - -from .errors import ( - TwitterException, - BadRequest, - Unauthorized, - Forbidden, - NotFound, - RequestTimeout, - TooManyRequests, - ServerError -) - - -class HTTPClient: - def __init__(self, **kwargs) -> None: - self.client = httpx.Client(**kwargs) - - def request( - self, - method: str, - url: str, - **kwargs - ) -> httpx.Response: - response = self.client.request(method, url, **kwargs) - status_code = response.status_code - self._remove_duplicate_ct0_cookie() - - if status_code >= 400: - message = f'status: {status_code}, message: "{response.text}"' - if status_code == 400: - raise BadRequest(message, headers=response.headers) - elif status_code == 401: - raise Unauthorized(message, headers=response.headers) - elif status_code == 403: - raise Forbidden(message, headers=response.headers) - elif status_code == 404: - raise NotFound(message, headers=response.headers) - elif status_code == 408: - raise RequestTimeout(message, headers=response.headers) - elif status_code == 429: - raise TooManyRequests(message, headers=response.headers) - elif 500 <= status_code < 600: - raise ServerError(message, headers=response.headers) - else: - raise TwitterException(message, headers=response.headers) - - return response - - def get(self, url, **kwargs) -> httpx.Response: - return self.request('GET', url, **kwargs) - - def post(self, url, **kwargs) -> httpx.Response: - return self.request('POST', url, **kwargs) - - def stream(self, *args, **kwargs): - response = self.client.stream(*args, **kwargs) - return response - - def _remove_duplicate_ct0_cookie(self) -> None: - cookies = {} - for cookie in self.client.cookies.jar: - if 'ct0' in cookies and cookie.name == 'ct0': - continue - cookies[cookie.name] = cookie.value - self.client.cookies = list(cookies.items()) diff --git a/twikit/twikit_async/__init__.py b/twikit/twikit_async/__init__.py index 755c8aa6..365ae1ca 100644 --- a/twikit/twikit_async/__init__.py +++ b/twikit/twikit_async/__init__.py @@ -4,6 +4,7 @@ if os.name == 'nt': asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) +from ._captcha import Capsolver from .bookmark import BookmarkFolder from ..errors import * from ..utils import build_query diff --git a/twikit/twikit_async/_captcha/__init__.py b/twikit/twikit_async/_captcha/__init__.py new file mode 100644 index 00000000..85f95d91 --- /dev/null +++ b/twikit/twikit_async/_captcha/__init__.py @@ -0,0 +1,2 @@ +from .base import CaptchaSolver +from .capsolver import Capsolver diff --git a/twikit/twikit_async/_captcha/base.py b/twikit/twikit_async/_captcha/base.py new file mode 100644 index 00000000..9dc7767e --- /dev/null +++ b/twikit/twikit_async/_captcha/base.py @@ -0,0 +1,113 @@ +from __future__ import annotations + +import re +from typing import TYPE_CHECKING, NamedTuple + +from bs4 import BeautifulSoup +from httpx import Response + +from twikit.utils import urlencode + +if TYPE_CHECKING: + from ..client import Client + + +class UnlockHTML(NamedTuple): + authenticity_token: str + assignment_token: str + needs_unlock: bool + start_button: bool + finish_button: bool + delete_button: bool + blob: str + + +class CaptchaSolver: + client: Client + max_attempts: int + + CAPTCHA_URL = 'https://twitter.com/account/access' + CAPTCHA_SITE_KEY = '0152B4EB-D2DC-460A-89A1-629838B529C9' + + async def get_unlock_html(self) -> tuple[Response, UnlockHTML]: + headers = { + 'X-Twitter-Client-Language': 'en-US', + 'User-Agent': self.client._user_agent, + 'Upgrade-Insecure-Requests': '1' + } + _, response = await self.client.get( + self.CAPTCHA_URL, headers=headers + ) + return response, parse_unlock_html(response.text) + + async def ui_metrix(self) -> str: + js, _ = await self.client.get( + 'https://twitter.com/i/js_inst?c_name=ui_metrics' + ) + return re.findall(r'return ({.*?});', js, re.DOTALL)[0] + + async def confirm_unlock( + self, + authenticity_token: str, + assignment_token: str, + verification_string: str = None, + ui_metrics: bool = False + ) -> tuple[Response, UnlockHTML]: + data = { + 'authenticity_token': authenticity_token, + 'assignment_token': assignment_token, + 'lang': 'en', + 'flow': '', + } + params = {} + if verification_string: + data['verification_string'] = verification_string + data['language_code'] = 'en' + params['lang'] = 'en' + if ui_metrics: + data['ui_metrics'] = await self.ui_metrix() + data = urlencode(data) + headers = { + 'Content-Type': 'application/x-www-form-urlencoded', + 'Upgrade-Insecure-Requests': '1', + 'Referer': self.CAPTCHA_URL + } + _, response = await self.client.post( + self.CAPTCHA_URL, params=params, data=data, headers=headers + ) + return response, parse_unlock_html(response.text) + + +def parse_unlock_html(html: str) -> UnlockHTML: + soup = BeautifulSoup(html, 'lxml') + + authenticity_token = None + authenticity_token_element = soup.find( + 'input', {'name': 'authenticity_token'} + ) + if authenticity_token_element is not None: + authenticity_token: str = authenticity_token_element.get('value') + + assignment_token = None + assignment_token_element = soup.find('input', {'name': 'assignment_token'}) + if assignment_token_element is not None: + assignment_token = assignment_token_element.get('value') + + verification_string = soup.find('input', id='verification_string') + needs_unlock = bool(verification_string) + start_button = bool(soup.find('input', value='Start')) + finish_button = bool(soup.find('input', value='Continue to X')) + delete_button = bool(soup.find('input', value='Delete')) + + iframe = soup.find(id='arkose_iframe') + blob = re.findall(r'data=(.+)', iframe['src'])[0] if iframe else None + + return UnlockHTML( + authenticity_token, + assignment_token, + needs_unlock, + start_button, + finish_button, + delete_button, + blob + ) diff --git a/twikit/twikit_async/_captcha/capsolver.py b/twikit/twikit_async/_captcha/capsolver.py new file mode 100644 index 00000000..3d3a0a6e --- /dev/null +++ b/twikit/twikit_async/_captcha/capsolver.py @@ -0,0 +1,93 @@ +from __future__ import annotations + +from time import sleep + +import httpx + +from .base import CaptchaSolver + + +class Capsolver(CaptchaSolver): + """ + You can automatically unlock the account by passing the `captcha_solver` + argument when initialising the :class:`.Client`. + + First, visit https://capsolver.com and obtain your Capsolver API key. + Next, pass the Capsolver instance to the client as shown in the example. + + .. code-block:: python + + from twikit.twikit_async import Capsolver, Client + solver = Capsolver( + api_key='your_api_key', + max_attempts=10 + ) + client = Client(captcha_solver=solver) + + Parameters + ---------- + api_key : :class:`str` + Capsolver API key. + max_attempts : :class:`int`, default=3 + The maximum number of attempts to solve the captcha. + get_result_interval : :class:`float`, default=1.0 + + use_blob_data : :class:`bool`, default=False + """ + def __init__( + self, + api_key: str, + max_attempts: int = 3, + get_result_interval: float = 1.0, + use_blob_data: bool = False + ) -> None: + self.api_key = api_key + self.get_result_interval = get_result_interval + self.max_attempts = max_attempts + self.use_blob_data = use_blob_data + + def create_task(self, task_data: dict) -> dict: + data = { + 'clientKey': self.api_key, + 'task': task_data + } + response = httpx.post( + 'https://api.capsolver.com/createTask', + json=data, + headers={'content-type': 'application/json'} + ).json() + return response + + def get_task_result(self, task_id: str) -> dict: + data = { + 'clientKey': self.api_key, + 'taskId': task_id + } + response = httpx.post( + 'https://api.capsolver.com/getTaskResult', + json=data, + headers={'content-type': 'application/json'} + ).json() + return response + + def solve_funcaptcha(self, blob: str) -> dict: + if self.client.proxy is None: + captcha_type = 'FunCaptchaTaskProxyLess' + else: + captcha_type = 'FunCaptchaTask' + + task_data = { + 'type': captcha_type, + 'websiteURL': 'https://iframe.arkoselabs.com', + 'websitePublicKey': self.CAPTCHA_SITE_KEY, + 'funcaptchaApiJSSubdomain': 'https://client-api.arkoselabs.com', + } + if self.use_blob_data: + task_data['data'] = '{"blob":"%s"}' % blob + task_data['userAgent'] = self.client._user_agent + task = self.create_task(task_data) + while True: + sleep(self.get_result_interval) + result = self.get_task_result(task['taskId']) + if result['status'] in ('ready', 'failed'): + return result diff --git a/twikit/twikit_async/client.py b/twikit/twikit_async/client.py index ad851811..bf2d8d4c 100644 --- a/twikit/twikit_async/client.py +++ b/twikit/twikit_async/client.py @@ -5,17 +5,25 @@ import json import warnings from functools import partial -from typing import AsyncGenerator, Literal +from typing import Any, AsyncGenerator, Literal import filetype -from fake_useragent import UserAgent +import httpx from httpx import Response from ..errors import ( + AccountSuspended, + BadRequest, CouldNotTweet, + Forbidden, InvalidMedia, - TwitterException, + NotFound, + RequestTimeout, + ServerError, TweetNotAvailable, + TwitterException, + TooManyRequests, + Unauthorized, UserNotFound, UserUnavailable, raise_exceptions_from_response @@ -40,13 +48,13 @@ urlencode ) from .bookmark import BookmarkFolder +from ._captcha import Capsolver from .community import ( Community, CommunityMember ) from .geo import Place, _places_from_response from .group import Group, GroupMessage -from .http import HTTPClient from .list import List from .message import Message from .notification import Notification @@ -57,12 +65,136 @@ from .utils import Flow, Result -class Client: +class BaseClient: + """:meta private:""" + + def __init__( + self, + language: str | None = None, + proxy: str | None = None, + captcha_solver: Capsolver | None = None, + **kwargs + ) -> None: + if 'proxies' in kwargs: + message = ( + "The 'proxies' argument is now deprecated. Use 'proxy' " + "instead. https://github.com/encode/httpx/pull/2879" + ) + warnings.warn(message) + + self.http = httpx.AsyncClient(proxy=proxy, **kwargs) + self.language = language + self.proxy = proxy + self.captcha_solver = captcha_solver + if captcha_solver is not None: + captcha_solver.client = self + + self._token = TOKEN + self._user_id = None + self._user_agent = ('Mozilla/5.0 (Windows NT 10.0; Win64; x64) ' + 'AppleWebKit/537.36 (KHTML, like Gecko) ' + 'Chrome/122.0.0.0 Safari/537.36') + self._act_as = None + + async def request( + self, + method: str, + url: str, + auto_unlock: bool = True, + raise_exception: bool = True, + **kwargs + ) -> tuple[dict | Any, httpx.Response]: + cookies_backup = self.get_cookies().copy() + response = await self.http.request(method, url, **kwargs) + self._remove_duplicate_ct0_cookie() + + try: + response_data = response.json() + except json.decoder.JSONDecodeError: + response_data = response.text + + if isinstance(response_data, dict) and 'errors' in response_data: + error_code = response_data['errors'][0]['code'] + error_message = response_data['errors'][0].get('message') + if error_code == 37: + # Account suspended + raise AccountSuspended(error_message) + + if error_code == 326: + # Account unlocking + if self.captcha_solver is None: + raise TwitterException( + 'Your account is locked. Visit ' + 'https://twitter.com/account/access to unlock it.' + ) + if auto_unlock: + await self.unlock() + self.set_cookies(cookies_backup, clear_cookies=True) + response = await self.http.request(method, url, **kwargs) + self._remove_duplicate_ct0_cookie() + try: + response_data = response.json() + except json.decoder.JSONDecodeError: + response_data = response.text + + status_code = response.status_code + + if status_code >= 400 and raise_exception: + message = f'status: {status_code}, message: "{response.text}"' + if status_code == 400: + raise BadRequest(message, headers=response.headers) + elif status_code == 401: + raise Unauthorized(message, headers=response.headers) + elif status_code == 403: + raise Forbidden(message, headers=response.headers) + elif status_code == 404: + raise NotFound(message, headers=response.headers) + elif status_code == 408: + raise RequestTimeout(message, headers=response.headers) + elif status_code == 429: + raise TooManyRequests(message, headers=response.headers) + elif 500 <= status_code < 600: + raise ServerError(message, headers=response.headers) + else: + raise TwitterException(message, headers=response.headers) + + return response_data, response + + async def get(self, url, **kwargs) -> tuple[dict | Any, httpx.Response]: + return await self.request('GET', url, **kwargs) + + async def post(self, url, **kwargs) -> tuple[dict | Any, httpx.Response]: + return await self.request('POST', url, **kwargs) + + async def stream(self, *args, **kwargs) -> Response: + response = await self.http.stream(*args, **kwargs) + return response + + def _remove_duplicate_ct0_cookie(self) -> None: + cookies = {} + for cookie in self.http.cookies.jar: + if 'ct0' in cookies and cookie.name == 'ct0': + continue + cookies[cookie.name] = cookie.value + self.http.cookies = list(cookies.items()) + + +class Client(BaseClient): """ A client for interacting with the Twitter API. Since this class is for asynchronous use, methods must be executed using await. + Parameters + ---------- + language : :class:`str` | None, default=None + The language code to use in API requests. + proxy : :class:`str` | None, default=None + The proxy server URL to use for request + (e.g., 'http://0.0.0.0:0000'). + captcha_solver : :class:`.Capsolver` | None, default=None + See :class:`.Capsolver`. + Examples -------- >>> client = Client(language='en-US') @@ -73,27 +205,15 @@ class Client: ... password='00000000' ... ) """ - - def __init__( - self, language: str | None = None, - proxies: dict | None = None, **kwargs - ) -> None: - self._token = TOKEN - self.language = language - self.http = HTTPClient(proxies=proxies, **kwargs) - self._user_id = None - self._user_agent = UserAgent().random.strip() - self._act_as = None - async def _get_guest_token(self) -> str: headers = self._base_headers headers.pop('X-Twitter-Active-User') headers.pop('X-Twitter-Auth-Type') - response = (await self.http.post( + response, _ = await self.post( Endpoint.GUEST_TOKEN, headers=headers, data={} - )).json() + ) guest_token = response['guest_token'] return guest_token @@ -132,7 +252,61 @@ def _get_csrf_token(self) -> str: :class:`str` The CSRF token as a string. """ - return self.http.client.cookies.get('ct0') + return self.http.cookies.get('ct0') + + async def unlock(self) -> None: + if self.captcha_solver is None: + raise ValueError('Captcha solver is not provided.') + + response, html = await self.captcha_solver.get_unlock_html() + + if html.delete_button: + response, html = await self.captcha_solver.confirm_unlock( + html.authenticity_token, + html.assignment_token, + ui_metrics=True + ) + + if html.start_button or html.finish_button: + response, html = await self.captcha_solver.confirm_unlock( + html.authenticity_token, + html.assignment_token, + ui_metrics=True + ) + + cookies_backup = self.get_cookies().copy() + max_unlock_attempts = self.captcha_solver.max_attempts + attempt = 0 + while attempt < max_unlock_attempts: + attempt += 1 + + if html.authenticity_token is None: + response, html = await self.captcha_solver.get_unlock_html() + + result = self.captcha_solver.solve_funcaptcha(html.blob) + if result['errorId'] == 1: + continue + + self.set_cookies(cookies_backup, clear_cookies=True) + response, html = await self.captcha_solver.confirm_unlock( + html.authenticity_token, + html.assignment_token, + result['solution']['token'], + ) + + if html.finish_button: + response, html = await self.captcha_solver.confirm_unlock( + html.authenticity_token, + html.assignment_token, + ui_metrics=True + ) + finished = ( + response.next_request is not None and + response.next_request.url.path == '/' + ) + if finished: + return + raise Exception('could not unlock the account.') async def login( self, @@ -169,7 +343,7 @@ async def login( ... password='00000000' ... ) """ - self.http.client.cookies.clear() + self.http.cookies.clear() guest_token = await self._get_guest_token() headers = self._base_headers | { 'x-guest-token': guest_token @@ -253,7 +427,7 @@ async def logout(self) -> Response: """ Logs out of the currently logged-in account. """ - response = await self.http.post( + response, _ = await self.post( Endpoint.LOGOUT, headers=self._base_headers ) @@ -265,10 +439,10 @@ async def user_id(self) -> str: """ if self._user_id is not None: return self._user_id - response = (await self.http.get( + response, _ = await self.get( Endpoint.SETTINGS, headers=self._base_headers - )).json() + ) screen_name = response['screen_name'] self._user_id = (await self.get_user_by_screen_name(screen_name)).id return self._user_id @@ -295,7 +469,7 @@ def get_cookies(self) -> dict: .load_cookies .save_cookies """ - return dict(self.http.client.cookies) + return dict(self.http.cookies) def save_cookies(self, path: str) -> None: """ @@ -343,8 +517,8 @@ def set_cookies(self, cookies: dict, clear_cookies: bool = False) -> None: .save_cookies """ if clear_cookies: - self.http.client.cookies.clear() - self.http.client.cookies.update(cookies) + self.http.cookies.clear() + self.http.cookies.update(cookies) def load_cookies(self, path: str) -> None: """ @@ -403,11 +577,11 @@ async def _search( 'variables': variables, 'features': FEATURES }) - response = (await self.http.get( + response, _ = await self.get( Endpoint.SEARCH_TIMELINE, params=params, headers=self._base_headers - )).json() + ) return response @@ -596,11 +770,11 @@ async def get_similar_tweets(self, tweet_id: str) -> list[Tweet]: 'variables': {'tweet_id': tweet_id}, 'features': SIMILAR_POSTS_FEATURES }) - response = (await self.http.get( + response, _ = await self.get( Endpoint.SIMILAR_POSTS, params=params, headers=self._base_headers - )).json() + ) items_ = find_dict(response, 'entries') results = [] if not items_: @@ -715,11 +889,11 @@ async def upload_media( } if media_category is not None: params['media_category'] = media_category - response = (await self.http.post( + response, _ = await self.post( endpoint, params=params, headers=self._base_headers - )).json() + ) media_id = response['media_id'] # =========== APPEND ============ segment_index = 0 @@ -746,7 +920,7 @@ async def upload_media( ) } - coro = self.http.post( + coro = self.post( endpoint, params=params, headers=headers, @@ -770,7 +944,7 @@ async def upload_media( 'command': 'FINALIZE', 'media_id': media_id, } - await self.http.post( + await self.post( endpoint, params=params, headers=self._base_headers, @@ -813,11 +987,11 @@ async def check_media_status( endpoint = Endpoint.UPLOAD_MEDIA_2 else: endpoint = Endpoint.UPLOAD_MEDIA - response = (await self.http.get( + response, _ = await self.get( endpoint, params=params, headers=self._base_headers - )).json() + ) return response async def create_media_metadata( @@ -859,21 +1033,12 @@ async def create_media_metadata( data['alt_text'] = {'text': alt_text} if sensitive_warning is not None: data['sensitive_media_warning'] = sensitive_warning - return await self.http.post( + _, response = await self.post( Endpoint.CREATE_MEDIA_METADATA, json=data, headers=self._base_headers ) - - async def get_media(self, url: str) -> bytes: - """Retrieves media bytes. - - Parameters - ---------- - url : str - Media URL - """ - return (await self.http.get(url, headers=self._base_headers)).content + return response async def create_poll( self, @@ -920,11 +1085,11 @@ async def create_poll( headers = self._base_headers | { 'content-type': 'application/x-www-form-urlencoded' } - response = (await self.http.post( + response, _ = await self.post( Endpoint.CREATE_CARD, data=data, headers=headers, - )).json() + ) return response['card_uri'] @@ -962,11 +1127,11 @@ async def vote( headers = self._base_headers | { 'content-type': 'application/x-www-form-urlencoded' } - response = (await self.http.post( + response, _ = await self.post( Endpoint.VOTE, data=data, headers=headers - )).json() + ) card_data = { 'rest_id': response['card']['url'], @@ -1125,11 +1290,11 @@ async def create_tweet( 'queryId': get_query_id(Endpoint.CREATE_TWEET), 'features': features, } - response = (await self.http.post( + response, _ = await self.post( endpoint, json=data, headers=self._base_headers, - )).json() + ) _result = find_dict(response, 'result') if not _result: @@ -1196,11 +1361,11 @@ async def create_scheduled_tweet( 'variables': variables, 'queryId': get_query_id(Endpoint.CREATE_SCHEDULED_TWEET), } - response = (await self.http.post( + response, _ = await self.post( Endpoint.CREATE_SCHEDULED_TWEET, json=data, headers=self._base_headers, - )).json() + ) return response['data']['tweet']['rest_id'] async def delete_tweet(self, tweet_id: str) -> Response: @@ -1228,7 +1393,7 @@ async def delete_tweet(self, tweet_id: str) -> Response: }, 'queryId': get_query_id(Endpoint.DELETE_TWEET) } - response = await self.http.post( + _, response = await self.post( Endpoint.DELETE_TWEET, json=data, headers=self._base_headers @@ -1266,11 +1431,11 @@ async def get_user_by_screen_name(self, screen_name: str) -> User: 'features': USER_FEATURES, 'fieldToggles': {'withAuxiliaryUserLabels': False} }) - response = (await self.http.get( + response, _ = await self.get( Endpoint.USER_BY_SCREEN_NAME, params=params, headers=self._base_headers - )).json() + ) if 'user' not in response['data']: raise UserNotFound('The user does not exist.') @@ -1310,11 +1475,11 @@ async def get_user_by_id(self, user_id: str) -> User: 'variables': variables, 'features': USER_FEATURES }) - response = (await self.http.get( + response, _ = await self.get( Endpoint.USER_BY_REST_ID, params=params, headers=self._base_headers - )).json() + ) if 'result' not in response['data']['user']: raise TwitterException(f'Invalid user id: {user_id}') user_data = response['data']['user']['result'] @@ -1358,11 +1523,11 @@ async def reverse_geocode( if v is None: params.pop(k) - response = (await self.http.get( + response, _ = await self.get( Endpoint.REVERSE_GEOCODE, params=params, headers=self._base_headers - )).json() + ) return _places_from_response(self, response) async def search_geo( @@ -1409,11 +1574,11 @@ async def search_geo( if v is None: params.pop(k) - response = (await self.http.get( + response, _ = await self.get( Endpoint.SEARCH_GEO, params=params, headers=self._base_headers - )).json() + ) return _places_from_response(self, response) async def get_place(self, id: str) -> Place: @@ -1427,10 +1592,10 @@ async def get_place(self, id: str) -> Place: ------- :class:`.Place` """ - response = (await self.http.get( + response, _ = await self.get( Endpoint.PLACE_BY_ID.format(id), headers=self._base_headers - )).json() + ) return Place(self, response) async def _get_tweet_detail(self, tweet_id: str, cursor: str | None): @@ -1451,11 +1616,11 @@ async def _get_tweet_detail(self, tweet_id: str, cursor: str | None): 'features': FEATURES, 'fieldToggles': {'withAuxiliaryUserLabels': False} }) - response = (await self.http.get( + response, _ = await self.get( Endpoint.TWEET_DETAIL, params=params, headers=self._base_headers - )).json() + ) return response async def _get_more_replies( @@ -1613,11 +1778,11 @@ async def get_scheduled_tweets(self) -> list[ScheduledTweet]: params = flatten_params({ 'variables': {'ascending': True} }) - response = (await self.http.get( + response, _ = await self.get( Endpoint.FETCH_SCHEDULED_TWEETS, params=params, headers=self._base_headers - )).json() + ) tweets = find_dict(response, 'scheduled_tweet_list')[0] return [ScheduledTweet(self, tweet) for tweet in tweets] @@ -1641,7 +1806,7 @@ async def delete_scheduled_tweet(self, tweet_id: str) -> Response: }, 'queryId': get_query_id(Endpoint.DELETE_SCHEDULED_TWEET) } - response = await self.http.post( + _, response = await self.post( Endpoint.DELETE_SCHEDULED_TWEET, json=data, headers=self._base_headers @@ -1665,11 +1830,11 @@ async def _get_tweet_engagements( 'variables': variables, 'features': FEATURES }) - response = (await self.http.get( + response, _ = await self.get( endpoint, params=params, headers=self._base_headers - )).json() + ) items_ = find_dict(response, 'entries') if not items_: return Result([]) @@ -1799,11 +1964,11 @@ async def get_community_note(self, note_id: str) -> CommunityNote: 'variables': {'note_id': note_id}, 'features': COMMUNITY_NOTE_FEATURES }) - response = (await self.http.get( + response, _ = await self.get( Endpoint.FETCH_COMMUNITY_NOTE, params=params, headers=self._base_headers - )).json() + ) note_data = response['data']['birdwatch_note_by_rest_id'] if 'data_v1' not in note_data: raise TwitterException(f'Invalid user id: {note_id}') @@ -1895,11 +2060,11 @@ async def get_user_tweets( 'Likes': Endpoint.USER_LIKES, }[tweet_type] - response = (await self.http.get( + response, _ = await self.get( endpoint, params=params, headers=self._base_headers - )).json() + ) instructions_ = find_dict(response, 'instructions') if not instructions_: @@ -2010,11 +2175,11 @@ async def get_timeline( 'queryId': get_query_id(Endpoint.HOME_TIMELINE), 'features': FEATURES, } - response = (await self.http.post( + response, _ = await self.post( Endpoint.HOME_TIMELINE, json=data, headers=self._base_headers - )).json() + ) items = find_dict(response, 'entries')[0] next_cursor = items[-1]['content']['value'] @@ -2091,11 +2256,11 @@ async def get_latest_timeline( 'queryId': get_query_id(Endpoint.HOME_LATEST_TIMELINE), 'features': FEATURES, } - response = (await self.http.post( + response, _ = await self.post( Endpoint.HOME_LATEST_TIMELINE, json=data, headers=self._base_headers - )).json() + ) items = find_dict(response, 'entries')[0] next_cursor = items[-1]['content']['value'] @@ -2143,7 +2308,7 @@ async def favorite_tweet(self, tweet_id: str) -> Response: 'variables': {'tweet_id': tweet_id}, 'queryId': get_query_id(Endpoint.FAVORITE_TWEET) } - response = await self.http.post( + _, response = await self.post( Endpoint.FAVORITE_TWEET, json=data, headers=self._base_headers @@ -2177,7 +2342,7 @@ async def unfavorite_tweet(self, tweet_id: str) -> Response: 'variables': {'tweet_id': tweet_id}, 'queryId': get_query_id(Endpoint.UNFAVORITE_TWEET) } - response = await self.http.post( + _, response = await self.post( Endpoint.UNFAVORITE_TWEET, json=data, headers=self._base_headers @@ -2211,7 +2376,7 @@ async def retweet(self, tweet_id: str) -> Response: 'variables': {'tweet_id': tweet_id, 'dark_request': False}, 'queryId': get_query_id(Endpoint.CREATE_RETWEET) } - response = await self.http.post( + _, response = await self.post( Endpoint.CREATE_RETWEET, json=data, headers=self._base_headers @@ -2245,7 +2410,7 @@ async def delete_retweet(self, tweet_id: str) -> Response: 'variables': {'source_tweet_id': tweet_id,'dark_request': False}, 'queryId': get_query_id(Endpoint.DELETE_RETWEET) } - response = await self.http.post( + _, response = await self.post( Endpoint.DELETE_RETWEET, json=data, headers=self._base_headers @@ -2286,7 +2451,7 @@ async def bookmark_tweet( 'variables': variables, 'queryId': get_query_id(Endpoint.CREATE_BOOKMARK) } - response = await self.http.post( + _, response = await self.post( endpoint, json=data, headers=self._base_headers @@ -2320,7 +2485,7 @@ async def delete_bookmark(self, tweet_id: str) -> Response: 'variables': {'tweet_id': tweet_id}, 'queryId': get_query_id(Endpoint.DELETE_BOOKMARK) } - response = await self.http.post( + _, response = await self.post( Endpoint.DELETE_BOOKMARK, json=data, headers=self._base_headers @@ -2382,11 +2547,11 @@ async def get_bookmarks( 'variables': variables, 'features': features }) - response = (await self.http.get( + response, _ = await self.get( endpoint, params=params, headers=self._base_headers - )).json() + ) items_ = find_dict(response, 'entries') if not items_: @@ -2433,7 +2598,7 @@ async def delete_all_bookmarks(self) -> Response: 'variables': {}, 'queryId': get_query_id(Endpoint.BOOKMARKS_ALL_DELETE) } - response = await self.http.post( + _, response = await self.post( Endpoint.BOOKMARKS_ALL_DELETE, json=data, headers=self._base_headers @@ -2462,11 +2627,11 @@ async def get_bookmark_folders( if cursor is not None: variables['cursor'] = cursor params = flatten_params({'variables': variables}) - response = (await self.http.get( + response, _ = await self.get( Endpoint.BOOKMARK_FOLDERS, params=params, headers=self._base_headers - )).json() + ) slice = find_dict(response, 'bookmark_collections_slice')[0] results = [] @@ -2516,11 +2681,11 @@ async def edit_bookmark_folder( 'variables': variables, 'queryId': get_query_id(Endpoint.EDIT_BOOKMARK_FOLDER) } - response = (await self.http.post( + response, _ = await self.post( Endpoint.EDIT_BOOKMARK_FOLDER, json=data, headers=self._base_headers - )).json() + ) return BookmarkFolder( self, response['data']['bookmark_collection_update'] ) @@ -2546,7 +2711,7 @@ async def delete_bookmark_folder(self, folder_id: str) -> Response: 'variables': variables, 'queryId': get_query_id(Endpoint.DELETE_BOOKMARK_FOLDER) } - response = await self.http.post( + _, response = await self.post( Endpoint.DELETE_BOOKMARK_FOLDER, json=data, headers=self._base_headers @@ -2573,11 +2738,11 @@ async def create_bookmark_folder(self, name: str) -> BookmarkFolder: 'variables': variables, 'queryId': get_query_id(Endpoint.CREATE_BOOKMARK_FOLDER) } - response = (await self.http.post( + response, _ = await self.post( Endpoint.CREATE_BOOKMARK_FOLDER, json=data, headers=self._base_headers - )).json() + ) return BookmarkFolder( self, response['data']['bookmark_collection_create'] ) @@ -2623,7 +2788,7 @@ async def follow_user(self, user_id: str) -> Response: headers = self._base_headers | { 'content-type': 'application/x-www-form-urlencoded' } - response = await self.http.post( + _, response = await self.post( Endpoint.CREATE_FRIENDSHIPS, data=data, headers=headers @@ -2671,7 +2836,7 @@ async def unfollow_user(self, user_id: str) -> Response: headers = self._base_headers | { 'content-type': 'application/x-www-form-urlencoded' } - response = await self.http.post( + _, response = await self.post( Endpoint.DESTROY_FRIENDSHIPS, data=data, headers=headers @@ -2699,7 +2864,7 @@ async def block_user(self, user_id: str) -> Response: data = urlencode({'user_id': user_id}) headers = self._base_headers headers['content-type'] = 'application/x-www-form-urlencoded' - response = await self.http.post( + _, response = await self.post( Endpoint.BLOCK_USER, data=data, headers=headers @@ -2727,7 +2892,7 @@ async def unblock_user(self, user_id: str) -> Response: data = urlencode({'user_id': user_id}) headers = self._base_headers headers['content-type'] = 'application/x-www-form-urlencoded' - response = await self.http.post( + _, response = await self.post( Endpoint.UNBLOCK_USER, data=data, headers=headers @@ -2755,7 +2920,7 @@ async def mute_user(self, user_id: str) -> Response: data = urlencode({'user_id': user_id}) headers = self._base_headers headers['content-type'] = 'application/x-www-form-urlencoded' - response = await self.http.post( + _, response = await self.post( Endpoint.MUTE_USER, data=data, headers=headers @@ -2783,7 +2948,7 @@ async def unmute_user(self, user_id: str) -> Response: data = urlencode({'user_id': user_id}) headers = self._base_headers headers['content-type'] = 'application/x-www-form-urlencoded' - response = await self.http.post( + _, response = await self.post( Endpoint.UNMUTE_USER, data=data, headers=headers @@ -2845,11 +3010,11 @@ async def get_trends( } if additional_request_params is not None: params |= additional_request_params - response = (await self.http.get( + response, _ = await self.get( Endpoint.TREND, params=params, headers=self._base_headers - )).json() + ) entry_id_prefix = 'trends' if category == 'trending' else 'Guide' entries = [ @@ -2883,10 +3048,10 @@ async def get_available_locations(self) -> list[Location]: ------- list[:class:`.Location`] """ - response = (await self.http.get( + response, _ = await self.get( Endpoint.AVAILABLE_LOCATIONS, headers=self._base_headers - )).json() + ) return [Location(self, data) for data in response] async def get_place_trends(self, woeid: int) -> PlaceTrends: @@ -2895,11 +3060,11 @@ async def get_place_trends(self, woeid: int) -> PlaceTrends: You can get available woeid using :attr:`.Client.get_available_locations`. """ - response = (await self.http.get( + response, _ = await self.get( Endpoint.PLACE_TRENDS, params={'id': woeid}, headers=self._base_headers - )).json() + ) trend_data = response[0] trends = [PlaceTrend(self, data) for data in trend_data['trends']] trend_data['trends'] = trends @@ -2926,11 +3091,11 @@ async def _get_user_friendship( 'variables': variables, 'features': FEATURES }) - response = (await self.http.get( + response, _ = await self.get( endpoint, params=params, headers=self._base_headers - )).json() + ) items = find_dict(response, 'entries')[0] results = [] @@ -2971,11 +3136,11 @@ async def _get_user_friendship_2( if cursor is not None: params['cursor'] = cursor - response = (await self.http.get( + response, _ = await self.get( endpoint, params=params, headers=self._base_headers - )).json() + ) users = response['users'] results = [] @@ -3169,11 +3334,11 @@ async def _get_friendship_ids( if cursor is not None: params['cursor'] = cursor - response = (await self.http.get( + response, _ = await self.get( endpoint, params=params, headers=self._base_headers - )).json() + ) previous_cursor = response['previous_cursor'] next_cursor = response['next_cursor'] @@ -3275,11 +3440,12 @@ async def _send_dm( if reply_to is not None: data['reply_to_dm_id'] = reply_to - return (await self.http.post( + response, _ = await self.post( Endpoint.SEND_DM, json=data, headers=self._base_headers - )).json() + ) + return response async def _get_dm_history( self, @@ -3295,11 +3461,12 @@ async def _get_dm_history( if max_id is not None: params['max_id'] = max_id - return (await self.http.get( + response, _ = await self.get( Endpoint.CONVERSATION.format(conversation_id), params=params, headers=self._base_headers - )).json() + ) + return response async def send_dm( self, @@ -3395,7 +3562,7 @@ async def add_reaction_to_message( 'variables': variables, 'queryId': get_query_id(Endpoint.MESSAGE_ADD_REACTION) } - response = await self.http.post( + _, response = await self.post( Endpoint.MESSAGE_ADD_REACTION, json=data, headers=self._base_headers @@ -3441,7 +3608,7 @@ async def remove_reaction_from_message( 'variables': variables, 'queryId': get_query_id(Endpoint.MESSAGE_REMOVE_REACTION) } - response = await self.http.post( + _, response = await self.post( Endpoint.MESSAGE_REMOVE_REACTION, json=data, headers=self._base_headers @@ -3473,7 +3640,7 @@ async def delete_dm(self, message_id: str) -> Response: }, 'queryId': get_query_id(Endpoint.DELETE_DM) } - response = await self.http.post( + _, response = await self.post( Endpoint.DELETE_DM, json=data, headers=self._base_headers @@ -3676,11 +3843,11 @@ async def get_group(self, group_id: str) -> Group: 'context': 'FETCH_DM_CONVERSATION_HISTORY', 'include_conversation_info': True, } - response = (await self.http.get( + response, _ = await self.get( Endpoint.CONVERSATION.format(group_id), params=params, headers=self._base_headers - )).json() + ) return Group(self, group_id, response) async def add_members_to_group( @@ -3713,7 +3880,7 @@ async def add_members_to_group( }, 'queryId': get_query_id(Endpoint.ADD_MEMBER_TO_GROUP) } - response = await self.http.post( + _, response = await self.post( Endpoint.ADD_MEMBER_TO_GROUP, json=data, headers=self._base_headers @@ -3740,7 +3907,7 @@ async def change_group_name(self, group_id: str, name: str) -> Response: }) headers = self._base_headers headers['content-type'] = 'application/x-www-form-urlencoded' - response = await self.http.post( + _, response = await self.post( Endpoint.CHANGE_GROUP_NAME.format(group_id), data=data, headers=headers @@ -3787,11 +3954,11 @@ async def create_list( 'features': LIST_FEATURES, 'queryId': get_query_id(Endpoint.CREATE_LIST) } - response = (await self.http.post( + response, _ = await self.post( Endpoint.CREATE_LIST, json=data, headers=self._base_headers - )).json() + ) list_info = find_dict(response, 'list')[0] return List(self, list_info) @@ -3826,7 +3993,7 @@ async def edit_list_banner(self, list_id: str, media_id: str) -> Response: 'features': LIST_FEATURES, 'queryId': get_query_id(Endpoint.EDIT_LIST_BANNER) } - response = await self.http.post( + _, response = await self.post( Endpoint.EDIT_LIST_BANNER, json=data, headers=self._base_headers @@ -3853,7 +4020,7 @@ async def delete_list_banner(self, list_id: str) -> Response: 'features': LIST_FEATURES, 'queryId': get_query_id(Endpoint.DELETE_LIST_BANNER) } - response = await self.http.post( + _, response = await self.post( Endpoint.DELETE_LIST_BANNER, json=data, headers=self._base_headers @@ -3907,11 +4074,11 @@ async def edit_list( 'features': LIST_FEATURES, 'queryId': get_query_id(Endpoint.UPDATE_LIST) } - response = (await self.http.post( + response, _ = await self.post( Endpoint.UPDATE_LIST, json=data, headers=self._base_headers - )).json() + ) list_info = find_dict(response, 'list')[0] return List(self, list_info) @@ -3944,7 +4111,7 @@ async def add_list_member(self, list_id: str, user_id: str) -> Response: 'features': LIST_FEATURES, 'queryId': get_query_id(Endpoint.LIST_ADD_MEMBER) } - response = await self.http.post( + _, response = await self.post( Endpoint.LIST_ADD_MEMBER, json=data, headers=self._base_headers @@ -3980,7 +4147,7 @@ async def remove_list_member(self, list_id: str, user_id: str) -> Response: 'features': LIST_FEATURES, 'queryId': get_query_id(Endpoint.LIST_REMOVE_MEMBER) } - response = await self.http.post( + _, response = await self.post( Endpoint.LIST_REMOVE_MEMBER, json=data, headers=self._base_headers @@ -4023,11 +4190,11 @@ async def get_lists( 'variables': variables, 'features': FEATURES }) - response = (await self.http.get( + response, _ = await self.get( Endpoint.LIST_MANAGEMENT, params=params, headers=self._base_headers - )).json() + ) entries = find_dict(response, 'entries')[0] items = find_dict(entries, 'items') @@ -4067,11 +4234,11 @@ async def get_list(self, list_id: str) -> List: 'variables': {'listId': list_id}, 'features': LIST_FEATURES }) - response = (await self.http.get( + response, _ = await self.get( Endpoint.LIST_BY_REST_ID, params=params, headers=self._base_headers - )).json() + ) list_info = find_dict(response, 'list')[0] return List(self, list_info) @@ -4123,11 +4290,11 @@ async def get_list_tweets( 'variables': variables, 'features': FEATURES }) - response = (await self.http.get( + response, _ = await self.get( Endpoint.LIST_LATEST_TWEETS, params=params, headers=self._base_headers - )).json() + ) items = find_dict(response, 'entries')[0] next_cursor = items[-1]['content']['value'] @@ -4163,11 +4330,11 @@ async def _get_list_users( 'variables': variables, 'features': FEATURES }) - response = (await self.http.get( + response, _ = await self.get( endpoint, params=params, headers=self._base_headers - )).json() + ) items = find_dict(response, 'entries')[0] results = [] @@ -4361,11 +4528,11 @@ async def get_notifications( if cursor is not None: params['cursor'] = cursor - response = (await self.http.get( + response, _ = await self.get( endpoint, params=params, headers=self._base_headers - )).json() + ) global_objects = response['globalObjects'] users = { @@ -4452,11 +4619,11 @@ async def search_community( params = flatten_params({ 'variables': variables }) - response = (await self.http.get( + response, _ = await self.get( Endpoint.SEARCH_COMMUNITY, params=params, headers=self._base_headers - )).json() + ) items = find_dict(response, 'items_results')[0] communities = [] @@ -4496,11 +4663,11 @@ async def get_community(self, community_id: str) -> Community: 'c9s_superc9s_indication_enabled':False } }) - response = (await self.http.get( + response, _ = await self.get( Endpoint.GET_COMMUNITY, params=params, headers=self._base_headers - )).json() + ) community_data = find_dict(response, 'result')[0] return Community(self, community_data) @@ -4563,11 +4730,11 @@ async def get_community_tweets( 'variables': variables, 'features': COMMUNITY_TWEETS_FEATURES }) - response = (await self.http.get( + response, _ = await self.get( endpoint, params=params, headers=self._base_headers - )).json() + ) entries = find_dict(response, 'entries')[0] if tweet_type == 'Media': @@ -4639,11 +4806,11 @@ async def get_communities_timeline( 'variables': variables, 'features': COMMUNITY_TWEETS_FEATURES }) - response = (await self.http.get( + response, _ = await self.get( Endpoint.COMMUNITIES_TIMELINE, params=params, headers=self._base_headers - )).json() + ) items = find_dict(response, 'entries')[0] tweets = [] for item in items: @@ -4694,11 +4861,11 @@ async def join_community(self, community_id: str) -> Community: 'features': JOIN_COMMUNITY_FEATURES, 'queryId': get_query_id(Endpoint.JOIN_COMMUNITY) } - response = (await self.http.post( + response, _ = await self.post( Endpoint.JOIN_COMMUNITY, json=data, headers=self._base_headers - )).json() + ) community_data = response['data']['community_join'] community_data['rest_id'] = community_data['id_str'] return Community(self, community_data) @@ -4724,11 +4891,11 @@ async def leave_community(self, community_id: str) -> Community: 'features': JOIN_COMMUNITY_FEATURES, 'queryId': get_query_id(Endpoint.LEAVE_COMMUNITY) } - response = (await self.http.post( + response, _ = await self.post( Endpoint.LEAVE_COMMUNITY, json=data, headers=self._base_headers - )).json() + ) community_data = response['data']['community_leave'] community_data['rest_id'] = community_data['id_str'] return Community(self, community_data) @@ -4759,11 +4926,11 @@ async def request_to_join_community( 'features': JOIN_COMMUNITY_FEATURES, 'queryId': get_query_id(Endpoint.REQUEST_TO_JOIN_COMMUNITY) } - response = (await self.http.post( + response, _ = await self.post( Endpoint.REQUEST_TO_JOIN_COMMUNITY, json=data, headers=self._base_headers - )).json() + ) community_data = find_dict(response, 'result')[0] community_data['rest_id'] = community_data['id_str'] return Community(self, community_data) @@ -4786,11 +4953,11 @@ async def _get_community_users( 'responsive_web_graphql_timeline_navigation_enabled': True } }) - response = (await self.http.get( + response, _ = await self.get( endpoint, params=params, headers=self._base_headers - )).json() + ) items = find_dict(response, 'items_results')[0] users = [] @@ -4905,11 +5072,11 @@ async def search_community_tweet( 'variables': variables, 'features': COMMUNITY_TWEETS_FEATURES }) - response = (await self.http.get( + response, _ = await self.get( Endpoint.SEARCH_COMMUNITY_TWEET, params=params, headers=self._base_headers - )).json() + ) items = find_dict(response, 'entries')[0] tweets = [] @@ -4941,10 +5108,10 @@ async def _stream( headers.pop('content-type') params = {'topics': ','.join(topics)} - async with self.http.stream( + async with self.stream( 'GET', Endpoint.EVENTS, params=params, timeout=None ) as response: - self.http._remove_duplicate_ct0_cookie() + self._remove_duplicate_ct0_cookie() async for line in response.aiter_lines(): try: data = json.loads(line) @@ -5048,9 +5215,9 @@ async def _update_subscriptions( headers = self._base_headers headers['content-type'] = 'application/x-www-form-urlencoded' headers['LivePipeline-Session'] = session.id - response = (await self.http.post( + response, _ = await self.post( Endpoint.UPDATE_SUBSCRIPTIONS, data=data, headers=headers - )).json() + ) session.topics |= subscribe session.topics -= unsubscribe diff --git a/twikit/twikit_async/http.py b/twikit/twikit_async/http.py deleted file mode 100644 index 501bebc2..00000000 --- a/twikit/twikit_async/http.py +++ /dev/null @@ -1,66 +0,0 @@ -import httpx - -from ..errors import ( - TwitterException, - BadRequest, - Unauthorized, - Forbidden, - NotFound, - RequestTimeout, - TooManyRequests, - ServerError -) - - -class HTTPClient: - def __init__(self, **kwargs) -> None: - self.client = httpx.AsyncClient(**kwargs) - - async def request( - self, - method: str, - url: str, - **kwargs - ) -> httpx.Response: - response = await self.client.request(method, url, **kwargs) - status_code = response.status_code - self._remove_duplicate_ct0_cookie() - - if status_code >= 400: - message = f'status: {status_code}, message: "{response.text}"' - if status_code == 400: - raise BadRequest(message, headers=response.headers) - elif status_code == 401: - raise Unauthorized(message, headers=response.headers) - elif status_code == 403: - raise Forbidden(message, headers=response.headers) - elif status_code == 404: - raise NotFound(message, headers=response.headers) - elif status_code == 408: - raise RequestTimeout(message, headers=response.headers) - elif status_code == 429: - raise TooManyRequests(message, headers=response.headers) - elif 500 <= status_code < 600: - raise ServerError(message, headers=response.headers) - else: - raise TwitterException(message, headers=response.headers) - - return response - - async def get(self, url, **kwargs) -> httpx.Response: - return await self.request('GET', url, **kwargs) - - async def post(self, url, **kwargs) -> httpx.Response: - return await self.request('POST', url, **kwargs) - - def stream(self, *args, **kwargs): - response = self.client.stream(*args, **kwargs) - return response - - def _remove_duplicate_ct0_cookie(self) -> None: - cookies = {} - for cookie in self.client.cookies.jar: - if 'ct0' in cookies and cookie.name == 'ct0': - continue - cookies[cookie.name] = cookie.value - self.client.cookies = list(cookies.items()) diff --git a/twikit/twikit_async/tweet.py b/twikit/twikit_async/tweet.py index 92448451..36aea5ce 100644 --- a/twikit/twikit_async/tweet.py +++ b/twikit/twikit_async/tweet.py @@ -34,10 +34,10 @@ class Tweet: The tweet ID this tweet is in reply to, if any is_quote_status : :class:`bool` Indicates if the tweet is a quote status. - quote : :class:`Tweet` + quote : :class:`Tweet` | None The Tweet being quoted (if any) - retweeted_tweet : :class:`bool` - Whether the tweet is a retweet + retweeted_tweet : :class:`Tweet` | None + The Tweet being retweeted (if any) possibly_sensitive : :class:`bool` Indicates if the tweet content may be sensitive. possibly_sensitive_editable : :class:`bool` @@ -692,4 +692,4 @@ def __eq__(self, __value: object) -> bool: return isinstance(__value, CommunityNote) and self.id == __value.id def __ne__(self, __value: object) -> bool: - return not self == __value \ No newline at end of file + return not self == __value diff --git a/twikit/twikit_async/user.py b/twikit/twikit_async/user.py index feb1ef65..32b63a5c 100644 --- a/twikit/twikit_async/user.py +++ b/twikit/twikit_async/user.py @@ -379,6 +379,28 @@ async def get_subscriptions(self, count: int = 20) -> Result[User]: """ return await self._client.get_user_subscriptions(self.id, count) + async def get_latest_followers( + self, count: int | None = None, cursor: str | None = None + ) -> Result[User]: + """ + Retrieves the latest followers. + Max count : 200 + """ + return await self._client.get_latest_followers( + self.id, count=count, cursor=cursor + ) + + async def get_latest_friends( + self, count: int | None = None, cursor: str | None = None + ) -> Result[User]: + """ + Retrieves the latest friends (following users). + Max count : 200 + """ + return await self._client.get_latest_friends( + self.id, count=count, cursor=cursor + ) + async def send_dm( self, text: str, media_id: str = None, reply_to = None ) -> Message: diff --git a/twikit/twikit_async/utils.py b/twikit/twikit_async/utils.py index 26210607..6a5dd701 100644 --- a/twikit/twikit_async/utils.py +++ b/twikit/twikit_async/utils.py @@ -98,12 +98,12 @@ async def execute_task(self, *subtask_inputs, **kwargs) -> None: if subtask_inputs is not None: data['subtask_inputs'] = list(subtask_inputs) - response = (await self._client.http.post( + response, _ = await self._client.post( self.endpoint, data=json.dumps(data), headers=self.headers, **kwargs - )).json() + ) self.response = response @property diff --git a/twikit/user.py b/twikit/user.py index 94a977ef..016e78b4 100644 --- a/twikit/user.py +++ b/twikit/user.py @@ -379,6 +379,28 @@ def get_subscriptions(self, count: int = 20) -> Result[User]: """ return self._client.get_user_subscriptions(self.id, count) + def get_latest_followers( + self, count: int | None = None, cursor: str | None = None + ) -> Result[User]: + """ + Retrieves the latest followers. + Max count : 200 + """ + return self._client.get_latest_followers( + self.id, count=count, cursor=cursor + ) + + def get_latest_friends( + self, count: int | None = None, cursor: str | None = None + ) -> Result[User]: + """ + Retrieves the latest friends (following users). + Max count : 200 + """ + return self._client.get_latest_friends( + self.id, count=count, cursor=cursor + ) + def send_dm( self, text: str, media_id: str = None, reply_to = None ) -> Message: diff --git a/twikit/utils.py b/twikit/utils.py index 09f7a461..4e2d312d 100644 --- a/twikit/utils.py +++ b/twikit/utils.py @@ -393,12 +393,12 @@ def execute_task(self, *subtask_inputs, **kwargs) -> None: if subtask_inputs is not None: data['subtask_inputs'] = list(subtask_inputs) - response = self._client.http.post( + response, _ = self._client.post( self.endpoint, data=json.dumps(data), headers=self.headers, **kwargs - ).json() + ) self.response = response @property