-
Notifications
You must be signed in to change notification settings - Fork 21
/
Copy pathreprotool.py
122 lines (95 loc) · 4.34 KB
/
reprotool.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
#! /usr/bin/python
# pylint: disable=line-too-long,no-member
# Copyright 2015 F-Secure Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you
# may not use this file except in compliance with the License. You
# may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing
# permissions and limitations under the License.
# reprotool.py
#
# Plays back an MQTT session to reproduce issues.
from twisted.internet.protocol import Protocol, ClientFactory
import time
import binascii
import uuid
import calendar
class MQTTFuzzProtocol(Protocol):
'''Plays back a specified MQTT session.'''
# Raw MQTT messages to be played back, in base64.
# This just connects and disconnects. Replace
# this data with a list of data from the fuzzer logs.
session_data = ['EBYABE1RVFQEAAAAAApteWNsaWVudGlk',
'4AA=']
current_session = iter(session_data)
def dataReceived(self, data):
"""Callback: If we receive data from the remote peer, print it out
:param data: Data received from remote peer
"""
print("{}:{}:Server -> Fuzzer: {}".format(calendar.timegm(time.gmtime()), self.session_id, binascii.b2a_base64(data)))
def connectionMade(self):
"""Callback: We have connected to the MQTT server, so start banging away.
"""
print("{}:{}:Connected to server".format(calendar.timegm(time.gmtime()), self.session_id))
self.send_next_pdu()
def send_next_pdu(self):
"""Send the next message in list
"""
from twisted.internet import reactor
try:
# Send a PDU and schedule the next PDU
self.send_pdu(next(self.current_session))
reactor.callLater(0.05, self.send_next_pdu)
except StopIteration:
# We have sent all the PDUs of this session. Tear down
# connection. It will trigger a reconnection in the factory.
print("{}:{}:End of session, initiating disconnect.".format(calendar.timegm(time.gmtime()), self.session_id))
self.transport.loseConnection()
def send_pdu(self, pdu):
"""Actually send the message out
:param pdu: The message to be sent out
"""
# Send either a valid case or a fuzz case
# 1 in 10, send a fuzz case, otherwise a valid case
print("{}:{}:Fuzzer -> Server: {}".format(calendar.timegm(time.gmtime()), self.session_id, pdu))
self.transport.write(binascii.a2b_base64(pdu))
class MQTTClientFactory(ClientFactory):
protocol = MQTTFuzzProtocol # Factory creates Fuzzer clients
def buildProtocol(self, address):
# Create the fuzzer instance
protocol_instance = ClientFactory.buildProtocol(self, address)
# Tell the fuzzer instance which type of session it should run
protocol_instance.session_id = str(uuid.uuid4())
return protocol_instance
def clientConnectionFailed(self, connector, reason):
# The server under test has died
from twisted.internet import reactor
print("{}:Failed to connect to MQTT server: {}".format(calendar.timegm(time.gmtime()), reason))
reactor.stop()
def clientConnectionLost(self, connector, reason):
# The server under test closed connection or we decided to
# tear down the connection at the end of a session. We'll
# reconnect (which starts another session in the protocol instance)
from twisted.internet import reactor
print("{}:Connection to MQTT server lost: {}".format(calendar.timegm(time.gmtime()), reason))
reactor.stop()
def run_tests():
"""Main function, sends the predetermined list of messages
"""
from twisted.internet import reactor
factory = MQTTClientFactory()
hostname = 'localhost'
port = 1883
print("{}:Starting repro run to {}:{}".format(calendar.timegm(time.gmtime()), hostname, port))
reactor.connectTCP(hostname, port, factory)
reactor.run()
print("{}:Stopped repro run to {}:{}".format(calendar.timegm(time.gmtime()), hostname, port))
if __name__ == '__main__':
run_tests()