Skip to content

Commit

Permalink
my.telegram.telegram_backup: support optional extra_where and optiona…
Browse files Browse the repository at this point in the history
…l media info extraction for Promnesia
  • Loading branch information
karlicoss committed Mar 27, 2023
1 parent 74710b3 commit 8288032
Showing 1 changed file with 86 additions and 6 deletions.
92 changes: 86 additions & 6 deletions my/telegram/telegram_backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from dataclasses import dataclass
from datetime import datetime, timezone
from struct import unpack_from, calcsize
import sqlite3
from typing import Dict, Iterator, Optional

Expand Down Expand Up @@ -43,6 +44,7 @@ class Message:
chat: Chat
sender: User
text: str
extra_media_info: Optional[str] = None

@property
def permalink(self) -> str:
Expand All @@ -60,25 +62,41 @@ def permalink(self) -> str:


Chats = Dict[str, Chat]
def _message_from_row(r: sqlite3.Row, *, chats: Chats) -> Message:
def _message_from_row(r: sqlite3.Row, *, chats: Chats, with_extra_media_info: bool) -> Message:
ts = r['time']
# desktop export uses UTC (checked by exporting in winter time vs summer time)
# and telegram_backup timestamps seem same as in desktop export
time = datetime.fromtimestamp(ts, tz=timezone.utc)
chat = chats[r['source_id']]
sender = chats[r['sender_id']]

extra_media_info: Optional[str] = None
if with_extra_media_info and r['has_media'] == 1:
# also it's quite hacky, so at least for now it's just an optional attribute behind the flag
# defensive because it's a bit tricky to correctly parse without a proper api parser..
# maybe later we'll improve it
try:
extra_media_info = _extract_extra_media_info(data=r['data'])
except Exception as e:
pass

return Message(
id=r['message_id'],
time=time,
chat=chat,
sender=User(id=sender.id, name=sender.name),
text=r['text'],
extra_media_info=extra_media_info,
)


def messages() -> Iterator[Message]:
with sqlite_connection(config.export_path, immutable=True, row_factory='row') as db:
def messages(*, extra_where: Optional[str]=None, with_extra_media_info: bool=False) -> Iterator[Message]:
messages_query = 'SELECT * FROM messages WHERE message_type NOT IN ("service_message", "empty_message")'
if extra_where is not None:
messages_query += ' AND ' + extra_where
messages_query += ' ORDER BY time'

with sqlite_connection(config.export_path, immutable=True, row_factory='row') as db:
chats: Chats = {}
for r in db.execute('SELECT * FROM chats ORDER BY id'):
chat = Chat(id=r['id'], name=r['name'], handle=None)
Expand All @@ -98,7 +116,69 @@ def messages() -> Iterator[Message]:
assert chat.id not in chats
chats[chat.id] = chat

for r in db.execute('SELECT * FROM messages WHERE message_type NOT IN ("service_message", "empty_message") ORDER BY time'):
for r in db.execute(messages_query):
# seems like the only remaining have message_type = 'message'
yield _message_from_row(r, chats=chats)

yield _message_from_row(r, chats=chats, with_extra_media_info=with_extra_media_info)


def _extract_extra_media_info(data: bytes) -> Optional[str]:
# ugh... very hacky, but it does manage to extract from 90% of messages that have media
pos = 0

def skip(count: int) -> None:
nonlocal pos
pos += count

def getstring() -> str:
# jesus
# https://core.telegram.org/type/string
if data[pos] == 254:
skip(1)
(sz1, sz2, sz3) = unpack_from('BBB', data, offset=pos)
skip(3)
sz = 256 ** 2 * sz3 + 256 * sz2 + sz1
short = 0
else:
(sz, ) = unpack_from('B', data, offset=pos)
skip(1)
short = 1
assert sz > 0, sz

padding = 0 if (sz + short) % 4 == 0 else 4 - (sz + short) % 4

(ss,) = unpack_from(f'{sz}s{padding}x', data, offset=pos)
skip(sz + padding)
try:
return ss.decode('utf8')
except UnicodeDecodeError as e:
raise RuntimeError(f'Failed to decode {ss}') from e

def debug(count: int=10) -> None:
print([hex(x) for x in data[pos: pos + count]])
print([chr(x) for x in data[pos: pos + count]])

header = 'H2xII8xI'
(flags, mid, src, ts) = unpack_from(header, data, offset=pos)
pos += calcsize(header)

# see https://core.telegram.org/constructor/message
has_media = (flags >> 9) & 1
if has_media == 0:
return None

msg_body = getstring()
skip(20)
url1 = getstring()
url2 = getstring()
ss_type = getstring()
# not sure if assert is really necessary her
# assert ss_type in {
# 'article',
# 'photo',
# 'app',
# 'video',
# }, ss_type
link_title = getstring()
link_title_2 = getstring()
link_description = getstring()
return link_description

0 comments on commit 8288032

Please sign in to comment.