diff --git a/cms/djangoapps/import_from_modulestore/api.py b/cms/djangoapps/import_from_modulestore/api.py
new file mode 100644
index 000000000000..7f8dc16b76cb
--- /dev/null
+++ b/cms/djangoapps/import_from_modulestore/api.py
@@ -0,0 +1,45 @@
+"""
+API for course to library import.
+"""
+from typing import Sequence
+
+from opaque_keys.edx.keys import LearningContextKey, UsageKey
+
+from .helpers import cancel_incomplete_old_imports
+from .models import Import as _Import
+from .tasks import import_staged_content_to_library_task, save_legacy_content_to_staged_content_task
+from .validators import validate_usage_keys_to_import
+
+
+def stage_content_for_import(source_key: LearningContextKey, user_id: int) -> _Import:
+ """
+    Create a new import event and stage the course content for a later import into a library.
+ """
+ import_from_modulestore = _Import.objects.create(source_key=source_key, user_id=user_id)
+ cancel_incomplete_old_imports(import_from_modulestore)
+ save_legacy_content_to_staged_content_task.delay_on_commit(import_from_modulestore.uuid)
+ return import_from_modulestore
+
+
+def import_staged_content_to_library(
+ usage_ids: Sequence[str | UsageKey],
+ import_uuid: str,
+ target_learning_package_id: int,
+ user_id: int,
+ composition_level: str,
+ override: bool,
+) -> None:
+ """
+    Validate the usage keys and enqueue an asynchronous task to import the staged content into a library.
+ """
+ validate_usage_keys_to_import(usage_ids)
+ import_staged_content_to_library_task.apply_async(
+ kwargs={
+ 'usage_key_strings': usage_ids,
+ 'import_uuid': import_uuid,
+ 'learning_package_id': target_learning_package_id,
+ 'user_id': user_id,
+ 'composition_level': composition_level,
+ 'override': override,
+ },
+ )
diff --git a/cms/djangoapps/import_from_modulestore/constants.py b/cms/djangoapps/import_from_modulestore/constants.py
new file mode 100644
index 000000000000..09e0d4e30f1a
--- /dev/null
+++ b/cms/djangoapps/import_from_modulestore/constants.py
@@ -0,0 +1,5 @@
+"""
+Constants for import_from_modulestore app
+"""
+
+IMPORT_FROM_MODULESTORE_STAGING_PURPOSE = "import_from_modulestore"
diff --git a/cms/djangoapps/import_from_modulestore/data.py b/cms/djangoapps/import_from_modulestore/data.py
index 7821e463a76a..998ea8dfc745 100644
--- a/cms/djangoapps/import_from_modulestore/data.py
+++ b/cms/djangoapps/import_from_modulestore/data.py
@@ -1,6 +1,10 @@
"""
This module contains the data models for the import_from_modulestore app.
"""
+from collections import namedtuple
+from enum import Enum
+from openedx.core.djangoapps.content_libraries import api as content_libraries_api
+
from django.db.models import TextChoices
from django.utils.translation import gettext_lazy as _
@@ -18,3 +22,33 @@ class ImportStatus(TextChoices):
IMPORTING_FAILED = 'importing_failed', _('Failed to import staged content')
IMPORTED = 'imported', _('Successfully imported content')
CANCELED = 'canceled', _('Canceled')
+
+
+class CompositionLevel(Enum):
+ """
+ Enumeration of composition levels for course content.
+ Defines the different levels of composition for course content,
+ including chapters, sequentials, verticals, and xblocks.
+ It also categorizes these levels into complicated and flat
+ levels for easier processing.
+ """
+
+ CHAPTER = content_libraries_api.ContainerType.Section
+ SEQUENTIAL = content_libraries_api.ContainerType.Subsection
+ VERTICAL = content_libraries_api.ContainerType.Unit
+ COMPONENT = 'component'
+ OLX_COMPLEX_LEVELS = [
+ VERTICAL.olx_tag,
+ SEQUENTIAL.olx_tag,
+ CHAPTER.olx_tag,
+ ]
+
+ @classmethod
+ def values(cls):
+ """
+        Return the values of all composition levels.
+ """
+ return [composition_level.value for composition_level in cls]
+
+
+PublishableVersionWithMapping = namedtuple('PublishableVersionWithMapping', ['publishable_version', 'mapping'])
diff --git a/cms/djangoapps/import_from_modulestore/helpers.py b/cms/djangoapps/import_from_modulestore/helpers.py
new file mode 100644
index 000000000000..e540e0ff3dba
--- /dev/null
+++ b/cms/djangoapps/import_from_modulestore/helpers.py
@@ -0,0 +1,466 @@
+"""
+Helper functions for importing course content into a library.
+"""
+from datetime import datetime, timezone
+from functools import partial
+import logging
+import mimetypes
+import os
+import secrets
+from typing import TYPE_CHECKING
+
+from django.db import transaction
+from django.db.utils import IntegrityError
+from lxml import etree
+
+from opaque_keys.edx.keys import UsageKey
+from opaque_keys.edx.locator import CourseLocator
+from openedx_learning.api import authoring as authoring_api
+from openedx_learning.api.authoring_models import Component, Container, ContainerVersion, PublishableEntity
+
+from openedx.core.djangoapps.content_libraries import api
+from openedx.core.djangoapps.content_staging import api as content_staging_api
+from xmodule.modulestore.django import modulestore
+
+from .data import CompositionLevel, ImportStatus, PublishableVersionWithMapping
+from .models import Import, PublishableEntityMapping
+
+if TYPE_CHECKING:
+    from openedx_learning.api.authoring_models import LearningPackage
+ from xblock.core import XBlock
+
+ from openedx.core.djangoapps.content_staging.api import _StagedContent as StagedContent
+
+
+log = logging.getLogger(__name__)
+parser = etree.XMLParser(strip_cdata=False)
+
+
+class ImportClient:
+ """
+ Client for importing course content into a library.
+
+ This class handles the import of course content from staged content into a
+ content library, creating the appropriate container hierarchy based on the
+ specified composition level.
+ """
+
+ # The create functions have different kwarg names for the child list,
+ # so we need to use partial to set the child list to empty.
+ CONTAINER_CREATORS_MAP: dict[str, partial] = {
+ api.ContainerType.Section.olx_tag: partial(authoring_api.create_section_and_version, subsections=[]),
+ api.ContainerType.Subsection.olx_tag: partial(authoring_api.create_subsection_and_version, units=[]),
+ api.ContainerType.Unit.olx_tag: partial(authoring_api.create_unit_and_version, components=[]),
+ }
+
+ CONTAINER_OVERRIDERS_MAP: dict[str, partial] = {
+ api.ContainerType.Section.olx_tag: partial(authoring_api.create_next_section_version, subsections=[]),
+ api.ContainerType.Subsection.olx_tag: partial(authoring_api.create_next_subsection_version, units=[]),
+ api.ContainerType.Unit.olx_tag: partial(authoring_api.create_next_unit_version, components=[]),
+ }
+
+ def __init__(
+ self,
+ import_event: Import,
+ block_usage_key_to_import: str,
+ target_learning_package: 'LearningPackage',
+ staged_content: 'StagedContent',
+ composition_level: str,
+ override: bool = False,
+ ):
+ self.import_event = import_event
+ self.block_usage_key_to_import = block_usage_key_to_import
+ self.learning_package = target_learning_package
+ self.staged_content = staged_content
+ self.composition_level = composition_level
+ self.override = override
+
+ self.user_id = import_event.user_id
+ self.content_library = target_learning_package.contentlibrary
+ self.library_key = self.content_library.library_key
+ self.parser = etree.XMLParser(strip_cdata=False)
+
+ def import_from_staged_content(self) -> list[PublishableVersionWithMapping]:
+ """
+ Import staged content into a library.
+ """
+ node = etree.fromstring(self.staged_content.olx, parser=parser)
+ usage_key = UsageKey.from_string(self.block_usage_key_to_import)
+ block_to_import = get_node_for_usage_key(node, usage_key)
+ if block_to_import is None:
+ return []
+
+ return self._process_import(self.block_usage_key_to_import, block_to_import)
+
+ def _process_import(self, usage_key_string, block_to_import) -> list[PublishableVersionWithMapping]:
+ """
+ Process import of a block from staged content into a library.
+
+ Imports a block and its children into the library based on the
+ composition level. It handles both simple and complicated blocks, creating
+ the necessary container hierarchy.
+ """
+ usage_key = UsageKey.from_string(usage_key_string)
+ result = []
+
+ if block_to_import.tag not in CompositionLevel.OLX_COMPLEX_LEVELS.value:
+ return self._import_simple_block(block_to_import, usage_key)
+
+ for child in block_to_import.getchildren():
+ child_usage_key_string = get_usage_key_string_from_staged_content(
+ self.staged_content, child.get('url_name')
+ )
+ if not child_usage_key_string:
+ continue
+
+ result.extend(self._import_child_block(child, child_usage_key_string))
+
+ if self.composition_level == CompositionLevel.COMPONENT.value:
+ return [
+ publishable_version_with_mapping for publishable_version_with_mapping in result
+ if not isinstance(publishable_version_with_mapping.publishable_version, ContainerVersion)
+ ]
+ return result
+
+ def _import_simple_block(self, block_to_import, usage_key) -> list[PublishableVersionWithMapping]:
+ """
+ Import a simple block into the library.
+
+ Creates a block in the library from the staged content block.
+ It returns a list containing the created component version.
+ """
+ publishable_version_with_mapping = self._create_block_in_library(block_to_import, usage_key)
+ return [publishable_version_with_mapping] if publishable_version_with_mapping else []
+
+ def _import_child_block(self, child, child_usage_key_string):
+ """
+ Import a child block into the library.
+
+ Determines whether the child block is simple or complicated and
+ delegates the import process to the appropriate helper method.
+ """
+ child_usage_key = UsageKey.from_string(child_usage_key_string)
+ if child.tag in CompositionLevel.OLX_COMPLEX_LEVELS.value:
+ return self._import_complicated_child(child, child_usage_key_string)
+ else:
+ return self._import_simple_block(child, child_usage_key)
+
+ def _import_complicated_child(self, child, child_usage_key_string):
+ """
+ Import a complicated child block into the library.
+
+ Handles the import of complicated child blocks, including creating
+ containers and updating components.
+ Returns a list containing the created container version.
+ """
+ if not self._should_create_container(child.tag):
+ return self._process_import(child_usage_key_string, child)
+
+ container_version_with_mapping = self.get_or_create_container(
+ child.tag,
+ child.get('url_name'),
+ child.get('display_name', child.tag),
+ child_usage_key_string,
+ )
+ child_component_versions_with_mapping = self._process_import(child_usage_key_string, child)
+ child_component_versions = [
+ child_component_version.publishable_version for child_component_version
+ in child_component_versions_with_mapping
+ ]
+ self._update_container_components(container_version_with_mapping.publishable_version, child_component_versions)
+ return [container_version_with_mapping] + child_component_versions_with_mapping
+
+ def _should_create_container(self, container_type: str) -> bool:
+ """
+ Determine if a new container should be created.
+
+        Container type must be at the same level as, or lower than, the current composition level.
+ """
+ composition_hierarchy = CompositionLevel.OLX_COMPLEX_LEVELS.value
+ return (
+ container_type in composition_hierarchy and
+ self.composition_level in composition_hierarchy and
+ composition_hierarchy.index(container_type) <= composition_hierarchy.index(self.composition_level)
+ )
+
+ def get_or_create_container(
+ self,
+ container_type: str,
+ key: str,
+ display_name: str,
+ block_usage_key_string: str
+ ) -> PublishableVersionWithMapping:
+ """
+ Create a container of the specified type.
+
+ Creates a container (e.g., chapter, sequential, vertical) in the
+ content library.
+ """
+ try:
+ container_creator_func = self.CONTAINER_CREATORS_MAP[container_type]
+ container_override_func = self.CONTAINER_OVERRIDERS_MAP[container_type]
+ except KeyError as exc:
+ raise ValueError(f"Unknown container type: {container_type}") from exc
+
+ try:
+ container_version = self.content_library.learning_package.publishable_entities.get(key=key)
+ except PublishableEntity.DoesNotExist:
+ container_version = None
+
+ if container_version and self.override:
+ container_version = container_override_func(
+ container_version.container,
+ title=display_name or f"New {container_type}",
+ created=datetime.now(tz=timezone.utc),
+ created_by=self.import_event.user_id,
+ )
+ elif not container_version:
+ _, container_version = container_creator_func(
+ self.learning_package.id,
+ key=key or secrets.token_hex(16),
+ title=display_name or f"New {container_type}",
+ created=datetime.now(tz=timezone.utc),
+ created_by=self.import_event.user_id,
+ )
+
+ publishable_entity_mapping, _ = get_or_create_publishable_entity_mapping(
+ block_usage_key_string,
+ container_version.container
+ )
+
+ return PublishableVersionWithMapping(container_version, publishable_entity_mapping)
+
+ def _update_container_components(self, container_version, component_versions):
+ """
+ Update components of a container.
+ """
+ entity_rows = [
+ authoring_api.ContainerEntityRow(
+ entity_pk=cv.container.pk if isinstance(cv, ContainerVersion) else cv.component.pk,
+ version_pk=cv.pk,
+ )
+ for cv in component_versions
+ ]
+ return authoring_api.create_next_container_version(
+ container_pk=container_version.container.pk,
+ title=container_version.title,
+ entity_rows=entity_rows,
+ created=datetime.now(tz=timezone.utc),
+ created_by=self.import_event.user_id,
+ container_version_cls=container_version.__class__,
+ )
+
+ def _create_block_in_library(self, block_to_import, usage_key) -> PublishableVersionWithMapping | None:
+ """
+ Create a block in a library from a staged content block.
+ """
+ now = datetime.now(tz=timezone.utc)
+ staged_content_files = content_staging_api.get_staged_content_static_files(self.staged_content.id)
+
+ with transaction.atomic():
+ component_type = authoring_api.get_or_create_component_type("xblock.v1", usage_key.block_type)
+ does_component_exist = authoring_api.get_components(
+ self.learning_package.id
+ ).filter(local_key=usage_key.block_id).exists()
+
+ if does_component_exist:
+ if not self.override:
+ log.info(f"Component {usage_key.block_id} already exists in library {self.library_key}, skipping.")
+ return None
+ else:
+ component_version = self._handle_component_override(usage_key, etree.tostring(block_to_import))
+ else:
+ try:
+ _, library_usage_key = api.validate_can_add_block_to_library(
+ self.library_key,
+ block_to_import.tag,
+ usage_key.block_id,
+ )
+ except api.IncompatibleTypesError as e:
+ log.error(f"Error validating block {usage_key} for library {self.library_key}: {e}")
+ return None
+
+ authoring_api.create_component(
+ self.learning_package.id,
+ component_type=component_type,
+ local_key=usage_key.block_id,
+ created=now,
+ created_by=self.import_event.user_id,
+ )
+ component_version = api.set_library_block_olx(library_usage_key, etree.tostring(block_to_import))
+
+ self._process_staged_content_files(
+ component_version,
+ staged_content_files,
+ usage_key,
+ block_to_import,
+ now,
+ )
+ publishable_entity_mapping, _ = get_or_create_publishable_entity_mapping(
+ usage_key,
+ component_version.component
+ )
+ return PublishableVersionWithMapping(component_version, publishable_entity_mapping)
+
+ def _handle_component_override(self, usage_key, new_content):
+ """
+ Create new ComponentVersion for overridden component.
+ """
+ component_version = None
+ try:
+ component = authoring_api.get_components(self.learning_package.id).get(local_key=usage_key.block_id)
+ except Component.DoesNotExist:
+ return component_version
+ library_usage_key = api.library_component_usage_key(self.library_key, component)
+
+ component_version = api.set_library_block_olx(library_usage_key, new_content)
+
+ return component_version
+
+ def _process_staged_content_files(
+ self,
+ component_version,
+ staged_content_files,
+ usage_key,
+ block_to_import,
+ created_at,
+ ):
+ """
+ Process staged content files for a component.
+
+ Processes the staged content files for a component, creating the
+ necessary file content and associating it with the component version.
+ """
+ block_olx = etree.tostring(block_to_import).decode('utf-8')
+
+ for staged_content_file_data in staged_content_files:
+ original_filename = staged_content_file_data.filename
+ file_basename = os.path.basename(original_filename)
+ file_basename_no_ext, _ = os.path.splitext(file_basename)
+
+ # Skip files not referenced in the block
+ if file_basename not in block_olx and file_basename_no_ext not in block_olx:
+ log.info(f"Skipping file {original_filename} as it is not referenced in block {usage_key}")
+ continue
+
+ file_data = content_staging_api.get_staged_content_static_file_data(
+ self.staged_content.id,
+ original_filename,
+ )
+ if not file_data:
+ log.error(
+ f"Staged content {self.staged_content.id} included referenced "
+ f"file {original_filename}, but no file data was found."
+ )
+ continue
+
+ filename = f"static/{file_basename}"
+ media_type_str, _ = mimetypes.guess_type(filename)
+ if not media_type_str:
+ media_type_str = "application/octet-stream"
+
+ media_type = authoring_api.get_or_create_media_type(media_type_str)
+ content = authoring_api.get_or_create_file_content(
+ self.learning_package.id,
+ media_type.id,
+ data=file_data,
+ created=created_at,
+ )
+
+ try:
+ authoring_api.create_component_version_content(component_version.pk, content.id, key=filename)
+ except IntegrityError:
+ pass # Content already exists
+
+
+def import_from_staged_content(
+ import_event: Import,
+ usage_key_string: str,
+ target_learning_package: 'LearningPackage',
+ staged_content: 'StagedContent',
+ composition_level: str,
+ override: bool = False,
+) -> list[PublishableVersionWithMapping]:
+ """
+ Import staged content to a library from staged content.
+
+ Returns a list of PublishableVersionWithMappings created during the import.
+ """
+ import_client = ImportClient(
+ import_event,
+ usage_key_string,
+ target_learning_package,
+ staged_content,
+ composition_level,
+ override,
+ )
+ return import_client.import_from_staged_content()
+
+
+def get_or_create_publishable_entity_mapping(usage_key, component) -> tuple[PublishableEntityMapping, bool]:
+ """
+ Creates a mapping between the source usage key and the target publishable entity.
+ """
+ if isinstance(component, Container):
+ target_package = component.publishable_entity.learning_package
+ else:
+ target_package = component.learning_package
+ return PublishableEntityMapping.objects.get_or_create(
+ source_usage_key=usage_key,
+ target_entity=component.publishable_entity,
+ target_package=target_package,
+ )
+
+
+def get_usage_key_string_from_staged_content(staged_content: 'StagedContent', block_id: str) -> str | None:
+ """
+ Get the usage ID from a staged content by block ID.
+ """
+ if staged_content.tags is None:
+ return None
+ return next((block_usage_id for block_usage_id in staged_content.tags if block_usage_id.endswith(block_id)), None)
+
+
+def get_node_for_usage_key(node: etree._Element, usage_key: UsageKey) -> etree._Element | None:
+ """
+ Get the node in an XML tree which matches to the usage key.
+ """
+ if node.tag == usage_key.block_type and node.get('url_name') == usage_key.block_id:
+ return node
+
+ for child in node.getchildren():
+ found = get_node_for_usage_key(child, usage_key)
+ if found is not None:
+ return found
+
+
+def get_items_to_import(import_event: Import) -> list['XBlock']:
+ """
+ Collect items to import from a course.
+ """
+ items_to_import: list['XBlock'] = []
+ if isinstance(import_event.source_key, CourseLocator):
+ items_to_import.extend(
+ modulestore().get_items(import_event.source_key, qualifiers={"category": "chapter"}) or []
+ )
+ items_to_import.extend(
+ modulestore().get_items(import_event.source_key, qualifiers={"category": "static_tab"}) or []
+ )
+
+ return items_to_import
+
+
+def cancel_incomplete_old_imports(import_event: Import) -> None:
+ """
+ Cancel any incomplete imports that have the same target as the current import.
+
+ When a new import is created, we want to cancel any other incomplete user imports that have the same target.
+ """
+ incomplete_user_imports_with_same_target = Import.objects.filter(
+ user=import_event.user,
+ target_change=import_event.target_change,
+ source_key=import_event.source_key,
+ staged_content_for_import__isnull=False
+ ).exclude(uuid=import_event.uuid)
+ for incomplete_import in incomplete_user_imports_with_same_target:
+ incomplete_import.set_status(ImportStatus.CANCELED)
diff --git a/cms/djangoapps/import_from_modulestore/models.py b/cms/djangoapps/import_from_modulestore/models.py
index acbe82fa6d07..5b6122749bba 100644
--- a/cms/djangoapps/import_from_modulestore/models.py
+++ b/cms/djangoapps/import_from_modulestore/models.py
@@ -1,7 +1,6 @@
"""
Models for the course to library import app.
"""
-
import uuid as uuid_tools
from django.contrib.auth import get_user_model
diff --git a/cms/djangoapps/import_from_modulestore/tasks.py b/cms/djangoapps/import_from_modulestore/tasks.py
new file mode 100644
index 000000000000..4644b29e4904
--- /dev/null
+++ b/cms/djangoapps/import_from_modulestore/tasks.py
@@ -0,0 +1,101 @@
+"""
+Tasks for course to library import.
+"""
+
+from celery import shared_task
+from celery.utils.log import get_task_logger
+from django.db import transaction
+from edx_django_utils.monitoring import set_code_owner_attribute
+
+from openedx_learning.api import authoring as authoring_api
+from openedx_learning.api.authoring_models import LearningPackage
+from openedx.core.djangoapps.content_staging import api as content_staging_api
+
+from .constants import IMPORT_FROM_MODULESTORE_STAGING_PURPOSE
+from .data import ImportStatus
+from .helpers import get_items_to_import, import_from_staged_content
+from .models import Import, PublishableEntityImport, StagedContentForImport
+from .validators import validate_composition_level
+
+log = get_task_logger(__name__)
+
+
+@shared_task
+@set_code_owner_attribute
+def save_legacy_content_to_staged_content_task(import_uuid: str) -> None:
+ """
+ Save courses to staged content task by sections/chapters.
+ """
+ import_event = Import.objects.get(uuid=import_uuid)
+
+ import_event.clean_related_staged_content()
+ import_event.set_status(ImportStatus.STAGING)
+ try:
+ with transaction.atomic():
+ items_to_import = get_items_to_import(import_event)
+ for item in items_to_import:
+ staged_content = content_staging_api.stage_xblock_temporarily(
+ item,
+ import_event.user.id,
+ purpose=IMPORT_FROM_MODULESTORE_STAGING_PURPOSE,
+ )
+ StagedContentForImport.objects.create(
+ staged_content=staged_content,
+ import_event=import_event,
+ source_usage_key=item.location
+ )
+
+ if items_to_import:
+ import_event.set_status(ImportStatus.STAGED)
+ else:
+ import_event.set_status(ImportStatus.STAGING_FAILED)
+ except Exception as exc: # pylint: disable=broad-except
+ import_event.set_status(ImportStatus.STAGING_FAILED)
+ raise exc
+
+
+@shared_task
+@set_code_owner_attribute
+def import_staged_content_to_library_task(
+ usage_key_strings: list[str],
+ import_uuid: str,
+ learning_package_id: int,
+ user_id: int,
+ composition_level: str,
+ override: bool,
+) -> None:
+ """
+ Import staged content to a library task.
+ """
+ validate_composition_level(composition_level)
+
+ import_event = Import.objects.get(uuid=import_uuid, status=ImportStatus.STAGED, user_id=user_id)
+ target_learning_package = LearningPackage.objects.get(id=learning_package_id)
+
+ imported_publishable_versions = []
+ with authoring_api.bulk_draft_changes_for(learning_package_id=learning_package_id) as change_log:
+ try:
+ for usage_key_string in usage_key_strings:
+ staged_content_for_import = import_event.staged_content_for_import.get(
+ source_usage_key=usage_key_string
+ )
+ publishable_versions = import_from_staged_content(
+ import_event,
+ usage_key_string,
+ target_learning_package,
+ staged_content_for_import.staged_content,
+ composition_level,
+ override,
+ )
+ imported_publishable_versions.extend(publishable_versions)
+ except: # pylint: disable=bare-except
+ import_event.set_status(ImportStatus.IMPORTING_FAILED)
+ raise
+
+ import_event.set_status(ImportStatus.IMPORTED)
+ for imported_component_version in imported_publishable_versions:
+ PublishableEntityImport.objects.create(
+ import_event=import_event,
+ resulting_mapping=imported_component_version.mapping,
+ resulting_change=change_log.records.get(entity=imported_component_version.mapping.target_entity),
+ )
diff --git a/cms/djangoapps/import_from_modulestore/tests/__init__.py b/cms/djangoapps/import_from_modulestore/tests/__init__.py
new file mode 100644
index 000000000000..e69de29bb2d1
diff --git a/cms/djangoapps/import_from_modulestore/tests/factories.py b/cms/djangoapps/import_from_modulestore/tests/factories.py
new file mode 100644
index 000000000000..368cc0ed94ff
--- /dev/null
+++ b/cms/djangoapps/import_from_modulestore/tests/factories.py
@@ -0,0 +1,28 @@
+"""
+Factories for Import model.
+"""
+
+import uuid
+
+import factory
+from factory.django import DjangoModelFactory
+from opaque_keys.edx.keys import CourseKey
+
+from common.djangoapps.student.tests.factories import UserFactory
+from cms.djangoapps.import_from_modulestore.models import Import
+
+
+class ImportFactory(DjangoModelFactory):
+ """
+ Factory for Import model.
+ """
+
+ class Meta:
+ model = Import
+
+ @factory.lazy_attribute
+ def source_key(self):
+ return CourseKey.from_string(f'course-v1:edX+DemoX+{self.uuid}')
+
+ uuid = factory.LazyFunction(lambda: str(uuid.uuid4()))
+ user = factory.SubFactory(UserFactory)
diff --git a/cms/djangoapps/import_from_modulestore/tests/test_api.py b/cms/djangoapps/import_from_modulestore/tests/test_api.py
new file mode 100644
index 000000000000..62fe2e4159a9
--- /dev/null
+++ b/cms/djangoapps/import_from_modulestore/tests/test_api.py
@@ -0,0 +1,109 @@
+"""
+Test cases for import_from_modulestore.api module.
+"""
+from unittest.mock import patch
+
+import pytest
+from opaque_keys.edx.keys import CourseKey
+from organizations.models import Organization
+
+from common.djangoapps.student.tests.factories import UserFactory
+from cms.djangoapps.import_from_modulestore.api import import_staged_content_to_library, stage_content_for_import
+from cms.djangoapps.import_from_modulestore.data import ImportStatus
+from cms.djangoapps.import_from_modulestore.models import Import
+from openedx.core.djangoapps.content_libraries import api as content_libraries_api
+from xmodule.modulestore.tests.django_utils import ModuleStoreTestCase
+from .factories import ImportFactory
+
+
+@pytest.mark.django_db
+class TestCourseToLibraryImportAPI(ModuleStoreTestCase):
+ """
+ Test cases for Import API.
+ """
+
+ def setUp(self):
+ super().setUp()
+
+ _library_metadata = content_libraries_api.create_library(
+ org=Organization.objects.create(name='Organization 1', short_name='org1'),
+ slug='lib_1',
+ title='Library Org 1',
+ description='This is a library from Org 1',
+ )
+ self.library = content_libraries_api.ContentLibrary.objects.get_by_key(_library_metadata.key)
+
+ def test_stage_content_for_import(self):
+ """
+ Test stage_content_for_import function.
+ """
+ course_id = "course-v1:edX+DemoX+Demo_Course"
+ user = UserFactory()
+ stage_content_for_import(course_id, user.id)
+
+ import_event = Import.objects.get()
+ assert import_event.source_key == CourseKey.from_string(course_id)
+ assert import_event.user_id == user.id
+ assert import_event.status == ImportStatus.NOT_STARTED
+
+ def test_import_staged_content_to_library(self):
+ """
+        Test that import_staged_content_to_library delegates to the Celery task with the expected arguments.
+ """
+ import_event = ImportFactory(
+ source_key=CourseKey.from_string("course-v1:edX+DemoX+Demo_Course"),
+ )
+ usage_ids = [
+ "block-v1:edX+DemoX+Demo_Course+type@chapter+block@123",
+ "block-v1:edX+DemoX+Demo_Course+type@chapter+block@456",
+ ]
+ override = False
+
+ with patch(
+ "cms.djangoapps.import_from_modulestore.api.import_staged_content_to_library_task"
+ ) as import_staged_content_to_library_task_mock:
+ import_staged_content_to_library(
+ usage_ids,
+ import_event.uuid,
+ self.library.learning_package.id,
+ import_event.user.id,
+ "xblock",
+ override
+ )
+
+ import_staged_content_to_library_task_mock.apply_async.assert_called_once_with(
+ kwargs={
+ "usage_key_strings": usage_ids,
+ "import_uuid": import_event.uuid,
+ "learning_package_id": self.library.learning_package.id,
+ "user_id": import_event.user.id,
+ "composition_level": "xblock",
+ "override": override,
+ },
+ )
+
+ def test_import_staged_content_to_library_invalid_usage_key(self):
+ """
+        Test import_staged_content_to_library function with non-chapter usage keys.
+ """
+ import_event = ImportFactory(
+ source_key=CourseKey.from_string("course-v1:edX+DemoX+Demo_Course"),
+ )
+ usage_ids = [
+ "block-v1:edX+DemoX+Demo_Course+type@problem+block@123",
+ "block-v1:edX+DemoX+Demo_Course+type@vertical+block@456",
+ ]
+
+ with patch(
+ "cms.djangoapps.import_from_modulestore.api.import_staged_content_to_library_task"
+ ) as import_staged_content_to_library_task_mock:
+ with self.assertRaises(ValueError):
+ import_staged_content_to_library(
+ usage_ids,
+ import_event.uuid,
+ self.library.learning_package.id,
+ import_event.user.id,
+ "xblock",
+ False
+ )
+ import_staged_content_to_library_task_mock.apply_async.assert_not_called()
diff --git a/cms/djangoapps/import_from_modulestore/tests/test_helpers.py b/cms/djangoapps/import_from_modulestore/tests/test_helpers.py
new file mode 100644
index 000000000000..1d2ce26867aa
--- /dev/null
+++ b/cms/djangoapps/import_from_modulestore/tests/test_helpers.py
@@ -0,0 +1,396 @@
+"""
+Tests for the import_from_modulestore helper functions.
+"""
+import ddt
+from organizations.models import Organization
+from unittest import mock
+from unittest.mock import patch
+
+from lxml import etree
+from openedx_learning.api.authoring_models import LearningPackage
+
+from cms.djangoapps.import_from_modulestore import api
+from cms.djangoapps.import_from_modulestore.helpers import ImportClient
+from common.djangoapps.student.tests.factories import UserFactory
+
+from openedx.core.djangoapps.content_libraries import api as content_libraries_api
+from xmodule.modulestore.tests.django_utils import ModuleStoreTestCase
+from xmodule.modulestore.tests.factories import CourseFactory, BlockFactory
+
+
+@ddt.ddt
+class TestImportClient(ModuleStoreTestCase):
+ """
+ Functional tests for the ImportClient class.
+ """
+
+    def setUp(self):
+        """Create a library, a small course tree, and stage the course for import."""
+        super().setUp()
+        self.library = content_libraries_api.create_library(
+            org=Organization.objects.create(name='Organization 1', short_name='org1'),
+            slug='lib_1',
+            title='Library Org 1',
+            description='This is a library from Org 1',
+        )
+        self.learning_package = LearningPackage.objects.get(id=self.library.learning_package_id)
+        self.user = UserFactory()
+        # course -> chapter -> sequential -> vertical -> (problem, video)
+        self.course = CourseFactory.create()
+        self.chapter = BlockFactory.create(category='chapter', parent=self.course, display_name='Chapter')
+        self.sequential = BlockFactory.create(category='sequential', parent=self.chapter, display_name='Sequential')
+        self.vertical = BlockFactory.create(category='vertical', parent=self.sequential, display_name='Vertical')
+        # NOTE(review): the `data` literals below appear stripped in this diff
+        # view — confirm the original OLX payloads against the source branch.
+        self.problem = BlockFactory.create(
+            category='problem',
+            parent=self.vertical,
+            display_name='Problem',
+            data="""""",
+        )
+        self.video = BlockFactory.create(
+            category='video',
+            parent=self.vertical,
+            display_name='Video',
+            data="""""",
+        )
+        # Execute on-commit hooks so the staging celery task actually runs.
+        with self.captureOnCommitCallbacks(execute=True):
+            self.import_event = api.stage_content_for_import(source_key=self.course.id, user_id=self.user.id)
+        # strip_cdata=False keeps CDATA sections intact when parsing OLX.
+        self.parser = etree.XMLParser(strip_cdata=False)
+
+    def test_import_from_staged_content(self):
+        """
+        Importing a staged chapter at 'xblock' composition level creates one
+        content row per leaf xblock (the video and the problem).
+        """
+        expected_imported_xblocks = [self.video, self.problem]
+        staged_content_for_import = self.import_event.staged_content_for_import.get(
+            source_usage_key=self.chapter.location
+        )
+        staged_content = staged_content_for_import.staged_content
+        import_client = ImportClient(
+            import_event=self.import_event,
+            staged_content=staged_content,
+            target_learning_package=self.learning_package,
+            block_usage_key_to_import=str(self.chapter.location),
+            composition_level='xblock',
+            override=False
+        )
+
+        import_client.import_from_staged_content()
+
+        self.assertEqual(self.learning_package.content_set.count(), len(expected_imported_xblocks))
+
+    @patch('cms.djangoapps.import_from_modulestore.helpers.ImportClient._process_import')
+    def test_import_from_staged_content_block_not_found(self, mocked_process_import):
+        """
+        If the requested usage key is not present in the staged content,
+        nothing is imported and no processing is attempted.
+        """
+        staged_content_for_import = self.import_event.staged_content_for_import.get(
+            source_usage_key=self.chapter.location
+        )
+        staged_content = staged_content_for_import.staged_content
+        import_client = ImportClient(
+            import_event=self.import_event,
+            staged_content=staged_content,
+            target_learning_package=self.learning_package,
+            # A usage key that does not exist in the staged chapter.
+            block_usage_key_to_import='block-v1:edX+Demo+2025+type@chapter+block@12345',
+            composition_level='xblock',
+            override=False
+        )
+
+        import_client.import_from_staged_content()
+
+        self.assertTrue(not self.learning_package.content_set.count())
+        mocked_process_import.assert_not_called()
+
+    @ddt.data(
+        'chapter',
+        'sequential',
+        'vertical'
+    )
+    def test_create_container(self, block_lvl):
+        """
+        get_or_create_container creates exactly one publishable entity for any
+        supported container category.
+        """
+        container_to_import = getattr(self, block_lvl)
+        block_usage_key_to_import = str(container_to_import.location)
+        staged_content_for_import = self.import_event.staged_content_for_import.get(
+            source_usage_key=self.chapter.location
+        )
+        import_client = ImportClient(
+            import_event=self.import_event,
+            staged_content=staged_content_for_import.staged_content,
+            target_learning_package=self.learning_package,
+            block_usage_key_to_import=block_usage_key_to_import,
+            composition_level='xblock',
+            override=False
+        )
+        import_client.get_or_create_container(
+            container_to_import.category,
+            container_to_import.location.block_id,
+            container_to_import.display_name,
+            str(container_to_import.location)
+        )
+
+        self.assertEqual(self.learning_package.publishable_entities.count(), 1)
+
+    def test_create_container_with_xblock(self):
+        """
+        get_or_create_container must reject non-container categories
+        (e.g. 'problem') with a ValueError.
+        """
+        block_usage_key_to_import = str(self.problem.location)
+        staged_content_for_import = self.import_event.staged_content_for_import.get(
+            source_usage_key=self.chapter.location
+        )
+        import_client = ImportClient(
+            import_event=self.import_event,
+            staged_content=staged_content_for_import.staged_content,
+            target_learning_package=self.learning_package,
+            block_usage_key_to_import=block_usage_key_to_import,
+            composition_level='xblock',
+            override=False
+        )
+        with self.assertRaises(ValueError):
+            import_client.get_or_create_container(
+                self.problem.category,
+                self.problem.location.block_id,
+                self.problem.display_name,
+                str(self.problem.location),
+            )
+
+ @ddt.data('chapter', 'sequential', 'vertical')
+ def test_process_import_with_complicated_blocks(self, block_lvl):
+ container_to_import = getattr(self, block_lvl)
+ block_usage_key_to_import = str(container_to_import.location)
+ staged_content_for_import = self.import_event.staged_content_for_import.get(
+ source_usage_key=self.chapter.location
+ )
+ staged_content = staged_content_for_import.staged_content
+ expected_imported_xblocks = [self.problem, self.video]
+
+ import_client = ImportClient(
+ import_event=self.import_event,
+ staged_content=staged_content,
+ block_usage_key_to_import=block_usage_key_to_import,
+ target_learning_package=self.learning_package,
+ composition_level='xblock',
+ override=False
+ )
+ block_to_import = etree.fromstring(staged_content.olx, parser=self.parser)
+ # pylint: disable=protected-access
+ result = import_client.import_from_staged_content()
+
+ self.assertEqual(self.learning_package.content_set.count(), len(expected_imported_xblocks))
+ self.assertEqual(len(result), len(expected_imported_xblocks))
+
+    @ddt.data('problem', 'video')
+    def test_process_import_with_simple_blocks(self, block_type_to_import):
+        """
+        _process_import on a single leaf block imports exactly that block.
+        """
+        block_to_import = getattr(self, block_type_to_import)
+        block_usage_key_to_import = str(block_to_import.location)
+        staged_content_for_import = self.import_event.staged_content_for_import.get(
+            source_usage_key=self.chapter.location
+        )
+        expected_imported_xblocks = [block_to_import]
+        import_client = ImportClient(
+            import_event=self.import_event,
+            staged_content=staged_content_for_import.staged_content,
+            target_learning_package=self.learning_package,
+            block_usage_key_to_import=block_usage_key_to_import,
+            composition_level='xblock',
+            override=False
+        )
+
+        # Rebinds the name from the xblock itself to its parsed OLX tree.
+        block_to_import = etree.fromstring(block_to_import.data, parser=self.parser)
+        # pylint: disable=protected-access
+        result = import_client._process_import(block_usage_key_to_import, block_to_import)
+
+        self.assertEqual(self.learning_package.content_set.count(), len(expected_imported_xblocks))
+        self.assertEqual(len(result), len(expected_imported_xblocks))
+
+    @ddt.data(True, False)
+    def test_process_import_with_override(self, override):
+        """
+        Re-importing an already-imported block: with override=True a new
+        version is created; with override=False the block is skipped and the
+        result is empty.
+        """
+        block_to_import = self.problem
+        block_usage_key_to_import = str(block_to_import.location)
+        staged_content_for_import = self.import_event.staged_content_for_import.get(
+            source_usage_key=self.chapter.location
+        )
+
+        import_client = ImportClient(
+            import_event=self.import_event,
+            staged_content=staged_content_for_import.staged_content,
+            target_learning_package=self.learning_package,
+            block_usage_key_to_import=block_usage_key_to_import,
+            composition_level='xblock',
+            override=False
+        )
+
+        block_xml = etree.fromstring(block_to_import.data, parser=self.parser)
+        # pylint: disable=protected-access
+        result1 = import_client._process_import(block_usage_key_to_import, block_xml)
+        self.assertEqual(len(result1), 1)
+
+        # Stage the course a second time to simulate a repeat import.
+        with self.captureOnCommitCallbacks(execute=True):
+            new_import_event = api.stage_content_for_import(source_key=self.course.id, user_id=self.user.id)
+
+        staged_content_for_import = new_import_event.staged_content_for_import.get(
+            source_usage_key=self.chapter.location
+        )
+        new_staged_content = staged_content_for_import.staged_content
+        import_client = ImportClient(
+            import_event=new_import_event,
+            staged_content=new_staged_content,
+            target_learning_package=self.learning_package,
+            block_usage_key_to_import=block_usage_key_to_import,
+            composition_level='xblock',
+            override=override
+        )
+
+        if override:
+            # NOTE(review): the problem's `data` literal appears stripped in
+            # this diff, so the 'DisplayName' -> 'ModifiedName' replace
+            # presumably targets OLX lost from this view — confirm against the
+            # original fixture.
+            modified_data = block_to_import.data.replace('DisplayName', 'ModifiedName')
+            modified_block = BlockFactory.create(
+                category='problem',
+                parent=self.vertical,
+                display_name='Modified Problem',
+                data=modified_data,
+            )
+            block_xml = etree.fromstring(modified_block.data, parser=self.parser)
+
+            # pylint: disable=protected-access
+            result2 = import_client._process_import(block_usage_key_to_import, block_xml)
+            self.assertEqual(len(result2), 1)
+
+            assert result2[0].publishable_version.title == 'ModifiedName'
+        else:
+            # pylint: disable=protected-access
+            result2 = import_client._process_import(block_usage_key_to_import, block_xml)
+            self.assertEqual(result2, [])
+
+ @patch('cms.djangoapps.import_from_modulestore.helpers.authoring_api')
+ def test_container_override(self, mock_authoring_api):
+ container_to_import = self.vertical
+ block_usage_key_to_import = str(container_to_import.location)
+ staged_content_for_import = self.import_event.staged_content_for_import.get(
+ source_usage_key=self.chapter.location
+ )
+ staged_content = staged_content_for_import.staged_content
+
+ import_client = ImportClient(
+ import_event=self.import_event,
+ staged_content=staged_content,
+ target_learning_package=self.learning_package,
+ block_usage_key_to_import=block_usage_key_to_import,
+ composition_level='vertical',
+ override=False
+ )
+
+ container_version_with_mapping = import_client.get_or_create_container(
+ 'vertical',
+ container_to_import.location.block_id,
+ container_to_import.display_name,
+ str(container_to_import.location),
+ )
+ assert container_version_with_mapping is not None
+ assert container_version_with_mapping.publishable_version.title == container_to_import.display_name
+
+ import_client = ImportClient(
+ import_event=self.import_event,
+ staged_content=staged_content,
+ target_learning_package=self.learning_package,
+ block_usage_key_to_import=block_usage_key_to_import,
+ composition_level='vertical',
+ override=True
+ )
+ container_version_with_mapping = import_client.get_or_create_container(
+ 'vertical',
+ container_to_import.location.block_id,
+ 'New Display Name',
+ str(container_to_import.location),
+ )
+ overrided_container_version = container_version_with_mapping.publishable_version
+ assert overrided_container_version is not None
+ assert overrided_container_version.title == 'New Display Name'
+
+    @ddt.data('xblock', 'vertical')
+    def test_composition_levels(self, composition_level):
+        """
+        The composition level controls whether containers themselves are
+        imported ('vertical') or only their leaf xblocks ('xblock').
+        """
+        if composition_level == 'xblock':
+            expected_imported_blocks = [self.problem, self.video]
+        else:
+            # The vertical block is expected to be imported as a container
+            # with the same location as the original vertical block.
+            expected_imported_blocks = [self.vertical, self.problem, self.video]
+
+        container_to_import = self.vertical
+        block_usage_key_to_import = str(container_to_import.location)
+        staged_content_for_import = self.import_event.staged_content_for_import.get(
+            source_usage_key=self.chapter.location
+        )
+        staged_content = staged_content_for_import.staged_content
+
+        import_client = ImportClient(
+            import_event=self.import_event,
+            staged_content=staged_content,
+            target_learning_package=self.learning_package,
+            block_usage_key_to_import=block_usage_key_to_import,
+            composition_level=composition_level,
+            override=False
+        )
+
+        block_xml = etree.fromstring(staged_content.olx, parser=self.parser)
+        # pylint: disable=protected-access
+        result = import_client._process_import(block_usage_key_to_import, block_xml)
+
+        self.assertEqual(len(result), len(expected_imported_blocks))
+
+ @patch('cms.djangoapps.import_from_modulestore.helpers.content_staging_api')
+ def test_process_staged_content_files(self, mock_content_staging_api):
+ block_to_import = self.problem
+ block_usage_key_to_import = str(block_to_import.location)
+ staged_content_for_import = self.import_event.staged_content_for_import.get(
+ source_usage_key=self.chapter.location
+ )
+ staged_content = staged_content_for_import.staged_content
+
+ import_client = ImportClient(
+ import_event=self.import_event,
+ staged_content=staged_content,
+ target_learning_package=self.learning_package,
+ block_usage_key_to_import=block_usage_key_to_import,
+ composition_level='xblock',
+ override=False
+ )
+
+ mock_file_data = b'file content'
+ mock_file = mock.MagicMock()
+ mock_file.filename = 'test.png'
+ mock_content_staging_api.get_staged_content_static_files.return_value = [mock_file]
+ mock_content_staging_api.get_staged_content_static_file_data.return_value = mock_file_data
+
+ modified_data = '
'
+ modified_block = BlockFactory.create(
+ category='problem',
+ parent=self.vertical,
+ display_name='Problem With Image',
+ data=modified_data,
+ )
+ block_xml = etree.fromstring(modified_data, parser=self.parser)
+
+ # pylint: disable=protected-access
+ import_client._create_block_in_library(block_xml, modified_block.location)
+ mock_content_staging_api.get_staged_content_static_file_data.assert_called_once_with(
+ staged_content.id, 'test.png'
+ )
+
+    def test_update_container_components(self):
+        """
+        _update_container_components bumps the container to a new version,
+        keeping its title and attributing the change to the importing user.
+        """
+        container_to_import = self.vertical
+        block_usage_key_to_import = str(container_to_import.location)
+        staged_content_for_import = self.import_event.staged_content_for_import.get(
+            source_usage_key=self.chapter.location
+        )
+
+        import_client = ImportClient(
+            import_event=self.import_event,
+            staged_content=staged_content_for_import.staged_content,
+            target_learning_package=self.learning_package,
+            block_usage_key_to_import=block_usage_key_to_import,
+            composition_level='container',
+            override=False
+        )
+
+        with patch('cms.djangoapps.import_from_modulestore.helpers.authoring_api') as mock_authoring_api:
+            mock_container_version = mock.MagicMock()
+            mock_component_version1 = mock.MagicMock()
+            mock_component_version2 = mock.MagicMock()
+            mock_component_versions = [mock_component_version1, mock_component_version2]
+
+            # pylint: disable=protected-access
+            import_client._update_container_components(mock_container_version, mock_component_versions)
+
+            # Inspect the keyword arguments of the single expected call.
+            mock_authoring_api.create_next_container_version.assert_called_once()
+            call_args = mock_authoring_api.create_next_container_version.call_args[1]
+            self.assertEqual(call_args['container_pk'], mock_container_version.container.pk)
+            self.assertEqual(call_args['title'], mock_container_version.title)
+            self.assertEqual(call_args['created_by'], self.user.id)
diff --git a/cms/djangoapps/import_from_modulestore/tests/test_tasks.py b/cms/djangoapps/import_from_modulestore/tests/test_tasks.py
new file mode 100644
index 000000000000..8c24fa95c1cc
--- /dev/null
+++ b/cms/djangoapps/import_from_modulestore/tests/test_tasks.py
@@ -0,0 +1,169 @@
+"""
+Tests for tasks in import_from_modulestore app.
+"""
+from django.core.exceptions import ObjectDoesNotExist
+from organizations.models import Organization
+from openedx_learning.api.authoring_models import LearningPackage
+from unittest.mock import patch
+
+from cms.djangoapps.import_from_modulestore.data import ImportStatus
+from cms.djangoapps.import_from_modulestore.tasks import (
+ import_staged_content_to_library_task,
+ save_legacy_content_to_staged_content_task,
+)
+from openedx.core.djangoapps.content_libraries import api as content_libraries_api
+from openedx.core.djangoapps.content_libraries.api import ContentLibrary
+from xmodule.modulestore.tests.django_utils import ModuleStoreTestCase
+from xmodule.modulestore.tests.factories import CourseFactory, BlockFactory
+
+from .factories import ImportFactory
+
+
+class ImportCourseToLibraryMixin(ModuleStoreTestCase):
+ """
+ Mixin for setting up data for tests.
+ """
+
+ def setUp(self):
+ super().setUp()
+
+ self.library = content_libraries_api.create_library(
+ org=Organization.objects.create(name='Organization 1', short_name='org1'),
+ slug='lib_1',
+ title='Library Org 1',
+ description='This is a library from Org 1',
+ )
+ self.content_library = ContentLibrary.objects.get_by_key(self.library.key)
+
+ self.course = CourseFactory.create()
+ self.chapter = BlockFactory.create(category='chapter', parent=self.course, display_name='Chapter 1')
+ self.sequential = BlockFactory.create(category='sequential', parent=self.chapter, display_name='Sequential 1')
+ self.vertical = BlockFactory.create(category='vertical', parent=self.sequential, display_name='Vertical 1')
+ self.video = BlockFactory.create(category='video', parent=self.vertical, display_name='Video 1')
+ self.problem = BlockFactory.create(category='problem', parent=self.vertical, display_name='Problem 1')
+
+ # self.course2 = CourseFactory.create()
+ # self.chapter2 = BlockFactory.create(category='chapter', parent=self.course, display_name='Chapter 2')
+ self.chapter2 = BlockFactory.create(category='chapter', parent=self.course, display_name='Chapter 2')
+ self.sequential2 = BlockFactory.create(category='sequential', parent=self.chapter2, display_name='Sequential 2')
+ self.vertical2 = BlockFactory.create(category='vertical', parent=self.sequential2, display_name='Vertical 2')
+ self.video2 = BlockFactory.create(category='video', parent=self.vertical2, display_name='Video 2')
+ self.problem2 = BlockFactory.create(category='problem', parent=self.vertical2, display_name='Problem 2')
+
+ self.import_event = ImportFactory(source_key=self.course.id)
+ self.user = self.import_event.user
+
+
+class TestSaveCourseSectionsToStagedContentTask(ImportCourseToLibraryMixin):
+    """
+    Test cases for save_course_sections_to_staged_content_task.
+    """
+
+    def test_save_legacy_content_to_staged_content_task(self):
+        """
+        End-to-end test for save_legacy_content_to_staged_content_task.
+
+        One staged-content row is created per chapter and the import event
+        moves to the STAGED status.
+        """
+        course_chapters_to_import = [self.chapter, self.chapter2]
+        save_legacy_content_to_staged_content_task(self.import_event.uuid)
+
+        self.import_event.refresh_from_db()
+        self.assertEqual(self.import_event.staged_content_for_import.count(), len(course_chapters_to_import))
+        self.assertEqual(self.import_event.status, ImportStatus.STAGED)
+
+    def test_old_staged_content_deletion_before_save_new(self):
+        """ Checking that repeated saving of the same content does not create duplicates. """
+        course_chapters_to_import = [self.chapter, self.chapter2]
+
+        save_legacy_content_to_staged_content_task(self.import_event.uuid)
+
+        self.assertEqual(self.import_event.staged_content_for_import.count(), len(course_chapters_to_import))
+
+        # Running the task again must replace, not duplicate, staged content.
+        save_legacy_content_to_staged_content_task(self.import_event.uuid)
+
+        self.assertEqual(self.import_event.staged_content_for_import.count(), len(course_chapters_to_import))
+
+
+class TestImportLibraryFromStagedContentTask(ImportCourseToLibraryMixin):
+    """
+    Test cases for import_staged_content_to_library_task.
+    """
+
+    def _is_imported(self, library, xblock):
+        """Assert the library's learning package has content mentioning the xblock's display name."""
+        library_learning_package = LearningPackage.objects.get(id=library.learning_package_id)
+        self.assertTrue(library_learning_package.content_set.filter(text__icontains=xblock.display_name).exists())
+
+    def test_import_staged_content_to_library_task(self):
+        """ End-to-end test for import_staged_content_to_library_task. """
+        library_learning_package = LearningPackage.objects.get(id=self.library.learning_package_id)
+        self.assertEqual(library_learning_package.content_set.count(), 0)
+        expected_imported_xblocks = [self.problem, self.problem2, self.video, self.video2]
+        # Stage first: the import task consumes previously staged content.
+        save_legacy_content_to_staged_content_task(self.import_event.uuid)
+
+        import_staged_content_to_library_task(
+            [str(self.chapter.location), str(self.chapter2.location)],
+            self.import_event.uuid,
+            self.content_library.learning_package.id,
+            self.user.id,
+            'component',
+            override=True
+        )
+
+        self.import_event.refresh_from_db()
+        self.assertEqual(self.import_event.status, ImportStatus.IMPORTED)
+
+        for xblock in expected_imported_xblocks:
+            self._is_imported(self.library, xblock)
+
+        library_learning_package.refresh_from_db()
+        self.assertEqual(library_learning_package.content_set.count(), len(expected_imported_xblocks))
+        self.assertEqual(self.import_event.publishableentityimport_set.count(), len(expected_imported_xblocks))
+
+    @patch('cms.djangoapps.import_from_modulestore.tasks.import_from_staged_content')
+    def test_import_library_block_not_found(self, mock_import_from_staged_content):
+        """ Test that if a block is not found in the staged content, it is not imported. """
+        non_existent_usage_ids = ['block-v1:edX+Demo+2023+type@vertical+block@12345']
+        save_legacy_content_to_staged_content_task(self.import_event.uuid)
+        with self.allow_transaction_exception():
+            with self.assertRaises(ObjectDoesNotExist):
+                import_staged_content_to_library_task(
+                    non_existent_usage_ids,
+                    str(self.import_event.uuid),
+                    self.content_library.learning_package.id,
+                    self.user.id,
+                    'component',
+                    override=True,
+                )
+            mock_import_from_staged_content.assert_not_called()
+
+    def test_cannot_import_staged_content_twice(self):
+        """
+        Tests if after importing staged content into the library,
+        the staged content is deleted and cannot be imported again.
+        """
+        chapters_to_import = [self.chapter, self.chapter2]
+        # Only chapter 1 is imported below, so only its leaves are expected.
+        expected_imported_xblocks = [self.problem, self.video]
+        save_legacy_content_to_staged_content_task(self.import_event.uuid)
+
+        self.import_event.refresh_from_db()
+        self.assertEqual(self.import_event.staged_content_for_import.count(), len(chapters_to_import))
+        self.assertEqual(self.import_event.status, ImportStatus.STAGED)
+
+        import_staged_content_to_library_task(
+            [str(self.chapter.location)],
+            str(self.import_event.uuid),
+            self.content_library.learning_package.id,
+            self.user.id,
+            'component',
+            override=True,
+        )
+
+        for xblock in expected_imported_xblocks:
+            self._is_imported(self.library, xblock)
+
+        library_learning_package = LearningPackage.objects.get(id=self.library.learning_package_id)
+        self.assertEqual(library_learning_package.content_set.count(), len(expected_imported_xblocks))
+
+        self.import_event.refresh_from_db()
+        self.assertEqual(self.import_event.status, ImportStatus.IMPORTED)
+        self.assertTrue(not self.import_event.staged_content_for_import.exists())
+        self.assertEqual(self.import_event.publishableentityimport_set.count(), len(expected_imported_xblocks))
diff --git a/cms/djangoapps/import_from_modulestore/tests/test_validators.py b/cms/djangoapps/import_from_modulestore/tests/test_validators.py
new file mode 100644
index 000000000000..961d1cc81ba7
--- /dev/null
+++ b/cms/djangoapps/import_from_modulestore/tests/test_validators.py
@@ -0,0 +1,31 @@
+"""
+Tests for import_from_modulestore validators
+"""
+
+from typing import get_args
+
+from django.test import TestCase
+import pytest
+
+from cms.djangoapps.import_from_modulestore.validators import (
+ validate_composition_level,
+)
+from cms.djangoapps.import_from_modulestore.data import CompositionLevel
+
+
+class TestValidateCompositionLevel(TestCase):
+ """
+ Test cases for validate_composition_level function.
+ Case 1: Valid composition level
+ Case 2: Invalid composition level
+ """
+
+ def test_valid_composition_level(self):
+ for level in get_args(CompositionLevel):
+ # Should not raise an exception for valid levels
+ validate_composition_level(level)
+
+ def test_invalid_composition_level(self):
+ with pytest.raises(ValueError) as exc:
+ validate_composition_level('invalid_composition_level')
+ assert 'Invalid composition level: invalid_composition_level' in str(exc.value)
diff --git a/cms/djangoapps/import_from_modulestore/validators.py b/cms/djangoapps/import_from_modulestore/validators.py
new file mode 100644
index 000000000000..ecf0c72e08e8
--- /dev/null
+++ b/cms/djangoapps/import_from_modulestore/validators.py
@@ -0,0 +1,26 @@
+"""
+Validators for the import_from_modulestore app.
+"""
+from typing import Sequence
+
+from opaque_keys.edx.keys import UsageKey
+
+from .data import CompositionLevel
+
+
+def validate_usage_keys_to_import(usage_keys: Sequence[str | UsageKey]):
+ """
+ Validate the usage keys to import.
+
+ Currently, supports importing from the modulestore only by chapters.
+ """
+ for usage_key in usage_keys:
+ if isinstance(usage_key, str):
+ usage_key = UsageKey.from_string(usage_key)
+ if usage_key.block_type != 'chapter':
+ raise ValueError(f'Importing from modulestore only supports chapters, not {usage_key.block_type}')
+
+
+def validate_composition_level(composition_level):
+    """
+    Validate that ``composition_level`` is one of the supported levels.
+
+    Raises:
+        ValueError: if the level is not in ``CompositionLevel.values()``.
+    """
+    if composition_level not in CompositionLevel.values():
+        raise ValueError(f'Invalid composition level: {composition_level}')
diff --git a/mypy.ini b/mypy.ini
index 40f58ff5c411..83255c1a5af1 100644
--- a/mypy.ini
+++ b/mypy.ini
@@ -8,6 +8,7 @@ plugins =
files =
cms/lib/xblock/upstream_sync.py,
cms/djangoapps/contentstore/rest_api/v2/views/downstreams.py,
+ cms/djangoapps/import_from_modulestore,
openedx/core/djangoapps/content/learning_sequences,
# FIXME: need to solve type issues and add 'search' app here:
# openedx/core/djangoapps/content/search,
diff --git a/openedx/core/djangoapps/content_staging/api.py b/openedx/core/djangoapps/content_staging/api.py
index 7f8167d9aadd..7baae10baed4 100644
--- a/openedx/core/djangoapps/content_staging/api.py
+++ b/openedx/core/djangoapps/content_staging/api.py
@@ -53,17 +53,19 @@ def _save_xblock_to_staged_content(
expired_ids = []
with transaction.atomic():
- # Mark all of the user's existing StagedContent rows as EXPIRED
- to_expire = _StagedContent.objects.filter(
- user_id=user_id,
- purpose=purpose,
- ).exclude(
- status=StagedContentStatus.EXPIRED,
- )
- for sc in to_expire:
- expired_ids.append(sc.id)
- sc.status = StagedContentStatus.EXPIRED
- sc.save()
+ if purpose == CLIPBOARD_PURPOSE:
+ # Mark all of the user's existing StagedContent rows as EXPIRED
+ to_expire = _StagedContent.objects.filter(
+ user_id=user_id,
+ purpose=purpose,
+ ).exclude(
+ status=StagedContentStatus.EXPIRED,
+ )
+ for sc in to_expire:
+ expired_ids.append(sc.id)
+ sc.status = StagedContentStatus.EXPIRED
+ sc.save()
+
# Insert a new StagedContent row for this
staged_content = _StagedContent.objects.create(
user_id=user_id,