-
Notifications
You must be signed in to change notification settings - Fork 14
/
whisper_gui.py
472 lines (392 loc) · 17 KB
/
whisper_gui.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
import sys
import sounddevice as sd
import numpy as np
import whisper
import queue
import threading
import time
import torch
import traceback
from datetime import datetime
from PySide6.QtCore import Qt, QTimer, Signal, QPropertyAnimation, QEasingCurve, QPointF, QRectF
from PySide6.QtGui import (QPainter, QColor, QPen, QLinearGradient, QRadialGradient,
QPainterPath, QTextCharFormat, QFont, QTextCursor)
from PySide6.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout,
QTextEdit, QPushButton, QComboBox, QLabel, QHBoxLayout, QFrame, QMessageBox)
from transformers import WhisperProcessor, WhisperForConditionalGeneration
class WaveformWidget(QWidget):
    """Animated bar graph that visualises the level of incoming audio.

    A QTimer eases the displayed bar heights (``waves``) towards the most
    recent per-bar levels (``target_waves``) and requests a repaint on every
    tick. ``update_audio_data`` only stores new levels, so it never touches
    Qt painting itself.
    """

    # Number of bars drawn; ``waves`` and ``target_waves`` always hold this
    # many entries so they can be indexed in lockstep.
    BAR_COUNT = 30

    def __init__(self):
        super().__init__()
        self.setMinimumHeight(100)
        self.waves = [0] * self.BAR_COUNT
        self.target_waves = [0] * self.BAR_COUNT
        self.animation_timer = QTimer()
        self.animation_timer.timeout.connect(self.update_waves)
        self.animation_timer.start(30)  # Tick every 30 ms for a smooth animation
        self.is_recording = False
        # Not read by update_waves, which uses a fixed 0.2 easing factor;
        # kept so existing readers of the attribute keep working.
        self.transition_speed = 0.15

    def start_animation(self):
        """Switch to the recording look and start from small, visible bars."""
        self.is_recording = True
        self.waves = [0.1] * self.BAR_COUNT

    def stop_animation(self):
        """Switch to the idle look and let the bars ease back to zero."""
        self.is_recording = False
        self.target_waves = [0] * self.BAR_COUNT

    def update_audio_data(self, data):
        """Derive new per-bar target levels from a block of audio samples.

        Args:
            data: Array-like block of samples. It is split along its first
                axis into ``BAR_COUNT`` chunks; empty input is ignored.

        The method deliberately does not call ``self.update()``: it is invoked
        from the audio input callback, and the animation timer already
        repaints the widget every tick.
        """
        if len(data) == 0:
            return
        if not self.is_recording:
            self.target_waves = [0] * self.BAR_COUNT
            return

        magnitude = np.abs(data)
        normalized = magnitude / np.max(magnitude + 1e-10)

        # At least one sample per chunk, otherwise range() gets a zero step.
        chunk_size = max(1, len(normalized) // self.BAR_COUNT)
        levels = [
            # 1.2 boosts the amplitude so the bars look livelier
            normalized[start:start + chunk_size].mean() * 1.2
            for start in range(0, len(normalized), chunk_size)
        ][:self.BAR_COUNT]
        # Random +/-30% variation for a more dynamic look
        levels = [
            level * (1 + np.random.uniform(-0.3, 0.3)) for level in levels]
        # Short blocks produce fewer chunks than bars; pad with silence.
        levels.extend([0] * (self.BAR_COUNT - len(levels)))

        # Publish the finished list in one assignment so the timer never
        # observes a half-built value.
        self.target_waves = levels

    def update_waves(self):
        """Timer slot: ease every bar towards its target and repaint."""
        if not self.waves:
            self.waves = [0] * self.BAR_COUNT
            self.target_waves = [0] * self.BAR_COUNT

        # Snapshot: update_audio_data may rebind the attribute meanwhile.
        targets = self.target_waves
        now = time.time()
        sway = 1 + np.cos(now * 4) * 0.1  # Additional slow wave motion

        for i, level in enumerate(self.waves):
            if self.is_recording and i < len(targets):
                # Per-bar ripple on top of the measured level
                target = targets[i] * (1 + np.sin(now * 6 + i) * 0.15)
                target *= sway
            else:
                target = 0
            self.waves[i] = level + (target - level) * 0.2
        self.update()

    def paintEvent(self, event):
        """Draw one rounded, vertically centred bar per entry in ``waves``."""
        if not self.waves:
            return
        painter = QPainter(self)
        painter.setRenderHint(QPainter.Antialiasing)
        try:
            width = self.width()
            height = self.height()
            center_y = height / 2
            bar_count = len(self.waves)
            bar_width = width / (bar_count * 1.5)
            corner = bar_width / 2
            max_height = height * 0.85
            now = time.time()

            # Green theme gradient
            gradient = QLinearGradient(0, 0, 0, height)
            if self.is_recording:
                # Vibrant green colors during recording
                gradient.setColorAt(0, QColor(46, 204, 113))    # Bright green
                gradient.setColorAt(0.5, QColor(39, 174, 96))   # Medium green
                gradient.setColorAt(1, QColor(33, 150, 83))     # Dark green
            else:
                # Subtle, slightly transparent green when not recording
                gradient.setColorAt(0, QColor(46, 204, 113, 200))
                gradient.setColorAt(1, QColor(33, 150, 83, 200))
            painter.setPen(Qt.NoPen)
            painter.setBrush(gradient)

            # Global pulsing factor, only applied while recording
            pulse = 1 + np.sin(now * 5) * 0.08

            for i, amplitude in enumerate(self.waves):
                x = width * i / bar_count
                # Small travelling wave so the bars never look frozen
                wave_effect = np.sin(now * 4 + i * 0.5) * 0.08
                bar_height = max_height * (amplitude + wave_effect)
                if self.is_recording:
                    bar_height *= pulse
                rect = QRectF(
                    x + bar_width / 2,
                    center_y - bar_height / 2,
                    bar_width,
                    bar_height
                )
                # Translucent green glow behind louder bars while recording
                if self.is_recording and amplitude > 0.1:
                    glow = QPainterPath()
                    glow.addRoundedRect(rect, corner, corner)
                    painter.fillPath(glow, QColor(46, 204, 113, 40))
                painter.drawRoundedRect(rect, corner, corner)
        finally:
            painter.end()
class WhisperGUI(QMainWindow):
    """Main window: records microphone audio and shows a live transcription.

    Audio blocks arrive through ``audio_callback`` and are queued. A worker
    thread (``process_audio``) re-transcribes a rolling buffer with a Whisper
    model and hands the text to the GUI through the ``update_text`` signal.
    Every ``SEGMENT_SECONDS`` the current text is archived into
    ``history_text`` with a timestamp and a new segment starts.
    """

    # Signals emitted by the worker thread to update the window.
    update_text = Signal(str)
    add_newline = Signal()

    # Length of one transcription segment and of the rolling audio buffer.
    SEGMENT_SECONDS = 15

    # Combo-box entry -> Hugging Face model id.
    _MODEL_REPOS = {
        "tiny": "openai/whisper-tiny",
        "base": "openai/whisper-base",
        "small": "openai/whisper-small",
        "medium": "openai/whisper-medium",
        "large": "openai/whisper-large",
    }

    def __init__(self):
        super().__init__()
        self.current_transcription = ""    # Text of the segment in progress
        self.history_text = []             # Archived, timestamped segments
        self.current_segment_start = None  # Start time of current segment

        # Show startup message
        self.statusBar().showMessage("Application is starting...")
        self.init_ui()
        self.init_whisper()
        self.last_buffer_reset = time.time()
        self.update_text.connect(self.update_display)
        self.add_newline.connect(self._add_newline)

    def init_ui(self):
        """Build the controls row, the waveform and the transcript view."""
        # Main layout
        main_layout = QVBoxLayout()

        # Frame holding the controls
        controls_frame = QFrame()
        controls_frame.setFrameStyle(QFrame.Panel | QFrame.Raised)
        controls_layout = QHBoxLayout()

        # Model selection (items are added before connecting the signal)
        model_label = QLabel("Model:")
        self.model_combo = QComboBox()
        self.model_combo.addItems(["tiny", "base", "small", "medium", "large"])
        self.model_combo.currentTextChanged.connect(self.load_model)

        # Record button
        self.record_button = QPushButton("Start Recording")
        self.record_button.clicked.connect(self.toggle_recording)

        # Add controls to layout
        controls_layout.addWidget(model_label)
        controls_layout.addWidget(self.model_combo)
        controls_layout.addWidget(self.record_button)
        controls_frame.setLayout(controls_layout)

        # Text display with a dark theme
        self.text_display = QTextEdit()
        self.text_display.setReadOnly(True)
        self.text_display.setStyleSheet(
            "QTextEdit { background-color: #2b2b2b; color: white; }")

        # Waveform
        self.waveform = WaveformWidget()

        # Add everything to main layout
        main_layout.addWidget(controls_frame)
        main_layout.addWidget(self.waveform)
        main_layout.addWidget(self.text_display)

        # Central widget
        central_widget = QWidget()
        central_widget.setLayout(main_layout)
        self.setCentralWidget(central_widget)

        # Window properties
        self.setWindowTitle("Whisper GUI")
        self.setGeometry(100, 100, 800, 600)

    def init_whisper(self):
        """Initialise the recording state and load the selected model."""
        self.recording = False
        self.audio_queue = queue.Queue()
        self.sample_rate = 16000
        self.channels = 1
        self.blocksize = int(self.sample_rate * 0.3)  # 0.3 s per chunk
        self.model = None
        self.processor = None
        self.process_thread = None
        self.stream = None  # Open sd.InputStream while recording, else None
        self.stable_tokens = None
        self.unstable_tokens = None
        self.eos_token = None
        self.device = torch.device(
            'mps' if torch.backends.mps.is_available() else 'cpu')
        # Load the model right away
        self.load_model()

    def load_model(self):
        """Load the processor and model chosen in the combo box.

        The new pair replaces the old one only after both loaded, so a failed
        load leaves the previously loaded pair untouched.

        Raises:
            Exception: whatever ``from_pretrained`` raised, after logging it.
        """
        model_name = self.model_combo.currentText()
        self.statusBar().showMessage(f"Loading model {model_name}...")
        print(f"Loading model {model_name}...")

        # Translate the short name into the id expected by transformers;
        # unknown names are passed through unchanged.
        repo_id = self._MODEL_REPOS.get(model_name, model_name)
        try:
            processor = WhisperProcessor.from_pretrained(repo_id)
            model = WhisperForConditionalGeneration.from_pretrained(
                repo_id,
                torch_dtype=torch.float32,
            ).to(self.device)
        except Exception as e:
            print(f"Error loading model: {str(e)}")
            self.statusBar().showMessage(
                f"Failed to load model {model_name}")
            raise

        self.processor, self.model = processor, model
        # Reset the streaming variables
        self.stable_tokens = None
        self.unstable_tokens = None
        self.eos_token = None
        print("Model loaded successfully!")
        self.statusBar().showMessage(f"Model {model_name} loaded")

    def _clear_audio_queue(self):
        """Discard every audio block that is still waiting in the queue."""
        while True:
            try:
                self.audio_queue.get_nowait()
            except queue.Empty:
                return

    def _next_audio(self, timeout=0.1):
        """Return all pending audio as one flat float32 array, or None.

        Waits up to ``timeout`` seconds for the first block, then also takes
        whatever else is already queued so that a slow transcription does not
        build up an ever-growing backlog.
        """
        try:
            blocks = [self.audio_queue.get(timeout=timeout)]
        except queue.Empty:
            return None
        while True:
            try:
                blocks.append(self.audio_queue.get_nowait())
            except queue.Empty:
                break
        return np.concatenate(
            [block.flatten().astype(np.float32) for block in blocks])

    def _archive_current_segment(self, end_timestamp):
        """Move the current transcription, if any, into the history list."""
        text = self.current_transcription.strip()
        if not text:
            return
        end_time = datetime.fromtimestamp(end_timestamp)
        start_time = datetime.fromtimestamp(
            self.current_segment_start
            or (end_timestamp - self.SEGMENT_SECONDS))
        timestamp = (f"[{start_time.strftime('%H:%M:%S')}"
                     f"-{end_time.strftime('%H:%M:%S')}]")
        self.history_text.append(f"{timestamp} {text}")
        self.current_transcription = ""

    def _transcribe(self, audio_buffer):
        """Run the model on ``audio_buffer`` and return the decoded text."""
        # Convert the audio into model input features
        inputs = self.processor(
            audio_buffer,
            sampling_rate=self.sample_rate,
            return_tensors="pt"
        )
        input_features = inputs.input_features.to(self.device)
        # NOTE(review): the mask is built with the same shape as the feature
        # tensor, as in the original code - confirm this is what generate()
        # expects for the installed transformers version.
        attention_mask = torch.ones_like(
            input_features, dtype=torch.long, device=self.device)
        predicted_ids = self.model.generate(
            input_features,
            attention_mask=attention_mask,
            task="transcribe",
            return_timestamps=False,
            max_new_tokens=128,
            num_beams=1,      # Single beam for speed
            do_sample=False   # No sampling, for more stable output
        )
        return self.processor.batch_decode(
            predicted_ids,
            skip_special_tokens=True
        )[0]

    def process_audio(self):
        """Worker loop: transcribe queued audio until recording stops.

        Keeps a rolling buffer of at most ``SEGMENT_SECONDS`` of audio. When a
        segment has lasted longer than that, its text is archived and the
        buffer restarts from the newest audio. A failed transcription is
        logged and the loop carries on with the next audio.
        """
        if self.model is None or self.processor is None:
            print("Model not loaded. Please load the model first.")
            return

        audio_buffer = np.array([], dtype=np.float32)
        # Cap the buffer length to avoid overloading the model
        max_buffer_size = self.sample_rate * self.SEGMENT_SECONDS
        try:
            while self.recording:
                audio_data = self._next_audio()
                if audio_data is None:
                    continue
                audio_buffer = np.concatenate([audio_buffer, audio_data])

                current_time = time.time()
                if (current_time - self.last_buffer_reset
                        > self.SEGMENT_SECONDS):
                    # Save the finished segment before starting a new one
                    self._archive_current_segment(current_time)
                    self.current_segment_start = current_time
                    self.last_buffer_reset = current_time
                    # Signal instead of touching the widget from this thread
                    self.add_newline.emit()
                    audio_buffer = audio_data
                if len(audio_buffer) > max_buffer_size:
                    audio_buffer = audio_buffer[-max_buffer_size:]

                try:
                    transcription = self._transcribe(audio_buffer)
                except Exception as e:
                    print(f"Error in transcription: {str(e)}")
                    traceback.print_exc()
                    continue

                # Update the display with the new text
                self.current_transcription = transcription
                self.update_text.emit(transcription)
        except Exception as e:
            print(f"Error in process_audio: {str(e)}")
            traceback.print_exc()

    def toggle_recording(self):
        """Start recording when idle, stop it when running."""
        if not self.recording:
            self.start_recording()
        else:
            self.stop_recording()

    def start_recording(self):
        """Open the microphone stream and start the transcription worker.

        The stream is opened before any state changes, so a device error
        leaves the window idle and is reported to the user.
        """
        if self.model is None:
            self.load_model()

        # Start a clean session: no stale audio, fresh segment timing.
        self._clear_audio_queue()
        session_start = time.time()
        self.last_buffer_reset = session_start
        self.current_segment_start = session_start

        stream = None
        try:
            stream = sd.InputStream(
                samplerate=self.sample_rate,
                channels=self.channels,
                callback=self.audio_callback,
                blocksize=self.blocksize
            )
            stream.start()
        except Exception as e:
            if stream is not None:
                stream.close()
            print(f"Error starting audio input: {str(e)}")
            traceback.print_exc()
            QMessageBox.critical(
                self, "Whisper GUI", f"Could not start audio input: {e}")
            return

        self.stream = stream
        self.recording = True
        self.record_button.setText("Stop Recording")
        self.waveform.start_animation()

        # Start processing thread; daemon so it cannot keep the process alive
        self.process_thread = threading.Thread(
            target=self.process_audio, daemon=True)
        self.process_thread.start()

    def stop_recording(self):
        """Stop the stream and the worker, then archive the open segment."""
        self.recording = False
        self.record_button.setText("Start Recording")
        self.waveform.stop_animation()

        if self.stream is not None:
            self.stream.stop()
            self.stream.close()
            self.stream = None
        if self.process_thread is not None:
            self.process_thread.join()
            self.process_thread = None

        # Keep the text of the unfinished segment instead of letting the
        # next session overwrite it.
        if self.current_transcription.strip():
            self._archive_current_segment(time.time())
            self.update_display("")

    def audio_callback(self, indata, frames, time, status):
        """Audio input callback: queue the block and feed the waveform.

        The ``time`` parameter shadows the ``time`` module inside this method;
        the name is kept because it is part of the existing signature.
        """
        if status:
            print(status)
        # Add the new audio data to the queue
        self.audio_queue.put(indata.copy())
        self.waveform.update_audio_data(indata.copy())

    def merge_text(self, text1, text2):
        """Merge two texts, replacing the overlapping part of ``text1``.

        Looks for the position in ``text1`` where the longest run of words
        equals the beginning of ``text2`` (compared case-insensitively). When
        such a run exists, the result is ``text1`` up to that position
        followed by all of ``text2``, with the original casing preserved;
        otherwise the two texts are joined with a space.

        Args:
            text1: Existing text; when empty, ``text2`` is returned as is.
            text2: New text to merge in.

        Returns:
            The merged string.
        """
        if not text1:
            return text2

        words1 = text1.split()
        words2 = text2.split()
        keys1 = [word.lower() for word in words1]
        keys2 = [word.lower() for word in words2]

        max_overlap = 0
        overlap_pos = 0
        # Find the best overlap position
        for i in range(len(keys1)):
            k = 0
            while (i + k < len(keys1) and
                   k < len(keys2) and
                   keys1[i + k] == keys2[k]):
                k += 1
            if k > max_overlap:
                max_overlap = k
                overlap_pos = i

        if max_overlap > 0:
            # Combine the texts around the overlap
            return " ".join(words1[:overlap_pos] + words2)
        # No overlap: concatenate directly
        return text1 + " " + text2

    def _add_newline(self):
        """Append an empty line to the transcript view if it has content."""
        if len(self.text_display.toPlainText().strip()) > 0:
            self.text_display.append("")

    def update_display(self, text):
        """Render the history followed by the current transcription.

        Args:
            text: Current transcription; it is prefixed with the segment's
                time range when a segment start is known and the text is
                not blank.
        """
        display_text = ""
        if self.history_text:
            display_text = "\n".join(self.history_text) + "\n\n"

        if self.current_segment_start and text.strip():
            current_time = time.time()
            start_time = datetime.fromtimestamp(self.current_segment_start)
            end_time = datetime.fromtimestamp(current_time)
            timestamp = (f"[{start_time.strftime('%H:%M:%S')}"
                         f"-{end_time.strftime('%H:%M:%S')}]")
            display_text += f"Current: {timestamp} {text}"
        else:
            display_text += f"Current: {text}"

        self.text_display.setPlainText(display_text)
        # Keep the newest text in view
        cursor = self.text_display.textCursor()
        cursor.movePosition(QTextCursor.MoveOperation.End)
        self.text_display.setTextCursor(cursor)

    def closeEvent(self, event):
        """Stop recording, if active, before the window closes."""
        if self.recording:
            self.stop_recording()
        print("Application is closing...")
        # Accept the event to close the application
        event.accept()
def main():
    """Create the Qt application, show the main window and run the event loop.

    The process exits with the value returned by the event loop.
    """
    application = QApplication(sys.argv)
    main_window = WhisperGUI()
    main_window.show()
    exit_code = application.exec()
    sys.exit(exit_code)


if __name__ == "__main__":
    main()