
refactor: Specify common types TDE-1030 #849

Closed · wants to merge 40 commits
25f890c refactor: Specify common branded types (l0b0, Feb 1, 2024)
243ceeb docs: Explain branded types (l0b0, Feb 1, 2024)
a119ba7 chore: Remove unused special case for JSON input (l0b0, Feb 2, 2024)
907f18a refactor: Use built-in `URL` type (l0b0, Feb 2, 2024)
8f8c146 Merge remote-tracking branch 'origin/master' into refactor/types (l0b0, Feb 6, 2024)
8d716c3 fix: Revert wrong use of base64url encoding (l0b0, Feb 7, 2024)
3fe4a21 refactor: Re-use `UrlParser` (l0b0, Feb 7, 2024)
9ca2b9e fix: Handle local path in `$ACTION_PATH` (l0b0, Feb 7, 2024)
aeeb40a refactor: Use `UrlParser` instead of `tryParseUrl` (l0b0, Feb 7, 2024)
48f5b2d refactor: Use `fsa.join` instead of `normaliseHref` (l0b0, Feb 7, 2024)
cf7aad3 fix: Use `URL` for STAC URL concatenation (l0b0, Feb 7, 2024)
ab8603d build(deps): Bump the chunkd group with 3 updates (dependabot[bot], Feb 1, 2024)
73e08c1 fix: Work with latest chunkd update (l0b0, Feb 7, 2024)
e0f9241 refactor: fixup dependencies (blacha, Feb 7, 2024)
a5aa796 build: remove esno as its not used (blacha, Feb 7, 2024)
f341b8f refactor: fixup linting (blacha, Feb 7, 2024)
f4f916d refactor: apply linter (blacha, Feb 7, 2024)
67a8bc0 refactor: correct import locations (blacha, Feb 7, 2024)
1eeb1cc refactor: createTiff is no longer needed (blacha, Feb 7, 2024)
804b355 refactor: fixup tests for filter (blacha, Feb 7, 2024)
86cfa0d refactor: correct sample.json loading (blacha, Feb 7, 2024)
40581de refactor: missed a few tests (blacha, Feb 7, 2024)
2cd22d5 fix: correct logic of relative test (blacha, Feb 7, 2024)
c6dc92b Merge remote-tracking branch 'origin/master' into refactor/types (l0b0, Feb 8, 2024)
fa98b63 Update src/commands/copy/copy.ts (l0b0, Feb 8, 2024)
1f36e73 refactor: Join imports (l0b0, Feb 8, 2024)
3b5eb17 fix: Use absolute path for file URL (l0b0, Feb 12, 2024)
dcd9868 chore: Uncomment test (l0b0, Feb 12, 2024)
f09b7d3 chore: Remove redundant lookup of `URL.href` (l0b0, Feb 12, 2024)
5306cc8 refactor: Use existing `fsa.toUrl` (l0b0, Feb 12, 2024)
273235e refactor: Simplify by using synchronous URL parser (l0b0, Feb 12, 2024)
cc9ddb1 Merge remote-tracking branch 'origin/master' into refactor/types (l0b0, Feb 12, 2024)
87c913f fix: Pass plain strings to workers (l0b0, Feb 13, 2024)
894218c Update src/commands/basemaps-mapsheet/create-mapsheet.ts (l0b0, Feb 13, 2024)
1a26e94 refactor: Let `URL` constructor deal with relative `path`s (l0b0, Feb 13, 2024)
767edc0 fix: Syntax and type (l0b0, Feb 13, 2024)
fc92748 Merge remote-tracking branch 'origin/master' into refactor/types (l0b0, Feb 19, 2024)
ffd1d02 refactor: Simplify URL generation (l0b0, Feb 19, 2024)
8191600 refactor: Create URL with library method (l0b0, Feb 19, 2024)
79737fc Merge remote-tracking branch 'origin/master' into refactor/types (l0b0, Feb 23, 2024)
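Two of the commits above ("refactor: Specify common branded types" and "docs: Explain branded types") introduce the branded-type pattern. A minimal sketch of that pattern follows; the `S3Path` type and `s3Path` constructor are illustrative names, not the PR's actual code:

```typescript
// A branded (nominal) type: a string the compiler treats as distinct from
// every other string, with no runtime overhead.
type S3Path = string & { readonly __brand: 'S3Path' };

// Hypothetical smart constructor: validate first, then brand.
function s3Path(value: string): S3Path {
  if (!value.startsWith('s3://')) throw new Error(`Not an S3 path: ${value}`);
  return value as S3Path;
}

const ok: S3Path = s3Path('s3://bucket/key.json');
// const bad: S3Path = 'bucket/key.json'; // rejected at compile time
console.log(ok);
```

Later commits in the list move away from string paths entirely, toward the built-in `URL` type, which gives similar misuse protection without a custom brand.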
17 changes: 9 additions & 8 deletions src/commands/basemaps-mapsheet/create-mapsheet.ts
@@ -9,6 +9,7 @@ import { gunzip } from 'zlib';

import { CliInfo } from '../../cli.info.js';
import { logger } from '../../log.js';
import { UrlParser } from '../../utils/parsers.js';
import { registerCli, verbose } from '../common.js';

const gunzipProm = promisify(gunzip);
@@ -22,9 +23,9 @@ export function isGzip(b: Buffer): boolean {
*
* If the file ends with .gz or is a GZIP like {@link isGzip} file it will automatically be decompressed.
*/
async function readConfig(config: string): Promise<ConfigBundled> {
const obj = await fsa.read(config);
if (config.endsWith('.gz') || isGzip(obj)) {
async function readConfig(config: URL): Promise<ConfigBundled> {
const obj = await fsa.read(config.href);
Member:

> It's not safe to use `.href` here. If it's a file URL it has to be converted with `path.fileURLToPath`.
>
> Does `fsa.read` not accept a `URL`? Would that imply `fsa` is out of date?

l0b0 (Contributor, Author), Feb 7, 2024:

> > It's not safe to use `.href` here. If it's a file URL it has to be converted with `path.fileURLToPath`.
>
> I'd rather upgrade `fsa`, but:
>
> > Does `fsa.read` not accept a `URL`? Would that imply `fsa` is out of date?
>
> Yep; that PR needs a bunch of fixes, though.
if (config.href.endsWith('.gz') || isGzip(obj)) {
const data = await gunzipProm(obj);
return JSON.parse(data.toString());
}
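The review concern above (that `.href` of a `file:` URL is not a usable filesystem path) can be illustrated with Node's built-in `node:url` helpers. This is a standalone sketch, independent of `fsa`:

```typescript
import { fileURLToPath, pathToFileURL } from 'node:url';

// Spaces and other special characters are percent-encoded in the href,
// so passing it straight to fs APIs would fail to find the file.
const url = pathToFileURL('/tmp/some dir/file.json');
console.log(url.href); // e.g. file:///tmp/some%20dir/file.json

// fileURLToPath decodes the URL back into a plain OS path.
const osPath = fileURLToPath(url);
console.log(osPath);
```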
@@ -39,17 +40,17 @@ interface Output {
export const CommandCreateMapSheetArgs = {
verbose,
path: option({
type: string,
type: UrlParser,
long: 'path',
description: 'Path of flatgeobuf, this can be both a local path or s3 location',
}),
bmConfig: option({
type: string,
type: UrlParser,
long: 'bm-config',
description: 'Path of basemaps config json, this can be both a local path or s3 location',
}),
output: option({
type: string,
type: UrlParser,
long: 'output',
description: 'Output of the mapsheet file',
}),
@@ -80,7 +81,7 @@ export const basemapsCreateMapSheet = command({
const exclude = args.exclude ? new RegExp(args.exclude.toLowerCase(), 'i') : undefined;

logger.info({ path }, 'MapSheet:LoadFgb');
const buf = await fsa.read(path);
const buf = await fsa.read(path.href);
logger.info({ config }, 'MapSheet:LoadConfig');
const configJson = await readConfig(config);
const mem = ConfigProviderMemory.fromJson(configJson);
@@ -95,7 +96,7 @@ export const basemapsCreateMapSheet = command({
const outputs = await createMapSheet(aerial, mem, rest, include, exclude);

logger.info({ outputPath }, 'MapSheet:WriteOutput');
const outputWritePromise = fsa.write(outputPath, JSON.stringify(outputs, null, 2));
const outputWritePromise = fsa.write(outputPath.href, JSON.stringify(outputs, null, 2));

await Promise.all([featuresWritePromise, outputWritePromise]);
},
24 changes: 12 additions & 12 deletions src/commands/copy/__test__/copy.test.ts
@@ -26,12 +26,12 @@ describe('copyFiles', () => {
id: '1',
manifest: [
{
source: 'memory://source/topographic.json',
target: 'memory://target/topographic.json',
source: new URL('memory://source/topographic.json'),
target: new URL('memory://target/topographic.json'),
},
{
source: 'memory://source/foo/bar/topographic.png',
target: 'memory://target/topographic.png',
source: new URL('memory://source/foo/bar/topographic.png'),
target: new URL('memory://target/topographic.png'),
},
],
start: 0,
@@ -77,12 +77,12 @@ describe('copyFiles', () => {
id: '1',
manifest: [
{
source: 'memory://source/topographic.json',
target: 'memory://target/topographic.json',
source: new URL('memory://source/topographic.json'),
target: new URL('memory://target/topographic.json'),
},
{
source: 'memory://source/foo/bar/topographic.tiff',
target: 'memory://target/topographic.tiff',
source: new URL('memory://source/foo/bar/topographic.tiff'),
target: new URL('memory://target/topographic.tiff'),
},
],
start: 0,
@@ -121,12 +121,12 @@ describe('copyFiles', () => {
id: '1',
manifest: [
{
source: 'memory://source/topographic.json',
target: 'memory://target/topographic.json',
source: new URL('memory://source/topographic.json'),
target: new URL('memory://target/topographic.json'),
},
{
source: 'memory://source/foo/bar/topographic.tiff',
target: 'memory://target/topographic.tiff',
source: new URL('memory://source/foo/bar/topographic.tiff'),
target: new URL('memory://target/topographic.tiff'),
},
],
start: 0,
2 changes: 1 addition & 1 deletion src/commands/copy/copy-rpc.ts
@@ -6,7 +6,7 @@ export interface CopyContractArgs {
/** Copy ID for tracing */
id: string;
/** List of files that need to be copied */
manifest: { source: string; target: string }[];
manifest: { source: URL; target: URL }[];
Member:

> Can worker threads actually pass URLs? I thought everything was meant to be a POJO for passing around?

l0b0 (Contributor, Author):

> I assume you mean either JSON or a JS object, not "plain old Java object". I have no idea how WorkerRpc works; I guess since you ask, we don't have tests for this message passing?

Member:

> worker_threads are a mechanism for using small scripts to add more threads for processing; they are limited to posting messages to/from the thread via an RPC-type mechanism. WorkerRpc wraps the posting so it is somewhat typesafe.
>
> I am not sure we can pass a URL between threads, as it is not an easily serializable thing, just like you can't pass a Date but you can pass an ISO string.

/** Offset into the manifest to start at */
start: number;
/** Number of records to copy */
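The thread above anticipates the fix that landed later as commit 87c913f ("fix: Pass plain strings to workers"): `URL` instances are not structured-cloneable, so they must cross the worker boundary as plain strings. A minimal sketch of such a wire format; the `CopyTask`/`CopyTaskWire` names are illustrative, not the PR's code:

```typescript
// URL instances cannot be posted between worker_threads (the structured
// clone algorithm rejects them), so serialize to href strings at the
// boundary and revive them on the other side.
interface CopyTask {
  source: URL;
  target: URL;
}

interface CopyTaskWire {
  source: string; // URL.href
  target: string;
}

function toWire(task: CopyTask): CopyTaskWire {
  return { source: task.source.href, target: task.target.href };
}

function fromWire(wire: CopyTaskWire): CopyTask {
  return { source: new URL(wire.source), target: new URL(wire.target) };
}

const task = { source: new URL('s3://bucket/a.json'), target: new URL('s3://bucket/b.json') };
const revived = fromWire(toWire(task));
console.log(revived.source.href);
```

The round trip is lossless because `new URL(url.href)` reconstructs an equivalent instance, the same way a `Date` survives as an ISO string.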
22 changes: 11 additions & 11 deletions src/commands/copy/copy-worker.ts
@@ -19,20 +19,20 @@ export const FixableContentType = new Set(['binary/octet-stream', 'application/octet-stream']);
* If the file has been written with an unknown binary contentType, attempt to fix it with common content types
*
*
* @param path File path to fix the metadata of
* @param url File path to fix the metadata of
* @param meta File metadata
* @returns New fixed file metadata if fixed, otherwise source file metadata
*/
export function fixFileMetadata(path: string, meta: FileInfo): FileInfo {
export function fixFileMetadata(url: URL, meta: FileInfo): FileInfo {
// If the content is encoded we do not know what the content-type should be
if (meta.contentEncoding != null) return meta;
if (!FixableContentType.has(meta.contentType ?? 'binary/octet-stream')) return meta;

// Assume our tiffs are cloud optimized
if (isTiff(path)) return { ...meta, contentType: 'image/tiff; application=geotiff; profile=cloud-optimized' };
if (isTiff(url)) return { ...meta, contentType: 'image/tiff; application=geotiff; profile=cloud-optimized' };

// overwrite with application/json
if (path.endsWith('.json')) return { ...meta, contentType: 'application/json' };
if (url.href.endsWith('.json')) return { ...meta, contentType: 'application/json' };

return meta;
}
@@ -42,13 +42,13 @@ export function fixFileMetadata(path: string, meta: FileInfo): FileInfo {
*
* try reading the path {retryCount} times before aborting, with a delay of 250ms between requests
*
* @param filePath File to head
* @param url File to head
* @param retryCount number of times to retry
* @returns file size if it exists or null
*/
async function tryHead(filePath: string, retryCount = 3): Promise<number | null> {
async function tryHead(url: URL, retryCount = 3): Promise<number | null> {
for (let i = 0; i < retryCount; i++) {
const ret = await fsa.head(filePath);
const ret = await fsa.head(url.href);
if (ret?.size) return ret.size;
await new Promise((r) => setTimeout(r, 250));
}
@@ -66,7 +66,7 @@ export const worker = new WorkerRpc<CopyContract>({
if (todo == null) continue;

Q.push(async () => {
const [source, target] = await Promise.all([fsa.head(todo.source), fsa.head(todo.target)]);
const [source, target] = await Promise.all([fsa.head(todo.source.href), fsa.head(todo.target.href)]);
if (source == null) return;
if (source.size == null) return;
if (target != null) {
@@ -86,8 +86,8 @@ export const worker = new WorkerRpc<CopyContract>({
log.trace(todo, 'File:Copy:start');
const startTime = performance.now();
await fsa.write(
todo.target,
fsa.stream(todo.source),
todo.target.href,
fsa.stream(todo.source.href),
args.fixContentType ? fixFileMetadata(todo.source, source) : source,
);

@@ -96,7 +96,7 @@ export const worker = new WorkerRpc<CopyContract>({
if (targetSize !== source.size) {
log.fatal({ ...todo }, 'Copy:Failed');
// Cleanup the failed copy so it can be retried
if (targetSize != null) await fsa.delete(todo.target);
if (targetSize != null) await fsa.delete(todo.target.href);
throw new Error(`Failed to copy source:${todo.source} target:${todo.target}`);
}
log.debug({ ...todo, size: targetSize, duration: performance.now() - startTime }, 'File:Copy');
34 changes: 13 additions & 21 deletions src/commands/copy/copy.ts
@@ -1,35 +1,19 @@
import { fsa } from '@chunkd/fs';
import { WorkerRpcPool } from '@wtrpc/core';
import { boolean, command, flag, number, option, restPositionals, string } from 'cmd-ts';
import { boolean, command, flag, number, option, restPositionals } from 'cmd-ts';
import { performance } from 'perf_hooks';
import { gunzipSync } from 'zlib';
import * as z from 'zod';

import { CliInfo } from '../../cli.info.js';
import { logger, logId } from '../../log.js';
import { ActionCopy } from '../../utils/actions.js';
import { UrlParser } from '../../utils/parsers.js';
import { config, registerCli, verbose } from '../common.js';
import { CopyContract } from './copy-rpc.js';

const CopyValidator = z.object({ source: z.string(), target: z.string() });
const CopyManifest = z.array(CopyValidator);

/**
* Attempt to figure out how the configuration is pass to us
* - Could be a path to a S3 location s3://foo/bar.json
* - Could be a JSON document "[{}]"
* - Could be a Base64'd Gzipped document
*/
async function tryParse(x: string): Promise<unknown> {
if (x.startsWith('s3://') || x.startsWith('./') || x.startsWith('/')) {
const json = await fsa.readJson<ActionCopy>(x);
if (json.action !== 'copy') throw new Error('Invalid action: ' + json.action + ' from:' + x);
return json.parameters.manifest;
}
if (x.startsWith('[') || x.startsWith('{')) return JSON.parse(x);
return JSON.parse(gunzipSync(Buffer.from(x, 'base64url')).toString());
}
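For context on the branch being deleted: the third input form `tryParse` accepted was a base64url-encoded, gzipped JSON document. A sketch of how such a payload is produced and decoded with Node's `zlib`, shown only to document the behavior the PR removes in favour of reading manifests from a URL:

```typescript
import { gzipSync, gunzipSync } from 'node:zlib';

// Encode: JSON -> gzip -> base64url, the compact form previously accepted.
const manifest = [{ source: 's3://bucket/a.json', target: 's3://bucket/b.json' }];
const encoded = gzipSync(Buffer.from(JSON.stringify(manifest))).toString('base64url');

// Decode: the inverse, matching the removed fallback branch in tryParse.
const decoded = JSON.parse(gunzipSync(Buffer.from(encoded, 'base64url')).toString());
console.log(decoded);
```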

export const commandCopy = command({
name: 'copy',
description: 'Copy a manifest of files',
@@ -66,7 +50,11 @@ export const commandCopy = command({
defaultValueIsSerializable: true,
}),
concurrency: option({ type: number, long: 'concurrency', defaultValue: () => 4 }),
manifest: restPositionals({ type: string, displayName: 'location', description: 'Manifest of file to copy' }),
manifest: restPositionals({
type: UrlParser,
displayName: 'location',
description: 'Manifest of file to copy',
}),
},
async handler(args) {
registerCli(this, args);
@@ -87,8 +75,12 @@ export const commandCopy = command({
const chunks = [];
const startTime = performance.now();
for (const m of args.manifest) {
const data = await tryParse(m);
const manifest = CopyManifest.parse(data);
const json = await fsa.readJson<ActionCopy>(m.href);
if (json.action !== 'copy') throw new Error('Invalid action: ' + json.action + ' from:' + m);
const data = CopyManifest.parse(json.parameters.manifest);
const manifest: ActionCopy['parameters']['manifest'] = data.map(({ source, target }) => {
return { source: new URL(source), target: new URL(target) };
});

const chunkSize = Math.ceil(manifest.length / args.concurrency);
for (let i = 0; i < manifest.length; i += chunkSize) {
46 changes: 25 additions & 21 deletions src/commands/create-manifest/__test__/create-manifest.test.ts
@@ -18,15 +18,17 @@ describe('createManifest', () => {
fsa.write('memory://source/foo/bar/topographic.png', Buffer.from('test')),
]);

const outputFiles = await createManifest('memory://source/', 'memory://target/', { flatten: true });
const outputFiles = await createManifest(new URL('memory://source/'), new URL('memory://target/'), {
flatten: true,
});
assert.deepEqual(outputFiles[0], [
{
source: 'memory://source/topographic.json',
target: 'memory://target/topographic.json',
source: new URL('memory://source/topographic.json'),
target: new URL('memory://target/topographic.json'),
},
{
source: 'memory://source/foo/bar/topographic.png',
target: 'memory://target/topographic.png',
source: new URL('memory://source/foo/bar/topographic.png'),
target: new URL('memory://target/topographic.png'),
},
]);
});
@@ -36,18 +38,18 @@ describe('createManifest', () => {
fsa.write('memory://source/topographic.json', Buffer.from(JSON.stringify({ test: true }))),
fsa.write('memory://source/foo/bar/topographic.png', Buffer.from('test')),
]);
const outputFiles = await createManifest('memory://source/', 'memory://target/sub/', {
const outputFiles = await createManifest(new URL('memory://source/'), new URL('memory://target/sub/'), {
flatten: false,
transform: 'f.replace("topographic", "test")',
});
assert.deepEqual(outputFiles[0], [
{
source: 'memory://source/topographic.json',
target: 'memory://target/sub/test.json',
source: new URL('memory://source/topographic.json'),
target: new URL('memory://target/sub/test.json'),
},
{
source: 'memory://source/foo/bar/topographic.png',
target: 'memory://target/sub/foo/bar/test.png',
source: new URL('memory://source/foo/bar/topographic.png'),
target: new URL('memory://target/sub/foo/bar/test.png'),
},
]);
});
@@ -58,15 +60,17 @@ describe('createManifest', () => {
fsa.write('memory://source/foo/bar/topographic.png', Buffer.from('test')),
]);

const outputFiles = await createManifest('memory://source/', 'memory://target/sub/', { flatten: false });
const outputFiles = await createManifest(new URL('memory://source/'), new URL('memory://target/sub/'), {
flatten: false,
});
assert.deepEqual(outputFiles[0], [
{
source: 'memory://source/topographic.json',
target: 'memory://target/sub/topographic.json',
source: new URL('memory://source/topographic.json'),
target: new URL('memory://target/sub/topographic.json'),
},
{
source: 'memory://source/foo/bar/topographic.png',
target: 'memory://target/sub/foo/bar/topographic.png',
source: new URL('memory://source/foo/bar/topographic.png'),
target: new URL('memory://target/sub/foo/bar/topographic.png'),
},
]);
});
@@ -75,26 +79,26 @@ describe('createManifest', () => {
await Promise.all([fsa.write('memory://source/topographic.json', Buffer.from(JSON.stringify({ test: true })))]);

const outputFiles = await createManifest(
'memory://source/topographic.json',
'memory://target/sub/topographic.json',
new URL('memory://source/topographic.json'),
new URL('memory://target/sub/topographic.json'),
{ flatten: false },
);
assert.deepEqual(outputFiles[0], [
{
source: 'memory://source/topographic.json',
target: 'memory://target/sub/topographic.json',
source: new URL('memory://source/topographic.json'),
target: new URL('memory://target/sub/topographic.json'),
},
]);
});
describe('validatePaths', () => {
it('Should throw error for Missmatch Paths', () => {
assert.throws(() => {
validatePaths('memory://source/', 'memory://target/sub/test.tiff');
validatePaths(new URL('memory://source/'), new URL('memory://target/sub/test.tiff'));
}, Error);
});
it('Should also throw error for Missmatch Paths', () => {
assert.throws(() => {
validatePaths('memory://source/test.tiff', 'memory://target/sub/');
validatePaths(new URL('memory://source/test.tiff'), new URL('memory://target/sub/'));
}, Error);
});
});