Merge branch 'main' into zhangxingzhi/reset-user-agent-for-each-pfs-request
elliotzh committed Mar 12, 2024
2 parents fb9c41b + b2cdec0 commit f3b5e3d
Showing 12 changed files with 103 additions and 64 deletions.
8 changes: 8 additions & 0 deletions src/promptflow/promptflow/_constants.py
@@ -166,3 +166,11 @@ class MessageFormatType:


DEFAULT_OUTPUT_NAME = "output"

OUTPUT_FILE_NAME = "output.jsonl"


class OutputsFolderName:
FLOW_OUTPUTS = "flow_outputs"
FLOW_ARTIFACTS = "flow_artifacts"
NODE_ARTIFACTS = "node_artifacts"
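
For orientation, a minimal sketch (hypothetical usage, not part of this commit) of how these names compose into a batch run's output layout:

from pathlib import Path

from promptflow._constants import OUTPUT_FILE_NAME, OutputsFolderName

run_root = Path(".runs/my_run")  # hypothetical run folder
outputs_file = run_root / OutputsFolderName.FLOW_OUTPUTS / OUTPUT_FILE_NAME  # flow_outputs/output.jsonl
flow_artifacts = run_root / OutputsFolderName.FLOW_ARTIFACTS  # per-line run records
node_artifacts = run_root / OutputsFolderName.NODE_ARTIFACTS  # per-node run records
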
35 changes: 26 additions & 9 deletions src/promptflow/promptflow/_sdk/_service/apis/collector.py
@@ -11,6 +11,7 @@
import logging
import traceback
from datetime import datetime
from typing import Callable

from flask import request
from google.protobuf.json_format import MessageToJson
@@ -28,7 +29,15 @@
from promptflow._utils.thread_utils import ThreadWithContextVars


def trace_collector(logger: logging.Logger):
def trace_collector(get_created_by_info_with_cache: Callable, logger: logging.Logger):
"""
This function is designed to be reused in other places, so get_created_by_info_with_cache and logger are
passed in to avoid app-related dependencies.
Args:
get_created_by_info_with_cache (Callable): A function that retrieves information about the creator of the trace.
logger (logging.Logger): The logger object used for logging.
"""
content_type = request.headers.get("Content-Type")
# binary protobuf encoding
if "application/x-protobuf" in content_type:
@@ -55,15 +64,17 @@ def trace_collector(logger: logging.Logger):
all_spans.append(span)

# Create a new thread to write trace to cosmosdb to avoid blocking the main thread
ThreadWithContextVars(target=_try_write_trace_to_cosmosdb, args=(all_spans, logger)).start()
ThreadWithContextVars(
target=_try_write_trace_to_cosmosdb, args=(all_spans, get_created_by_info_with_cache, logger)
).start()
return "Traces received", 200

# JSON protobuf encoding
elif "application/json" in content_type:
raise NotImplementedError


def _try_write_trace_to_cosmosdb(all_spans, logger: logging.Logger):
def _try_write_trace_to_cosmosdb(all_spans, get_created_by_info_with_cache: Callable, logger: logging.Logger):
if not all_spans:
return
try:
@@ -78,31 +89,37 @@ def _try_write_trace_to_cosmosdb(all_spans, logger: logging.Logger):

logger.info(f"Start writing trace to cosmosdb, total spans count: {len(all_spans)}.")
start_time = datetime.now()
from promptflow._sdk._service.app import CREATED_BY_FOR_LOCAL_TO_CLOUD_TRACE
from promptflow.azure._storage.cosmosdb.client import get_client
from promptflow.azure._storage.cosmosdb.span import Span as SpanCosmosDB
from promptflow.azure._storage.cosmosdb.summary import Summary

# Loading the span and summary clients for the first time may be slow,
# so we load the two clients in parallel to warm them up.
span_thread = ThreadWithContextVars(
span_client_thread = ThreadWithContextVars(
target=get_client, args=(CosmosDBContainerName.SPAN, subscription_id, resource_group_name, workspace_name)
)
span_thread.start()
span_client_thread.start()

# Loading the created_by info for the first time may be slow, so we load it in parallel to warm it up.
created_by_thread = ThreadWithContextVars(target=get_created_by_info_with_cache)
created_by_thread.start()

get_client(CosmosDBContainerName.LINE_SUMMARY, subscription_id, resource_group_name, workspace_name)

span_thread.join()
span_client_thread.join()
created_by_thread.join()

created_by = get_created_by_info_with_cache()

for span in all_spans:
span_client = get_client(CosmosDBContainerName.SPAN, subscription_id, resource_group_name, workspace_name)
result = SpanCosmosDB(span, CREATED_BY_FOR_LOCAL_TO_CLOUD_TRACE).persist(span_client)
result = SpanCosmosDB(span, created_by).persist(span_client)
# None means the span already exists, in which case we don't need to persist the summary either.
if result is not None:
line_summary_client = get_client(
CosmosDBContainerName.LINE_SUMMARY, subscription_id, resource_group_name, workspace_name
)
Summary(span, CREATED_BY_FOR_LOCAL_TO_CLOUD_TRACE, logger).persist(line_summary_client)
Summary(span, created_by, logger).persist(line_summary_client)
logger.info(
(
f"Finish writing trace to cosmosdb, total spans count: {len(all_spans)}."
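
A minimal wiring sketch (hypothetical host app and provider, not part of this commit) showing what the new Callable parameter enables: any Flask host can now register the collector with its own created-by provider instead of importing state from the service app:

import logging

from flask import Flask

from promptflow._sdk._service.apis.collector import trace_collector

app = Flask(__name__)  # hypothetical host app


def my_created_by_provider():
    # Hypothetical stand-in for get_created_by_info_with_cache; any
    # zero-argument callable returning a created_by dict works here.
    return {"object_id": "oid", "tenant_id": "tid", "name": "sp-name"}


app.add_url_rule(
    "/v1/traces",
    view_func=lambda: trace_collector(my_created_by_provider, logging.getLogger(__name__)),
    methods=["POST"],
)
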
71 changes: 39 additions & 32 deletions src/promptflow/promptflow/_sdk/_service/app.py
@@ -2,6 +2,7 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------
import logging
import threading
import time
from datetime import datetime, timedelta
from logging.handlers import RotatingFileHandler
@@ -42,9 +43,6 @@ def heartbeat():
return jsonify(response)


CREATED_BY_FOR_LOCAL_TO_CLOUD_TRACE = {}


def create_app():
app = Flask(__name__)

@@ -54,7 +52,9 @@ def create_app():
CORS(app)

app.add_url_rule("/heartbeat", view_func=heartbeat)
app.add_url_rule("/v1/traces", view_func=lambda: trace_collector(app.logger), methods=["POST"])
app.add_url_rule(
"/v1/traces", view_func=lambda: trace_collector(get_created_by_info_with_cache, app.logger), methods=["POST"]
)
with app.app_context():
api_v1 = Blueprint("Prompt Flow Service", __name__, url_prefix="/v1.0")

@@ -86,33 +86,6 @@ def create_app():
# Set the app logger to a single RotatingFileHandler to avoid duplicate logs
app.logger.handlers = [handler]

def initialize_created_by_info():
from promptflow._sdk._configuration import Configuration
from promptflow._sdk._utils import extract_workspace_triad_from_trace_provider

trace_provider = Configuration.get_instance().get_trace_provider()
if trace_provider is None or extract_workspace_triad_from_trace_provider(trace_provider) is None:
return
try:
import jwt
from azure.identity import DefaultAzureCredential

from promptflow.azure._utils.general import get_arm_token

default_credential = DefaultAzureCredential()

token = get_arm_token(credential=default_credential)
decoded_token = jwt.decode(token, options={"verify_signature": False})
user_object_id, user_tenant_id = decoded_token["oid"], decoded_token["tid"]
CREATED_BY_FOR_LOCAL_TO_CLOUD_TRACE.update(
{
"object_id": user_object_id,
"tenant_id": user_tenant_id,
}
)
except Exception as e:
current_app.logger.error(f"Failed to get created_by info, ignore it: {e}")

# Basic error handler
@api.errorhandler(Exception)
def handle_exception(e):
@@ -167,8 +140,42 @@ def monitor_request():
kill_exist_service(port)
break

initialize_created_by_info()
if not is_run_from_built_binary():
monitor_thread = ThreadWithContextVars(target=monitor_request, daemon=True)
monitor_thread.start()
return app, api


created_by_for_local_to_cloud_trace = {}
created_by_for_local_to_cloud_trace_lock = threading.Lock()


def get_created_by_info_with_cache():
if len(created_by_for_local_to_cloud_trace) > 0:
return created_by_for_local_to_cloud_trace
with created_by_for_local_to_cloud_trace_lock:
if len(created_by_for_local_to_cloud_trace) > 0:
return created_by_for_local_to_cloud_trace
try:
# The total time of collecting info is about 3s.
import jwt
from azure.identity import DefaultAzureCredential

from promptflow.azure._utils.general import get_arm_token

default_credential = DefaultAzureCredential()

token = get_arm_token(credential=default_credential)
decoded_token = jwt.decode(token, options={"verify_signature": False})
created_by_for_local_to_cloud_trace.update(
{
"object_id": decoded_token["oid"],
"tenant_id": decoded_token["tid"],
# Use appid as fallback for service principal scenario.
"name": decoded_token.get("name", decoded_token.get("appid", "")),
}
)
except Exception as e:
# This function is only intended to be used in a Flask app.
current_app.logger.error(f"Failed to get created_by info, ignore it: {e}")
return created_by_for_local_to_cloud_trace
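
The function above is a double-checked lock: the unlocked fast path serves the common case, and the re-check inside the lock guarantees the roughly 3-second token fetch runs at most once even when several collector threads race. A stripped-down sketch of the pattern (fetch_info is a hypothetical stand-in for the ARM-token call):

import threading

_cache = {}
_lock = threading.Lock()


def fetch_info():
    # Hypothetical expensive call standing in for the ARM-token fetch.
    return {"object_id": "oid", "tenant_id": "tid"}


def get_info_with_cache():
    if _cache:  # fast path: no locking once the cache is populated
        return _cache
    with _lock:
        if _cache:  # re-check: another thread may have filled it while we waited
            return _cache
        _cache.update(fetch_info())
    return _cache
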
22 changes: 9 additions & 13 deletions src/promptflow/promptflow/_sdk/operations/_local_storage_operations.py
@@ -14,6 +14,7 @@

from filelock import FileLock

from promptflow._constants import OUTPUT_FILE_NAME, OutputsFolderName
from promptflow._sdk._constants import (
HOME_PROMPT_FLOW_DIR,
LINE_NUMBER,
@@ -45,6 +46,7 @@
load_multimedia_data_recursively,
resolve_multimedia_data_recursively,
)
from promptflow._utils.utils import prepare_folder
from promptflow._utils.yaml_utils import load_yaml
from promptflow.batch._result import BatchResult
from promptflow.contracts.multimedia import Image
@@ -191,13 +193,13 @@ class LocalStorageOperations(AbstractBatchRunStorage):

def __init__(self, run: Run, stream=False, run_mode=RunMode.Test):
self._run = run
self.path = self._prepare_folder(self._run._output_path)
self.path = prepare_folder(self._run._output_path)

self.logger = LoggerOperations(
file_path=self.path / LocalStorageFilenames.LOG, stream=stream, run_mode=run_mode
)
# snapshot
self._snapshot_folder_path = self._prepare_folder(self.path / LocalStorageFilenames.SNAPSHOT_FOLDER)
self._snapshot_folder_path = prepare_folder(self.path / LocalStorageFilenames.SNAPSHOT_FOLDER)
self._dag_path = self._snapshot_folder_path / LocalStorageFilenames.DAG
self._flow_tools_json_path = (
self._snapshot_folder_path / PROMPT_FLOW_DIR_NAME / LocalStorageFilenames.FLOW_TOOLS_JSON
@@ -214,10 +216,10 @@ def __init__(self, run: Run, stream=False, run_mode=RunMode.Test):
# for line run records, store per line
# for normal node run records, store per node per line;
# for reduce node run records, store centralized in 000000000.jsonl per node
self.outputs_folder = self._prepare_folder(self.path / "flow_outputs")
self._outputs_path = self.outputs_folder / "output.jsonl" # dumped by executor
self._node_infos_folder = self._prepare_folder(self.path / "node_artifacts")
self._run_infos_folder = self._prepare_folder(self.path / "flow_artifacts")
self.outputs_folder = prepare_folder(self.path / OutputsFolderName.FLOW_OUTPUTS)
self._outputs_path = self.outputs_folder / OUTPUT_FILE_NAME # dumped by executor
self._node_infos_folder = prepare_folder(self.path / OutputsFolderName.NODE_ARTIFACTS)
self._run_infos_folder = prepare_folder(self.path / OutputsFolderName.FLOW_ARTIFACTS)
self._data_path = Path(run.data) if run.data is not None else None

self._meta_path = self.path / LocalStorageFilenames.META
@@ -379,7 +381,7 @@ def load_metrics(self, *, parse_const_as_str: bool = False) -> Dict[str, Union[i

def persist_node_run(self, run_info: NodeRunInfo) -> None:
"""Persist node run record to local storage."""
node_folder = self._prepare_folder(self._node_infos_folder / run_info.node)
node_folder = prepare_folder(self._node_infos_folder / run_info.node)
self._persist_run_multimedia(run_info, node_folder)
node_run_record = NodeRunRecord.from_run_info(run_info)
# for reduce nodes, the line_number is None, store the info in the 000000000.jsonl
@@ -482,12 +484,6 @@ def _serialize_multimedia(self, value, folder_path: Path, relative_path: Path =
serialization_funcs = {Image: partial(Image.serialize, **{"encoder": pfbytes_file_reference_encoder})}
return serialize(value, serialization_funcs=serialization_funcs)

@staticmethod
def _prepare_folder(path: Union[str, Path]) -> Path:
path = Path(path)
path.mkdir(parents=True, exist_ok=True)
return path

@staticmethod
def _outputs_padding(df: "DataFrame", inputs_line_numbers: List[int]) -> "DataFrame":
import pandas as pd
7 changes: 7 additions & 0 deletions src/promptflow/promptflow/_utils/utils.py
@@ -366,3 +366,10 @@ def copy_file_except(src_dir, dst_dir, exclude_file):
src_file_path = os.path.join(root, file)
dst_file_path = os.path.join(current_dst_dir, file)
shutil.copy2(src_file_path, dst_file_path)


def prepare_folder(path: Union[str, Path]) -> Path:
"""Create folder if not exists and return the folder path."""
path = Path(path)
path.mkdir(parents=True, exist_ok=True)
return path
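
A quick usage sketch (hypothetical path):

from pathlib import Path

from promptflow._utils.utils import prepare_folder

node_folder = prepare_folder(Path(".runs/my_run") / "node_artifacts" / "my_node")
assert node_folder.is_dir()  # created with parents if it did not already exist
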
7 changes: 3 additions & 4 deletions src/promptflow/promptflow/batch/_batch_engine.py
@@ -9,7 +9,7 @@
from pathlib import Path
from typing import Any, Dict, List, Mapping, Optional

from promptflow._constants import LANGUAGE_KEY, LINE_NUMBER_KEY, LINE_TIMEOUT_SEC, FlowLanguage
from promptflow._constants import LANGUAGE_KEY, LINE_NUMBER_KEY, LINE_TIMEOUT_SEC, OUTPUT_FILE_NAME, FlowLanguage
from promptflow._core._errors import ResumeCopyError, UnexpectedError
from promptflow._core.operation_context import OperationContext
from promptflow._utils.async_utils import async_run_allowing_running_loop
@@ -46,7 +46,6 @@
from promptflow.executor.flow_validator import FlowValidator
from promptflow.storage import AbstractBatchRunStorage, AbstractRunStorage

OUTPUT_FILE_NAME = "output.jsonl"
DEFAULT_CONCURRENCY = 10


@@ -239,13 +238,13 @@ def _copy_previous_run_result(
return the list of previous line results for use in aggregation and summarization.
"""
# Load the previous flow run output from output.jsonl
previous_run_output = load_list_from_jsonl(resume_from_run_output_dir / "output.jsonl")
previous_run_output = load_list_from_jsonl(resume_from_run_output_dir / OUTPUT_FILE_NAME)
previous_run_output_dict = {
each_line_output[LINE_NUMBER_KEY]: each_line_output for each_line_output in previous_run_output
}

# Copy other files from resume_from_run_output_dir to output_dir in case there are images
copy_file_except(resume_from_run_output_dir, output_dir, "output.jsonl")
copy_file_except(resume_from_run_output_dir, output_dir, OUTPUT_FILE_NAME)

try:
previous_run_results = []
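
For intuition, a small sketch of the lookup _copy_previous_run_result builds (illustrative records; assumes LINE_NUMBER_KEY == "line_number", which is not shown in this diff):

# Assumed record shape in output.jsonl: {"line_number": <int>, ...outputs...}
previous_run_output = [
    {"line_number": 0, "answer": "A"},
    {"line_number": 2, "answer": "C"},  # line 1 is absent, e.g. it failed
]
previous_run_output_dict = {
    record["line_number"]: record for record in previous_run_output
}
assert 1 not in previous_run_output_dict  # absent lines are re-executed on resume
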
3 changes: 2 additions & 1 deletion src/promptflow/tests/executor/e2etests/test_activate.py
@@ -4,8 +4,9 @@

import pytest

from promptflow._constants import OUTPUT_FILE_NAME
from promptflow._utils.logger_utils import LogContext
from promptflow.batch._batch_engine import OUTPUT_FILE_NAME, BatchEngine
from promptflow.batch._batch_engine import BatchEngine
from promptflow.batch._result import BatchResult
from promptflow.contracts._errors import FlowDefinitionError
from promptflow.contracts.run_info import FlowRunInfo
3 changes: 2 additions & 1 deletion src/promptflow/tests/executor/e2etests/test_batch_engine.py
@@ -8,10 +8,11 @@

import pytest

from promptflow._constants import OUTPUT_FILE_NAME
from promptflow._sdk.entities._run import Run
from promptflow._sdk.operations._local_storage_operations import LocalStorageOperations
from promptflow._utils.utils import dump_list_to_jsonl
from promptflow.batch._batch_engine import OUTPUT_FILE_NAME, BatchEngine
from promptflow.batch._batch_engine import BatchEngine
from promptflow.batch._errors import EmptyInputsData
from promptflow.batch._result import BatchResult
from promptflow.contracts.run_info import Status
3 changes: 2 additions & 1 deletion src/promptflow/tests/executor/e2etests/test_eager_flow.py
@@ -4,7 +4,8 @@

import pytest

from promptflow.batch._batch_engine import OUTPUT_FILE_NAME, BatchEngine
from promptflow._constants import OUTPUT_FILE_NAME
from promptflow.batch._batch_engine import BatchEngine
from promptflow.batch._result import BatchResult, LineResult
from promptflow.contracts.run_info import Status
from promptflow.executor._script_executor import ScriptExecutor
3 changes: 2 additions & 1 deletion src/promptflow/tests/executor/e2etests/test_image.py
@@ -4,8 +4,9 @@

import pytest

from promptflow._constants import OUTPUT_FILE_NAME
from promptflow._utils.multimedia_utils import MIME_PATTERN, _create_image_from_file, _is_url, is_multimedia_dict
from promptflow.batch._batch_engine import OUTPUT_FILE_NAME, BatchEngine
from promptflow.batch._batch_engine import BatchEngine
from promptflow.batch._result import BatchResult
from promptflow.contracts.multimedia import Image
from promptflow.contracts.run_info import FlowRunInfo, RunInfo, Status
2 changes: 1 addition & 1 deletion src/promptflow/tests/executor/e2etests/test_logs.py
@@ -3,6 +3,7 @@

import pytest

from promptflow._constants import OUTPUT_FILE_NAME
from promptflow._utils.logger_utils import LogContext
from promptflow.batch import BatchEngine
from promptflow.batch._result import BatchResult
@@ -21,7 +22,6 @@

TEST_LOGS_FLOW = ["print_input_flow"]
SAMPLE_FLOW_WITH_TEN_INPUTS = "simple_flow_with_ten_inputs"
OUTPUT_FILE_NAME = "output.jsonl"


def submit_batch_run(
3 changes: 2 additions & 1 deletion src/promptflow/tests/executor/e2etests/test_telemetry.py
@@ -8,8 +8,9 @@

import pytest

from promptflow._constants import OUTPUT_FILE_NAME
from promptflow._core.operation_context import OperationContext
from promptflow.batch._batch_engine import OUTPUT_FILE_NAME, BatchEngine
from promptflow.batch._batch_engine import BatchEngine
from promptflow.batch._result import BatchResult
from promptflow.contracts.run_mode import RunMode
from promptflow.executor import FlowExecutor
