-
Notifications
You must be signed in to change notification settings - Fork 225
/
Copy pathtest_unpack.py
89 lines (68 loc) · 2.37 KB
/
test_unpack.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import sys
from io import BytesIO
from pytest import mark, raises
from msgpack import ExtType, OutOfData, Unpacker, packb
def test_unpack_array_header_from_file():
    """Read an array header from a file-like object, then unpack its items.

    The header of a packed four-element list must report a length of 4, the
    four elements must then unpack one by one in order, and one more
    ``unpack()`` call must raise ``OutOfData``.
    """
    stream = BytesIO(packb([1, 2, 3, 4]))
    reader = Unpacker(stream)

    assert reader.read_array_header() == 4
    for expected in (1, 2, 3, 4):
        assert reader.unpack() == expected

    # Everything has been consumed; a further read has nothing to return.
    with raises(OutOfData):
        reader.unpack()
@mark.skipif(
    not hasattr(sys, "getrefcount"),
    reason="sys.getrefcount() is needed to pass this test",
)
def test_unpacker_hook_refcnt():
    """Check that an Unpacker holds references to its hooks and releases them.

    The same callable is installed as both ``object_hook`` and ``list_hook``.
    Its reference count must rise by at least two while the Unpacker exists
    and return to the baseline once the Unpacker is deleted.

    Skipped when ``sys.getrefcount`` is not available.
    """
    result = []

    def hook(x):
        # Record every value passed through the hook, then return it unchanged.
        result.append(x)
        return x

    basecnt = sys.getrefcount(hook)

    up = Unpacker(object_hook=hook, list_hook=hook)
    # The hook was handed over twice, so at least two new references exist.
    assert sys.getrefcount(hook) >= basecnt + 2

    up.feed(packb([{}]))
    up.feed(packb([{}]))
    assert up.unpack() == [{}]
    assert up.unpack() == [{}]
    # For each message the hook sees the inner map first, then the outer list.
    assert result == [{}, [{}], {}, [{}]]

    # Dropping the Unpacker must drop every reference it took on the hook.
    del up
    assert sys.getrefcount(hook) == basecnt
def test_unpacker_ext_hook():
    """Exercise an Unpacker subclass that uses one of its own methods as ext_hook.

    Plain values must pass through untouched, ext code 1 must be converted by
    the hook, and any other ext code must come back as an ``ExtType``.
    """

    class HookedUnpacker(Unpacker):
        def __init__(self):
            super().__init__(ext_hook=self._decode_ext, raw=False)

        def _decode_ext(self, code, data):
            # Only ext code 1 is converted with int(); every other code is
            # handed back as an ExtType carrying the same code and data.
            if code != 1:
                return ExtType(code, data)
            return int(data)

    unpacker = HookedUnpacker()

    # (object to pack and feed, object expected back from unpack())
    cases = [
        ({"a": 1}, {"a": 1}),
        ({"a": ExtType(1, b"123")}, {"a": 123}),
        ({"a": ExtType(2, b"321")}, {"a": ExtType(2, b"321")}),
    ]
    for payload, expected in cases:
        unpacker.feed(packb(payload))
        assert unpacker.unpack() == expected
def test_unpacker_tell():
    """Check that ``tell()`` reports the expected offset after each object.

    Iterates the Unpacker over a fixed byte stream and, after every yielded
    object, compares both the object and the reported stream position.
    """
    # (expected object, expected tell() value once it has been unpacked)
    expectations = [
        (1, 1),
        (2, 2),
        ("abc", 6),
        ("def", 10),
        ("ghi", 14),
    ]
    stream = Unpacker(BytesIO(b"\x01\x02\xa3abc\xa3def\xa3ghi"))

    # The expectation table comes first in zip() so the Unpacker is not
    # advanced once the table is exhausted.
    for (want, offset), got in zip(expectations, stream):
        assert want == got
        assert offset == stream.tell()
def test_unpacker_tell_read_bytes():
    """Interleave object iteration with ``read_bytes()`` and check ``tell()``.

    After each object yielded by the Unpacker, the reported position is
    checked and then a raw read of a given size is performed; the raw bytes
    returned must match the expected slice of the stream.
    """
    # (expected object, expected tell() after it, bytes to request, raw bytes expected)
    steps = [
        (1, 1, 1, b"\x02"),
        ("abc", 6, 4, b"\xa3def"),
        ("ghi", 14, 999, b""),
    ]
    stream = Unpacker(BytesIO(b"\x01\x02\xa3abc\xa3def\xa3ghi"))

    # The step table comes first in zip() so the Unpacker is not advanced
    # once the table is exhausted.
    for (want, offset, size, raw), got in zip(steps, stream):
        assert want == got
        assert offset == stream.tell()
        assert stream.read_bytes(size) == raw