forked from ethereum/web3.py
-
Notifications
You must be signed in to change notification settings - Fork 21
/
ipc.py
160 lines (134 loc) · 4.14 KB
/
ipc.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
import os
import socket
import sys
import threading
try:
from json import JSONDecodeError
except ImportError:
JSONDecodeError = ValueError
from web3.utils.threads import (
Timeout,
)
from .base import JSONBaseProvider
def get_ipc_socket(ipc_path, timeout=0.1):
if sys.platform == 'win32':
# On Windows named pipe is used. Simulate socket with it.
from web3.utils.windows import NamedPipe
return NamedPipe(ipc_path)
else:
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.connect(ipc_path)
sock.settimeout(timeout)
return sock
class PersistantSocket(object):
sock = None
def __init__(self, ipc_path):
self.ipc_path = ipc_path
def __enter__(self):
if not self.sock:
self.sock = get_ipc_socket(self.ipc_path)
return self.sock
def __exit__(self, exc_type, exc_value, traceback):
# only close the socket if there was an error
if exc_value is not None:
try:
self.sock.close()
except Exception:
pass
self.sock = None
def get_default_ipc_path(testnet=False):
if testnet:
testnet = "testnet"
else:
testnet = ""
if sys.platform == 'darwin':
ipc_path = os.path.expanduser(os.path.join(
"~",
"Library",
"Ethereum",
testnet,
"geth.ipc"
))
if os.path.exists(ipc_path):
return ipc_path
ipc_path = os.path.expanduser(os.path.join(
"~",
"Library",
"Application Support",
"io.parity.ethereum",
"jsonrpc.ipc"
))
if os.path.exists(ipc_path):
return ipc_path
elif sys.platform.startswith('linux'):
ipc_path = os.path.expanduser(os.path.join(
"~",
".ethereum",
testnet,
"geth.ipc"
))
if os.path.exists(ipc_path):
return ipc_path
ipc_path = os.path.expanduser(os.path.join(
"~",
".local",
"share",
"io.parity.ethereum",
"jsonrpc.ipc"
))
if os.path.exists(ipc_path):
return ipc_path
elif sys.platform == 'win32':
ipc_path = os.path.join(
"\\\\",
".",
"pipe",
"geth.ipc"
)
if os.path.exists(ipc_path):
return ipc_path
ipc_path = os.path.join(
"\\\\",
".",
"pipe",
"jsonrpc.ipc"
)
if os.path.exists(ipc_path):
return ipc_path
else:
raise ValueError(
"Unsupported platform '{0}'. Only darwin/linux2/win32 are "
"supported. You must specify the ipc_path".format(sys.platform)
)
class IPCProvider(JSONBaseProvider):
_socket = None
def __init__(self, ipc_path=None, testnet=False, *args, **kwargs):
if ipc_path is None:
self.ipc_path = get_default_ipc_path(testnet)
else:
self.ipc_path = ipc_path
self._lock = threading.Lock()
self._socket = PersistantSocket(self.ipc_path)
super(IPCProvider, self).__init__(*args, **kwargs)
def make_request(self, method, params):
request = self.encode_rpc_request(method, params)
with self._lock, self._socket as sock:
sock.sendall(request)
raw_response = b""
with Timeout(10) as timeout:
while True:
try:
raw_response += sock.recv(4096)
except socket.timeout:
timeout.sleep(0)
continue
if raw_response == b"":
timeout.sleep(0)
else:
try:
response = self.decode_rpc_response(raw_response)
except JSONDecodeError:
timeout.sleep(0)
continue
else:
return response