-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path__init__.py
1287 lines (1130 loc) · 42.2 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import sys
from importlib import reload
import socket
import re
from os import environ
from time import sleep as pause
from time import ctime as now
from traceback import print_tb, print_exc
from . import hooks, threads, parse
from .parse import Parser
from threading import Thread, Timer
from typing import TypeVar, Optional, Any, NoReturn, Union, Callable, List, Tuple, Pattern
# Matches a message body that starts with "ACTION " and captures the rest of
# the line as group 1.
# NOTE(review): on the wire a CTCP ACTION is framed by \x01 bytes; this pattern
# only matches if those delimiters were stripped before matching - confirm
# against the parser.
action_regex = re.compile(r"^ACTION (.*)")
# Type variable for annotations that accept a Parser or any subclass of it.
T_Parser = TypeVar('T_Parser', bound=Parser)
class Base(object):
    """
    Low-level IRC connection.

    Owns the socket, splits the incoming stream into lines, parses each line
    with ``Parser`` and dispatches it to the registered listeners and hooks.

    Listeners are ``{'match', 'func', 'temp'}`` dicts kept in
    ``self.listeners`` and are called as ``func(info)``.  Hooks are callables
    carrying a ``_type`` attribute; they are grouped by that type in
    ``self._hooks`` and are called as ``func(self)`` or
    ``func(self, info.data)``.
    """

    def __init__(self, host: str, **kwargs) -> None:
        """
        Constructor
        Initializes a new pIRC.Base with provided config variables.

        ``host`` is the server address.  Every keyword argument overrides (or
        adds to) the matching key of ``self.config``.
        """
        default_nick = "pIRCBot" if self.__class__ == Base else self.__class__.__name__
        nick = kwargs.get('nick', default_nick)
        # The passphrase is read from the environment variable
        # "<nick>_PASSPHRASE"; a 'passphrase' keyword argument overrides it.
        passphrase = environ.get(f"{nick}_PASSPHRASE", None)
        # setup default values
        self.config = {
            # Host to connect.
            'host': host,
            # Port to connect.
            'port': 6667,
            # The 'nick' part of 'nick!user@host * name'
            'nick': nick,
            # The 'user' part of 'nick!user@host * name' (sent with USER)
            'ident': nick.lower(),
            # The bot's original name; used in log output and as the
            # fallback for nick()
            'name': nick,
            # Passphrase used to authenticate against auth servers (logging in)
            'passphrase': passphrase,
            # The custom display name
            'realname': "pIRC Bot",
            # Channels that the bot will auto-connect to
            'channels': [],
            # Prefix that the bot looks for when scanning for custom commands
            'command': '!',
            # determines whether multiple matches are allowed per received line/msg
            'break_on_match': True,
            # Extremely detailed output for logging
            'verbose': True,
            # Dictionary of keywords that get replaced by its value
            # or result of the value if the value is callable
            # Keywords searched for in the form of :keyword:
            'replace': {},
            # Automatically reconnect
            'reconnect': True
        }
        # update with passed config values
        self.config.update(kwargs)
        self.config['replace'].setdefault('command', self.config['command'])

        self._inbuffer = ""
        self.socket = None
        self.listeners = []
        self.queued = []
        self.ERROR = 0
        self.ulist = {}
        self.channels = {}
        self.isupport = {}
        # Always present so _run_hooks()/_close() work even before a subclass
        # has called load_hooks().
        self._hooks = {}
        self._quitting = False
        self._running = False
        self._registered = False

        # init funcs
        self._add_listeners()
        if self.__class__ == Base:
            self.load_hooks()

    def load_hooks(self) -> None:
        """
        Rebuild ``self._hooks`` and run the 'load' hooks.

        Every callable that carries a ``_type`` attribute is collected and
        grouped under ``_type.lower()``.  The callables are looked up on the
        instance when the config has a 'hookscripts' key, otherwise on the
        class.  A hook that also carries a ``_thread`` attribute is stored as
        ``func._thread(func, self)`` instead of the bare function.
        """
        source = self if 'hookscripts' in self.config else self.__class__
        self._hooks = {}
        for func in source.__dict__.values():
            if not (callable(func) and hasattr(func, '_type')):
                continue
            bucket = self._hooks.setdefault(func._type.lower(), [])
            if hasattr(func, '_thread'):
                bucket.append(func._thread(func, self))
            else:
                bucket.append(func)
        self._run_hooks('load')

    def trigger(self, match: Union[dict, bool], func: Callable, temp: bool=False) -> None:
        """
        Register a listener.

        ``match`` is ``True`` (every line), ``False`` (never) or a dict handed
        to ``Parser.compare``.  ``func`` is called with the parsed line.  A
        ``temp`` listener is removed after its first call.
        """
        self.listeners.append({'match': match, 'func': func, 'temp': temp})

    def on_verb(self, verb: Union[str, Pattern], func: Callable, temp: bool=False) -> None:
        """
        Register ``func`` for lines whose verb matches ``verb``.
        """
        self.trigger({'verb': verb}, func, temp)

    def on_code(self, num: int, func: Callable, temp: bool=False) -> None:
        """
        Register ``func`` for a numeric reply; ``num`` is zero-padded to three
        digits (5 -> '005').
        """
        self.trigger({'verb': '{:03d}'.format(num)}, func, temp)

    def on_raw(self, func: Callable, temp: bool=False) -> None:
        """
        Register ``func`` for every incoming line.
        """
        self.trigger(True, func, temp)

    def _register(self, verb: Union[str, int, Callable], func: Optional[Callable], temp: bool) -> None:
        """
        Shared dispatch for on()/once(): pick on_code, on_verb or on_raw from
        the type of ``verb``.  Any other type is silently ignored.
        """
        # bool is excluded explicitly so True/False are not treated as codes.
        if isinstance(verb, int) and not isinstance(verb, bool):
            self.on_code(verb, func, temp)
        elif isinstance(verb, str):
            self.on_verb(verb, func, temp)
        elif callable(verb):
            self.on_raw(verb, temp)

    def on(self, verb: Union[str, int, Callable], func: Callable = None) -> None:
        """
        Register a permanent listener.

        An int ``verb`` is a numeric reply, a str ``verb`` is a command verb,
        and a callable ``verb`` is itself registered for every line (``func``
        is ignored in that case).
        """
        self._register(verb, func, False)

    def once(self, verb: Union[str, int, Callable], func: Callable = None) -> None:
        """
        Same as on(), but the listener is removed after its first call.
        """
        self._register(verb, func, True)

    def off(self, verb: Union[str, int, Callable], func: Callable = None) -> None:
        """
        Placeholder for removing a listener; currently does nothing.
        """
        pass

    def _add_listeners(self) -> None:
        """
        Install the built-in listeners: numeric replies, commands, then the
        catch-all raw listener.
        """
        # Custom hooks listeners
        self._add_codes()
        self._add_commands()
        self.on_raw(self._run_RAW)  # Catch ALL incoming messages

    def _add_codes(self) -> None:
        """
        Install the listeners for numeric replies.
        """
        # Default code commands for bot state management
        self.on_code(5, self._005_compile_isupport)
        self.on_code(353, self._353_compile_ulist)
        # 433 is ERR_NICKNAMEINUSE; 443 (ERR_USERONCHANNEL) is unrelated to
        # nick collisions.
        self.on_code(433, self._443_alt_nick)
        # Listener for code command hooks
        self.on_verb(re.compile(r'^\d{3}$'), self._run_CODES)

    def _add_commands(self) -> None:
        """
        Install the listeners for command verbs: first the ones that maintain
        the bot's own state, then the ones that run user hooks.
        """
        # Self management listeners
        # TODO: find out if these need to be converted to code commands
        self.on_verb('JOIN', self._on_join)
        self.on_verb('PART', self._on_part)
        self.on_verb('NICK', self._on_nick)
        self.on_verb('QUIT', self._on_quit)
        self.on_verb('MODE', self._on_mode)
        self.on_verb('PING', self._on_ping)
        self.on_verb('PONG', self._on_pong)
        self.on_verb('ERROR', self._on_error)
        # Listeners to run hooks
        self.on_verb('NOTICE', self._run_NOTICE)
        self.on_verb('PRIVMSG', self._run_PRIVMSG)
        self.on_verb('QUIT', self._run_QUIT)
        self.on_verb('PART', self._run_PART)
        self.on_verb('NICK', self._run_NICK)
        self.on_verb('JOIN', self._run_JOIN)
        self.on_verb('MODE', self._run_MODE)
        self.on_verb('PING', self._run_PING)
        self.on_verb('PONG', self._run_PONG)
        self.on_verb('ERROR', self._run_ERROR)

    def _listen(self) -> None:
        """
        Constantly listens to the input from the server. Since the messages come
        in pieces, we wait until we receive 1 or more full lines to start parsing.
        A new line is defined as ending in CRLF in the RFC, but some servers
        separate by LF only. This script takes care of both.

        After each pass, calls stored by queue() are executed in order.

        Raises socket.error when the server closes the connection.
        """
        while True:
            if not self._quitting:
                try:
                    chunk = self.socket.recv(2048)
                except socket.timeout:
                    chunk = None
                if chunk is not None:
                    if not chunk:
                        # recv() returning b'' means the peer closed the
                        # connection; without this the loop would spin forever.
                        raise socket.error("Connection closed by remote host.")
                    # 'replace' keeps one malformed byte sequence from
                    # tearing the whole connection down.
                    self._inbuffer += chunk.decode('utf-8', errors='replace')
                # Some IRC daemons disregard the RFC and split lines by \n rather than \r\n.
                lines = self._inbuffer.split("\n")
                # The last element is an incomplete line (or ''); keep it.
                self._inbuffer = lines.pop()
                for line in lines:
                    # Strip \r from \r\n for RFC-compliant IRC servers.
                    line = line.rstrip('\r')
                    if not line:
                        continue  # skip empty lines
                    if self.config['verbose']:
                        print("({0}: {1}) << {2}".format(
                            self.config['name'],
                            self.config['host'],
                            line
                        ))
                    self._running = True
                    self._run_listeners(line)
            else:
                # Nothing is read while quitting; avoid a 100% CPU spin.
                pause(0.1)
            if self.queued:
                self._running = True
                while self.queued:
                    func, args, kwargs = self.queued.pop(0)
                    func(*args, **kwargs)
            self._running = False

    def _run_hooks(self, key: str, info: Optional[T_Parser] = None, once: Optional[bool] = None) -> bool:
        """
        Run the hooks stored under ``key``.

        Without ``info`` each hook is called as ``func(self)``.  With ``info``
        a hook is called as ``func(self, info.data)`` when it has no ``_match``
        attribute or ``info.compare(func._match)`` is truthy.  A hook whose
        ``_once`` attribute is True is removed after it ran.

        Returns False as soon as one hook ran while config 'break_on_match' is
        set, True otherwise.  ``once`` is accepted but not used.

        NOTE(review): with 'break_on_match' set, only the first hook of a
        lifecycle event (load/connect/close, i.e. ``info`` is None) runs -
        confirm that is intended.
        """
        # Iterate over a snapshot: removing a one-shot hook from the live list
        # while enumerating it would skip the following hook.
        for func in list(self._hooks.get(key, ())):
            if info is None:
                func(self)
            elif not hasattr(func, '_match') or info.compare(func._match):
                func(self, info.data)
            else:
                continue
            if getattr(func, '_once', None) is True:
                # Look the bucket up again: the hook may have reloaded hooks.
                bucket = self._hooks.get(key, [])
                if func in bucket:
                    bucket.remove(func)
            if self.config['break_on_match']:
                return False
        return True

    def _run_threads(self) -> Optional[bool]:
        """
        Start every hook stored under 'thread'.

        A thread that reports ``is_shutdown()`` or ``is_alive()`` is replaced
        by ``thread.copy()`` and the copy is started; any other thread is
        started directly.

        Returns True when there are no thread hooks, None otherwise.
        """
        if 'thread' not in self._hooks:
            return True
        workers = self._hooks['thread']
        for n, thread in enumerate(workers):
            if thread.is_shutdown() or thread.is_alive():
                # Replace the entry inside the 'thread' bucket, not a
                # top-level key of self._hooks.
                workers[n] = thread.copy()
                workers[n].start()
            else:
                thread.start()
        return None

    def _run_listeners(self, line: str) -> None:
        """
        Parse ``line`` and call every listener whose match accepts it.

        A listener with match True always runs, one with match False never
        runs, any other match is passed to ``Parser.compare``.  Temporary
        listeners are removed after they ran.  Finally the 'once' hooks run.
        """
        info = Parser(line)
        # Snapshot, so that listeners added or removed by a callback do not
        # disturb this pass.
        for listener in list(self.listeners):
            match = listener['match']
            if match is False:
                continue
            if match is not True and not info.compare(match):
                continue
            listener['func'](info)
            if listener['temp'] is True:
                # Remove by identity; equal-looking listeners stay untouched.
                for idx, candidate in enumerate(self.listeners):
                    if candidate is listener:
                        del self.listeners[idx]
                        break
        self._run_hooks('once', info, True)

    @staticmethod
    def _label(info: T_Parser, with_message: bool = True) -> None:
        """
        Store the first argument as info['target'] and, if requested, the last
        argument as info['message'].  Missing arguments become None instead of
        raising IndexError.
        """
        args = info['args']
        info['target'] = args[0] if args else None
        if with_message:
            info['message'] = args[-1] if args else None

    def _run_PRIVMSG(self, info: T_Parser) -> None:
        """
        Dispatch a PRIVMSG to the matching hook groups.

        A message matching ``action_regex`` runs the 'action' hooks only.
        Otherwise a message starting with the command prefix has the prefix
        removed and runs 'chancommand' or 'privcommand', then 'command'.
        After that 'channel' or 'private' runs, then 'privmsg'.  Dispatch
        stops at the first group for which _run_hooks() returns False.
        A target starting with '#' counts as a channel.
        """
        info['target'] = info['args'][0]
        info['message'] = info['args'][1]
        in_channel = info['target'].startswith('#')

        action = action_regex.match(info['message'])
        if action is not None:
            info['message'] = action.groups()[0]
            self._run_hooks('action', info)
            return

        prefix = self.config['command']
        if info['message'].startswith(prefix):
            info['message'] = info['message'][len(prefix):]
            if not self._run_hooks('chancommand' if in_channel else 'privcommand', info):
                return
            if not self._run_hooks('command', info):
                return
        if not self._run_hooks('channel' if in_channel else 'private', info):
            return
        self._run_hooks('privmsg', info)

    def _run_NOTICE(self, info: T_Parser) -> None:
        """
        Set target/message and run the 'notice' hooks.
        """
        self._label(info)
        self._run_hooks('notice', info)

    def _run_QUIT(self, info: T_Parser) -> None:
        """
        Set target/message and run the 'quit' hooks.
        """
        self._label(info)
        self._run_hooks('quit', info)

    def _run_PART(self, info: T_Parser) -> None:
        """
        Set target/message and run the 'part' hooks.
        """
        self._label(info)
        self._run_hooks('part', info)

    def _run_NICK(self, info: T_Parser) -> None:
        """
        Set target/message and run the 'nick' hooks.
        """
        self._label(info)
        self._run_hooks('nick', info)

    def _run_JOIN(self, info: T_Parser) -> None:
        """
        Set the target and run the 'join' hooks.
        """
        self._label(info, with_message=False)
        self._run_hooks('join', info)

    def _run_MODE(self, info: T_Parser) -> None:
        """
        Set target/message and run the 'mode' hooks.
        """
        self._label(info)
        self._run_hooks('mode', info)

    def _run_PING(self, info: T_Parser) -> None:
        """
        Set the message (the target is None) and run the 'ping' hooks.
        """
        self._label(info)
        info['target'] = None
        self._run_hooks('ping', info)

    def _run_PONG(self, info: T_Parser) -> None:
        """
        Set target/message and run the 'pong' hooks.
        """
        self._label(info)
        self._run_hooks('pong', info)

    def _run_ERROR(self, info: T_Parser) -> None:
        """
        Set target/message and run the 'error' hooks.
        """
        self._label(info)
        self._run_hooks('error', info)

    def _run_CODES(self, info: T_Parser) -> None:
        """
        Set the target and run hooks for a three-digit numeric reply.

        NOTE(review): this runs the 'error' hook group for every numeric
        reply, which looks like a copy of _run_ERROR - confirm which hook
        type numeric replies are meant to use.
        """
        self._label(info, with_message=False)
        self._run_hooks('error', info)

    def _run_RAW(self, info: T_Parser) -> None:
        """
        Run the 'raw' hooks; registered for every incoming line.
        """
        self._run_hooks('raw', info)

    def _on_error(self, info: T_Parser) -> None:
        """
        Handle a server ERROR line.

        While quitting, the quitting flag is simply cleared.  Otherwise the
        error counter is increased and an Exception carrying the server's
        message is raised.
        """
        if self._quitting:
            self._quitting = False
            return
        self.ERROR += 1
        raise Exception(info['args'][-1])

    def _on_ping(self, info: T_Parser) -> None:
        """
        Answer a PING with a PONG carrying the same last argument.
        """
        self._cmd('PONG', info['args'][-1])

    def _on_pong(self, info: T_Parser) -> None:
        """
        Placeholder; PONG needs no state handling.
        """
        pass

    def _005_compile_isupport(self, info: T_Parser) -> None:
        """
        Read the PREFIX token of a 005 (ISUPPORT) reply.

        ``PREFIX=(ov)@+`` is stored in ``self.isupport['PREFIX']`` as
        ``[('o', '@'), ('v', '+')]``.  Arguments are read up to the first one
        containing a space (the human-readable trailer).  Other tokens are
        not stored.
        """
        for arg in info['args']:
            if ' ' in arg:
                break
            name, _, value = arg.partition('=')
            if name != 'PREFIX':
                continue
            # An empty or malformed PREFIX value carries no prefixes.
            match = re.match(r'\((\w+)\)(\S+)', value)
            if match is not None:
                self.isupport.setdefault('PREFIX', []).extend(
                    zip(match.group(1), match.group(2)))

    def _353_compile_ulist(self, info: T_Parser) -> None:
        """
        Record the users listed in a 353 (NAMES) reply.

        Each name becomes ``self.ulist[nick][channel] = modes`` where
        ``modes`` holds the mode letters for the prefix symbols found in front
        of the nick ('' when there are none or PREFIX is unknown).
        """
        # Drop the channel-type marker ('=' public, '*' private, '@' secret).
        if info['args'][0] in ('=', '*', '@'):
            info['args'].pop(0)
        channel, names, *_ = info['args']

        prefixes = self.isupport.get('PREFIX', [])
        symbols = {symbol for _, symbol in prefixes}
        for entry in names.split():
            # Split the entry into leading prefix symbols and the nick.
            cut = 0
            while cut < len(entry) and entry[cut] in symbols:
                cut += 1
            held, nickname = entry[:cut], entry[cut:]
            if not nickname:
                continue  # entry consisted of prefix symbols only
            modes = ''.join(mode for mode, symbol in prefixes if symbol in held)
            self.ulist.setdefault(nickname, {})[channel] = modes

    def _443_alt_nick(self, info: Optional[T_Parser] = None) -> None:
        """
        Append '_' to the configured nick and request it from the server.

        Registered for numeric 433 (nickname in use); the method keeps its
        historical name.  ``info`` is the parsed reply passed by the listener
        loop and is not used.
        """
        self.config['nick'] += '_'
        self.nick(self.config['nick'])

    def _on_mode(self, info: T_Parser) -> None:
        """
        Placeholder; mode tracking is disabled.
        """
        pass
        # self._ulist_modes(info)  # placeholder

    def _ulist_modes(self, info: T_Parser) -> None:
        """
        Apply a MODE line to ``self.ulist`` (currently not called by any
        listener).

        The arguments are read as ``target, modestring, *params``.  For every
        mode letter after a '+' or '-', the matching parameter is taken as a
        key of ``self.ulist`` and the letter is added to or removed from that
        entry's value for ``target``.

        NOTE(review): this assumes every mode letter consumes one parameter
        and that every parameter is already a key of ``self.ulist`` - confirm
        before enabling.
        """
        target, modes, *params = info['args']
        state = None
        offset = 0
        for n, mode in enumerate(modes):
            if mode in '+-':
                state = mode
                offset += 1
                continue
            if not state:
                continue
            entry = self.ulist[params[n - offset]]
            if state == '+':
                entry[target] = entry[target] + mode
            elif state == '-':
                entry[target] = entry[target].replace(mode, '')

    def _on_join(self, info: T_Parser) -> None:
        """
        Track a JOIN in the user list.
        """
        self._manage_ulist(info)  # placeholder

    def _on_part(self, info: T_Parser) -> None:
        """
        Placeholder; PART tracking is disabled.
        """
        # self._manage_ulist(info)  # placeholder
        pass

    def _on_nick(self, info: T_Parser) -> None:
        """
        Placeholder; NICK tracking is disabled.
        """
        # self._manage_ulist(info)  # placeholder
        pass

    def _on_quit(self, info: T_Parser) -> None:
        """
        Placeholder; QUIT tracking is disabled.
        """
        # self._manage_ulist(info)  # placeholder
        pass

    def _manage_ulist(self, info: T_Parser) -> None:
        """
        Keep ``self.ulist`` in step with JOIN, NICK, PART and QUIT lines.

        The acting user is ``info['source']['user']`` and the first argument
        is the channel (JOIN/PART) or the new nick (NICK).

        NOTE(review): ``self.ulist`` is keyed by nick in the 353 handler;
        this assumes ``info['source']['user']`` is the nick as well - confirm
        against the parser.
        """
        who = info['source']['user']
        what = info['args'][0]
        verb = info['verb']
        if verb == 'JOIN':
            # Key by the joined channel (not the literal string 'args').
            self.ulist.setdefault(who, {})[what] = ''
        elif verb == 'NICK':
            if who in self.ulist:
                self.ulist[what] = self.ulist.pop(who)  # TODO: update args
            if self.config['nick'] == who:
                self.config['nick'] = what
        elif verb == 'PART':
            if who == self.config['nick']:
                # The bot left: forget the channel for everybody.  Iterate
                # over a copy of the keys because entries get deleted.
                for nickname in list(self.ulist):
                    self.ulist[nickname].pop(what, None)
                    if not self.ulist[nickname]:
                        del self.ulist[nickname]
            elif who in self.ulist:
                self.ulist[who].pop(what, None)
                if not self.ulist[who]:
                    del self.ulist[who]
        elif verb == 'QUIT':
            self.ulist.pop(who, None)

    # self.listeners.insert(0, {
    # 'temp': True,
    # 'func': self._init,
    # 'match': {'command': 'PONG', 'args': ['_init_']} # verify
    # })
    # self.ping('_init_')
    def _init(self, info: Optional[T_Parser] = None) -> None:
        """
        Post-connection setup: identify with NickServ when a passphrase is
        configured, reset the error counter, join the configured channels,
        start the thread hooks and run the 'connect' hooks.

        ``info`` is optional and unused; it lets the method serve as a
        listener callback.
        """
        if self.config['passphrase']:
            self._cmd("PRIVMSG", "NickServ", "identify {}".format(
                self.config['passphrase']))
        self.ERROR = 0
        # Initialize (join rooms and start threads) if the bot is not
        # auto-identifying, or has just identified.
        if self.config['channels']:
            self.join(*self.config['channels'])
        # TODO: This doesn't ensure that threads run at the right time, e.g.
        # after the bot has joined every channel it needs to.
        self._run_threads()
        self._run_hooks('connect')

    def queue(self, func: Callable, *args, **kwargs) -> None:
        """
        Store a call; _listen() executes stored calls in order after each
        read pass.
        """
        self.queued.append((func, args, kwargs))

    def _cmd(self, cmd: str, *args) -> None:
        """
        Build one protocol line from ``cmd`` and ``args`` and send it.

        Arguments are separated by single spaces.  The first argument that is
        empty, contains a space or starts with ':' begins the trailing
        parameter: it is prefixed with ':' and all remaining arguments are
        joined onto it with spaces, because only the last parameter of a line
        may contain spaces.
        """
        parts = [cmd]
        for position, arg in enumerate(args):
            if len(arg) == 0 or ' ' in arg or arg.startswith(':'):
                parts.append(':' + ' '.join(args[position:]))
                break
            parts.append(arg)
        self._raw_cmd(' '.join(parts))

    def _raw_cmd(self, raw_line: str) -> None:
        """
        Send ``raw_line`` followed by CRLF; echo it first when verbose.

        A socket timeout is reported on stdout and otherwise ignored.
        """
        if self.config['verbose']:
            # prints to console, does not support UTF8. Convert to \u style output?
            print("({0}: {1}) >> {2}".format(
                self.config['name'],
                self.config['host'],
                raw_line
                # "".join([x if ord(x) < 128 else '?' for x in raw_line])
            ))
        try:
            # sendall() retries until the whole line is written; send() may
            # write only part of it.
            self.socket.sendall((raw_line + "\r\n").encode())
        except socket.timeout:
            print(">>>Socket timed out.")

    def _broadcast(self, verb: str, targets: Union[List[str], str],
                   messages: Union[List[Tuple[str, int]], List[str], str],
                   template: str = "{}") -> None:
        """
        Send every message to every target with ``verb``.

        ``targets`` and ``messages`` may each be a single string or a list.
        A message may be a ``(text, delay)`` tuple; after a message went to
        all targets the sender sleeps ``delay`` seconds (default 1).
        ``template`` wraps the text before sending.
        """
        if isinstance(targets, str):
            targets = [targets]
        if isinstance(messages, str):
            messages = [messages]
        for entry in messages:
            delay = 1
            if isinstance(entry, tuple) and len(entry) == 2:
                entry, delay = entry
            text = template.format(entry)
            for target in targets:
                self._cmd(verb, target, text)
            pause(delay)

    # Functions that are common use case for sending commands to the server
    @hooks.queue()
    def message(self, targets: Union[List[str], str], messages: Union[List[Tuple[str, int]], List[str], str]) -> None:
        """
        Send one or more PRIVMSG lines; see _broadcast() for the accepted
        argument shapes.
        """
        self._broadcast("PRIVMSG", targets, messages)

    @hooks.queue()
    def notice(self, targets: Union[List[str], str], messages: Union[List[Tuple[str, int]], List[str], str]) -> None:
        """
        Send one or more NOTICE lines; see _broadcast() for the accepted
        argument shapes.
        """
        self._broadcast("NOTICE", targets, messages)

    @hooks.queue()
    def me(self, targets: Union[List[str], str], messages: Union[List[Tuple[str, int]], List[str], str]) -> None:
        """
        Send one or more CTCP ACTION ("/me") messages; see _broadcast() for
        the accepted argument shapes.
        """
        # A CTCP ACTION must be framed by \x01, otherwise clients display the
        # literal text "ACTION ...".
        self._broadcast("PRIVMSG", targets, messages, "\x01ACTION {}\x01")

    @hooks.queue()
    def join(self, *channels: Tuple[Union[str, tuple]]) -> None:
        """
        Join each channel and then query its modes.

        A channel may be a plain name or a tuple whose items are passed to
        JOIN as separate arguments (e.g. ``(name, key)``).
        """
        for chan in channels:
            if isinstance(chan, tuple):
                self._cmd("JOIN", *chan)
                # MODE takes the channel name only, not the whole tuple.
                name = chan[0]
            else:
                self._cmd("JOIN", chan)
                name = chan
            self._cmd("MODE", name)

    @hooks.queue()
    def part(self, *channels: Tuple[str]) -> None:
        """
        Leave all given channels with a single PART command.
        """
        self._cmd("PART", ','.join(channels))

    @hooks.queue()
    def nick(self, nick: Optional[str]=None) -> None:
        """
        Request ``nick``; without an argument the configured 'name' is used.
        """
        if nick is None:
            nick = self.config['name']
        self._cmd("NICK", nick)

    @hooks.queue()
    def ping(self, line: str="timeout") -> None:
        """
        Send a PING carrying ``line``.
        """
        self._cmd("PING", line)

    @hooks.queue()
    def quit(self, message: str="Connection Closed") -> None:
        """
        Send QUIT with ``message``, mark the bot as quitting and close the
        socket.
        """
        self._cmd("QUIT", message)
        self._quitting = True
        self._close()

    # func that makes the bot's thread pause for a given amount of seconds
    @hooks.queue()
    def pause(self, time: int=1) -> None:
        """
        Sleep for ``time`` seconds.
        """
        # Inside a method body ``pause`` is the module-level sleep alias, not
        # this method.
        pause(time)

    # def color(self, matcher) -> str:
    # return re.sub(':(\d)(?:,(\d))?:', _color_replace, matcher)
    # def _color_replace(self, match):
    # if len(match.groups()) > 1:
    # if int(match.group(1)) < 16 and int(match.group(2)) < 16:
    # return '\x02{0},{1}'.format(match.group(1), match.group(2))
    # if len(match.groups()) > 0:
    # if int(match.group(1)) < 16:
    # return '\x02{0}'.format(match.group(1))
    # return ''
    @hooks.queue()
    def reconnect(self) -> Optional[bool]:
        """
        Function that executes an optional connection reset.
        Closes socket
        Checks for failed attempt count and stalls connection accordingly
        If the reconnect config is True it will reconnect, otherwise the
        connection and thread will end

        Returns True when the caller should open a new connection, False
        otherwise (the error counter is reset in that case).
        """
        if self.socket:
            self._close(False)
        print("--- {0}: {1} ---".format(
            self.config['name'],
            self.config['host']
        ))
        if self.config['verbose']:
            print("Connection closed.")
        self._run_hooks('disconnect')

        if self.ERROR >= 10:
            print("There have been 10 or more failed attempts to reconnect.")
            print(
                "Please wait till the bot is able to do so, then press enter to try again.")
            input('Press ENTER to continue')
        elif self.ERROR:
            waittime = 30*self.ERROR+30
            print(f"Error occurred (see stack trace). Waiting {waittime} seconds to reconnect.")
            pause(waittime)
        elif self.config['reconnect']:
            # input()
            if self.config['verbose']:
                print("Waiting 10 seconds to reconnect...")
            pause(8)

        if not self.config['reconnect']:
            self.ERROR = 0
            return False
        pause(2)
        if self.config['verbose']:
            print("Opening new connection...")
        return True

    def _log_exception(self) -> None:
        """
        Print the exception currently being handled and append its traceback
        to "<name> - BotLog.txt".
        """
        print(" ")
        print("Exception occured:", sys.exc_info()[1])
        print(" ")
        print_tb(sys.exc_info()[2])
        print(" ")
        # 'with' closes the log even if writing fails.
        with open('{0} - BotLog.txt'.format(self.config['name']), 'a') as log:
            log.write("\r\n")
            log.write(now())
            log.write("\r\nConnection: {0}\r\n".format(self.config['host']))
            print_exc(None, log)
            log.write("\r\n")

    def connect(self) -> None:
        '''
        Connects to the IRC server with the options defined in `config`

        Runs the listen loop until it ends.  KeyboardInterrupt and SystemExit
        end the loop quietly, and so does a socket error while quitting.  Any
        other exception is printed and logged; when reconnect() then returns
        True a new connection is opened.  The connection is closed after
        every attempt.
        '''
        while True:
            self.isupport = {}
            self.ulist = {}
            self._connect()
            try:
                try:
                    self._listen()
                except socket.error as exc:
                    # A socket failure while quitting is the expected result
                    # of closing our own connection.  Otherwise re-raise
                    # inside the outer ``try`` so the generic handler below
                    # logs it and resets the connection; raising from a
                    # sibling ``except`` clause would abort connect().
                    if not self._quitting:
                        raise Exception(
                            "Unexpected socket error. Resetting connection."
                        ) from exc
            except (KeyboardInterrupt, SystemExit):
                pass
            except Exception:
                self._log_exception()
                if self.reconnect() is True:
                    continue
            finally:
                self._close()
            break

    def _connect(self) -> None:
        """
        Sets socket connection and negotiates capabilities and registration
        """
        self.socket = socket.socket()
        self.socket.connect((self.config['host'], self.config['port']))
        # The timeout is set after connecting; it lets _listen() wake up
        # regularly to run queued calls.
        self.socket.settimeout(1.0)
        if self.config['verbose']:
            print("({0}: {1}) Connection successful".format(
                self.config['name'],
                self.config['host']
            ))

        # Setup Connection Initialization
        def _CAP_REQ(info):
            """Request capabilities offered by the server"""
            offered = info['args'][-1].split(' ')
            wanted = [cap for cap in ('multi-prefix',) if cap in offered]
            if wanted:
                # One space-joined argument, so _cmd() sends several
                # capabilities as a single trailing parameter.
                self._cmd('CAP REQ', ' '.join(wanted))
            else:
                # Nothing to request: no ACK will follow, so end the
                # negotiation here.
                self._cmd('CAP END')

        def _CAP_END(info):
            """End capability negotiations"""
            self._cmd('CAP END')

        def _CONNECTED(info):
            """Do connection post-negotiation commands"""
            if self.config['channels']:
                self.join(*self.config['channels'])

        self.trigger({'verb': 'CAP', 'args': ['LS']}, _CAP_REQ, True)
        self.trigger({'verb': 'CAP', 'args': ['ACK']}, _CAP_END, True)
        self.on_code(376, _CONNECTED, True)  # MOTD end, trigger initial actions

        # Initiate capability negotiation
        self._cmd('CAP LS 302')
        # Initiate user registration
        if self.config['passphrase']:
            self._cmd('PASS', self.config['passphrase'])
        self._cmd('USER', self.config['ident'],
                  '0', '*', self.config['realname'])
        self._cmd('NICK', self.config['nick'])

    def _close(self, runhooks: bool=True) -> None:
        """
        Shut down the thread hooks and the socket; run the 'close' hooks
        unless ``runhooks`` is False.
        """
        for thread in self._hooks.get('thread', []):
            thread.shutdown()
        if self.socket:
            try:
                self.socket.shutdown(socket.SHUT_RDWR)
            except OSError:
                # The peer already dropped the connection; close() below
                # still releases the descriptor.
                pass
            self.socket.close()
            self.socket = None
        if runhooks:
            self._run_hooks('close')

    @hooks.queue()
    def close(self) -> NoReturn:
        """
        Stop the bot by raising SystemExit, which connect() treats as a
        request to end its loop.
        """
        if self.config['verbose']:
            print("Closing connection and thread for {0}:{1}".format(
                self.config['name'],
                self.config['host']
            ))
        raise SystemExit()
class Bot(Base):
"""
This class is a high-level wrapper for the base bot to allow for dynamic reloading of hooks.
Config Vars
host (string) : address to connect to
port (integer) : port to connect with
name (string) : bot's original name
ident (string) : the 'user' part of 'nick!user@host * name'
nick (string) : the 'nick' part of 'nick!user@host * name'; bot's temporary name
realname (string) : the 'name' part of 'nick!user@host * name'
channels (list) : a list of channels to autojoin on successful connect
command (string) : a (sequence of) character(s) the bot will respond to for a command
passphrase (string) : passed to nickserv for authentication
break_on_match (bool) : determines whether multiple matches are allowed per recieved line
verbose (bool) : determines whether debug info is printed to the console
reconnect (bool) : determines whether to automatically reconnect if an error/exception occurs
replace (dict) : dictionary for custom regex variable replacement; form of ':key:';
if key does not exist in the dict, :key: is removed from the regex
hookscripts (list) : a list of module names that contain custom hooks
reload_regex (string) : custom regex to be used in the default module reload implementation
reload_func (callable) : custom func to be used in the default module reload implementation
reload_override (bool) : determines whether the default module implementation is used
ref (callable) : optional config to allow reference to a parent BotGroup class to grant
interaction with other connections
"""
def __init__(self, host: str, **kwargs) -> None:
    """
    Create the bot, install the default reload listener and load the hooks.

    ``host`` and the keyword arguments are passed on to ``Base.__init__``
    after being merged over the defaults 'hookscripts' ([]),
    'reload_override' (False) and 'ref' (None).

    Unless 'reload_override' is set, a PRIVMSG listener is registered whose
    second argument must match config 'reload_regex' and which calls config
    'reload_func'.  Both keys are filled with defaults when not supplied.
    """
    default = {
        'hookscripts': [],
        'reload_override': False,
        'ref': None
    }
    super(Bot, self).__init__(host, **{**default, **kwargs})
    if not self.config['reload_override']:
        # Escape the command prefix: prefixes such as '.' or '$' are regex
        # metacharacters and would otherwise change what the pattern matches.
        self.config.setdefault('reload_regex', re.compile(
            f":{re.escape(self.config['command'])}reload$"))
        # Listeners are called with the parsed line, but load_hooks() takes
        # no argument, so the default is wrapped to drop it.
        self.config.setdefault(
            'reload_func', lambda info=None: self.load_hooks())
        self.trigger(
            {'verb': 'PRIVMSG', 'args': [None, self.config['reload_regex']]},
            self.config['reload_func']
        )
    self.load_hooks()
def load_hooks(self) -> None:
"""
TODO: Documentation
"""
if callable(self.config['hookscripts']):
try:
scripts = iter(self.config['hookscripts'])
except TypeError:
scripts = self.config['hookscripts']()
elif isinstance(self.config['hookscripts'], str):
scripts = [self.config['hookscripts']]
else:
scripts = list(self.config['hookscripts'])
old_funcs = [(k, v) for k, v in self.__dict__.items()
if hasattr(v, '_type')]