Skip to content

Commit

Permalink
Multimodal dataprep (opea-project#575)
Browse files Browse the repository at this point in the history
* multimodal embedding for MM RAG for videos

Signed-off-by: Tiep Le <tiep.le@intel.com>

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* develop data prep first commit

Signed-off-by: Tiep Le <tiep.le@intel.com>

* develop dataprep microservice for multimodal data

Signed-off-by: Tiep Le <tiep.le@intel.com>

* multimodal langchain for dataprep

Signed-off-by: Tiep Le <tiep.le@intel.com>

* update README

Signed-off-by: Tiep Le <tiep.le@intel.com>

* update README

Signed-off-by: Tiep Le <tiep.le@intel.com>

* update README

Signed-off-by: Tiep Le <tiep.le@intel.com>

* update README

Signed-off-by: Tiep Le <tiep.le@intel.com>

* cosmetic

Signed-off-by: Tiep Le <tiep.le@intel.com>

* test for multimodal dataprep

Signed-off-by: Tiep Le <tiep.le@intel.com>

* update test

Signed-off-by: Tiep Le <tiep.le@intel.com>

* update test

Signed-off-by: Tiep Le <tiep.le@intel.com>

* update test

Signed-off-by: Tiep Le <tiep.le@intel.com>

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* cosmetic update

Signed-off-by: Tiep Le <tiep.le@intel.com>

* remove langsmith

Signed-off-by: Tiep Le <tiep.le@intel.com>

* update API to remove /dataprep from API names and remove langsmith

Signed-off-by: Tiep Le <tiep.le@intel.com>

* update test

Signed-off-by: Tiep Le <tiep.le@intel.com>

* update the error message per PR reviewer

Signed-off-by: Tiep Le <tiep.le@intel.com>

---------

Signed-off-by: Tiep Le <tiep.le@intel.com>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Signed-off-by: Dong, Bo1 <bo1.dong@intel.com>
  • Loading branch information
2 people authored and a32543254 committed Sep 3, 2024
1 parent 1845be6 commit a617036
Show file tree
Hide file tree
Showing 31 changed files with 2,799 additions and 3 deletions.
4 changes: 4 additions & 0 deletions comps/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
GraphDoc,
LVMDoc,
LVMVideoDoc,
ImageDoc,
TextImageDoc,
MultimodalDoc,
EmbedMultimodalDoc,
)

# Constants
Expand Down
39 changes: 37 additions & 2 deletions comps/cores/proto/docarray.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import numpy as np
from docarray import BaseDoc, DocList
from docarray.documents import AudioDoc, VideoDoc
from docarray.typing import AudioUrl
from docarray.typing import AudioUrl, ImageUrl
from pydantic import Field, conint, conlist, field_validator


Expand All @@ -17,7 +17,30 @@ class TopologyInfo:


class TextDoc(BaseDoc, TopologyInfo):
text: str
text: str = None


class ImageDoc(BaseDoc):
url: Optional[ImageUrl] = Field(
description="The path to the image. It can be remote (Web) URL, or a local file path",
default=None,
)
base64_image: Optional[str] = Field(
description="The base64-based encoding of the image",
default=None,
)


class TextImageDoc(BaseDoc):
image: ImageDoc = None
text: TextDoc = None


MultimodalDoc = Union[
TextDoc,
ImageDoc,
TextImageDoc,
]


class Base64ByteStrDoc(BaseDoc):
Expand All @@ -43,6 +66,18 @@ class EmbedDoc(BaseDoc):
score_threshold: float = 0.2


class EmbedMultimodalDoc(EmbedDoc):
# extend EmbedDoc with these attributes
url: Optional[ImageUrl] = Field(
description="The path to the image. It can be remote (Web) URL, or a local file path.",
default=None,
)
base64_image: Optional[str] = Field(
description="The base64-based encoding of the image.",
default=None,
)


class Audio2TextDoc(AudioDoc):
url: Optional[AudioUrl] = Field(
description="The path to the audio.",
Expand Down
258 changes: 258 additions & 0 deletions comps/dataprep/multimodal_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,258 @@
# Copyright (C) 2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

import base64
import json
import os
import uuid
from pathlib import Path
from typing import Iterator

import cv2
import requests
import webvtt
import whisper
from moviepy.editor import VideoFileClip


def create_upload_folder(upload_path):
"""Create a directory to store uploaded video data."""
if not os.path.exists(upload_path):
Path(upload_path).mkdir(parents=True, exist_ok=True)


def load_json_file(file_path):
"""Read contents of json file."""
with open(file_path, "r") as file:
data = json.load(file)
return data


def clear_upload_folder(upload_path):
"""Clear the upload directory."""
for root, dirs, files in os.walk(upload_path, topdown=False):
for file in files:
file_path = os.path.join(root, file)
os.remove(file_path)
for dir in dirs:
dir_path = os.path.join(root, dir)
os.rmdir(dir_path)


def generate_video_id():
"""Generates a unique identifier for a video file."""
return str(uuid.uuid4())


def convert_video_to_audio(video_path: str, output_audio_path: str):
"""Converts video to audio using MoviePy library that uses `ffmpeg` under the hood.
:param video_path: file path of video file (.mp4)
:param output_audio_path: file path of audio file (.wav) to be created
"""
video_clip = VideoFileClip(video_path)
audio_clip = video_clip.audio
audio_clip.write_audiofile(output_audio_path)
video_clip.close()
audio_clip.close()


def load_whisper_model(model_name: str = "base"):
"""Load a whisper model for generating video transcripts."""
return whisper.load_model(model_name)


def extract_transcript_from_audio(whisper_model, audio_path: str):
"""Generate transcript from audio file.
:param whisper_model: a pre-loaded whisper model object
:param audio_path: file path of audio file (.wav)
"""
options = dict(task="translate", best_of=5, language="en")
return whisper_model.transcribe(audio_path, **options)


def format_timestamp_for_transcript(seconds: float, always_include_hours: bool = True, fractionalSeperator: str = "."):
"""Format timestamp for video transcripts."""
milliseconds = round(seconds * 1000.0)

hours = milliseconds // 3_600_000
milliseconds -= hours * 3_600_000

minutes = milliseconds // 60_000
milliseconds -= minutes * 60_000

seconds = milliseconds // 1_000
milliseconds -= seconds * 1_000

hours_marker = f"{hours:02d}:" if always_include_hours or hours > 0 else ""
return f"{hours_marker}{minutes:02d}:{seconds:02d}{fractionalSeperator}{milliseconds:03d}"


def write_vtt(transcript: Iterator[dict], vtt_path: str):
"""Write transcripts to a .vtt file."""
with open(vtt_path, "a") as file:
file.write("WEBVTT\n\n")
for segment in transcript["segments"]:
text = (segment["text"]).replace("-->", "->")
file.write(
f"{format_timestamp_for_transcript(segment['start'])} --> {format_timestamp_for_transcript(segment['end'])}\n"
)
file.write(f"{text.strip()}\n\n")


def delete_audio_file(audio_path: str):
"""Delete audio file after extracting transcript."""
os.remove(audio_path)


def time_to_frame(time: float, fps: float):
"""Convert time in seconds into frame number."""
return int(time * fps - 1)


def str2time(strtime: str):
"""Get time in seconds from string."""
strtime = strtime.strip('"')
hrs, mins, seconds = [float(c) for c in strtime.split(":")]

total_seconds = hrs * 60**2 + mins * 60 + seconds

return total_seconds


def convert_img_to_base64(image):
"Convert image to base64 string"
_, buffer = cv2.imencode(".jpg", image)
encoded_string = base64.b64encode(buffer)
return encoded_string.decode()


def extract_frames_and_annotations_from_transcripts(video_id: str, video_path: str, vtt_path: str, output_dir: str):
"""Extract frames (.jpg) and annotations (.json) from video file (.mp4) and captions file (.vtt)"""
# Set up location to store frames and annotations
os.makedirs(output_dir, exist_ok=True)
os.makedirs(os.path.join(output_dir, "frames"), exist_ok=True)

# Load video and get fps
vidcap = cv2.VideoCapture(video_path)
fps = vidcap.get(cv2.CAP_PROP_FPS)

# read captions file
captions = webvtt.read(vtt_path)

annotations = []
for idx, caption in enumerate(captions):
start_time = str2time(caption.start)
end_time = str2time(caption.end)

mid_time = (end_time + start_time) / 2
text = caption.text.replace("\n", " ")

frame_no = time_to_frame(mid_time, fps)
mid_time_ms = mid_time * 1000
vidcap.set(cv2.CAP_PROP_POS_MSEC, mid_time_ms)
success, frame = vidcap.read()

if success:
# Save frame for further processing
img_fname = f"frame_{idx}"
img_fpath = os.path.join(output_dir, "frames", img_fname + ".jpg")
cv2.imwrite(img_fpath, frame)

# Convert image to base64 encoded string
b64_img_str = convert_img_to_base64(frame)

# Create annotations for frame from transcripts
annotations.append(
{
"video_id": video_id,
"video_name": os.path.basename(video_path),
"b64_img_str": b64_img_str,
"caption": text,
"time": mid_time_ms,
"frame_no": frame_no,
"sub_video_id": idx,
}
)

# Save transcript annotations as json file for further processing
with open(os.path.join(output_dir, "annotations.json"), "w") as f:
json.dump(annotations, f)

vidcap.release()
return annotations


def use_lvm(endpoint: str, img_b64_string: str, prompt: str = "Provide a short description for this scene."):
"""Generate image captions/descriptions using LVM microservice."""
inputs = {"image": img_b64_string, "prompt": prompt, "max_new_tokens": 32}
response = requests.post(url=endpoint, data=json.dumps(inputs))
print(response)
return response.json()["text"]


def extract_frames_and_generate_captions(
video_id: str, video_path: str, lvm_endpoint: str, output_dir: str, key_frame_per_second: int = 1
):
"""Extract frames (.jpg) and annotations (.json) from video file (.mp4) by generating captions using LVM microservice."""
# Set up location to store frames and annotations
os.makedirs(output_dir, exist_ok=True)
os.makedirs(os.path.join(output_dir, "frames"), exist_ok=True)

# Load video and get fps
vidcap = cv2.VideoCapture(video_path)
fps = vidcap.get(cv2.CAP_PROP_FPS)

annotations = []
hop = round(fps / key_frame_per_second)
curr_frame = 0
idx = -1

while True:
ret, frame = vidcap.read()
if not ret:
break

if curr_frame % hop == 0:
idx += 1

mid_time = vidcap.get(cv2.CAP_PROP_POS_MSEC)
mid_time_ms = mid_time * 1000

frame_no = curr_frame
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

# Save frame for further processing
img_fname = f"frame_{idx}"
img_fpath = os.path.join(output_dir, "frames", img_fname + ".jpg")
cv2.imwrite(img_fpath, frame)

# Convert image to base64 encoded string
b64_img_str = convert_img_to_base64(frame)

# Caption generation using LVM microservice
caption = use_lvm(lvm_endpoint, b64_img_str)
caption = caption.strip()
text = caption.replace("\n", " ")

# Create annotations for frame from transcripts
annotations.append(
{
"video_id": video_id,
"video_name": os.path.basename(video_path),
"b64_img_str": b64_img_str,
"caption": text,
"time": mid_time_ms,
"frame_no": frame_no,
"sub_video_id": idx,
}
)

curr_frame += 1

# Save caption annotations as json file for further processing
with open(os.path.join(output_dir, "annotations.json"), "w") as f:
json.dump(annotations, f)

vidcap.release()
4 changes: 3 additions & 1 deletion comps/dataprep/redis/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Dataprep Microservice with Redis

For dataprep microservice, we provide two frameworks: `Langchain` and `LlamaIndex`. We also provide `Langchain_ray` which uses ray to parallel the data prep for multi-file performance improvement(observed 5x - 15x speedup by processing 1000 files/links.).
We have provided dataprep microservice for multimodal data input (e.g., text and image) [here](multimodal_langchain/README.md).

For dataprep microservice for text input, we provide here two frameworks: `Langchain` and `LlamaIndex`. We also provide `Langchain_ray` which uses ray to parallel the data prep for multi-file performance improvement(observed 5x - 15x speedup by processing 1000 files/links.).

We organized these two folders in the same way, so you can use either framework for dataprep microservice with the following constructions.

Expand Down
Loading

0 comments on commit a617036

Please sign in to comment.