Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add properties in gtid event #586

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions pymysqlreplication/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,43 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs)
self.last_committed = struct.unpack("<Q", self.packet.read(8))[0]
self.sequence_number = struct.unpack("<Q", self.packet.read(8))[0]

format_string = "<Q"
read_immediate_commit_timestamp = self.packet.read(7)
self.immediate_commit_timestamp = read_immediate_commit_timestamp.ljust(
struct.calcsize(format_string), b"\x00"
)
self.immediate_commit_timestamp = struct.unpack(
format_string, self.immediate_commit_timestamp
)[0]
self.immediate_commit_timestamp = self.immediate_commit_timestamp & (
(1 << (8 * struct.calcsize(format_string))) - 1
)
self.immediate_commit_timestamp = struct.pack(
format_string, self.immediate_commit_timestamp
)

read_original_commit_timestamp = self.packet.read(7)
self.original_commit_timestamp = read_original_commit_timestamp.ljust(
struct.calcsize(format_string), b"\x00"
)
self.original_commit_timestamp = struct.unpack(
format_string, self.original_commit_timestamp
)[0]
self.original_commit_timestamp = self.original_commit_timestamp & (
(1 << (8 * struct.calcsize(format_string))) - 1
)
self.original_commit_timestamp = struct.pack(
format_string, self.original_commit_timestamp
)

# todo(hun): We should implement the properties below,
# but due to the lack of documentation on the variable-length property,
# we've left it as future work.
# - transaction_length, unknown Variable Length
# - immediate_server_version, 4 bytes
# - original_server_version, 4 bytes
# - Commit group ticket, 8 bytes, This property is only introduced in the CPP implementation.

@property
def gtid(self):
"""
Expand Down
7 changes: 7 additions & 0 deletions pymysqlreplication/tests/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@ def isMySQL57(self):
version = float(self.getMySQLVersion().rsplit(".", 1)[0])
return version == 5.7

def isMySQL801AndMore(self):
version = float(self.getMySQLVersion().rsplit(".", 1)[0])
version_detail = int(self.getMySQLVersion().rsplit(".", 1)[1])
if version > 8.0:
return True
return version == 8.0 and version_detail >= 10

def isMySQL80AndMore(self):
version = float(self.getMySQLVersion().rsplit(".", 1)[0])
return version >= 8.0
Expand Down
37 changes: 36 additions & 1 deletion pymysqlreplication/tests/test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
from pymysql.protocol import MysqlPacket
import pytest


__all__ = [
"TestBasicBinLogStreamReader",
"TestMultipleRowBinLogStreamReader",
Expand Down Expand Up @@ -1473,6 +1472,42 @@ def test_rows_query_log_event(self):
self.assertIsInstance(event, RowsQueryLogEvent)


class TestGtidEvent(base.PyMySQLReplicationTestCase):
def setUp(self):
super(TestGtidEvent, self).setUp()
self.execute("SET SESSION binlog_rows_query_log_events=1")

def tearDown(self):
self.execute("SET SESSION binlog_rows_query_log_events=0")
super(TestGtidEvent, self).tearDown()

def test_gtid_event(self):
self.stream.close()
self.stream = BinLogStreamReader(
self.database,
server_id=1024,
only_events=[GtidEvent, FormatDescriptionEvent],
)
if not self.isMySQL801AndMore():
self.skipTest("Mysql version is under 8.0.1")
self.execute(
"CREATE TABLE IF NOT EXISTS test (id INT AUTO_INCREMENT PRIMARY KEY, name VARCHAR(255))"
)
format_description_event = self.stream.fetchone()
gtid_event = self.stream.fetchone()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should grep FormatEvent too

Now gtid_event assign FormatDescriptionEvent

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After i grep FormatDescriptionEvent, test was passed in my local env. 👍

self.assertIsInstance(format_description_event, FormatDescriptionEvent)
self.assertIsInstance(gtid_event, GtidEvent)
self.assertIsInstance(gtid_event.event_type, int)
self.assertIsInstance(gtid_event.sid, bytes)
self.assertIsInstance(gtid_event.gno, int)
self.assertIsInstance(gtid_event.lt_type, int)
self.assertIsInstance(gtid_event.last_committed, int)
Copy link
Collaborator

@sean-k1 sean-k1 Dec 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Upper mysql 5.7 version has this variable so pytest failed.

  1. base.py make method isMySQL57AndMore
  2. Test setUp method use isMySQL57AndMore* before start Test

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resolves it!

self.assertIsInstance(gtid_event.sequence_number, int)
self.assertEqual(gtid_event.sequence_number, 1)
self.assertIsInstance(gtid_event.immediate_commit_timestamp, bytes)
self.assertIsInstance(gtid_event.original_commit_timestamp, bytes)


class TestLatin1(base.PyMySQLReplicationTestCase):
def setUp(self):
super().setUp(charset="latin1")
Expand Down