
Replace HTTP "inprogress" gauge with megaservice "request_pending" one #864

Merged
merged 4 commits into opea-project:main from pending-requests
Nov 13, 2024

Conversation

eero-t
Contributor

@eero-t eero-t commented Nov 7, 2024

Description

This new metric better matches the earlier-added (LLM usage specific) request latency metric, and does not have the issues that the prometheus-fastapi-instrumentator inprogress metric in HttpService had:

  • Created extra instances which have to be differentiated, e.g. for CI
  • Relied on the metric instance (name) differentiator coming through an obscure kwargs call-chain:
    • name => Gateway => MicroService => HttpService => BaseService => HttpService.title
  • Created separate inprogress metrics also for metric and health queries

I.e. it does not create useless extra metrics, and it's a bit easier to reason about how and why it works.

(This reverts the "inprogress" metric part of commit a6998a1.)
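
For reference, below is a minimal sketch of how such a megaservice-level pending gauge can be maintained with prometheus_client. It only illustrates the approach; the class and metric wiring here are assumptions rather than the exact GenAIComps code.

    from prometheus_client import Gauge

    class OrchestratorMetrics:
        def __init__(self) -> None:
            self.request_pending = Gauge(
                "megaservice_request_pending",
                "Count of megaservice requests currently being processed",
            )

        def pending_update(self, increase: bool) -> None:
            # Called with True when a request enters the pipeline,
            # and with False once its response has fully completed.
            if increase:
                self.request_pending.inc()
            else:
                self.request_pending.dec()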

Issues

n/a.

Type of change

  • Others (enhancement, documentation, validation, etc.)

Dependencies

n/a.

Tests

I've manually tested that the new metric works, and provides the same gauge values as the old one.
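
As a rough illustration, the gauge value can be checked by scraping the service's Prometheus endpoint; the port below is an assumption for illustration, not taken from this PR.

    import requests

    resp = requests.get("http://localhost:8888/metrics", timeout=5)
    for line in resp.text.splitlines():
        if line.startswith("megaservice_request_pending"):
            print(line)  # e.g. "megaservice_request_pending 3.0"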

Member

@Spycsh Spycsh left a comment

megaservice_request_pending should be the e2e pending request number, not LLM specific, right? I think self.metrics.pending_update(True) should be placed here, and another self.metrics.pending_update(True) should also be placed here. WDYT?

@eero-t
Contributor Author

eero-t commented Nov 8, 2024

megaservice_request_pending should be the e2e pending request number, not LLM specific, right? I think self.metrics.pending_update(True) should be placed here, and another self.metrics.pending_update(True) should also be placed here. WDYT?

Thanks for the suggestion! I had trouble finding the place in the code where a request has really ended, and which could be passed the starting time. The best place I found so far was the end of token_generator(), so the completed request (latency) count currently also uses that.

I'll try whether your suggestion works OK, i.e. whether it will be called only:

  • For relevant user requests, i.e. POSTed queries, not health or metric GETs
  • When the request has fully completed

If it does, I'll move both the pending gauge and the completed counter to the new place.

@eero-t
Contributor Author

eero-t commented Nov 8, 2024

megaservice_request_pending should be the e2e pending request number, not LLM specific, right? I think self.metrics.pending_update(True) should be placed here, and another self.metrics.pending_update(True) should also be placed here. WDYT?

@Spycsh Unfortunately it does not work. pending_update(True) calls do not match pending_update(False) calls, because the latter is only in the else (non-LLM) branch, so the pending request count is ever-increasing (i.e. wrong).

And if I keep pending_update(False) also in the LLM branch, the pending count's rate of change matches incoming requests and the count is stable, but it's too high, because then those calls happen multiple times for each user query. I assume they get called once for each item in the dependency graph?

=> Summary: the original code in this PR is better / more correct.

However, I realized it has a couple of shortcomings compared to the HTTP inprogress metric:

  • If the application does not call an LLM/LVM service, i.e. does not produce tokens, there are no request metrics either
  • If the application calls multiple such services (with streaming), both the request and token counts are multiples of that

I do not think those are serious issues currently though.

Although there are a couple of apps in the 1st category, lacking metrics is better than providing false ones.

And happily there are no OPEA applications in the 2nd category (as guardrails, embedding & reranking have their own types).

@Spycsh
Member

Spycsh commented Nov 9, 2024

Hi @eero-t, sorry for the typo. What I mean is to set pending_update to True first, before the if-else, and to False in each if-else branch. This should keep the number stable, right?

Since this only controls the microservice rather than the megaservice/e2e request number, please refer to my following comments.

@Spycsh
Member

Spycsh commented Nov 9, 2024

And if I keep pending_update(False) also in the LLM branch, the pending count's rate of change matches incoming requests and the count is stable, but it's too high, because then those calls happen multiple times for each user query. I assume they get called once for each item in the dependency graph?

I see. So the correct place to modify the e2e pending request number should be in the "def schedule" method. In "def execute" you can only control the current microservice's pending requests; however, e2e measures a request from before the whole pipeline starts to after the whole pipeline finishes, which can involve multiple microservices' req/reply.

Therefore I think you should move the pending_update to the schedule method. Notice that one "schedule" maps to the handling of one user request and to >=1 "execute" calls, and each execute maps to the req/reply of one microservice.
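
As a toy sketch of that mapping (all names here are hypothetical, not the GenAIComps code): one schedule() call drives one execute() per microservice node in the graph.

    import asyncio

    pending = 0  # stand-in for the Prometheus gauge

    async def execute(node: str) -> str:
        await asyncio.sleep(0.1)  # simulate one microservice req/reply
        return f"{node} done"

    async def schedule(nodes: list[str]) -> list[str]:
        global pending
        pending += 1  # one increment per user request
        results = [await execute(n) for n in nodes]  # >=1 execute per request
        pending -= 1
        return results

    print(asyncio.run(schedule(["embedding", "retriever", "reranker", "llm"])))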

@Spycsh
Member

Spycsh commented Nov 9, 2024

ServiceType is in fact not a necessary parameter. It behaves like a mark for identifying the current microservice's type when handling specific cases such as streaming/non-streaming. Although it is shown as an Enum type, you can also set an arbitrary string as a "mark". I think we will refactor this or add more docs to explain it in the future.

@eero-t
Contributor Author

eero-t commented Nov 11, 2024

Therefore I think you should move the pending_update to the schedule method. Notice that one "schedule" maps to the handling of one user request and to >=1 "execute" calls, and each execute maps to the req/reply of one microservice.

@Spycsh I've already tried that earlier (and retried it now), and it does not work.

This is regardless of whether the metric is updated at the end of schedule(), or after its while pending: loop finishes. The schedule() method finishes well before the called subservices' responses have ended, so the pending counts and response times are bogus.

I think this is because there are multiple levels of async in use, and schedule() returns with a StreamingResponse generator from execute(). Because that generator is async, schedule() in practice completes once the LLM query has been posted, before even the first token has arrived from the LLM to that async generator.

=> The original content of this PR (or the "inprogress" metric it replaces) is still the best alternative.

PS. The schedule code is IMHO unnecessarily hard to read. I filed PR #889 to improve it slightly.
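
As a toy illustration of that timing (names are hypothetical): the function returns its generator long before any tokens are produced, so anything measured at return time misses the streaming part of the request.

    import asyncio, time

    START = time.time()

    def token_stream():
        for token in ("Hello", " ", "world"):
            time.sleep(0.5)  # simulate per-token LLM latency
            yield token

    async def schedule():
        gen = token_stream()  # nothing has been generated yet
        print(f"schedule() returned at +{time.time() - START:.2f}s")
        return gen  # comparable to returning a StreamingResponse

    async def main():
        gen = await schedule()  # returns almost immediately
        for tok in gen:  # tokens arrive long after schedule() ended
            print(f"+{time.time() - START:.2f}s: {tok!r}")

    asyncio.run(main())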


codecov bot commented Nov 11, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Files with missing lines Coverage Δ
comps/cores/mega/http_service.py 77.77% <100.00%> (-0.61%) ⬇️
comps/cores/mega/orchestrator.py 92.50% <100.00%> (+0.35%) ⬆️

@Spycsh
Member

Spycsh commented Nov 12, 2024

Hi @eero-t, if you named it as an end-to-end pending request metric, you should make it correct for both streaming and non-streaming cases (when the final response is not a StreamingResponse). For example, you can update the curl request JSON and add "stream": "False" to get a non-streaming response. In that case, I am afraid the end-to-end pending request count stays at 0, right?
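
For example, the non-streaming path can be exercised with something like the following; the endpoint, port, and payload fields here are assumptions for illustration, not taken from this PR.

    import requests

    payload = {"messages": "What is OPEA?", "stream": False}
    resp = requests.post("http://localhost:8888/v1/chatqna", json=payload, timeout=300)
    print(resp.json())  # a single complete reply instead of a token stream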

@Spycsh
Member

Spycsh commented Nov 12, 2024

I think this is because there are multiple levels of async in use, and schedule() returns with a StreamingResponse generator from execute(). Because that generator is async, schedule() in practice completes once the LLM query has been posted, before even the first token has arrived from the LLM to that async generator.

Yes, absolutely. You cannot detect the end of the streaming case from outside; the only way to do it is to update the metric in the generator passed to the StreamingResponse, as you coded in execute.

However, my point is that your PR aims to track the end-to-end pending request number. So the start of the e2e request is always the first line of schedule. And the end of the e2e request is:

  • For the streaming case, in the generator (you've already got that in the execute streaming branch, but you should move the pending request start to the first line of schedule)
  • For the non-streaming case, at the end of the schedule function

To check whether it is the streaming case, you can check llm_params_dict, e.g. if not llm_params_dict["streaming"]: ...

Tell me if you have any further questions, or if I'm wrong anywhere!

@eero-t
Contributor Author

eero-t commented Nov 12, 2024

However, my point is that your PR aims to track the end-to-end pending request number. So the start of the e2e request is always the first line of schedule. And the end of the e2e request is:

  • For the streaming case, in the generator
    (you've already got that in the execute streaming branch, but you should move the pending request start to the first line of schedule)

No, we already covered that above. Increasing the pending count for the streaming case in schedule() would produce bogus values.

  • For the non-streaming case, at the end of the schedule function

Yeah, for the non-streaming case, I can have both pending count updates in schedule(), if testing shows it produces correct values.

I won't do that for request latency though, as that's used together with the token latency histogram metric to derive additional metrics (e.g. a tokens-per-request metric). That will be provided only for the streaming case.

Reason: because token metrics cannot be provided for the non-streaming case (as there's just a single reply), those derived metrics would be bogus when the application is processing both streaming and non-streaming requests.
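
A small numeric illustration of that skew (the figures are made up): if non-streaming requests were counted in the request metric but could not contribute tokens, the derived tokens-per-request value would be misleadingly low.

    # Hypothetical traffic: 10 streaming and 10 non-streaming requests.
    streaming_requests, streaming_tokens = 10, 2000
    non_streaming_requests, non_streaming_tokens = 10, 0  # replies are not tokenized

    print(streaming_tokens / streaming_requests)  # 200.0 tokens per streaming request
    print((streaming_tokens + non_streaming_tokens)
          / (streaming_requests + non_streaming_requests))  # 100.0, misleadingly low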

@Spycsh
Member

Spycsh commented Nov 12, 2024

No, we already covered that above. Increasing the pending count for the streaming case in schedule() would produce bogus values.

For the streaming case, only increasing the pending count at the top of schedule and only decreasing it at the end of the generator in execute still produces bogus values. Is that what you mean?

@eero-t
Contributor Author

eero-t commented Nov 12, 2024

For the streaming case, only increasing the pending count at the top of schedule and only decreasing it at the end of the generator in execute still produces bogus values. Is that what you mean?

Yes.

I tried applying this on top of the 1.0 version of ChatQnA/GenAIComps:

     async def schedule(self, initial_inputs: Dict, llm_parameters: LLMParams = LLMParams(), **kwargs):
+        req_start = time.time()
+        if not llm_parameters.streaming:
+            self.metrics.pending_update(True)
...
+        if not llm_parameters.streaming:
+            self.metrics.pending_update(False)
+
         return result_dict, runtime_graph

And it caused bogus pending counts also for streaming requests.

That issue seems to have been fixed with the current comps version (nightly latest ChatQnA image), but it's still producing bogus[1] values for the non-streaming case.

[1] Way too large: 18 for 2 parallel reqs, 24 for 8 parallel reqs, 48 for 32 parallel reqs... This issue is fully reproducible in the Gaudi setup I'm using.

@Spycsh
Member

Spycsh commented Nov 12, 2024

What I mean is

    async def schedule(self, initial_inputs: Dict | BaseModel, llm_parameters: LLMParams = LLMParams(), **kwargs):
        self.metrics.pending_update(True)  # [1] for both streaming and non-streaming, always increase the pending request count here

        result_dict = {}
        runtime_graph = DAG()
        runtime_graph.graph = copy.deepcopy(self.graph)
        
        # ...
        # ...
        # ...

        if not llm_parameters.streaming:
            self.metrics.pending_update(False)  # [2] for the non-streaming case, the request completes synchronously here; decrease the pending count

        return result_dict, runtime_graph

    async def execute(
        self,
        session: aiohttp.client.ClientSession,
        req_start: float,
        cur_node: str,
        inputs: Dict,
        runtime_graph: DAG,
        llm_parameters: LLMParams = LLMParams(),
        **kwargs,
    ):
        # send the cur_node request/reply
        endpoint = self.services[cur_node].endpoint_path
        llm_parameters_dict = llm_parameters.dict()
        if (
            self.services[cur_node].service_type == ServiceType.LLM
            or self.services[cur_node].service_type == ServiceType.LVM
        ):
            for field, value in llm_parameters_dict.items():
                if inputs.get(field) != value:
                    inputs[field] = value
        # pre-process
        inputs = self.align_inputs(inputs, cur_node, runtime_graph, llm_parameters_dict, **kwargs)

        if (
            self.services[cur_node].service_type == ServiceType.LLM
            or self.services[cur_node].service_type == ServiceType.LVM
        ) and llm_parameters.streaming:
            # Still leave to sync requests.post for StreamingResponse
            # self.metrics.pending_update(True) ## Remove this line !!!!!!!
            if LOGFLAG:
                logger.info(inputs)

            response = requests.post(
                url=endpoint,
                data=json.dumps(inputs),
                headers={"Content-type": "application/json"},
                proxies={"http": None},
                stream=True,
                timeout=1000,
            )
            downstream = runtime_graph.downstream(cur_node)
            if downstream:
                assert len(downstream) == 1, "Not supported multiple streaming downstreams yet!"
                cur_node = downstream[0]
                hitted_ends = [".", "?", "!", "。", ",", "!"]
                downstream_endpoint = self.services[downstream[0]].endpoint_path

            def generate():
                token_start = req_start
                if response:
                    # response.elapsed = time until first headers received
                    buffered_chunk_str = ""
                    is_first = True
                    for chunk in response.iter_content(chunk_size=None):
                        if chunk:
                            if downstream:
                                chunk = chunk.decode("utf-8")
                                buffered_chunk_str += self.extract_chunk_str(chunk)
                                is_last = chunk.endswith("[DONE]\n\n")
                                if (buffered_chunk_str and buffered_chunk_str[-1] in hitted_ends) or is_last:
                                    res = requests.post(
                                        url=downstream_endpoint,
                                        data=json.dumps({"text": buffered_chunk_str}),
                                        proxies={"http": None},
                                    )
                                    res_json = res.json()
                                    if "text" in res_json:
                                        res_txt = res_json["text"]
                                    else:
                                        raise Exception("Other response types not supported yet!")
                                    buffered_chunk_str = ""  # clear
                                    yield from self.token_generator(
                                        res_txt, token_start, is_first=is_first, is_last=is_last
                                    )
                                    token_start = time.time()
                            else:
                                yield chunk
                                token_start = self.metrics.token_update(token_start, is_first)
                            is_first = False
                    self.metrics.request_update(req_start)
                    self.metrics.pending_update(False)  # [3] for a streaming response the LLM is the last output; the request finishes asynchronously here, so decrease the pending count


            return (
                StreamingResponse(self.align_generator(generate(), **kwargs), media_type="text/event-stream"),
                cur_node,
            )
        else:
            if LOGFLAG:
                logger.info(inputs)
            if not isinstance(inputs, dict):
                input_data = inputs.dict()
                # remove null
                input_data = {k: v for k, v in input_data.items() if v is not None}
            else:
                input_data = inputs
            async with session.post(endpoint, json=input_data) as response:
                if response.content_type == "audio/wav":
                    audio_data = await response.read()
                    data = self.align_outputs(
                        audio_data, cur_node, inputs, runtime_graph, llm_parameters_dict, **kwargs
                    )
                else:
                    # Parse as JSON
                    data = await response.json()
                    # post process
                    data = self.align_outputs(data, cur_node, inputs, runtime_graph, llm_parameters_dict, **kwargs)

                return data, cur_node

You can see/search the three occurrences of the pending request count updates [1][2][3] in this piece of code. Have you tried this?

@eero-t
Contributor Author

eero-t commented Nov 12, 2024

I'll try that too, but it may be best just to:

  • Leave the HTTP "inprogress" metric for tracking pending counts for the whole request
  • Indicate better that this new metric tracks the pending count only for the LLM stream part of a request

While it could be renamed to megaservice_llm_stream_request_pending (_total), that's a bit long, so I would prefer just documenting all metrics, e.g. in comps/cores/telemetry/README.md. I'll do that too.

Unlike other megaservice ServiceOrchestrator metrics, this covers (can cover) also non-streaming requests, as suggested in PR review.

This does not have the issues the Prometheus-fastapi-instrumentator "inprogress" metric did:
* Extra instances which have to be differentiated, e.g. for CI
* Reliance on a name -> suffix coming through obscure kwargs calls

Signed-off-by: Eero Tamminen <eero.t.tamminen@intel.com>
Now that ServiceOrchestrator provides the pending metric.

Reverts the "inprogress" metric part of commit a6998a1.

Signed-off-by: Eero Tamminen <eero.t.tamminen@intel.com>
Signed-off-by: Eero Tamminen <eero.t.tamminen@intel.com>
@eero-t
Contributor Author

eero-t commented Nov 12, 2024

@Spycsh OK, doing it like that seems to produce correct values. I pushed an updated version, along with a corresponding observability doc update.

@Spycsh
Member

Spycsh commented Nov 13, 2024

@eero-t Thanks. Overall I think this is a great PR! Will merge this.

@lvliang-intel lvliang-intel merged commit 3b106c8 into opea-project:main Nov 13, 2024
14 checks passed
@eero-t eero-t deleted the pending-requests branch November 14, 2024 13:14