diff --git a/tgcf/cli.py b/tgcf/cli.py index ccb89794..80247e7f 100644 --- a/tgcf/cli.py +++ b/tgcf/cli.py @@ -82,6 +82,9 @@ def main( mode: Mode = typer.Argument( ..., help="Choose the mode in which you want to run tgcf.", envvar="TGCF_MODE" ), + agent_id: Optional[int] = typer.Argument( + default=0, help="Choose the agent to use for message forwarding." + ), verbose: Optional[bool] = typer.Option( # pylint: disable=unused-argument None, "--loud", @@ -113,11 +116,11 @@ def main( if mode == Mode.PAST: from tgcf.past import forward_job # pylint: disable=import-outside-toplevel - asyncio.run(forward_job()) + asyncio.run(forward_job(agent_id)) else: from tgcf.live import start_sync # pylint: disable=import-outside-toplevel - asyncio.run(start_sync()) + asyncio.run(start_sync(agent_id)) -# AAHNIK 2021 +# AAHNIK 2023 diff --git a/tgcf/config.py b/tgcf/config.py index 5779a65a..007f8ad1 100644 --- a/tgcf/config.py +++ b/tgcf/config.py @@ -3,7 +3,7 @@ import logging import os import sys -from typing import Dict, List, Optional, Union, Any +from typing import Any, Dict, List, Optional, Union from dotenv import load_dotenv from pydantic import BaseModel, validator # pylint: disable=no-name-in-module @@ -32,6 +32,9 @@ class Forward(BaseModel): offset: int = 0 end: Optional[int] = 0 + agent: int = 0 # the agent id to use for this connection + plugin_cfg: int = 0 # the plugin configuration id to use for this connection + class LiveSettings(BaseModel): """Settings to configure how tgcf operates in live mode.""" @@ -60,15 +63,44 @@ def validate_delay(cls, val): # pylint: disable=no-self-use,no-self-argument return val -class LoginConfig(BaseModel): +# class LoginConfig(BaseModel): + +# API_ID: int = 0 +# API_HASH: str = "" + +# user_type: int = 0 # 0:bot, 1:user +# phone_no: int = 91 +# USERNAME: str = "" +# SESSION_STRING: str = "" +# BOT_TOKEN: str = "" + + +class TgAPIConfig(BaseModel): API_ID: int = 0 API_HASH: str = "" + + +class AgentLoginConfig(BaseModel): + alias: str = "" user_type: int = 0 # 0:bot, 1:user phone_no: int = 91 USERNAME: str = "" SESSION_STRING: str = "" BOT_TOKEN: str = "" + is_active: bool = False + + +class AgentForwardingConfig(BaseModel): + show_forwarded_from: bool = False + mode: int = 0 # 0: live, 1:past + live: LiveSettings = LiveSettings() + past: PastSettings = PastSettings() + + +class LoginConfig(BaseModel): + tg: TgAPIConfig = TgAPIConfig() + agents: List[AgentLoginConfig] = [AgentLoginConfig()] class BotMessages(BaseModel): @@ -82,15 +114,11 @@ class Config(BaseModel): # pylint: disable=too-few-public- pid: int = 0 theme: str = "light" - login: LoginConfig = LoginConfig() + login_cfg: LoginConfig = LoginConfig() admins: List[Union[int, str]] = [] forwards: List[Forward] = [] - show_forwarded_from: bool = False - mode: int = 0 # 0: live, 1:past - live: LiveSettings = LiveSettings() - past: PastSettings = PastSettings() - - plugins: PluginConfig = PluginConfig() + agent_fwd_cfg: List[AgentForwardingConfig] = [AgentForwardingConfig()] + plugin_cfgs: List[PluginConfig] = [PluginConfig()] bot_messages = BotMessages() @@ -165,10 +193,24 @@ async def get_id(client: TelegramClient, peer): return await client.get_peer_id(peer) +async def load_active_forwards(agent_id: int, forwards: List[Forward]) -> List[Forward]: + active_forwards: List[Forward] = [] + for forward in forwards: + if forward.agent != agent_id: + continue + if not forward.use_this: + continue + active_forwards.append(forward) + return active_forwards + + async def load_from_to( - client: TelegramClient, forwards: List[Forward] -) -> Dict[int, List[int]]: + agent_id: int, + client: TelegramClient, + forwards: List[Forward], +) -> Dict[int, Dict[str, List[int] | int]]: """Convert a list of Forward objects to a mapping. + The active connections of current agent are included. Args: client: Instance of Telegram client (logged in) @@ -176,7 +218,9 @@ async def load_from_to( Returns: Dict: key = chat id of source - value = List of chat ids of destinations + value = dict + dest: List of chat ids of destinations + pcgf: id of plugin config to use Notes: -> The Forward objects may contain username/phn no/links @@ -184,19 +228,27 @@ async def load_from_to( -> Chat ids are essential for how storage is implemented -> Storage is essential for edit, delete and reply syncs """ - from_to_dict = {} + from_to_dict: dict = {} async def _(peer): return await get_id(client, peer) for forward in forwards: + if forward.agent != agent_id: + continue if not forward.use_this: continue source = forward.source if not isinstance(source, int) and source.strip() == "": continue src = await _(forward.source) - from_to_dict[src] = [await _(dest) for dest in forward.dest] + # from_to_dict[src] = { + # "dest": [], # the list of destination entities + # "pcfg": 0 # id of the plugin config to use + # } + from_to_dict[src] = {} + from_to_dict[src]["dest"] = [await _(dest) for dest in forward.dest] + from_to_dict[src]["pcfg"] = forward.plugin_cfg logging.info(f"From to dict is {from_to_dict}") return from_to_dict @@ -229,7 +281,7 @@ def read_db(): PASSWORD = os.getenv("PASSWORD", "tgcf") -ADMINS = [] +ADMINS: List = [] MONGO_CON_STR = os.getenv("MONGO_CON_STR") MONGO_DB_NAME = os.getenv("MONGO_DB_NAME", "tgcf-config") @@ -242,19 +294,23 @@ def read_db(): logging.warn( "You have not set a password to protect the web access to tgcf.\nThe default password `tgcf` is used." ) -from_to = {} +from_to: Dict[int, Dict[str, int | List[int]]] = {} is_bot: Optional[bool] = None logging.info("config.py got executed") -def get_SESSION(section: Any = CONFIG.login, default: str = 'tgcf_bot'): - if section.SESSION_STRING and section.user_type == 1: +def get_SESSION( + agent_id: int, login_cfg: LoginConfig = CONFIG.login_cfg, default: str = "tgcf_bot" +): + # TODO: validate agent_id + agent = login_cfg.agents[agent_id] + if agent.SESSION_STRING and agent.user_type == 1: logging.info("using session string") - SESSION = StringSession(section.SESSION_STRING) - elif section.BOT_TOKEN and section.user_type == 0: + SESSION = StringSession(agent.SESSION_STRING) + elif agent.BOT_TOKEN and agent.user_type == 0: logging.info("using bot account") - SESSION = default + SESSION = default + str(agent_id) else: logging.warning("Login information not set!") sys.exit() - return SESSION \ No newline at end of file + return SESSION diff --git a/tgcf/live.py b/tgcf/live.py index e8f9428a..c4216957 100644 --- a/tgcf/live.py +++ b/tgcf/live.py @@ -16,6 +16,8 @@ from tgcf.plugins import apply_plugins, load_async_plugins from tgcf.utils import clean_session_files, send_message +current_agent: int = 0 + async def new_message_handler(event: Union[Message, events.NewMessage]) -> None: """Process new incoming messages.""" @@ -36,9 +38,10 @@ async def new_message_handler(event: Union[Message, events.NewMessage]) -> None: del st.stored[key] break - dest = config.from_to.get(chat_id) + dest = config.from_to.get(chat_id).get("dest") + pcfg_id = config.from_to.get(chat_id).get("pcfg") - tm = await apply_plugins(message) + tm = await apply_plugins(pcfg_id, message) if not tm: return @@ -50,7 +53,7 @@ async def new_message_handler(event: Union[Message, events.NewMessage]) -> None: for d in dest: if event.is_reply and r_event_uid in st.stored: tm.reply_to = st.stored.get(r_event_uid).get(d) - fwded_msg = await send_message(d, tm) + fwded_msg = await send_message(current_agent, d, tm) st.stored[event_uid].update({d: fwded_msg}) tm.clear() @@ -67,8 +70,9 @@ async def edited_message_handler(event) -> None: logging.info(f"Message edited in {chat_id}") event_uid = st.EventUid(event) + pcfg_id = config.from_to.get(chat_id).get("pcfg") - tm = await apply_plugins(message) + tm = await apply_plugins(pcfg_id, message) if not tm: return @@ -77,17 +81,20 @@ async def edited_message_handler(event) -> None: if fwded_msgs: for _, msg in fwded_msgs.items(): - if config.CONFIG.live.delete_on_edit == message.text: + if ( + config.CONFIG.agent_fwd_cfg[current_agent].live.delete_on_edit + == message.text + ): await msg.delete() await message.delete() else: await msg.edit(tm.text) return - dest = config.from_to.get(chat_id) + dest = config.from_to.get(chat_id).get("dest") for d in dest: - await send_message(d, tm) + await send_message(current_agent, d, tm) tm.clear() @@ -114,26 +121,29 @@ async def deleted_message_handler(event): } -async def start_sync() -> None: +async def start_sync(agent_id: int) -> None: """Start tgcf live sync.""" # clear past session files clean_session_files() + global current_agent + current_agent = agent_id # load async plugins defined in plugin_models await load_async_plugins() - SESSION = get_SESSION() + SESSION = get_SESSION(agent_id) client = TelegramClient( SESSION, - CONFIG.login.API_ID, - CONFIG.login.API_HASH, - sequential_updates=CONFIG.live.sequential_updates, + CONFIG.login_cfg.tg.API_ID, + CONFIG.login_cfg.tg.API_HASH, + sequential_updates=CONFIG.agent_fwd_cfg[agent_id].live.sequential_updates, ) - if CONFIG.login.user_type == 0: - if CONFIG.login.BOT_TOKEN == "": + agent = CONFIG.login_cfg.agents[agent_id] + if agent.user_type == 0: + if agent.BOT_TOKEN == "": logging.warning("Bot token not found, but login type is set to bot.") sys.exit() - await client.start(bot_token=CONFIG.login.BOT_TOKEN) + await client.start(bot_token=agent.BOT_TOKEN) else: await client.start() config.is_bot = await client.is_bot() @@ -145,7 +155,10 @@ async def start_sync() -> None: ALL_EVENTS.update(command_events) for key, val in ALL_EVENTS.items(): - if config.CONFIG.live.delete_sync is False and key == "deleted": + if ( + config.CONFIG.agent_fwd_cfg[agent_id].live.delete_sync is False + and key == "deleted" + ): continue client.add_event_handler(*val) logging.info(f"Added event handler for {key}") @@ -161,5 +174,5 @@ async def start_sync() -> None: ], ) ) - config.from_to = await config.load_from_to(client, config.CONFIG.forwards) + config.from_to = await config.load_from_to(agent_id, client, config.CONFIG.forwards) await client.run_until_disconnected() diff --git a/tgcf/past.py b/tgcf/past.py index 5fc3a7fd..755da9c0 100644 --- a/tgcf/past.py +++ b/tgcf/past.py @@ -20,26 +20,31 @@ from tgcf.utils import clean_session_files, send_message -async def forward_job() -> None: +async def forward_job(agent_id: int) -> None: """Forward all existing messages in the concerned chats.""" clean_session_files() # load async plugins defined in plugin_models - await load_async_plugins() - - if CONFIG.login.user_type != 1: + await load_async_plugins() + agent = CONFIG.login_cfg.agents[agent_id] + if agent.user_type != 1: logging.warning( "You cannot use bot account for tgcf past mode. Telegram does not allow bots to access chat history." ) return - SESSION = get_SESSION() + SESSION = get_SESSION(agent_id) async with TelegramClient( - SESSION, CONFIG.login.API_ID, CONFIG.login.API_HASH + SESSION, CONFIG.login_cfg.tg.API_ID, CONFIG.login_cfg.tg.API_HASH ) as client: - config.from_to = await config.load_from_to(client, config.CONFIG.forwards) + active_forwards = await config.load_active_forwards( + agent_id, config.CONFIG.forwards + ) + config.from_to = await config.load_from_to(agent_id, client, active_forwards) client: TelegramClient - for from_to, forward in zip(config.from_to.items(), config.CONFIG.forwards): - src, dest = from_to + for from_to, forward in zip(config.from_to.items(), active_forwards): + src, destV = from_to + dest = destV["dest"] + pcfg_id = destV["pcfg"] last_id = 0 forward: config.Forward logging.info(f"Forwarding messages from {src} to {dest}") @@ -55,7 +60,8 @@ async def forward_job() -> None: if isinstance(message, MessageService): continue try: - tm = await apply_plugins(message) + + tm = await apply_plugins(pcfg_id, message) if not tm: continue st.stored[event_uid] = {} @@ -68,15 +74,17 @@ async def forward_job() -> None: for d in dest: if message.is_reply and r_event_uid in st.stored: tm.reply_to = st.stored.get(r_event_uid).get(d) - fwded_msg = await send_message(d, tm) + fwded_msg = await send_message(agent_id, d, tm) st.stored[event_uid].update({d: fwded_msg.id}) tm.clear() last_id = message.id logging.info(f"forwarding message with id = {last_id}") forward.offset = last_id write_config(CONFIG, persist=False) - time.sleep(CONFIG.past.delay) - logging.info(f"slept for {CONFIG.past.delay} seconds") + time.sleep(CONFIG.agent_fwd_cfg[agent_id].past.delay) + logging.info( + f"slept for {CONFIG.agent_fwd_cfg[agent_id].past.delay} seconds" + ) except FloodWaitError as fwe: logging.info(f"Sleeping for {fwe}") diff --git a/tgcf/plugin_models.py b/tgcf/plugin_models.py index 2e58e7c1..715e4df2 100644 --- a/tgcf/plugin_models.py +++ b/tgcf/plugin_models.py @@ -81,13 +81,16 @@ class Caption(BaseModel): header: str = "" footer: str = "" + class Sender(BaseModel): check: bool = False user_type: int = 0 # 0:bot, 1:user BOT_TOKEN: str = "" SESSION_STRING: str = "" + class PluginConfig(BaseModel): + alias: str = "" filter: Filters = Filters() fmt: Format = Format() mark: MarkConfig = MarkConfig() @@ -98,4 +101,4 @@ class PluginConfig(BaseModel): # List of plugins that need to load asynchronously -ASYNC_PLUGIN_IDS = ['sender'] \ No newline at end of file +ASYNC_PLUGIN_IDS = ["sender"] diff --git a/tgcf/utils.py b/tgcf/utils.py index fc6f741a..300bc0ae 100644 --- a/tgcf/utils.py +++ b/tgcf/utils.py @@ -29,10 +29,12 @@ def platform_info(): \n{platform.architecture()} {platform.processor()}""" -async def send_message(recipient: EntityLike, tm: "TgcfMessage") -> Message: +async def send_message( + agent_id: int, recipient: EntityLike, tm: "TgcfMessage" +) -> Message: """Forward or send a copy, depending on config.""" client: TelegramClient = tm.client - if CONFIG.show_forwarded_from: + if CONFIG.agent_fwd_cfg[agent_id].show_forwarded_from: return await client.forward_messages(recipient, tm.message) if tm.new_file: message = await client.send_file(