import struct, argparse
from time import sleep, time
from impacket.dcerpc.v5 import transport, epm
from impacket.dcerpc.v5.ndr import NDRUniConformantArray, NDRPOINTER, NDRSTRUCT, NDRCALL
from impacket.dcerpc.v5.dtypes import BOOL, ULONG, DWORD, PULONG, PWCHAR, PBYTE, WIDESTR, UCHAR, WORD, LPSTR, PUINT, \
WCHAR
from impacket.uuid import uuidtup_to_bin
from concurrent.futures import ThreadPoolExecutor
import time
# RPC interface id (and interface version 1.0) of the Remote Desktop Licensing
# service; used for the endpoint-mapper lookup and the DCE/RPC bind below.
UUID = uuidtup_to_bin(("3d267954-eeb7-11d1-b94e-00c04fa3080d", "1.0"))

# Named like retry / back-off settings, but not referenced in the visible code.
TRY_TIMES = 3
SLEEP_TIME = 210

DESCRIPTION = "Windows Remote Desktop Licensing Service Detect"

# Alias for the impacket UCHAR type, kept for anything that uses the name.
BBYTE = UCHAR

# Connection state left behind by the most recent connect_to_license_server()
# call. This is shared module state and is not safe to rely on from several
# scan threads at once.
dce = rpctransport = ctx_handle = None

# Bookkeeping values initialised here but not used anywhere in the visible code.
handle_lists = []
leak_idx = 0
heap_base = ntdll_base = peb_base = pe_base = rpcrt4_base = kernelbase_base = 0
def p8(x):
    """Pack *x* as a single unsigned byte (struct format ``B``).

    Raises ``struct.error`` when *x* is outside 0..255.
    """
    packer = struct.Struct("B")
    return packer.pack(x)
def p16(x):
    """Pack *x* as a native-order unsigned 16-bit integer (struct format ``H``).

    Raises ``struct.error`` when *x* is outside 0..65535.
    """
    packer = struct.Struct("H")
    return packer.pack(x)
def p32(x):
    """Pack *x* as a native-order unsigned 32-bit integer (struct format ``I``).

    Raises ``struct.error`` when *x* does not fit in 32 bits.
    """
    packer = struct.Struct("I")
    return packer.pack(x)
def p64(x):
    """Pack *x* as a native-order unsigned 64-bit integer (struct format ``Q``).

    Raises ``struct.error`` when *x* does not fit in 64 bits.
    """
    packer = struct.Struct("Q")
    return packer.pack(x)
class CONTEXT_HANDLE(NDRSTRUCT):
    """Opaque 20-byte RPC context handle used by the licensing interface.

    Handed out by TLSRpcConnectResponse and passed back in TLSRpcGetVersion.
    """
    structure = (
        # 20 raw bytes; the "=b" suffix looks like impacket's default-value
        # notation for the field — TODO confirm against impacket's Structure.
        ("Data", "20s=b"),
    )

    def getAlignment(self):
        # Force 4-byte alignment when this structure is marshalled.
        return 4
class TLSRpcGetVersion(NDRCALL):
    """Request for opnum 0 of the licensing interface: query the server version.

    Inputs: the context handle obtained from TLSRpcConnect and a PULONG
    ``version`` out-parameter (the caller here pre-fills it with 3).
    """
    opnum = 0
    structure = (
        ("ctx_handle", CONTEXT_HANDLE),
        ("version", PULONG),
    )
class TLSRpcGetVersionResponse(NDRCALL):
    """Response to TLSRpcGetVersion: a single ULONG ``version`` value."""
    structure = (
        ("version", ULONG),
    )
class TLSRpcConnect(NDRCALL):
    """Request for opnum 1 of the licensing interface; takes no input fields."""
    opnum = 1
class TLSRpcConnectResponse(NDRCALL):
    """Response to TLSRpcConnect: the context handle for subsequent calls."""
    structure = (
        ("ctx_handle", CONTEXT_HANDLE),
    )
class TLSBLOB(NDRSTRUCT):
    """Length-prefixed byte buffer (``cbData`` bytes at ``pbData``).

    Declared but not used by any call in the visible code.
    """
    structure = (
        ("cbData", ULONG),
        ("pbData", PBYTE),
    )
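def connect_to_license_server(target_ip):
    """Bind to the licensing RPC interface on *target_ip* and return its version.

    Resolves the interface endpoint via the endpoint mapper (TCP/135), binds
    to it over ncacn_ip_tcp, obtains a context handle with TLSRpcConnect
    (opnum 1) and queries TLSRpcGetVersion (opnum 0).

    Args:
        target_ip: Host name or IP address to probe.

    Returns:
        The reported version as a hex string such as ``"0x..."``.

    Raises:
        Whatever impacket raises when the endpoint mapper is unreachable, the
        interface is not registered, or the bind/request fails; ``check``
        treats any exception as "service not detected".

    Side effects:
        For backward compatibility the module globals ``dce``, ``rpctransport``
        and ``ctx_handle`` are set to the objects of the most recent successful
        call (the connection is already closed by then). All protocol work uses
        locals, so concurrent calls from the ThreadPoolExecutor in ``__main__``
        no longer overwrite each other's connection mid-request, which the
        previous global-based version allowed.
    """
    global dce, rpctransport, ctx_handle
    stringbinding = epm.hept_map(target_ip, UUID, protocol="ncacn_ip_tcp")
    local_transport = transport.DCERPCTransportFactory(stringbinding)
    local_transport.set_connect_timeout(10)
    local_dce = local_transport.get_dce_rpc()
    # 2 == RPC_C_AUTHN_LEVEL_CONNECT: authenticate only when the connection is set up.
    local_dce.set_auth_level(2)
    local_dce.connect()
    try:
        local_dce.bind(UUID)
        handle = local_dce.request(TLSRpcConnect())["ctx_handle"]
        get_version = TLSRpcGetVersion()
        get_version["ctx_handle"] = handle
        # Value the original probe sent for the out-parameter; its meaning is
        # not documented here — TODO confirm.
        get_version["version"] = 3
        version = local_dce.request(get_version)["version"]
    finally:
        # Best-effort close so a threaded scan does not leak one socket per host.
        try:
            local_dce.disconnect()
        except Exception:
            pass
    rpctransport, dce, ctx_handle = local_transport, local_dce, handle
    return f"0x{version:x}"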
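def check(target_ip):
    """Probe one host and print whether the licensing RPC interface answered.

    Args:
        target_ip: Host to probe; passed straight to connect_to_license_server.

    Returns:
        None. The outcome is reported on stdout only: a ``[+]`` line with the
        version when the interface answered, a ``[-]`` line when the endpoint
        mapper (TCP/135) or the interface was unreachable.
    """
    try:
        version = connect_to_license_server(target_ip)
    except Exception:
        # Narrowed from a bare ``except`` so KeyboardInterrupt / SystemExit
        # are not swallowed and reported as "service not running".
        print(f"[-] {target_ip} seems not running RDL service or 135 port not available.")
        return
    if version != "":
        print(f"[+] {target_ip} seems running RDL service,Server version is {version}.")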
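if __name__ == '__main__':
    parse = argparse.ArgumentParser(description=DESCRIPTION)
    parse.add_argument("-ip", type=str, required=False, help="Target IP, eg: 192.168.120.1")
    parse.add_argument("-f", type=str, required=False, help="Target File, eg: ip.txt")
    parse.add_argument("-t", type=int, required=False, help="scan thread, default 1")
    args = parse.parse_args()
    target_host = args.ip
    target_file = args.f
    thread_num = args.t
    start_time = time.time()
    # An omitted, zero or negative -t all mean the documented default of 1.
    # Previously only 0 was mapped to 1: None fell through to the executor's
    # CPU-based default and a negative value raised ValueError.
    if thread_num is None or thread_num < 1:
        thread_num = 1
    if target_file is not None:
        with ThreadPoolExecutor(thread_num) as executor, open(target_file, "r") as f:
            for line in f:
                ip = line.strip()
                # Skip blank lines and '#' comments instead of probing "".
                if not ip or ip.startswith("#"):
                    continue
                executor.submit(check, ip)
        # The executor's context manager waits for all submitted probes.
        print(f"[*] Scan finished in {time.time() - start_time:.1f}s.")
    elif target_host is not None:
        check(target_host)
    else:
        parse.print_help()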