diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..fe88abd
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,4 @@
+.venv
+.env
+__pycache__/
+whisper_models
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..6dee9ca
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,19 @@
+FROM ubuntu:22.04
+
+RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y \
+    python3.11 \
+    python3-distutils \
+    python3-pip \
+    ffmpeg
+
+WORKDIR /app
+
+ADD ./whisper_models whisper_models
+ADD ./requirements.txt requirements.txt
+
+RUN python3.11 -m pip install --upgrade pip
+RUN python3.11 -m pip install -r requirements.txt
+
+ADD ./speech_to_text.py speech_to_text.py
+
+ENTRYPOINT ["python3.11", "speech_to_text.py"]
diff --git a/README.md b/README.md
index e35840c..b70e63c 100644
--- a/README.md
+++ b/README.md
@@ -1,3 +1,122 @@
-# SUL Speech to Text Tools
+# speech-to-text
 
-For now, this is a placeholder repo where we can ticket some things that don't yet have a natural home (and which may end up living in this repo after prototyping/implementation, e.g. a definition for the Docker container we run on cloud based GPU instances, supporting tools and docs, etc).
+This repository contains a Docker configuration for performing serverless speech-to-text processing with Whisper, using an Amazon Simple Storage Service (S3) bucket for media files and Amazon Simple Queue Service (SQS) queues for coordinating work.
+
+## Build
+
+To build the container you will first need to download the PyTorch models that Whisper uses. This is about 13GB of data and can take some time! The idea is to bake the models into the Docker image so they don't need to be fetched dynamically every time the container runs (which would add to the runtime). If you know you only need one model size and want to include just that one, edit the `whisper_models/urls.txt` file accordingly before running the `wget` command.
+
+```shell
+wget --directory-prefix whisper_models --input-file whisper_models/urls.txt
+```
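+
+The directory component just before each filename in `whisper_models/urls.txt` is that file's SHA-256 checksum (the same value Whisper checks when it downloads a model itself), so you can optionally verify the downloads before baking them into the image. A minimal sketch:
+
+```python
+import hashlib
+from pathlib import Path
+from urllib.parse import urlparse
+
+for line in Path("whisper_models/urls.txt").read_text().splitlines():
+    line = line.strip()
+    if not line or line.startswith("#"):
+        continue
+    # the URL path looks like .../models/<sha256>/<filename>
+    sha256, filename = urlparse(line).path.split("/")[-2:]
+    # note: this reads each multi-gigabyte model fully into memory
+    digest = hashlib.sha256(Path("whisper_models", filename).read_bytes()).hexdigest()
+    print(filename, "ok" if digest == sha256 else f"MISMATCH: {digest}")
+```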
+
+Then you can build the image:
+
+```shell
+docker build --tag sul-speech-to-text .
+```
+
+## Configure AWS
+
+Create two queues, one for new jobs and one for completed jobs:
+
+```shell
+aws sqs create-queue --queue-name sul-speech-to-text-todo-dev-your-username
+aws sqs create-queue --queue-name sul-speech-to-text-done-dev-your-username
+```
+
+Create a bucket:
+
+```shell
+aws s3 mb s3://sul-speech-to-text-dev-your-username
+```
+
+Configure `.env` with your AWS credentials so the Docker container can find them:
+
+```shell
+cp env-example .env
+vi .env
+```
+
+## Run
+
+### Create a Job
+
+Usually the common-accessioning robots will initiate new speech-to-text work by:
+
+1. minting a new job ID
+2. copying a media file to the S3 bucket
+3. putting a job in the TODO queue
+
+For testing you can simulate these things by running the Docker container with the `--create` flag.
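+
+What the `--create` flag does amounts to a few boto3 calls. Here is a simplified sketch of what `create_job()` in `speech_to_text.py` does, assuming the bucket and queue names from `env-example` and AWS credentials in your environment:
+
+```python
+import json
+import uuid
+from pathlib import Path
+
+import boto3
+
+media = Path("file.mp4")
+job_id = str(uuid.uuid4())  # 1. mint a new job ID
+
+# 2. copy the media file to the S3 bucket under the job ID
+s3 = boto3.resource("s3")
+bucket = s3.Bucket("sul-speech-to-text-dev-your-username")
+bucket.upload_file(str(media), f"media/{job_id}/{media.name}")
+
+# 3. put a job message in the TODO queue
+sqs = boto3.resource("sqs")
+queue = sqs.get_queue_by_name(QueueName="sul-speech-to-text-todo-dev-your-username")
+queue.send_message(MessageBody=json.dumps({"id": job_id, "media": [media.name]}))
+```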
+
+For example, if you have a `file.mp4` file you'd like to create a job for, you can run:
+
+```shell
+docker run --rm --tty --volume .:/app --env-file .env sul-speech-to-text --create file.mp4
+```
+
+### Run the Job
+
+Now you can run the container and have it pick up the job you placed into the queue:
+
+```shell
+docker run --rm --tty --env-file .env sul-speech-to-text --no-daemon
+```
+
+Wait for the results to appear:
+
+```shell
+aws s3 ls s3://sul-speech-to-text-dev-your-username/out/${JOB_ID}/
+```
+
+Usually the message on the DONE queue will be processed by captionWF in common-accessioning, but if you want you can pop messages off manually:
+
+```shell
+docker run --rm --tty --env-file .env sul-speech-to-text --receive
+```
+
+## The Job File
+
+The job file is a JSON object that contains information about how to run Whisper. Minimally it contains the job ID, which is used to locate the media files in S3 that need to be processed, and the list of media file names:
+
+```json
+{
+  "id": "8EB51B59-BDFF-4507-B1AA-0DE91ACA388F",
+  "media": ["file.mp4"]
+}
+```
+
+You can also pass in options for Whisper:
+
+```json
+{
+  "id": "8EB51B59-BDFF-4507-B1AA-0DE91ACA388F",
+  "media": ["file.mp4"],
+  "options": {
+    "model": "large",
+    "max_line_count": 80,
+    "beam_size": 10
+  }
+}
+```
+
+The message you receive on the DONE SQS queue will contain the same job JSON, with a `finished` timestamp added:
+
+```json
+{
+  "id": "8EB51B59-BDFF-4507-B1AA-0DE91ACA388F",
+  "media": ["file.mp4"],
+  "options": {
+    "model": "large",
+    "max_line_count": 80,
+    "beam_size": 10
+  },
+  "finished": "2024-01-01T12:00:00+00:00"
+}
+```
+
+If processing failed, the DONE message will instead include an `error` property describing what went wrong.
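+
+Note that not every option is passed straight through to `whisper.transcribe()`: the `model` option selects which model checkpoint to load, the subtitle layout options (`highlight_words`, `max_line_count`, `max_line_width`, `max_words_per_line`) are handed to Whisper's subtitle writer, and everything else (e.g. `beam_size`) goes to the transcribe call itself. A rough sketch of how `speech_to_text.py` splits them:
+
+```python
+job = {
+    "id": "8EB51B59-BDFF-4507-B1AA-0DE91ACA388F",
+    "media": ["file.mp4"],
+    "options": {"model": "large", "max_line_count": 80, "beam_size": 10},
+}
+
+options = dict(job.get("options", {}))
+
+# which model checkpoint to load with whisper.load_model()
+model_name = options.pop("model", "small")
+
+# subtitle layout options are for the writer, not for whisper.transcribe()
+writer_options = {
+    opt: options.pop(opt, None)
+    for opt in ("highlight_words", "max_line_count", "max_line_width", "max_words_per_line")
+}
+
+print(model_name)      # large
+print(writer_options)  # {'highlight_words': None, 'max_line_count': 80, ...}
+print(options)         # {'beam_size': 10} -- passed to whisper.transcribe()
+```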
+ """ + bucket = get_bucket() + + bucket.put_object(Key=f"out/{job['id']}.json", Body=json.dumps(job, indent=2)) + + output_dir = get_output_dir(job) + for path in output_dir.iterdir(): + # don't upload the media again + if path.name in job["media"]: + continue + + key = f"out/{job['id']}/{path.name}" + bucket.upload_file(path, key) + print(f"generated s3://{bucket.name}/{key}") + + +def finish_job(job): + queue = get_done_queue() + queue.send_message(MessageBody=json.dumps(job)) + + # clean up media on s3 + bucket = get_bucket() + for media_file in job["media"]: + obj = bucket.Object(f"media/{job['id']}/{media_file}") + obj.delete() + + +def get_writer_options(job): + word_options = [ + "highlight_words", + "max_line_count", + "max_line_width", + "max_words_per_line", + ] + + opts = {option: job.get(option) for option in word_options} + + # ensure word_timestamps is set if any of the word options are + if job.get("word_timestamps") is None: + for option in word_options: + if job.get(option) is not None: + opts["word_timestamps"] = True + + return opts + + +@cache +def get_aws(service_name): + role = os.environ.get("AWS_ROLE_ARN") + + # This would be a lot easier if boto3 read AWS_ROLE_ARN like it does other + # environment variables: + # + # see: https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_use_switch-role-api.html + + if role: + sts_client = boto3.client("sts") + response = sts_client.assume_role( + RoleArn=role, RoleSessionName="speech-to-text" + ) + cred = response["Credentials"] + aws = boto3.resource( + service_name, + aws_access_key_id=cred["AccessKeyId"], + aws_secret_access_key=cred["SecretAccessKey"], + aws_session_token=cred["SessionToken"], + ) + else: + aws = boto3.resource(service_name) + + return aws + + +@cache +def get_bucket(): + s3 = get_aws("s3") + bucket_name = os.environ.get("SPEECH_TO_TEXT_S3_BUCKET") + return s3.Bucket(bucket_name) + + +@cache +def get_todo_queue(): + return get_queue(os.environ.get("SPEECH_TO_TEXT_TODO_SQS_QUEUE")) + + +@cache +def get_done_queue(): + return get_queue(os.environ.get("SPEECH_TO_TEXT_DONE_SQS_QUEUE")) + + +@cache +def get_queue(queue_name): + sqs = get_aws("sqs") + return sqs.get_queue_by_name(QueueName=queue_name) + + +def report_error(job, exception): + """ + Add the job to the done queue with an error. + """ + job["error"] = str(exception) + queue = get_done_queue() + queue.send_message(MessageBody=json.dumps(job)) + + +def check_env(): + names = ["AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "SPEECH_TO_TEXT_S3_BUCKET"] + for name in names: + if os.environ.get(name) is None: + sys.exit(f"{name} is not defined in the environment") + + +def now(): + return datetime.datetime.now(datetime.UTC).isoformat() + + +def get_output_dir(job): + path = Path(job["id"]) + if not path.is_dir(): + path.mkdir() + + return path + + +def create_job(media_path: Path): + """ + Create a job for a given media file by placing the media file in S3 and + sending a message to the TODO queue. This is really just for testing since + the jobs are created by common-accessioning robot during captionWF. + """ + job_id = str(uuid.uuid4()) + add_media(media_path, job_id) + + job = {"id": job_id, "media": [media_path.name]} + add_job(job) + + return job_id + + +def add_media(media_path, job_id): + """ + Upload a media file to the bucket, and return the filename. 
+ """ + path = Path(media_path) + key = f"media/{job_id}/{path.name}" + + bucket = get_bucket() + bucket.upload_file(media_path, key) + + return path.name + + +def add_job(job): + """ + Create a job JSON file in the S3 bucket. + """ + queue = get_todo_queue() + queue.send_message(MessageBody=json.dumps(job)) + + return + + +def receive_job(): + """ + Receives jobs on the DONE queue. + """ + queue = get_done_queue() + messages = queue.receive_messages(MaxNumberOfMessages=1) + if len(messages) == 1: + msg = messages[0] + msg.delete() + return json.loads(msg.body) + elif len(messages) == 0: + return None + else: + raise Exception("received more than one message from todo queue!") + + +if __name__ == "__main__": + check_env() + + parser = argparse.ArgumentParser(prog="speech_to_text") + parser.add_argument("-c", "--create", help="Create a job for a given media file") + parser.add_argument("-r", "--receive", action="store_true", help="Retrieve a job from the done queue.") + parser.add_argument( + "-d", + "--daemon", + action="store_true", + default=True, + dest="daemon", + help="Run forever looking for jobs", + ) + parser.add_argument( + "-n", + "--no-daemon", + action="store_false", + default=False, + dest="daemon", + help="Run one job and exit.", + ) + args = parser.parse_args() + + if args.create: + media_path = Path(args.create) + if not media_path.is_file(): + print(f"No such file {media_path}") + + job_id = create_job(media_path) + print(f"Created job {job_id}") + + elif args.receive: + job = receive_job() + if job is not None: + print(json.dumps(job, indent=2)) + else: + print("No messages were found in the todo queue...") + + else: + main(daemon=args.daemon) diff --git a/tests/data/en.vtt b/tests/data/en.vtt new file mode 100644 index 0000000..bcda098 --- /dev/null +++ b/tests/data/en.vtt @@ -0,0 +1,5 @@ +WEBVTT + +00:00.000 --> 00:03.040 +This is a test for whisper reading in English. + diff --git a/tests/data/en.wav b/tests/data/en.wav new file mode 100644 index 0000000..db725dd Binary files /dev/null and b/tests/data/en.wav differ diff --git a/tests/test_speech_to_text.py b/tests/test_speech_to_text.py new file mode 100644 index 0000000..bdcb83e --- /dev/null +++ b/tests/test_speech_to_text.py @@ -0,0 +1,58 @@ +import json +from pathlib import Path + +import dotenv +import pytest +import speech_to_text + +# set AWS_PROFILE from .env in the environment +dotenv.load_dotenv() + + +# ignore utcnow warning until https://github.com/boto/boto3/issues/3889 is resolved +@pytest.mark.filterwarnings("ignore:datetime.datetime.utcnow") +def test_speech_to_text(): + clean() + + job_id = speech_to_text.create_job(Path("tests/data/en.wav")) + + speech_to_text.main(daemon=False) + + # check that the vtt file is present + test_bucket = speech_to_text.get_bucket() + vtt_file = test_bucket.Object(f"out/{job_id}/en.vtt").get() + assert vtt_file, "vtt file created" + assert "This is a test" in vtt_file["Body"].read().decode("utf-8"), "vtt content" + + # make sure there's a message in the "done" queue + queue = speech_to_text.get_done_queue() + msgs = queue.receive_messages(MaxNumberOfMessages=1, WaitTimeSeconds=10) + assert len(msgs) == 1, "found a done message" + assert msgs[0].delete(), "delete the message from the queue" + + # make sure the job looks ok + job = json.loads(msgs[0].body) + assert job["id"] == job_id + assert job["finished"] + + jobs = queue.receive_messages(MaxNumberOfMessages=1) + assert len(jobs) == 0, "queue empty" + + +def clean(): + """ + Ensure that the bucket and queues are empty. 
+ """ + todo = speech_to_text.get_todo_queue() + while messages := todo.receive_messages(): + for m in messages: + m.delete() + + done = speech_to_text.get_done_queue() + while messages := done.receive_messages(): + for m in messages: + m.delete() + + bucket = speech_to_text.get_bucket() + for obj in bucket.objects.all(): + obj.delete() diff --git a/whisper_models/urls.txt b/whisper_models/urls.txt new file mode 100644 index 0000000..d7e90af --- /dev/null +++ b/whisper_models/urls.txt @@ -0,0 +1,12 @@ +# these are pulled from https://openaipublic.azureedge.net/main/whisper/models/d3dd57d32accea0b295c96e26691aa14d8822fac7d9d27d5dc00b4ca2826dd03/tiny.en.pt +https://openaipublic.azureedge.net/main/whisper/models/65147644a518d12f04e32d6f3b26facc3f8dd46e5390956a9424a650c0ce22b9/tiny.pt +https://openaipublic.azureedge.net/main/whisper/models/25a8566e1d0c1e2231d1c762132cd20e0f96a85d16145c3a00adf5d1ac670ead/base.en.pt +https://openaipublic.azureedge.net/main/whisper/models/ed3a0b6b1c0edf879ad9b11b1af5a0e6ab5db9205f891f668f8b0e6c6326e34e/base.pt +https://openaipublic.azureedge.net/main/whisper/models/f953ad0fd29cacd07d5a9eda5624af0f6bcf2258be67c92b79389873d91e0872/small.en.pt +https://openaipublic.azureedge.net/main/whisper/models/9ecf779972d90ba49c06d968637d720dd632c55bbf19d441fb42bf17a411e794/small.pt +https://openaipublic.azureedge.net/main/whisper/models/d7440d1dc186f76616474e0ff0b3b6b879abc9d1a4926b7adfa41db2d497ab4f/medium.en.pt +https://openaipublic.azureedge.net/main/whisper/models/345ae4da62f9b3d59415adc60127b97c714f32e89e936602e85993674d08dcb1/medium.pt +https://openaipublic.azureedge.net/main/whisper/models/e4b87e7e0bf463eb8e6956e646f1e277e901512310def2c24bf0e11bd3c28e9a/large-v1.pt +https://openaipublic.azureedge.net/main/whisper/models/81f7c96c852ee8fc832187b0132e569d6c3065a3252ed18e56effd0b6a73e524/large-v2.pt +https://openaipublic.azureedge.net/main/whisper/models/e5b1a55b89c1367dacf97e3e19bfd829a01529dbfdeefa8caeb59b3f1b81dadb/large-v3.pt +https://openaipublic.azureedge.net/main/whisper/models/e5b1a55b89c1367dacf97e3e19bfd829a01529dbfdeefa8caeb59b3f1b81dadb/large-v3.pt