Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate core logic of importchannel to a utility function and update associated tasks #13099

Open
wants to merge 5 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions kolibri/core/content/constants/transfer_types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
DOWNLOAD_METHOD = "download"
COPY_METHOD = "copy"
191 changes: 12 additions & 179 deletions kolibri/core/content/management/commands/importchannel.py
Original file line number Diff line number Diff line change
@@ -1,41 +1,16 @@
import logging
import os

from django.core.management.base import CommandError
from le_utils.constants import content_kinds

from ...utils import channel_import
from ...utils import paths
from ...utils.annotation import update_content_metadata
from kolibri.core.content.models import ContentNode
from kolibri.core.content.utils.importability_annotation import clear_channel_stats
from kolibri.core.device.models import ContentCacheKey
from kolibri.core.errors import KolibriUpgradeError
from kolibri.core.content.constants.transfer_types import COPY_METHOD
from kolibri.core.content.constants.transfer_types import DOWNLOAD_METHOD
from kolibri.core.content.utils.channel_transfer import transfer_channel
from kolibri.core.tasks.management.commands.base import AsyncCommand
from kolibri.utils import conf
from kolibri.utils import file_transfer as transfer

logger = logging.getLogger(__name__)

# constants to specify the transfer method to be used
DOWNLOAD_METHOD = "download"
COPY_METHOD = "copy"


def import_channel_by_id(channel_id, cancel_check, contentfolder=None):
try:
return channel_import.import_channel_from_local_db(
channel_id, cancel_check=cancel_check, contentfolder=contentfolder
)
except channel_import.InvalidSchemaVersionError:
raise CommandError(
"Database file had an invalid database schema, the file may be corrupted or have been modified."
)
except channel_import.FutureSchemaError:
raise KolibriUpgradeError(
"Database file uses a future database schema that this version of Kolibri does not support."
)


class Command(AsyncCommand):
def add_arguments(self, parser):
Expand Down Expand Up @@ -103,166 +78,24 @@ def add_arguments(self, parser):

def download_channel(self, channel_id, baseurl, no_upgrade, content_dir):
logger.info("Downloading data for channel id {}".format(channel_id))
self._transfer(
DOWNLOAD_METHOD,
channel_id,
baseurl,
transfer_channel(
channel_id=channel_id,
method=DOWNLOAD_METHOD,
no_upgrade=no_upgrade,
content_dir=content_dir,
baseurl=baseurl,
)

def copy_channel(self, channel_id, path, no_upgrade, content_dir):
def copy_channel(self, channel_id, source_path, no_upgrade, content_dir):
logger.info("Copying in data for channel id {}".format(channel_id))
self._transfer(
COPY_METHOD,
channel_id,
path=path,
transfer_channel(
channel_id=channel_id,
method=COPY_METHOD,
no_upgrade=no_upgrade,
content_dir=content_dir,
source_path=source_path,
)

def _transfer(
self,
method,
channel_id,
baseurl=None,
path=None,
no_upgrade=False,
content_dir=None,
):

new_channel_dest = paths.get_upgrade_content_database_file_path(
channel_id, contentfolder=content_dir
)
dest = (
new_channel_dest
if no_upgrade
else paths.get_content_database_file_path(
channel_id, contentfolder=content_dir
)
)

# if new channel version db has previously been downloaded, just copy it over
if os.path.exists(new_channel_dest) and not no_upgrade:
method = COPY_METHOD
# determine where we're downloading/copying from, and create appropriate transfer object
if method == DOWNLOAD_METHOD:
url = paths.get_content_database_file_url(channel_id, baseurl=baseurl)
logger.debug("URL to fetch: {}".format(url))
filetransfer = transfer.FileDownload(
url, dest, cancel_check=self.is_cancelled
)
elif method == COPY_METHOD:
# if there is a new channel version db, set that as source path
srcpath = (
new_channel_dest
if os.path.exists(new_channel_dest)
else paths.get_content_database_file_path(channel_id, datafolder=path)
)
filetransfer = transfer.FileCopy(
srcpath, dest, cancel_check=self.is_cancelled
)

logger.debug("Destination: {}".format(dest))

try:
self._start_file_transfer(
filetransfer,
channel_id,
dest,
no_upgrade=no_upgrade,
contentfolder=content_dir,
)
except transfer.TransferCanceled:
pass

if self.is_cancelled():
try:
os.remove(dest)
except OSError as e:
logger.info(
"Tried to remove {}, but exception {} occurred.".format(dest, e)
)
# Reraise any cancellation
self.check_for_cancel()

# if we are trying to upgrade, remove new channel db
if os.path.exists(new_channel_dest) and not no_upgrade:
os.remove(new_channel_dest)

def _start_file_transfer(
self, filetransfer, channel_id, dest, no_upgrade=False, contentfolder=None
):
progress_extra_data = {"channel_id": channel_id}

with filetransfer:
self.start_progress(total=filetransfer.transfer_size)

def progress_callback(bytes):
self.update_progress(bytes, extra_data=progress_extra_data)

filetransfer.run(progress_callback)
# if upgrading, import the channel
if not no_upgrade:
try:
# In each case we need to evaluate the queryset now,
# in order to get node ids as they currently are before
# the import. If we did not coerce each of these querysets
# to a list now, they would be lazily evaluated after the
# import, and would reflect the state of the database
# after the import.

# evaluate list so we have the current node ids
node_ids = list(
ContentNode.objects.filter(
channel_id=channel_id, available=True
)
.exclude(kind=content_kinds.TOPIC)
.values_list("id", flat=True)
)
# evaluate list so we have the current node ids
admin_imported_ids = list(
ContentNode.objects.filter(
channel_id=channel_id, available=True, admin_imported=True
)
.exclude(kind=content_kinds.TOPIC)
.values_list("id", flat=True)
)
# evaluate list so we have the current node ids
not_admin_imported_ids = list(
ContentNode.objects.filter(
channel_id=channel_id, available=True, admin_imported=False
)
.exclude(kind=content_kinds.TOPIC)
.values_list("id", flat=True)
)
import_ran = import_channel_by_id(
channel_id, self.is_cancelled, contentfolder
)
if import_ran:
if node_ids:
# annotate default channel db based on previously annotated leaf nodes
update_content_metadata(channel_id, node_ids=node_ids)
if admin_imported_ids:
# Reset admin_imported flag for nodes that were imported by admin
ContentNode.objects.filter_by_uuids(
admin_imported_ids
).update(admin_imported=True)
if not_admin_imported_ids:
# Reset admin_imported flag for nodes that were not imported by admin
ContentNode.objects.filter_by_uuids(
not_admin_imported_ids
).update(admin_imported=False)
else:
# ensure the channel is available to the frontend
ContentCacheKey.update_cache_key()

# Clear any previously set channel availability stats for this channel
clear_channel_stats(channel_id)
except channel_import.ImportCancelError:
# This will only occur if is_cancelled is True.
pass

def handle_async(self, *args, **options):
if options["command"] == "network":
self.download_channel(
Expand Down
32 changes: 7 additions & 25 deletions kolibri/core/content/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@
from rest_framework import serializers

from kolibri.core.auth.models import FacilityDataset
from kolibri.core.content.constants.transfer_types import COPY_METHOD
from kolibri.core.content.constants.transfer_types import DOWNLOAD_METHOD
from kolibri.core.content.models import ChannelMetadata
from kolibri.core.content.models import ContentRequest
from kolibri.core.content.models import ContentRequestReason
from kolibri.core.content.models import ContentRequestStatus
from kolibri.core.content.models import ContentRequestType
from kolibri.core.content.utils.channel_import import import_channel_from_data
from kolibri.core.content.utils.channel_transfer import transfer_channel
from kolibri.core.content.utils.channels import get_mounted_drive_by_id
from kolibri.core.content.utils.channels import read_channel_metadata_from_db_file
from kolibri.core.content.utils.content_request import incomplete_removals_queryset
Expand Down Expand Up @@ -218,12 +221,7 @@ class RemoteChannelImportValidator(RemoteImportMixin, ChannelValidator):
status_fn=get_status,
)
def remotechannelimport(channel_id, baseurl=None, peer_id=None):
call_command(
"importchannel",
"network",
channel_id,
baseurl=baseurl,
)
transfer_channel(channel_id, DOWNLOAD_METHOD, baseurl=baseurl)


class RemoteChannelResourcesImportValidator(
Expand Down Expand Up @@ -538,13 +536,8 @@ def remoteimport(
fail_on_error=False,
all_thumbnails=False,
):
call_command(
"importchannel",
"network",
channel_id,
baseurl=baseurl,
)

transfer_channel(channel_id, DOWNLOAD_METHOD, baseurl=baseurl)
if update:
current_job = get_current_job()
current_job.update_metadata(database_ready=True)
Expand Down Expand Up @@ -587,12 +580,7 @@ def diskimport(
drive = get_mounted_drive_by_id(drive_id)
Copy link
Contributor Author

@thesujai thesujai Feb 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line (`drive = get_mounted_drive_by_id(drive_id)`) is duplicated at line 594. Is that a bug, or does the second call serve a purpose?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it's a bug, but the second call definitely looks unnecessary. It was probably just copy-pasted from somewhere else.

directory = drive.datafolder

call_command(
"importchannel",
"disk",
channel_id,
directory,
)
transfer_channel(channel_id, COPY_METHOD, source_path=directory)

if update:
current_job = get_current_job()
Expand All @@ -601,7 +589,6 @@ def diskimport(
manager_class = (
DiskChannelUpdateManager if update else DiskChannelResourceImportManager
)
drive = get_mounted_drive_by_id(drive_id)
manager = manager_class(
channel_id,
path=drive.datafolder,
Expand Down Expand Up @@ -633,12 +620,7 @@ def diskchannelimport(
drive_id,
):
drive = get_mounted_drive_by_id(drive_id)
call_command(
"importchannel",
"disk",
channel_id,
drive.datafolder,
)
transfer_channel(channel_id, COPY_METHOD, source_path=drive.datafolder)


class RemoteChannelDiffStatsValidator(RemoteChannelImportValidator):
Expand Down
Loading
Loading