Skip to content

Commit

Permalink
feat: remove logger a tracking source can be used instead
Browse files Browse the repository at this point in the history
  • Loading branch information
blacha committed Feb 11, 2022
1 parent 65f2395 commit 151dd08
Show file tree
Hide file tree
Showing 7 changed files with 13 additions and 64 deletions.
2 changes: 1 addition & 1 deletion packages/core/src/__test__/chunk.source.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ o.spec('SourceChunk', () => {
});

async function Chunk(chunkId: number): Promise<DataView> {
await source.loadBytes(chunkId * source.chunkSize, source.chunkSize, undefined);
await source.loadBytes(chunkId * source.chunkSize, source.chunkSize);
return source.getView(chunkId as ChunkId);
}

Expand Down
32 changes: 6 additions & 26 deletions packages/core/src/chunk.source.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { ChunkSource } from './source.js';
import { ByteSize } from './bytes.js';
import { LogType } from './log.js';
export type ChunkId = number & { _type: 'chunkId' };

/** Shifting `<< 32` does not work in javascript */
Expand Down Expand Up @@ -29,9 +28,6 @@ interface TinyMap<K, V> {
* This will also handle joining of consecutive requests, even when it is semi consecutive
*/
export abstract class ChunkSourceBase implements ChunkSource {
/** By default record a log of requests made by chunked sources */
static DefaultTrackRequests = false;

/** By default create a new cache for every chunk source */
static DefaultChunkCache = (): TinyMap<number, DataView> => new Map<number, DataView>();

Expand Down Expand Up @@ -95,9 +91,8 @@ export abstract class ChunkSourceBase implements ChunkSource {
*
* @param offset Byte to start reading form
* @param length optional number of bytes to read
* @param log optional logger to track requests with
*/
abstract fetchBytes(offset: number, length?: number, log?: LogType): Promise<ArrayBuffer>;
abstract fetchBytes(offset: number, length?: number): Promise<ArrayBuffer>;

/** Byte size of the file */
abstract size: Promise<number>;
Expand Down Expand Up @@ -150,7 +145,7 @@ export abstract class ChunkSourceBase implements ChunkSource {
return { chunks, blankFill };
}

private async fetchData(logger?: LogType): Promise<void> {
private async fetchData(): Promise<void> {
if (this.toFetch.size === 0) return;
const chunkIds = this.toFetch;
this.toFetch = new Set();
Expand All @@ -160,31 +155,17 @@ export abstract class ChunkSourceBase implements ChunkSource {

const chunkData: ArrayBuffer[] = [];

const startAt = Date.now();
// TODO putting this in a promise queue to do multiple requests
// at a time would be a good idea.
for (const chunkRange of ranges.chunks) {
const firstChunk = chunkRange[0];
const lastChunk = chunkRange[chunkRange.length - 1];
const req = { startAt, requestStartAt: Date.now(), endAt: -1, chunks: chunkRange };

const offset = firstChunk * this.chunkSize;
const length = lastChunk * this.chunkSize + this.chunkSize - offset;

const startTime = Date.now();
const buffer = await this.fetchBytes(offset, length, logger);
req.endAt = Date.now();
logger?.info(
{
uri: this.uri,
source: this.type,
bytes: length,
chunks: chunkRange,
chunkCount: chunkRange.length,
duration: Date.now() - startTime,
},
'FetchChunk',
);
const buffer = await this.fetchBytes(offset, length);

if (chunkRange.length === 1) {
chunkData[firstChunk] = buffer;
this.chunks.set(firstChunk, new DataView(buffer));
Expand All @@ -206,9 +187,8 @@ export abstract class ChunkSourceBase implements ChunkSource {
*
* @param offset byte offset to start reading from
* @param length number of bytes to load
* @param log optional logger to log requests
*/
public async loadBytes(offset: number, length: number, log?: LogType): Promise<void> {
public async loadBytes(offset: number, length: number): Promise<void> {
if (offset < 0) throw new Error('Offset must be positive');
const startChunk = Math.floor(offset / this.chunkSize);
const endChunk = Math.ceil((offset + length) / this.chunkSize) - 1;
Expand All @@ -222,7 +202,7 @@ export abstract class ChunkSourceBase implements ChunkSource {
// Queue a fetch
if (this.toFetchPromise == null) {
this.toFetchPromise = new Promise<void>((resolve) => setNext(resolve, this.delayMs)).then(() => {
return this.fetchData(log);
return this.fetchData();
});
}

Expand Down
1 change: 0 additions & 1 deletion packages/core/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
export { ByteSize } from './bytes.js';
export { ChunkSourceBase } from './chunk.source.js';
export { LogType } from './log.js';
export { SourceMemory } from './chunk.source.memory.js';
export { ChunkSource } from './source.js';
export { ErrorCodes, CompositeError } from './composite.js';
Expand Down
17 changes: 0 additions & 17 deletions packages/core/src/log.ts

This file was deleted.

6 changes: 2 additions & 4 deletions packages/core/src/source.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { ByteSize } from './bytes.js';
import { LogType } from './log.js';

export interface ChunkSource extends DataView {
/** type of the source, @example `file` or `aws:s3` */
Expand All @@ -24,13 +23,12 @@ export interface ChunkSource extends DataView {
*
* @param offset Byte to start reading form
* @param length optional number of bytes to read
* @param log optional logger to track requests with
*/
fetchBytes(offset: number, length?: number, log?: LogType): Promise<ArrayBuffer>;
fetchBytes(offset: number, length?: number): Promise<ArrayBuffer>;
/** are the following bytes loaded into memory */
hasBytes(offset: number, length: number): boolean;
/** Load bytes from a remote source into memory */
loadBytes(offset: number, length: number, log?: LogType): Promise<void>;
loadBytes(offset: number, length: number): Promise<void>;
/** Read bytes out of the sources */
bytes(offset: number, length: number): Uint8Array;
/** Read a uint64 from the source this reduces precision to `number` */
Expand Down
5 changes: 2 additions & 3 deletions packages/source-aws/src/s3.source.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { ChunkSource, ChunkSourceBase, CompositeError, isRecord, LogType } from '@chunkd/core';
import { ChunkSource, ChunkSourceBase, CompositeError, isRecord } from '@chunkd/core';
import { S3Like, toPromise } from './type.js';

export function getCompositeError(e: unknown, msg: string): CompositeError {
Expand Down Expand Up @@ -84,7 +84,7 @@ export class SourceAwsS3 extends ChunkSourceBase {
return new SourceAwsS3(res.bucket, res.key, remote);
}

async fetchBytes(offset: number, length?: number, logger?: LogType): Promise<ArrayBuffer> {
async fetchBytes(offset: number, length?: number): Promise<ArrayBuffer> {
const fetchRange = this.toRange(offset, length);
try {
const resp = await this.remote.getObject({ Bucket: this.bucket, Key: this.key, Range: fetchRange }).promise();
Expand All @@ -98,7 +98,6 @@ export class SourceAwsS3 extends ChunkSourceBase {
const buffer = resp.Body;
return buffer.buffer.slice(buffer.byteOffset, buffer.byteOffset + buffer.byteLength);
} catch (err) {
logger?.error({ err, source: this.name, fetchRange }, 'FailedToFetch');
throw getCompositeError(err, `Failed to fetch ${this.name} ${fetchRange}`);
}
}
Expand Down
14 changes: 2 additions & 12 deletions packages/source-http/src/http.source.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { ChunkSource, ChunkSourceBase, LogType } from '@chunkd/core';
import { ChunkSource, ChunkSourceBase } from '@chunkd/core';

export interface FetchLikeOptions {
method?: string;
Expand Down Expand Up @@ -41,7 +41,7 @@ export class SourceHttp extends ChunkSourceBase {
return this._size;
}

async fetchBytes(offset: number, length?: number, logger?: LogType): Promise<ArrayBuffer> {
async fetchBytes(offset: number, length?: number): Promise<ArrayBuffer> {
const Range = this.toRange(offset, length);
const headers = { Range };
const response = await SourceHttp.fetch(this.uri, { headers });
Expand All @@ -53,16 +53,6 @@ export class SourceHttp extends ChunkSourceBase {
}
return response.arrayBuffer();
}
logger?.error(
{
offset,
bytes: length,
status: response.status,
statusText: response.statusText,
url: this.uri,
},
'Failed to fetch',
);

throw new Error('Failed to fetch');
}
Expand Down

0 comments on commit 151dd08

Please sign in to comment.