-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhelper.py
449 lines (385 loc) · 21.5 KB
/
helper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
#%%
from __future__ import annotations #fixes type checking for a class within itself
from selenium import webdriver
#from selenium.webdriver import WebDriver
import logging
from time import sleep, time
from selenium.webdriver.remote.webelement import WebElement
from selenium.common.exceptions import NoSuchElementException, StaleElementReferenceException, ElementClickInterceptedException
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
from typing import List
import os
from getpass import getpass
# First-run bootstrap: when no ``secret.py`` exists yet, ask for the Discord
# credentials and the channel link and persist them so the ``import secret``
# that follows can load them.
# NOTE(review): the password ends up in plain text on disk.
if not os.path.isfile('secret.py'):  # TODO: robustify
    # Collect every answer BEFORE opening the file. If a prompt is aborted
    # (Ctrl-C / EOF) nothing is written, so the next run prompts again instead
    # of importing a half-written module.
    _secret_values = (
        ('email', input('discord email: ')),
        ('pw', getpass('discord password: ')),
        ('mudae_channel', input('mudae discord channel link: ')),
    )
    # Python source files are read as UTF-8 by default, so write them as UTF-8.
    with open('secret.py', 'w', encoding='utf-8') as f:
        for _name, _value in _secret_values:
            # ``!r`` emits a valid string literal: quotes and backslashes in
            # the value are escaped instead of breaking (or injecting code
            # into) the generated module.
            f.write(f'{_name} = {_value!r}\n')
import secret
class HelperBot:  # should rename to DiscordMessanger or DiscordDriver - and maybe have it subclass the driver?
    """An object which helps interact with a 'standard' discord page.

    Creating an instance starts a Chrome session, logs in with the values of
    the ``secret`` module, opens ``secret.mudae_channel`` and runs
    :meth:`scroll_to_bottom`.

    Attributes:
        driver: the Selenium Chrome driver every method operates on.
        username: text of the ``nameTag-3uD-yy`` element with line breaks removed.
    """
    # TODO: make more generally applicable to any discord use - maybe rename from HelperBot

    # CONSTANTS
    text_field_element_selector = '#app-mount > div.app-1q1i1E > div > div.layers-3iHuyZ.layers-3q14ss > div > div > div > div.content-98HsJk > div.chat-3bRxxu > div > main > form > div > div > div > div > div.textArea-12jD-V.textAreaSlate-1ZzRVj.slateContainer-3Qkn2x > div.markup-2BOw-j.slateTextArea-1Mkdgw.fontSize16Padding-3Wk7zP > div'
    # Common XPath prefix of the three login-form controls used by login().
    _login_form_xpath = '//*[@id="app-mount"]/div[2]/div/div[2]/div/div/form/div/div/div[1]/div[3]'

    def __init__(self):
        self.driver = webdriver.Chrome()
        try:
            self.login()
            # Wait for the post-login page before navigating to the channel.
            WebDriverWait(self.driver, timeout=15, poll_frequency=1).until(
                lambda d: d.find_element_by_class_name('container-1r6BKw'))
            self.driver.get(secret.mudae_channel)
            WebDriverWait(self.driver, timeout=15, poll_frequency=1).until(
                lambda d: d.find_element_by_css_selector(HelperBot.text_field_element_selector))
            self.scroll_to_bottom()
            self.username: str = ''.join(
                self.driver.find_element_by_class_name('nameTag-3uD-yy').text.split('\n'))
        except BaseException:
            # A failed start-up would otherwise leave an orphaned browser
            # behind with no object left to close it.
            self.driver.quit()
            raise

    @property
    def text_field(self) -> WebElement:
        """The chat input element, looked up afresh on every access.

        Raises:
            TimeoutException: if the element does not appear within 10 seconds.
        """
        # until() returns the element the lambda found, so a second lookup is
        # unnecessary.
        return WebDriverWait(self.driver, timeout=10, poll_frequency=0.02).until(
            lambda d: d.find_element_by_css_selector(HelperBot.text_field_element_selector))

    def login(self):
        """Open the Discord login page and submit ``secret.email`` / ``secret.pw``."""
        form = HelperBot._login_form_xpath
        self.driver.get('https://discord.com/login')
        email_input = WebDriverWait(self.driver, timeout=15, poll_frequency=0.2).until(
            lambda d: d.find_element_by_xpath(form + '/div[1]/div/input'))
        email_input.send_keys(secret.email)
        self.driver.find_element_by_xpath(form + '/div[2]/div/input').send_keys(secret.pw)
        self.driver.find_element_by_xpath(form + '/button[2]').click()

    def send_message(self, text: str):
        """sends a message to the chat and returns the Message object in the chat

        Args:
            text: the message to type; a newline is appended to submit it.

        Returns:
            The ``Message`` authored by ``self.username`` whose content equals
            ``text`` and that was not on screen before sending.

        Raises:
            TimeoutException: if no such message shows up within 10 seconds.
        """
        # Remember what is already in the chat by id. Unlike a positional
        # boundary this stays valid when the chat is empty or when the newest
        # pre-existing message disappears while we wait.
        examined_ids = {msg.id for msg in Message.get_context(self.driver)}
        self.text_field.send_keys(''.join([text, '\n']))
        my_message = text  # replaced by the Message object once it is found

        def message_in_chat(driver):
            nonlocal my_message
            checked_now = []
            # The context is newest first, so new messages precede known ones.
            for msg in Message.get_context(driver):
                if msg.id in examined_ids:
                    break  # everything from here on was examined earlier
                # if the message author is me, and the message contents are what i sent, then it is ours
                if (msg.author == self.username
                        and msg.web_element.find_element_by_class_name('messageContent-2qWWxC').text == text):
                    my_message = msg
                    return True
                checked_now.append(msg.id)
            # Commit only after a complete pass: if a lookup above raised, the
            # next poll re-examines the same messages.
            examined_ids.update(checked_now)
            return False

        WebDriverWait(self.driver, timeout=10, poll_frequency=0.001).until(
            message_in_chat, 'tried finding sent message in chat')
        return my_message

    def scroll_to_bottom(self):  # TODO: rewrite with js & self.driver.execute_script
        """Jump to the newest messages by searching the channel and using the jump button."""
        self.driver.find_element_by_class_name('public-DraftStyleDefault-block').send_keys('in:mudae-rolls' + Keys.ENTER)
        search_hits = WebDriverWait(self.driver, timeout=10, poll_frequency=0.2).until(
            lambda d: d.find_elements_by_class_name('hit-1fVM9e'))
        search_hits[0].find_element_by_class_name('header-23xsNx').click()
        self.driver.find_element_by_class_name('jumpButton-JkYoYK').click()
        self.driver.find_element_by_class_name('icon-38sknP').click()

    def scroll_chat_down(self):
        """Send END to the chat scroller and wait until the view has settled.

        Raises:
            TimeoutException: if neither wait condition holds within 20 seconds.
        """
        before = Message.get_context(self.driver)
        newest_before = before[0] if before else None
        #self.driver.find_element_by_class_name('scroller-2LSbBU').click()
        #TODO: fix focusing issue
        self.driver.execute_script('arguments[0].click()', self.driver.find_element_by_class_name('scroller-2LSbBU'))
        webdriver.ActionChains(self.driver).send_keys(Keys.END).perform()

        # page has loaded if ~the page is at the bottom of the chat~ or ~the page has loaded any new messages (since all new messages load at once)~
        def settled(d):
            # NOTE(review): fewer than two 'wrapper-3vR61M' elements is used as
            # the "already at the bottom" signal - confirm against the page.
            if len(d.find_elements_by_class_name('wrapper-3vR61M')) < 2:
                return True
            current = Message.get_context(d)
            if not current:
                return False  # nothing loaded yet; an empty list must not be indexed
            return newest_before is None or current[0] != newest_before

        WebDriverWait(self.driver, timeout=20, poll_frequency=0.02).until(settled)
class Message:
    """Wrapper for 'message' WebElements within the discord html.

    Instances are identified by the DOM ``id`` of their element; equality and
    hashing are based on that id. The class also keeps a shared, newest-first
    cache of the most recent messages (see :meth:`get_context`).

    Attributes:
        id: DOM id of the wrapped element.
        driver: the driver used to re-locate the element.
        context_index: position inside the shared context, or ``None``.
        send_time: ``time()`` at which this wrapper was created.
        content: text of the ``messageContent-2qWWxC`` child at creation time.
        is_group_starter: whether the element carries the group-start class.
        group_starter: the message that starts this message's group.
        is_viewed: flag for callers to mark a message as processed.
    """
    # holds messages based on what an associated HelperBot sees - in newest first order
    _context: List[Message] = []
    running_get_context = False  # True while some caller is inside get_context
    max_context_size = 40  # upper bound on the number of cached messages

    # CONSTANTS
    message_element_class_name = 'message-2qnXI6'
    message_element_group_start_class_name = 'groupStart-23k01U'  # a special message element that is at the beginning of multiple messages from the same person
    author_element_class_name = 'username-1A8OIy'
    bot_verif_element_class_name = 'botText-1526X_'
    reactions_element_class_name = 'reaction-1hd86g'

    def __init__(self, element: WebElement, driver, context_index=None, group_starter=None):
        """Read id, content and grouping information from ``element``.

        Args:
            element: the message element to wrap.
            driver: driver used for later lookups and script execution.
            context_index: index in the shared context, if already known.
            group_starter: the group-starting message, used when ``element``
                is not itself a group starter.
        """
        self._web_element: WebElement = element
        self.id: str = self._web_element.get_property('id')
        self.driver = driver
        self.context_index: int = context_index
        self.send_time = time()
        self.content: str = self.web_element.find_element_by_class_name('messageContent-2qWWxC').text
        element_classes = self.web_element.get_attribute('class').split(' ')
        self.is_group_starter: bool = Message.message_element_group_start_class_name in element_classes
        self.group_starter: Message = self if self.is_group_starter else group_starter

        self._author: str
        self._is_from_bot: bool
        if self.is_group_starter:
            try:
                # Click the author name, read the name tag that appears, then
                # click the author name again.
                driver.execute_script('arguments[0].click()', self.web_element.find_element_by_class_name(Message.author_element_class_name))
                self._author = ''.join(driver.find_element_by_class_name('nameTag-m8r81H').text.split('\n'))
                driver.execute_script('arguments[0].click()', self.web_element.find_element_by_class_name(Message.author_element_class_name))
            except NoSuchElementException:
                # can occur because of pinned messages
                self._author = 'Pin'
            try:
                self.web_element.find_element_by_class_name(Message.bot_verif_element_class_name)
                self._is_from_bot = True
            except NoSuchElementException:
                self._is_from_bot = False
        else:
            # None means unknown in this context; resolved lazily through the
            # group starter by the ``author`` / ``is_from_bot`` properties.
            self._author = None
            self._is_from_bot = None
        self.is_viewed = False

    @property
    def web_element(self):
        """The element, re-located by id on every access."""
        return self.driver.find_element_by_id(self.id)

    @property
    def author(self) -> str:
        """Author name, taken from the group starter when not known locally."""
        if self.group_starter is not None and self._author is None:
            self._author = self.group_starter._author
        return self._author

    @property
    def is_from_bot(self):
        """Bot flag, taken from the group starter when not known locally."""
        if self.group_starter is not None and self._is_from_bot is None:
            self._is_from_bot = self.group_starter._is_from_bot
        return self._is_from_bot

    @staticmethod
    def get_context(driver) -> List[Message]:
        """Refresh and return the shared newest-first list of recent messages.

        Wrappers whose element can still be located are reused; newer
        elements get new wrappers, bot messages are re-wrapped as
        ``MudaeMessage`` / ``LotteryMessage``, and ``context_index`` is
        updated on every entry. At most ``max_context_size`` entries are kept.

        Raises:
            TimeoutException: if another caller holds the busy flag for more
                than 30 seconds.
        """
        # I think that we never want two different threads to be running this at the same time
        WebDriverWait(driver, timeout=30, poll_frequency=0.001).until_not(lambda d: Message.running_get_context)
        Message.running_get_context = True
        try:
            limit = Message.max_context_size

            # get rid of stale messages
            fresh_old_messages: List[Message] = []
            for old_message in Message._context:
                try:
                    # use any method for a staleness check
                    old_message.web_element.is_enabled()
                except (StaleElementReferenceException, NoSuchElementException):
                    # NoSuchElementException could happen if the message is deleted
                    old_message.context_index = None
                else:
                    fresh_old_messages.append(old_message)

            # replace Message._context with the new messages (while not rewriting old messages that are still fresh)
            web_elements: List[WebElement] = driver.find_elements_by_class_name(Message.message_element_class_name)[::-1]  # newest first order
            new_context: List[Message] = []
            fresh_old_messages_start_index = None
            for i, web_element in enumerate(web_elements):
                if i >= limit:
                    break
                # this only fully makes sense if we are never scrolling upward
                if fresh_old_messages and web_element.get_property('id') == fresh_old_messages[0].id:
                    new_context.extend(fresh_old_messages)
                    fresh_old_messages_start_index = i
                    del new_context[limit:]
                    break
                new_context.append(Message(web_element, driver))  # appends by newest first

            # Walk from oldest to newest so each message can inherit the group
            # starter of the (already processed) message just below it.
            for i in reversed(range(len(new_context))):
                # update message old indices to match new messages
                if fresh_old_messages_start_index is not None and i >= fresh_old_messages_start_index:
                    new_context[i].context_index = i
                    continue
                # guarantee that new_context[i] has a group_starter
                if new_context[i].group_starter is None:
                    if i + 1 < len(new_context):
                        new_context[i].group_starter = new_context[i + 1].group_starter
                    else:
                        # pop the last element if no group_starter
                        new_context.pop()
                        continue
                # classify type of message
                if new_context[i].is_from_bot:
                    new_context[i] = MudaeMessage(new_context[i].web_element, driver, context_index=i, group_starter=new_context[i].group_starter)
                    if LotteryMessage.is_lottery_message(new_context[i]):  # check if lottery message
                        new_context[i] = LotteryMessage(new_context[i].web_element, driver, context_index=i, group_starter=new_context[i].group_starter)
                else:
                    new_context[i].context_index = i

            # set context
            Message._context = new_context
        finally:
            # Always release the flag; otherwise one failed refresh would make
            # every later caller wait 30 seconds and time out.
            Message.running_get_context = False
        return Message._context

    def __repr__(self):
        # A message may not have a group starter yet; report None then.
        starter_id = None if self.group_starter is None else self.group_starter.id
        return (
            'MESSAGE OBJECT:\n'
            f'\tid - {self.id}\n'
            f'\tauthor - {self.author},\n'
            f'\tis_group_starter - {self.is_group_starter},\n'
            f'\tgroup_starter.id - {starter_id},\n'
            f'\tcontext_index - {self.context_index},\n'
            f'\tsubclass - {type(self).__name__},\n'
            f'\tcontent - {self.content}\n'
        )

    def __eq__(self, other: Message) -> bool:
        if not isinstance(other, Message):
            return NotImplemented  # lets `msg == None` evaluate to False
        return self.id == other.id

    def __ne__(self, other):
        equal = self.__eq__(other)
        return equal if equal is NotImplemented else not equal

    def __hash__(self):
        # Kept consistent with __eq__ so messages can live in sets and dicts.
        return hash(self.id)
class MudaeMessage(Message):  # message from a bot (not necessarily Mudae) - TODO: make more robust to ensure message is from Mudae
    """A ``Message`` whose author carries the bot marker.

    Raises:
        AssertionError: if the wrapped message is not from a bot.
    """

    def __init__(self, element: WebElement, driver, context_index=None, group_starter=None):
        super().__init__(element, driver, context_index=context_index, group_starter=group_starter)
        # Raised explicitly (same exception type as before) because a plain
        # `assert` is removed when Python runs with -O.
        if not self.is_from_bot:
            raise AssertionError('Message is not from a bot')
class LotteryMessage(MudaeMessage):
    """A bot message recognised by :meth:`is_lottery_message`.

    Attributes:
        character: text of the embed author-name element.
        value: integer parsed from the ``<strong>`` inside the embed description.
        is_married: True when the embed footer contains ``'Belongs to'``.
    """
    # CONSTANTS
    character_name_element_class_name = 'embedAuthorName-3mnTWj'
    embed_description_element_class_name = 'embedDescription-1Cuq9a'
    message_footer_element_class_name = 'embedFooterText-28V_Wb'

    def __init__(self, element: WebElement, driver, context_index=None, group_starter=None):
        """Wrap ``element`` and read character, value and ownership from its embed.

        Raises:
            AssertionError: if the message is not from a bot or does not look
                like a lottery message.
        """
        MudaeMessage.__init__(self, element, driver, context_index=context_index, group_starter=group_starter)
        # Explicit raise instead of `assert` so the check survives `python -O`.
        if not self.is_lottery_message():
            raise AssertionError(f'This is not a LotteryMessage {self}')
        self.character: str = self.web_element.find_element_by_class_name(LotteryMessage.character_name_element_class_name).text
        description = self.web_element.find_element_by_class_name(LotteryMessage.embed_description_element_class_name)
        self.value: int = int(description.find_element_by_tag_name('strong').text)
        self.is_married: bool
        try:
            message_footer = self.web_element.find_element_by_class_name(LotteryMessage.message_footer_element_class_name).text
            self.is_married = 'Belongs to' in message_footer
        except NoSuchElementException:
            self.is_married = False

    def click_reaction(self, bot: HelperBot, index=0):
        """Click the reaction at ``index`` and verify that its state toggled.

        Args:
            bot: supplies the driver used for waiting and for the scripted click.
            index: position of the reaction among this message's reactions.

        Raises:
            TimeoutException: if no reaction appears within 10 seconds.
            AssertionError: if the reaction's 'reactionMe-wv5HKu' marker is
                unchanged after the click.
        """
        reaction_elements: List[WebElement]
        # until() hands back the non-empty list that satisfied the wait.
        reaction_elements = WebDriverWait(bot.driver, timeout=10, poll_frequency=0.01).until(
            lambda d: self.web_element.find_elements_by_class_name(Message.reactions_element_class_name))
        reaction_start_state = 'reactionMe-wv5HKu' in reaction_elements[index].get_attribute('class')
        print(self.web_element.text)
        bot.driver.execute_script('arguments[0].click()', reaction_elements[index].find_element_by_class_name('reactionInner-15NvIl'))
        print('click')
        reaction_end_state = 'reactionMe-wv5HKu' in reaction_elements[index].get_attribute('class')
        if reaction_start_state == reaction_end_state:
            raise AssertionError(f'start_state: {reaction_start_state}, end_state: {reaction_end_state}')

    def is_lottery_message(self) -> bool:
        """Return True when the embed has the expected image and rank lines.

        The description must contain exactly one image with the expected
        ``src`` and the texts ``'Claims: #'`` and ``'Likes: #'``. Any lookup
        failure counts as "not a lottery message".
        """
        img_src_text = ''
        full_desc_text = ''
        try:
            description = self.web_element.find_element_by_class_name(LotteryMessage.embed_description_element_class_name)
            images = description.find_elements_by_tag_name('img')
            if len(images) == 1:
                img_src_text = images[0].get_property('src')  # why does get_property work here but not other places??
                full_desc_text = description.text
        except Exception:
            # Best effort on purpose, but no longer swallows KeyboardInterrupt
            # or SystemExit the way a bare `except:` did.
            return False
        return img_src_text == 'https://cdn.discordapp.com/emojis/469835869059153940.png?v=1' and 'Claims: #' in full_desc_text and 'Likes: #' in full_desc_text

    def __repr__(self):
        # Both values can be missing: __init__ formats `self` in its error
        # message before `character` is assigned, and a message may lack a
        # group starter.
        starter_id = None if self.group_starter is None else self.group_starter.id
        character = getattr(self, 'character', None)
        return (
            'MESSAGE OBJECT:\n'
            f'\tid - {self.id}\n'
            f'\tauthor - {self.author},\n'
            f'\tis_group_starter - {self.is_group_starter},\n'
            f'\tgroup_starter.id - {starter_id},\n'
            f'\tcontext_index - {self.context_index},\n'
            f'\tsubclass - {type(self).__name__},\n'
            f'\tcharacter - {character}\n'
        )
# %%
# Module-level bot shared by start_loop() and the __main__ guard below.
# NOTE: HelperBot.__init__ opens Chrome and logs in, so executing this line
# (running the cell or importing the module) has side effects.
bot = HelperBot()
# %% notes
# Ideas for later:
#keep a record of currently active people
#data:
# last message time
# if they have used their roll
# if they have used their claim
#
#create a selector based on myanimelist?
#saved constants
# The first five values repeat literals that HelperBot.scroll_to_bottom
# hard-codes; nothing in this file reads these names.
search_chat = 'in:mudae-rolls'
jump_button = 'jumpButton-JkYoYK'
search_input_field = 'public-DraftStyleDefault-block'
search_message = 'hit-1fVM9e'
search_exit = 'icon-38sknP'
chat_section = 'messagesWrapper-1sRNjr'
#unused
time_stamp_element_selector = 'span.timestamp-3ZCmNB > span' # get aria-label property
#%%
#log this data somewhere
from threading import Thread
from typing import Callable
from selenium.common.exceptions import TimeoutException
def send_and_await(message: str, bot: HelperBot, tries: int, sleep_time: float, is_response: Callable[[Message], bool]):
    """Send ``message`` and wait for a matching reply, re-sending on timeout.

    Args:
        message: text passed to ``bot.send_message``.
        bot: the bot used for sending and whose driver is polled.
        tries: maximum number of times the message is sent.
        sleep_time: seconds to wait for a reply after each send.
        is_response: predicate deciding whether a message is the awaited reply.

    Raises:
        TimeoutException: when the last attempt also ends without a reply.
    """
    for attempt in range(1, tries + 1):
        sent: Message = bot.send_message(message)

        def response_arrived(driver):
            candidates = Message.get_context(driver)
            # The index is read after the refresh above. Only messages newer
            # than our own can be replies; with no index, check them all.
            cutoff = sent.context_index
            if cutoff is not None:
                candidates = candidates[:cutoff]
            if any(is_response(candidate) for candidate in candidates):
                print(f'response found successfully after {attempt} attempts')
                return True
            return False

        waiter = WebDriverWait(bot.driver, timeout=sleep_time, poll_frequency=0.001)
        try:
            waiter.until(response_arrived)
        except TimeoutException:
            # Give up only once the final attempt has timed out as well.
            if attempt >= tries:
                raise TimeoutException(f'Response message was not found in {tries} attempts')
        else:
            return
def interval_actions(bot: HelperBot, interval: float = 3600):
    """Send the recurring commands forever, one cycle per ``interval`` seconds.

    Each cycle sends ``$ma``, ``$p``, ``$dk`` and ``$daily`` through
    ``send_and_await``. A failure is logged to ``sending.log`` and ends the
    current cycle; the loop itself keeps running.

    Args:
        bot: the bot used to send the commands.
        interval: target length of one cycle in seconds (an hour by default).
    """
    sleep(8)
    send_logger = logging.getLogger(__name__)
    send_logger.setLevel(logging.DEBUG)
    # Attach the file handler only once; a second call would otherwise write
    # every record twice.
    log_path = os.path.abspath('sending.log')
    already_attached = any(
        isinstance(handler, logging.FileHandler) and handler.baseFilename == log_path
        for handler in send_logger.handlers
    )
    if not already_attached:
        send_handler = logging.FileHandler('sending.log')
        send_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
        send_logger.addHandler(send_handler)

    while True:
        start_time = time()
        try:
            send_and_await('$ma', bot, tries=15, sleep_time=4, is_response=lambda m: isinstance(m, MudaeMessage) and m.content.startswith('{0}, the roulette is limited to'.format(bot.username.split('#')[0])))
            send_and_await('$p', bot, tries=3, sleep_time=10, is_response=lambda m: isinstance(m, MudaeMessage) and m.content.startswith('One try per interval of'))
            send_and_await('$dk', bot, tries=3, sleep_time=5, is_response=lambda m: isinstance(m, MudaeMessage) and m.content.startswith('Next $dk reset in'))
            send_and_await('$daily', bot, tries=3, sleep_time=5, is_response=lambda m: isinstance(m, MudaeMessage) and m.content.startswith('Next $daily reset in'))
        except Exception:
            send_logger.exception('sending thread failed')
        time_delta = time() - start_time
        # sleep() rejects negative values; clamp so an over-long cycle or a
        # clock jump cannot kill the thread.
        sleep(max(0.0, interval - time_delta))
from wishes import wishlist
def start_loop():
    """Start the interval thread, then watch the chat for new messages forever.

    Each pass walks the context newest first until it reaches a message that
    was already marked as viewed. Lottery messages get their first reaction
    clicked when the character is already married, is on the wishlist, or is
    worth more than 200. Every new message is printed and marked as viewed.
    """
    #bot.scroll_chat_down()
    sender = Thread(target=interval_actions, args=[bot])
    sender.start()
    while True:
        saw_new_message = False
        for msg in Message.get_context(bot.driver):
            #print(msg)
            if msg.is_viewed:
                break  # everything older was handled on an earlier pass
            if isinstance(msg, LotteryMessage):
                if msg.is_married:
                    msg.click_reaction(bot)
                    status = 'married'
                elif msg.character in wishlist or msg.value > 200:
                    msg.click_reaction(bot)
                    status = 'I just married'
                else:
                    status = 'not married'
                print(' - '.join([status, msg.character]))
            print(msg)
            saw_new_message = True
            msg.is_viewed = True
        if saw_new_message:
            print('==== end of message chain ====')
        bot.scroll_chat_down()
# Script entry point: run the watcher; when it dies on a stale element, keep a
# screenshot of the page for debugging before letting the error propagate.
if __name__ == '__main__':
    Message._context = []  # for jupyter server debugging
    try:
        start_loop()
    except StaleElementReferenceException:
        bot.driver.save_screenshot('debug.png')
        raise