-
Notifications
You must be signed in to change notification settings - Fork 8
/
benchmark.py
115 lines (83 loc) · 2.71 KB
/
benchmark.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# Copyright (c) 2017-2020, Carl Cheung
# All rights reserved.
import logging
import time
from multiprocessing import Process
# Switch between gevent's thread pool and the standard-library one.
USE_GEVENT = True

if not USE_GEVENT:
    from concurrent.futures import ThreadPoolExecutor, as_completed
else:
    import gevent
    from gevent.threadpool import ThreadPoolExecutor

    # gevent.wait is used in place of concurrent.futures.as_completed.
    as_completed = gevent.wait

# Shared executor used by call_async() to submit RPC calls (32 workers).
pool = ThreadPoolExecutor(32)
from kafka_rpc import KRPCClient
from kafka_rpc import KRPCServer
# Number of RPC calls each client benchmark issues; also the numerator of the printed QPS.
NUMS = 1000
# Recorded benchmark results (presumably QPS, as printed by the client functions below):
# 213.8162382121568, no concurrency
# 419.71660475278094, no concurrency, use_redis
# 383.26126764939164, no concurrency, use_redis, with verification and encryption
# 2549.6978760136653, threadpool, poolsize 128
# 3408.9112918300966, threadpool, poolsize 64
# 6788.747358099728, threadpool, poolsize 32
# 2777.8816114784295, threadpool, poolsize 16
def start_server():
    """Start a KRPCServer on topic 'sum' whose handler exposes ``add``.

    The server is created with ``concurrent=False`` and
    ``replication_factor=1``, then control is handed to ``server_forever()``.
    """

    class Sum:
        """RPC handler offering a single addition method."""

        def add(self, x, y):
            """Return ``x + y``."""
            return x + y

    server = KRPCServer(
        'localhost:9092',
        handle=Sum(),
        topic_name='sum',
        replication_factor=1,
        concurrent=False,
    )
    server.server_forever()
def start_server_blocking():
    """Start a KRPCServer on topic 'sum' whose ``add`` handler sleeps first.

    Each ``add`` call sleeps for 0.1 seconds before returning the sum. The
    server is created with ``concurrent=32`` and then control is handed to
    ``server_forever()``.
    """

    class Sum:
        """RPC handler whose addition is delayed by a short sleep."""

        def add(self, x, y):
            """Sleep for 0.1 s, then return ``x + y``."""
            time.sleep(0.1)
            return x + y

    server = KRPCServer(
        'localhost:9092',
        handle=Sum(),
        topic_name='sum',
        concurrent=32,
    )
    server.server_forever()
def call():
    """Issue ``NUMS`` sequential ``add(1, 2)`` RPC calls and print the QPS.

    Every result is printed as it is received. The elapsed time is measured
    with ``time.perf_counter()`` because it is a monotonic clock meant for
    interval timing. The client is closed even when a call raises.
    """
    krc = KRPCClient('localhost:9092', topic_name='sum')
    try:
        started = time.perf_counter()
        for _ in range(NUMS):
            result = krc.add(1, 2)
            print(result)
        elapsed = time.perf_counter() - started
        print('Basic Kafka Client QPS:', NUMS / elapsed)
    finally:
        # Always release the client, including on RPC failure.
        krc.close()
def call_async():
    """Submit ``NUMS`` ``add(1, 2)`` RPC calls to the shared pool and print the QPS.

    All calls are submitted to the module-level ``pool`` first; results are
    then printed as the futures are yielded by ``as_completed``. The elapsed
    time is measured with ``time.perf_counter()`` because it is a monotonic
    clock meant for interval timing. The client is closed even when a call
    raises.
    """
    krc = KRPCClient('localhost:9092', topic_name='sum')
    try:
        started = time.perf_counter()

        futures = [pool.submit(krc.add, 1, 2) for _ in range(NUMS)]
        for future in as_completed(futures):
            result = future.result()
            print(result)

        # use map instead of submit if you want to
        # for result in pool.map(krc.add, (1 for _ in range(NUMS)), (1 for _ in range(NUMS))):
        #     print(result)

        elapsed = time.perf_counter() - started
        print('Async Kafka Client QPS:', NUMS / elapsed)
    finally:
        # Always release the client, including on RPC failure.
        krc.close()
if __name__ == '__main__':
    # Send INFO-and-above records from the root logger to the file 'log'.
    log_fmt = '[%(asctime)s]\t-\t%(filename)s\t-\t%(funcName)s\t-\t%(lineno)d\t-\t[%(levelname)s]: %(message)s'
    formatter = logging.Formatter(log_fmt)
    log = logging.getLogger()
    log.setLevel(logging.DEBUG)
    log_file_handler = logging.FileHandler('log')
    log_file_handler.setLevel(logging.INFO)
    log_file_handler.setFormatter(formatter)
    log.addHandler(log_file_handler)

    # Run the server in a child process and benchmark against it.
    p = Process(target=start_server)
    p.start()
    try:
        # call()
        call_async()
    finally:
        # Stop the server even if the client raised; otherwise the non-daemon
        # child would keep the interpreter from exiting. join() reaps it.
        p.terminate()
        p.join()

    # Alternative run against the blocking (sleeping) server:
    # p = Process(target=start_server_blocking)
    # p.start()
    #
    # call()
    # call_async()
    #
    # p.terminate()
    #
    # pool.shutdown()