Skip to content
This repository has been archived by the owner on Sep 9, 2024. It is now read-only.

Commit

Permalink
Feature: Add Streaming Functionality To Ocular Search (#94)
Browse files Browse the repository at this point in the history
* Add Chat Streaming Functionality

* Add Streaming Functionality

* Add AI Streaming API

* Ocular CoPilot Streaming

* Add Streaming Functionality

* Add Azure Open AI
  • Loading branch information
louismurerwa authored May 23, 2024
1 parent 3b5ee4e commit 737acc3
Show file tree
Hide file tree
Showing 21 changed files with 661 additions and 244 deletions.
58 changes: 57 additions & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

46 changes: 46 additions & 0 deletions packages/ocular-ui/lib/stream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
class NdJsonParserStream extends TransformStream<string, JSON> {
private buffer: string = '';
constructor() {
let controller: TransformStreamDefaultController<JSON>;
super({
start: (_controller) => {
controller = _controller;
},
transform: (chunk) => {
const jsonChunks = chunk.split('\n').filter(Boolean);
for (const jsonChunk of jsonChunks) {
try {
this.buffer += jsonChunk;
controller.enqueue(JSON.parse(this.buffer));
this.buffer = '';
} catch {
// Invalid JSON, wait for next chunk
}
}
},
});
}
}

export function createReader(responseBody: ReadableStream<Uint8Array> | null) {
return responseBody
?.pipeThrough(new TextDecoderStream())
.pipeThrough(new NdJsonParserStream())
.getReader();
}

export async function* readStream<T>(reader: any): AsyncGenerator<T, void> {
if (!reader) {
throw new Error('No response body or body is not readable');
}

let value: JSON | undefined;
let done: boolean;
while ((({ value, done } = await reader.read()), !done)) {
yield new Promise<T>((resolve) => {
setTimeout(() => {
resolve(value as T);
}, 50);
});
}
}
30 changes: 25 additions & 5 deletions packages/ocular-ui/pages/dashboard/search/results/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import Header from "@/components/search/header";
import { useRouter } from "next/router";
import SearchResults from "@/components/search/search-results";
import { ApplicationContext } from "@/context/context"
import { createReader,readStream } from '@/lib/stream';

// Importing API End Points
import api from "@/services/api"
Expand Down Expand Up @@ -34,19 +35,38 @@ const selectedDate = useMemo(() => {
useEffect(() => {
setIsLoadingResults(true);
setIsLoadingCopilot(true);
// Search
api.search.search(router.query.q, selectedResultSources, selectedDate)
.then(data => {
setAiResults(data.data.chat_completion.content);
setai_citations(data.data.chat_completion.citations);
setIsLoadingCopilot(false);
setSearchResults(data.data.hits);
setResultSources(data.data.sources);
setIsLoadingResults(false);
})
.catch(error => {
setIsLoadingResults(false);
setIsLoadingCopilot(false);
setIsLoadingResults(false);
});
// Copilot
const stream = true;
api.search.ask(router.query.q, selectedResultSources, selectedDate, stream)
.then(async response => {
setIsLoadingCopilot(false);
if(stream){
const reader = createReader(response.body);
const chunks = readStream(reader);
for await (const chunk of chunks) {
setAiResults(chunk.chat_completion.content);
setai_citations(chunk.chat_completion.citations);
}
} else {
setAiResults(response.data.chat_completion.content);
setai_citations(response.data.chat_completion.citations);
setIsLoadingCopilot(false);
}
})
.catch(error => {
console.error(error);
setIsLoadingCopilot(false);
});
}, [router.query.q, selectedResultSources, setResultSources, selectedDate]);

return (
Expand Down
19 changes: 18 additions & 1 deletion packages/ocular-ui/services/api.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import ocularRequest from './request';
import ocularStreamingRequest from './streaming-request';

export default {
apps: {
Expand Down Expand Up @@ -90,12 +91,28 @@ export default {
},
},
search: {
ask(q, sources, date,stream=false) {
const path = `/ask`;
const body = {
context: {
top: 5,
stream:stream
},
q: q
};
if (date.from || date.to) {
body.context.date = date;
}
if (sources && sources.length > 0) {
body.context.sources = sources;
}
return ocularStreamingRequest("POST", path, body,stream);
},
search(q, sources, date) {
const path = `/search`;
const body = {
context: {
top: 20,
ai_completion: true
// date: date,
},
q: q
Expand Down
48 changes: 48 additions & 0 deletions packages/ocular-ui/services/streaming-request.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Ocular uses Axios on all its services to make HTTP requests. However Axios does support client streaming in the
// browser so we created this stream api as a workaround. The streaming api is a simple wrapper around the fetch api that allows you to make streaming requests to the server.
export const OCULAR_BACKEND_URL =
process.env.OCULAR_BACKEND_URL || 'http://localhost:9000/v1';

export default async function ocularStreamingRequest(
method:
| 'GET'
| 'POST'
| 'PUT'
| 'DELETE'
| 'PATCH'
| 'OPTIONS'
| 'HEAD'
| 'CONNECT'
| 'TRACE',
path = '',
payload = {},
stream = false,
cancelTokenSource: AbortController | null
) {
const headers = {
'Access-Control-Allow-Origin': '*',
'Content-Type': 'application/json',
};

const options = {
method,
headers,
credentials: 'include',
body: JSON.stringify(payload),
signal: cancelTokenSource ? cancelTokenSource.signal : undefined,
};

if (method === 'GET') {
delete options.body;
}

const response = await fetch(`${OCULAR_BACKEND_URL}${path}`, options);

if (!response.ok) {
const error = await response.text();
console.error('Error', error);
throw new Error(error);
}

return response;
}
34 changes: 17 additions & 17 deletions packages/ocular/src/api/routes/member/index.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@
import { Router } from "express"
import { Router } from "express";

import auth from "./auth"
import middlewares from "../../middlewares"
import auth from "./auth";
import middlewares from "../../middlewares";
// import apps from "./apps"
// import components from "./member/components"
import search from "./search"
import chat from "./chat"
// import teams from "./member/teams"
import search from "./search";
import chat from "./chat";
import { ask } from "./search";
// import organisation from "./member/organisation"

export default (app, container, config) => {
const route = Router()
app.use("/",route)
const route = Router();
app.use("/", route);

// Unauthenticated Routes
auth(route)
auth(route);

// Authenticated routes
route.use(middlewares.authenticate())
route.use(middlewares.registeredLoggedinUser)
route.use(middlewares.authenticate());
route.use(middlewares.registeredLoggedinUser);

// // Authenticated routes
// route.use(middlewares.authenticate())
Expand All @@ -27,11 +27,11 @@ export default (app, container, config) => {
// apps(route)
// components(route)
// invites(route)
chat(route)
search(route)
// teams(route)
chat(route);
search(route);
ask(route);
// organisation(route)

// users(route)
return app
}
return app;
};
Loading

0 comments on commit 737acc3

Please sign in to comment.