Skip to content

Commit

Permalink
optimize ai cache
Browse files Browse the repository at this point in the history
  • Loading branch information
johnlanni committed Dec 26, 2024
1 parent 8f3723f commit fd47061
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 29 deletions.
14 changes: 9 additions & 5 deletions plugins/wasm-go/extensions/ai-cache/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,22 +164,26 @@ func onHttpResponseBody(ctx wrapper.HttpContext, c config.PluginConfig, chunk []
return chunk
}

stream := ctx.GetContext(STREAM_CONTEXT_KEY)
var err error
if !isLastChunk {
if err := handleNonLastChunk(ctx, c, chunk, log); err != nil {
if stream == nil {
err = handleNonStreamChunk(ctx, c, chunk, log)
} else {
err = handleStreamChunk(ctx, c, unifySSEChunk(chunk), log)
}
if err != nil {
log.Errorf("[onHttpResponseBody] handle non last chunk failed, error: %v", err)
// Set an empty struct in the context to indicate an error in processing the partial message
ctx.SetContext(ERROR_PARTIAL_MESSAGE_KEY, struct{}{})
}
return chunk
}

stream := ctx.GetContext(STREAM_CONTEXT_KEY)
var value string
var err error
if stream == nil {
value, err = processNonStreamLastChunk(ctx, c, chunk, log)
} else {
value, err = processStreamLastChunk(ctx, c, chunk, log)
value, err = processStreamLastChunk(ctx, c, unifySSEChunk(chunk), log)
}

if err != nil {
Expand Down
40 changes: 16 additions & 24 deletions plugins/wasm-go/extensions/ai-cache/util.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"bytes"
"fmt"
"strings"

Expand All @@ -9,17 +10,6 @@ import (
"github.com/tidwall/gjson"
)

func handleNonLastChunk(ctx wrapper.HttpContext, c config.PluginConfig, chunk []byte, log wrapper.Log) error {
stream := ctx.GetContext(STREAM_CONTEXT_KEY)
err := error(nil)
if stream == nil {
err = handleNonStreamChunk(ctx, c, chunk, log)
} else {
err = handleStreamChunk(ctx, c, chunk, log)
}
return err
}

func handleNonStreamChunk(ctx wrapper.HttpContext, c config.PluginConfig, chunk []byte, log wrapper.Log) error {
tempContentI := ctx.GetContext(CACHE_CONTENT_CONTEXT_KEY)
if tempContentI == nil {
Expand All @@ -32,6 +22,12 @@ func handleNonStreamChunk(ctx wrapper.HttpContext, c config.PluginConfig, chunk
return nil
}

func unifySSEChunk(data []byte) []byte {
data = bytes.ReplaceAll(data, []byte("\r\n"), []byte("\n"))
data = bytes.ReplaceAll(data, []byte("\r"), []byte("\n"))
return data
}

func handleStreamChunk(ctx wrapper.HttpContext, c config.PluginConfig, chunk []byte, log wrapper.Log) error {
var partialMessage []byte
partialMessageI := ctx.GetContext(PARTIAL_MESSAGE_CONTEXT_KEY)
Expand Down Expand Up @@ -103,7 +99,7 @@ func processStreamLastChunk(ctx wrapper.HttpContext, c config.PluginConfig, chun
func processSSEMessage(ctx wrapper.HttpContext, c config.PluginConfig, sseMessage string, log wrapper.Log) (string, error) {
content := ""
for _, chunk := range strings.Split(sseMessage, "\n\n") {
log.Infof("chunk _ : %s", chunk)
log.Debugf("single sse message: %s", chunk)
subMessages := strings.Split(chunk, "\n")
var message string
for _, msg := range subMessages {
Expand Down Expand Up @@ -140,19 +136,15 @@ func processSSEMessage(ctx wrapper.HttpContext, c config.PluginConfig, sseMessag
}
return content, fmt.Errorf("[processSSEMessage] unable to extract content from message; cache content is nil: %s", message)
} else {
tempContentI := ctx.GetContext(CACHE_CONTENT_CONTEXT_KEY)

// If there is no content in the cache, initialize and set the content
if tempContentI == nil {
content = responseBody.String()
ctx.SetContext(CACHE_CONTENT_CONTEXT_KEY, content)
} else {
// Update the content in the cache
appendMsg := responseBody.String()
content = tempContentI.(string) + appendMsg
ctx.SetContext(CACHE_CONTENT_CONTEXT_KEY, content)
}
content += responseBody.String()
}
}
tempContentI := ctx.GetContext(CACHE_CONTENT_CONTEXT_KEY)
// If there is no content in the cache, initialize and set the content
if tempContentI == nil {
ctx.SetContext(CACHE_CONTENT_CONTEXT_KEY, content)
} else {
ctx.SetContext(CACHE_CONTENT_CONTEXT_KEY, tempContentI.(string)+content)
}
return content, nil
}

0 comments on commit fd47061

Please sign in to comment.