Skip to content

Commit c308394

Browse files
committed
Add active user tracking and optional bridge blocking
1 parent 66aa68f commit c308394

File tree

8 files changed

+182
-2
lines changed

8 files changed

+182
-2
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
"""add account activity
2+
3+
Revision ID: 97404229e75e
4+
Revises: bfc0a39bfe02
5+
Create Date: 2021-09-07 10:38:41.655301
6+
7+
"""
8+
from alembic import op
9+
import sqlalchemy as sa
10+
11+
12+
# revision identifiers, used by Alembic.
13+
revision = '97404229e75e'
14+
down_revision = 'bfc0a39bfe02'
15+
branch_labels = None
16+
depends_on = None
17+
18+
19+
def upgrade():
20+
# ### commands auto generated by Alembic - please adjust! ###
21+
op.create_table('user_activity',
22+
sa.Column('puppet_id', sa.BigInteger(), nullable=False),
23+
sa.Column('first_activity_ts', sa.Integer(), nullable=False),
24+
sa.Column('last_activity_ts', sa.Integer(), nullable=False),
25+
sa.PrimaryKeyConstraint('puppet_id')
26+
)
27+
# ### end Alembic commands ###
28+
29+
30+
def downgrade():
31+
# ### commands auto generated by Alembic - please adjust! ###
32+
op.drop_table('user_activity')
33+
# ### end Alembic commands ###

mautrix_telegram/__main__.py

+39
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,15 @@
1414
# You should have received a copy of the GNU Affero General Public License
1515
# along with this program. If not, see <https://www.gnu.org/licenses/>.
1616
from typing import Dict, Any
17+
import asyncio
1718

1819
from telethon import __version__ as __telethon_version__
1920
from alchemysession import AlchemySessionContainer
2021

2122
from mautrix.types import UserID, RoomID
2223
from mautrix.bridge import Bridge
2324
from mautrix.util.db import Base
25+
from mautrix.util.opt_prometheus import Gauge
2426

2527
from .web.provisioning import ProvisioningAPI
2628
from .web.public import PublicBridgeWebsite
@@ -29,6 +31,7 @@
2931
from .config import Config
3032
from .context import Context
3133
from .db import init as init_db
34+
from .db.user_activity import UserActivity
3235
from .formatter import init as init_formatter
3336
from .matrix import MatrixHandler
3437
from .portal import Portal, init as init_portal
@@ -41,6 +44,9 @@
4144
except ImportError:
4245
prometheus = None
4346

47+
ACTIVE_USER_METRICS_INTERVAL_S = 5
48+
METRIC_ACTIVE_PUPPETS = Gauge('bridge_active_puppets_total', 'Number of active Telegram users bridged into Matrix')
49+
METRIC_BLOCKING = Gauge('bridge_blocked', 'Is the bridge currently blocking messages')
4450

4551
class TelegramBridge(Bridge):
4652
module = "mautrix_telegram"
@@ -58,6 +64,9 @@ class TelegramBridge(Bridge):
5864
session_container: AlchemySessionContainer
5965
bot: Bot
6066

67+
periodic_active_metrics_task: asyncio.Task
68+
is_blocked: bool = False
69+
6170
def prepare_db(self) -> None:
6271
super().prepare_db()
6372
init_db(self.db)
@@ -92,6 +101,7 @@ def prepare_bridge(self) -> None:
92101
self.add_startup_actions(self.bot.start())
93102
if self.config["bridge.resend_bridge_info"]:
94103
self.add_startup_actions(self.resend_bridge_info())
104+
self.add_startup_actions(self._loop_active_puppet_metric())
95105

96106
async def resend_bridge_info(self) -> None:
97107
self.config["bridge.resend_bridge_info"] = False
@@ -127,6 +137,35 @@ def is_bridge_ghost(self, user_id: UserID) -> bool:
127137
async def count_logged_in_users(self) -> int:
128138
return len([user for user in User.by_tgid.values() if user.tgid])
129139

140+
async def _update_active_puppet_metric(self) -> None:
141+
active_users = UserActivity.get_active_count(
142+
self.config['bridge.limits.puppet_inactivity_days'],
143+
self.config['bridge.limits.min_puppet_activity_days'],
144+
)
145+
146+
block_on_limit_reached = self.config['bridge.limits.block_on_limit_reached']
147+
max_puppet_limit = self.config['bridge.limits.max_puppet_limit']
148+
if block_on_limit_reached is not None and max_puppet_limit is not None:
149+
self.is_blocked = max_puppet_limit < active_users
150+
METRIC_BLOCKING.set(int(self.is_blocked))
151+
self.log.debug(f"Current active puppet count is {active_users}")
152+
METRIC_ACTIVE_PUPPETS.set(active_users)
153+
154+
async def _loop_active_puppet_metric(self) -> None:
155+
while True:
156+
try:
157+
await asyncio.sleep(ACTIVE_USER_METRICS_INTERVAL_S)
158+
except asyncio.CancelledError:
159+
return
160+
self.log.info("Executing periodic active puppet metric check")
161+
try:
162+
await self._update_active_puppet_metric()
163+
except asyncio.CancelledError:
164+
return
165+
except Exception as e:
166+
self.log.exception(f"Error while checking: {e}")
167+
168+
130169
async def manhole_global_namespace(self, user_id: UserID) -> Dict[str, Any]:
131170
return {
132171
**await super().manhole_global_namespace(user_id),

mautrix_telegram/config.py

+5
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,11 @@ def do_update(self, helper: ConfigUpdateHelper) -> None:
169169

170170
copy("bridge.command_prefix")
171171

172+
copy("bridge.limits.max_puppet_limit")
173+
copy("bridge.limits.min_puppet_activity_days")
174+
copy("bridge.limits.puppet_inactivity_days")
175+
copy("bridge.limits.block_on_limit_reached")
176+
172177
migrate_permissions = ("bridge.permissions" not in self
173178
or "bridge.whitelist" in self
174179
or "bridge.admins" in self)

mautrix_telegram/db/__init__.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,10 @@
2323
from .puppet import Puppet
2424
from .telegram_file import TelegramFile
2525
from .user import User, UserPortal, Contact
26+
from .user_activity import UserActivity
2627

2728

2829
def init(db_engine: Engine) -> None:
2930
for table in (Portal, Message, User, Contact, UserPortal, Puppet, TelegramFile, UserProfile,
30-
RoomState, BotChat):
31+
RoomState, BotChat, UserActivity):
3132
table.bind(db_engine)

mautrix_telegram/db/user_activity.py

+80
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
# mautrix-telegram - A Matrix-Telegram puppeting bridge
2+
# Copyright (C) 2019 Tulir Asokan
3+
# Copyright (C) 2021 Tadeusz Sośnierz
4+
#
5+
# This program is free software: you can redistribute it and/or modify
6+
# it under the terms of the GNU Affero General Public License as published by
7+
# the Free Software Foundation, either version 3 of the License, or
8+
# (at your option) any later version.
9+
#
10+
# This program is distributed in the hope that it will be useful,
11+
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12+
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13+
# GNU Affero General Public License for more details.
14+
#
15+
# You should have received a copy of the GNU Affero General Public License
16+
# along with this program. If not, see <https://www.gnu.org/licenses/>.
17+
from typing import Optional, Iterable
18+
19+
from sqlalchemy import Column, Integer, BigInteger
20+
21+
from mautrix.util.db import Base
22+
from mautrix.util.logging import TraceLogger
23+
24+
from ..types import TelegramID
25+
26+
import logging
27+
import datetime
28+
import time
29+
30+
UPPER_ACTIVITY_LIMIT_MS = 60 * 1000 * 5 # 5 minutes
31+
ONE_DAY_MS = 24 * 60 * 60 * 1000
32+
33+
34+
class UserActivity(Base):
35+
__tablename__ = "user_activity"
36+
37+
log: TraceLogger = logging.getLogger("mau.user_activity")
38+
39+
puppet_id: TelegramID = Column(BigInteger, primary_key=True)
40+
first_activity_ts: Optional[int] = Column(Integer)
41+
last_activity_ts: Optional[int] = Column(Integer)
42+
43+
def update(self, activity_ts: int) -> None:
44+
if self.last_activity_ts > activity_ts:
45+
return
46+
47+
self.last_activity_ts = activity_ts
48+
49+
self.edit(last_activity_ts=self.last_activity_ts)
50+
51+
@classmethod
52+
def update_for_puppet(cls, puppet: 'Puppet', activity_dt: datetime) -> None:
53+
activity_ts = int(activity_dt.timestamp() * 1000)
54+
55+
if (time.time() * 1000) - activity_ts > UPPER_ACTIVITY_LIMIT_MS:
56+
return
57+
58+
cls.log.debug(f"Updating activity time for {puppet.id} to {activity_ts}")
59+
obj = cls._select_one_or_none(cls.c.puppet_id == puppet.id)
60+
if obj:
61+
obj.update(activity_ts)
62+
else:
63+
obj = UserActivity(
64+
puppet_id=puppet.id,
65+
first_activity_ts=activity_ts,
66+
last_activity_ts=activity_ts,
67+
)
68+
obj.insert()
69+
70+
@classmethod
71+
def get_active_count(cls, min_activity_days: int, max_activity_days: Optional[int]) -> int:
72+
current_ms = time.time() / 1000
73+
active_count = 0
74+
for user in cls._select_all():
75+
activity_days = (user.last_activity_ts - user.first_activity_ts / 1000) / ONE_DAY_MS
76+
# If maxActivityTime is not set, they are always active
77+
is_active = max_activity_days is None or (current_ms - user.last_activity_ts) <= (max_activity_days * ONE_DAY_MS)
78+
if is_active and activity_days > min_activity_days:
79+
active_count += 1
80+
return active_count

mautrix_telegram/example-config.yaml

+12
Original file line numberDiff line numberDiff line change
@@ -443,6 +443,18 @@ bridge:
443443
- myusername
444444
- 12345678
445445

446+
# Limit usage of the bridge
447+
limits:
448+
# The maximum number of bridge puppets that can be "active" before the limit is reached
449+
max_puppet_limit: 0
450+
# The minimum amount of days a puppet must be active for before they are considered "active".
451+
min_puppet_activity_days: 0
452+
# The number of days after a puppets last activity where they are considered inactive again.
453+
puppet_inactivity_days: 30
454+
# Should the bridge block traffic when a limit has been reached
455+
block_on_limit_reached: false
456+
457+
446458
# Telegram config
447459
telegram:
448460
# Get your own API keys at https://my.telegram.org/apps

mautrix_telegram/portal/matrix.py

+5
Original file line numberDiff line numberDiff line change
@@ -369,6 +369,11 @@ async def _send_bridge_error(self, msg: str) -> None:
369369

370370
async def handle_matrix_message(self, sender: 'u.User', content: MessageEventContent,
371371
event_id: EventID) -> None:
372+
if self.bridge.is_blocked:
373+
self.log.debug(f"Bridge is blocked, dropping matrix event {event_id}")
374+
await self._send_bridge_error(f"\u26a0 The bridge is blocked due to reaching its user limit")
375+
return
376+
372377
try:
373378
await self._handle_matrix_message(sender, content, event_id)
374379
except RPCError as e:

mautrix_telegram/portal/telegram.py

+6-1
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@
4444
from mautrix.bridge import NotificationDisabler
4545

4646
from ..types import TelegramID
47-
from ..db import Message as DBMessage, TelegramFile as DBTelegramFile
47+
from ..db import Message as DBMessage, TelegramFile as DBTelegramFile, UserActivity
4848
from ..util import sane_mimetypes
4949
from ..context import Context
5050
from ..tgclient import TelegramClient
@@ -613,6 +613,10 @@ async def _backfill_messages(self, source: 'AbstractUser', min_id: Optional[int]
613613

614614
async def handle_telegram_message(self, source: 'AbstractUser', sender: p.Puppet,
615615
evt: Message) -> None:
616+
if self.bridge.is_blocked:
617+
self.log.debug(f"Bridge is blocked, dropping telegram message {evt.id}")
618+
return
619+
616620
if not self.mxid:
617621
self.log.trace("Got telegram message %d, but no room exists, creating...", evt.id)
618622
await self.create_matrix_room(source, invites=[source.mxid], update_if_exists=False)
@@ -704,6 +708,7 @@ async def handle_telegram_message(self, source: 'AbstractUser', sender: p.Puppet
704708
await intent.redact(self.mxid, event_id)
705709
return
706710

711+
UserActivity.update_for_puppet(sender, evt.date)
707712
self.log.debug("Handled telegram message %d -> %s", evt.id, event_id)
708713
try:
709714
DBMessage(tgid=TelegramID(evt.id), mx_room=self.mxid, mxid=event_id,

0 commit comments

Comments
 (0)