Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(AbortSignal/fetch) fix AbortSignal.timeout, fetch lock behavior and fetch errors #6390

Merged
merged 2 commits into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions packages/bun-usockets/src/eventing/epoll_kqueue.c
Original file line number Diff line number Diff line change
Expand Up @@ -414,14 +414,18 @@ struct us_timer_t *us_create_timer(struct us_loop_t *loop, int fallthrough, unsi
#endif

#ifdef LIBUS_USE_EPOLL
void us_timer_close(struct us_timer_t *timer) {
void us_timer_close(struct us_timer_t *timer, int fallthrough) {
struct us_internal_callback_t *cb = (struct us_internal_callback_t *) timer;

us_poll_stop(&cb->p, cb->loop);
close(us_poll_fd(&cb->p));

/* (regular) sockets are the only polls which are not freed immediately */
us_poll_free((struct us_poll_t *) timer, cb->loop);
/* (regular) sockets are the only polls which are not freed immediately */
if(fallthrough){
us_free(timer);
}else {
us_poll_free((struct us_poll_t *) timer, cb->loop);
}
}

void us_timer_set(struct us_timer_t *t, void (*cb)(struct us_timer_t *t), int ms, int repeat_ms) {
Expand All @@ -438,15 +442,19 @@ void us_timer_set(struct us_timer_t *t, void (*cb)(struct us_timer_t *t), int ms
us_poll_start((struct us_poll_t *) t, internal_cb->loop, LIBUS_SOCKET_READABLE);
}
#else
void us_timer_close(struct us_timer_t *timer) {
void us_timer_close(struct us_timer_t *timer, int fallthrough) {
struct us_internal_callback_t *internal_cb = (struct us_internal_callback_t *) timer;

struct kevent64_s event;
EV_SET64(&event, (uint64_t) (void*) internal_cb, EVFILT_TIMER, EV_DELETE, 0, 0, (uint64_t)internal_cb, 0, 0);
kevent64(internal_cb->loop->fd, &event, 1, NULL, 0, 0, NULL);

/* (regular) sockets are the only polls which are not freed immediately */
us_poll_free((struct us_poll_t *) timer, internal_cb->loop);
if(fallthrough){
us_free(timer);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't there a scenario where:

  1. Timer is scheduled to run in the next tick
  2. Tick happens, but before timer happens, the timer is cancelled
  3. Timer is freed
  4. Callback for timer runs after the timer is freed, which becomes a use-after-free

Copy link
Member Author

@cirospaciari cirospaciari Oct 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

today we call us_poll_free with just do:

void us_poll_free(struct us_poll_t *p, struct us_loop_t *loop) {
    loop->num_polls--;
    us_free(p);
}

so is safe to assume that we are not breaking any behavior only avoiding loop->num_polls--; because on fallthrough: true we dont increment num_pools, But will check this scenario.

}else {
us_poll_free((struct us_poll_t *) timer, internal_cb->loop);
}
}

void us_timer_set(struct us_timer_t *t, void (*cb)(struct us_timer_t *t), int ms, int repeat_ms) {
Expand Down
2 changes: 1 addition & 1 deletion packages/bun-usockets/src/libusockets.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ struct us_timer_t *us_create_timer(struct us_loop_t *loop, int fallthrough, unsi
void *us_timer_ext(struct us_timer_t *timer);

/* */
void us_timer_close(struct us_timer_t *timer);
void us_timer_close(struct us_timer_t *timer, int fallthrough);

/* Arm a timer with a delay from now and eventually a repeat delay.
* Specify 0 as repeat delay to disable repeating. Specify both 0 to disarm. */
Expand Down
2 changes: 1 addition & 1 deletion packages/bun-usockets/src/loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ void us_internal_loop_data_free(struct us_loop_t *loop) {

free(loop->data.recv_buf);

us_timer_close(loop->data.sweep_timer);
us_timer_close(loop->data.sweep_timer, 0);
us_internal_async_close(loop->data.wakeup_async);
}

Expand Down
4 changes: 2 additions & 2 deletions packages/bun-uws/capi/examples/Broadcast.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ void uws_timer_close(struct us_timer_t *timer)
struct timer_handler_data *data;
memcpy(&data, us_timer_ext(t), sizeof(struct timer_handler_data *));
free(data);
us_timer_close(t);
us_timer_close(t, 0);
}
//Timer create helper
struct us_timer_t *uws_create_timer(int ms, int repeat_ms, void (*handler)(void *data), void *data)
Expand Down Expand Up @@ -47,7 +47,7 @@ struct us_timer_t *uws_create_timer(int ms, int repeat_ms, void (*handler)(void
if (!data->repeat)
{
free(data);
us_timer_close(t);
us_timer_close(t, 0);
}
},
ms, repeat_ms);
Expand Down
4 changes: 2 additions & 2 deletions packages/bun-uws/capi/examples/HelloWorldAsync.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ void uws_timer_close(struct us_timer_t *timer)
struct timer_handler_data *data;
memcpy(&data, us_timer_ext(t), sizeof(struct timer_handler_data *));
free(data);
us_timer_close(t);
us_timer_close(t, 0);
}
//Timer create helper
struct us_timer_t *uws_create_timer(int ms, int repeat_ms, void (*handler)(void *data), void *data)
Expand Down Expand Up @@ -52,7 +52,7 @@ struct us_timer_t *uws_create_timer(int ms, int repeat_ms, void (*handler)(void
if (!data->repeat)
{
free(data);
us_timer_close(t);
us_timer_close(t, 0);
}
},
ms, repeat_ms);
Expand Down
4 changes: 2 additions & 2 deletions packages/bun-uws/capi/examples/UpgradeAsync.c
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ void uws_timer_close(struct us_timer_t *timer)
struct timer_handler_data *data;
memcpy(&data, us_timer_ext(t), sizeof(struct timer_handler_data *));
free(data);
us_timer_close(t);
us_timer_close(t, 0);
}
//Timer create helper
struct us_timer_t *uws_create_timer(int ms, int repeat_ms, void (*handler)(void *data), void *data)
Expand Down Expand Up @@ -95,7 +95,7 @@ struct us_timer_t *uws_create_timer(int ms, int repeat_ms, void (*handler)(void
if (!data->repeat)
{
free(data);
us_timer_close(t);
us_timer_close(t, 0);
}
},
ms, repeat_ms);
Expand Down
2 changes: 1 addition & 1 deletion packages/bun-uws/examples/UpgradeAsync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ int main() {

delete upgradeData;

us_timer_close(t);
us_timer_close(t, 0);
}, 5000, 0);

},
Expand Down
2 changes: 1 addition & 1 deletion packages/bun-uws/src/Loop.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ struct Loop {
LoopData *loopData = (LoopData *) us_loop_ext((us_loop_t *) this);

/* Stop and free dateTimer first */
us_timer_close(loopData->dateTimer);
us_timer_close(loopData->dateTimer, 1);

loopData->~LoopData();
/* uSockets will track whether this loop is owned by us or a borrowed alien loop */
Expand Down
2 changes: 1 addition & 1 deletion src/bun.js/api/bun.zig
Original file line number Diff line number Diff line change
Expand Up @@ -3604,7 +3604,7 @@ pub const Timer = struct {

this.poll_ref.unref(vm);

this.timer.deinit();
this.timer.deinit(false);

// balance double unreffing in doUnref
vm.event_loop_handle.?.num_polls += @as(i32, @intFromBool(this.did_unref_timer));
Expand Down
4 changes: 2 additions & 2 deletions src/bun.js/api/ffi.zig
Original file line number Diff line number Diff line change
Expand Up @@ -317,9 +317,9 @@ pub const FFI = struct {
};
};
};

var size = symbols.values().len;
if(size >= 63) {
if (size >= 63) {
size = 0;
}
var obj = JSC.JSValue.createEmptyObject(global, size);
Expand Down
1 change: 1 addition & 0 deletions src/bun.js/bindings/ZigGlobalObject.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3717,6 +3717,7 @@ void GlobalObject::addBuiltinGlobals(JSC::VM& vm)

putDirectBuiltinFunction(vm, this, builtinNames.createFIFOPrivateName(), streamInternalsCreateFIFOCodeGenerator(vm), PropertyAttribute::Builtin | PropertyAttribute::DontDelete | PropertyAttribute::ReadOnly);
putDirectBuiltinFunction(vm, this, builtinNames.createEmptyReadableStreamPrivateName(), readableStreamCreateEmptyReadableStreamCodeGenerator(vm), PropertyAttribute::Builtin | PropertyAttribute::DontDelete | PropertyAttribute::ReadOnly);
putDirectBuiltinFunction(vm, this, builtinNames.createUsedReadableStreamPrivateName(), readableStreamCreateUsedReadableStreamCodeGenerator(vm), PropertyAttribute::Builtin | PropertyAttribute::DontDelete | PropertyAttribute::ReadOnly);
putDirectBuiltinFunction(vm, this, builtinNames.consumeReadableStreamPrivateName(), readableStreamConsumeReadableStreamCodeGenerator(vm), PropertyAttribute::Builtin | PropertyAttribute::DontDelete | PropertyAttribute::ReadOnly);
putDirectBuiltinFunction(vm, this, builtinNames.createNativeReadableStreamPrivateName(), readableStreamCreateNativeReadableStreamCodeGenerator(vm), PropertyAttribute::Builtin | PropertyAttribute::DontDelete | PropertyAttribute::ReadOnly);
putDirectBuiltinFunction(vm, this, builtinNames.requireESMPrivateName(), importMetaObjectRequireESMCodeGenerator(vm), PropertyAttribute::Builtin | PropertyAttribute::DontDelete | PropertyAttribute::ReadOnly);
Expand Down
8 changes: 8 additions & 0 deletions src/bun.js/bindings/bindings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2165,6 +2165,14 @@ JSC__JSValue ReadableStream__empty(Zig::GlobalObject* globalObject)
return JSValue::encode(JSC::call(globalObject, function, JSC::ArgList(), "ReadableStream.create"_s));
}

JSC__JSValue ReadableStream__used(Zig::GlobalObject* globalObject)
{
auto& vm = globalObject->vm();
auto clientData = WebCore::clientData(vm);
auto* function = globalObject->getDirect(vm, clientData->builtinNames().createUsedReadableStreamPrivateName()).getObject();
return JSValue::encode(JSC::call(globalObject, function, JSC::ArgList(), "ReadableStream.create"_s));
}

JSC__JSValue JSC__JSValue__createRangeError(const ZigString* message, const ZigString* arg1,
JSC__JSGlobalObject* globalObject)
{
Expand Down
2 changes: 1 addition & 1 deletion src/bun.js/event_loop.zig
Original file line number Diff line number Diff line change
Expand Up @@ -1148,7 +1148,7 @@ pub const EventLoop = struct {

pub fn callTask(timer: *uws.Timer) callconv(.C) void {
var task = Task.from(timer.as(*anyopaque));
timer.deinit();
defer timer.deinit(true);

JSC.VirtualMachine.get().enqueueTask(task);
}
Expand Down
2 changes: 1 addition & 1 deletion src/bun.js/node/node_fs_stat_watcher.zig
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ pub const StatWatcherScheduler = struct {
prev = next;
} else {
if (this.head.load(.Monotonic) == null) {
this.timer.?.deinit();
this.timer.?.deinit(false);
this.timer = null;
// The scheduler is not deinit here, but it will get reused.
}
Expand Down
11 changes: 8 additions & 3 deletions src/bun.js/webcore/body.zig
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,10 @@ pub const Body = struct {
JSC.markBinding(@src());

switch (this.*) {
.Used, .Empty => {
.Used => {
return JSC.WebCore.ReadableStream.used(globalThis);
},
.Empty => {
return JSC.WebCore.ReadableStream.empty(globalThis);
},
.Null => {
Expand All @@ -493,6 +496,9 @@ pub const Body = struct {
if (locked.readable) |readable| {
return readable.value;
}
if (locked.promise != null) {
return JSC.WebCore.ReadableStream.used(globalThis);
}
var drain_result: JSC.WebCore.DrainResult = .{
.estimated_size = 0,
};
Expand Down Expand Up @@ -1104,8 +1110,7 @@ pub fn BodyMixin(comptime Type: type) type {
var body: *Body.Value = this.getBodyValue();

if (body.* == .Used) {
// TODO: make this closed
return JSC.WebCore.ReadableStream.empty(globalThis);
return JSC.WebCore.ReadableStream.used(globalThis);
}

return body.toReadableStream(globalThis);
Expand Down
30 changes: 15 additions & 15 deletions src/bun.js/webcore/response.zig
Original file line number Diff line number Diff line change
Expand Up @@ -778,27 +778,27 @@ pub const Fetch = struct {
if (!success) {
const err = this.onReject();
err.ensureStillAlive();
// if we are streaming update with error
if (this.readable_stream_ref.get()) |readable| {
readable.ptr.Bytes.onData(
.{
.err = .{ .JSValue = err },
},
bun.default_allocator,
);
}
// if we are buffering resolve the promise
if (this.response.get()) |response_js| {
if (response_js.as(Response)) |response| {
const body = response.body;
if (body.value == .Locked) {
if (body.value.Locked.readable) |readable| {
readable.ptr.Bytes.onData(
.{
.err = .{ .JSValue = err },
},
bun.default_allocator,
);
return;
}
if (body.value.Locked.promise) |promise_| {
const promise = promise_.asAnyPromise().?;
promise.reject(globalThis, err);
}

response.body.value.toErrorInstance(err, globalThis);
return;
}
}

globalThis.throwValue(err);
return;
}

Expand Down Expand Up @@ -1708,7 +1708,7 @@ pub const Fetch = struct {
if (decompress.isBoolean()) {
disable_decompression = !decompress.asBoolean();
} else if (decompress.isNumber()) {
disable_keepalive = decompress.to(i32) == 0;
disable_decompression = decompress.to(i32) == 0;
}
}

Expand Down Expand Up @@ -1901,7 +1901,7 @@ pub const Fetch = struct {
if (decompress.isBoolean()) {
disable_decompression = !decompress.asBoolean();
} else if (decompress.isNumber()) {
disable_keepalive = decompress.to(i32) == 0;
disable_decompression = decompress.to(i32) == 0;
}
}

Expand Down
13 changes: 13 additions & 0 deletions src/bun.js/webcore/streams.zig
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ pub const ReadableStream = struct {
extern fn ReadableStream__isDisturbed(possibleReadableStream: JSValue, globalObject: *JSGlobalObject) bool;
extern fn ReadableStream__isLocked(possibleReadableStream: JSValue, globalObject: *JSGlobalObject) bool;
extern fn ReadableStream__empty(*JSGlobalObject) JSC.JSValue;
extern fn ReadableStream__used(*JSGlobalObject) JSC.JSValue;
extern fn ReadableStream__cancel(stream: JSValue, *JSGlobalObject) void;
extern fn ReadableStream__abort(stream: JSValue, *JSGlobalObject) void;
extern fn ReadableStream__detach(stream: JSValue, *JSGlobalObject) void;
Expand Down Expand Up @@ -367,6 +368,12 @@ pub const ReadableStream = struct {
return ReadableStream__empty(globalThis);
}

pub fn used(globalThis: *JSGlobalObject) JSC.JSValue {
JSC.markBinding(@src());

return ReadableStream__used(globalThis);
}

const Base = @import("../../ast/base.zig");
pub const StreamTag = enum(usize) {
invalid = 0,
Expand Down Expand Up @@ -3596,6 +3603,9 @@ pub const ByteStream = struct {
this.buffer = try std.ArrayList(u8).initCapacity(bun.default_allocator, chunk.len);
this.buffer.appendSliceAssumeCapacity(chunk);
},
.err => {
this.pending.result = .{ .err = stream.err };
},
else => unreachable,
}
return;
Expand All @@ -3605,6 +3615,9 @@ pub const ByteStream = struct {
.temporary_and_done, .temporary => {
try this.buffer.appendSlice(chunk);
},
.err => {
this.pending.result = .{ .err = stream.err };
},
// We don't support the rest of these yet
else => unreachable,
}
Expand Down
6 changes: 3 additions & 3 deletions src/deps/uws.zig
Original file line number Diff line number Diff line change
Expand Up @@ -700,9 +700,9 @@ pub const Timer = opaque {
@as(*@TypeOf(ptr), @ptrCast(@alignCast(value_ptr))).* = ptr;
}

pub fn deinit(this: *Timer) void {
pub fn deinit(this: *Timer, comptime fallthrough: bool) void {
debug("Timer.deinit()", .{});
us_timer_close(this);
us_timer_close(this, @intFromBool(fallthrough));
}

pub fn ext(this: *Timer, comptime Type: type) ?*Type {
Expand Down Expand Up @@ -946,7 +946,7 @@ const uintmax_t = c_ulong;

extern fn us_create_timer(loop: ?*Loop, fallthrough: i32, ext_size: c_uint) *Timer;
extern fn us_timer_ext(timer: ?*Timer) *?*anyopaque;
extern fn us_timer_close(timer: ?*Timer) void;
extern fn us_timer_close(timer: ?*Timer, fallthrough: i32) void;
extern fn us_timer_set(timer: ?*Timer, cb: ?*const fn (*Timer) callconv(.C) void, ms: i32, repeat_ms: i32) void;
extern fn us_timer_loop(t: ?*Timer) ?*Loop;
pub const us_socket_context_options_t = extern struct {
Expand Down
Loading