From 0c8192222fe8a6258e768fef82336dce3566671f Mon Sep 17 00:00:00 2001 From: Masen Furer Date: Mon, 2 Dec 2024 16:29:06 -0800 Subject: [PATCH] rx.upload must include _var_data from props (#4463) * rx.upload must include _var_data from props str-casting the dropzone arguments removed any VarData they depended on, like the state context. update test_upload to include passing a prop from a state var * Handle large payload delta from upload event handler Fix update chunk chaining logic; try/catch wasn't catching errors from the async inner function. --- reflex/.templates/web/utils/state.js | 39 ++++++++++++++++------------ reflex/components/core/upload.py | 13 ++++++---- tests/integration/test_upload.py | 10 ++++++- 3 files changed, 40 insertions(+), 22 deletions(-) diff --git a/reflex/.templates/web/utils/state.js b/reflex/.templates/web/utils/state.js index f6541c7ae9..622f171ade 100644 --- a/reflex/.templates/web/utils/state.js +++ b/reflex/.templates/web/utils/state.js @@ -457,7 +457,7 @@ export const connect = async ( socket.current.on("reload", async (event) => { event_processing = false; queueEvents([...initialEvents(), JSON5.parse(event)], socket); - }) + }); document.addEventListener("visibilitychange", checkVisibility); }; @@ -490,23 +490,30 @@ export const uploadFiles = async ( return false; } + // Track how many partial updates have been processed for this upload. let resp_idx = 0; const eventHandler = (progressEvent) => { - // handle any delta / event streamed from the upload event handler + const event_callbacks = socket._callbacks.$event; + // Whenever called, responseText will contain the entire response so far. const chunks = progressEvent.event.target.responseText.trim().split("\n"); + // So only process _new_ chunks beyond resp_idx. chunks.slice(resp_idx).map((chunk) => { - try { - socket._callbacks.$event.map((f) => { - f(chunk); - }); - resp_idx += 1; - } catch (e) { - if (progressEvent.progress === 1) { - // Chunk may be incomplete, so only report errors when full response is available. - console.log("Error parsing chunk", chunk, e); - } - return; - } + event_callbacks.map((f, ix) => { + f(chunk) + .then(() => { + if (ix === event_callbacks.length - 1) { + // Mark this chunk as processed. + resp_idx += 1; + } + }) + .catch((e) => { + if (progressEvent.progress === 1) { + // Chunk may be incomplete, so only report errors when full response is available. + console.log("Error parsing chunk", chunk, e); + } + return; + }); + }); }); }; @@ -711,7 +718,7 @@ export const useEventLoop = ( const combined_name = events.map((e) => e.name).join("+++"); if (event_actions?.temporal) { if (!socket.current || !socket.current.connected) { - return; // don't queue when the backend is not connected + return; // don't queue when the backend is not connected } } if (event_actions?.throttle) { @@ -852,7 +859,7 @@ export const useEventLoop = ( if (router.components[router.pathname].error) { delete router.components[router.pathname].error; } - } + }; router.events.on("routeChangeStart", change_start); router.events.on("routeChangeComplete", change_complete); router.events.on("routeChangeError", change_error); diff --git a/reflex/components/core/upload.py b/reflex/components/core/upload.py index 33dfae40fe..87488d98aa 100644 --- a/reflex/components/core/upload.py +++ b/reflex/components/core/upload.py @@ -293,13 +293,15 @@ def create(cls, *children, **props) -> Component: format.to_camel_case(key): value for key, value in upload_props.items() } - use_dropzone_arguments = { - "onDrop": event_var, - **upload_props, - } + use_dropzone_arguments = Var.create( + { + "onDrop": event_var, + **upload_props, + } + ) left_side = f"const {{getRootProps: {root_props_unique_name}, getInputProps: {input_props_unique_name}}} " - right_side = f"useDropzone({str(Var.create(use_dropzone_arguments))})" + right_side = f"useDropzone({str(use_dropzone_arguments)})" var_data = VarData.merge( VarData( @@ -307,6 +309,7 @@ def create(cls, *children, **props) -> Component: hooks={Hooks.EVENTS: None}, ), event_var._get_all_var_data(), + use_dropzone_arguments._get_all_var_data(), VarData( hooks={ callback_str: None, diff --git a/tests/integration/test_upload.py b/tests/integration/test_upload.py index fe8ebb4d76..b7f14b03d0 100644 --- a/tests/integration/test_upload.py +++ b/tests/integration/test_upload.py @@ -19,10 +19,14 @@ def UploadFile(): import reflex as rx + LARGE_DATA = "DUMMY" * 1024 * 512 + class UploadState(rx.State): _file_data: Dict[str, str] = {} event_order: List[str] = [] progress_dicts: List[dict] = [] + disabled: bool = False + large_data: str = "" async def handle_upload(self, files: List[rx.UploadFile]): for file in files: @@ -33,6 +37,7 @@ async def handle_upload_secondary(self, files: List[rx.UploadFile]): for file in files: upload_data = await file.read() self._file_data[file.filename or ""] = upload_data.decode("utf-8") + self.large_data = LARGE_DATA yield UploadState.chain_event def upload_progress(self, progress): @@ -41,13 +46,15 @@ def upload_progress(self, progress): self.progress_dicts.append(progress) def chain_event(self): + assert self.large_data == LARGE_DATA + self.large_data = "" self.event_order.append("chain_event") def index(): return rx.vstack( rx.input( value=UploadState.router.session.client_token, - is_read_only=True, + read_only=True, id="token", ), rx.heading("Default Upload"), @@ -56,6 +63,7 @@ def index(): rx.button("Select File"), rx.text("Drag and drop files here or click to select files"), ), + disabled=UploadState.disabled, ), rx.button( "Upload",