Newline char sniffer
dalmijn committed Dec 5, 2024
1 parent 7a0ea5c commit 109c982
Showing 4 changed files with 94 additions and 86 deletions.
163 changes: 86 additions & 77 deletions src/fiat/io.py
@@ -172,6 +172,7 @@ def __init__(
self.path = path
self.size = None
self.skip = skip
self.nchar = b"\n"
self.stream = None

if self.stream is None:
@@ -212,9 +213,17 @@ def close_stream(self):
def setup_stream(self):
"""_summary_."""
self.stream = BufferedReader(FileIO(self.path))
self.size = self.stream.read().count(os.linesep.encode())
self.sniffer()
self.size = self.stream.read().count(self.nchar)
self.stream.seek(self.skip)

def sniffer(self):
"""Sniff for the newline character."""
raw = self.stream.read(20000)
if len(raw.split(b"\r\n")) != 1:
self.nchar = b"\r\n"
self.stream.seek(0)


class BufferedGeomWriter:
"""_summary_."""
@@ -446,29 +455,80 @@ def __init__(
index: str = None,
):
"""_summary_."""
self.delim = delimiter
self.delimiter = delimiter
self.data = handler
self.meta = {}
self.meta["index_col"] = -1
self.meta["index_name"] = None
self.meta["delimiter"] = delimiter
self.meta["dup_cols"] = None
self.meta["nchar"] = self.data.nchar
self.index = None
self.columns = None
self._nrow = self.data.size
self._ncol = 0

self._parse_meta(header)
self._deter_dtypes_index(index=index)
self.parse_meta(header)
self.parse_structure(index=index)

def _deter_dtypes_index(
def parse_meta(
self,
header: bool,
):
"""Parse the meta data of the csv file."""
_pat = regex_pattern(self.delimiter)
self.data.stream.seek(0)

while True:
self._nrow -= 1
cur_pos = self.data.stream.tell()
line = self.data.stream.readline().decode("utf-8-sig")

if line.startswith("#"):
t = line.strip().split("=")
if len(t) == 1:
tl = t[0].split(":")
if len(tl) > 1:
lst = tl[1].split(self.delimiter)
_entry = tl[0].strip().replace("#", "").lower()
_val = [item.strip() for item in lst]
self.meta[_entry] = _val
else:
lst = t[0].split(self.delimiter)
_entry = lst[0].strip().replace("#", "").lower()
_val = [item.strip() for item in lst[1:]]
self.meta[_entry] = _val
# raise ValueError("Supplied metadata in unknown format..")
else:
key, item = t
self.meta[key.strip().replace("#", "").lower()] = item.strip()
continue

if not header:
self.columns = None
self._ncol = len(_pat.split(line.encode("utf-8-sig")))
self.data.stream.seek(cur_pos)
self._nrow += 1
break

self.columns = [item.strip() for item in line.split(self.delimiter)]
self.meta["dup_cols"] = find_duplicates(self.columns)
self.resolve_column_headers()
self._ncol = len(self.columns)
break

self.data.skip = self.data.stream.tell()
self.meta["ncol"] = self._ncol
self.meta["nrow"] = self._nrow

def parse_structure(
self,
index: str,
):
"""_summary_."""
"""Parse the csv file to create the structure."""
_get_index = False
_get_dtypes = True
_pat_multi = regex_pattern(self.delim, multi=True)
_pat_multi = regex_pattern(self.delimiter, multi=True, nchar=self.data.nchar)

if index is not None:
try:
@@ -495,7 +555,9 @@ def _deter_dtypes_index(
if _get_dtypes:
_dtypes = [0] * self._ncol
with self.data as _h:
for _nlines, sd in _text_chunk_gen(_h, pattern=_pat_multi):
for _nlines, sd in _text_chunk_gen(
_h, pattern=_pat_multi, nchar=self.data.nchar
):
if _get_dtypes:
for idx in range(self._ncol):
_dtypes[idx] = max(
@@ -512,58 +574,8 @@ def __init__(
func = self.meta["dtypes"][idcol]
self.index = [func(item.decode()) for item in _index]

def _parse_meta(
self,
header: bool,
):
"""_summary_."""
_pat = regex_pattern(self.delim)
self.data.stream.seek(0)

while True:
self._nrow -= 1
cur_pos = self.data.stream.tell()
line = self.data.stream.readline().decode("utf-8-sig")

if line.startswith("#"):
t = line.strip().split("=")
if len(t) == 1:
tl = t[0].split(":")
if len(tl) > 1:
lst = tl[1].split(self.delim)
_entry = tl[0].strip().replace("#", "").lower()
_val = [item.strip() for item in lst]
self.meta[_entry] = _val
else:
lst = t[0].split(self.delim)
_entry = lst[0].strip().replace("#", "").lower()
_val = [item.strip() for item in lst[1:]]
self.meta[_entry] = _val
# raise ValueError("Supplied metadata in unknown format..")
else:
key, item = t
self.meta[key.strip().replace("#", "").lower()] = item.strip()
continue

if not header:
self.columns = None
self._ncol = len(_pat.split(line.encode("utf-8-sig")))
self.data.stream.seek(cur_pos)
self._nrow += 1
break

self.columns = [item.strip() for item in line.split(self.delim)]
self.meta["dup_cols"] = find_duplicates(self.columns)
self._resolve_column_headers()
self._ncol = len(self.columns)
break

self.data.skip = self.data.stream.tell()
self.meta["ncol"] = self._ncol
self.meta["nrow"] = self._nrow

def _resolve_column_headers(self):
"""_summary_."""
def resolve_column_headers(self):
"""Resolve the column headers."""
_cols = self.columns
dup = self.meta["dup_cols"]
if dup is None:
@@ -1701,20 +1713,15 @@ def __init__(
self.meta = kwargs
self.index_col = -1

if "index_col" in self.meta:
self.index_col = self.meta["index_col"]
# Set the attributes of the object
for key, item in kwargs.items():
if not key.startswith("_"):
self.__setattr__(key, item)

# Get the index integer ids
index_int = list(range(kwargs["nrow"]))

if "index_int" in kwargs:
index_int = kwargs.pop("index_int")

if "delimiter" in kwargs:
self.delimiter = kwargs.pop("delimiter")

# Create body of struct
if "dtypes" in kwargs:
self.dtypes = kwargs.pop("dtypes")
if "_index_int" in kwargs:
index_int = kwargs.pop("_index_int")

if columns is None:
columns = [f"col_{num}" for num in range(kwargs["ncol"])]
@@ -1794,7 +1801,7 @@ def __init__(
self._build_from_stream(
data,
columns,
kwargs,
**kwargs,
)

elif isinstance(data, ndarray):
@@ -1857,15 +1864,17 @@ def _build_from_stream(
self,
data: BufferHandler,
columns: list,
kwargs,
**kwargs,
):
"""_summary_."""
dtypes = kwargs["dtypes"]
ncol = kwargs["ncol"]
index_col = kwargs["index_col"]
nchar = kwargs["nchar"]
_pat_multi = regex_pattern(
kwargs["delimiter"],
multi=True,
nchar=nchar,
)
with data as h:
_d = _pat_multi.split(h.read().strip())
@@ -2006,7 +2015,7 @@ def __init__(
if not h.readline() or _c == kwargs["nrow"]:
break

kwargs["index_int"] = index_int
kwargs["_index_int"] = index_int
del index_int

_Table.__init__(
@@ -2070,14 +2079,14 @@ def set_index(
if key == self.index_col:
return

_pat_multi = regex_pattern(self.delim, multi=True)
_pat_multi = regex_pattern(self.delimiter, multi=True, nchar=self.nchar)
idx = self.header_index[key]
new_index = [None] * self.handler.size

with self.handler as h:
c = 0

for _nlines, sd in _text_chunk_gen(h, _pat_multi):
for _nlines, sd in _text_chunk_gen(h, _pat_multi, nchar=self.nchar):
new_index[c:_nlines] = [
*map(
self.dtypes[idx],
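Note on parse_meta above: metadata lines start with "#" and come in two accepted shapes, "# key = value" for a scalar entry and "# key: a,b,c" (split on the delimiter) for a list entry; the first non-comment line is taken as the header row unless header is False. A hypothetical input that exercises both branches — the file contents are illustrative, not from the repository:

# source = synthetic example
# scenarios: rp010, rp050, rp100
object_id,rp010,rp050,rp100
1,0.4,1.1,1.8

Parsed with delimiter ",", this would yield meta["source"] == "synthetic example", meta["scenarios"] == ["rp010", "rp050", "rp100"], and columns == ["object_id", "rp010", "rp050", "rp100"].
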
2 changes: 1 addition & 1 deletion src/fiat/log.py
@@ -802,7 +802,7 @@ def add_file_handler(
level : int, optional
Logging level.
filename : str, optional
The name of the file, also the identifier for the steam handler.
The name of the file, also the identifier for the stream handler.
"""
self._handlers.append(FileHandler(dst=dst, level=level, name=filename))

2 changes: 1 addition & 1 deletion src/fiat/models/worker_geom.py
@@ -62,7 +62,7 @@ def worker(
out_text_writer = DummyWriter()
if exp_data is not None:
man_columns_idxs = [exp_data.columns.index(item) for item in man_columns]
pattern = regex_pattern(exp_data.delimiter)
pattern = regex_pattern(exp_data.delimiter, nchar=exp_data.nchar)
out_text_writer = BufferedTextWriter(
Path(cfg.get("output.path"), cfg.get("output.csv.name")),
mode="ab",
13 changes: 6 additions & 7 deletions src/fiat/util.py
@@ -52,14 +52,12 @@
}


def regex_pattern(
delimiter: str,
multi: bool = False,
):
def regex_pattern(delimiter: str, multi: bool = False, nchar: bytes = b"\n"):
"""_summary_."""
nchar = nchar.decode()
if not multi:
return regex.compile(rf'"[^"]*"(*SKIP)(*FAIL)|{delimiter}'.encode())
return regex.compile(rf'"[^"]*"(*SKIP)(*FAIL)|{delimiter}|{NEWLINE_CHAR}'.encode())
return regex.compile(rf'"[^"]*"(*SKIP)(*FAIL)|{delimiter}|{nchar}'.encode())


# Calculation
@@ -73,6 +71,7 @@ def _text_chunk_gen(
h: object,
pattern: re.Pattern,
chunk_size: int = 100000,
nchar: bytes = b"\n",
):
_res = b""
while True:
@@ -82,12 +81,12 @@
t = _res + t
try:
t, _res = t.rsplit(
NEWLINE_CHAR.encode(),
nchar,
1,
)
except Exception:
_res = b""
_nlines = t.count(NEWLINE_CHAR.encode())
_nlines = t.count(nchar)
sd = pattern.split(t)
del t
yield _nlines, sd
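Note on regex_pattern and _text_chunk_gen above: the compiled byte pattern uses the regex module's (*SKIP)(*FAIL) verbs so that delimiters inside double-quoted fields never split, and in multi mode it also splits on the sniffed newline, letting a whole chunk decompose into fields in one pass; _text_chunk_gen meanwhile rsplits each chunk on nchar so a trailing partial line carries over into the next chunk. A hedged usage sketch — it requires the third-party regex package, and the sample data is illustrative:

import regex

delimiter = ","
nchar = b"\r\n"  # e.g. as detected by the sniffer in io.py

# Same construction as regex_pattern(delimiter, multi=True, nchar=nchar).
pat = regex.compile(
    rf'"[^"]*"(*SKIP)(*FAIL)|{delimiter}|{nchar.decode()}'.encode()
)

# Commas inside quoted fields are skipped; rows and fields split in one pass.
print(pat.split(b'1,"Smith, J",2.5\r\n2,"Doe, A",3.0'))
# [b'1', b'"Smith, J"', b'2.5', b'2', b'"Doe, A"', b'3.0']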
