Using thread in on_event becomes slower #514

Open
bobd988 opened this issue May 27, 2024 · 3 comments
Labels
bug Something isn't working python Python API

Comments

@bobd988
Contributor

bobd988 commented May 27, 2024

Describe the bug
I am exploring best practices for processing high-frequency input data without depending on a predefined tick frequency. As we know, ROS uses a spin method to keep polling a ROS node, and that sampling rate is slower than a while loop running in a thread's run method. The same is true for Dora, since its tick is similar to spin. So I created a thread that processes the webcam data in a while loop, to compare the performance. However, the visual output has high latency. I expected results similar to the original approach in the examples, which has no obvious latency.
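Below is a minimal sketch of the two loop styles being compared, in plain Python with no dora or ROS involved. `run_fixed_rate` and `run_free` are hypothetical helpers. The first behaves roughly like a timer input such as `dora/timer/millis/50` driving `on_event`. The second behaves like a free-running `while` loop in a thread.

```python
import time


def run_fixed_rate(step, period_s, duration_s):
    """Hypothetical helper: call `step` once per `period_s`, like a timer tick."""
    start = time.monotonic()
    next_deadline = start
    calls = 0
    while time.monotonic() - start < duration_s:
        step()
        calls += 1
        # Sleep until an absolute deadline so per-iteration jitter does not accumulate.
        next_deadline += period_s
        time.sleep(max(0.0, next_deadline - time.monotonic()))
    return calls


def run_free(step, duration_s):
    """Hypothetical helper: call `step` back to back, like a while loop in a thread."""
    start = time.monotonic()
    calls = 0
    while time.monotonic() - start < duration_s:
        step()
        calls += 1
    return calls


if __name__ == "__main__":
    # A 50 ms period mirrors the dora/timer/millis/50 input in the dataflow.
    print("fixed-rate calls in 1 s:", run_fixed_rate(lambda: None, 0.05, 1.0))
    print("free-running calls in 1 s:", run_free(lambda: None, 1.0))
```

In the webcam case, a free-running loop is typically still bounded by `video_capture.read()`, which usually blocks until the camera delivers the next frame.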

This test is based on examples/python-dataflow.
The operator's on_event is now just a trigger that starts the thread. Everything is wrapped inside the thread's while loop, so processing does not depend on the on_event frequency and the loop runs as fast as it can. If the thread is already running, subsequent on_event calls skip starting it.

To Reproduce
Dora 0.3.4, Ubuntu 22.04
Replace webcam.py with the version below, and replace dataflow.yaml as well.


import os
import threading
import time

import cv2
import numpy as np
from dora import DoraStatus

CAMERA_INDEX = int(os.getenv("CAMERA_INDEX", 0))
CAMERA_WIDTH = 640
CAMERA_HEIGHT = 480
video_capture = cv2.VideoCapture(CAMERA_INDEX)
font = cv2.FONT_HERSHEY_SIMPLEX

start = time.time()

class Operator:
    def __init__(self) -> None:
        self.frame = None
        self.stop_thread = False
        self.video_thread = None

    def capture_video(self, event, send_output):
        start = time.time()
        while not self.stop_thread and time.time() - start < 50:
            ret, frame = video_capture.read()
            if not ret:
                frame = np.zeros((CAMERA_HEIGHT, CAMERA_WIDTH, 3), dtype=np.uint8)
                cv2.putText(
                    frame,
                    "No Webcam was found at index %d" % (CAMERA_INDEX),
                    (int(30), int(30)),
                    font,
                    0.75,
                    (255, 255, 255),
                    2,
                    1,
                )
            self.frame = frame

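            # `event` is the event that started this thread; it is never
            # refreshed, so this loop does not wait for the next dora input and
            # re-sends with the same event metadata on every iteration.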
            event_type = event["type"]
            if event_type == "INPUT":
                if self.frame is not None:
                    try:
                        send_output(
                            "image",
                            cv2.imencode(".jpg", self.frame)[1].tobytes(),
                            event["metadata"],
                        )
                    except Exception as e:
                        print(f"Failed to send output: {e}")
            elif event_type == "STOP":
                self.stop_thread = True
                break
            else:
                print("received unexpected event:", event_type)
                break

        video_capture.release()

    def on_event(
            self,
            event,
            send_output,
    ) -> DoraStatus:
        # Start video capture in a separate thread if not already running
        if self.video_thread is None or not self.video_thread.is_alive():
            self.video_thread = threading.Thread(target=self.capture_video, args=(event, send_output))
            self.video_thread.start()
        return DoraStatus.CONTINUE # Continue processing events


Here is the dataflow.yml. I changed the webcam node to use an operator:


nodes:
  - id: webcam
    operator:
      python: ./webcam.py
      inputs:
        tick:
          source: dora/timer/millis/50
          queue_size: 1000
      outputs:
        - image

  - id: object_detection
    custom:
      source: ./object_detection.py
      inputs:
        image: webcam/image
      outputs:
        - bbox

  - id: plot
    custom:
      source: ./plot.py
      inputs:
        image: webcam/image
        bbox: object_detection/bbox

To start the program, run:

dora start dataflow.yml --name my-dataflow

@github-actions github-actions bot added bug Something isn't working python Python API labels May 27, 2024
@haixuanTao
Collaborator

FYI, we're going to deprecate operators, so please use the node API in the future.

In the meantime, it seems that you're spawning a new thread at each event, which adds some overhead.

So it is not surprising that the threaded version is slower than the version without a thread.
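To see what per-event thread creation costs, the sketch below compares starting a fresh thread for every event with feeding one long-lived worker thread through a `queue.Queue`. `handle`, `thread_per_event` and `persistent_worker` are hypothetical names, and `handle` stands in for the real per-frame work.

```python
import queue
import threading


def handle(item, results):
    """Hypothetical stand-in for per-event work (e.g. encoding and sending a frame)."""
    results.append(item * 2)


def thread_per_event(items):
    """Create, start and join a new thread for every event."""
    results = []
    for item in items:
        t = threading.Thread(target=handle, args=(item, results))
        t.start()
        t.join()  # joining right away isolates the thread creation cost
    return results


def persistent_worker(items):
    """Start one worker thread once and hand it events through a queue."""
    results = []
    inbox = queue.Queue()
    sentinel = object()  # tells the worker to exit

    def worker():
        while True:
            item = inbox.get()
            if item is sentinel:
                break
            handle(item, results)

    t = threading.Thread(target=worker, daemon=True)
    t.start()
    for item in items:
        inbox.put(item)
    inbox.put(sentinel)
    t.join()
    return results


if __name__ == "__main__":
    import time

    items = list(range(2000))
    for fn in (thread_per_event, persistent_worker):
        t0 = time.perf_counter()
        fn(items)
        print(f"{fn.__name__}: {time.perf_counter() - t0:.3f} s")
```

Timings depend on the machine, so run the `__main__` block locally rather than relying on any particular number.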

In general, I wouldn't recommend using threading, because many Python objects, such as those from NumPy and pandas, are not thread-safe. If you do use threads, make sure you know what you're doing and protect shared state with mutex locks.
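Here is a sketch of the kind of locking meant here. The hypothetical `LatestFrame` class guards a single shared NumPy frame with a `threading.Lock`, so a capture thread can keep writing while `on_event` reads. It copies on both `put` and `get` so neither side mutates an array the other is still using. That copying is a design choice of this sketch, not something dora requires.

```python
import threading

import numpy as np


class LatestFrame:
    """Hypothetical single-slot buffer shared by a capture thread and on_event."""

    def __init__(self):
        self._lock = threading.Lock()
        self._frame = None
        self._count = 0  # number of frames written so far

    def put(self, frame):
        # Store a private copy so later changes to the caller's array are not seen.
        with self._lock:
            self._frame = frame.copy()
            self._count += 1

    def get(self):
        # Read the frame and the write count together under the lock.
        with self._lock:
            if self._frame is None:
                return None, self._count
            return self._frame.copy(), self._count


if __name__ == "__main__":
    buf = LatestFrame()

    def capture(n):
        for i in range(n):
            buf.put(np.full((4, 4), i, dtype=np.uint8))

    t = threading.Thread(target=capture, args=(100,))
    t.start()
    t.join()
    frame, count = buf.get()
    print(count, frame[0, 0])
```

In the operator above, the capture thread would call `put` after each `video_capture.read()`, and `on_event` would call `get` on each tick and send the newest frame with that tick's metadata.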

Also, threading might run into GIL (Global Interpreter Lock) contention.

When in doubt, I would recommend using multiple dora Python nodes.
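For the node API route, here is a minimal sketch of the webcam as a standalone dora node. It assumes the dora 0.3.x Python node API: `Node`, iterating over the node to receive events, and `send_output(output_id, data, metadata)` accepting bytes. Verify these against your installed version. The event-handling logic lives in a hypothetical `run` function so it can be exercised without dora or a camera.

```python
"""Sketch of the webcam as a dora node (assumes the dora 0.3.x Python node API)."""
import numpy as np


def run(events, send_output, read_frame, encode):
    """Hypothetical helper: send one encoded frame per INPUT event, stop on STOP.

    Returns the number of frames sent.
    """
    sent = 0
    for event in events:
        if event["type"] == "INPUT":
            send_output("image", encode(read_frame()), event["metadata"])
            sent += 1
        elif event["type"] == "STOP":
            break
    return sent


def main():
    # Imported here so `run` can be tested without dora or OpenCV installed.
    import cv2
    from dora import Node  # assumption: dora 0.3.x node API

    capture = cv2.VideoCapture(0)

    def read_frame():
        ok, frame = capture.read()
        # Fall back to a black 640x480 frame when no webcam is available.
        return frame if ok else np.zeros((480, 640, 3), dtype=np.uint8)

    def encode(frame):
        return cv2.imencode(".jpg", frame)[1].tobytes()

    node = Node()
    try:
        run(node, node.send_output, read_frame, encode)
    finally:
        capture.release()
```

To use it as a node, call `main()` from an `if __name__ == "__main__":` guard and point a `custom` entry's `source` in the dataflow at the file, as the `object_detection` and `plot` nodes already do.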

@haixuanTao
Collaborator

haixuanTao commented May 27, 2024

I would recommend trying:

import os
import threading
import time

import cv2
import numpy as np
from dora import DoraStatus

CAMERA_INDEX = int(os.getenv("CAMERA_INDEX", 0))
CAMERA_WIDTH = 640
CAMERA_HEIGHT = 480
video_capture = cv2.VideoCapture(CAMERA_INDEX)
font = cv2.FONT_HERSHEY_SIMPLEX

start = time.time()

class Operator:
    def __init__(self) -> None:
        self.frame = None
        self.stop_thread = False
        self.video_thread = None

    def capture_video(self, event, send_output):
        start = time.time()
        while not self.stop_thread and time.time() - start < 50:
            ret, frame = video_capture.read()
            if not ret:
                frame = np.zeros((CAMERA_HEIGHT, CAMERA_WIDTH, 3), dtype=np.uint8)
                cv2.putText(
                    frame,
                    "No Webcam was found at index %d" % (CAMERA_INDEX),
                    (int(30), int(30)),
                    font,
                    0.75,
                    (255, 255, 255),
                    2,
                    1,
                )
            self.frame = frame

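            # `event` is the event that started this thread; it is never
            # refreshed, so this loop does not wait for the next dora input and
            # re-sends with the same event metadata on every iteration.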
            event_type = event["type"]
            if event_type == "INPUT":
                if self.frame is not None:
                    try:
                        send_output(
                            "image",
                            cv2.imencode(".jpg", self.frame)[1].tobytes(),
                            event["metadata"],
                        )
                    except Exception as e:
                        print(f"Failed to send output: {e}")
            elif event_type == "STOP":
                self.stop_thread = True
                break
            else:
                print("received unexpected event:", event_type)
                break

        video_capture.release()

    def on_event(
            self,
            event,
            send_output,
    ) -> DoraStatus:
        # Start video capture in a separate thread if not already running
        if self.video_thread is None or not self.video_thread.is_alive():
            self.video_thread = threading.Thread(target=self.capture_video, args=(event, send_output))
            self.video_thread.start()
        return DoraStatus.CONTINUE # Continue processing events


op = Operator()

while True:
    # Include "metadata", since capture_video reads event["metadata"].
    op.on_event({"type": "INPUT", "metadata": {}}, print)

This runs without the dora runtime, so it shows the overhead of the threading mechanism on its own.

@bobd988
Contributor Author

bobd988 commented May 28, 2024

Thanks. The threaded code above started OK after I rebooted my PC. I am trying to explore how the ROS Multi-Threaded Executor scenario could be used in Dora. Some existing third-party modules are provided as-is and need to run in a threaded environment with multiple callbacks (callback groups in ROS). Anyway, this is not a high priority. If a threading model is needed in Dora, users can still achieve it with native Python threads, at their own risk.
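For the callback-group scenario, you can build a rough Python analogue of a ROS 2 multi-threaded executor with mutually exclusive callback groups from a thread pool plus one lock per group. `MutuallyExclusiveGroup` and `Executor` are hypothetical names for this sketch. This is not how ROS 2 or dora implement it, and the thread-safety caveats raised earlier in this thread still apply to any state the callbacks share.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class MutuallyExclusiveGroup:
    """Hypothetical group: its callbacks never run at the same time.

    Callbacks in different groups may still run in parallel.
    """

    def __init__(self):
        self._lock = threading.Lock()

    def wrap(self, callback):
        def guarded(*args, **kwargs):
            with self._lock:
                return callback(*args, **kwargs)
        return guarded


class Executor:
    """Hypothetical multi-threaded executor that dispatches callbacks to a pool."""

    def __init__(self, num_threads=4):
        self._pool = ThreadPoolExecutor(max_workers=num_threads)

    def dispatch(self, group, callback, *args):
        # Returns a Future so callers can wait for or inspect the result.
        return self._pool.submit(group.wrap(callback), *args)

    def shutdown(self):
        self._pool.shutdown(wait=True)


if __name__ == "__main__":
    camera_group = MutuallyExclusiveGroup()
    executor = Executor(num_threads=2)
    for i in range(3):
        executor.dispatch(camera_group, print, "camera callback", i)
    executor.shutdown()
```

A callback waiting on its group's lock still occupies a pool thread. A reentrant group would simply skip the lock.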
