Skip to content
This repository has been archived by the owner on Sep 9, 2024. It is now read-only.

Feature: Add Streaming Functionality To Ocular Search #94

Merged
merged 7 commits into from
May 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 57 additions & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

46 changes: 46 additions & 0 deletions packages/ocular-ui/lib/stream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
class NdJsonParserStream extends TransformStream<string, JSON> {
private buffer: string = '';
constructor() {
let controller: TransformStreamDefaultController<JSON>;
super({
start: (_controller) => {
controller = _controller;
},
transform: (chunk) => {
const jsonChunks = chunk.split('\n').filter(Boolean);
for (const jsonChunk of jsonChunks) {
try {
this.buffer += jsonChunk;
controller.enqueue(JSON.parse(this.buffer));
this.buffer = '';
} catch {
// Invalid JSON, wait for next chunk
}
}
},
});
}
}

export function createReader(responseBody: ReadableStream<Uint8Array> | null) {
return responseBody
?.pipeThrough(new TextDecoderStream())
.pipeThrough(new NdJsonParserStream())
.getReader();
}

export async function* readStream<T>(reader: any): AsyncGenerator<T, void> {
if (!reader) {
throw new Error('No response body or body is not readable');
}

let value: JSON | undefined;
let done: boolean;
while ((({ value, done } = await reader.read()), !done)) {
yield new Promise<T>((resolve) => {
setTimeout(() => {
resolve(value as T);
}, 50);
});
}
}
30 changes: 25 additions & 5 deletions packages/ocular-ui/pages/dashboard/search/results/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import Header from "@/components/search/header";
import { useRouter } from "next/router";
import SearchResults from "@/components/search/search-results";
import { ApplicationContext } from "@/context/context"
import { createReader,readStream } from '@/lib/stream';

// Importing API End Points
import api from "@/services/api"
Expand Down Expand Up @@ -34,19 +35,38 @@ const selectedDate = useMemo(() => {
useEffect(() => {
setIsLoadingResults(true);
setIsLoadingCopilot(true);
// Search
api.search.search(router.query.q, selectedResultSources, selectedDate)
.then(data => {
setAiResults(data.data.chat_completion.content);
setai_citations(data.data.chat_completion.citations);
setIsLoadingCopilot(false);
setSearchResults(data.data.hits);
setResultSources(data.data.sources);
setIsLoadingResults(false);
})
.catch(error => {
setIsLoadingResults(false);
setIsLoadingCopilot(false);
setIsLoadingResults(false);
});
// Copilot
const stream = true;
api.search.ask(router.query.q, selectedResultSources, selectedDate, stream)
.then(async response => {
setIsLoadingCopilot(false);
if(stream){
const reader = createReader(response.body);
const chunks = readStream(reader);
for await (const chunk of chunks) {
setAiResults(chunk.chat_completion.content);
setai_citations(chunk.chat_completion.citations);
}
} else {
setAiResults(response.data.chat_completion.content);
setai_citations(response.data.chat_completion.citations);
setIsLoadingCopilot(false);
}
})
.catch(error => {
console.error(error);
setIsLoadingCopilot(false);
});
}, [router.query.q, selectedResultSources, setResultSources, selectedDate]);

return (
Expand Down
19 changes: 18 additions & 1 deletion packages/ocular-ui/services/api.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import ocularRequest from './request';
import ocularStreamingRequest from './streaming-request';

export default {
apps: {
Expand Down Expand Up @@ -90,12 +91,28 @@ export default {
},
},
search: {
ask(q, sources, date,stream=false) {
const path = `/ask`;
const body = {
context: {
top: 5,
stream:stream
},
q: q
};
if (date.from || date.to) {
body.context.date = date;
}
if (sources && sources.length > 0) {
body.context.sources = sources;
}
return ocularStreamingRequest("POST", path, body,stream);
},
search(q, sources, date) {
const path = `/search`;
const body = {
context: {
top: 20,
ai_completion: true
// date: date,
},
q: q
Expand Down
48 changes: 48 additions & 0 deletions packages/ocular-ui/services/streaming-request.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Ocular uses Axios on all its services to make HTTP requests. However Axios does support client streaming in the
// browser so we created this stream api as a workaround. The streaming api is a simple wrapper around the fetch api that allows you to make streaming requests to the server.
export const OCULAR_BACKEND_URL =
process.env.OCULAR_BACKEND_URL || 'http://localhost:9000/v1';

export default async function ocularStreamingRequest(
method:
| 'GET'
| 'POST'
| 'PUT'
| 'DELETE'
| 'PATCH'
| 'OPTIONS'
| 'HEAD'
| 'CONNECT'
| 'TRACE',
path = '',
payload = {},
stream = false,
cancelTokenSource: AbortController | null
) {
const headers = {
'Access-Control-Allow-Origin': '*',
'Content-Type': 'application/json',
};

const options = {
method,
headers,
credentials: 'include',
body: JSON.stringify(payload),
signal: cancelTokenSource ? cancelTokenSource.signal : undefined,
};

if (method === 'GET') {
delete options.body;
}

const response = await fetch(`${OCULAR_BACKEND_URL}${path}`, options);

if (!response.ok) {
const error = await response.text();
console.error('Error', error);
throw new Error(error);
}

return response;
}
34 changes: 17 additions & 17 deletions packages/ocular/src/api/routes/member/index.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@
import { Router } from "express"
import { Router } from "express";

import auth from "./auth"
import middlewares from "../../middlewares"
import auth from "./auth";
import middlewares from "../../middlewares";
// import apps from "./apps"
// import components from "./member/components"
import search from "./search"
import chat from "./chat"
// import teams from "./member/teams"
import search from "./search";
import chat from "./chat";
import { ask } from "./search";
// import organisation from "./member/organisation"

export default (app, container, config) => {
const route = Router()
app.use("/",route)
const route = Router();
app.use("/", route);

// Unauthenticated Routes
auth(route)
auth(route);

// Authenticated routes
route.use(middlewares.authenticate())
route.use(middlewares.registeredLoggedinUser)
route.use(middlewares.authenticate());
route.use(middlewares.registeredLoggedinUser);

// // Authenticated routes
// route.use(middlewares.authenticate())
Expand All @@ -27,11 +27,11 @@ export default (app, container, config) => {
// apps(route)
// components(route)
// invites(route)
chat(route)
search(route)
// teams(route)
chat(route);
search(route);
ask(route);
// organisation(route)

// users(route)
return app
}
return app;
};
Loading