-
Notifications
You must be signed in to change notification settings - Fork 13
/
av_align.py
176 lines (132 loc) · 5.32 KB
/
av_align.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
"""
AV-Align Metric: Audio-Video Alignment Evaluation
AV-Align is a metric for evaluating the alignment between audio and video modalities in multimedia data.
It assesses synchronization by detecting audio and video peaks and calculating their Intersection over Union (IoU).
A higher IoU score indicates better alignment.
Usage:
- Provide a folder of video files as input.
- The script calculates the AV-Align score for the set of videos.
"""
import argparse
import glob
import cv2
import librosa
import librosa.display
# Function to extract frames from a video file
def extract_frames(video_path):
"""
Extract frames from a video file.
Args:
video_path (str): Path to the input video file.
Returns:
frames (list): List of frames extracted from the video.
frame_rate (float): Frame rate of the video.
"""
frames = []
cap = cv2.VideoCapture(video_path)
frame_rate = cap.get(cv2.CAP_PROP_FPS)
if not cap.isOpened():
raise ValueError("Error: Unable to open the video file.")
while True:
ret, frame = cap.read()
if not ret:
break
frames.append(frame)
cap.release()
return frames, frame_rate
# Function to detect audio peaks using the Onset Detection algorithm
def detect_audio_peaks(audio_file):
"""
Detect audio peaks using the Onset Detection algorithm.
Args:
audio_file (str): Path to the audio file.
Returns:
onset_times (list): List of times (in seconds) where audio peaks occur.
"""
y, sr = librosa.load(audio_file)
# Calculate the onset envelope
onset_env = librosa.onset.onset_strength(y=y, sr=sr)
# Get the onset events
onset_frames = librosa.onset.onset_detect(onset_envelope=onset_env, sr=sr)
onset_times = librosa.frames_to_time(onset_frames, sr=sr)
return onset_times
# Function to find local maxima in a list
def find_local_max_indexes(arr, fps):
"""
Find local maxima in a list.
Args:
arr (list): List of values to find local maxima in.
fps (float): Frames per second, used to convert indexes to time.
Returns:
local_extrema_indexes (list): List of times (in seconds) where local maxima occur.
"""
local_extrema_indexes = []
for i in range(1, n - 1):
if arr[i - 1] < arr[i] > arr[i + 1]: # Local maximum
local_extrema_indexes.append(i / fps)
return local_extrema_indexes
# Function to detect video peaks using Optical Flow
def detect_video_peaks(frames, fps):
"""
Detect video peaks using Optical Flow.
Args:
frames (list): List of video frames.
fps (float): Frame rate of the video.
Returns:
flow_trajectory (list): List of optical flow magnitudes for each frame.
video_peaks (list): List of times (in seconds) where video peaks occur.
"""
flow_trajectory = [compute_of(frames[0], frames[1])] + [compute_of(frames[i - 1], frames[i]) for i in range(1, len(frames))]
video_peaks = find_local_max_indexes(flow_trajectory, fps)
return flow_trajectory, video_peaks
# Function to compute the optical flow magnitude between two frames
def compute_of(img1, img2):
"""
Compute the optical flow magnitude between two video frames.
Args:
img1 (numpy.ndarray): First video frame.
img2 (numpy.ndarray): Second video frame.
Returns:
avg_magnitude (float): Average optical flow magnitude for the frame pair.
"""
# Calculate the optical flow
prev_gray = cv2.cvtColor(img1, cv2.COLOR_BGR2GRAY)
curr_gray = cv2.cvtColor(img2, cv2.COLOR_BGR2GRAY)
flow = cv2.calcOpticalFlowFarneback(prev_gray, curr_gray, None, 0.5, 3, 15, 3, 5, 1.2, 0)
# Calculate the magnitude of the optical flow vectors
magnitude = cv2.magnitude(flow[..., 0], flow[..., 1])
avg_magnitude = cv2.mean(magnitude)[0]
return avg_magnitude
# Function to calculate Intersection over Union (IoU) for audio and video peaks
def calc_intersection_over_union(audio_peaks, video_peaks, fps):
"""
Calculate Intersection over Union (IoU) between audio and video peaks.
Args:
audio_peaks (list): List of audio peak times (in seconds).
video_peaks (list): List of video peak times (in seconds).
fps (float): Frame rate of the video.
Returns:
iou (float): Intersection over Union score.
"""
intersection_length = 0
for audio_peak in audio_peaks:
for video_peak in video_peaks:
if video_peak - 1 / fps < audio_peak < video_peak + 1 / fps:
intersection_length += 1
break
return intersection_length / (len(audio_peaks) + len(video_peaks) - intersection_length)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--input_dir", type=str, required=True, help='Insert the videos folder path')
args = parser.parse_args()
files = [file for file in glob.glob(args.input_dir + '*.mp4')]
score = 0
for file in files:
file = file[:-4]
video_path = f'{file}.mp4'
audio_path = f'{file}.wav'
frames, fps = extract_frames(video_path)
audio_peaks = detect_audio_peaks(audio_path)
flow_trajectory, video_peaks = detect_video_peaks(frames, fps)
score += calc_intersection_over_union(audio_peaks, video_peaks, fps)
print('AV-Align: ', score/len(files))