diff --git a/src/split.py b/src/split.py index f8917cc..4471320 100644 --- a/src/split.py +++ b/src/split.py @@ -194,71 +194,72 @@ def _process(self, reader: BytesIO, limit: int, carryover: bytes = kwargs.get('carryover', None) header: bytes = kwargs.get('header', None) manifest: csv.DictWriter = kwargs.get('manifest', None) - processed = 0 - splitfilename = self._getnextsplit(splitnum) - splitfile = os.path.join(self.outputdir, splitfilename) - if includeheader and not header: - newline = True - header = reader.readline() - writer = open(splitfile, mode='wb+') - try: - if header: - writer.write(header) - processed += len(header) if splitby == 'size' else 1 - if carryover: - writer.write(carryover) - processed += len(carryover) if splitby == 'size' else 1 - carryover = None - if splitby == 'size': - buffersize = Split._getreadbuffersize(splitsize=limit) - while 1: - if self.terminate: - log.info('Term flag has been set by the user.') - log.info('Terminating the process.') - break - if newline: + while True: + processed = 0 + splitfilename = self._getnextsplit(splitnum) + splitfile = os.path.join(self.outputdir, splitfilename) + if includeheader and not header: + newline = True + header = reader.readline() + writer = open(splitfile, mode='wb+') + try: + if header: + writer.write(header) + processed += len(header) if splitby == 'size' else 1 + if carryover: + writer.write(carryover) + processed += len(carryover) if splitby == 'size' else 1 + carryover = None + if splitby == 'size': + buffersize = Split._getreadbuffersize(splitsize=limit) + while 1: + if self.terminate: + log.info('Term flag has been set by the user.') + log.info('Terminating the process.') + break + if newline: + chunk = reader.readline() + else: + chunk = reader.read(buffersize) + if not chunk: + break + chunksize = len(chunk) + if processed + chunksize <= limit: + writer.write(chunk) + processed += chunksize + else: + carryover = chunk + break + elif splitby == 'linecount': + while 1: + if self.terminate: + log.info('Term flag has been set by the user.') + log.info('Terminating the process.') + break chunk = reader.readline() - else: - chunk = reader.read(buffersize) - if not chunk: - break - chunksize = len(chunk) - if processed + chunksize <= limit: - writer.write(chunk) - processed += chunksize - else: - carryover = chunk - break - elif splitby == 'linecount': - while 1: - if self.terminate: - log.info('Term flag has been set by the user.') - log.info('Terminating the process.') - break - chunk = reader.readline() - if not chunk: - break - processed += 1 - if processed <= limit: - writer.write(chunk) - else: - carryover = chunk - break + if not chunk: + break + processed += 1 + if processed <= limit: + writer.write(chunk) + else: + carryover = chunk + break + else: + raise ValueError('Unsupported split type provided.') + finally: + writer.close() + splitsize = os.path.getsize(splitfile) + if manifest: + manifest.writerow( + {'filename': splitfilename, 'filesize': splitsize, 'header': includeheader}) + if callback: + callback(splitfile, splitsize) + if carryover: + splitnum += 1 + continue else: - raise ValueError('Unsupported split type provided.') - finally: - writer.close() - splitsize = os.path.getsize(splitfile) - if manifest: - manifest.writerow( - {'filename': splitfilename, 'filesize': splitsize, 'header': includeheader}) - if callback: - callback(splitfile, splitsize) - if carryover: - splitnum += 1 - self._process(reader, limit, splitby, newline, includeheader, callback, - splitnum=splitnum, carryover=carryover, header=header, - manifest=manifest) + break def _endprocess(self): """Runs statements that marks the completion of the process @@ -336,7 +337,7 @@ def bylinecount(self, linecount: int, includeheader: bool = False, # th.daemon = True # th.start() -# #split.bysize(10, includeheader=True, callback=cb) -# split.bylinecount(10000, includeheader=True, callback=cb) +# split.bysize(10000, includeheader=True, callback=cb) +# # split.bylinecount(10000, includeheader=False, callback=cb) # th.join()