Skip to content

Commit

Permalink
fix telegram polling interface
Browse files Browse the repository at this point in the history
Now the polling process is managed by telebot.
  • Loading branch information
RLKRo committed Dec 7, 2023
1 parent 04c02c7 commit ec19921
Showing 1 changed file with 23 additions and 54 deletions.
77 changes: 23 additions & 54 deletions dff/messengers/telegram/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,11 @@
import asyncio
from typing import Any, Optional, List, Tuple, Callable

from telebot import types, logger
from telebot import types, apihelper

from dff.script import Context
from dff.messengers.common import PollingMessengerInterface, PipelineRunnerFunction, CallbackMessengerInterface
from dff.messengers.common import MessengerInterface, PipelineRunnerFunction, CallbackMessengerInterface
from .messenger import TelegramMessenger
from .message import TelegramMessage, Message
from .message import TelegramMessage

try:
from flask import Flask, request, abort
Expand All @@ -24,6 +23,9 @@
request, abort = None, None


apihelper.ENABLE_MIDDLEWARE = True


def extract_telegram_request_and_id(
update: types.Update, messenger: Optional[TelegramMessenger] = None
) -> Tuple[TelegramMessage, int]: # pragma: no cover
Expand Down Expand Up @@ -77,7 +79,7 @@ def extract_telegram_request_and_id(
return message, ctx_id


class PollingTelegramInterface(PollingMessengerInterface): # pragma: no cover
class PollingTelegramInterface(MessengerInterface): # pragma: no cover
"""
Telegram interface that retrieves updates by polling.
Multi-threaded polling is currently not supported.
Expand Down Expand Up @@ -111,60 +113,27 @@ def __init__(
long_polling_timeout: int = 20,
messenger: Optional[TelegramMessenger] = None,
):
self.messenger = messenger if messenger is not None else TelegramMessenger(token)
self.interval = interval
self.messenger = (
messenger if messenger is not None else TelegramMessenger(token, suppress_middleware_excepions=True)
)
self.allowed_updates = allowed_updates
self.interval = interval
self.timeout = timeout
self.long_polling_timeout = long_polling_timeout
self._last_processed_update = -1
self._stop_polling = asyncio.Event()

def _request(self) -> List[Tuple[Message, int]]:
updates = self.messenger.get_updates(
offset=(self.messenger.last_update_id + 1),
allowed_updates=self.allowed_updates,
timeout=self.timeout,
long_polling_timeout=self.long_polling_timeout,
)
update_list = [extract_telegram_request_and_id(update, self.messenger) for update in updates]
return update_list

def _respond(self, response: List[Context]):
for resp in response:
self.messenger.send_response(resp.id, resp.last_response)
update_id = getattr(resp.last_request, "update_id", None)
if update_id is not None:
if update_id > self._last_processed_update:
self._last_processed_update = update_id

def _on_exception(self, e: Exception):
logger.error(e)
self._stop_polling.set()

def forget_processed_updates(self):
"""
Forget updates already processed by the pipeline.
"""
self.messenger.get_updates(
offset=self._last_processed_update + 1,
allowed_updates=self.allowed_updates,
timeout=1,
long_polling_timeout=1,
)

async def connect(self, callback: PipelineRunnerFunction, loop: Optional[Callable] = None, *args, **kwargs):
self._stop_polling.clear()

try:
await super().connect(
callback, loop=loop or (lambda: not self._stop_polling.is_set()), timeout=self.interval
)
finally:
self.forget_processed_updates()

def stop(self):
"""Stop polling."""
self._stop_polling.set()
def dff_middleware(bot_instance, update):
message, ctx_id = extract_telegram_request_and_id(update, self.messenger)

ctx = asyncio.run(callback(message, ctx_id))

bot_instance.send_response(ctx_id, ctx.last_response)

self.messenger.middleware_handler()(dff_middleware)

self.messenger.infinity_polling(
timeout=self.timeout, long_polling_timeout=self.long_polling_timeout, interval=self.interval
)


class CallbackTelegramInterface(CallbackMessengerInterface): # pragma: no cover
Expand Down

0 comments on commit ec19921

Please sign in to comment.