Skip to content

Commit 047a8da

Browse files
authored
Merge pull request #1 from vector-im/hs/dont-block-connections-on-startup
Fix the startup regression, move throttling code to __main__, remove block_startup
2 parents 3e6dee3 + d6bbce7 commit 047a8da

File tree

4 files changed

+30
-41
lines changed

4 files changed

+30
-41
lines changed

mautrix_telegram/__main__.py

+29-13
Original file line numberDiff line numberDiff line change
@@ -61,13 +61,14 @@ class TelegramBridge(Bridge):
6161
matrix_class = MatrixHandler
6262

6363
config: Config
64+
context: Context
6465
session_container: AlchemySessionContainer
6566
bot: Bot
6667

6768
periodic_active_metrics_task: asyncio.Task
6869
is_blocked: bool = False
6970

70-
periodic_sync_task: asyncio.Task
71+
periodic_sync_task: asyncio.Task = None
7172

7273
def prepare_db(self) -> None:
7374
super().prepare_db()
@@ -76,29 +77,28 @@ def prepare_db(self) -> None:
7677
engine=self.db, table_base=Base, session=False,
7778
table_prefix="telethon_", manage_tables=False)
7879

79-
def _prepare_website(self, context: Context) -> None:
80+
def _prepare_website(self) -> None:
8081
if self.config["appservice.public.enabled"]:
8182
public_website = PublicBridgeWebsite(self.loop)
8283
self.az.app.add_subapp(self.config["appservice.public.prefix"], public_website.app)
83-
context.public_website = public_website
84+
self.context.public_website = public_website
8485

8586
if self.config["appservice.provisioning.enabled"]:
86-
provisioning_api = ProvisioningAPI(context)
87+
provisioning_api = ProvisioningAPI(self.context)
8788
self.az.app.add_subapp(self.config["appservice.provisioning.prefix"],
8889
provisioning_api.app)
89-
context.provisioning_api = provisioning_api
90+
self.context.provisioning_api = provisioning_api
9091

9192
def prepare_bridge(self) -> None:
9293
self.bot = init_bot(self.config)
93-
context = Context(self.az, self.config, self.loop, self.session_container, self, self.bot)
94-
self._prepare_website(context)
95-
self.matrix = context.mx = MatrixHandler(context)
94+
self.context = Context(self.az, self.config, self.loop, self.session_container, self, self.bot)
95+
self._prepare_website()
96+
self.matrix = self.context.mx = MatrixHandler(self.context)
9697

97-
init_abstract_user(context)
98-
init_formatter(context)
99-
init_portal(context)
100-
self.add_startup_actions(init_puppet(context))
101-
self.add_startup_actions(init_user(context))
98+
init_abstract_user(self.context)
99+
init_formatter(self.context)
100+
init_portal(self.context)
101+
self.add_startup_actions(init_puppet(self.context))
102102

103103
if self.bot:
104104
self.add_startup_actions(self.bot.start())
@@ -109,6 +109,22 @@ def prepare_bridge(self) -> None:
109109
if self.config['bridge.limits.enable_activity_tracking'] is not False:
110110
self.periodic_sync_task = self.loop.create_task(self._loop_active_puppet_metric())
111111

112+
async def start(self) -> None:
113+
await super().start()
114+
115+
semaphore = None
116+
concurrency = self.config['telegram.connection.concurrent_connections_startup']
117+
if concurrency:
118+
semaphore = asyncio.Semaphore(concurrency)
119+
await semaphore.acquire()
120+
121+
async def sem_task(task):
122+
if not semaphore:
123+
return await task
124+
async with semaphore:
125+
return await task
126+
127+
await asyncio.gather(*(sem_task(task) for task in init_user(self.context)))
112128

113129
async def resend_bridge_info(self) -> None:
114130
self.config["bridge.resend_bridge_info"] = False

mautrix_telegram/config.py

-1
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,6 @@ def do_update(self, helper: ConfigUpdateHelper) -> None:
212212
copy("telegram.connection.retry_delay")
213213
copy("telegram.connection.flood_sleep_threshold")
214214
copy("telegram.connection.request_retries")
215-
copy("telegram.connection.block_startup")
216215
copy("telegram.connection.concurrent_connections_startup")
217216

218217
copy("telegram.device_info.device_model")

mautrix_telegram/example-config.yaml

-4
Original file line numberDiff line numberDiff line change
@@ -505,10 +505,6 @@ telegram:
505505
# is not recommended, since some requests can always trigger a call fail (such as searching
506506
# for messages).
507507
request_retries: 5
508-
# Should the bridge block startup until all telegram connections have been made for all puppets.
509-
# This should be disable for bridges with a large amount of puppets to prevent an extended startup
510-
# period. Defaults to true
511-
block_startup: true
512508
# How many concurrent connections should be handled on startup. Set to 0 to allow unlimited connections
513509
# Defualts to 0
514510
concurrent_connections_startup: 0

mautrix_telegram/user.py

+1-23
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
#
1414
# You should have received a copy of the GNU Affero General Public License
1515
# along with this program. If not, see <https://www.gnu.org/licenses/>.
16-
from asyncio.futures import Future
1716
from typing import (Awaitable, Dict, List, Iterable, NamedTuple, Optional, Tuple, Any, cast,
1817
TYPE_CHECKING)
1918
from datetime import datetime, timezone
@@ -684,27 +683,6 @@ def init(context: 'Context') -> Future:
684683
global config
685684
config = context.config
686685
User.bridge = context.bridge
687-
concurrency = config.get("telegram.connection.concurrent_connections_startup", 0)
688-
semaphore = None
689-
block_startup = config.get("telegram.connection.block_startup", True)
690686

691-
if concurrency:
692-
semaphore = asyncio.Semaphore(concurrency)
693-
694-
async def sem_task(task, index):
695-
async with semaphore:
696-
print(f"sem_task: ${index}")
697-
return await task
698-
699-
tasks = (User.from_db(db_user).try_ensure_started()
687+
return (User.from_db(db_user).try_ensure_started()
700688
for db_user in DBUser.all_with_tgid())
701-
future = None
702-
if semaphore:
703-
future = asyncio.gather(*(sem_task(tasks[i], i) for i in range(0, len(tasks))))
704-
else:
705-
future = asyncio.gather(*tasks)
706-
707-
if block_startup:
708-
return future
709-
else:
710-
asyncio.create_task(future)

0 commit comments

Comments
 (0)