Skip to content

Commit

Permalink
refactor: improve readability
Browse files Browse the repository at this point in the history
  • Loading branch information
en9inerd committed Jun 9, 2024
1 parent bb08dbd commit 55e5574
Show file tree
Hide file tree
Showing 3 changed files with 153 additions and 108 deletions.
79 changes: 46 additions & 33 deletions tgeraser/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,13 @@
from .__version__ import VERSION
from .eraser import Eraser
from .exceptions import TgEraserException
from .utils import cast_to_int, get_credentials
from .utils import cast_to_int, get_credentials, parse_time_period


def signal_handler():
"""
Signal handler
"""
print("\nCtrl+C captured, exiting...")
sys.stdout.flush()
os._exit(0)
Expand All @@ -51,63 +54,73 @@ async def main() -> None:
loop.add_signal_handler(signal.SIGINT, signal_handler)

arguments = docopt(__doc__, version=VERSION)
if arguments["--limit"]:
arguments["--limit"] = cast_to_int(arguments["--limit"], "limit")
arguments["--limit"] = (
cast_to_int(arguments["--limit"], "limit") if arguments["--limit"] else None
)
if arguments["--time-period"]:
periods = {
"seconds": 1,
"minutes": 60,
"hours": 3600,
"days": 86400,
"weeks": 604800,
}
period = arguments["--time-period"].split("*")
if period[1] not in periods:
raise TgEraserException("Time period is specified incorrectly.")
period = parse_time_period(arguments["--time-period"], "--time-period")

if arguments["--kill"]:
if os.name != "posix":
raise TgEraserException("You can't use '--kill' option on Windows.")
cmd = subprocess.Popen(["ps", "-A"], stdout=subprocess.PIPE)
out = cmd.communicate()[0]
current_pid = os.getpid()
for line in out.splitlines():
if b"tgeraser" in line:
pid = int(line.split(None, 1)[0])
if current_pid != pid:
os.kill(pid, signal.SIGKILL)
print("Process " + str(pid) + " successfully killed.")
kill_existing_processes()
sys.exit(0)

try:
credentials = await get_credentials(arguments)

kwargs = {
**credentials,
"peers": arguments["--peers"],
"limit": arguments["--limit"],
"wipe_everything": arguments["--wipe-everything"],
"entity_type": arguments["--entity-type"],
}
await run_eraser(kwargs, period if arguments["--time-period"] else None)
except ValueError as err:
print(f"ValueError: {err}")
except TgEraserException as err:
print(f"TgEraserException: {err}")
except Exception as err:
raise TgEraserException(f"An unexpected error occurred: {err}") from err


client = Eraser(**credentials)
async def run_eraser(kwargs, period):
"""
Runs the eraser
"""
client = Eraser(**kwargs)
try:
while True:
await client.init(**kwargs)
await client.init()
await client.run()
if arguments["--time-period"]:
if period:
print(
"\n({0})\tNext erasing will be in {1} {2}.".format(
time.strftime("%Y-%m-%d, %H:%M:%S", time.gmtime()),
period[0],
period[1],
period["value"],
period["unit"],
)
)
await asyncio.sleep(int(period[0]) * periods[period[1]])
await asyncio.sleep(period["time"])
else:
break
finally:
await client.disconnect()
except Exception as err:
raise TgEraserException(err) from err


def kill_existing_processes():
"""
Kills existing background TgEraser processes
"""
if os.name != "posix":
raise TgEraserException("You can't use '--kill' option on Windows.")
cmd = subprocess.Popen(["ps", "-A"], stdout=subprocess.PIPE)
out, _ = cmd.communicate()
current_pid = os.getpid()
for line in out.splitlines():
if b"tgeraser" in line:
pid = int(line.split(None, 1)[0])
if current_pid != pid:
os.kill(pid, signal.SIGKILL)
print("Process " + str(pid) + " successfully killed.")


def entry() -> None:
Expand Down
159 changes: 84 additions & 75 deletions tgeraser/eraser.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,40 +22,46 @@ class Eraser(TelegramClient): # type: ignore
Subclass of TelegramClient class
"""

def __init__(self: TelegramClient, **credentials: Any) -> None:
def __init__(self: TelegramClient, **kwargs: Any) -> None:
super().__init__(
session=credentials["session_name"],
api_id=credentials["api_id"],
api_hash=credentials["api_hash"],
session=kwargs["session_name"],
api_id=kwargs["api_id"],
api_hash=kwargs["api_hash"],
connection=ConnectionTcpAbridged,
device_model=platform.uname().system,
system_version=platform.uname().release,
app_version=VERSION,
)

async def init(self, **kwargs: Any) -> None:
"""
Initializes the client
"""
self.__limit = kwargs["limit"] # Limit to retrieve the top dialogs
self.__limit = kwargs["limit"]
self.__peers = kwargs["peers"].split(",") if kwargs["peers"] else []
self.__wipe_everything = kwargs["wipe_everything"]
self.__entity_type = kwargs["entity_type"]
self.__entities: List[hints.Entity] = []
self.__display_name = ""

# Check connection to the server
async def init(self) -> None:
"""
Initializes the client
"""
await self._connect_to_server()
await self._authorize_user()

async def _connect_to_server(self) -> None:
"""
Connects to the Telegram server
"""
print("Connecting to Telegram servers...")
try:
await self.connect()
except ConnectionError:
print("Initial connection failed. Retrying...")
await self.connect()

# Check authorization
async def _authorize_user(self) -> None:
"""
Authorizes the user
"""
if not await self.is_user_authorized():
print("First run. Sending code request...")

user_phone = await async_input("Enter your phone: ")
await self.sign_in(user_phone)

Expand All @@ -64,8 +70,6 @@ async def init(self, **kwargs: Any) -> None:
code = await async_input("Enter the code you just received: ")
try:
self_user = await self.sign_in(code=code)

# Two-step verification may be enabled
except SessionPasswordNeededError:
password = getpass(
"Two step verification is enabled. "
Expand All @@ -78,90 +82,96 @@ async def run(self) -> None:
"""
Runs deletion of messages from peer
"""
await self._determine_entities()
await self._delete_messages_from_entities()
self.__entities.clear()

async def _determine_entities(self) -> None:
"""
Determines entities to delete messages from
"""
if self.__peers:
for peer in self.__peers:
if peer.isdigit():
peer = cast_to_int(peer, f"peer: {peer}")
try:
self.__entities.append(await self.get_entity(peer))
except ValueError as err:
raise TgEraserException(
f"Specified entity '{peer}' can't be found."
) from err
await self._get_entities_by_peers()
elif self.__wipe_everything:
self.__entities = await self.__filter_entities()
self.__entities = await self._filter_entities()
else:
await self.__get_entity()
await self._get_user_selected_entity()

async def _get_entities_by_peers(self) -> None:
"""
Returns entities by peers
"""
for peer in self.__peers:
if peer.isdigit():
peer = cast_to_int(peer, f"peer: {peer}")
try:
self.__entities.append(await self.get_entity(peer))
except ValueError as err:
raise TgEraserException(
f"Specified entity '{peer}' can't be found."
) from err

async def _delete_messages_from_entities(self) -> None:
"""
Deletes messages from entities
"""
for entity in self.__entities:
self.__display_name = get_display_name(entity)
print_header(f"Getting messages from '{self.__display_name}'...")
display_name = get_display_name(entity)
print_header(f"Getting messages from '{display_name}'...")
messages_to_delete = [
msg.id
for msg in await self.get_messages(
entity, from_user=InputUserSelf(), limit=None, wait_time=None
)
]
print(f"\nFound {len(messages_to_delete)} messages to delete.")
await self._delete_messages(entity, messages_to_delete, display_name)

print_header(f"Deleting messages from '{self.__display_name}'...")
result = await self.delete_messages(entity, messages_to_delete, revoke=True)
number_of_deleted_msgs = sum(
[result[i].pts_count for i in range(len(result))]
)
async def _delete_messages(
self, entity: hints.Entity, messages_to_delete: List[int], display_name: str
) -> None:
"""
Deletes messages
"""
print_header(f"Deleting messages from '{display_name}'...")
result = await self.delete_messages(entity, messages_to_delete, revoke=True)
number_of_deleted_msgs = sum([result[i].pts_count for i in range(len(result))])
print(
f"\nDeleted {number_of_deleted_msgs} messages of {len(messages_to_delete)} in '{display_name}' entity."
)

if number_of_deleted_msgs < len(messages_to_delete):
print(
f"\nDeleted {number_of_deleted_msgs} messages of {len(messages_to_delete)} in '{self.__display_name}' entity."
f"Remaining {len(messages_to_delete) - number_of_deleted_msgs} messages can't be deleted without admin rights because they are service messages."
)

if number_of_deleted_msgs < len(messages_to_delete):
print(
f"Remaining {len(messages_to_delete) - number_of_deleted_msgs} messages can't be deleted without admin rights because they are service messages."
)
print("\n")

print("\n")

self.__entities.clear()

async def __filter_entities(self) -> List[hints.EntityLike]:
async def _filter_entities(self) -> List[hints.EntityLike]:
"""
Returns requested entities
Returns requested filtered entities
"""
entities = [d.entity for d in await self.get_dialogs(limit=self.__limit)]

if self.__entity_type == "any":
return entities
elif self.__entity_type == "user":
return [
entity
for entity in entities
if isinstance(entity, User) and not entity.is_self
]
elif self.__entity_type == "chat":
return [
entity
for entity in entities
if (
isinstance(entity, Chat)
or (isinstance(entity, Channel) and entity.megagroup)
or (isinstance(entity, Channel) and entity.gigagroup)
)
]
elif self.__entity_type == "channel":
return [
entity
for entity in entities
if isinstance(entity, Channel) and not entity.megagroup
]
else:
entity_filters = {
"any": lambda e: True,
"user": lambda e: isinstance(e, User) and not e.is_self,
"chat": lambda e: isinstance(e, Chat)
or (isinstance(e, Channel) and (e.megagroup or e.gigagroup)),
"channel": lambda e: isinstance(e, Channel) and not e.megagroup,
}
if self.__entity_type not in entity_filters:
raise TgEraserException(
f"Error: wrong entity type: '{self.__entity_type}'. Use 'any', 'user', 'chat' or 'channel'."
)
return [
entity for entity in entities if entity_filters[self.__entity_type](entity)
]

async def __get_entity(self) -> None:
async def _get_entity(self) -> None:
"""
Returns chosen peer
"""
entities = await self.__filter_entities()
entities = await self._filter_entities()

if not entities:
raise TgEraserException("You aren't joined to any chat.")
Expand All @@ -172,5 +182,4 @@ async def __get_entity(self) -> None:

num = cast_to_int(await async_input("\nChoose peer: "), "peer")
self.__entities = [entities[num - 1]]
self.__display_name = get_display_name(self.__entities[0])
print("Chosen: " + self.__display_name)
print("Chosen: " + get_display_name(self.__entities[0]))
23 changes: 23 additions & 0 deletions tgeraser/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ async def get_credentials(args: Dict[str, Any]) -> Dict[str, Union[str, int]]:
+ "/"
+ (args["<session_name>"] or await choose_session(path_to_creds_dir))
)
except FileNotFoundError as err:
raise TgEraserException(f"Error: {err}") from err
except Exception as e:
raise TgEraserException(f"Error in get_credentials: {e}")

Expand Down Expand Up @@ -112,3 +114,24 @@ def list_sessions(directory: str) -> list[str]:
for file in os.listdir(directory)
if file.endswith(".session")
]


def parse_time_period(time_period: str, option_name: str) -> Dict[str, str | int]:
"""
Parse time period
"""
periods = {
"seconds": 1,
"minutes": 60,
"hours": 3600,
"days": 86400,
"weeks": 604800,
}
period = time_period.split("*")
if period[1] not in periods:
raise TgEraserException(f"'{option_name}' is specified incorrectly.")
return {
"time": int(period[0]) * periods[period[1]],
"unit": period[1],
"value": period[0],
}

0 comments on commit 55e5574

Please sign in to comment.