Skip to content

Commit

Permalink
feat: let $ be piped from stream (#953)
Browse files Browse the repository at this point in the history
continues #949
  • Loading branch information
antongolub authored Nov 24, 2024
1 parent 8900e45 commit 4bb470b
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 37 deletions.
6 changes: 3 additions & 3 deletions .size-limit.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
{
"name": "zx/core",
"path": ["build/core.cjs", "build/util.cjs", "build/vendor-core.cjs"],
"limit": "72 kB",
"limit": "73 kB",
"brotli": false,
"gzip": false
},
Expand All @@ -16,7 +16,7 @@
{
"name": "dts libdefs",
"path": "build/*.d.ts",
"limit": "36 kB",
"limit": "37 kB",
"brotli": false,
"gzip": false
},
Expand All @@ -30,7 +30,7 @@
{
"name": "all",
"path": "build/*",
"limit": "835 kB",
"limit": "840 kB",
"brotli": false,
"gzip": false
}
Expand Down
59 changes: 57 additions & 2 deletions src/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ import {
once,
parseDuration,
preferLocalBin,
promisifyStream,
quote,
quotePowerShell,
} from './util.js'
Expand Down Expand Up @@ -373,7 +372,7 @@ export class ProcessPromise extends Promise<ProcessOutput> {
return dest
}
from.once('end', () => dest.emit('end-piped-from')).pipe(dest)
return promisifyStream(dest)
return promisifyStream(dest, this)
}

abort(reason?: string) {
Expand Down Expand Up @@ -544,6 +543,32 @@ export class ProcessPromise extends Promise<ProcessOutput> {
): Promise<ProcessOutput | T> {
return super.catch(onrejected)
}

// Stream-like API
private writable = true
private emit(event: string, ...args: any[]) {
return this
}
private on(event: string, cb: any) {
this._stdin.on(event, cb)
return this
}
private once(event: string, cb: any) {
this._stdin.once(event, cb)
return this
}
private write(data: any, encoding: BufferEncoding, cb: any) {
this._stdin.write(data, encoding, cb)
return this
}
private end(chunk: any, cb: any) {
this._stdin.end(chunk, cb)
return this
}
private removeListener(event: string, cb: any) {
this._stdin.removeListener(event, cb)
return this
}
}

type GettersRecord<T extends Record<any, any>> = { [K in keyof T]: () => T[K] }
Expand Down Expand Up @@ -841,3 +866,33 @@ export function log(entry: LogEntry) {
process.stderr.write(entry.error + '\n')
}
}

export const promisifyStream = <S extends Writable>(
stream: S,
from?: ProcessPromise
): S & PromiseLike<S> =>
new Proxy(stream as S & PromiseLike<S>, {
get(target, key) {
if (key === 'run') return from?.run.bind(from)
if (key === 'then') {
return (res: any = noop, rej: any = noop) =>
new Promise((_res, _rej) =>
target
.once('error', (e) => _rej(rej(e)))
.once('finish', () => _res(res(target)))
.once('end-piped-from', () => _res(res(target)))
)
}
const value = Reflect.get(target, key)
if (key === 'pipe' && typeof value === 'function') {
return function (...args: any) {
const piped = value.apply(target, args)
piped._pipedFrom = from
return piped instanceof ProcessPromise
? piped
: promisifyStream(piped, from)
}
}
return value
},
})
25 changes: 0 additions & 25 deletions src/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import os from 'node:os'
import path from 'node:path'
import fs from 'node:fs'
import { chalk } from './vendor-core.js'
import type { Writable } from 'node:stream'

export { isStringLiteral } from './vendor-core.js'

Expand Down Expand Up @@ -450,27 +449,3 @@ export const once = <T extends (...args: any[]) => any>(fn: T) => {
return (result = fn(...args))
}
}

export const promisifyStream = <S extends Writable>(
stream: S
): S & PromiseLike<S> =>
new Proxy(stream as S & PromiseLike<S>, {
get(target, key) {
if (key === 'then') {
return (res: any = noop, rej: any = noop) =>
new Promise((_res, _rej) =>
target
.once('error', (e) => _rej(rej(e)))
.once('finish', () => _res(res(target)))
.once('end-piped-from', () => _res(res(target)))
)
}
const value = Reflect.get(target, key)
if (key === 'pipe' && typeof value === 'function') {
return function (...args: any) {
return promisifyStream(value.apply(target, args))
}
}
return value
},
})
48 changes: 41 additions & 7 deletions test/core.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,13 @@ describe('core', () => {
})

describe('supports chaining', () => {
const getUpperCaseTransform = () =>
new Transform({
transform(chunk, encoding, callback) {
callback(null, String(chunk).toUpperCase())
},
})

test('$ > $', async () => {
const { stdout: o1 } = await $`echo "hello"`
.pipe($`awk '{print $1" world"}'`)
Expand Down Expand Up @@ -466,13 +473,7 @@ describe('core', () => {
const file = tempfile()
const fileStream = fs.createWriteStream(file)
const p = $`echo "hello"`
.pipe(
new Transform({
transform(chunk, encoding, callback) {
callback(null, String(chunk).toUpperCase())
},
})
)
.pipe(getUpperCaseTransform())
.pipe(fileStream)

assert.ok(p instanceof WriteStream)
Expand All @@ -481,6 +482,39 @@ describe('core', () => {
await fs.rm(file)
})

test('$ halted > stream', async () => {
const file = tempfile()
const fileStream = fs.createWriteStream(file)
const p = $({ halt: true })`echo "hello"`
.pipe(getUpperCaseTransform())
.pipe(fileStream)

assert.ok(p instanceof WriteStream)
assert.ok(p.run() instanceof ProcessPromise)
await p
assert.equal((await p.run()).stdout, 'hello\n')
assert.equal((await fs.readFile(file)).toString(), 'HELLO\n')
await fs.rm(file)
})

test('stream > $', async () => {
const file = tempfile()
await fs.writeFile(file, 'test')
const { stdout } = await fs
.createReadStream(file)
.pipe(getUpperCaseTransform())
.pipe($`cat`)

assert.equal(stdout, 'TEST')
})

test('$ > stream > $', async () => {
const p = $`echo "hello"`
const { stdout } = await p.pipe(getUpperCaseTransform()).pipe($`cat`)

assert.equal(stdout, 'HELLO\n')
})

test('$ > stdout', async () => {
const p = $`echo 1`.pipe(process.stdout)
assert.equal(await p, process.stdout)
Expand Down

0 comments on commit 4bb470b

Please sign in to comment.