diff --git a/.github/workflows/unit-tests.yml b/.github/workflows/unit-tests.yml index 7fba6b6b4cf1..1da909a4a784 100644 --- a/.github/workflows/unit-tests.yml +++ b/.github/workflows/unit-tests.yml @@ -23,6 +23,7 @@ jobs: matrix: python-version: - "3.11" + - "3.12" django-version: - "pinned" # When updating the shards, remember to make the same changes in diff --git a/cms/djangoapps/contentstore/video_storage_handlers.py b/cms/djangoapps/contentstore/video_storage_handlers.py index 87086c9951ac..d90dab5dc9e1 100644 --- a/cms/djangoapps/contentstore/video_storage_handlers.py +++ b/cms/djangoapps/contentstore/video_storage_handlers.py @@ -13,12 +13,11 @@ import shutil import pathlib import zipfile +import boto3 from contextlib import closing from datetime import datetime, timedelta from uuid import uuid4 -from boto.s3.connection import S3Connection -from boto import s3 from django.conf import settings from django.contrib.staticfiles.storage import staticfiles_storage from django.http import FileResponse, HttpResponseNotFound, StreamingHttpResponse @@ -826,7 +825,7 @@ def videos_post(course, request): return {'error': error_msg}, 400 edx_video_id = str(uuid4()) - key = storage_service_key(bucket, file_name=edx_video_id) + key_name = storage_service_key(bucket, file_name=edx_video_id) metadata_list = [ ('client_video_id', file_name), @@ -846,12 +845,20 @@ def videos_post(course, request): if transcript_preferences is not None: metadata_list.append(('transcript_preferences', json.dumps(transcript_preferences))) - for metadata_name, value in metadata_list: - key.set_metadata(metadata_name, value) - upload_url = key.generate_url( - KEY_EXPIRATION_IN_SECONDS, - 'PUT', - headers={'Content-Type': req_file['content_type']} + # Prepare metadata for presigned URL + metadata = dict(metadata_list) + + # Generate presigned URL using boto3 + s3_client = bucket.meta.client + upload_url = s3_client.generate_presigned_url( + 'put_object', + Params={ + 'Bucket': bucket.name, + 'Key': key_name, + 'ContentType': req_file['content_type'], + 'Metadata': metadata + }, + ExpiresIn=KEY_EXPIRATION_IN_SECONDS ) # persist edx_video_id in VAL @@ -886,24 +893,19 @@ def storage_service_bucket(): 'aws_secret_access_key': settings.AWS_SECRET_ACCESS_KEY } - conn = S3Connection(**params) + conn = boto3.resource("s3", **params) - # We don't need to validate our bucket, it requires a very permissive IAM permission - # set since behind the scenes it fires a HEAD request that is equivalent to get_all_keys() - # meaning it would need ListObjects on the whole bucket, not just the path used in each - # environment (since we share a single bucket for multiple deployments in some configurations) - return conn.get_bucket(settings.VIDEO_UPLOAD_PIPELINE['VEM_S3_BUCKET'], validate=False) + return conn.Bucket(settings.VIDEO_UPLOAD_PIPELINE['VEM_S3_BUCKET']) def storage_service_key(bucket, file_name): """ - Returns an S3 key to the given file in the given bucket. + Returns an S3 key name for the given file in the given bucket. """ - key_name = "{}/{}".format( + return "{}/{}".format( settings.VIDEO_UPLOAD_PIPELINE.get("ROOT_PATH", ""), file_name ) - return s3.key.Key(bucket, key_name) def send_video_status_update(updates): diff --git a/cms/djangoapps/contentstore/views/tests/test_videos.py b/cms/djangoapps/contentstore/views/tests/test_videos.py index 2d17229f59c1..de89fa2a52e0 100644 --- a/cms/djangoapps/contentstore/views/tests/test_videos.py +++ b/cms/djangoapps/contentstore/views/tests/test_videos.py @@ -1,1746 +1,1793 @@ -""" -Unit tests for video-related REST APIs. -""" - - -import csv -import json -import re -from contextlib import contextmanager -from datetime import datetime -from io import StringIO -from unittest.mock import Mock, patch - -import dateutil.parser -from common.djangoapps.student.tests.factories import UserFactory -import ddt -import pytz -from django.test import TestCase -from django.conf import settings -from django.test.utils import override_settings -from django.urls import reverse -from edx_toggles.toggles.testutils import override_waffle_flag, override_waffle_switch -from edxval.api import ( - create_or_update_transcript_preferences, - create_or_update_video_transcript, - create_profile, - create_video, - get_course_video_image_url, - get_transcript_preferences, - get_video_info -) -from cms.djangoapps.contentstore.models import VideoUploadConfig -from cms.djangoapps.contentstore.tests.utils import CourseTestCase -from cms.djangoapps.contentstore.utils import reverse_course_url -from openedx.core.djangoapps.profile_images.tests.helpers import make_image_file -from openedx.core.djangoapps.video_pipeline.config.waffle import ( - DEPRECATE_YOUTUBE, - ENABLE_DEVSTACK_VIDEO_UPLOADS, -) -from openedx.core.djangoapps.waffle_utils.models import WaffleFlagCourseOverrideModel -from xmodule.modulestore.django import modulestore -from xmodule.modulestore.tests.django_utils import ModuleStoreTestCase -from xmodule.modulestore.tests.factories import CourseFactory # lint-amnesty, pylint: disable=wrong-import-order - -from ..videos import ( - ENABLE_VIDEO_UPLOAD_PAGINATION, - KEY_EXPIRATION_IN_SECONDS, - VIDEO_IMAGE_UPLOAD_ENABLED, -) -from cms.djangoapps.contentstore.video_storage_handlers import ( - _get_default_video_image_url, - TranscriptProvider, - StatusDisplayStrings, - convert_video_status, - storage_service_bucket, - storage_service_key, - PUBLIC_VIDEO_SHARE -) - - -class VideoUploadTestBase: - """ - Test cases for the video upload feature - """ - - def get_url_for_course_key(self, course_key, kwargs=None): - """Return video handler URL for the given course""" - return reverse_course_url(self.VIEW_NAME, course_key, kwargs) # lint-amnesty, pylint: disable=no-member - - def setUp(self): - super().setUp() # lint-amnesty, pylint: disable=no-member - self.url = self.get_url_for_course_key(self.course.id) - self.test_token = "test_token" - self.course.video_upload_pipeline = { - "course_video_upload_token": self.test_token, - } - self.save_course() # lint-amnesty, pylint: disable=no-member - - # create another course for videos belonging to multiple courses - self.course2 = CourseFactory.create() - self.course2.video_upload_pipeline = { - "course_video_upload_token": self.test_token, - } - self.course2.save() - self.store.update_item(self.course2, self.user.id) # lint-amnesty, pylint: disable=no-member - - # course ids for videos - course_ids = [str(self.course.id), str(self.course2.id)] - created = datetime.now(pytz.utc) - - self.profiles = ["profile1", "profile2"] - self.previous_uploads = [ - { - "edx_video_id": "test1", - "client_video_id": "test1.mp4", - "duration": 42.0, - "status": "upload", - "courses": course_ids, - "encoded_videos": [], - "created": created - }, - { - "edx_video_id": "test2", - "client_video_id": "test2.mp4", - "duration": 128.0, - "status": "file_complete", - "courses": course_ids, - "created": created, - "encoded_videos": [ - { - "profile": "profile1", - "url": "http://example.com/profile1/test2.mp4", - "file_size": 1600, - "bitrate": 100, - }, - { - "profile": "profile2", - "url": "http://example.com/profile2/test2.mov", - "file_size": 16000, - "bitrate": 1000, - }, - ], - }, - { - "edx_video_id": "non-ascii", - "client_video_id": "nón-ascii-näme.mp4", - "duration": 256.0, - "status": "transcode_active", - "courses": course_ids, - "created": created, - "encoded_videos": [ - { - "profile": "profile1", - "url": "http://example.com/profile1/nón-ascii-näme.mp4", - "file_size": 3200, - "bitrate": 100, - }, - ] - }, - ] - # Ensure every status string is tested - self.previous_uploads += [ - { - "edx_video_id": f"status_test_{status}", - "client_video_id": "status_test.mp4", - "duration": 3.14, - "status": status, - "courses": course_ids, - "created": created, - "encoded_videos": [], - } - for status in ( - list(StatusDisplayStrings._STATUS_MAP.keys()) + # pylint:disable=protected-access - ["non_existent_status"] - ) - ] - for profile in self.profiles: - create_profile(profile) - for video in self.previous_uploads: - create_video(video) - - def _get_previous_upload(self, edx_video_id): - """Returns the previous upload with the given video id.""" - return next( - video - for video in self.previous_uploads - if video["edx_video_id"] == edx_video_id - ) - - -class VideoStudioAccessTestsMixin: - """ - Base Access tests for studio video views - """ - def test_anon_user(self): - self.client.logout() - response = self.client.get(self.url) - self.assertEqual(response.status_code, 302) - - def test_put(self): - response = self.client.put(self.url) - self.assertEqual(response.status_code, 405) - - def test_invalid_course_key(self): - response = self.client.get( - self.get_url_for_course_key("Non/Existent/Course") - ) - self.assertEqual(response.status_code, 404) - - def test_non_staff_user(self): - client, __ = self.create_non_staff_authed_user_client() - response = client.get(self.url) - self.assertEqual(response.status_code, 403) - - -class VideoPipelineStudioAccessTestsMixin: - """ - Access tests for video views that rely on the video pipeline - """ - def test_video_pipeline_not_enabled(self): - settings.FEATURES["ENABLE_VIDEO_UPLOAD_PIPELINE"] = False - self.assertEqual(self.client.get(self.url).status_code, 404) - - def test_video_pipeline_not_configured(self): - settings.VIDEO_UPLOAD_PIPELINE = None - self.assertEqual(self.client.get(self.url).status_code, 404) - - def test_course_not_configured(self): - self.course.video_upload_pipeline = {} - self.save_course() - self.assertEqual(self.client.get(self.url).status_code, 404) - - -class VideoUploadPostTestsMixin: - """ - Shared test cases for video post tests. - """ - @override_settings(AWS_ACCESS_KEY_ID='test_key_id', AWS_SECRET_ACCESS_KEY='test_secret') - @patch('boto.s3.key.Key') - @patch('cms.djangoapps.contentstore.video_storage_handlers.S3Connection') - def test_post_success(self, mock_conn, mock_key): - files = [ - { - 'file_name': 'first.mp4', - 'content_type': 'video/mp4', - }, - { - 'file_name': 'second.mp4', - 'content_type': 'video/mp4', - }, - { - 'file_name': 'third.mov', - 'content_type': 'video/quicktime', - }, - { - 'file_name': 'fourth.mp4', - 'content_type': 'video/mp4', - }, - ] - - bucket = Mock() - mock_conn.return_value = Mock(get_bucket=Mock(return_value=bucket)) - mock_key_instances = [ - Mock( - generate_url=Mock( - return_value='http://example.com/url_{}'.format(file_info['file_name']) - ) - ) - for file_info in files - ] - # If extra calls are made, return a dummy - mock_key.side_effect = mock_key_instances + [Mock()] - - response = self.client.post( - self.url, - json.dumps({'files': files}), - content_type='application/json' - ) - self.assertEqual(response.status_code, 200) - response_obj = json.loads(response.content.decode('utf-8')) - - mock_conn.assert_called_once_with( - aws_access_key_id=settings.AWS_ACCESS_KEY_ID, - aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY - ) - self.assertEqual(len(response_obj['files']), len(files)) - self.assertEqual(mock_key.call_count, len(files)) - for i, file_info in enumerate(files): - # Ensure Key was set up correctly and extract id - key_call_args, __ = mock_key.call_args_list[i] - self.assertEqual(key_call_args[0], bucket) - path_match = re.match( - ( - settings.VIDEO_UPLOAD_PIPELINE['ROOT_PATH'] + - '/([a-f0-9]{8}-[a-f0-9]{4}-4[a-f0-9]{3}-[89ab][a-f0-9]{3}-[a-f0-9]{12})$' - ), - key_call_args[1] - ) - self.assertIsNotNone(path_match) - video_id = path_match.group(1) - mock_key_instance = mock_key_instances[i] - - mock_key_instance.set_metadata.assert_any_call( - 'course_video_upload_token', - self.test_token - ) - - mock_key_instance.set_metadata.assert_any_call( - 'client_video_id', - file_info['file_name'] - ) - mock_key_instance.set_metadata.assert_any_call('course_key', str(self.course.id)) - mock_key_instance.generate_url.assert_called_once_with( - KEY_EXPIRATION_IN_SECONDS, - 'PUT', - headers={'Content-Type': file_info['content_type']} - ) - - # Ensure VAL was updated - val_info = get_video_info(video_id) - self.assertEqual(val_info['status'], 'upload') - self.assertEqual(val_info['client_video_id'], file_info['file_name']) - self.assertEqual(val_info['status'], 'upload') - self.assertEqual(val_info['duration'], 0) - self.assertEqual(val_info['courses'], [{str(self.course.id): None}]) - - # Ensure response is correct - response_file = response_obj['files'][i] - self.assertEqual(response_file['file_name'], file_info['file_name']) - self.assertEqual(response_file['upload_url'], mock_key_instance.generate_url()) - - def test_post_non_json(self): - response = self.client.post(self.url, {"files": []}) - self.assertEqual(response.status_code, 400) - - def test_post_malformed_json(self): - response = self.client.post(self.url, "{", content_type="application/json") - self.assertEqual(response.status_code, 400) - - def test_post_invalid_json(self): - def assert_bad(content): - """Make request with content and assert that response is 400""" - response = self.client.post( - self.url, - json.dumps(content), - content_type="application/json" - ) - self.assertEqual(response.status_code, 400) - - # Top level missing files key - assert_bad({}) - - # Entry missing file_name - assert_bad({"files": [{"content_type": "video/mp4"}]}) - - # Entry missing content_type - assert_bad({"files": [{"file_name": "test.mp4"}]}) - - -@ddt.ddt -@patch.dict("django.conf.settings.FEATURES", {"ENABLE_VIDEO_UPLOAD_PIPELINE": True}) -@override_settings(VIDEO_UPLOAD_PIPELINE={ - "VEM_S3_BUCKET": "vem_test_bucket", "BUCKET": "test_bucket", "ROOT_PATH": "test_root" -}) -class VideosHandlerTestCase( - VideoUploadTestBase, - VideoStudioAccessTestsMixin, - VideoPipelineStudioAccessTestsMixin, - VideoUploadPostTestsMixin, - CourseTestCase -): - """Test cases for the main video upload endpoint""" - - VIEW_NAME = 'videos_handler' - - def test_get_json(self): - response = self.client.get_json(self.url) - self.assertEqual(response.status_code, 200) - response_videos = json.loads(response.content.decode('utf-8'))['videos'] - self.assertEqual(len(response_videos), len(self.previous_uploads)) - for i, response_video in enumerate(response_videos): - # Videos should be returned by creation date descending - original_video = self.previous_uploads[-(i + 1)] - print(response_video.keys()) - self.assertEqual( - set(response_video.keys()), - { - 'edx_video_id', - 'client_video_id', - 'created', - 'duration', - 'status', - 'status_nontranslated', - 'course_video_image_url', - 'file_size', - 'download_link', - 'transcripts', - 'transcription_status', - 'transcript_urls', - 'error_description', - } - ) - dateutil.parser.parse(response_video['created']) - for field in ['edx_video_id', 'client_video_id', 'duration']: - self.assertEqual(response_video[field], original_video[field]) - self.assertEqual( - response_video['status'], - convert_video_status(original_video) - ) - - @ddt.data( - ( - [ - 'edx_video_id', 'client_video_id', 'created', 'duration', - 'status', 'status_nontranslated', 'course_video_image_url', 'file_size', - 'download_link', 'transcripts', 'transcription_status', 'transcript_urls', - 'error_description' - ], - [ - { - 'video_id': 'test1', - 'language_code': 'en', - 'file_name': 'edx101.srt', - 'file_format': 'srt', - 'provider': 'Cielo24' - } - ], - ['en'] - ), - ( - [ - 'edx_video_id', 'client_video_id', 'created', 'duration', - 'status', 'status_nontranslated', 'course_video_image_url', 'file_size', - 'download_link', 'transcripts', 'transcription_status', 'transcript_urls', - 'error_description' - ], - [ - { - 'video_id': 'test1', - 'language_code': 'en', - 'file_name': 'edx101_en.srt', - 'file_format': 'srt', - 'provider': 'Cielo24' - }, - { - 'video_id': 'test1', - 'language_code': 'es', - 'file_name': 'edx101_es.srt', - 'file_format': 'srt', - 'provider': 'Cielo24' - } - ], - ['en', 'es'] - ) - ) - @ddt.unpack - def test_get_json_transcripts(self, expected_video_keys, uploaded_transcripts, expected_transcripts): - """ - Test that transcripts are attached based on whether the video transcript feature is enabled. - """ - for transcript in uploaded_transcripts: - create_or_update_video_transcript( - transcript['video_id'], - transcript['language_code'], - metadata={ - 'file_name': transcript['file_name'], - 'file_format': transcript['file_format'], - 'provider': transcript['provider'] - } - ) - - response = self.client.get_json(self.url) - self.assertEqual(response.status_code, 200) - response_videos = json.loads(response.content.decode('utf-8'))['videos'] - self.assertEqual(len(response_videos), len(self.previous_uploads)) - for response_video in response_videos: - print(response_video) - - self.assertEqual(set(response_video.keys()), set(expected_video_keys)) - if response_video['edx_video_id'] == self.previous_uploads[0]['edx_video_id']: - self.assertEqual(response_video.get('transcripts', []), expected_transcripts) - - def test_get_html(self): - response = self.client.get(self.url) - self.assertEqual(response.status_code, 200) - self.assertRegex(response["Content-Type"], "^text/html(;.*)?$") - self.assertContains(response, _get_default_video_image_url()) - # Crude check for presence of data in returned HTML - for video in self.previous_uploads: - self.assertContains(response, video["edx_video_id"]) - self.assertNotContains(response, 'video_upload_pagination') - - @override_waffle_flag(ENABLE_VIDEO_UPLOAD_PAGINATION, active=True) - def test_get_html_paginated(self): - """ - Tests that response is paginated. - """ - response = self.client.get(self.url) - self.assertEqual(response.status_code, 200) - self.assertContains(response, 'video_upload_pagination') - - @override_settings(AWS_ACCESS_KEY_ID="test_key_id", AWS_SECRET_ACCESS_KEY="test_secret") - @patch("boto.s3.key.Key") - @patch("cms.djangoapps.contentstore.video_storage_handlers.S3Connection") - @ddt.data( - ( - [ - { - "file_name": "supported-1.mp4", - "content_type": "video/mp4", - }, - { - "file_name": "supported-2.mov", - "content_type": "video/quicktime", - }, - ], - 200 - ), - ( - [ - { - "file_name": "unsupported-1.txt", - "content_type": "text/plain", - }, - { - "file_name": "unsupported-2.png", - "content_type": "image/png", - }, - ], - 400 - ) - ) - @ddt.unpack - def test_video_supported_file_formats(self, files, expected_status, mock_conn, mock_key): - """ - Test that video upload works correctly against supported and unsupported file formats. - """ - mock_conn.get_bucket = Mock() - mock_key_instances = [ - Mock( - generate_url=Mock( - return_value="http://example.com/url_{}".format(file_info["file_name"]) - ) - ) - for file_info in files - ] - # If extra calls are made, return a dummy - mock_key.side_effect = mock_key_instances + [Mock()] - - # Check supported formats - response = self.client.post( - self.url, - json.dumps({"files": files}), - content_type="application/json" - ) - self.assertEqual(response.status_code, expected_status) - response = json.loads(response.content.decode('utf-8')) - - if expected_status == 200: - self.assertNotIn('error', response) - else: - self.assertIn('error', response) - self.assertEqual(response['error'], "Request 'files' entry contain unsupported content_type") - - @override_settings(AWS_ACCESS_KEY_ID='test_key_id', AWS_SECRET_ACCESS_KEY='test_secret') - @patch('cms.djangoapps.contentstore.video_storage_handlers.S3Connection') - def test_upload_with_non_ascii_charaters(self, mock_conn): - """ - Test that video uploads throws error message when file name contains special characters. - """ - mock_conn.get_bucket = Mock() - file_name = 'test\u2019_file.mp4' - files = [{'file_name': file_name, 'content_type': 'video/mp4'}] - - bucket = Mock() - mock_conn.return_value = Mock(get_bucket=Mock(return_value=bucket)) - - response = self.client.post( - self.url, - json.dumps({'files': files}), - content_type='application/json' - ) - self.assertEqual(response.status_code, 400) - response = json.loads(response.content.decode('utf-8')) - self.assertEqual(response['error'], 'The file name for %s must contain only ASCII characters.' % file_name) - - @override_settings(AWS_ACCESS_KEY_ID='test_key_id', AWS_SECRET_ACCESS_KEY='test_secret', AWS_SECURITY_TOKEN='token') - @patch('boto.s3.key.Key') - @patch('cms.djangoapps.contentstore.video_storage_handlers.S3Connection') - @override_waffle_flag(ENABLE_DEVSTACK_VIDEO_UPLOADS, active=True) - def test_devstack_upload_connection(self, mock_conn, mock_key): - files = [{'file_name': 'first.mp4', 'content_type': 'video/mp4'}] - mock_conn.get_bucket = Mock() - mock_key_instances = [ - Mock( - generate_url=Mock( - return_value='http://example.com/url_{}'.format(file_info['file_name']) - ) - ) - for file_info in files - ] - mock_key.side_effect = mock_key_instances - response = self.client.post( - self.url, - json.dumps({'files': files}), - content_type='application/json' - ) - - self.assertEqual(response.status_code, 200) - mock_conn.assert_called_once_with( - aws_access_key_id=settings.AWS_ACCESS_KEY_ID, - aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY, - security_token=settings.AWS_SECURITY_TOKEN - ) - - @patch('boto.s3.key.Key') - @patch('cms.djangoapps.contentstore.video_storage_handlers.S3Connection') - def test_send_course_to_vem_pipeline(self, mock_conn, mock_key): - """ - Test that uploads always go to VEM S3 bucket by default. - """ - mock_conn.get_bucket = Mock() - files = [{'file_name': 'first.mp4', 'content_type': 'video/mp4'}] - mock_key_instances = [ - Mock( - generate_url=Mock( - return_value='http://example.com/url_{}'.format(file_info['file_name']) - ) - ) - for file_info in files - ] - mock_key.side_effect = mock_key_instances - - response = self.client.post( - self.url, - json.dumps({'files': files}), - content_type='application/json' - ) - - self.assertEqual(response.status_code, 200) - mock_conn.return_value.get_bucket.assert_called_once_with( - settings.VIDEO_UPLOAD_PIPELINE['VEM_S3_BUCKET'], validate=False # pylint: disable=unsubscriptable-object - ) - - @override_settings(AWS_ACCESS_KEY_ID='test_key_id', AWS_SECRET_ACCESS_KEY='test_secret') - @patch('boto.s3.key.Key') - @patch('cms.djangoapps.contentstore.video_storage_handlers.S3Connection') - @ddt.data( - { - 'global_waffle': True, - 'course_override': WaffleFlagCourseOverrideModel.ALL_CHOICES.off, - 'expect_token': True - }, - { - 'global_waffle': False, - 'course_override': WaffleFlagCourseOverrideModel.ALL_CHOICES.on, - 'expect_token': False - }, - { - 'global_waffle': False, - 'course_override': WaffleFlagCourseOverrideModel.ALL_CHOICES.off, - 'expect_token': True - } - ) - def test_video_upload_token_in_meta(self, data, mock_conn, mock_key): - """ - Test video upload token in s3 metadata. - """ - @contextmanager - def proxy_manager(manager, ignore_manager): - """ - This acts as proxy to the original manager in the arguments given - the original manager is not set to be ignored. - """ - if ignore_manager: - yield - else: - with manager: - yield - - file_data = { - 'file_name': 'first.mp4', - 'content_type': 'video/mp4', - } - mock_conn.get_bucket = Mock() - mock_key_instance = Mock( - generate_url=Mock( - return_value='http://example.com/url_{}'.format(file_data['file_name']) - ) - ) - # If extra calls are made, return a dummy - mock_key.side_effect = [mock_key_instance] - - # expected args to be passed to `set_metadata`. - expected_args = ('course_video_upload_token', self.test_token) - - with patch.object(WaffleFlagCourseOverrideModel, 'override_value', return_value=data['course_override']): - with override_waffle_flag(DEPRECATE_YOUTUBE, active=data['global_waffle']): - response = self.client.post( - self.url, - json.dumps({'files': [file_data]}), - content_type='application/json' - ) - self.assertEqual(response.status_code, 200) - - with proxy_manager(self.assertRaises(AssertionError), data['expect_token']): - # if we're not expecting token then following should raise assertion error and - # if we're expecting token then we will be able to find the call to set the token - # in s3 metadata. - mock_key_instance.set_metadata.assert_any_call(*expected_args) - - def _assert_video_removal(self, url, edx_video_id, deleted_videos): - """ - Verify that if correct video is removed from a particular course. - - Arguments: - url (str): URL to get uploaded videos - edx_video_id (str): video id - deleted_videos (int): how many videos are deleted - """ - response = self.client.get_json(url) - self.assertEqual(response.status_code, 200) - response_videos = json.loads(response.content.decode('utf-8'))["videos"] - self.assertEqual(len(response_videos), len(self.previous_uploads) - deleted_videos) - - if deleted_videos: - self.assertNotIn(edx_video_id, [video.get('edx_video_id') for video in response_videos]) - else: - self.assertIn(edx_video_id, [video.get('edx_video_id') for video in response_videos]) - - def test_video_removal(self): - """ - Verifies that video removal is working as expected. - """ - edx_video_id = 'test1' - remove_url = self.get_url_for_course_key(self.course.id, {'edx_video_id': edx_video_id}) - response = self.client.delete(remove_url, HTTP_ACCEPT="application/json") - self.assertEqual(response.status_code, 204) - - self._assert_video_removal(self.url, edx_video_id, 1) - - def test_video_removal_multiple_courses(self): - """ - Verifies that video removal is working as expected for multiple courses. - - If a video is used by multiple courses then removal from one course shouldn't effect the other course. - """ - # remove video from course1 - edx_video_id = 'test1' - remove_url = self.get_url_for_course_key(self.course.id, {'edx_video_id': edx_video_id}) - response = self.client.delete(remove_url, HTTP_ACCEPT="application/json") - self.assertEqual(response.status_code, 204) - - # verify that video is only deleted from course1 only - self._assert_video_removal(self.url, edx_video_id, 1) - self._assert_video_removal(self.get_url_for_course_key(self.course2.id), edx_video_id, 0) - - def test_convert_video_status(self): - """ - Verifies that convert_video_status works as expected. - """ - video = self.previous_uploads[0] - - # video status should be failed if it's in upload state for more than 24 hours - video['created'] = datetime(2016, 1, 1, 10, 10, 10, 0, pytz.UTC) - status = convert_video_status(video) - self.assertEqual(status, StatusDisplayStrings.get('upload_failed')) - - # `invalid_token` should be converted to `youtube_duplicate` - video['created'] = datetime.now(pytz.UTC) - video['status'] = 'invalid_token' - status = convert_video_status(video) - self.assertEqual(status, StatusDisplayStrings.get('youtube_duplicate')) - - # The "encode status" should be converted to `file_complete` if video encodes are complete - video['status'] = 'transcription_in_progress' - status = convert_video_status(video, is_video_encodes_ready=True) - self.assertEqual(status, StatusDisplayStrings.get('file_complete')) - - # If encoding is not complete return the status as it is - video['status'] = 's3_upload_failed' - status = convert_video_status(video) - self.assertEqual(status, StatusDisplayStrings.get('s3_upload_failed')) - - # for all other status, there should not be any conversion - statuses = list(StatusDisplayStrings._STATUS_MAP.keys()) # pylint: disable=protected-access - statuses.remove('invalid_token') - for status in statuses: - video['status'] = status - new_status = convert_video_status(video) - self.assertEqual(new_status, StatusDisplayStrings.get(status)) - - def assert_video_status(self, url, edx_video_id, status): - """ - Verifies that video with `edx_video_id` has `status` - """ - response = self.client.get_json(url) - self.assertEqual(response.status_code, 200) - videos = json.loads(response.content.decode('utf-8'))["videos"] - for video in videos: - if video['edx_video_id'] == edx_video_id: - return self.assertEqual(video['status'], status) - - # Test should fail if video not found - self.assertEqual(True, False, 'Invalid edx_video_id') - - @patch('cms.djangoapps.contentstore.video_storage_handlers.LOGGER') - def test_video_status_update_request(self, mock_logger): - """ - Verifies that video status update request works as expected. - """ - url = self.get_url_for_course_key(self.course.id) - edx_video_id = 'test1' - self.assert_video_status(url, edx_video_id, 'Uploading') - - response = self.client.post( - url, - json.dumps([{ - 'edxVideoId': edx_video_id, - 'status': 'upload_failed', - 'message': 'server down' - }]), - content_type="application/json" - ) - - mock_logger.info.assert_called_with( - 'VIDEOS: Video status update with id [%s], status [%s] and message [%s]', - edx_video_id, - 'upload_failed', - 'server down' - ) - - self.assertEqual(response.status_code, 204) - - self.assert_video_status(url, edx_video_id, 'Failed') - - @ddt.data( - ('test_video_token', "Transcription in Progress"), - ('', "Ready"), - ) - @ddt.unpack - def test_video_transcript_status_conversion(self, course_video_upload_token, expected_video_status_text): - """ - Verifies that video status `transcription_in_progress` gets converted - correctly into the `file_complete` for the new video workflow and - stays as it is, for the old video workflow. - """ - self.course.video_upload_pipeline = { - 'course_video_upload_token': course_video_upload_token - } - self.save_course() - - url = self.get_url_for_course_key(self.course.id) - edx_video_id = 'test1' - self.assert_video_status(url, edx_video_id, 'Uploading') - - response = self.client.post( - url, - json.dumps([{ - 'edxVideoId': edx_video_id, - 'status': 'transcription_in_progress', - 'message': 'Transcription is in progress' - }]), - content_type="application/json" - ) - self.assertEqual(response.status_code, 204) - - self.assert_video_status(url, edx_video_id, expected_video_status_text) - - @ddt.data(True, False) - @patch('openedx.core.djangoapps.video_config.models.VideoTranscriptEnabledFlag.feature_enabled') - def test_video_index_transcript_feature_enablement(self, is_video_transcript_enabled, video_transcript_feature): - """ - Test that when video transcript is enabled/disabled, correct response is rendered. - """ - video_transcript_feature.return_value = is_video_transcript_enabled - response = self.client.get(self.url) - self.assertEqual(response.status_code, 200) - - # Verify that course video button is present in the response if videos transcript feature is enabled. - button_html = '