Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/lightning_app/core/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,11 +258,11 @@ async def post_state(
last_state = global_app_state_store.get_served_state(x_lightning_session_uuid)
state = deepcopy(last_state)
state["app_state"]["stage"] = body["stage"]
deep_diff = DeepDiff(last_state, state)
deep_diff = DeepDiff(last_state, state, verbose_level=2)
else:
state = body["state"]
last_state = global_app_state_store.get_served_state(x_lightning_session_uuid)
deep_diff = DeepDiff(last_state, state)
deep_diff = DeepDiff(last_state, state, verbose_level=2)
update_delta = Delta(deep_diff)
api_app_delta_queue.put(update_delta)

Expand Down
4 changes: 2 additions & 2 deletions src/lightning_app/core/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ def set_last_state(self, state):

@staticmethod
def populate_changes(last_state, new_state):
diff = DeepDiff(last_state, new_state, view="tree")
diff = DeepDiff(last_state, new_state, view="tree", verbose_level=2)

changes_categories = [diff[key] for key in diff.to_dict()]

Expand Down Expand Up @@ -307,7 +307,7 @@ def maybe_apply_changes(self) -> bool:
if not deltas:
# When no deltas are received from the Rest API or work queues,
# we need to check if the flow modified the state and populate changes.
if Delta(DeepDiff(self.last_state, self.state)).to_dict():
if Delta(DeepDiff(self.last_state, self.state, verbose_level=2)).to_dict():
# new_state = self.populate_changes(self.last_state, self.state)
self.set_state(self.state)
self._has_updated = True
Expand Down
14 changes: 9 additions & 5 deletions src/lightning_app/utilities/proxies.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ def run_once(self) -> None:
self._delta_memory.clear()

# The remaining delta is the result of state updates triggered outside the setattr, e.g, by a list append
delta = Delta(DeepDiff(self._last_state, self._work.state))
delta = Delta(DeepDiff(self._last_state, self._work.state, verbose_level=2))
if not delta.to_dict():
return
self._last_state = deepcopy(self._work.state)
Expand Down Expand Up @@ -256,7 +256,7 @@ def __call__(self, name: str, value: Any) -> None:
with _state_observer_lock:
state = deepcopy(self.work.state)
self.work._default_setattr(name, value)
delta = Delta(DeepDiff(state, self.work.state))
delta = Delta(DeepDiff(state, self.work.state, verbose_level=2))
if not delta.to_dict():
return

Expand Down Expand Up @@ -408,7 +408,9 @@ def run_once(self):
make_status(WorkStageStatus.FAILED, message=str(e), reason=WorkFailureReasons.USER_EXCEPTION)
)
self.delta_queue.put(
ComponentDelta(id=self.work_name, delta=Delta(DeepDiff(reference_state, self.work.state)))
ComponentDelta(
id=self.work_name, delta=Delta(DeepDiff(reference_state, self.work.state, verbose_level=2))
)
)
self.work.on_exception(e)
print("########## CAPTURED EXCEPTION ###########")
Expand Down Expand Up @@ -437,7 +439,9 @@ def run_once(self):
reference_state = deepcopy(self.work.state)
self.work._calls[call_hash]["statuses"].append(make_status(WorkStageStatus.SUCCEEDED))
self.work._calls[call_hash]["ret"] = ret
self.delta_queue.put(ComponentDelta(id=self.work_name, delta=Delta(DeepDiff(reference_state, self.work.state))))
self.delta_queue.put(
ComponentDelta(id=self.work_name, delta=Delta(DeepDiff(reference_state, self.work.state, verbose_level=2)))
)

# 18. Update the work for the next delta if any.
self._proxy_setattr(cleanup=True)
Expand All @@ -452,7 +456,7 @@ def _sigterm_signal_handler(self, signum, frame, call_hash: str) -> None:
self.work._calls[call_hash]["statuses"].append(
make_status(WorkStageStatus.STOPPED, reason=WorkStopReasons.SIGTERM_SIGNAL_HANDLER)
)
delta = Delta(DeepDiff(state, self.work.state))
delta = Delta(DeepDiff(state, self.work.state, verbose_level=2))
self.delta_queue.put(ComponentDelta(id=self.work_name, delta=delta))

# kill the thread as the job is going to be terminated.
Expand Down
4 changes: 3 additions & 1 deletion src/lightning_app/utilities/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ def run_once(self):
flow = self._app.get_component_by_name(metadata["name"])
previous_state = deepcopy(flow.state)
flow._enable_schedule(call_hash)
component_delta = ComponentDelta(id=flow.name, delta=Delta(DeepDiff(previous_state, flow.state)))
component_delta = ComponentDelta(
id=flow.name, delta=Delta(DeepDiff(previous_state, flow.state, verbose_level=2))
)
self._app.delta_queue.put(component_delta)
metadata["start_time"] = next_event.isoformat()

Expand Down
2 changes: 1 addition & 1 deletion src/lightning_app/utilities/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ def _store_state(self, state: Dict[str, Any]) -> None:

def send_delta(self) -> None:
app_url = f"{self._url}/api/v1/delta"
deep_diff = DeepDiff(_LAST_STATE, _STATE)
deep_diff = DeepDiff(_LAST_STATE, _STATE, verbose_level=2)
assert self._plugin is not None
# TODO: Find how to prevent the infinite loop on refresh without storing the DeepDiff
if self._plugin.should_update_app(deep_diff):
Expand Down
2 changes: 1 addition & 1 deletion tests/tests_app/core/test_lightning_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ def _change_stage(self, enum):
previous_state = deepcopy(self.state)
current_state = self.state
current_state["app_state"]["stage"] = enum.value
deep_diff = DeepDiff(previous_state, current_state)
deep_diff = DeepDiff(previous_state, current_state, verbose_level=2)
self.api_delta_queue.put(Delta(deep_diff))

def maybe_apply_changes(self):
Expand Down
2 changes: 1 addition & 1 deletion tests/tests_app/core/test_lightning_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ def run(self):
work_state = flow_a.work.state
flow_a.work.counter = 1
work_state_2 = flow_a.work.state
delta = Delta(DeepDiff(work_state, work_state_2))
delta = Delta(DeepDiff(work_state, work_state_2, verbose_level=2))
delta = _delta_to_appstate_delta(flow_a, flow_a.work, delta)
new_flow_state = LightningApp.populate_changes(flow_state, flow_state + delta)
flow_a.set_state(new_flow_state)
Expand Down
4 changes: 3 additions & 1 deletion tests/tests_app/utilities/test_proxies.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,9 @@ def __call__(self):
"message": None,
}
)
self.delta_queue.put(ComponentDelta(id=self.work_name, delta=Delta(DeepDiff(state, self.work.state))))
self.delta_queue.put(
ComponentDelta(id=self.work_name, delta=Delta(DeepDiff(state, self.work.state, verbose_level=2)))
)
self.counter += 1
except Exception as e:
logger.error(traceback.format_exc())
Expand Down