Skip to content
This repository has been archived by the owner on Apr 24, 2024. It is now read-only.

Some Update #153

Merged
merged 1 commit into from
Aug 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
300 changes: 276 additions & 24 deletions bardapi/core_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import random
import json
import base64
from langdetect import detect
import uuid
from re import search
from httpx import AsyncClient
Expand Down Expand Up @@ -57,7 +58,8 @@ def __init__(
timeout=self.timeout,
proxies=self.proxies,
)
self.language = language or os.getenv("_BARD_API_LANG", "en")
self.language = language
self.cookie_dict = {"__Secure-1PSID": self.token}
self.run_code = run_code or False
self.google_translator_api_key = google_translator_api_key

Expand All @@ -66,10 +68,15 @@ async def get_answer(self, input_text: str) -> dict:
Get an answer from the Bard API for the given input text.

Example:
>>> token = 'xxxxxxxxxx'
>>> bard = Bard(token=token)
>>> response = bard.get_answer("나와 내 동년배들이 좋아하는 뉴진스에 대해서 알려줘")
>>> print(response['content'])
>>> import asyncio
>>>
>>> async def main():
>>> token = 'xxxxxxxxxx'
>>> bard = BardAsync(token=token)
>>> response = await bard.get_answer("나와 내 동년배들이 좋아하는 뉴진스에 대해서 알려줘")
>>> print(response['content'])
>>>
>>> asyncio.run(main())

Args:
input_text (str): Input text for the query.
Expand Down Expand Up @@ -135,7 +142,7 @@ async def get_answer(self, input_text: str) -> dict:
timeout=self.timeout,
follow_redirects=True,
headers=SESSION_HEADERS,
cookies={"__Secure-1PSID": self.token},
cookies=self.cookie_dict,
)

# Post-processing of response
Expand Down Expand Up @@ -191,11 +198,10 @@ async def get_answer(self, input_text: str) -> dict:

# Get code
try:
code = parsed_answer[4][0][1][0].split("```")[1][6:]
except Exception as e:
# TODO:
# handle exception using logging instead
code = None
langcode = parsed_answer[4][0][1][0].split("```")[1].split("\n")[0].strip()
code = parsed_answer[4][0][1][0].split("```")[1][len(langcode):]
except Exception:
langcode, code = None, None

# Returned dictionary object
bard_answer = {
Expand All @@ -207,6 +213,7 @@ async def get_answer(self, input_text: str) -> dict:
"choices": [{"id": x[0], "content": x[1]} for x in parsed_answer[4]],
"links": extract_links(parsed_answer[4]),
"images": images,
"langCode": langcode,
"code": code,
"status_code": resp.status_code,
}
Expand Down Expand Up @@ -234,14 +241,21 @@ async def get_answer(self, input_text: str) -> dict:

return bard_answer

async def speech(self, input_text: str, lang="en-US") -> dict:
async def speech(self, input_text: str, lang: str = "en-US") -> dict:
"""
Get speech audio from Bard API for the given input text.

Example:
>>> token = 'xxxxxxxxxx'
>>> bard = Bard(token=token)
>>> audio = bard.speech("hello!")
>>> import asyncio
>>>
>>> async def main():
>>> token = 'xxxxxxxxxx'
>>> bard = BardAsync(token=token)
>>> audio = await bard.speech("Hello")
>>> with open("bard.ogg", "wb") as f:
>>> f.write(bytes(audio))
>>>
>>> asyncio.run(main())

Args:
input_text (str): Input text for the query.
Expand Down Expand Up @@ -315,18 +329,194 @@ async def _get_snim0e(self):
"SNlM0e value not found in response. Check __Secure-1PSID value."
)
return snim0e.group(1)

async def export_conversation(self, bard_answer, title: str = "") -> str:
"""
Get Share URL for specifc answer from bard

Example:
>>> import asyncio
>>>
>>> async def main():
>>> token = 'xxxxxxxxxx'
>>> bard = BardAsync(token=token)
>>> bard_answer = await bard.get_answer("hello!")
>>> url = await bard.export_conversation(bard_answer, title="Export Conversation")
>>> print(url)
>>>
>>> asyncio.run(main())

Args:
bard_answer (dict): bard_answer returned from get_answer
title (str): Title for URL
Returns:
string: public URL you can share
"""
conv_id = bard_answer["conversation_id"]
resp_id = bard_answer["response_id"]
choice_id = bard_answer["choices"][0]["id"]
params = {
"rpcids": "fuVx7",
"source-path": "/",
"bl": "boq_assistant-bard-web-server_20230713.13_p0",
# '_reqid': str(self._reqid),
"rt": "c",
}
input_data_struct = [
[
[
"fuVx7",
json.dumps(
[
[
None,
[
[
[conv_id, resp_id],
None,
None,
[[], [], [], choice_id, []],
]
],
[0, title],
]
]
),
None,
"generic",
]
]
]
data = {
"f.req": json.dumps(input_data_struct),
"at": await self._get_snim0e(),
}
resp = await self.client.post(
"https://bard.google.com/_/BardChatUi/data/batchexecute",
params=params,
data=data,
timeout=self.timeout,
)
# Post-processing of response
resp_dict = json.loads(resp.content.splitlines()[3])
url_id = json.loads(resp_dict[0][2])[2]
url = f"https://g.co/bard/share/{url_id}"
# Increment request ID
self._reqid += 100000
return url

async def export_replit(
self, code: str, langcode: str = None, filename: str = None, **kwargs
) -> str:
"""
Get Export URL to repl.it from code

Example:
>>> import asyncio
>>>
>>> async def main():
>>> token = 'xxxxxxxxxx'
>>> bard = BardAsync(token=token)
>>> bard_answer = await bard.get_answer("code python to print hello world")
>>> url = await bard.export_replit(bard_answer['code'], bard_answer['langCode'])
>>> print(url)
>>>
>>> asyncio.run(main())

Args:
code (str): source code
langcode (str): code language
filename (str): filename for code language
**kwargs: instructions, source_path
Returns:
string: export URL to create repl
"""
params = {
"rpcids": "qACoKe",
"source-path": kwargs.get("source_path", "/"),
"bl": "boq_assistant-bard-web-server_20230718.13_p2",
"_reqid": str(self._reqid),
"rt": "c",
}
support_langs = {
"python": "main.py",
"javascript": "index.js",
"go": "main.go",
"java": "Main.java",
"kotlin": "Main.kt",
"php": "index.php",
"c#": "main.cs",
"swift": "main.swift",
"r": "main.r",
"ruby": "main.rb",
"c": "main.c",
"c++": "main.cpp",
"matlab": "main.m",
"typescript": "main.ts",
"scala": "main.scala",
"sql": "main.sql",
"html": "index.html",
"css": "style.css",
"nosql": "main.nosql",
"rust": "main.rs",
"perl": "main.pl",
}
# Reference: https://github.com/jincheng9/markdown_supported_languages
if langcode not in support_langs and filename is None:
raise Exception(
f"Language {langcode} not supported, please set filename manually."
)

filename = (
support_langs.get(langcode, filename) if filename is None else filename
)
input_data_struct = [
[
[
"qACoKe",
json.dumps(
[kwargs.get("instructions", ""), 5, code, [[filename, code]]]
),
None,
"generic",
]
]
]
data = {
"f.req": json.dumps(input_data_struct),
"at": await self._get_snim0e(),
}

resp = await self.client.post(
"https://bard.google.com/_/BardChatUi/data/batchexecute",
params=params,
data=data,
timeout=self.timeout
)

resp_dict = json.loads(resp.content.splitlines()[3])
url = json.loads(resp_dict[0][2])[0]
# increment request ID
self._reqid += 100000

return url

async def ask_about_image(
self, input_text: str, image: bytes, lang="en-GB"
self, input_text: str, image: bytes, lang: str = None
) -> dict:
"""
Send Bard image along with question and get answer async mode

Example:
>>> token = 'xxxxxxxxxx'
>>> bard = Bard(token=token)
>>> image = open('image.jpg', 'rb').read()
>>> bard_answer = bard.analyze_image("what is in the image?", image)
>>> import asyncio
>>>
>>> async def main():
>>> token = 'xxxxxxxxxx'
>>> bard = BardAsync(token=token)
>>> image = open('image.jpg', 'rb').read()
>>> bard_answer = await bard.ask_about_image("what is in the image?", image)
>>> print(bard_answer)
>>>
>>> asyncio.run(main())

Args:
input_text (str): Input text for the query.
Expand All @@ -351,14 +541,45 @@ async def ask_about_image(
if not isinstance(self.SNlM0e, str):
self.SNlM0e = await self.SNlM0e

if self.google_translator_api_key is not None:
google_official_translator = translate.Client(
api_key=self.google_translator_api_key
)
else:
translator_to_eng = GoogleTranslator(source="auto", target="en")

# Set language (optional)
if (
(self.language is not None or lang is not None)
and self.language not in ALLOWED_LANGUAGES
and self.google_translator_api_key is None
):
translator_to_eng = GoogleTranslator(source="auto", target="en")
transl_text = translator_to_eng.translate(input_text)
elif (
(self.language is not None or lang is not None)
and self.language not in ALLOWED_LANGUAGES
and self.google_translator_api_key is not None
):
transl_text = google_official_translator.translate(
input_text, target_language="en"
)
elif (
(self.language is None or lang is None)
and self.language not in ALLOWED_LANGUAGES
and self.google_translator_api_key is None
):
translator_to_eng = GoogleTranslator(source="auto", target="en")
transl_text = translator_to_eng.translate(input_text)

# Supported format: jpeg, png, webp
image_url = upload_image(image)

input_data_struct = [
None,
[
[input_text, 0, None, [[[image_url, 1], "uploaded_photo.jpg"]]],
[lang],
[transl_text, 0, None, [[[image_url, 1], "uploaded_photo.jpg"]]],
[lang if lang is not None else self.language],
["", "", ""],
"", # Unknown random string value (1000 characters +)
uuid.uuid4().hex, # Should be random uuidv4 (32 characters)
Expand Down Expand Up @@ -395,10 +616,41 @@ async def ask_about_image(
f"\nPlease double-check the cookie values and verify your network environment or google account."
}
parsed_answer = json.loads(resp_dict)
content = parsed_answer[4][0][1][0]
if self.language is not None and self.google_translator_api_key is None:
translator = GoogleTranslator(source="en", target=self.language)
transl_content = translator.translate(content)

elif lang is not None and self.google_translator_api_key is None:
translator = GoogleTranslator(source="en", target=lang)
transl_content = translator.translate(content)

elif (
lang is None and self.language is None
) and self.google_translator_api_key is None:
us_lang = detect(input_text)
translator = GoogleTranslator(source="en", target=us_lang)
transl_content = translator.translate(content)

elif self.language is not None and self.google_translator_api_key is not None:
transl_content = google_official_translator.translate(
content, target_language=self.language
)
elif lang is not None and self.google_translator_api_key is not None:
transl_content = google_official_translator.translate(
content, target_language=lang
)
elif (
self.language is None and lang is None
) and self.google_translator_api_key is not None:
us_lang = detect(input_text)
transl_content = google_official_translator.translate(
content, target_language=us_lang
)

# Returnd dictionary object
bard_answer = {
"content": parsed_answer[4][0][1][0],
"content": transl_content,
"conversation_id": parsed_answer[1][0],
"response_id": parsed_answer[1][1],
"factualityQueries": parsed_answer[3],
Expand Down
Loading
Loading