Skip to content

Commit

Permalink
feat: replaced our model with built-in pyfeat classifier. (#70)
Browse files Browse the repository at this point in the history
  • Loading branch information
sanderjk5 authored and Grutschus committed Dec 29, 2023
1 parent 2681ae6 commit c662fd3
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 45 deletions.
28 changes: 18 additions & 10 deletions botender/perception/detection_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@

import numpy as np
import torch
from pandas import DataFrame

import botender.logging_utils as logging_utils
from botender.perception.detectors import EmotionDetector, FacialExpressionDetector
from botender.webcam_processor import Rectangle

from feat import Detector # type: ignore

warnings.filterwarnings("ignore")
logger = logging.getLogger(__name__)

Expand All @@ -25,8 +26,8 @@
class DetectionResult:
faces: list[Rectangle]
"""A list of rectangles representing the faces detected in the frame."""
features: DataFrame
"""A dataframe containing all the features extracted from the frame."""
features: list
"""A list containing all the features extracted from the frame."""
emotion: str
"""A string that defines the detected emotion."""

Expand All @@ -45,6 +46,8 @@ class DetectionWorker(Process):
_last_emotions: list[str] = []
_detect_emotion_counter: int = 0

_detector: Detector

def __init__(
self,
logging_queue: Queue,
Expand All @@ -62,17 +65,18 @@ def __init__(
self._result_connection = result_connection
self._stop_event = stop_event
self._detect_emotion_event = detect_emotion_event
self._current_result = DetectionResult(
faces=[], features=DataFrame(), emotion="neutral"
)
self._current_result = DetectionResult(faces=[], features=[], emotion="neutral")

def run(self):
"""Uses the detectors to detect faces and emotions in the newest frames."""

logging_utils.configure_publisher(self._logging_queue)
logger.debug("Successfully spawned detection worker. Initializing detector...")
self.facial_expression_detector = FacialExpressionDetector(device=_get_device())
self.emotion_detector = EmotionDetector()
self._detector = Detector(device=_get_device())
self.facial_expression_detector = FacialExpressionDetector(
detector=self._detector
)
self.emotion_detector = EmotionDetector(detector=self._detector)
logger.debug("Successfully initialized detector. Starting work loop...")
self._result_connection.send(True) # Signal that we are ready

Expand Down Expand Up @@ -115,10 +119,14 @@ def detect_emotion(self) -> bool:
return clear_flag

# extract features
features = self.facial_expression_detector.extract_features(self.work_frame)
features, faces = self.facial_expression_detector.extract_features(
self.work_frame
)
self._current_result.features = features
# predict emotion
emotion = self.emotion_detector.detect_emotion(features=features)
emotion = self.emotion_detector.detect_emotion(
frame=self.work_frame, faces=faces, features=features
)
self._last_emotions.append(emotion)

self._detect_emotion_counter += 1
Expand Down
52 changes: 26 additions & 26 deletions botender/perception/detectors/emotion_detector.py
Original file line number Diff line number Diff line change
@@ -1,43 +1,43 @@
import logging
import pickle

from pandas import DataFrame
from pkg_resources import resource_filename
from sklearn.svm import SVC # type: ignore[import-untyped]
from feat import Detector # type: ignore
from feat.utils import FEAT_EMOTION_COLUMNS # type: ignore
import numpy as np

logger = logging.getLogger(__name__)

SVM_MODEL_PATH = resource_filename(__name__, "models/svm_model.pkl")
SCALER_MODEL_PATH = resource_filename(__name__, "models/scaler.pkl")
LABEL_ENCODER_MODEL_PATH = resource_filename(__name__, "models/label_encoder.pkl")
PYFEAT_EMOTIONS_TO_EMOTIONS = {
"neutral": "neutral",
"anger": "angry",
"happy": "happy",
"sadness": "sad",
}


class EmotionDetector:
"""The EmotionDetector is responsible for predict the emotion of the user."""

_model: SVC # or whatever model we use
_detector: Detector # use built-in pyfeat classifier

def __init__(self):
# load model
logger.info("Loading emotion detection model...")
# Load the model from the file
with open(SVM_MODEL_PATH, "rb") as file:
self.loaded_model = pickle.load(file)
def __init__(self, detector: Detector):
self._detector = detector

with open(SCALER_MODEL_PATH, "rb") as file:
self.loaded_scaler = pickle.load(file)

with open(LABEL_ENCODER_MODEL_PATH, "rb") as file:
self.loaded_label_encoder = pickle.load(file)

def detect_emotion(self, features: DataFrame) -> str:
def detect_emotion(
self,
frame: np.ndarray,
faces: list[tuple[float, float, float, float, float]],
features: list,
) -> str:
"""Predicts the emotion in the given features and returns it as a string."""

if len(features) == 0:
if len(faces) == 0 or len(features) == 0:
return "neutral"

scaled_aus = self.loaded_scaler.transform(features[0])
predictions = self.loaded_model.predict(scaled_aus)
predicted_emotions = self.loaded_label_encoder.inverse_transform(predictions)
detected_emotions = self._detector.detect_emotions(frame, [faces], features)[0]

detected_emotion = FEAT_EMOTION_COLUMNS[np.argmax(detected_emotions[0])]
if detected_emotion not in PYFEAT_EMOTIONS_TO_EMOTIONS.keys():
detected_emotion = "neutral"
predicted_emotion = PYFEAT_EMOTIONS_TO_EMOTIONS[detected_emotion]

return predicted_emotions[0]
return predicted_emotion
18 changes: 9 additions & 9 deletions botender/perception/detectors/facial_expression_detector.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import numpy as np
from feat import Detector # type: ignore
from pandas import DataFrame
from typing import Tuple

from botender.webcam_processor import Rectangle

Expand All @@ -11,10 +11,10 @@ class FacialExpressionDetector:

_detector: Detector
_faces: list[tuple[float, float, float, float, float]]
_features: DataFrame
_features: list

def __init__(self, device: str = "cpu"):
self._detector = Detector(device=device)
def __init__(self, detector: Detector):
self._detector = detector

def detect_faces(self, frame) -> list[Rectangle]:
"""Detects faces in a frame and returns a list of rectangles representing the
Expand All @@ -23,14 +23,14 @@ def detect_faces(self, frame) -> list[Rectangle]:
self._faces = self._detector.detect_faces(frame)[0]
return [((x1, y1), (x2, y2)) for x1, y1, x2, y2, _ in self._faces]

def extract_features(self, frame: np.ndarray) -> DataFrame:
def extract_features(self, frame: np.ndarray) -> Tuple[list, list]:
"""Extracts features from the faces detected in the last frame and returns them
as a DataFrame."""
as a list. Returns additionally a list of the faces that were used to extract."""

faces = self._faces
if len(faces) == 0:
return DataFrame()
return ([], faces)

landmarks = self._detector.detect_landmarks(frame, [faces])
aus = self._detector.detect_aus(frame, landmarks)

return aus
return (landmarks, faces)

0 comments on commit c662fd3

Please sign in to comment.