Skip to content

Commit

Permalink
refactor: centralize usage of generate flow metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
elliotzh committed Mar 13, 2024
1 parent a97a409 commit 6493895
Show file tree
Hide file tree
Showing 13 changed files with 194 additions and 118 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
from promptflow._sdk._utils import overwrite_null_std_logger
from promptflow._sdk.entities import Run
from promptflow._sdk.entities._experiment import Experiment, ExperimentTemplate
from promptflow._sdk.entities._flow import ProtectedFlow
from promptflow._sdk.entities._utils import get_flow_definition
from promptflow._sdk.operations import RunOperations
from promptflow._sdk.operations._local_storage_operations import LocalStorageOperations
from promptflow._utils.inputs_mapping_utils import apply_inputs_mapping
Expand Down Expand Up @@ -101,8 +101,7 @@ def test(
start_nodes = [
node
for node in template.nodes
if node.type == ExperimentNodeType.FLOW
and ProtectedFlow._get_flow_definition(node.path) == ProtectedFlow._get_flow_definition(flow_path)
if node.type == ExperimentNodeType.FLOW and get_flow_definition(node.path) == get_flow_definition(flow_path)
]
if not start_nodes:
raise ExperimentValueError(f"Flow {flow_path.as_posix()} not found in experiment {template.dir_name!r}.")
Expand Down
8 changes: 5 additions & 3 deletions src/promptflow/promptflow/_sdk/_submitter/run_submitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,13 @@ def _submit_bulk_run(
) -> dict:
logger.info(f"Submitting run {run.name}, log path: {local_storage.logger.file_path}")
run_id = run.name
if flow.language == FlowLanguage.CSharp:
if flow.language != FlowLanguage.Python:
# TODO: consider moving this to Operations
from promptflow.batch import CSharpExecutorProxy
from promptflow.batch._executor_proxy_factory import ExecutorProxyFactory

CSharpExecutorProxy.generate_metadata(flow_file=Path(flow.path), working_dir=Path(flow.code))
ExecutorProxyFactory().get_executor_proxy_cls(flow.language).generate_flow_metadata(
flow_file=Path(flow.path), working_dir=Path(flow.code), dump=True
)
# TODO: shall we resolve connections here?
connections = []
else:
Expand Down
12 changes: 9 additions & 3 deletions src/promptflow/promptflow/_sdk/_submitter/test_submitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from ..._utils.dataclass_serializer import convert_eager_flow_output_to_dict
from ..._utils.logger_utils import get_cli_sdk_logger
from ...batch import APIBasedExecutorProxy, CSharpExecutorProxy
from ...batch._executor_proxy_factory import ExecutorProxyFactory
from .._configuration import Configuration
from ..entities._eager_flow import EagerFlow
from .utils import (
Expand Down Expand Up @@ -171,7 +172,7 @@ def _resolve_connections(cls, flow: FlowBase, client):

return SubmitterHelper.resolve_used_connections(
flow=flow,
tools_meta=CSharpExecutorProxy.get_tool_metadata(
tools_meta=CSharpExecutorProxy.generate_tool_metadata(
flow_file=flow.flow_dag_path,
working_dir=flow.code,
),
Expand Down Expand Up @@ -241,9 +242,14 @@ def init(
# temp flow is generated, will use self.flow instead of self._origin_flow in the following context
self._within_init_context = True

if self.flow.language == FlowLanguage.CSharp:
if self.flow.language != FlowLanguage.Python:
ExecutorProxyFactory().get_executor_proxy_cls(self.flow.language).generate_flow_metadata(
flow_file=self.flow.path,
working_dir=self.flow.code,
dump=True,
)
# TODO: consider move this to Operations
CSharpExecutorProxy.generate_metadata(self.flow.path, self.flow.code)
CSharpExecutorProxy.generate_flow_metadata(self.flow.path, self.flow.code)

self._target_node = target_node
self._enable_stream_output = stream_output
Expand Down
20 changes: 11 additions & 9 deletions src/promptflow/promptflow/_sdk/entities/_eager_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

from promptflow._constants import LANGUAGE_KEY, FlowLanguage
from promptflow._sdk._constants import BASE_PATH_CONTEXT_KEY
from promptflow._sdk._utils import generate_flow_meta
from promptflow._sdk.entities._flow import FlowBase
from promptflow._sdk.entities._validation import SchemaValidatableMixin
from promptflow.exceptions import ErrorTarget, UserErrorException
Expand Down Expand Up @@ -97,16 +96,19 @@ def _resolve_entry_file(cls, entry: str, working_dir: Path) -> Optional[str]:
# when entry file not found in working directory, return None since it can come from package
return None

# TODO: no usage? On the other hand, according to our latest design, we'd better avoid touching Executable
# everywhere in devkit.
def _init_executable(self, **kwargs):
# TODO(2991934): support environment variables here
from promptflow.batch._executor_proxy_factory import ExecutorProxyFactory
from promptflow.contracts.flow import EagerFlow as ExecutableEagerFlow

# TODO(2991934): support environment variables here
meta_dict = generate_flow_meta(
flow_directory=self.code,
source_path=self.entry_file,
entry=self.entry,
dump=False,
meta_dict = (
ExecutorProxyFactory()
.get_executor_proxy_cls(self.language)
.generate_flow_metadata(
# TODO: is it possible that there is no path?
flow_file=self.path,
working_dir=self.code,
dump=False,
)
)
return ExecutableEagerFlow.deserialize(meta_dict)
27 changes: 4 additions & 23 deletions src/promptflow/promptflow/_sdk/entities/_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,14 @@
import json
from os import PathLike
from pathlib import Path
from typing import Dict, Optional, Tuple, Union
from typing import Dict, Optional, Union

from marshmallow import Schema

from promptflow._constants import LANGUAGE_KEY, FlowLanguage
from promptflow._sdk._constants import (
BASE_PATH_CONTEXT_KEY,
DAG_FILE_NAME,
DEFAULT_ENCODING,
FLOW_TOOLS_JSON,
PROMPT_FLOW_DIR_NAME,
)
from promptflow._sdk._constants import BASE_PATH_CONTEXT_KEY, DEFAULT_ENCODING, FLOW_TOOLS_JSON, PROMPT_FLOW_DIR_NAME
from promptflow._sdk.entities._connection import _Connection
from promptflow._sdk.entities._utils import get_flow_definition
from promptflow._sdk.entities._validation import SchemaValidatableMixin
from promptflow._utils.flow_utils import resolve_flow_path
from promptflow._utils.logger_utils import get_cli_sdk_logger
Expand Down Expand Up @@ -247,7 +242,7 @@ def __init__(
):
super().__init__(path=path, code=code, dag=dag, **kwargs)

self._flow_dir, self._dag_file_name = self._get_flow_definition(self.code)
self._flow_dir, self._dag_file_name = get_flow_definition(self.code)
self._executable = None
self._params_override = params_override

Expand All @@ -273,20 +268,6 @@ def tools_meta_path(self) -> Path:
target_path.parent.mkdir(parents=True, exist_ok=True)
return target_path

@classmethod
def _get_flow_definition(cls, flow, base_path=None) -> Tuple[Path, str]:
if base_path:
flow_path = Path(base_path) / flow
else:
flow_path = Path(flow)

if flow_path.is_dir() and (flow_path / DAG_FILE_NAME).is_file():
return flow_path, DAG_FILE_NAME
elif flow_path.is_file():
return flow_path.parent, flow_path.name

raise ValueError(f"Can't find flow with path {flow_path.as_posix()}.")

# region SchemaValidatableMixin
@classmethod
def _create_schema_for_validation(cls, context) -> Schema:
Expand Down
41 changes: 41 additions & 0 deletions src/promptflow/promptflow/_sdk/entities/_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------

"""
Previously, some shared flow logic lived in the ProtectedFlow class. In practice, however, it is hard to
guarantee that flows are always created through that class, so the shared logic is provided here as
standalone utility functions instead.
"""
from os import PathLike
from pathlib import Path
from typing import Tuple, Union

from promptflow._sdk._constants import DAG_FILE_NAME


def get_flow_definition(
    flow_path: Union[str, Path, PathLike], base_path: Union[str, Path, PathLike, None] = None
) -> Tuple[Path, str]:
    """Locate a flow on disk and split it into its directory and yaml file name.

    :param flow_path: Either a flow directory or a flow yaml file.
    :type flow_path: Union[str, Path, PathLike]
    :param base_path: Optional base that ``flow_path`` is joined onto. When it is falsy,
        ``flow_path`` is used as given (a relative path is then relative to the current
        working directory).
    :type base_path: Union[str, Path, PathLike]
    :return: A pair of (flow directory, file name of the target yaml).
    :rtype: Tuple[Path, str]
    :raises ValueError: If the path is neither a directory containing the default DAG file
        nor an existing file.
    """
    resolved = Path(base_path) / flow_path if base_path else Path(flow_path)

    # A directory only counts as a flow when it holds the default DAG file.
    if resolved.is_dir() and (resolved / DAG_FILE_NAME).is_file():
        return resolved, DAG_FILE_NAME

    # An explicit file is reported as (its parent directory, its own name).
    if resolved.is_file():
        return resolved.parent, resolved.name

    raise ValueError(f"Can't find flow with path {resolved.as_posix()}.")
24 changes: 14 additions & 10 deletions src/promptflow/promptflow/_sdk/operations/_flow_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
_merge_local_code_and_additional_includes,
copy_tree_respect_template_and_ignore_file,
dump_flow_result,
generate_flow_meta,
generate_flow_tools_json,
generate_random_string,
logger,
Expand Down Expand Up @@ -410,7 +409,7 @@ def _export_flow_connections(

return self._migrate_connections(
connection_names=SubmitterHelper.get_used_connection_names(
tools_meta=CSharpExecutorProxy.get_tool_metadata(
tools_meta=CSharpExecutorProxy.generate_tool_metadata(
flow_file=flow.flow_dag_path,
working_dir=flow.code,
),
Expand Down Expand Up @@ -835,13 +834,18 @@ def _generate_flow_meta(
# No flow meta for DAG flow
return {}

# TODO: is it possible that there is no flow.path?
with self._resolve_additional_includes(flow.path) as new_flow_dag_path:
# TODO: support generate flow meta for csharp?
return generate_flow_meta(
flow_directory=new_flow_dag_path.parent,
source_path=flow.entry_file,
entry=flow.entry,
dump=dump,
timeout=timeout,
load_in_subprocess=load_in_subprocess,
from promptflow.batch._executor_proxy_factory import ExecutorProxyFactory

return (
ExecutorProxyFactory()
.get_executor_proxy_cls(flow.language)
.generate_flow_metadata(
flow_file=new_flow_dag_path,
working_dir=new_flow_dag_path.parent,
dump=dump,
timeout=timeout,
load_in_subprocess=load_in_subprocess,
)
)
23 changes: 7 additions & 16 deletions src/promptflow/promptflow/azure/operations/_flow_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,15 @@
from promptflow._sdk._telemetry import ActivityType, WorkspaceTelemetryMixin, monitor_operation
from promptflow._sdk._utils import PromptflowIgnoreFile
from promptflow._sdk._vendor._asset_utils import traverse_directory
from promptflow._sdk.entities._utils import get_flow_definition
from promptflow._utils.logger_utils import get_cli_sdk_logger
from promptflow.azure._constants._flow import DEFAULT_STORAGE
from promptflow.azure._entities._flow import Flow
from promptflow.azure._load_functions import load_flow
from promptflow.azure._restclient.flow_service_caller import FlowServiceCaller
from promptflow.azure.operations._artifact_utilities import _get_datastore_name, get_datastore_info
from promptflow.azure.operations._fileshare_storeage_helper import FlowFileStorageClient
from promptflow.batch._executor_proxy_factory import ExecutorProxyFactory
from promptflow.exceptions import SystemErrorException, UserErrorException

logger = get_cli_sdk_logger()
Expand Down Expand Up @@ -482,7 +484,11 @@ def _try_resolve_code_for_flow(cls, flow: Flow, ops: OperationOrchestrator, igno
return

# generate .promptflow/flow.json for eager flow
cls._generate_meta_for_eager_flow(code=code)
flow_directory, flow_file = get_flow_definition(code.path)
ExecutorProxyFactory().get_executor_proxy_cls(flow.language).generate_flow_metadata(
flow_file=flow_directory / flow_file,
working_dir=flow_directory,
)

if ignore_tools_json:
ignore_file = code._ignore_file
Expand Down Expand Up @@ -599,18 +605,3 @@ def _try_resolve_code_for_flow_to_file_share(cls, flow: Flow, ops: OperationOrch
flow._code_uploaded = True

# endregion

@classmethod
def _generate_meta_for_eager_flow(cls, code):
from promptflow import load_flow as load_local_flow
from promptflow._sdk.entities._eager_flow import EagerFlow

flow = load_local_flow(code.path)
if isinstance(flow, EagerFlow):
from promptflow.batch._executor_proxy_factory import ExecutorProxyFactory

ExecutorProxyFactory().get_executor_proxy_cls(flow.language).generate_metadata(
flow_file=code.path,
# TODO: may need to resolve additional includes after supported
working_dir=code.path,
)
Loading

0 comments on commit 6493895

Please sign in to comment.