Skip to content

Commit

Permalink
Added Support for Cloning to Tee'd Default Streams
Browse files Browse the repository at this point in the history
  • Loading branch information
Redfire75369 committed Aug 5, 2024
1 parent 206741e commit 7a36f17
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 5 deletions.
1 change: 1 addition & 0 deletions runtime/src/globals/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub fn init_globals(cx: &Context, global: &Object) -> bool {
&& streams::define(cx, global)
&& url::define(cx, global)
&& Iterator::init_class(cx, global).0;

#[cfg(feature = "fetch")]
{
result && fetch::define(cx, global)
Expand Down
44 changes: 39 additions & 5 deletions runtime/src/globals/streams/readable/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,17 @@ use std::rc::Rc;

use bytes::{Buf, Bytes};
use mozjs::gc::HandleObject;
use mozjs::jsapi::{Heap, JSFunction, JSObject};
use mozjs::jsapi::{CloneDataPolicy, Heap, JSFunction, JSObject, StructuredCloneScope};
use mozjs::jsval::{JSVal, UndefinedValue};

use ion::{ClassDefinition, Context, Exception, Function, Local, Object, Promise, Result, ResultExc, TracedHeap, Value};
use ion::class::NativeObject;
use ion::clone::StructuredCloneBuffer;
use ion::conversions::{FromValue, ToValue};
use ion::function::Opt;
use ion::typedarray::{ArrayBuffer, ArrayBufferView, Uint8Array};

use crate::globals::clone;
use crate::globals::clone::STRUCTURED_CLONE_CALLBACKS;
use crate::globals::streams::readable::{ByobRequest, ByteStreamController, ReadableStream, ReaderOptions};
use crate::globals::streams::readable::controller::ControllerInternals;
use crate::globals::streams::readable::reader::{ReaderKind, Request};
Expand Down Expand Up @@ -95,8 +97,40 @@ impl StreamSource {
promise.then(cx, move |cx, _| {
state.read_again.set(false);
let chunk = Value::from(chunk.to_local());

// TODO: CloneForBranch2
let mut chunk2 = None;

if !state.common.cancelled[1].get() && state.clone_branch_2 {
let policy = CloneDataPolicy {
allowIntraClusterClonableSharedObjects_: false,
allowSharedMemoryObjects_: true,
};

let mut buffer = StructuredCloneBuffer::new(
StructuredCloneScope::SameProcess,
&STRUCTURED_CLONE_CALLBACKS,
);
let result =
buffer.write(cx, &chunk, None, &policy).and_then(|_| buffer.read(cx, &policy));

match result {
Ok(chunk) => {
chunk2 = Some(chunk);
}
Err(e) => {
let value = e.as_value(cx);
let branch1 = state.common.branch(cx, false)?;
let controller1 = branch1.native_controller(cx)?.into_default().unwrap();
controller1.error_internal(cx, &value)?;

let branch2 = state.common.branch(cx, true)?;
let controller2 = branch2.native_controller(cx)?.into_default().unwrap();
controller2.error_internal(cx, &value)?;

state.common.cancel(cx, &value);
return Ok(Value::undefined_handle());
}
}
}

if !state.common.cancelled[0].get() {
let branch = state.common.branch(cx, false)?;
Expand All @@ -106,7 +140,7 @@ impl StreamSource {
if !state.common.cancelled[1].get() {
let branch = state.common.branch(cx, true)?;
let controller = branch.native_controller(cx)?.into_default().unwrap();
controller.enqueue_internal(cx, &chunk)?;
controller.enqueue_internal(cx, chunk2.as_ref().unwrap_or(&chunk))?;
}

state.common.reading.set(false);
Expand Down

0 comments on commit 7a36f17

Please sign in to comment.