diff --git a/userge/config.py b/userge/config.py index 7fca00328..1d4b2a8fe 100644 --- a/userge/config.py +++ b/userge/config.py @@ -34,6 +34,7 @@ class Config: HU_STRING_SESSION = os.environ.get("HU_STRING_SESSION", None) OWNER_ID = tuple(filter(lambda x: x, map(int, os.environ.get("OWNER_ID", "0").split()))) LOG_CHANNEL_ID = int(os.environ.get("LOG_CHANNEL_ID")) + AUTH_CHATS = (OWNER_ID[0], LOG_CHANNEL_ID) if OWNER_ID else (LOG_CHANNEL_ID,) DB_URI = os.environ.get("DATABASE_URL") LANG = os.environ.get("PREFERRED_LANGUAGE") DOWN_PATH = os.environ.get("DOWN_PATH") diff --git a/userge/core/methods/messages/edit_message_text.py b/userge/core/methods/messages/edit_message_text.py index 73908da0b..9febca49f 100644 --- a/userge/core/methods/messages/edit_message_text.py +++ b/userge/core/methods/messages/edit_message_text.py @@ -83,7 +83,7 @@ async def edit_message_text(self, # pylint: disable=arguments-differ Raises: RPCError: In case of a Telegram RPC error. """ - if text and chat_id != Config.LOG_CHANNEL_ID: + if text and chat_id not in Config.AUTH_CHATS: text = secure_text(str(text)) msg = await super().edit_message_text(chat_id=chat_id, message_id=message_id, diff --git a/userge/core/methods/messages/send_as_file.py b/userge/core/methods/messages/send_as_file.py index 48c9f005d..1d6a0d61a 100644 --- a/userge/core/methods/messages/send_as_file.py +++ b/userge/core/methods/messages/send_as_file.py @@ -70,7 +70,7 @@ async def send_as_file(self, Returns: On success, the sent Message is returned. """ - if text and chat_id != Config.LOG_CHANNEL_ID: + if text and chat_id not in Config.AUTH_CHATS: text = secure_text(str(text)) async with aiofiles.open(filename, "w+", encoding="utf8") as out_file: await out_file.write(text) diff --git a/userge/core/methods/messages/send_message.py b/userge/core/methods/messages/send_message.py index 8299b6401..2891c12ca 100644 --- a/userge/core/methods/messages/send_message.py +++ b/userge/core/methods/messages/send_message.py @@ -92,7 +92,7 @@ async def send_message(self, # pylint: disable=arguments-differ Returns: :obj:`Message`: On success, the sent text message or True is returned. """ - if text and chat_id != Config.LOG_CHANNEL_ID: + if text and chat_id not in Config.AUTH_CHATS: text = secure_text(str(text)) msg = await super().send_message(chat_id=chat_id, text=text, diff --git a/userge/plugins/bot/bot_forwards.py b/userge/plugins/bot/bot_forwards.py index c2222bb9c..05ecaf433 100644 --- a/userge/plugins/bot/bot_forwards.py +++ b/userge/plugins/bot/bot_forwards.py @@ -215,6 +215,7 @@ async def broadcast_(_, message: Message): if len(blocked_users) != 0: b_info += f"\n\nšŸ˜• {len(blocked_users)} users blocked your bot recently" await br_cast.edit(b_info) + await CHANNEL.log(b_info) if blocked_users: for buser in blocked_users: await BOT_START.find_one_and_delete({"user_id": buser}) diff --git a/userge/plugins/bot/buttons.py b/userge/plugins/bot/buttons.py index 8a22edc5f..80d71b9fd 100644 --- a/userge/plugins/bot/buttons.py +++ b/userge/plugins/bot/buttons.py @@ -1,22 +1,45 @@ """ Create Buttons Through Bots """ # IMPROVED BY code-rgb -# By @krishna_singhal +import json import os import re -from html_telegraph_poster.upload_images import upload_image from pyrogram.errors import BadRequest, MessageEmpty, UserIsBot from pyrogram.types import ReplyKeyboardRemove -from userge import Config, Message, get_collection, userge +from userge import Config, Message, userge from userge.utils import get_file_id_and_ref from userge.utils import parse_buttons as pb -BUTTON_BASE = get_collection("TEMP_BUTTON") BTN = r"\[([^\[]+?)\](\[buttonurl:(?:/{0,2})(.+?)(:same)?\])|\[([^\[]+?)\](\(buttonurl:(?:/{0,2})(.+?)(:same)?\))" BTNX = re.compile(BTN) +PATH = "./userge/xcache/inline_db.json" +CHANNEL = userge.getCLogger(__name__) + + +class Inline_DB: + def __init__(self): + if not os.path.exists(PATH): + d = {} + json.dump(d, open(PATH, "w")) + self.db = json.load(open(PATH)) + + def save_msg(self, rnd_id: int, msg_content: str, media_valid: bool, media_id: int): + self.db[rnd_id] = { + "msg_content": msg_content, + "media_valid": media_valid, + "media_id": media_id, + } + self.save() + + def save(self): + with open(PATH, "w") as outfile: + json.dump(self.db, outfile, indent=4) + + +InlineDB = Inline_DB() @userge.on_cmd( @@ -74,34 +97,41 @@ async def create_button(msg: Message): }, ) async def inline_buttons(message: Message): - """ Create Buttons Through Inline Bots """ - if Config.BOT_TOKEN is None: - await message.err( - "First Create a Inline Bot via @Botfather to Create Buttons..." - ) - return - replied = message.reply_to_message - if not (replied and (replied.text or replied.caption)): - await message.err("Reply a text Msg") - return - await message.edit("Creating an inline button...") - if replied.caption: - text = replied.caption - text = check_brackets(text) - dls_loc = await down_image(message) - photo_url = str(upload_image(dls_loc)) - BUTTON_BASE.insert_one({"msg_data": text, "photo_url": photo_url}) - os.remove(dls_loc) - else: - text = replied.text - text = check_brackets(text) - BUTTON_BASE.insert_one({"msg_data": text}) + await message.edit("Creating an Inline Button...") + reply = message.reply_to_message + msg_content = None + media_valid = False + media_id = 0 + if reply: + media_valid = bool(get_file_id_and_ref(reply)[0]) + + if message.input_str: + msg_content = message.input_str + if media_valid: + media_id = (await reply.forward(Config.LOG_CHANNEL_ID)).message_id + + elif reply: + if media_valid: + media_id = (await reply.forward(Config.LOG_CHANNEL_ID)).message_id + msg_content = reply.caption.html if reply.caption else None + elif reply.text: + msg_content = reply.text.html + + if not msg_content: + return await message.err("Content not found", del_in=5) + + rnd_id = userge.rnd_id() + msg_content = check_brackets(msg_content) + InlineDB.save_msg(rnd_id, msg_content, media_valid, media_id) + bot = await userge.bot.get_me() - x = await userge.get_inline_bot_results(bot.username, "buttonnn") + + x = await userge.get_inline_bot_results(bot.username, f"btn_{rnd_id}") await userge.send_inline_bot_result( - chat_id=message.chat.id, query_id=x.query_id, result_id=x.results[0].id + chat_id=message.chat.id, + query_id=x.query_id, + result_id=x.results[0].id, ) - await BUTTON_BASE.drop() await message.delete() @@ -120,16 +150,6 @@ def check_brackets(text): return text -async def down_image(message): - message.reply_to_message - if not os.path.isdir(Config.DOWN_PATH): - os.makedirs(Config.DOWN_PATH) - dls = await userge.download_media( - message=message.reply_to_message, file_name=Config.DOWN_PATH - ) - return os.path.join(Config.DOWN_PATH, os.path.basename(dls)) - - @userge.on_cmd( "noformat", about={ diff --git a/userge/plugins/bot/inline_help.py b/userge/plugins/bot/inline_help.py index cdb7704d1..6219433c5 100644 --- a/userge/plugins/bot/inline_help.py +++ b/userge/plugins/bot/inline_help.py @@ -56,6 +56,10 @@ "help_txt": "**Send Saved Spoiler Via Inline**\n For more info see `.help spoiler`\n\n>>> `spoiler [ID]`", "i_q": "spoiler", }, + "btn": { + "help_txt": "**Get your 15 recently creeated inline message in the inline query, so you can post it in any channel or group effortlessly**\n For Creating inline messages see `.help .ibutton`\n\n>>> `btn`", + "i_q": "btn", + }, } diff --git a/userge/plugins/help.py b/userge/plugins/help.py index 8a86d5a68..41b921530 100644 --- a/userge/plugins/help.py +++ b/userge/plugins/help.py @@ -34,6 +34,8 @@ from .fun.stylish import font_gen from .misc.redditdl import reddit_thumb_link +INLINE_DB = {} +CHANNEL = userge.getCLogger(__name__) MEDIA_TYPE, MEDIA_URL = None, None PATH = "userge/xcache" _CATEGORY = { @@ -49,7 +51,6 @@ } # Database SAVED_SETTINGS = get_collection("CONFIGS") -BUTTON_BASE = get_collection("TEMP_BUTTON") # TODO use json cache REPO_X = InlineQueryResultArticle( title="Repo", input_message_content=InputTextMessageContent("**Here's how to setup USERGE-X** "), @@ -939,28 +940,78 @@ async def inline_answer(_, inline_query: InlineQuery): ) ) - if string == "buttonnn": - async for data in BUTTON_BASE.find(): - button_data = data["msg_data"] - text, buttons = pb(button_data) - try: - photo_url = data["photo_url"] - except KeyError: - photo_url = None - if photo_url: - results.append( - InlineQueryResultPhoto( - photo_url=photo_url, caption=text, reply_markup=buttons - ) - ) - else: - results.append( - InlineQueryResultArticle( - title=text, - input_message_content=InputTextMessageContent(text), - reply_markup=buttons, - ) - ) + if "btn_" in str_y[0] or str_y[0] == "btn": + + inline_db_path = "./userge/xcache/inline_db.json" + if os.path.exists(inline_db_path): + with open(inline_db_path, "r") as data_file: + view_db = json.load(data_file) + + data_count_n = 1 + reverse_list = list(view_db) + reverse_list.reverse() + for butt_ons in reverse_list: + if data_count_n > 15: + view_db.pop(butt_ons, None) + data_count_n += 1 + + with open(inline_db_path, "w") as data_file: + json.dump(view_db, data_file) + + if str_y[0] == "btn": + inline_storage = list(view_db) + else: + rnd_id = (str_y[0].split("_", 1))[1] + inline_storage = [rnd_id] + + if len(inline_storage) == 0: + return + + for inline_content in inline_storage: + inline_db = view_db.get(inline_content, None) + if inline_db: + if ( + inline_db["media_valid"] + and int(inline_db["media_id"]) != 0 + ): + saved_msg = await userge.bot.get_messages( + Config.LOG_CHANNEL_ID, int(inline_db["media_id"]) + ) + media_data = get_file_id_and_ref(saved_msg) + + textx, buttonsx = pb(inline_db["msg_content"]) + + if inline_db["media_valid"]: + if saved_msg.photo: + results.append( + InlineQueryResultCachedPhoto( + file_id=media_data[0], + file_ref=media_data[1], + caption=textx, + reply_markup=buttonsx, + ) + ) + else: + results.append( + InlineQueryResultCachedDocument( + title=textx, + file_id=media_data[0], + file_ref=media_data[1], + caption=textx, + description="Inline Button", + reply_markup=buttonsx, + ) + ) + else: + results.append( + InlineQueryResultArticle( + title=textx, + input_message_content=InputTextMessageContent( + textx + ), + reply_markup=buttonsx, + ) + ) if str_y[0].lower() == "stylish": if len(str_y) == 2: diff --git a/userge/plugins/misc/gdrive.py b/userge/plugins/misc/gdrive.py index 07d6ee54b..ae4b3213a 100644 --- a/userge/plugins/misc/gdrive.py +++ b/userge/plugins/misc/gdrive.py @@ -396,7 +396,7 @@ def _download_file(self, path: str, name: str, **kwargs) -> None: d_file_obj = MediaIoBaseDownload(d_f, request, chunksize=50 * 1024 * 1024) c_time = time.time() done = False - while done is False: + while not done: status, done = d_file_obj.next_chunk(num_retries=5) if self._is_canceled: raise ProcessCanceled @@ -756,12 +756,11 @@ def _get_file_id(self, filter_str: bool = False) -> tuple: link = self._message.filtered_input_str found = _GDRIVE_ID.search(link) if found and "folder" in link: - out = (found.group(1), "folder") + return found.group(1), "folder" elif found: - out = (found.group(1), "file") + return found.group(1), "file" else: - out = (link, "unknown") - return out + return link, "unknown" async def setup(self) -> None: """ Setup GDrive """ @@ -933,7 +932,7 @@ async def upload(self) -> None: except Exception as e_e: await self._message.err(e_e) return - file_path = dl_loc if dl_loc else self._message.input_str + file_path = dl_loc or self._message.input_str if not os.path.exists(file_path): await self._message.err("invalid file path provided?") return @@ -964,7 +963,7 @@ async def upload(self) -> None: out = f"**ERROR** : `{self._output._get_reason()}`" # pylint: disable=protected-access elif self._output is not None and not self._is_canceled: out = f"**Uploaded Successfully** __in {m_s} seconds__\n\n{self._output}" - elif self._output is not None and self._is_canceled: + elif self._output is not None: out = self._output else: out = "`failed to upload.. check logs?`" @@ -994,7 +993,7 @@ async def download(self) -> None: out = ( f"**Downloaded Successfully** __in {m_s} seconds__\n\n`{self._output}`" ) - elif self._output is not None and self._is_canceled: + elif self._output is not None: out = self._output else: out = "`failed to download.. check logs?`" @@ -1025,7 +1024,7 @@ async def copy(self) -> None: out = f"**ERROR** : `{self._output._get_reason()}`" # pylint: disable=protected-access elif self._output is not None and not self._is_canceled: out = f"**Copied Successfully** __in {m_s} seconds__\n\n{self._output}" - elif self._output is not None and self._is_canceled: + elif self._output is not None: out = self._output else: out = "`failed to copy.. check logs?`" @@ -1159,13 +1158,7 @@ async def del_perms(self) -> None: ) -@userge.on_cmd( - "gsetup", - about={"header": "Setup GDrive Creds"}, - allow_groups=False, - allow_private=False, - allow_bots=False, -) +@userge.on_cmd("gsetup", about={"header": "Setup GDrive Creds"}) async def gsetup_(message: Message): """ setup creds """ await Worker(message).setup() diff --git a/userge/utils/spotify/spotify_db_class.py b/userge/utils/spotify/spotify_db_class.py index d961c4db7..00655f63c 100644 --- a/userge/utils/spotify/spotify_db_class.py +++ b/userge/utils/spotify/spotify_db_class.py @@ -2,42 +2,42 @@ class Database: - def __init__(self): - try: - self.db = json.load(open("./userge/xcache/spotify_database.json")) - except FileNotFoundError: - print("You need to run generate.py first, please read the Readme.") - + def __init__(self): + try: + self.db = json.load(open("./userge/xcache/spotify_database.json")) + except FileNotFoundError: + print("You need to run generate.py first, please read the Readme.") + - def save_token(self, token): - self.db["access_token"] = token - self.save() + def save_token(self, token): + self.db["access_token"] = token + self.save() - def save_refresh(self, token): - self.db["refresh_token"] = token - self.save() + def save_refresh(self, token): + self.db["refresh_token"] = token + self.save() - def save_bio(self, bio): - self.db["bio"] = bio - self.save() + def save_bio(self, bio): + self.db["bio"] = bio + self.save() - def save_spam(self, which, what): - self.db[which + "_spam"] = what + def save_spam(self, which, what): + self.db[which + "_spam"] = what - def return_token(self): - return self.db["access_token"] + def return_token(self): + return self.db["access_token"] - def return_refresh(self): - return self.db["refresh_token"] + def return_refresh(self): + return self.db["refresh_token"] - def return_bio(self): - return self.db["bio"] + def return_bio(self): + return self.db["bio"] - def return_spam(self, which): - return self.db[which + "_spam"] + def return_spam(self, which): + return self.db[which + "_spam"] - def save(self): - with open('./userge/xcache/spotify_database.json', 'w') as outfile: - json.dump(self.db, outfile, indent=4, sort_keys=True) + def save(self): + with open('./userge/xcache/spotify_database.json', 'w') as outfile: + json.dump(self.db, outfile, indent=4, sort_keys=True) \ No newline at end of file diff --git a/userge/utils/tools.py b/userge/utils/tools.py index 9e839e026..5bee87a20 100644 --- a/userge/utils/tools.py +++ b/userge/utils/tools.py @@ -94,7 +94,7 @@ async def take_screen_shot(video_file: str, duration: int, path: str = '') -> Op _LOG.info('[[[Extracting a frame from %s ||| Video duration => %s]]]', video_file, duration) ttl = duration // 2 thumb_image_path = path or os.path.join(userge.Config.DOWN_PATH, f"{basename(video_file)}.jpg") - command = f"ffmpeg -ss {ttl} -i '{video_file}' -vframes 1 '{thumb_image_path}'" + command = f'''ffmpeg -ss {ttl} -i "{video_file}" -vframes 1 "{thumb_image_path}"''' err = (await runcmd(command))[1] if err: _LOG.error(err)