Skip to content

Commit

Permalink
fix: Integrate wrapStreamError to prevent uncaught errors
Browse files Browse the repository at this point in the history
  • Loading branch information
joachimvh committed Nov 24, 2020
1 parent 1a30b51 commit e418333
Show file tree
Hide file tree
Showing 24 changed files with 112 additions and 108 deletions.
4 changes: 2 additions & 2 deletions src/init/Setup.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import streamifyArray from 'streamify-array';
import type { AclManager } from '../authorization/AclManager';
import { RepresentationMetadata } from '../ldp/representation/RepresentationMetadata';
import type { LoggerFactory } from '../logging/LoggerFactory';
import { getLoggerFor, setGlobalLoggerFactory } from '../logging/LogUtil';
import type { ExpressHttpServerFactory } from '../server/ExpressHttpServerFactory';
import type { ResourceStore } from '../storage/ResourceStore';
import { TEXT_TURTLE } from '../util/ContentTypes';
import { guardedStreamFrom } from '../util/StreamUtil';
import { CONTENT_TYPE } from '../util/UriConstants';

/**
Expand Down Expand Up @@ -65,7 +65,7 @@ export class Setup {
baseAclId,
{
binary: true,
data: streamifyArray([ acl ]),
data: guardedStreamFrom([ acl ]),
metadata,
},
);
Expand Down
3 changes: 2 additions & 1 deletion src/ldp/http/response/OkResponseDescription.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { Readable } from 'stream';
import type { Guarded } from '../../../util/GuardedStream';
import type { RepresentationMetadata } from '../../representation/RepresentationMetadata';
import { ResponseDescription } from './ResponseDescription';

Expand All @@ -10,7 +11,7 @@ export class OkResponseDescription extends ResponseDescription {
* @param metadata - Metadata concerning the response.
* @param data - Potential data. @ignored
*/
public constructor(metadata: RepresentationMetadata, data?: Readable) {
public constructor(metadata: RepresentationMetadata, data?: Guarded<Readable>) {
super(200, metadata, data);
}
}
5 changes: 3 additions & 2 deletions src/ldp/http/response/ResponseDescription.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { Readable } from 'stream';
import type { Guarded } from '../../../util/GuardedStream';
import type { RepresentationMetadata } from '../../representation/RepresentationMetadata';

/**
Expand All @@ -7,14 +8,14 @@ import type { RepresentationMetadata } from '../../representation/Representation
export class ResponseDescription {
public readonly statusCode: number;
public readonly metadata?: RepresentationMetadata;
public readonly data?: Readable;
public readonly data?: Guarded<Readable>;

/**
* @param statusCode - Status code to return.
* @param metadata - Metadata corresponding to the response (and data potentially).
* @param data - Data that needs to be returned. @ignored
*/
public constructor(statusCode: number, metadata?: RepresentationMetadata, data?: Readable) {
public constructor(statusCode: number, metadata?: RepresentationMetadata, data?: Guarded<Readable>) {
this.statusCode = statusCode;
this.metadata = metadata;
this.data = data;
Expand Down
3 changes: 2 additions & 1 deletion src/ldp/representation/Representation.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { Readable } from 'stream';
import type { Guarded } from '../../util/GuardedStream';
import type { RepresentationMetadata } from './RepresentationMetadata';

/**
Expand All @@ -12,7 +13,7 @@ export interface Representation {
/**
* The raw data stream for this representation.
*/
data: Readable;
data: Guarded<Readable>;
/**
* Whether the data stream consists of binary/string chunks
* (as opposed to complex objects).
Expand Down
3 changes: 2 additions & 1 deletion src/server/ExpressHttpServerFactory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import cors from 'cors';
import type { Express } from 'express';
import express from 'express';
import { getLoggerFor } from '../logging/LogUtil';
import { guardStream } from '../util/GuardedStream';
import type { HttpHandler } from './HttpHandler';
import type { HttpServerFactory } from './HttpServerFactory';

Expand Down Expand Up @@ -40,7 +41,7 @@ export class ExpressHttpServerFactory implements HttpServerFactory {
app.use(async(request, response, done): Promise<void> => {
try {
this.logger.info(`Received request for ${request.url}`);
await this.handler.handleSafe({ request, response });
await this.handler.handleSafe({ request: guardStream(request), response });
} catch (error: unknown) {
const errMsg = error instanceof Error ? `${error.name}: ${error.message}\n${error.stack}` : 'Unknown error.';
this.logger.error(errMsg);
Expand Down
3 changes: 2 additions & 1 deletion src/server/HttpRequest.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type { IncomingMessage } from 'http';
import type { Guarded } from '../util/GuardedStream';

/**
* An incoming HTTP request;
*/
export type HttpRequest = IncomingMessage;
export type HttpRequest = Guarded<IncomingMessage>;
9 changes: 5 additions & 4 deletions src/storage/DataAccessorBasedStore.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import type { Readable } from 'stream';
import { DataFactory } from 'n3';
import type { Quad } from 'rdf-js';
import streamifyArray from 'streamify-array';
import { v4 as uuid } from 'uuid';
import type { Representation } from '../ldp/representation/Representation';
import { RepresentationMetadata } from '../ldp/representation/RepresentationMetadata';
Expand All @@ -12,6 +11,7 @@ import { MethodNotAllowedHttpError } from '../util/errors/MethodNotAllowedHttpEr
import { NotFoundHttpError } from '../util/errors/NotFoundHttpError';
import { NotImplementedError } from '../util/errors/NotImplementedError';
import { UnsupportedHttpError } from '../util/errors/UnsupportedHttpError';
import type { Guarded } from '../util/GuardedStream';
import {
ensureTrailingSlash,
getParentContainer,
Expand All @@ -21,6 +21,7 @@ import {
} from '../util/PathUtil';
import { parseQuads } from '../util/QuadUtil';
import { generateResourceQuads } from '../util/ResourceUtil';
import { guardedStreamFrom } from '../util/StreamUtil';
import { CONTENT_TYPE, HTTP, LDP, RDF } from '../util/UriConstants';
import type { DataAccessor } from './accessors/DataAccessor';
import type { ResourceStore } from './ResourceStore';
Expand Down Expand Up @@ -70,9 +71,9 @@ export class DataAccessorBasedStore implements ResourceStore {
metadata.contentType = INTERNAL_QUADS;
result = {
binary: false,
get data(): Readable {
get data(): Guarded<Readable> {
// This allows other modules to still add metadata before the output data is written
return streamifyArray(result.metadata.quads());
return guardedStreamFrom(result.metadata.quads());
},
metadata,
};
Expand Down Expand Up @@ -365,7 +366,7 @@ export class DataAccessorBasedStore implements ResourceStore {
protected getEmptyContainerRepresentation(container: ResourceIdentifier): Representation {
return {
binary: true,
data: streamifyArray([]),
data: guardedStreamFrom([]),
metadata: new RepresentationMetadata(container.path),
};
}
Expand Down
3 changes: 2 additions & 1 deletion src/storage/LockingResourceStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import type { Representation } from '../ldp/representation/Representation';
import type { RepresentationPreferences } from '../ldp/representation/RepresentationPreferences';
import type { ResourceIdentifier } from '../ldp/representation/ResourceIdentifier';
import { getLoggerFor } from '../logging/LogUtil';
import type { Guarded } from '../util/GuardedStream';
import type { AtomicResourceStore } from './AtomicResourceStore';
import type { Conditions } from './Conditions';
import type { ExpiringLock } from './ExpiringLock';
Expand Down Expand Up @@ -118,7 +119,7 @@ export class LockingResourceStore implements AtomicResourceStore {
* @param source - The readable to wrap
* @param lock - The lock for the corresponding identifier.
*/
protected createExpiringReadable(source: Readable, lock: ExpiringLock): Readable {
protected createExpiringReadable(source: Guarded<Readable>, lock: ExpiringLock): Readable {
// Destroy the source when a timeout occurs.
lock.on('expired', (): void => {
source.destroy(new Error(`Stream reading timout exceeded`));
Expand Down
6 changes: 4 additions & 2 deletions src/storage/accessors/DataAccessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { Readable } from 'stream';
import type { Representation } from '../../ldp/representation/Representation';
import type { RepresentationMetadata } from '../../ldp/representation/RepresentationMetadata';
import type { ResourceIdentifier } from '../../ldp/representation/ResourceIdentifier';
import type { Guarded } from '../../util/GuardedStream';

/**
* A DataAccessor is the building block closest to the actual data storage.
Expand All @@ -27,7 +28,7 @@ export interface DataAccessor {
* It can be assumed that the incoming identifier will always correspond to a document.
* @param identifier - Identifier for which the data is requested.
*/
getData: (identifier: ResourceIdentifier) => Promise<Readable>;
getData: (identifier: ResourceIdentifier) => Promise<Guarded<Readable>>;

/**
* Returns the metadata corresponding to the identifier.
Expand All @@ -42,7 +43,8 @@ export interface DataAccessor {
* @param data - Data to store.
* @param metadata - Metadata to store.
*/
writeDocument: (identifier: ResourceIdentifier, data: Readable, metadata: RepresentationMetadata) => Promise<void>;
writeDocument: (identifier: ResourceIdentifier, data: Guarded<Readable>, metadata: RepresentationMetadata) =>
Promise<void>;

/**
* Writes metadata for a container.
Expand Down
10 changes: 6 additions & 4 deletions src/storage/accessors/FileDataAccessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import { ConflictHttpError } from '../../util/errors/ConflictHttpError';
import { NotFoundHttpError } from '../../util/errors/NotFoundHttpError';
import { isSystemError } from '../../util/errors/SystemError';
import { UnsupportedMediaTypeHttpError } from '../../util/errors/UnsupportedMediaTypeHttpError';
import { guardStream } from '../../util/GuardedStream';
import type { Guarded } from '../../util/GuardedStream';
import { isContainerIdentifier } from '../../util/PathUtil';
import { parseQuads, pushQuad, serializeQuads } from '../../util/QuadUtil';
import { generateContainmentQuads, generateResourceQuads } from '../../util/ResourceUtil';
Expand Down Expand Up @@ -45,12 +47,12 @@ export class FileDataAccessor implements DataAccessor {
* Will return data stream directly to the file corresponding to the resource.
* Will throw NotFoundHttpError if the input is a container.
*/
public async getData(identifier: ResourceIdentifier): Promise<Readable> {
public async getData(identifier: ResourceIdentifier): Promise<Guarded<Readable>> {
const link = await this.resourceMapper.mapUrlToFilePath(identifier);
const stats = await this.getStats(link.filePath);

if (stats.isFile()) {
return createReadStream(link.filePath);
return guardStream(createReadStream(link.filePath));
}

throw new NotFoundHttpError();
Expand All @@ -76,7 +78,7 @@ export class FileDataAccessor implements DataAccessor {
* Writes the given data as a file (and potential metadata as additional file).
* The metadata file will be written first and will be deleted if something goes wrong writing the actual data.
*/
public async writeDocument(identifier: ResourceIdentifier, data: Readable, metadata: RepresentationMetadata):
public async writeDocument(identifier: ResourceIdentifier, data: Guarded<Readable>, metadata: RepresentationMetadata):
Promise<void> {
if (this.isMetadataPath(identifier.path)) {
throw new ConflictHttpError('Not allowed to create files with the metadata extension.');
Expand Down Expand Up @@ -264,7 +266,7 @@ export class FileDataAccessor implements DataAccessor {
// Check if the metadata file exists first
await fsPromises.lstat(metadataPath);

const readMetadataStream = createReadStream(metadataPath);
const readMetadataStream = guardStream(createReadStream(metadataPath));
return await parseQuads(readMetadataStream);
} catch (error: unknown) {
// Metadata file doesn't exist so lets keep `rawMetaData` an empty array.
Expand Down
31 changes: 6 additions & 25 deletions src/storage/accessors/InMemoryDataAccessor.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import { Readable } from 'stream';
import type { Readable } from 'stream';
import arrayifyStream from 'arrayify-stream';
import { DataFactory } from 'n3';
import type { NamedNode } from 'rdf-js';
import { RepresentationMetadata } from '../../ldp/representation/RepresentationMetadata';
import type { ResourceIdentifier } from '../../ldp/representation/ResourceIdentifier';
import { NotFoundHttpError } from '../../util/errors/NotFoundHttpError';
import type { Guarded } from '../../util/GuardedStream';
import { ensureTrailingSlash, isContainerIdentifier } from '../../util/PathUtil';
import { generateContainmentQuads, generateResourceQuads } from '../../util/ResourceUtil';
import { guardedStreamFrom } from '../../util/StreamUtil';
import type { DataAccessor } from './DataAccessor';

interface DataEntry {
Expand All @@ -19,27 +21,6 @@ interface ContainerEntry {
}
type CacheEntry = DataEntry | ContainerEntry;

class ArrayReadable extends Readable {
private readonly data: any[];
private idx: number;

public constructor(data: any[]) {
super({ objectMode: true });
this.data = data;
this.idx = 0;
}

// eslint-disable-next-line @typescript-eslint/naming-convention
public _read(): void {
if (this.idx < this.data.length) {
this.push(this.data[this.idx]);
this.idx += 1;
} else {
this.push(null);
}
}
}

export class InMemoryDataAccessor implements DataAccessor {
private readonly base: string;
private readonly store: ContainerEntry;
Expand All @@ -56,12 +37,12 @@ export class InMemoryDataAccessor implements DataAccessor {
// All data is supported since streams never get read, only copied
}

public async getData(identifier: ResourceIdentifier): Promise<Readable> {
public async getData(identifier: ResourceIdentifier): Promise<Guarded<Readable>> {
const entry = this.getEntry(identifier);
if (!this.isDataEntry(entry)) {
throw new NotFoundHttpError();
}
return new ArrayReadable(entry.data);
return guardedStreamFrom(entry.data);
}

public async getMetadata(identifier: ResourceIdentifier): Promise<RepresentationMetadata> {
Expand All @@ -72,7 +53,7 @@ export class InMemoryDataAccessor implements DataAccessor {
return this.generateMetadata(identifier, entry);
}

public async writeDocument(identifier: ResourceIdentifier, data: Readable, metadata: RepresentationMetadata):
public async writeDocument(identifier: ResourceIdentifier, data: Guarded<Readable>, metadata: RepresentationMetadata):
Promise<void> {
const { parent, name } = this.getParentEntry(identifier);
parent.entries[name] = {
Expand Down
12 changes: 7 additions & 5 deletions src/storage/accessors/SparqlDataAccessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import { ConflictHttpError } from '../../util/errors/ConflictHttpError';
import { NotFoundHttpError } from '../../util/errors/NotFoundHttpError';
import { UnsupportedHttpError } from '../../util/errors/UnsupportedHttpError';
import { UnsupportedMediaTypeHttpError } from '../../util/errors/UnsupportedMediaTypeHttpError';
import { guardStream } from '../../util/GuardedStream';
import type { Guarded } from '../../util/GuardedStream';
import { ensureTrailingSlash, getParentContainer, isContainerIdentifier } from '../../util/PathUtil';
import { generateResourceQuads } from '../../util/ResourceUtil';
import { CONTENT_TYPE, LDP } from '../../util/UriConstants';
Expand Down Expand Up @@ -70,9 +72,9 @@ export class SparqlDataAccessor implements DataAccessor {
* Returns all triples stored for the corresponding identifier.
* Note that this will not throw a 404 if no results were found.
*/
public async getData(identifier: ResourceIdentifier): Promise<Readable> {
public async getData(identifier: ResourceIdentifier): Promise<Guarded<Readable>> {
const name = namedNode(identifier.path);
return this.sendSparqlConstruct(this.sparqlConstruct(name));
return await this.sendSparqlConstruct(this.sparqlConstruct(name));
}

/**
Expand Down Expand Up @@ -114,7 +116,7 @@ export class SparqlDataAccessor implements DataAccessor {
/**
* Reads the given data stream and stores it together with the metadata.
*/
public async writeDocument(identifier: ResourceIdentifier, data: Readable, metadata: RepresentationMetadata):
public async writeDocument(identifier: ResourceIdentifier, data: Guarded<Readable>, metadata: RepresentationMetadata):
Promise<void> {
if (this.isMetadataIdentifier(identifier)) {
throw new ConflictHttpError('Not allowed to create NamedNodes with the metadata extension.');
Expand Down Expand Up @@ -292,11 +294,11 @@ export class SparqlDataAccessor implements DataAccessor {
* Sends a SPARQL CONSTRUCT query to the endpoint and returns a stream of quads.
* @param sparqlQuery - Query to execute.
*/
private async sendSparqlConstruct(sparqlQuery: ConstructQuery): Promise<Readable> {
private async sendSparqlConstruct(sparqlQuery: ConstructQuery): Promise<Guarded<Readable>> {
const query = this.generator.stringify(sparqlQuery);
this.logger.info(`Sending SPARQL CONSTRUCT query to ${this.endpoint}: ${query}`);
try {
return await this.fetcher.fetchTriples(this.endpoint, query);
return guardStream(await this.fetcher.fetchTriples(this.endpoint, query));
} catch (error: unknown) {
if (error instanceof Error) {
this.logger.error(`SPARQL endpoint ${this.endpoint} error: ${error.message}`);
Expand Down
3 changes: 2 additions & 1 deletion src/storage/conversion/QuadToRdfConverter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import type { Representation } from '../../ldp/representation/Representation';
import { RepresentationMetadata } from '../../ldp/representation/RepresentationMetadata';
import type { RepresentationPreferences } from '../../ldp/representation/RepresentationPreferences';
import { INTERNAL_QUADS } from '../../util/ContentTypes';
import { guardStream } from '../../util/GuardedStream';
import { CONTENT_TYPE } from '../../util/UriConstants';
import { validateRequestArgs, matchingTypes } from './ConversionUtil';
import type { RepresentationConverterArgs } from './RepresentationConverter';
Expand Down Expand Up @@ -34,7 +35,7 @@ export class QuadToRdfConverter extends TypedRepresentationConverter {
const metadata = new RepresentationMetadata(quads.metadata, { [CONTENT_TYPE]: contentType });
return {
binary: true,
data: rdfSerializer.serialize(quads.data, { contentType }) as Readable,
data: guardStream(rdfSerializer.serialize(quads.data, { contentType }) as Readable),
metadata,
};
}
Expand Down
3 changes: 2 additions & 1 deletion src/storage/patch/SparqlUpdatePatchHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import type { ResourceIdentifier } from '../../ldp/representation/ResourceIdenti
import { getLoggerFor } from '../../logging/LogUtil';
import { INTERNAL_QUADS } from '../../util/ContentTypes';
import { UnsupportedHttpError } from '../../util/errors/UnsupportedHttpError';
import { guardStream } from '../../util/GuardedStream';
import { CONTENT_TYPE } from '../../util/UriConstants';
import type { ResourceLocker } from '../ResourceLocker';
import type { ResourceStore } from '../ResourceStore';
Expand Down Expand Up @@ -77,7 +78,7 @@ export class SparqlUpdatePatchHandler extends PatchHandler {
const metadata = new RepresentationMetadata(input.identifier.path, { [CONTENT_TYPE]: INTERNAL_QUADS });
const representation: Representation = {
binary: false,
data: store.match() as Readable,
data: guardStream(store.match() as Readable),
metadata,
};
await this.source.setRepresentation(input.identifier, representation);
Expand Down
Empty file added src/util/MetadataController.ts
Empty file.
Loading

0 comments on commit e418333

Please sign in to comment.