[SDK/CLI] Add two more tokens to run properties (#3037)
# Description

- Add `completion_tokens` and `prompt_tokens` to the run properties uploaded to the cloud, alongside the existing `total_tokens` (see the sketch below).
- Set the run status to `Running` and record the start time when the local run record is created in the database.
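
A minimal sketch of the token-related properties an uploaded run is expected to carry after this change, assuming the `azureml.promptflow.<token_key>` naming built from `Local2CloudProperties.PREFIX` and `TokenKeys` below (the values are hypothetical):

```python
# Illustrative only: expected token properties on the uploaded cloud run.
# Keys are composed as f"{Local2CloudProperties.PREFIX}.{token_key}"; the values here
# are made up, and the cloud run stores them as strings (see the test assertion below).
expected_properties = {
    "azureml.promptflow.total_tokens": "1234",      # already uploaded before this PR
    "azureml.promptflow.completion_tokens": "567",  # new in this PR
    "azureml.promptflow.prompt_tokens": "667",      # new in this PR
}
```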

# All Promptflow Contribution checklist:
- [ ] **The pull request does not introduce [breaking changes].**
- [ ] **CHANGELOG is updated for new features, bug fixes or other
significant changes.**
- [ ] **I have read the [contribution guidelines](../CONTRIBUTING.md).**
- [ ] **Create an issue and link to the pull request to get dedicated
review from promptflow team. Learn more: [suggested
workflow](../CONTRIBUTING.md#suggested-workflow).**

## General Guidelines and Best Practices
- [ ] Title of the pull request is clear and informative.
- [ ] There are a small number of commits, each of which has an
informative message. This means that previously merged commits do not
appear in the history of the PR. For more information on cleaning up the
commits in your PR, [see this
page](https://github.com/Azure/azure-powershell/blob/master/documentation/development-docs/cleaning-up-commits.md).

### Testing Guidelines
- [ ] Pull request includes test coverage for the included changes.
0mza987 authored and crazygao committed May 6, 2024
1 parent f3bc3c5 commit f952343
Showing 7 changed files with 58 additions and 20 deletions.
22 changes: 21 additions & 1 deletion src/promptflow-azure/tests/conftest.py
@@ -11,10 +11,30 @@

from promptflow._constants import PROMPTFLOW_CONNECTIONS
from promptflow._core.connection_manager import ConnectionManager
from promptflow._sdk.entities._connection import AzureOpenAIConnection
from promptflow._sdk._pf_client import PFClient as LocalClient
from promptflow._sdk.entities._connection import AzureOpenAIConnection, _Connection
from promptflow._utils.context_utils import _change_working_dir

load_dotenv()
_connection_setup = False


@pytest.fixture
def local_client() -> LocalClient:
yield LocalClient()


@pytest.fixture
def setup_local_connection(local_client, azure_open_ai_connection):
global _connection_setup
if _connection_setup:
return
connection_dict = json.loads(open(CONNECTION_FILE, "r").read())
for name, _dct in connection_dict.items():
if _dct["type"] == "BingConnection":
continue
local_client.connections.create_or_update(_Connection._from_execution_connection_dict(name=name, data=_dct))
_connection_setup = True


@pytest.fixture(autouse=True, scope="session")
@@ -10,7 +10,8 @@
from _constants import PROMPTFLOW_ROOT
from sdk_cli_azure_test.conftest import DATAS_DIR, FLOWS_DIR

from promptflow._sdk._constants import Local2CloudProperties, Local2CloudUserProperties, RunStatus
from promptflow._constants import TokenKeys
from promptflow._sdk._constants import FlowRunProperties, Local2CloudProperties, Local2CloudUserProperties, RunStatus
from promptflow._sdk._errors import RunNotFoundError
from promptflow._sdk._pf_client import PFClient as LocalPFClient
from promptflow._sdk.entities import Run
@@ -50,8 +51,10 @@ def check_local_to_cloud_run(pf: PFClient, run: Run, check_run_details_in_cloud:
assert cloud_run._start_time and cloud_run._end_time
assert cloud_run.properties["azureml.promptflow.local_to_cloud"] == "true"
assert cloud_run.properties["azureml.promptflow.snapshot_id"]
assert cloud_run.properties[Local2CloudProperties.TOTAL_TOKENS]
assert cloud_run.properties[Local2CloudProperties.EVAL_ARTIFACTS]
for token_key in TokenKeys.get_all_values():
cloud_key = f"{Local2CloudProperties.PREFIX}.{token_key}"
assert cloud_run.properties[cloud_key] == str(run.properties[FlowRunProperties.SYSTEM_METRICS][token_key])

# if no description or tags, skip the check, since one could be {} but the other is None
if run.description:
@@ -72,6 +75,8 @@ def check_local_to_cloud_run(pf: PFClient, run: Run, check_run_details_in_cloud:
@pytest.mark.timeout(timeout=DEFAULT_TEST_TIMEOUT, method=PYTEST_TIMEOUT_METHOD)
@pytest.mark.e2etest
@pytest.mark.usefixtures(
"use_secrets_config_file",
"setup_local_connection",
"mock_set_headers_with_user_aml_token",
"single_worker_thread_pool",
"vcr_recording",
@@ -88,7 +93,7 @@ def test_upload_run(
):
name = randstr("batch_run_name_for_upload")
local_pf = Local2CloudTestHelper.get_local_pf(name)
# submit a local batch run
# submit a local batch run.
run = local_pf.run(
flow=f"{FLOWS_DIR}/simple_hello_world",
data=f"{DATAS_DIR}/webClassification3.jsonl",
@@ -112,6 +117,7 @@ def test_upload_flex_flow_run_with_yaml(self, pf: PFClient, randstr: Callable[[s
flow=Path(f"{EAGER_FLOWS_DIR}/simple_with_yaml"),
data=f"{DATAS_DIR}/simple_eager_flow_data.jsonl",
name=name,
column_mapping={"input_val": "${data.input_val}"},
display_name="sdk-cli-test-run-local-to-cloud-flex-with-yaml",
tags={"sdk-cli-test-flex": "true"},
description="test sdk local to cloud",
@@ -131,6 +137,7 @@ def test_upload_flex_flow_run_without_yaml(self, pf: PFClient, randstr: Callable
flow="entry:my_flow",
code=f"{EAGER_FLOWS_DIR}/simple_without_yaml",
data=f"{DATAS_DIR}/simple_eager_flow_data.jsonl",
column_mapping={"input_val": "${data.input_val}"},
name=name,
display_name="sdk-cli-test-run-local-to-cloud-flex-without-yaml",
tags={"sdk-cli-test-flex": "true"},
@@ -164,7 +171,6 @@ def test_upload_run_with_customized_run_properties(self, pf: PFClient, randstr:
local_pf = Local2CloudTestHelper.get_local_pf(name)

run_type = "test_run_type"
eval_artifacts = '[{"path": "instance_results.jsonl", "type": "table"}]'

# submit a local batch run
run = local_pf._run(
@@ -173,20 +179,14 @@ def test_upload_run_with_customized_run_properties(self, pf: PFClient, randstr:
name=name,
column_mapping={"name": "${data.url}"},
display_name="sdk-cli-test-run-local-to-cloud-with-properties",
tags={"sdk-cli-test": "true"},
description="test sdk local to cloud",
properties={
Local2CloudUserProperties.RUN_TYPE: run_type,
Local2CloudUserProperties.EVAL_ARTIFACTS: eval_artifacts,
},
properties={Local2CloudUserProperties.RUN_TYPE: run_type},
)
run = local_pf.runs.stream(run.name)
assert run.status == RunStatus.COMPLETED

# check the run is uploaded to cloud, and the properties are set correctly
cloud_run = Local2CloudTestHelper.check_local_to_cloud_run(pf, run)
assert cloud_run.properties[Local2CloudUserProperties.RUN_TYPE] == run_type
assert cloud_run.properties[Local2CloudUserProperties.EVAL_ARTIFACTS] == eval_artifacts

@pytest.mark.skipif(condition=not pytest.is_live, reason="Bug - 3089145 Replay failed for test 'test_upload_run'")
def test_upload_eval_run(self, pf: PFClient, randstr: Callable[[str], str]):
@@ -208,7 +208,6 @@ def test_upload_eval_run(self, pf: PFClient, randstr: Callable[[str], str]):
data=f"{DATAS_DIR}/webClassification3.jsonl",
run=main_run_name,
name=eval_run_name,
# column_mapping={"name": "${run.outputs.result}"},
column_mapping={"name": "${data.url}"},
)
eval_run = Local2CloudTestHelper.check_local_to_cloud_run(pf, eval_run)
@@ -224,10 +223,9 @@ def test_upload_flex_flow_run_with_global_azureml(self, pf: PFClient, randstr: C
flow="entry:my_flow",
code=f"{EAGER_FLOWS_DIR}/simple_with_config_json",
data=f"{DATAS_DIR}/simple_eager_flow_data.jsonl",
column_mapping={"input_val": "${data.input_val}"},
name=name,
display_name="sdk-cli-test-run-local-to-cloud-flex-with-global-azureml",
tags={"sdk-cli-test-flex": "true"},
description="test sdk local to cloud",
)
assert run.status == RunStatus.COMPLETED
assert "error" not in run._to_dict(), f"Error found: {run._to_dict()['error']}"
11 changes: 11 additions & 0 deletions src/promptflow-core/promptflow/_constants.py
@@ -257,6 +257,17 @@ def is_custom_key(key):
]


class TokenKeys:
TOTAL_TOKENS = "total_tokens"
COMPLETION_TOKENS = "completion_tokens"
PROMPT_TOKENS = "prompt_tokens"

@staticmethod
def get_all_values():
values = [value for key, value in vars(TokenKeys).items() if isinstance(value, str) and key.isupper()]
return values


class ConnectionProviderConfig:
LOCAL = "local"
AZUREML = "azureml"
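For reference, a quick usage sketch of the new helper (the import path matches the diffs above; iteration order assumes the class `__dict__` keeps definition order, as it does on Python 3.7+):

```python
from promptflow._constants import TokenKeys

# get_all_values() returns the uppercase string attributes of the class,
# i.e. the names of all supported token metrics.
for key in TokenKeys.get_all_values():
    print(key)  # total_tokens, completion_tokens, prompt_tokens
```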
2 changes: 1 addition & 1 deletion src/promptflow-devkit/promptflow/_sdk/_constants.py
@@ -482,7 +482,7 @@ class Local2Cloud:
class Local2CloudProperties:
"""Run properties that server needs when uploading local run to cloud."""

TOTAL_TOKENS = "azureml.promptflow.total_tokens"
PREFIX = "azureml.promptflow"
EVAL_ARTIFACTS = "_azureml.evaluate_artifacts"


@@ -151,6 +151,8 @@ def _submit_bulk_run(
status = Status.Failed.value
exception = None
# create run to db when fully prepared to run in executor, otherwise won't create it
run._status = Status.Running.value
run._start_time = datetime.datetime.now()
run._dump() # pylint: disable=protected-access

resume_from_run_storage = (
12 changes: 9 additions & 3 deletions src/promptflow-devkit/promptflow/_sdk/entities/_run.py
@@ -13,7 +13,7 @@

from dateutil import parser as date_parser

from promptflow._constants import FlowType, OutputsFolderName
from promptflow._constants import FlowType, OutputsFolderName, TokenKeys
from promptflow._sdk._configuration import Configuration
from promptflow._sdk._constants import (
BASE_PATH_CONTEXT_KEY,
@@ -710,12 +710,18 @@ def _to_rest_object_for_local_to_cloud(self, local_to_cloud_info: dict, variant_
end_time = self._end_time.isoformat() + "Z" if self._end_time else None

# extract properties that needs to be passed to the request
total_tokens = self.properties[FlowRunProperties.SYSTEM_METRICS].get("total_tokens", 0)
properties = {
Local2CloudProperties.TOTAL_TOKENS: total_tokens,
# add instance_results.jsonl path to run properties, which is required by UI feature.
Local2CloudProperties.EVAL_ARTIFACTS: '[{"path": "instance_results.jsonl", "type": "table"}]',
}
# add system metrics to run properties
for token_key in TokenKeys.get_all_values():
final_key = f"{Local2CloudProperties.PREFIX}.{token_key}"
value = self.properties[FlowRunProperties.SYSTEM_METRICS].get(token_key, 0)
if value is not None:
properties[final_key] = value

# add user properties to run properties
for property_key in Local2CloudUserProperties.get_all_values():
value = self.properties.get(property_key, None)
if value is not None:
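One behavioral note on the loop above, shown with hypothetical metric values: since the lookup is `.get(token_key, 0)`, a token metric missing from `system_metrics` is still uploaded as `0` rather than skipped.

```python
# Hypothetical system_metrics that lacks completion_tokens:
system_metrics = {"total_tokens": 10, "prompt_tokens": 7}
# Properties produced by the loop above would be:
#   azureml.promptflow.total_tokens      -> 10
#   azureml.promptflow.prompt_tokens     -> 7
#   azureml.promptflow.completion_tokens -> 0   (missing key falls back to the default 0)
```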
3 changes: 2 additions & 1 deletion src/promptflow-devkit/promptflow/batch/_result.py
@@ -7,6 +7,7 @@
from itertools import chain
from typing import Any, List, Mapping

from promptflow._constants import TokenKeys
from promptflow._utils.exception_utils import ExceptionPresenter, RootErrorCode
from promptflow.contracts.run_info import RunInfo, Status
from promptflow.executor._result import AggregationResult, LineResult
@@ -129,7 +130,7 @@ def _get_openai_metrics(line_results: List[LineResult], aggr_results: Aggregatio
def _try_get_openai_metrics(run_info: RunInfo):
openai_metrics = {}
if run_info.system_metrics:
for metric in ["total_tokens", "prompt_tokens", "completion_tokens"]:
for metric in TokenKeys.get_all_values():
if metric not in run_info.system_metrics:
return False
openai_metrics[metric] = run_info.system_metrics[metric]