Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve BufferTokenizer #526

Merged
merged 1 commit into from
Jul 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 25 additions & 7 deletions lib/AbstractTokenizer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,19 @@ export abstract class AbstractTokenizer implements ITokenizer {

/**
* Peek (read ahead) buffer from tokenizer
* @param buffer - Target buffer to fill with data peek from the tokenizer-stream
* @param uint8Array - Target buffer to fill with data peeked from the tokenizer-stream
* @param options - Peek behaviour options
* @returns Promise with number of bytes read
*/
public abstract peekBuffer(buffer: Uint8Array, options?: IReadChunkOptions): Promise<number>;
public abstract peekBuffer(uint8Array: Uint8Array, options?: IReadChunkOptions): Promise<number>;

/**
* Read a token from the tokenizer-stream
* @param token - The token to read
* @param position - If provided, the desired position in the tokenizer-stream
* @returns Promise with token data
*/
public async readToken<Value>(token: IGetToken<Value>, position?: number): Promise<Value> {
public async readToken<Value>(token: IGetToken<Value>, position: number = this.position): Promise<Value> {
const uint8Array = Buffer.alloc(token.len);
const len = await this.readBuffer(uint8Array, {position});
if (len < token.len)
Expand Down Expand Up @@ -89,14 +89,32 @@ export abstract class AbstractTokenizer implements ITokenizer {
}

/**
* Ignore number of bytes, advances the pointer in under tokenizer-stream.
* @param length - Number of bytes to skip (ignore)
* @return actual number of bytes ignored
* Ignore number of bytes, advances the pointer in the underlying tokenizer-stream.
* @param length - Number of bytes to ignore
* @return resolves the number of bytes ignored, equals length if enough bytes are available, otherwise the number of bytes available
*/
public abstract ignore(length: number): Promise<number>;
public async ignore(length: number): Promise<number> {
  // Never advance past the known end of the underlying data:
  // ignore at most the number of bytes still available.
  const remaining = this.fileInfo.size - this.position;
  const bytesIgnored = length <= remaining ? length : remaining;
  this.position += bytesIgnored;
  return bytesIgnored;
}

public async close(): Promise<void> {
// No-op by default: the abstract tokenizer holds no closable resource; subclasses override when needed.
}

protected normalizeOptions(uint8Array: Uint8Array, options?: IReadChunkOptions): IReadChunkOptions {
  // Default read length spans from the caller-supplied offset (if any) to the end of the target buffer.
  const defaultLength = uint8Array.length - ((options && options.offset) ? options.offset : 0);
  const defaults: IReadChunkOptions = {
    offset: 0,
    length: defaultLength,
    position: this.position
  };
  // Caller-supplied option values take precedence over the computed defaults.
  return {...defaults, ...options};
}
}
95 changes: 14 additions & 81 deletions lib/BufferTokenizer.ts
Original file line number Diff line number Diff line change
@@ -1,29 +1,26 @@
import { IFileInfo, IReadChunkOptions, ITokenizer } from './types';
import { IFileInfo, IReadChunkOptions } from './types';
import { EndOfStreamError } from 'peek-readable';
import { IGetToken, IToken } from '@tokenizer/token';
import { AbstractTokenizer } from './AbstractTokenizer';

export class BufferTokenizer implements ITokenizer {

public fileInfo: IFileInfo;
public position: number = 0;
export class BufferTokenizer extends AbstractTokenizer {

/**
* Construct BufferTokenizer
* @param buffer - Buffer to tokenize
* @param uint8Array - Uint8Array to tokenize
* @param fileInfo - Pass additional file information to the tokenizer
*/
constructor(private uint8Array: Uint8Array, fileInfo?: IFileInfo) {
this.fileInfo = fileInfo ? fileInfo : {};
super(fileInfo);
this.fileInfo.size = this.fileInfo.size ? this.fileInfo.size : uint8Array.length;
}

/**
* Read buffer from tokenizer
* @param buffer
* @param uint8Array - Target Uint8Array to write the read data to
* @param options - Read behaviour options
* @returns {Promise<number>}
*/
public async readBuffer(buffer: Uint8Array, options?: IReadChunkOptions): Promise<number> {
public async readBuffer(uint8Array: Uint8Array, options?: IReadChunkOptions): Promise<number> {

if (options && options.position) {
if (options.position < this.position) {
Expand All @@ -32,10 +29,9 @@ export class BufferTokenizer implements ITokenizer {
this.position = options.position;
}

return this.peekBuffer(buffer, options).then(bytesRead => {
this.position += bytesRead;
return bytesRead;
});
const bytesRead = await this.peekBuffer(uint8Array, options);
this.position += bytesRead;
return bytesRead;
}

/**
Expand All @@ -46,80 +42,17 @@ export class BufferTokenizer implements ITokenizer {
*/
public async peekBuffer(uint8Array: Uint8Array, options?: IReadChunkOptions): Promise<number> {

let offset = 0;
let length = uint8Array.length;
let position = this.position;

if (options) {
if (options.position) {
if (options.position < this.position) {
throw new Error('`options.position` can be less than `tokenizer.position`');
}
position = options.position;
}
if (Number.isInteger(options.length)) {
length = options.length;
} else {
length -= options.offset || 0;
}
if (options.offset) {
offset = options.offset;
}
}

if (length === 0) {
return Promise.resolve(0);
}
options = this.normalizeOptions(uint8Array, options);

position = position || this.position;
if (!length) {
length = uint8Array.length;
}
const bytes2read = Math.min(this.uint8Array.length - position, length);
if ((!options || !options.mayBeLess) && bytes2read < length) {
const bytes2read = Math.min(this.uint8Array.length - options.position, options.length);
if ((!options.mayBeLess) && bytes2read < options.length) {
throw new EndOfStreamError();
} else {
uint8Array.set(this.uint8Array.subarray(position, position + bytes2read), offset);
uint8Array.set(this.uint8Array.subarray(options.position, options.position + bytes2read), options.offset);
return bytes2read;
}
}

public async readToken<T>(token: IGetToken<T>, position?: number): Promise<T> {
this.position = position || this.position;
try {
const tv = this.peekToken(token, this.position);
this.position += token.len;
return tv;
} catch (err) {
this.position += this.uint8Array.length - position;
throw err;
}
}

public async peekToken<T>(token: IGetToken<T>, position: number = this.position): Promise<T> {
if (this.uint8Array.length - position < token.len) {
throw new EndOfStreamError();
}
return token.get(this.uint8Array, position);
}

public async readNumber(token: IToken<number>): Promise<number> {
return this.readToken(token);
}

public async peekNumber(token: IToken<number>): Promise<number> {
return this.peekToken(token);
}

/**
* @return actual number of bytes ignored
*/
public async ignore(length: number): Promise<number> {
const bytesIgnored = Math.min(this.uint8Array.length - this.position, length);
this.position += bytesIgnored;
return bytesIgnored;
}

public async close(): Promise<void> {
// empty
}
Expand Down
48 changes: 5 additions & 43 deletions lib/FileTokenizer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,59 +50,21 @@ export class FileTokenizer extends AbstractTokenizer {

/**
* Peek buffer from file
* @param buffer
* @param uint8Array - Uint8Array (or Buffer) to write data to
* @param options - Read behaviour options
* @returns Promise number of bytes read
*/
public async peekBuffer(buffer: Uint8Array, options?: IReadChunkOptions): Promise<number> {

let offset = 0;
let length = buffer.length;
let position = this.position;

if (options) {
if (options.position) {
if (options.position < this.position) {
throw new Error('`options.position` must be equal or greater than `tokenizer.position`');
}
position = options.position;
}
if (Number.isInteger(options.length)) {
length = options.length;
} else {
length -= options.offset || 0;
}
if (options.offset) {
offset = options.offset;
}
}
public async peekBuffer(uint8Array: Uint8Array, options?: IReadChunkOptions): Promise<number> {

if (length === 0) {
return Promise.resolve(0);
}
options = this.normalizeOptions(uint8Array, options);

const res = await fs.read(this.fd, buffer, offset, length, position);
if ((!options || !options.mayBeLess) && res.bytesRead < length) {
const res = await fs.read(this.fd, uint8Array, options.offset, options.length, options.position);
if ((!options.mayBeLess) && res.bytesRead < options.length) {
throw new EndOfStreamError();
}
return res.bytesRead;
}

/**
* @param length - Number of bytes to ignore
* @return resolves the number of bytes ignored, equals length if this available, otherwise the number of bytes available
*/
public async ignore(length: number): Promise<number> {
const bytesLeft = this.fileInfo.size - this.position;
if (length <= bytesLeft) {
this.position += length;
return length;
} else {
this.position += bytesLeft;
return bytesLeft;
}
}

public async close(): Promise<void> {
return fs.close(this.fd);
}
Expand Down
65 changes: 26 additions & 39 deletions lib/ReadStreamTokenizer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,54 +72,41 @@ export class ReadStreamTokenizer extends AbstractTokenizer {

/**
* Peek (read ahead) buffer from tokenizer
* @param buffer - Target buffer to write the data read to
* @param uint8Array - Uint8Array (or Buffer) to write data to
* @param options - Read behaviour options
* @returns Promise with number of bytes peeked
*/
public async peekBuffer(buffer: Uint8Array, options?: IReadChunkOptions): Promise<number> {

// const _offset = position ? position : this.position;
// debug(`peek ${_offset}...${_offset + length - 1}`);

let offset = 0;
let bytesRead: number;
let length = buffer.length;
if (options) {

if (options.offset) {
offset = options.offset;
}

if (Number.isInteger(options.length)) {
length = options.length;
} else {
length -= options.offset || 0;
public async peekBuffer(uint8Array: Uint8Array, options?: IReadChunkOptions): Promise<number> {

options = this.normalizeOptions(uint8Array, options);
let bytesRead = 0;

if (options.position) {
const skipBytes = options.position - this.position;
if (skipBytes > 0) {
const skipBuffer = new Uint8Array(options.length + skipBytes);
bytesRead = await this.peekBuffer(skipBuffer, {mayBeLess: options.mayBeLess});
uint8Array.set(skipBuffer.subarray(skipBytes), options.offset);
return bytesRead - skipBytes;
} else if (skipBytes < 0) {
throw new Error('Cannot peek from a negative offset in a stream');
}
}

if (options.position) {
const skipBytes = options.position - this.position;
if (skipBytes > 0) {
const skipBuffer = new Uint8Array(length + skipBytes);
bytesRead = await this.peekBuffer(skipBuffer, {mayBeLess: options.mayBeLess});
buffer.set(skipBuffer.subarray(skipBytes), offset);
return bytesRead - skipBytes;
} else if (skipBytes < 0) {
throw new Error('Cannot peek from a negative offset in a stream');
if (options.length > 0) {
try {
bytesRead = await this.streamReader.peek(uint8Array, options.offset, options.length);
} catch (err) {
if (options && options.mayBeLess && err instanceof EndOfStreamError) {
return 0;
}
throw err;
}
}

try {
bytesRead = await this.streamReader.peek(buffer, offset, length);
} catch (err) {
if (options && options.mayBeLess && err instanceof EndOfStreamError) {
return 0;
if ((!options.mayBeLess) && bytesRead < options.length) {
throw new EndOfStreamError();
}
throw err;
}
if ((!options || !options.mayBeLess) && bytesRead < length) {
throw new EndOfStreamError();
}

return bytesRead;
}

Expand Down