-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathip17mon.py
119 lines (95 loc) · 3.1 KB
/
ip17mon.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# -*- coding: utf-8 -*-
import os
import socket
import struct
import sys
#reload(sys)
#sys.setdefaultencoding('utf-8')
try:
import mmap
except ImportError:
mmap = None
__all__ = ['IPv4Database', 'find']
def _unpack_V(b):
    """Decode the 4-byte string *b* as a little-endian unsigned 32-bit int."""
    return struct.unpack("<L", b)[0]


def _unpack_N(b):
    """Decode the 4-byte string *b* as a big-endian unsigned 32-bit int."""
    return struct.unpack(">L", b)[0]
def _unpack_C(b):
    """Return the value of a single unsigned byte.

    Indexing a byte buffer yields an ``int`` on some Python versions and a
    one-character string on others; an ``int`` is passed through unchanged,
    anything else is decoded with ``struct``.
    """
    return b if isinstance(b, int) else struct.unpack("B", b)[0]
# Default database path: a file named "mydata4vipday2.datx" located in the
# same directory as this module.
datfile = os.path.join(
    os.path.dirname(__file__),
    "mydata4vipday2.datx",
)
class IPv4Database(object):
    """Read-only lookup over an IPv4 location database file.

    The whole file is handled as one byte buffer, either memory-mapped or
    read fully into memory.  As far as this reader is concerned the layout
    is:

    * bytes ``0-3``: a big-endian unsigned 32-bit value kept as
      ``_offset``; it bounds the record scan and is the base used to
      locate the text of a record.
    * a table of little-endian unsigned 32-bit values, one per pair of
      leading address octets ``a.b``, read at byte ``(a * 256 + b) * 4``.
    * 9-byte records beginning at byte 262148: a 4-byte packed address,
      a 3-byte little-endian text offset, one byte this reader does not
      use, and a 1-byte text length.
    * UTF-8 text located at ``_offset + text_offset - 262144``.

    Instances can be used as context managers; leaving the ``with`` block
    calls :meth:`close`.
    """

    # 256 * 256 table slots of 4 bytes each.
    _PREFIX_TABLE_BYTES = 256 * 256 * 4  # 262144
    # Byte position of the first record: the table size plus 4 (presumably
    # the 4-byte header read in __init__).
    _RECORDS_START = _PREFIX_TABLE_BYTES + 4  # 262148
    # Distance in bytes between consecutive records.
    _RECORD_SIZE = 9

    def __init__(self, filename=None, use_mmap=True):
        """Open the database.

        :param filename: path of the database file; ``None`` selects the
            module-level ``datfile``.
        :param use_mmap: memory-map the file when the ``mmap`` module is
            available; otherwise the file is read fully into memory.
        """
        if filename is None:
            filename = datfile
        with open(filename, 'rb') as f:
            if use_mmap and mmap is not None:
                buf = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
            else:
                buf = f.read()
                # Remember that there is no mapping to release in close().
                use_mmap = False
        self._use_mmap = use_mmap
        self._buf = buf
        self._offset = _unpack_N(buf[:4])
        self._is_closed = False

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        self.close()

    def close(self):
        """Release the memory map (if any) and mark the database closed."""
        if self._use_mmap:
            self._buf.close()
        self._is_closed = True

    def _lookup_ipv4(self, ip):
        """Return the stripped text stored for *ip*, or ``None``.

        ``None`` is returned when the scan finds no record whose address
        is greater than or equal to *ip*, or when the matching record's
        text offset equals ``_offset``.

        Raises ``socket.error`` when *ip* is not a valid IPv4 address
        string.
        """
        packed = socket.inet_aton(ip)
        octets = bytearray(packed)

        # Table slot selected by the first two octets of the address.
        # NOTE(review): slots are read starting at byte 0, so slot 0
        # overlaps the 4-byte header; kept exactly as before -- confirm
        # against the file format.
        slot = (octets[0] * 256 + octets[1]) * 4
        first_record = _unpack_V(self._buf[slot:slot + 4])

        pos = first_record * self._RECORD_SIZE + self._RECORDS_START
        # NOTE(review): ``pos`` is an absolute file position while this
        # bound is ``_offset - 262148``; kept exactly as before -- confirm
        # against the file format.
        scan_end = self._offset - self._RECORDS_START

        text_offset = None
        text_length = None
        while pos < scan_end:
            # inet_aton() packs in network (big-endian) order, so comparing
            # the raw bytes orders addresses numerically.
            if self._buf[pos:pos + 4] >= packed:
                # The offset is stored in 3 bytes; pad to 4 for "<L".
                text_offset = _unpack_V(self._buf[pos + 4:pos + 7] + b'\0')
                text_length = _unpack_C(self._buf[pos + 8])
                break
            pos += self._RECORD_SIZE

        # No record matched: report "not found" instead of failing on the
        # arithmetic below with a None offset.
        if text_offset is None or text_offset == self._offset:
            return None

        begin = self._offset + text_offset - self._PREFIX_TABLE_BYTES
        value = self._buf[begin:begin + text_length]
        return value.decode('utf-8').strip()

    def find(self, ip):
        """Look up the dotted-quad address *ip*.

        Returns the record text, or ``None`` when nothing is found.
        Raises ``ValueError`` if the database has been closed.
        """
        if self._is_closed:
            raise ValueError('I/O operation on closed dat file')
        return self._lookup_ipv4(ip)
def find(ip):
    """Look up *ip* and return the result as a dict.

    Kept as a module-level function for backward compatibility.

    *ip* is first passed through ``socket.gethostbyname``, so host names
    are accepted as well as dotted-quad addresses.  The lookup itself uses
    a fresh ``IPv4Database()`` created with its default arguments and
    closed before returning.

    Returns a dict with three keys:

    * ``'stat'``: ``'ok'`` when the database returned data, else
      ``'fail'``.
    * ``'area'``: the first five tab-separated fields of the record; when
      the record has fewer than five fields, its first field is placed in
      the fourth slot of an otherwise empty five-item list.  ``[]`` on
      failure.
    * ``'location'``: ``{'latitude': ..., 'longitude': ...}`` holding
      floats parsed from the sixth and seventh fields, or ``None`` for
      both when either field is missing or not numeric.
    """
    result = {'stat': 'fail',
              'area': [],
              'location': {'latitude': None, 'longitude': None}}
    try:
        ip = socket.gethostbyname(ip)
    except (socket.gaierror, UnicodeError):
        # UnicodeError: the name could not be IDNA-encoded (for example an
        # over-long label); treat it like any other unresolvable name.
        return result

    with IPv4Database() as db:
        data = db.find(ip)
    if not data:
        return result

    result['stat'] = 'ok'
    ipinfo = data.split('\t')
    if len(ipinfo) >= 5:
        result['area'] = ipinfo[:5]
    else:
        result['area'] = ['', '', '', ipinfo[0], '']

    # Both coordinates (fields 5 and 6) must be present and numeric; they
    # are assigned together so the location is never half-filled.
    if len(ipinfo) >= 7:
        try:
            latitude = float(ipinfo[5])
            longitude = float(ipinfo[6])
        except ValueError:
            pass
        else:
            result['location']['latitude'] = latitude
            result['location']['longitude'] = longitude
    return result