-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
53 lines (39 loc) · 1.34 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
from services.updater import Updater
import time
import sys
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
DeleteRowsEvent,
UpdateRowsEvent,
WriteRowsEvent,
)
# Connection parameters for the MySQL server whose binary log is read;
# main() hands this mapping to BinLogStreamReader as `connection_settings`.
# NOTE(review): credentials are hard-coded here (root account) — confirm this
# is only meant for local development.
MYSQL_SETTINGS = dict(
    host="127.0.0.1",
    port=3306,
    user="root",
    passwd="server",
)
def _push_row_changes(updater, binlogevent):
    """Forward every row of one binlog row event to the matching Updater call.

    Delete events push the row's ``values`` via ``pushDelete``, update events
    push the row's ``after_values`` via ``pushUpdate`` and write events push
    the row's ``values`` via ``pushIsert``. Each call also receives a dict
    holding the event's ``table`` and ``schema``.
    """
    prefix = {"table": binlogevent.table, "schema": binlogevent.schema}
    for row in binlogevent.rows:
        if isinstance(binlogevent, DeleteRowsEvent):
            updater.pushDelete(prefix, row["values"])
        elif isinstance(binlogevent, UpdateRowsEvent):
            # Only the post-update image of the row is forwarded.
            updater.pushUpdate(prefix, row["after_values"])
        elif isinstance(binlogevent, WriteRowsEvent):
            # NOTE(review): "pushIsert" is the spelling used by this caller;
            # confirm it matches the method name defined on Updater.
            updater.pushIsert(prefix, row["values"])


def main():
    """Poll the MySQL binary log and push row changes to the Updater.

    Runs forever: sleeps one second, opens a BinLogStreamReader restricted to
    delete/write/update row events, forwards every row it yields to the
    Updater, then closes the reader before the next pass.

    The Updater is closed and stdout is flushed when the loop is left, which
    only happens through an exception (for example KeyboardInterrupt).
    """
    updater = Updater()
    try:
        while True:
            time.sleep(1)
            # NOTE(review): no log_file/log_pos/resume_stream is passed, so
            # each new reader presumably starts from the oldest available
            # binlog event and replays it — confirm the Updater tolerates
            # receiving the same changes again on every pass.
            stream = BinLogStreamReader(
                connection_settings=MYSQL_SETTINGS,
                server_id=100,
                only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent])
            try:
                for binlogevent in stream:
                    _push_row_changes(updater, binlogevent)
            finally:
                # A fresh reader is opened on every pass; close it so its
                # MySQL connections are not leaked once per second.
                stream.close()
    finally:
        # NOTE(review): assumes close() is end-of-run cleanup for the Updater
        # rather than a per-pass commit — confirm against services.updater.
        updater.close()
        sys.stdout.flush()
# Script entry point: start the polling loop when this module is executed.
# NOTE(review): there is no `if __name__ == "__main__":` guard, so importing
# this module also starts the loop.
main()