-
Notifications
You must be signed in to change notification settings - Fork 1
/
TelegramBot.py
145 lines (108 loc) · 4.59 KB
/
TelegramBot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
import sys
from time import sleep
import logging
from telegram import (ReplyKeyboardMarkup, ReplyKeyboardRemove, TelegramError, Bot, ChatAction)
from telegram.ext import (Updater, CommandHandler, MessageHandler, Filters,
ConversationHandler)
from telegram.error import (TelegramError, Unauthorized, BadRequest,
TimedOut, ChatMigrated, NetworkError)
from query import *
from graph import *
#Enable logging
logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO)
logger = logging.getLogger(__name__)
CHOICE, LOCATION, CHOOSING = range(3)
token = sys.argv[1]
#use_context=True to use the new context based callbacks
updater = Updater(token, use_context=True)
bot = Bot(token = token)
def start(update, context):
reply_keyboard = [['Dati dalla posizione','Esci']]
update.message.reply_text(
'Ciao! il mio nome è PandemicTrackerBot, come posso aiutarti?',
reply_markup=ReplyKeyboardMarkup(reply_keyboard, resize_keyboard=True, one_time_keyboard=True))
return CHOICE
def choice(update, context):
user = update.message.from_user
logger.info("Choice of %s: %s", user.first_name, update.message.text)
reply_keyboard = [['Altri Dati','Esci']]
if update.message.text == 'Dati dalla posizione':
update.message.reply_text('Condividimi una posizione', reply_markup=ReplyKeyboardRemove())
return LOCATION
return CHOOSING
def location(update, context):
user = update.message.from_user
user_location = update.message.location
logger.info("Location of %s: %f / %f", user.first_name, user_location.latitude,
user_location.longitude)
bot.sendChatAction(chat_id=update.message.chat_id, action=ChatAction.TYPING)
province = get_province_for(user_location.latitude, user_location.longitude)
station = get_station_for(user_location.latitude, user_location.longitude)
observations = get_observations_for(province[0], station[0])
image = plot_for(province, station, observations)
bot.send_photo(chat_id=update.message.chat_id, photo=image)
reply_keyboard = [['Altri Dati','Esci']]
update.message.reply_text(
'Spero che tu stia bene',
reply_markup=ReplyKeyboardMarkup(reply_keyboard, resize_keyboard=True, one_time_keyboard=True))
return CHOOSING
def choosing(update, context):
user = update.message.from_user
logger.info("Chosing")
reply_keyboard = [['Dati dalla posizione','Esci']]
if update.message.text == 'Altri Dati':
update.message.reply_text(
"Chiedimi pure!",
reply_markup=ReplyKeyboardMarkup(reply_keyboard, resize_keyboard=True, one_time_keyboard=True))
return CHOICE
elif update.message.text == 'Esci':
return ConversationHandler.END
def done(update, context):
# user = update.message.from_user
logger.info("End")
update.message.reply_text('Alla prossima!', reply_markup=ReplyKeyboardRemove())
return ConversationHandler.END
def error_handler(update, context):
try:
raise context.error
except Unauthorized:
# remove update.message.chat_id from conversation list
print("a")
except BadRequest:
# handle malformed requests - read more below!
print("b")
except TimedOut:
# handle slow connection problems
print("c")
except NetworkError:
# handle other connection problems
print("d")
except ChatMigrated as e:
# the chat_id of a group has changed, use e.new_chat_id instead
print("e")
except TelegramError:
# handle all other telegram related errors
print("f")
def main():
dp = updater.dispatcher
# conversation handler
conv_handler = ConversationHandler(
entry_points=[CommandHandler('start', start)],
states={
CHOICE: [MessageHandler(Filters.regex('^(Dati dalla posizione)$'), choice)],
LOCATION: [MessageHandler(Filters.location, location)],
CHOOSING: [MessageHandler(Filters.regex('^Altri Dati$'), choosing)]
},
fallbacks=[MessageHandler(Filters.regex('^Esci$'), done)]
)
dp.add_handler(conv_handler)
dp.add_error_handler(error_handler)
# Start the Bot
updater.start_polling()
# Run the bot until you press Ctrl-C or the process receives SIGINT,
# SIGTERM or SIGABRT. This should be used most of the time, since
# start_polling() is non-blocking and will stop the bot gracefully.
updater.idle()
if __name__ == '__main__':
main()