Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve brimcap error handling #2955

Merged
merged 10 commits into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions apps/zui/src/core/loader/types.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import {LoadFormat} from "@brimdata/zed-js"
import {LoadContext} from "../../domain/loads/load-context"

export type LoadOptions = {
windowId: string
Expand All @@ -14,7 +13,7 @@ export type LoadOptions = {
}

export interface Loader {
when(context: LoadContext): PromiseLike<boolean> | boolean
run(context: LoadContext): PromiseLike<void> | void
rollback(context: LoadContext): PromiseLike<void> | void
when(): PromiseLike<boolean> | boolean
run(): PromiseLike<void> | void
rollback?(): PromiseLike<void> | void
}
25 changes: 14 additions & 11 deletions apps/zui/src/domain/loads/default-loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,15 @@ import {createReadableStream} from "src/core/zq"
import {throttle} from "lodash"
import {errorToString} from "src/util/error-to-string"

export const defaultLoader: Loader = {
export class DefaultLoader implements Loader {
constructor(private ctx: LoadContext) {}

when() {
return true
},
}

async run(ctx: LoadContext) {
async run() {
const {ctx} = this
const client = await ctx.createClient()
const progress = createProgressTracker(ctx)
const shaper = createShaper(ctx)
Expand All @@ -26,7 +29,7 @@ export const defaultLoader: Loader = {

let res
try {
ctx.onProgress(0)
ctx.setProgress(0)
res = await client.load(body, {
pool: ctx.poolId,
branch: ctx.branch,
Expand All @@ -38,16 +41,16 @@ export const defaultLoader: Loader = {
})
} catch (e) {
const error = streamError ? new Error(streamError) : e
ctx.onWarning(errorToString(error))
ctx.onProgress(null)
ctx.addError(errorToString(error))
ctx.setProgress(null)
throw error
}
for (const warning of res?.warnings ?? []) ctx.onWarning(warning)
for (const warning of res?.warnings ?? []) ctx.addError(warning)
await ctx.onPoolChanged()
ctx.onProgress(1)
},
ctx.setProgress(1)
}

rollback() {},
rollback() {}
}

function getFileSize(path: string) {
Expand All @@ -64,7 +67,7 @@ function createShaper(ctx) {
}

function createProgressTracker(ctx) {
const onProgress = throttle((n) => ctx.onProgress(n), 500)
const onProgress = throttle((n) => ctx.setProgress(n), 500)

let total = ctx.files.reduce((sum, file) => sum + getFileSize(file), 0)
let bytes = 0
Expand Down
2 changes: 1 addition & 1 deletion apps/zui/src/domain/loads/handlers/quick-load-files.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ export const quickLoadFiles = createHandler(
return
}

ctx.invoke("loads.create", {
return ctx.invoke("loads.create", {
windowId: globalThis.windowId,
poolId: args.poolId || "new",
files: args.files,
Expand Down
14 changes: 10 additions & 4 deletions apps/zui/src/domain/loads/load-context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {createLoadRef} from "./load-ref"
import {select} from "src/core/main/select"

export class LoadContext {
abortMsg = undefined as string
private ctl = new AbortController()
private id = nanoid()
private window: SearchWindow
Expand Down Expand Up @@ -41,24 +42,29 @@ export class LoadContext {
)
}

onProgress(progress: number) {
setProgress(progress: number) {
this.main.dispatch(Loads.update({id: this.id, changes: {progress}}))
}

onWarning(warning: string) {
addError(error: string) {
const load = Loads.find(this.main.store.getState(), this.id)
const errors = [...load.errors, warning]
const errors = [...load.errors, error]
this.main.dispatch(Loads.update({id: this.id, changes: {errors}}))
}

async onPoolChanged() {
await syncPoolOp(this.opts.lakeId, this.opts.poolId)
}

abort() {
abort(msg?: string) {
this.abortMsg = msg
this.ctl.abort()
}

get abortError() {
if (this.abortMsg) return new Error(this.abortMsg)
}

get ref() {
return select((s) => Loads.find(s, this.id))
}
Expand Down
4 changes: 3 additions & 1 deletion apps/zui/src/domain/loads/operations/create.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ export const submit = createOperation(
const pool = await createPool(data)
const script = new ZedScript(data.shaper || "")
// Async so that we can return this and subscribe to updates on the load.
zui.pools
const promise = zui.pools
.load({
windowId: data.windowId,
format: data.format,
Expand All @@ -36,6 +36,8 @@ export const submit = createOperation(
})

zui.window.openTab(poolPath(pool.id))

return promise
}
)

Expand Down
36 changes: 11 additions & 25 deletions apps/zui/src/domain/loads/plugin-api.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {defaultLoader} from "./default-loader"
import {DefaultLoader} from "./default-loader"
import {LoadContext} from "./load-context"
import {Loader} from "src/core/loader/types"
import Loads from "src/js/state/Loads"
Expand All @@ -12,38 +12,24 @@ type Events = {
error: (load: LoadReference) => void
}

type LoaderRef = {name: string; initialize: (ctx: LoadContext) => Loader}

export class LoadsApi extends TypedEmitter<Events> {
private list: LoaderApi[] = []
private list: LoaderRef[] = []

// Don't use this...or rename to addLoader
create(name: string, impl: Loader) {
this.list.push(new LoaderApi(name, impl))
addLoader(name: string, initialize: (ctx: LoadContext) => Loader) {
this.list.push({name, initialize})
}

async getMatch(context: LoadContext) {
let loader = defaultLoader
for (const pluginLoader of this.list) {
if (await pluginLoader.when(context)) {
loader = pluginLoader
break
}
async initialize(context: LoadContext) {
for (const ref of this.list) {
const customLoader = ref.initialize(context)
if (await customLoader.when()) return customLoader
}
return loader
return new DefaultLoader(context)
}

get all() {
return select(Loads.all)
}
}

class LoaderApi {
when: Loader["when"]
run: Loader["run"]
rollback: Loader["rollback"]

constructor(public name: string, impl: Loader) {
this.when = impl.when
this.run = impl.run
this.rollback = impl.rollback
}
}
9 changes: 5 additions & 4 deletions apps/zui/src/domain/pools/plugin-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {syncPoolOp} from "src/electron/ops/sync-pool-op"
import {LoadOptions} from "src/core/loader/types"
import {getMainObject} from "src/core/main"
import {TypedEmitter} from "src/util/typed-emitter"
import {call} from "src/util/call"

type Events = {
create: (event: {pool: Pool}) => void
Expand Down Expand Up @@ -39,16 +40,16 @@ export class PoolsApi extends TypedEmitter<Events> {
async load(opts: LoadOptions) {
const main = getMainObject()
const context = new LoadContext(main, opts)
const loader = await loads.getMatch(context)
const loader = await loads.initialize(context)
try {
await context.setup()
await loader.run(context)
await loader.run()
await waitForPoolStats(context)
loads.emit("success", context.ref)
} catch (e) {
await loader.rollback(context)
await call(loader.rollback)
loads.emit("error", context.ref)
throw e
throw context.abortError || e
} finally {
context.teardown()
}
Expand Down
27 changes: 27 additions & 0 deletions apps/zui/src/plugins/brimcap/analyze.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import {compact} from "lodash"
import {configurations} from "src/zui"
import {pluginNamespace, yamlConfigPropName} from "./config"
import {AnalyzeOptions, createCli} from "./cli"

function getAnalyzeOptions(): AnalyzeOptions {
return {
json: true,
config: configurations.get(pluginNamespace, yamlConfigPropName) || "",
}
}

export function createAnalyzeProcess(signal) {
const cli = createCli()
const sub = cli.analyze("-", getAnalyzeOptions(), signal)
return sub
}

export function monitorAnalyzeProgress(analyzeProc, callback) {
analyzeProc.stderr
.once("data", () => analyzeProc.stdout.emit("start"))
.on("data", (data) => {
const lines = compact(data.toString().split("\n")) as string[]
const stats = lines.map((line) => JSON.parse(line))
stats.forEach(callback)
})
}
18 changes: 4 additions & 14 deletions apps/zui/src/plugins/brimcap/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,21 +109,11 @@ export default class BrimcapCLI {
return this.execSpawnSync("search", [...toCliOpts(opts)])
}

private execSpawn(
subCommand: string,
optsAndArgs: string[],
signal?: AbortSignal
) {
// don't detach if is windows
const p = spawn(this.binPath, [subCommand, ...optsAndArgs], this.spawnOpts)
signal?.addEventListener("abort", () => {
if (this.isWin) {
spawnSync("taskkill", ["/pid", p.pid.toString(), "/f", "/t"])
} else {
process.kill(-p.pid, "SIGINT")
}
private execSpawn(subCommand: string, optsAndArgs: string[], signal) {
return spawn(this.binPath, [subCommand, ...optsAndArgs], {
...this.spawnOpts,
signal,
})
return p
}

private execSpawnSync(subCommand: string, opts: string[]) {
Expand Down
10 changes: 10 additions & 0 deletions apps/zui/src/plugins/brimcap/configure-zeek-pool.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import {zeekColorMap} from "./zeek/colors"
import {pools} from "src/zui"

export function configureZeekPool(poolId: string) {
pools
.configure(poolId)
.set("timeField", "ts")
.set("colorField", "_path")
.set("colorMap", zeekColorMap)
}
Loading
Loading