Skip to content
This repository has been archived by the owner on Sep 20, 2024. It is now read-only.

Webpublisher: Added endpoint to reprocess batch through UI #2555

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
parse_json,
get_batch_asset_task_info
)
from openpype.lib.remote_publish import get_webpublish_conn
from openpype.lib.remote_publish import get_webpublish_conn, IN_PROGRESS_STATUS


class CollectBatchData(pyblish.api.ContextPlugin):
Expand Down Expand Up @@ -74,7 +74,7 @@ def _set_ctx_path(self, batch_data):
dbcon.update_one(
{
"batch_id": batch_id,
"status": "in_progress"
"status": IN_PROGRESS_STATUS
},
{
"$set": {
Expand Down
69 changes: 57 additions & 12 deletions openpype/hosts/webpublisher/webserver_service/webpublish_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,14 @@

from openpype.lib import OpenPypeMongoConnection
from openpype_modules.avalon_apps.rest_api import _RestApiEndpoint
from openpype.lib.remote_publish import get_task_data
from openpype.settings import get_project_settings

from openpype.lib import PypeLogger
from openpype.lib.remote_publish import (
get_task_data,
ERROR_STATUS,
REPROCESS_STATUS
)

log = PypeLogger.get_logger("WebServer")

Expand Down Expand Up @@ -61,7 +65,7 @@ def __init__(self, ):
self.dbcon = mongo_client[database_name]["webpublishes"]


class WebpublisherProjectsEndpoint(_RestApiEndpoint):
class ProjectsEndpoint(_RestApiEndpoint):
"""Returns list of dict with project info (id, name)."""
async def get(self) -> Response:
output = []
Expand All @@ -82,7 +86,7 @@ async def get(self) -> Response:
)


class WebpublisherHiearchyEndpoint(_RestApiEndpoint):
class HiearchyEndpoint(_RestApiEndpoint):
"""Returns dictionary with context tree from assets."""
async def get(self, project_name) -> Response:
query_projection = {
Expand Down Expand Up @@ -181,7 +185,7 @@ def __init__(self, node_type, name):
self["attributes"] = {}


class WebpublisherBatchPublishEndpoint(_RestApiEndpoint):
class BatchPublishEndpoint(_RestApiEndpoint):
"""Triggers headless publishing of batch."""
async def post(self, request) -> Response:
# Validate existence of openpype executable
Expand All @@ -190,7 +194,7 @@ async def post(self, request) -> Response:
msg = "Non existent OpenPype executable {}".format(openpype_app)
raise RuntimeError(msg)

log.info("WebpublisherBatchPublishEndpoint called")
log.info("BatchPublishEndpoint called")
content = await request.json()

# Each filter have extensions which are checked on first task item
Expand Down Expand Up @@ -286,7 +290,7 @@ async def post(self, request) -> Response:
)


class WebpublisherTaskPublishEndpoint(_RestApiEndpoint):
class TaskPublishEndpoint(_RestApiEndpoint):
"""Prepared endpoint triggered after each task - for future development."""
async def post(self, request) -> Response:
return Response(
Expand All @@ -301,21 +305,37 @@ class BatchStatusEndpoint(_RestApiEndpoint):
async def get(self, batch_id) -> Response:
output = self.dbcon.find_one({"batch_id": batch_id})

if output:
status = 200
else:
output = {"msg": "Batch id {} not found".format(batch_id),
"status": "queued",
"progress": 0}
status = 404
body = self.resource.encode(output)
return Response(
status=200,
body=self.resource.encode(output),
status=status,
body=body,
content_type="application/json"
)


class PublishesStatusEndpoint(_RestApiEndpoint):
class UserReportEndpoint(_RestApiEndpoint):
"""Returns list of dict with batch info for user (email address)."""
async def get(self, user) -> Response:
output = list(self.dbcon.find({"user": user}))
output = list(self.dbcon.find({"user": user},
projection={"log": False}))

if output:
status = 200
else:
output = {"msg": "User {} not found".format(user)}
status = 404
body = self.resource.encode(output)

return Response(
status=200,
body=self.resource.encode(output),
status=status,
body=body,
content_type="application/json"
)

Expand Down Expand Up @@ -351,3 +371,28 @@ async def get(self, project_name=None) -> Response:
body=self.resource.encode(dict(configured)),
content_type="application/json"
)


class BatchReprocessEndpoint(_RestApiEndpoint):
"""Marks latest 'batch_id' for reprocessing, returns 404 if not found."""
async def post(self, batch_id) -> Response:
batches = self.dbcon.find({"batch_id": batch_id,
"status": ERROR_STATUS}).sort("_id", -1)

if batches:
self.dbcon.update_one(
{"_id": batches[0]["_id"]},
{"$set": {"status": REPROCESS_STATUS}}
)
output = [{"msg": "Batch id {} set to reprocess".format(batch_id)}]
status = 200
else:
output = [{"msg": "Batch id {} not found".format(batch_id)}]
status = 404
body = self.resource.encode(output)

return Response(
status=status,
body=body,
content_type="application/json"
)
68 changes: 46 additions & 22 deletions openpype/hosts/webpublisher/webserver_service/webserver_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,19 @@
from .webpublish_routes import (
RestApiResource,
OpenPypeRestApiResource,
WebpublisherBatchPublishEndpoint,
WebpublisherTaskPublishEndpoint,
WebpublisherHiearchyEndpoint,
WebpublisherProjectsEndpoint,
HiearchyEndpoint,
ProjectsEndpoint,
ConfiguredExtensionsEndpoint,
BatchPublishEndpoint,
BatchReprocessEndpoint,
BatchStatusEndpoint,
PublishesStatusEndpoint,
ConfiguredExtensionsEndpoint
TaskPublishEndpoint,
UserReportEndpoint
)
from openpype.lib.remote_publish import (
ERROR_STATUS,
REPROCESS_STATUS,
SENT_REPROCESSING_STATUS
)


Expand All @@ -41,14 +47,14 @@ def run_webserver(*args, **kwargs):
upload_dir=kwargs["upload_dir"],
executable=kwargs["executable"],
studio_task_queue=studio_task_queue)
projects_endpoint = WebpublisherProjectsEndpoint(resource)
projects_endpoint = ProjectsEndpoint(resource)
server_manager.add_route(
"GET",
"/api/projects",
projects_endpoint.dispatch
)

hiearchy_endpoint = WebpublisherHiearchyEndpoint(resource)
hiearchy_endpoint = HiearchyEndpoint(resource)
server_manager.add_route(
"GET",
"/api/hierarchy/{project_name}",
Expand All @@ -64,15 +70,15 @@ def run_webserver(*args, **kwargs):

# triggers publish
webpublisher_task_publish_endpoint = \
WebpublisherBatchPublishEndpoint(resource)
BatchPublishEndpoint(resource)
server_manager.add_route(
"POST",
"/api/webpublish/batch",
webpublisher_task_publish_endpoint.dispatch
)

webpublisher_batch_publish_endpoint = \
WebpublisherTaskPublishEndpoint(resource)
TaskPublishEndpoint(resource)
server_manager.add_route(
"POST",
"/api/webpublish/task",
Expand All @@ -88,13 +94,21 @@ def run_webserver(*args, **kwargs):
batch_status_endpoint.dispatch
)

user_status_endpoint = PublishesStatusEndpoint(openpype_resource)
user_status_endpoint = UserReportEndpoint(openpype_resource)
server_manager.add_route(
"GET",
"/api/publishes/{user}",
user_status_endpoint.dispatch
)

webpublisher_batch_reprocess_endpoint = \
BatchReprocessEndpoint(openpype_resource)
server_manager.add_route(
"POST",
"/api/webpublish/reprocess/{batch_id}",
webpublisher_batch_reprocess_endpoint.dispatch
)

server_manager.start_server()
last_reprocessed = time.time()
while True:
Expand All @@ -116,8 +130,12 @@ def reprocess_failed(upload_dir, webserver_url):
database_name = os.environ["OPENPYPE_DATABASE_NAME"]
dbcon = mongo_client[database_name]["webpublishes"]

results = dbcon.find({"status": "reprocess"})
results = dbcon.find({"status": REPROCESS_STATUS})
reprocessed_batches = set()
for batch in results:
if batch["batch_id"] in reprocessed_batches:
continue

batch_url = os.path.join(upload_dir,
batch["batch_id"],
"manifest.json")
Expand All @@ -130,7 +148,7 @@ def reprocess_failed(upload_dir, webserver_url):
{"$set":
{
"finish_date": datetime.now(),
"status": "error",
"status": ERROR_STATUS,
"progress": 100,
"log": batch.get("log") + msg
}}
Expand All @@ -141,18 +159,24 @@ def reprocess_failed(upload_dir, webserver_url):
with open(batch_url) as f:
data = json.loads(f.read())

dbcon.update_many(
{
"batch_id": batch["batch_id"],
"status": {"$in": [ERROR_STATUS, REPROCESS_STATUS]}
},
{
"$set": {
"finish_date": datetime.now(),
"status": SENT_REPROCESSING_STATUS,
"progress": 100
}
}
)

try:
r = requests.post(server_url, json=data)
log.info("response{}".format(r))
except Exception:
log.info("exception", exc_info=True)

dbcon.update_one(
{"_id": batch["_id"]},
{"$set":
{
"finish_date": datetime.now(),
"status": "sent_for_reprocessing",
"progress": 100
}}
)
reprocessed_batches.add(batch["batch_id"])
48 changes: 35 additions & 13 deletions openpype/lib/remote_publish.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@
from openpype.lib.mongo import OpenPypeMongoConnection
from openpype.lib.plugin_tools import parse_json

ERROR_STATUS = "error"
IN_PROGRESS_STATUS = "in_progress"
REPROCESS_STATUS = "reprocess"
SENT_REPROCESSING_STATUS = "sent_for_reprocessing"
FINISHED_REPROCESS_STATUS = "republishing_finished"
FINISHED_OK_STATUS = "finished_ok"


def headless_publish(log, close_plugin_name=None, is_test=False):
"""Runs publish in a opened host with a context and closes Python process.
Expand All @@ -26,7 +33,7 @@ def headless_publish(log, close_plugin_name=None, is_test=False):
"batch will be unfinished!")
return

publish_and_log(dbcon, _id, log, close_plugin_name)
publish_and_log(dbcon, _id, log, close_plugin_name=close_plugin_name)
else:
publish(log, close_plugin_name)

Expand All @@ -52,7 +59,7 @@ def start_webpublish_log(dbcon, batch_id, user):
"batch_id": batch_id,
"start_date": datetime.now(),
"user": user,
"status": "in_progress",
"status": IN_PROGRESS_STATUS,
"progress": 0 # integer 0-100, percentage
}).inserted_id

Expand Down Expand Up @@ -84,13 +91,14 @@ def publish(log, close_plugin_name=None):
sys.exit(1)


def publish_and_log(dbcon, _id, log, close_plugin_name=None):
def publish_and_log(dbcon, _id, log, close_plugin_name=None, batch_id=None):
"""Loops through all plugins, logs ok and fails into OP DB.

Args:
dbcon (OpenPypeMongoConnection)
_id (str)
_id (str) - id of current job in DB
log (OpenPypeLogger)
batch_id (str) - id sent from frontend
close_plugin_name (str): name of plugin with responsibility to
close host app
"""
Expand Down Expand Up @@ -121,7 +129,7 @@ def publish_and_log(dbcon, _id, log, close_plugin_name=None):
{"$set":
{
"finish_date": datetime.now(),
"status": "error",
"status": ERROR_STATUS,
"log": os.linesep.join(log_lines)

}}
Expand All @@ -143,15 +151,29 @@ def publish_and_log(dbcon, _id, log, close_plugin_name=None):
)

# final update
if batch_id:
dbcon.update_many(
{"batch_id": batch_id, "status": SENT_REPROCESSING_STATUS},
{
"$set":
{
"finish_date": datetime.now(),
"status": FINISHED_REPROCESS_STATUS,
}
}
)

dbcon.update_one(
{"_id": _id},
{"$set":
{
"finish_date": datetime.now(),
"status": "finished_ok",
"progress": 100,
"log": os.linesep.join(log_lines)
}}
{
"$set":
{
"finish_date": datetime.now(),
"status": FINISHED_OK_STATUS,
"progress": 100,
"log": os.linesep.join(log_lines)
}
}
)


Expand All @@ -168,7 +190,7 @@ def fail_batch(_id, batches_in_progress, dbcon):
{"$set":
{
"finish_date": datetime.now(),
"status": "error",
"status": ERROR_STATUS,
"log": msg

}}
Expand Down
Loading