Skip to content

Commit

Permalink
Use mp queue pool and dont pass fgMask to queue
Browse files Browse the repository at this point in the history
  • Loading branch information
fayaaz committed Apr 20, 2021
1 parent 28c4fc4 commit 21cba1f
Showing 1 changed file with 53 additions and 43 deletions.
96 changes: 53 additions & 43 deletions camera-app/birbcam.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@
REGION_NAME = os.getenv("REGION_NAME", "Canada")
MIN_CORRECT_CONF = os.getenv("MIN_CORRECT_CONF", 0.75)
MIN_UNREVIEWED_CONF = os.getenv("MIN_UNREVIEWED_CONF", 0.9)
WORKER_PROC = int(os.getenv("BIRBCAM_WORKER_PROC", "2"))

# Streamer
ENABLE_STREAMER = os.getenv("ENABLE_STREAMER", False)

streamer_port = 3030
Expand Down Expand Up @@ -74,7 +77,7 @@

# Create the Videoapture object for the webcam
capture = cv.VideoCapture()
capture.set(cv.CAP_PROP_FPS, 30)
capture.set(cv.CAP_PROP_FPS, 1)
CV_MASK_THRESH = 255 # Threhold for foreground mask: 255 indicates objects
CV_KERNEL_SIZE = 25 # nxn kernel size for 2d median filter on foreground mask

Expand Down Expand Up @@ -111,7 +114,14 @@ def camera_loop(queue, stop_time):
if i < burn_in:
i += 1
continue
queue.put((frame, fgMask, timestamp, utc_timestamp))

# Threshold mask
fgMaskMedian = medfilt2d(fgMask, CV_KERNEL_SIZE)
if (fgMaskMedian >= CV_MASK_THRESH).any():
# Put the frame and corresponding timestamp into the
# queue to be processed by the fastai models
queue.put((frame, timestamp, utc_timestamp))
logging.info(f'Passed image with timestamp {timestamp} for processing')

# Release the webcam
capture.release()
Expand Down Expand Up @@ -207,57 +217,57 @@ def main_loop(queue):
def image_processor(queue, DB_PATH=DB_PATH, save_dir=save_dir, model_path=MODEL_PATH):
x = None
learn = load_learner(model_path)
while True:
try:
# Threshold mask
x = queue.get()
frame, fgMask, timestamp, utc_timestamp = x
fgMaskMedian = medfilt2d(fgMask, CV_KERNEL_SIZE)
if (fgMaskMedian >= CV_MASK_THRESH).any():
# Get the frame and timestamp for the image to be processed
logging.debug(f'Processing image with timestamp {timestamp}')
# Convert the OpenCV image from BGR to RGB for fastai
rgb_frame = cv.cvtColor(frame, cv.COLOR_BGR2RGB)
try:
x = queue.get()
# Get the frame and timestamp for the image to be processed
frame, timestamp, utc_timestamp = x
logging.debug(f'Processing image with timestamp {timestamp}')
# Convert the OpenCV image from BGR to RGB for fastai
rgb_frame = cv.cvtColor(frame, cv.COLOR_BGR2RGB)

# Get the predicted label and confidence
pred = learn.predict(rgb_frame)
labels = pred[0]
confidences = pred[2].tolist()
if len(labels) == 0:
labels = ['none']
confidence = 1 - max(confidences)
else:
confidence = min([c for c in confidences if c > 0.5])
fname_label = '_'.join(labels)
pred_label = ','.join(labels)
# Save the image with time stamp and label
filename = f'{timestamp}_{fname_label}.jpg'
filepath = f'{save_dir}{timestamp}_{fname_label}.jpg'
cv.imwrite(filepath, frame)
# Write results to sqlite3 database
conn = sqlite3.connect(DB_PATH, timeout=60)
with conn:
conn.execute("INSERT INTO results VALUES (?,?,?,?,?,?,?)",
(utc_timestamp, timestamp, filename, pred_label, confidence, None, None))
conn.close()
logging.info(f'Processed image with timestamp {timestamp} and found label(s) {pred_label}')
except Exception as e:
logging.error(traceback.format_exc())
pass
# Get the predicted label and confidence
pred = learn.predict(rgb_frame)
labels = pred[0]
confidences = pred[2].tolist()
if len(labels) == 0:
labels = ['none']
confidence = 1 - max(confidences)
else:
confidence = min([c for c in confidences if c > 0.5])
fname_label = '_'.join(labels)
pred_label = ','.join(labels)
# Save the image with time stamp and label
filename = f'{timestamp}_{fname_label}.jpg'
filepath = f'{save_dir}{timestamp}_{fname_label}.jpg'
cv.imwrite(filepath, frame)
# Write results to sqlite3 database
conn = sqlite3.connect(DB_PATH, timeout=60)
with conn:
conn.execute("INSERT INTO results VALUES (?,?,?,?,?,?,?)",
(utc_timestamp, timestamp, filename, pred_label, confidence, None, None))
conn.close()
logging.info(f'Processed image with timestamp {timestamp} and found label(s) {pred_label}')
except Exception as e:
logging.error(traceback.format_exc())
pass


def main():
# Use a multiprocessing queue to offload slow image processing
# to other processes/cores and keep the camera_loop from being
# blocked and missing frames.
q = mp.Queue()
m = mp.Manager()
q = m.Queue()
pool = mp.Pool(WORKER_PROC)
p1 = mp.Process(target=main_loop, args=(q,))
p2 = mp.Process(target=image_processor, args=(q,))
workers = []
for i in range(WORKER_PROC):
workers.append(
pool.apply_async(image_processor, (q,))
)
p1.start()
p2.start()
p1.join()
p2.join()

[output.get() for output in workers]

if __name__ == '__main__':
main()

0 comments on commit 21cba1f

Please sign in to comment.