Skip to content

Commit

Permalink
Merge branch 'feature/non-recursive-split'
Browse files (browse the repository at this point in the history)
  • Loading branch information
ram-jayapalan committed Oct 19, 2024
2 parents baea9e5 + 05537a6 commit 3268576
Showing 1 changed file with 66 additions and 65 deletions.
131 changes: 66 additions & 65 deletions src/split.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,71 +194,72 @@ def _process(self, reader: BytesIO, limit: int,
carryover: bytes = kwargs.get('carryover', None)
header: bytes = kwargs.get('header', None)
manifest: csv.DictWriter = kwargs.get('manifest', None)
processed = 0
splitfilename = self._getnextsplit(splitnum)
splitfile = os.path.join(self.outputdir, splitfilename)
if includeheader and not header:
newline = True
header = reader.readline()
writer = open(splitfile, mode='wb+')
try:
if header:
writer.write(header)
processed += len(header) if splitby == 'size' else 1
if carryover:
writer.write(carryover)
processed += len(carryover) if splitby == 'size' else 1
carryover = None
if splitby == 'size':
buffersize = Split._getreadbuffersize(splitsize=limit)
while 1:
if self.terminate:
log.info('Term flag has been set by the user.')
log.info('Terminating the process.')
break
if newline:
while True:
processed = 0
splitfilename = self._getnextsplit(splitnum)
splitfile = os.path.join(self.outputdir, splitfilename)
if includeheader and not header:
newline = True
header = reader.readline()
writer = open(splitfile, mode='wb+')
try:
if header:
writer.write(header)
processed += len(header) if splitby == 'size' else 1
if carryover:
writer.write(carryover)
processed += len(carryover) if splitby == 'size' else 1
carryover = None
if splitby == 'size':
buffersize = Split._getreadbuffersize(splitsize=limit)
while 1:
if self.terminate:
log.info('Term flag has been set by the user.')
log.info('Terminating the process.')
break
if newline:
chunk = reader.readline()
else:
chunk = reader.read(buffersize)
if not chunk:
break
chunksize = len(chunk)
if processed + chunksize <= limit:
writer.write(chunk)
processed += chunksize
else:
carryover = chunk
break
elif splitby == 'linecount':
while 1:
if self.terminate:
log.info('Term flag has been set by the user.')
log.info('Terminating the process.')
break
chunk = reader.readline()
else:
chunk = reader.read(buffersize)
if not chunk:
break
chunksize = len(chunk)
if processed + chunksize <= limit:
writer.write(chunk)
processed += chunksize
else:
carryover = chunk
break
elif splitby == 'linecount':
while 1:
if self.terminate:
log.info('Term flag has been set by the user.')
log.info('Terminating the process.')
break
chunk = reader.readline()
if not chunk:
break
processed += 1
if processed <= limit:
writer.write(chunk)
else:
carryover = chunk
break
if not chunk:
break
processed += 1
if processed <= limit:
writer.write(chunk)
else:
carryover = chunk
break
else:
raise ValueError('Unsupported split type provided.')
finally:
writer.close()
splitsize = os.path.getsize(splitfile)
if manifest:
manifest.writerow(
{'filename': splitfilename, 'filesize': splitsize, 'header': includeheader})
if callback:
callback(splitfile, splitsize)
if carryover:
splitnum += 1
continue
else:
raise ValueError('Unsupported split type provided.')
finally:
writer.close()
splitsize = os.path.getsize(splitfile)
if manifest:
manifest.writerow(
{'filename': splitfilename, 'filesize': splitsize, 'header': includeheader})
if callback:
callback(splitfile, splitsize)
if carryover:
splitnum += 1
self._process(reader, limit, splitby, newline, includeheader, callback,
splitnum=splitnum, carryover=carryover, header=header,
manifest=manifest)
break

def _endprocess(self):
"""Runs statements that mark the completion of the process
Expand Down Expand Up @@ -336,7 +337,7 @@ def bylinecount(self, linecount: int, includeheader: bool = False,
# th.daemon = True
# th.start()

# #split.bysize(10, includeheader=True, callback=cb)
# split.bylinecount(10000, includeheader=True, callback=cb)
# split.bysize(10000, includeheader=True, callback=cb)
# # split.bylinecount(10000, includeheader=False, callback=cb)

# th.join()

0 comments on commit 3268576

Please sign in to comment.