Skip to content

Commit

Permalink
allow multi forwarding agents & multi plugin configs
Browse files Browse the repository at this point in the history
  • Loading branch information
aahnik committed Dec 9, 2023
1 parent f0d5859 commit a0a4750
Show file tree
Hide file tree
Showing 6 changed files with 143 additions and 58 deletions.
9 changes: 6 additions & 3 deletions tgcf/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ def main(
mode: Mode = typer.Argument(
..., help="Choose the mode in which you want to run tgcf.", envvar="TGCF_MODE"
),
agent_id: Optional[int] = typer.Argument(
default=0, help="Choose the agent to use for message forwarding."
),
verbose: Optional[bool] = typer.Option( # pylint: disable=unused-argument
None,
"--loud",
Expand Down Expand Up @@ -113,11 +116,11 @@ def main(
if mode == Mode.PAST:
from tgcf.past import forward_job # pylint: disable=import-outside-toplevel

asyncio.run(forward_job())
asyncio.run(forward_job(agent_id))
else:
from tgcf.live import start_sync # pylint: disable=import-outside-toplevel

asyncio.run(start_sync())
asyncio.run(start_sync(agent_id))


# AAHNIK 2021
# AAHNIK 2023
100 changes: 78 additions & 22 deletions tgcf/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import logging
import os
import sys
from typing import Dict, List, Optional, Union, Any
from typing import Any, Dict, List, Optional, Union

from dotenv import load_dotenv
from pydantic import BaseModel, validator # pylint: disable=no-name-in-module
Expand Down Expand Up @@ -32,6 +32,9 @@ class Forward(BaseModel):
offset: int = 0
end: Optional[int] = 0

agent: int = 0 # the agent id to use for this connection
plugin_cfg: int = 0 # the plugin configuration id to use for this connection


class LiveSettings(BaseModel):
"""Settings to configure how tgcf operates in live mode."""
Expand Down Expand Up @@ -60,15 +63,44 @@ def validate_delay(cls, val): # pylint: disable=no-self-use,no-self-argument
return val


class LoginConfig(BaseModel):
# class LoginConfig(BaseModel):

# API_ID: int = 0
# API_HASH: str = ""


# user_type: int = 0 # 0:bot, 1:user
# phone_no: int = 91
# USERNAME: str = ""
# SESSION_STRING: str = ""
# BOT_TOKEN: str = ""


class TgAPIConfig(BaseModel):
API_ID: int = 0
API_HASH: str = ""


class AgentLoginConfig(BaseModel):
alias: str = ""
user_type: int = 0 # 0:bot, 1:user
phone_no: int = 91
USERNAME: str = ""
SESSION_STRING: str = ""
BOT_TOKEN: str = ""
is_active: bool = False


class AgentForwardingConfig(BaseModel):
show_forwarded_from: bool = False
mode: int = 0 # 0: live, 1:past
live: LiveSettings = LiveSettings()
past: PastSettings = PastSettings()


class LoginConfig(BaseModel):
tg: TgAPIConfig = TgAPIConfig()
agents: List[AgentLoginConfig] = [AgentLoginConfig()]


class BotMessages(BaseModel):
Expand All @@ -82,15 +114,11 @@ class Config(BaseModel):
# pylint: disable=too-few-public-
pid: int = 0
theme: str = "light"
login: LoginConfig = LoginConfig()
login_cfg: LoginConfig = LoginConfig()
admins: List[Union[int, str]] = []
forwards: List[Forward] = []
show_forwarded_from: bool = False
mode: int = 0 # 0: live, 1:past
live: LiveSettings = LiveSettings()
past: PastSettings = PastSettings()

plugins: PluginConfig = PluginConfig()
agent_fwd_cfg: List[AgentForwardingConfig] = [AgentForwardingConfig()]
plugin_cfgs: List[PluginConfig] = [PluginConfig()]
bot_messages = BotMessages()


Expand Down Expand Up @@ -165,38 +193,62 @@ async def get_id(client: TelegramClient, peer):
return await client.get_peer_id(peer)


async def load_active_forwards(agent_id: int, forwards: List[Forward]) -> List[Forward]:
active_forwards: List[Forward] = []
for forward in forwards:
if forward.agent != agent_id:
continue
if not forward.use_this:
continue
active_forwards.append(forward)
return active_forwards


async def load_from_to(
client: TelegramClient, forwards: List[Forward]
) -> Dict[int, List[int]]:
agent_id: int,
client: TelegramClient,
forwards: List[Forward],
) -> Dict[int, Dict[str, List[int] | int]]:
"""Convert a list of Forward objects to a mapping.
The active connections of current agent are included.
Args:
client: Instance of Telegram client (logged in)
forwards: List of Forward objects
Returns:
Dict: key = chat id of source
value = List of chat ids of destinations
value = dict
dest: List of chat ids of destinations
pcgf: id of plugin config to use
Notes:
-> The Forward objects may contain username/phn no/links
-> But this mapping strictly contains signed integer chat ids
-> Chat ids are essential for how storage is implemented
-> Storage is essential for edit, delete and reply syncs
"""
from_to_dict = {}
from_to_dict: dict = {}

async def _(peer):
return await get_id(client, peer)

for forward in forwards:
if forward.agent != agent_id:
continue
if not forward.use_this:
continue
source = forward.source
if not isinstance(source, int) and source.strip() == "":
continue
src = await _(forward.source)
from_to_dict[src] = [await _(dest) for dest in forward.dest]
# from_to_dict[src] = {
# "dest": [], # the list of destination entities
# "pcfg": 0 # id of the plugin config to use
# }
from_to_dict[src] = {}
from_to_dict[src]["dest"] = [await _(dest) for dest in forward.dest]
from_to_dict[src]["pcfg"] = forward.plugin_cfg
logging.info(f"From to dict is {from_to_dict}")
return from_to_dict

Expand Down Expand Up @@ -229,7 +281,7 @@ def read_db():


PASSWORD = os.getenv("PASSWORD", "tgcf")
ADMINS = []
ADMINS: List = []

MONGO_CON_STR = os.getenv("MONGO_CON_STR")
MONGO_DB_NAME = os.getenv("MONGO_DB_NAME", "tgcf-config")
Expand All @@ -242,19 +294,23 @@ def read_db():
logging.warn(
"You have not set a password to protect the web access to tgcf.\nThe default password `tgcf` is used."
)
from_to = {}
from_to: Dict[int, Dict[str, int | List[int]]] = {}
is_bot: Optional[bool] = None
logging.info("config.py got executed")


def get_SESSION(section: Any = CONFIG.login, default: str = 'tgcf_bot'):
if section.SESSION_STRING and section.user_type == 1:
def get_SESSION(
agent_id: int, login_cfg: LoginConfig = CONFIG.login_cfg, default: str = "tgcf_bot"
):
# TODO: validate agent_id
agent = login_cfg.agents[agent_id]
if agent.SESSION_STRING and agent.user_type == 1:
logging.info("using session string")
SESSION = StringSession(section.SESSION_STRING)
elif section.BOT_TOKEN and section.user_type == 0:
SESSION = StringSession(agent.SESSION_STRING)
elif agent.BOT_TOKEN and agent.user_type == 0:
logging.info("using bot account")
SESSION = default
SESSION = default + str(agent_id)
else:
logging.warning("Login information not set!")
sys.exit()
return SESSION
return SESSION
47 changes: 30 additions & 17 deletions tgcf/live.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
from tgcf.plugins import apply_plugins, load_async_plugins
from tgcf.utils import clean_session_files, send_message

current_agent: int = 0


async def new_message_handler(event: Union[Message, events.NewMessage]) -> None:
"""Process new incoming messages."""
Expand All @@ -36,9 +38,10 @@ async def new_message_handler(event: Union[Message, events.NewMessage]) -> None:
del st.stored[key]
break

dest = config.from_to.get(chat_id)
dest = config.from_to.get(chat_id).get("dest")
pcfg_id = config.from_to.get(chat_id).get("pcfg")

tm = await apply_plugins(message)
tm = await apply_plugins(pcfg_id, message)
if not tm:
return

Expand All @@ -50,7 +53,7 @@ async def new_message_handler(event: Union[Message, events.NewMessage]) -> None:
for d in dest:
if event.is_reply and r_event_uid in st.stored:
tm.reply_to = st.stored.get(r_event_uid).get(d)
fwded_msg = await send_message(d, tm)
fwded_msg = await send_message(current_agent, d, tm)
st.stored[event_uid].update({d: fwded_msg})
tm.clear()

Expand All @@ -67,8 +70,9 @@ async def edited_message_handler(event) -> None:
logging.info(f"Message edited in {chat_id}")

event_uid = st.EventUid(event)
pcfg_id = config.from_to.get(chat_id).get("pcfg")

tm = await apply_plugins(message)
tm = await apply_plugins(pcfg_id, message)

if not tm:
return
Expand All @@ -77,17 +81,20 @@ async def edited_message_handler(event) -> None:

if fwded_msgs:
for _, msg in fwded_msgs.items():
if config.CONFIG.live.delete_on_edit == message.text:
if (
config.CONFIG.agent_fwd_cfg[current_agent].live.delete_on_edit
== message.text
):
await msg.delete()
await message.delete()
else:
await msg.edit(tm.text)
return

dest = config.from_to.get(chat_id)
dest = config.from_to.get(chat_id).get("dest")

for d in dest:
await send_message(d, tm)
await send_message(current_agent, d, tm)
tm.clear()


Expand All @@ -114,26 +121,29 @@ async def deleted_message_handler(event):
}


async def start_sync() -> None:
async def start_sync(agent_id: int) -> None:
"""Start tgcf live sync."""
# clear past session files
clean_session_files()
global current_agent
current_agent = agent_id

# load async plugins defined in plugin_models
await load_async_plugins()

SESSION = get_SESSION()
SESSION = get_SESSION(agent_id)
client = TelegramClient(
SESSION,
CONFIG.login.API_ID,
CONFIG.login.API_HASH,
sequential_updates=CONFIG.live.sequential_updates,
CONFIG.login_cfg.tg.API_ID,
CONFIG.login_cfg.tg.API_HASH,
sequential_updates=CONFIG.agent_fwd_cfg[agent_id].live.sequential_updates,
)
if CONFIG.login.user_type == 0:
if CONFIG.login.BOT_TOKEN == "":
agent = CONFIG.login_cfg.agents[agent_id]
if agent.user_type == 0:
if agent.BOT_TOKEN == "":
logging.warning("Bot token not found, but login type is set to bot.")
sys.exit()
await client.start(bot_token=CONFIG.login.BOT_TOKEN)
await client.start(bot_token=agent.BOT_TOKEN)
else:
await client.start()
config.is_bot = await client.is_bot()
Expand All @@ -145,7 +155,10 @@ async def start_sync() -> None:
ALL_EVENTS.update(command_events)

for key, val in ALL_EVENTS.items():
if config.CONFIG.live.delete_sync is False and key == "deleted":
if (
config.CONFIG.agent_fwd_cfg[agent_id].live.delete_sync is False
and key == "deleted"
):
continue
client.add_event_handler(*val)
logging.info(f"Added event handler for {key}")
Expand All @@ -161,5 +174,5 @@ async def start_sync() -> None:
],
)
)
config.from_to = await config.load_from_to(client, config.CONFIG.forwards)
config.from_to = await config.load_from_to(agent_id, client, config.CONFIG.forwards)
await client.run_until_disconnected()
Loading

0 comments on commit a0a4750

Please sign in to comment.