Skip to content

Commit

Permalink
fix(AbortSignal/fetch) fix AbortSignal.timeout, fetch lock behavior a…
Browse files Browse the repository at this point in the history
…nd fetch errors (#6390)

* fix abort signal and fetch error

* fix fetch error and lock behavior
  • Loading branch information
cirospaciari authored Oct 10, 2023
1 parent 3667b93 commit 6301778
Show file tree
Hide file tree
Showing 29 changed files with 303 additions and 61 deletions.
18 changes: 13 additions & 5 deletions packages/bun-usockets/src/eventing/epoll_kqueue.c
Original file line number Diff line number Diff line change
Expand Up @@ -414,14 +414,18 @@ struct us_timer_t *us_create_timer(struct us_loop_t *loop, int fallthrough, unsi
#endif

#ifdef LIBUS_USE_EPOLL
void us_timer_close(struct us_timer_t *timer) {
void us_timer_close(struct us_timer_t *timer, int fallthrough) {
struct us_internal_callback_t *cb = (struct us_internal_callback_t *) timer;

us_poll_stop(&cb->p, cb->loop);
close(us_poll_fd(&cb->p));

/* (regular) sockets are the only polls which are not freed immediately */
us_poll_free((struct us_poll_t *) timer, cb->loop);
/* (regular) sockets are the only polls which are not freed immediately */
if(fallthrough){
us_free(timer);
}else {
us_poll_free((struct us_poll_t *) timer, cb->loop);
}
}

void us_timer_set(struct us_timer_t *t, void (*cb)(struct us_timer_t *t), int ms, int repeat_ms) {
Expand All @@ -438,15 +442,19 @@ void us_timer_set(struct us_timer_t *t, void (*cb)(struct us_timer_t *t), int ms
us_poll_start((struct us_poll_t *) t, internal_cb->loop, LIBUS_SOCKET_READABLE);
}
#else
void us_timer_close(struct us_timer_t *timer) {
void us_timer_close(struct us_timer_t *timer, int fallthrough) {
struct us_internal_callback_t *internal_cb = (struct us_internal_callback_t *) timer;

struct kevent64_s event;
EV_SET64(&event, (uint64_t) (void*) internal_cb, EVFILT_TIMER, EV_DELETE, 0, 0, (uint64_t)internal_cb, 0, 0);
kevent64(internal_cb->loop->fd, &event, 1, NULL, 0, 0, NULL);

/* (regular) sockets are the only polls which are not freed immediately */
us_poll_free((struct us_poll_t *) timer, internal_cb->loop);
if(fallthrough){
us_free(timer);
}else {
us_poll_free((struct us_poll_t *) timer, internal_cb->loop);
}
}

void us_timer_set(struct us_timer_t *t, void (*cb)(struct us_timer_t *t), int ms, int repeat_ms) {
Expand Down
2 changes: 1 addition & 1 deletion packages/bun-usockets/src/libusockets.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ struct us_timer_t *us_create_timer(struct us_loop_t *loop, int fallthrough, unsi
void *us_timer_ext(struct us_timer_t *timer);

/* */
void us_timer_close(struct us_timer_t *timer);
void us_timer_close(struct us_timer_t *timer, int fallthrough);

/* Arm a timer with a delay from now and eventually a repeat delay.
* Specify 0 as repeat delay to disable repeating. Specify both 0 to disarm. */
Expand Down
2 changes: 1 addition & 1 deletion packages/bun-usockets/src/loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ void us_internal_loop_data_free(struct us_loop_t *loop) {

free(loop->data.recv_buf);

us_timer_close(loop->data.sweep_timer);
us_timer_close(loop->data.sweep_timer, 0);
us_internal_async_close(loop->data.wakeup_async);
}

Expand Down
4 changes: 2 additions & 2 deletions packages/bun-uws/capi/examples/Broadcast.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ void uws_timer_close(struct us_timer_t *timer)
struct timer_handler_data *data;
memcpy(&data, us_timer_ext(t), sizeof(struct timer_handler_data *));
free(data);
us_timer_close(t);
us_timer_close(t, 0);
}
//Timer create helper
struct us_timer_t *uws_create_timer(int ms, int repeat_ms, void (*handler)(void *data), void *data)
Expand Down Expand Up @@ -47,7 +47,7 @@ struct us_timer_t *uws_create_timer(int ms, int repeat_ms, void (*handler)(void
if (!data->repeat)
{
free(data);
us_timer_close(t);
us_timer_close(t, 0);
}
},
ms, repeat_ms);
Expand Down
4 changes: 2 additions & 2 deletions packages/bun-uws/capi/examples/HelloWorldAsync.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ void uws_timer_close(struct us_timer_t *timer)
struct timer_handler_data *data;
memcpy(&data, us_timer_ext(t), sizeof(struct timer_handler_data *));
free(data);
us_timer_close(t);
us_timer_close(t, 0);
}
//Timer create helper
struct us_timer_t *uws_create_timer(int ms, int repeat_ms, void (*handler)(void *data), void *data)
Expand Down Expand Up @@ -52,7 +52,7 @@ struct us_timer_t *uws_create_timer(int ms, int repeat_ms, void (*handler)(void
if (!data->repeat)
{
free(data);
us_timer_close(t);
us_timer_close(t, 0);
}
},
ms, repeat_ms);
Expand Down
4 changes: 2 additions & 2 deletions packages/bun-uws/capi/examples/UpgradeAsync.c
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ void uws_timer_close(struct us_timer_t *timer)
struct timer_handler_data *data;
memcpy(&data, us_timer_ext(t), sizeof(struct timer_handler_data *));
free(data);
us_timer_close(t);
us_timer_close(t, 0);
}
//Timer create helper
struct us_timer_t *uws_create_timer(int ms, int repeat_ms, void (*handler)(void *data), void *data)
Expand Down Expand Up @@ -95,7 +95,7 @@ struct us_timer_t *uws_create_timer(int ms, int repeat_ms, void (*handler)(void
if (!data->repeat)
{
free(data);
us_timer_close(t);
us_timer_close(t, 0);
}
},
ms, repeat_ms);
Expand Down
2 changes: 1 addition & 1 deletion packages/bun-uws/examples/UpgradeAsync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ int main() {

delete upgradeData;

us_timer_close(t);
us_timer_close(t, 0);
}, 5000, 0);

},
Expand Down
2 changes: 1 addition & 1 deletion packages/bun-uws/src/Loop.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ struct Loop {
LoopData *loopData = (LoopData *) us_loop_ext((us_loop_t *) this);

/* Stop and free dateTimer first */
us_timer_close(loopData->dateTimer);
us_timer_close(loopData->dateTimer, 1);

loopData->~LoopData();
/* uSockets will track whether this loop is owned by us or a borrowed alien loop */
Expand Down
2 changes: 1 addition & 1 deletion src/bun.js/api/bun.zig
Original file line number Diff line number Diff line change
Expand Up @@ -3604,7 +3604,7 @@ pub const Timer = struct {

this.poll_ref.unref(vm);

this.timer.deinit();
this.timer.deinit(false);

// balance double unreffing in doUnref
vm.event_loop_handle.?.num_polls += @as(i32, @intFromBool(this.did_unref_timer));
Expand Down
4 changes: 2 additions & 2 deletions src/bun.js/api/ffi.zig
Original file line number Diff line number Diff line change
Expand Up @@ -317,9 +317,9 @@ pub const FFI = struct {
};
};
};

var size = symbols.values().len;
if(size >= 63) {
if (size >= 63) {
size = 0;
}
var obj = JSC.JSValue.createEmptyObject(global, size);
Expand Down
1 change: 1 addition & 0 deletions src/bun.js/bindings/ZigGlobalObject.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3717,6 +3717,7 @@ void GlobalObject::addBuiltinGlobals(JSC::VM& vm)

putDirectBuiltinFunction(vm, this, builtinNames.createFIFOPrivateName(), streamInternalsCreateFIFOCodeGenerator(vm), PropertyAttribute::Builtin | PropertyAttribute::DontDelete | PropertyAttribute::ReadOnly);
putDirectBuiltinFunction(vm, this, builtinNames.createEmptyReadableStreamPrivateName(), readableStreamCreateEmptyReadableStreamCodeGenerator(vm), PropertyAttribute::Builtin | PropertyAttribute::DontDelete | PropertyAttribute::ReadOnly);
putDirectBuiltinFunction(vm, this, builtinNames.createUsedReadableStreamPrivateName(), readableStreamCreateUsedReadableStreamCodeGenerator(vm), PropertyAttribute::Builtin | PropertyAttribute::DontDelete | PropertyAttribute::ReadOnly);
putDirectBuiltinFunction(vm, this, builtinNames.consumeReadableStreamPrivateName(), readableStreamConsumeReadableStreamCodeGenerator(vm), PropertyAttribute::Builtin | PropertyAttribute::DontDelete | PropertyAttribute::ReadOnly);
putDirectBuiltinFunction(vm, this, builtinNames.createNativeReadableStreamPrivateName(), readableStreamCreateNativeReadableStreamCodeGenerator(vm), PropertyAttribute::Builtin | PropertyAttribute::DontDelete | PropertyAttribute::ReadOnly);
putDirectBuiltinFunction(vm, this, builtinNames.requireESMPrivateName(), importMetaObjectRequireESMCodeGenerator(vm), PropertyAttribute::Builtin | PropertyAttribute::DontDelete | PropertyAttribute::ReadOnly);
Expand Down
8 changes: 8 additions & 0 deletions src/bun.js/bindings/bindings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2165,6 +2165,14 @@ JSC__JSValue ReadableStream__empty(Zig::GlobalObject* globalObject)
return JSValue::encode(JSC::call(globalObject, function, JSC::ArgList(), "ReadableStream.create"_s));
}

JSC__JSValue ReadableStream__used(Zig::GlobalObject* globalObject)
{
auto& vm = globalObject->vm();
auto clientData = WebCore::clientData(vm);
auto* function = globalObject->getDirect(vm, clientData->builtinNames().createUsedReadableStreamPrivateName()).getObject();
return JSValue::encode(JSC::call(globalObject, function, JSC::ArgList(), "ReadableStream.create"_s));
}

JSC__JSValue JSC__JSValue__createRangeError(const ZigString* message, const ZigString* arg1,
JSC__JSGlobalObject* globalObject)
{
Expand Down
2 changes: 1 addition & 1 deletion src/bun.js/event_loop.zig
Original file line number Diff line number Diff line change
Expand Up @@ -1148,7 +1148,7 @@ pub const EventLoop = struct {

pub fn callTask(timer: *uws.Timer) callconv(.C) void {
var task = Task.from(timer.as(*anyopaque));
timer.deinit();
defer timer.deinit(true);

JSC.VirtualMachine.get().enqueueTask(task);
}
Expand Down
2 changes: 1 addition & 1 deletion src/bun.js/node/node_fs_stat_watcher.zig
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ pub const StatWatcherScheduler = struct {
prev = next;
} else {
if (this.head.load(.Monotonic) == null) {
this.timer.?.deinit();
this.timer.?.deinit(false);
this.timer = null;
// The scheduler is not deinit here, but it will get reused.
}
Expand Down
11 changes: 8 additions & 3 deletions src/bun.js/webcore/body.zig
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,10 @@ pub const Body = struct {
JSC.markBinding(@src());

switch (this.*) {
.Used, .Empty => {
.Used => {
return JSC.WebCore.ReadableStream.used(globalThis);
},
.Empty => {
return JSC.WebCore.ReadableStream.empty(globalThis);
},
.Null => {
Expand All @@ -493,6 +496,9 @@ pub const Body = struct {
if (locked.readable) |readable| {
return readable.value;
}
if (locked.promise != null) {
return JSC.WebCore.ReadableStream.used(globalThis);
}
var drain_result: JSC.WebCore.DrainResult = .{
.estimated_size = 0,
};
Expand Down Expand Up @@ -1104,8 +1110,7 @@ pub fn BodyMixin(comptime Type: type) type {
var body: *Body.Value = this.getBodyValue();

if (body.* == .Used) {
// TODO: make this closed
return JSC.WebCore.ReadableStream.empty(globalThis);
return JSC.WebCore.ReadableStream.used(globalThis);
}

return body.toReadableStream(globalThis);
Expand Down
30 changes: 15 additions & 15 deletions src/bun.js/webcore/response.zig
Original file line number Diff line number Diff line change
Expand Up @@ -778,27 +778,27 @@ pub const Fetch = struct {
if (!success) {
const err = this.onReject();
err.ensureStillAlive();
// if we are streaming update with error
if (this.readable_stream_ref.get()) |readable| {
readable.ptr.Bytes.onData(
.{
.err = .{ .JSValue = err },
},
bun.default_allocator,
);
}
// if we are buffering resolve the promise
if (this.response.get()) |response_js| {
if (response_js.as(Response)) |response| {
const body = response.body;
if (body.value == .Locked) {
if (body.value.Locked.readable) |readable| {
readable.ptr.Bytes.onData(
.{
.err = .{ .JSValue = err },
},
bun.default_allocator,
);
return;
}
if (body.value.Locked.promise) |promise_| {
const promise = promise_.asAnyPromise().?;
promise.reject(globalThis, err);
}

response.body.value.toErrorInstance(err, globalThis);
return;
}
}

globalThis.throwValue(err);
return;
}

Expand Down Expand Up @@ -1708,7 +1708,7 @@ pub const Fetch = struct {
if (decompress.isBoolean()) {
disable_decompression = !decompress.asBoolean();
} else if (decompress.isNumber()) {
disable_keepalive = decompress.to(i32) == 0;
disable_decompression = decompress.to(i32) == 0;
}
}

Expand Down Expand Up @@ -1901,7 +1901,7 @@ pub const Fetch = struct {
if (decompress.isBoolean()) {
disable_decompression = !decompress.asBoolean();
} else if (decompress.isNumber()) {
disable_keepalive = decompress.to(i32) == 0;
disable_decompression = decompress.to(i32) == 0;
}
}

Expand Down
13 changes: 13 additions & 0 deletions src/bun.js/webcore/streams.zig
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ pub const ReadableStream = struct {
extern fn ReadableStream__isDisturbed(possibleReadableStream: JSValue, globalObject: *JSGlobalObject) bool;
extern fn ReadableStream__isLocked(possibleReadableStream: JSValue, globalObject: *JSGlobalObject) bool;
extern fn ReadableStream__empty(*JSGlobalObject) JSC.JSValue;
extern fn ReadableStream__used(*JSGlobalObject) JSC.JSValue;
extern fn ReadableStream__cancel(stream: JSValue, *JSGlobalObject) void;
extern fn ReadableStream__abort(stream: JSValue, *JSGlobalObject) void;
extern fn ReadableStream__detach(stream: JSValue, *JSGlobalObject) void;
Expand Down Expand Up @@ -367,6 +368,12 @@ pub const ReadableStream = struct {
return ReadableStream__empty(globalThis);
}

pub fn used(globalThis: *JSGlobalObject) JSC.JSValue {
JSC.markBinding(@src());

return ReadableStream__used(globalThis);
}

const Base = @import("../../ast/base.zig");
pub const StreamTag = enum(usize) {
invalid = 0,
Expand Down Expand Up @@ -3596,6 +3603,9 @@ pub const ByteStream = struct {
this.buffer = try std.ArrayList(u8).initCapacity(bun.default_allocator, chunk.len);
this.buffer.appendSliceAssumeCapacity(chunk);
},
.err => {
this.pending.result = .{ .err = stream.err };
},
else => unreachable,
}
return;
Expand All @@ -3605,6 +3615,9 @@ pub const ByteStream = struct {
.temporary_and_done, .temporary => {
try this.buffer.appendSlice(chunk);
},
.err => {
this.pending.result = .{ .err = stream.err };
},
// We don't support the rest of these yet
else => unreachable,
}
Expand Down
6 changes: 3 additions & 3 deletions src/deps/uws.zig
Original file line number Diff line number Diff line change
Expand Up @@ -700,9 +700,9 @@ pub const Timer = opaque {
@as(*@TypeOf(ptr), @ptrCast(@alignCast(value_ptr))).* = ptr;
}

pub fn deinit(this: *Timer) void {
pub fn deinit(this: *Timer, comptime fallthrough: bool) void {
debug("Timer.deinit()", .{});
us_timer_close(this);
us_timer_close(this, @intFromBool(fallthrough));
}

pub fn ext(this: *Timer, comptime Type: type) ?*Type {
Expand Down Expand Up @@ -946,7 +946,7 @@ const uintmax_t = c_ulong;

extern fn us_create_timer(loop: ?*Loop, fallthrough: i32, ext_size: c_uint) *Timer;
extern fn us_timer_ext(timer: ?*Timer) *?*anyopaque;
extern fn us_timer_close(timer: ?*Timer) void;
extern fn us_timer_close(timer: ?*Timer, fallthrough: i32) void;
extern fn us_timer_set(timer: ?*Timer, cb: ?*const fn (*Timer) callconv(.C) void, ms: i32, repeat_ms: i32) void;
extern fn us_timer_loop(t: ?*Timer) ?*Loop;
pub const us_socket_context_options_t = extern struct {
Expand Down
Loading

0 comments on commit 6301778

Please sign in to comment.