-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrt.py
842 lines (727 loc) · 33.8 KB
/
rt.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
""" Python library :term:`API` to Request Tracker's :term:`REST` interface.
Implements functions needed by `Malicious Domain Manager
<https://git.nic.cz/redmine/projects/mdm>`_, but is not directly connected with
it, so this library can also be used separately.
Description of Request Tracker REST API: http://requesttracker.wikia.com/wiki/REST
Provided functionality: login to RT, logout, getting, creating and editing
tickets, getting attachments, getting history of ticket, replying to ticket
requestors, adding comments, getting and editing ticket links, searching,
providing lists of last updated tickets and tickets with new correspondence
and merging tickets.
"""
__license__ = """ Copyright (C) 2012 CZ.NIC, z.s.p.o.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
__docformat__ = "reStructuredText en"
__authors__ = [
'"Jiri Machalek" <jiri.machalek@nic.cz>'
]
import re
import os
import requests
DEFAULT_QUEUE = 'General'
""" Default queue used by |mdm|. """


class Rt:
    """ :term:`API` for Request Tracker according to
    http://requesttracker.wikia.com/wiki/REST. Interface is based on REST
    architecture, which is based on HTTP/1.1 protocol. This library is
    therefore mainly sending and parsing special HTTP messages.

    .. note:: Use only ASCII LF as newline (``\\n``). Time is returned in UTC.
              Text replies are handled as UTF-8 and the same is expected as
              input for string values.
    """

    # Patterns are compiled once for the whole class. They keep the attribute
    # names that used to be created lazily on the instance, so code reading
    # e.g. ``tracker.requestors_pattern`` keeps working.
    requestors_pattern = re.compile('Requestors:')
    content_pattern = re.compile('Content:')
    attachments_pattern = re.compile('Attachments:')
    headers_pattern = re.compile('Headers:')
    update_pattern = re.compile('^# Ticket [0-9]+ updated.$')
    links_updated_pattern = re.compile('^# Links for ticket [0-9]+ updated.$')
    merge_successful_pattern = re.compile('^Merge Successful$')

    def __init__(self, url, default_login=None, default_password=None):
        """ API initialization.

        :keyword url: Base URL for Request Tracker API.
                      E.g.: http://tracker.example.com/REST/1.0/
        :keyword default_login: Default RT login used by self.login if no
                                other credentials are provided
        :keyword default_password: Default RT password
        """
        self.url = url
        self.default_login = default_login
        self.default_password = default_password
        self.session = requests.session()
        # None until login() is called; afterwards True/False.
        self.login_result = None

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def __request(self, selector, post_data=None, files=None,
                  without_login=False, raw=False):
        """ General request for :term:`API`.

        :keyword selector: End part of URL which completes self.url parameter
                           set during class initialization.
                           E.g.: ``ticket/123456/show``
        :keyword post_data: Dictionary with POST method fields
        :keyword files: List of pairs (filename, file-like object) describing
                        files to attach as multipart/form-data
                        (list is necessary to keep files ordered)
        :keyword without_login: Turns off checking last login result
                                (usually needed just for login itself)
        :keyword raw: If ``True`` the response body is returned exactly as
                      received (needed for binary attachment content),
                      otherwise a byte body is decoded as UTF-8 text.
        :returns: Requested message including state line in form
                  ``RT/3.8.7 200 Ok\\n``
        :rtype: string
        :raises Exception: In case that request is called without previous
                           login or any other connection error.
        """
        if not (self.login_result or without_login):
            raise Exception('Log in required')
        post_data = {} if post_data is None else post_data
        # Join with '/' explicitly: os.path.join would use a backslash on
        # Windows, which is wrong for URLs.
        base = self.url
        if base and not base.endswith('/'):
            base += '/'
        url = str(base + selector)
        try:
            if files:
                files_data = dict(('attachment_%d' % (index + 1), file_pair)
                                  for index, file_pair in enumerate(files))
                response = self.session.post(url, data=post_data,
                                             files=files_data)
            elif post_data:
                response = self.session.post(url, data=post_data)
            else:
                response = self.session.get(url)
        except requests.exceptions.ConnectionError as e:
            # Callers expect a plain Exception. The wrapped error does not
            # necessarily carry a ``message`` attribute, so fall back to str().
            cause = e.args[0] if e.args else e
            raise Exception(getattr(cause, 'message', None) or str(cause))
        content = response.content
        if raw or isinstance(content, str):
            return content
        # Byte body (Python 3): decode once here so that all parsing below
        # works on text. Undecodable bytes are replaced instead of raising.
        return content.decode('utf-8', 'replace')

    def __get_status_code(self, msg):
        """ Select status code given message.

        The code is the second space-separated token of the first line
        (e.g. ``200`` in ``RT/3.8.7 200 Ok``).

        :returns: Status code
        :rtype: int
        :raises IndexError, ValueError: If the first line does not have
                                        that shape.
        """
        state_line = msg.split('\n')[0]
        return int(state_line.split(' ')[1])

    @staticmethod
    def _parse_pairs(lines):
        """ Collect ``key: value`` lines into a dictionary.

        Lines without ``': '`` (or starting with it) are ignored; keys and
        values are stripped of surrounding whitespace.
        """
        pairs = {}
        for line in lines:
            colon = line.find(': ')
            if colon > 0:
                pairs[line[:colon].strip()] = line[colon + 1:].strip()
        return pairs

    @staticmethod
    def _first_match(pattern, lines):
        """ Index of the first line matched by *pattern*, or ``None``. """
        for index, line in enumerate(lines):
            if pattern.match(line) is not None:
                return index
        return None

    @staticmethod
    def _state_line(msg):
        """ Third line of a reply (where RT reports the outcome of an edit),
        or an empty string when the reply is shorter than that. """
        lines = msg.split('\n')
        return lines[2] if len(lines) > 2 else ''

    @staticmethod
    def _format_fields(fields):
        """ Render keyword arguments as ``Name: value`` lines; keys named
        ``CF_<name>`` are rendered as custom fields ``CF.{<name>}``. """
        rendered = []
        for key in fields:
            if key[:3] != 'CF_':
                rendered.append("%s: %s\n" % (key, fields[key]))
            else:
                rendered.append("CF.{%s}: %s\n" % (key[3:], fields[key]))
        return ''.join(rendered)

    def _parse_ticket(self, lines):
        """ Parse lines of one ticket into a dictionary.

        ``Requestors`` may span several lines (continuation lines are
        indented by 12 spaces) and is returned as a list.

        :raises Exception: If there is no line starting with ``Requestors:``.
        """
        req_id = self._first_match(self.requestors_pattern, lines)
        if req_id is None:
            raise Exception('Non standard ticket.')
        pairs = self._parse_pairs(lines[:req_id])
        # 12 == len('Requestors: '), which is also the continuation indent.
        requestors = [lines[req_id][12:]]
        req_id += 1
        while (req_id < len(lines)) and (lines[req_id][:12] == ' ' * 12):
            requestors.append(lines[req_id][12:])
            req_id += 1
        pairs['Requestors'] = requestors
        pairs.update(self._parse_pairs(lines[req_id:]))
        return pairs

    def _parse_flat_list(self, msgs):
        """ Split a long-format listing on ``--`` separator lines and parse
        each entry as flat ``key: value`` pairs, skipping empty entries. """
        items = []
        for chunk in msgs.split('\n--\n'):
            pairs = self._parse_pairs(chunk.split('\n'))
            if pairs:
                items.append(pairs)
        return items

    def _get_pairs(self, selector):
        """ Fetch *selector* and parse everything after the two leading
        lines of the reply as ``key: value`` pairs.

        :raises Exception: In case that returned status code is not 200
        """
        msg = self.__request(selector)
        if self.__get_status_code(msg) != 200:
            raise Exception('Connection error')
        return self._parse_pairs(msg.split('\n')[2:])

    # ------------------------------------------------------------------
    # Session
    # ------------------------------------------------------------------

    def login(self, login=None, password=None):
        """ Login with default or supplied credentials.

        :keyword login: Username used for RT, if not supplied together with
                        *password* :py:attr:`~Rt.default_login` and
                        :py:attr:`~Rt.default_password` are used instead
        :keyword password: Similarly as *login*
        :returns: ``True``
                      Successful login
                  ``False``
                      Otherwise
        :raises Exception: In case that credentials are not supplied neither
                           during initialization or call of this method.
        """
        if (login is not None) and (password is not None):
            login_data = {'user': login, 'pass': password}
        elif (self.default_login is not None) and (self.default_password is not None):
            login_data = {'user': self.default_login,
                          'pass': self.default_password}
        else:
            raise Exception('Credentials required')
        reply = self.__request('', post_data=login_data, without_login=True)
        self.login_result = self.__get_status_code(reply) == 200
        return self.login_result

    def logout(self):
        """ Logout of user.

        :returns: ``True``
                      Successful logout
                  ``False``
                      Logout failed (mainly because user was not logged in)
        """
        if not self.login_result:
            return False
        ret = self.__get_status_code(self.__request('logout')) == 200
        self.login_result = None
        return ret

    # ------------------------------------------------------------------
    # Searching
    # ------------------------------------------------------------------

    def new_correspondence(self, queue=DEFAULT_QUEUE):
        """ Obtains tickets changed by other users than the system one.

        :keyword queue: Queue where to search
        :returns: List of tickets which were last updated by other user than
                  the system one ordered in decreasing order by LastUpdated.
                  Each ticket is a dictionary of field name to string value
                  (unlike :py:meth:`~Rt.get_ticket`, ``Requestors`` is not
                  expanded into a list here). Empty list if nothing matched.
        """
        msgs = self.__request(
            "search/ticket?query=Queue='%s'+AND+(LastUpdatedBy!='%s')"
            "&orderby=-LastUpdated&format=l" % (queue, self.default_login))
        return self._parse_flat_list(msgs)

    def last_updated(self, since, queue=DEFAULT_QUEUE):
        """ Obtains tickets changed after given date.

        :param since: Date as string in form '2011-02-24'
        :keyword queue: Queue where to search
        :returns: List of tickets with LastUpdated parameter later than
                  *since* ordered in decreasing order by LastUpdated.
                  Each ticket is a dictionary of field name to string value
                  (unlike :py:meth:`~Rt.get_ticket`, ``Requestors`` is not
                  expanded into a list here). Empty list if nothing matched.
        """
        msgs = self.__request(
            "search/ticket?query=(Queue='%s')+AND+(LastUpdatedBy!='%s')"
            "+AND+(LastUpdated>'%s')&orderby=-LastUpdated&format=l"
            % (queue, self.default_login, since))
        return self._parse_flat_list(msgs)

    def search(self, Queue=DEFAULT_QUEUE, **kwargs):
        """ Search arbitrary needles in given fields and queue.

        Example::

            >>> tracker = Rt('http://tracker.example.com/REST/1.0/', 'rt-username', 'top-secret')
            >>> tracker.login()
            >>> tickets = tracker.search(CF_Domain='example.com')

        :keyword Queue: Queue where to search
        :keyword kwargs: Other arguments possible to set:

                         Requestors, Subject, Cc, AdminCc, Owner, Status,
                         Priority, InitialPriority, FinalPriority,
                         TimeEstimated, Starts, Due, Text,... (according to RT
                         fields)

                         Setting value for this arguments constrain search
                         results for only tickets exactly matching all
                         arguments.

                         Custom fields CF.{<CustomFieldName>} could be set
                         with keywords CF_CustomFieldName.

                         Values are inserted into the query as they are (no
                         quoting or URL escaping is applied).
        :returns: List of matching tickets. Each ticket is the same dictionary
                  as in :py:meth:`~Rt.get_ticket`. An empty list is returned
                  when any entry of the reply cannot be parsed as a ticket
                  (which includes a reply without results).
        """
        query = "search/ticket?query=(Queue='%s')" % (Queue,)
        for key in kwargs:
            if key[:3] != 'CF_':
                query += "+AND+(%s='%s')" % (key, kwargs[key])
            else:
                query += "+AND+(CF.{%s}='%s')" % (key[3:], kwargs[key])
        query += "&format=l"
        msgs = self.__request(query).split('\n--\n')
        try:
            return [self._parse_ticket(msg.split('\n')) for msg in msgs]
        except Exception:
            # Best effort: an unparsable reply yields no results.
            return []

    # ------------------------------------------------------------------
    # Tickets
    # ------------------------------------------------------------------

    def get_ticket(self, ticket_id):
        """ Fetch ticket by its ID.

        :param ticket_id: ID of demanded ticket
        :returns: Dictionary with key, value pairs for ticket with
                  *ticket_id*. List of keys:

                      * id
                      * Queue
                      * Owner
                      * Creator
                      * Subject
                      * Status
                      * Priority
                      * InitialPriority
                      * FinalPriority
                      * Requestors
                      * Cc
                      * AdminCc
                      * Created
                      * Starts
                      * Started
                      * Due
                      * Resolved
                      * Told
                      * TimeEstimated
                      * TimeWorked
                      * TimeLeft

                  Values are strings, only ``Requestors`` is a list of
                  strings. Only fields present in the reply with a non-empty
                  value are included.
        :raises Exception: Status code other than 200 or unexpected format
                           of returned message.
        """
        msg = self.__request('ticket/%s/show' % (str(ticket_id),))
        if self.__get_status_code(msg) != 200:
            raise Exception('Connection error')
        return self._parse_ticket(msg.split('\n'))

    def create_ticket(self, Queue=DEFAULT_QUEUE, **kwargs):
        """ Create new ticket and set given parameters.

        Example of message sent to ``http://tracker.example.com/REST/1.0/ticket/new``::

            content=id: ticket/new
            Queue: General
            Owner: Nobody
            Requestors: somebody@example.com
            Subject: Ticket created through REST API
            Text: Lorem Ipsum

        In case of success returned message has this form::

            RT/3.8.7 200 Ok

            # Ticket 123456 created.
            # Ticket 123456 updated.

        Otherwise::

            RT/3.8.7 200 Ok

            # Required: id, Queue

        + list of some key, value pairs, probably default values.

        :keyword Queue: Queue where to create ticket
        :keyword kwargs: Other arguments possible to set:

                         Requestors, Subject, Cc, AdminCc, Owner, Status,
                         Priority, InitialPriority, FinalPriority,
                         TimeEstimated, Starts, Due, Text,... (according to RT
                         fields)

                         Custom fields CF.{<CustomFieldName>} could be set
                         with keywords CF_CustomFieldName.
        :returns: ID of new ticket or ``-1``, if creating failed (also when
                  the reply is too short to contain the result line)
        """
        post_data = 'id: ticket/new\nQueue: %s\n' % (Queue,)
        post_data += self._format_fields(kwargs)
        msg = self.__request('ticket/new', {'content': post_data})
        res = re.search(' [0-9]+ ', self._state_line(msg))
        if res is None:
            return -1
        return int(res.group())

    def edit_ticket(self, ticket_id, **kwargs):
        """ Edit ticket values.

        :param ticket_id: ID of ticket to edit
        :keyword kwargs: Other arguments possible to set:

                         Requestors, Subject, Cc, AdminCc, Owner, Status,
                         Priority, InitialPriority, FinalPriority,
                         TimeEstimated, Starts, Due, Text,... (according to RT
                         fields)

                         Custom fields CF.{<CustomFieldName>} could be set
                         with keywords CF_CustomFieldName.
        :returns: ``True``
                      Operation was successful
                  ``False``
                      Ticket with given ID does not exist or unknown parameter
                      was set (in this case all other valid fields are changed)
        """
        post_data = self._format_fields(kwargs)
        msg = self.__request('ticket/%s/edit' % (str(ticket_id)),
                             {'content': post_data})
        return self.update_pattern.match(self._state_line(msg)) is not None

    def get_history(self, ticket_id, transaction_id=None):
        """ Get set of history items.

        :param ticket_id: ID of ticket
        :keyword transaction_id: If set to None, all history items are
                                 returned, if set to ID of valid transaction
                                 just one history item is returned
        :returns: List of history items ordered increasingly by time of event.
                  Each history item is dictionary with following keys:

                  Description, Creator, Data, Created, TimeTaken, NewValue,
                  Content, Field, OldValue, Ticket, Type, id, Attachments

                  All these fields are strings, just 'Attachments' holds list
                  of pairs (attachment_id,filename_with_size).

                  An empty list is returned when any entry of the reply does
                  not have the expected format.
        """
        if transaction_id is None:
            # We are using "long" format to get all history items at once.
            # Each history item is then separated by double dash.
            msgs = self.__request('ticket/%s/history?format=l' % (str(ticket_id),))
        else:
            msgs = self.__request('ticket/%s/history/id/%s' % (str(ticket_id), str(transaction_id)))
        items = []
        try:
            for chunk in msgs.split('\n--\n'):
                lines = chunk.split('\n')
                cont_id = self._first_match(self.content_pattern, lines)
                if cont_id is None:
                    raise Exception('Unexpected history entry. '
                                    'Missing line starting with `Content:`.')
                atta_id = self._first_match(self.attachments_pattern, lines)
                if atta_id is None:
                    raise Exception('Unexpected attachment part of history entry. '
                                    'Missing line starting with `Attachements:`.')
                pairs = self._parse_pairs(lines[:cont_id])
                # 9 == len('Content: '), which is also the continuation indent.
                content = lines[cont_id][9:]
                cont_id += 1
                while (cont_id < len(lines)) and (lines[cont_id][:9] == ' ' * 9):
                    content += '\n' + lines[cont_id][9:]
                    cont_id += 1
                pairs['Content'] = content
                pairs.update(self._parse_pairs(lines[cont_id:atta_id]))
                attachments = []
                for line in lines[atta_id + 1:]:
                    colon = line.find(': ')
                    if colon > 0:
                        attachments.append((int(line[:colon].strip()),
                                            line[colon + 1:].strip()))
                pairs['Attachments'] = attachments
                items.append(pairs)
            return items
        except Exception:
            # Best effort: an unparsable reply yields no history.
            return []

    def reply(self, ticket_id, text='', cc='', bcc='', files=None):
        """ Sends email message to the contacts in ``Requestors`` field of
        given ticket with subject as is set in ``Subject`` field.

        Form of message according to documentation::

            id: <ticket-id>
            Action: correspond
            Text: the text comment
                  second line starts with the same indentation as first
            Cc: <...>
            Bcc: <...>
            TimeWorked: <...>
            Attachment: an attachment filename/path

        :param ticket_id: ID of ticket to which message belongs
        :keyword text: Content of email message
        :keyword cc: Carbon copy just for this reply
        :keyword bcc: Blind carbon copy just for this reply
        :keyword files: List of pairs (filename, file-like object) describing
                        files to attach as multipart/form-data
        :returns: ``True``
                      Operation was successful
                  ``False``
                      Sending failed (status code != 200)
        """
        files = [] if files is None else files
        # Continuation lines of Text must start with whitespace.
        content = 'id: %s\nAction: correspond\nText: %s\nCc: %s\nBcc: %s' % (
            str(ticket_id), re.sub(r'\n', r'\n ', text), cc, bcc)
        for file_pair in files:
            content += "\nAttachment: %s" % (file_pair[0],)
        msg = self.__request('ticket/%s/comment' % (str(ticket_id),),
                             {'content': content}, files)
        return self.__get_status_code(msg) == 200

    def comment(self, ticket_id, text='', cc='', bcc='', files=None):
        """ Adds comment to the given ticket.

        Form of message according to documentation::

            id: <ticket-id>
            Action: comment
            Text: the text comment
                  second line starts with the same indentation as first
            Attachment: an attachment filename/path

        :param ticket_id: ID of ticket to which comment belongs
        :keyword text: Content of comment
        :keyword cc: Accepted for signature compatibility with
                     :py:meth:`~Rt.reply`; not included in the sent message
        :keyword bcc: Same as *cc*; not included in the sent message
        :keyword files: List of pairs (filename, file-like object) describing
                        files to attach as multipart/form-data
        :returns: ``True``
                      Operation was successful
                  ``False``
                      Sending failed (status code != 200)
        """
        files = [] if files is None else files
        # Continuation lines of Text must start with whitespace.
        content = 'id: %s\nAction: comment\nText: %s' % (
            str(ticket_id), re.sub(r'\n', r'\n ', text))
        for file_pair in files:
            content += "\nAttachment: %s" % (file_pair[0],)
        msg = self.__request('ticket/%s/comment' % (str(ticket_id),),
                             {'content': content}, files)
        return self.__get_status_code(msg) == 200

    # ------------------------------------------------------------------
    # Attachments
    # ------------------------------------------------------------------

    def get_attachments_ids(self, ticket_id):
        """ Get IDs of attachments for given ticket.

        :param ticket_id: ID of ticket
        :returns: List of IDs (type int) of attachments belonging to given
                  ticket. Empty list for an empty reply or a status code
                  other than 200.
        """
        at = self.__request('ticket/%s/attachments' % (str(ticket_id),))
        if (len(at) == 0) or (self.__get_status_code(at) != 200):
            return []
        ids = []
        # The listing starts on the fifth line of the reply.
        for line in at.split('\n')[4:]:
            found = re.search(r'[^0-9]*([0-9]+):.*', line)
            # Lines without "<number>:" carry no attachment ID; skip them
            # instead of failing on int().
            if found is not None:
                ids.append(int(found.group(1)))
        return ids

    def get_attachment(self, ticket_id, attachment_id):
        """ Get attachment.

        :param ticket_id: ID of ticket
        :param attachment_id: ID of attachment for obtain
        :returns: Attachment as dictionary with these keys:

                      * Transaction
                      * ContentType
                      * Parent
                      * Creator
                      * Created
                      * Filename
                      * Content
                      * Headers
                      * MessageId
                      * ContentEncoding
                      * id
                      * Subject

                  All these fields are strings, just 'Headers' holds another
                  dictionary with attachment headers as strings e.g.:

                      * Delivered-To
                      * From
                      * Return-Path
                      * Content-Length
                      * To
                      * X-Seznam-User
                      * X-QM-Mark
                      * Domainkey-Signature
                      * RT-Message-ID
                      * X-RT-Incoming-Encryption
                      * X-Original-To
                      * Message-ID
                      * X-Spam-Status
                      * In-Reply-To
                      * Date
                      * Received
                      * X-Country
                      * X-Spam-Checker-Version
                      * X-Abuse
                      * MIME-Version
                      * Content-Type
                      * Subject

                  .. warning:: Content-Length parameter is set after opening
                               ticket in web interface!

                  Set of headers available depends on mailservers sending
                  emails not on Request Tracker!
        :raises Exception: Unexpected format of returned message.
        """
        msg = self.__request('ticket/%s/attachments/%s' % (str(ticket_id), str(attachment_id)))
        lines = msg.split('\n')[2:]
        head_id = self._first_match(self.headers_pattern, lines)
        if head_id is None:
            raise Exception('Unexpected headers part of attachment entry. '
                            'Missing line starting with `Headers:`.')
        # Drop the "Headers: " prefix so the first header parses like the rest.
        lines[head_id] = re.sub(r'^Headers: (.*)$', r'\1', lines[head_id])
        cont_id = self._first_match(self.content_pattern, lines)
        if cont_id is None:
            raise Exception('Unexpected content part of attachment entry. '
                            'Missing line starting with `Content:`.')
        pairs = self._parse_pairs(lines[:head_id])
        pairs['Headers'] = self._parse_pairs(lines[head_id:cont_id])
        # 9 == len('Content: '), which is also the continuation indent.
        content = lines[cont_id][9:]
        for line in lines[cont_id + 1:]:
            if line[:9] == (' ' * 9):
                content += '\n' + line[9:]
        pairs['Content'] = content
        return pairs

    def get_attachment_content(self, ticket_id, attachment_id):
        """ Get content of attachment without headers.

        This function is necessary to use for binary attachment,
        as it can contain ``\\n`` chars, which would disrupt parsing
        of message if :py:meth:`~Rt.get_attachment` is used.

        Format of message::

            RT/3.8.7 200 Ok\\n\\nStart of the content...End of the content\\n\\n\\n

        :param ticket_id: ID of ticket
        :param attachment_id: ID of attachment
        :returns: Content of attachment exactly as received (not decoded),
                  without the status line and the trailing newlines
        :raises Exception: Unexpected format of returned message.
        """
        msg = self.__request('ticket/%s/attachments/%s/content' % (str(ticket_id), str(attachment_id)),
                             raw=True)
        newline = b'\n' if isinstance(msg, bytes) else '\n'
        first_newline = msg.find(newline)
        if first_newline < 0:
            raise Exception('Unexpected format of returned message.')
        # Skip the two newlines after the status line and the three at the end.
        return msg[first_newline + 2:-3]

    # ------------------------------------------------------------------
    # Users, queues, links
    # ------------------------------------------------------------------

    def get_user(self, user_id):
        """ Get user details.

        :param user_id: Identification of user by username (str) or user ID
                        (int)
        :returns: User details as strings in dictionary with these keys for RT
                  users:

                      * Lang
                      * RealName
                      * Privileged
                      * Disabled
                      * Gecos
                      * EmailAddress
                      * Password
                      * id
                      * Name

                  Or these keys for external users (e.g. Requestors replying
                  to email from RT):

                      * RealName
                      * Disabled
                      * EmailAddress
                      * Password
                      * id
                      * Name
        :raises Exception: In case that returned status code is not 200
        """
        return self._get_pairs('user/%s' % (str(user_id),))

    def get_queue(self, queue_id):
        """ Get queue details.

        :param queue_id: Identification of queue by name (str) or queue ID
                         (int)
        :returns: Queue details as strings in dictionary with these keys
                  (if queue exists):

                      * id
                      * Name
                      * Description
                      * CorrespondAddress
                      * CommentAddress
                      * InitialPriority
                      * FinalPriority
                      * DefaultDueIn
        :raises Exception: In case that returned status code is not 200
        """
        return self._get_pairs('queue/%s' % str(queue_id))

    def get_links(self, ticket_id):
        """ Gets the ticket links for a single ticket.

        :param ticket_id: ticket ID
        :returns: Links as strings in dictionary with these keys
                  (just those which are defined):

                      * id
                      * Members
                      * MemberOf
                      * RefersTo
                      * ReferredToBy
                      * DependsOn
                      * DependedOnBy
        :raises Exception: In case that returned status code is not 200
        """
        return self._get_pairs('ticket/%s/links/show' % (str(ticket_id),))

    def edit_ticket_links(self, ticket_id, **kwargs):
        """ Edit ticket links.

        :param ticket_id: ID of ticket to edit
        :keyword kwargs: Other arguments possible to set: DependsOn,
                         DependedOnBy, RefersTo, ReferredToBy, Members,
                         MemberOf. Each value should be either ticket ID or
                         external link. Int types are converted. Use empty
                         string as value to delete existing link.
        :returns: ``True``
                      Operation was successful
                  ``False``
                      Ticket with given ID does not exist or unknown parameter
                      was set (in this case all other valid fields are changed)
        """
        post_data = ''.join("%s: %s\n" % (key, str(kwargs[key]))
                            for key in kwargs)
        msg = self.__request('ticket/%s/links' % (str(ticket_id),),
                             {'content': post_data})
        return self.links_updated_pattern.match(self._state_line(msg)) is not None

    def merge_ticket(self, ticket_id, into_id):
        """ Merge ticket into another (undocumented API feature). May not work
        in 4.x RT series.

        :param ticket_id: ID of ticket to be merged
        :param into_id: ID of destination ticket
        :returns: ``True``
                      Operation was successful
                  ``False``
                      Either origin or destination ticket does not
                      exist or user does not have ModifyTicket permission.
        """
        msg = self.__request('ticket/merge/%s' % (str(ticket_id),),
                             {'into': into_id})
        return self.merge_successful_pattern.match(self._state_line(msg)) is not None