-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpb_utils.py
144 lines (115 loc) · 3.73 KB
/
pb_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
r"""
Created on Fri Jun 12 15:50:11 2020
@author: Macrobull
"""
from __future__ import division, unicode_literals # absolute_import(utils)
from io import StringIO, TextIOBase
### helper classes ###
class StreamReader:
    r"""simple prototxt stream splitter

    Reads a text stream line by line and cuts it into top-level chunks: a
    chunk ends at the first newline found outside any ``{ ... }`` nesting,
    quoted string or ``#`` comment.  Parsing is driven by a small stack of
    ``(handler, kwargs)`` pairs in ``self.states`` so that a handler can be
    suspended when the buffered text runs out and resumed after the next line
    has been appended.

    Handler protocol (each handler receives the previous handler's result as
    ``sub_state``):

    * ``None``     - buffer exhausted; the handler re-pushed itself and needs
      more input
    * ``Ellipsis`` - control was handed to a freshly pushed sub-handler
    * ``dict``     - the handler finished; ``{'newline': True}`` means the
      last character it consumed was a line break
    """

    def __init__(self, stream:TextIOBase):
        self.stream = stream
        # both are re-created by every read_one() call; set here so that the
        # attributes always exist
        self.buf = StringIO()
        self.states = []

    def __iter__(self)->'Iterator':
        r"""yield chunks until ``read_one`` returns a falsy value (EOF)"""
        while True:
            data = self.read_one()
            if not data:
                break
            yield data

    def read_one(self)->'Optional[str]':
        r"""read one parsible field / block

        Returns the chunk text, always terminated by ``'\n'``.  An empty or
        whitespace-only line met between chunks is returned as a lone
        ``'\n'``.  Returns ``None`` once the stream is exhausted.

        A final line that lacks its line terminator is treated as if it were
        terminated, so a complete last field / block is still returned.

        Raises ``IOError`` if the stream ends inside an unfinished block or
        quoted string, and ``ValueError`` on an unbalanced ``}``.
        """
        self.buf = StringIO()
        self.states = []
        last_line = ''
        while True:
            line = self.stream.readline()
            if not line:
                # EOF: give up unless a chunk is pending whose last line was
                # not newline-terminated; in that case feed exactly one
                # implicit line break so a complete chunk can be closed
                if not self.states or last_line.endswith('\n'):
                    break
                line = '\n'
            last_line = line
            if not self.states and line.strip():
                self.states.append((self._read_block, dict()))
            # append the new text but leave the cursor at its beginning
            pos = self.buf.tell()
            self.buf.write(line)
            self.buf.seek(pos)
            sub_state = None
            while self.states:
                func, state = self.states.pop()
                sub_state = func(sub_state, **state)
                if sub_state is None:
                    # handler ran out of text: fetch another line
                    break
            else:
                # state stack drained: everything up to the cursor is one chunk
                size = self.buf.tell()
                self.buf.seek(0)
                ret = self.buf.read(size)
                self.buf = StringIO()
                if not (sub_state and sub_state.get('newline')):
                    ret += '\n'
                return ret
        if self.states:
            raise IOError('stream closed with trailing content')
        return None

    def _read_block(
            self, sub_state,
            depth=0):
        r"""scan plain text, tracking ``{}`` nesting in ``depth``

        Finishes with ``{'newline': True}`` on a newline at depth 0.  On a
        quote or ``#`` it pushes itself plus the matching sub-handler and
        returns ``Ellipsis``.
        """
        if sub_state and sub_state.get('newline'):
            # the sub-handler swallowed the line break: step back to see it
            self.buf.seek(self.buf.tell() - 1)
        while True:
            char = self.buf.read(1)
            if not char:
                break
            if char in ('"', "'"):
                self.states.append((self._read_block, {'depth': depth}))
                self.states.append((self._read_quoted_string, {'quote': char}))
                return Ellipsis
            elif char == '#':
                self.states.append((self._read_block, {'depth': depth}))
                self.states.append((self._read_comment, dict()))
                return Ellipsis
            elif char == '\n':
                if depth == 0:
                    return {'newline': True}
            elif char == '{':
                depth += 1
            elif char == '}':
                if depth == 0:
                    raise ValueError('unexpected }')
                depth -= 1
        # out of text: suspend with the current nesting depth
        self.states.append((self._read_block, {'depth': depth}))
        return None

    def _read_quoted_string(
            self, sub_state,
            quote='"', escaped=False):
        r"""consume text up to the closing ``quote``, honouring backslash escapes"""
        while True:
            char = self.buf.read(1)
            if not char:
                break
            if escaped:
                # the character after a backslash is taken literally
                escaped = False
                continue
            if char == '\\':
                escaped = True
            elif char == quote:
                return dict()
        # out of text: suspend, remembering a pending escape if any
        self.states.append(
            (self._read_quoted_string, {'quote': quote, 'escaped': escaped}))
        return None

    def _read_comment(self, sub_state):
        r"""consume the rest of the current line as a comment"""
        line = self.buf.readline()
        if line.endswith('\n'):
            return {'newline': True}
        # comment not terminated yet: suspend until more text arrives
        self.states.append((self._read_comment, dict()))
        return None
if __name__ == '__main__':
    # Demo: split a small prototxt-like sample and print every chunk that
    # StreamReader yields, each followed by a dashed separator line.
    sample = StringIO("""
a: 1
b {
x: 2 # waka'waka
y {
z: "Some
Mutiline
Text"
}
y {
z: 'Apple\\'s good'
}
}
""")
    reader = StreamReader(sample)
    for chunk in reader:
        # drop the chunk's trailing line break(s); print() adds its own
        print(chunk.rstrip('\n'))
        print('-' * 9)