-
Notifications
You must be signed in to change notification settings - Fork 64
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Initial commit for RAG in py-ml #427
base: main
Are you sure you want to change the base?
Conversation
DCO is missing |
43e01d3
to
db78131
Compare
opensearch_py_ml/ml_commons/rag_pipeline/rag/opensearch_class.py
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
really cool stuff here! 📖 🤖 💬 Lets try and refactor some code so its reusable across classes also lets apply SRP so that a class isnt burdened by doing a lot at once.
Make sure to add documentation to files and methods.
Also Im seeing there exists a query python file within py-ml https://github.com/opensearch-project/opensearch-py-ml/blame/main/opensearch_py_ml/query.py if possible maybe we can aggregate to existing code?
Lastly lets come up with a great description of the feature. You put a lot of effort so lets make it visually appealing in the PR description (diagrams, how to use, gifs, concise summary; emojis) You can use this as influence opensearch-project/neural-search#933 talk to @dhrubo-os if maybe we want to open up a Issue and then link this PR not sure if thats too much.
Great work!
self.aws_region = config.get('region') | ||
self.index_name = config.get('index_name') |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there input validation involved? maybe we can catch this earlier so it doesn't have to be a headache later?
opensearch_py_ml/ml_commons/rag_pipeline/rag/opensearch_class.py
Outdated
Show resolved
Hide resolved
DCO is missing. |
ced6a59
to
3260f3a
Compare
Signed-off-by: hmumtazz <hashimkmumtaz@gmail.com>
Signed-off-by: hmumtazz <hashimkmumtaz@gmail.com>
Signed-off-by: hmumtazz <hashimkmumtaz@gmail.com>
Signed-off-by: hmumtazz <hashimkmumtaz@gmail.com>
…ffering a suggested default value with the flexibility for users to enter a custom value if needed. Signed-off-by: hmumtazz <hashimkmumtaz@gmail.com>
…arametrs, added more functionality to user setup with knn index, fixed chunking proccessor to actually chunk rather, more intutituve user setup Signed-off-by: hmumtazz <hashimkmumtaz@gmail.com>
…aving method wrappers around them Signed-off-by: hmumtazz <hashimkmumtaz@gmail.com>
Signed-off-by: hmumtazz <hashimkmumtaz@gmail.com>
…ctor fields, like name etc, updated knn index details and chunking details Signed-off-by: hmumtazz <hashimkmumtaz@gmail.com>
Signed-off-by: hmumtazz <hashimkmumtaz@gmail.com>
opensearch_py_ml/ml_commons/rag_pipeline/rag/AIConnectorHelper.py
Outdated
Show resolved
Hide resolved
opensearch_py_ml/ml_commons/rag_pipeline/rag/AIConnectorHelper.py
Outdated
Show resolved
Hide resolved
Removed references to serverless. Now the code only supports managed and open-source service types. Removed the menu option for serverless. For search methods, removed “neural” and only use “semantic” search. Added a prompt to choose between semantic search with LLM and semantic search without LLM. If semantic with LLM is chosen and the service type is managed, the code will prompt for LLM configuration. If open-source is chosen, we skip AWS/Bedrock configurations and do not prompt for LLM registration since that requires AWS Bedrock. Signed-off-by: hmumtazz <144855436+hmumtazz@users.noreply.github.com>
Signed-off-by: hmumtazz <hashimkmumtaz@gmail.com>
Signed-off-by: hmumtazz <hashimkmumtaz@gmail.com>
Signed-off-by: hmumtazz <hashimkmumtaz@gmail.com>
…tsHelper Signed-off-by: hmumtazz <hashimkmumtaz@gmail.com>
… creating a connector Signed-off-by: hmumtazz <hashimkmumtaz@gmail.com>
Signed-off-by: hmumtazz <hashimkmumtaz@gmail.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lets put a new line on the following files thats probably why its not able to pick up the license header
No license header found in:
- opensearch_py_ml/ml_commons/rag_pipeline/init.py
- opensearch_py_ml/ml_commons/rag_pipeline/rag/init.py
- opensearch_py_ml/ml_commons/rag_pipeline/rag/rag.py
- opensearch_py_ml/ml_commons/rag_pipeline/rag/ml_models/init.py
…ues. Signed-off-by: hmumtazz <hashimkmumtaz@gmail.com>
@hmumtazz lint is still failing:
|
Integ tests are failing too!! Please fix these two |
Signed-off-by: hmumtazz <hashimkmumtaz@gmail.com>
Signed-off-by: hmumtazz <hashimkmumtaz@gmail.com>
Signed-off-by: hmumtazz <hashimkmumtaz@gmail.com>
Signed-off-by: hmumtazz <hashimkmumtaz@gmail.com>
Signed-off-by: hmumtazz <hashimkmumtaz@gmail.com>
:param role_name: Name of the IAM role. | ||
:return: True if the role exists, False otherwise. | ||
""" | ||
iam_client = boto3.client("iam") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rather than having initializing the same client in every method, can we init this client in __init__
? This is reduce the redundancy.
# Attach the inline policy to the role | ||
iam_client.put_role_policy( | ||
RoleName=role_name, | ||
PolicyName="InlinePolicy", # Replace with preferred policy name if needed |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this can cause issue if a policy with the same name exists.
- We can take a input for policy name.
- if the policy name is not provided then we can generate a random policy name:
# Generate a default policy name if none is provided
if policy_name is None:
policy_name = f"InlinePolicy-{role_name}-{datetime.utcnow().strftime('%Y%m%d%H%M%S')}"
if e.response["Error"]["Code"] == "NoSuchEntity": | ||
return False | ||
else: | ||
print(f"An error occurred: {e}") | ||
return False |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if we are returning False anyway, what's the point of having if and else block here? may be add this: print(f"The requested role {role_name} does not exist")
in the if block and then after if/else we can return False to remove duplicate line.
print(f"Error creating the role: {e}") | ||
return None | ||
|
||
def get_role_arn(self, role_name): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should be combine both of these two methods:
import boto3
import json
from botocore.exceptions import ClientError
def get_role_info(role_name, include_details=False):
"""
Retrieve information about an IAM role.
:param role_name: Name of the IAM role.
:param include_details: Whether to include full role details.
:return: ARN of the role or detailed information as a dictionary, or None if not found.
"""
if not role_name:
return None
iam_client = boto3.client("iam")
try:
response = iam_client.get_role(RoleName=role_name)
role = response["Role"]
role_arn = role["Arn"]
if not include_details:
return role_arn
# Retrieve additional details if requested
role_details = {
"RoleName": role["RoleName"],
"RoleId": role["RoleId"],
"Arn": role_arn,
"CreationDate": role["CreateDate"],
"AssumeRolePolicyDocument": role["AssumeRolePolicyDocument"],
"InlinePolicies": {},
}
# List and retrieve all inline policies
list_role_policies_response = iam_client.list_role_policies(RoleName=role_name)
for policy_name in list_role_policies_response["PolicyNames"]:
get_role_policy_response = iam_client.get_role_policy(
RoleName=role_name, PolicyName=policy_name
)
role_details["InlinePolicies"][policy_name] = get_role_policy_response[
"PolicyDocument"
]
return role_details
except ClientError as e:
if e.response["Error"]["Code"] == "NoSuchEntity":
print(f"Role {role_name} does not exist.")
else:
print(f"An error occurred: {e}")
return None
print(f"An error occurred: {e}") | ||
return None | ||
|
||
def assume_role(self, role_arn, role_session_name="your_session_name"): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
def assume_role(role_arn, role_session_name=None, session=None):
"""
Assume an IAM role and obtain temporary security credentials.
:param role_arn: ARN of the IAM role to assume.
:param role_session_name: Identifier for the assumed role session. Defaults to a unique session name.
:param session: Optional boto3 session object. Defaults to the default session.
:return: Dictionary with temporary security credentials and metadata, or error details on failure.
"""
if not role_arn:
logging.error("Role ARN is required.")
return {"error": "Role ARN is required."}
# Use the provided session or default session
sts_client = session.client("sts") if session else boto3.client("sts")
# Generate a unique session name if not provided
role_session_name = role_session_name or f"session-{uuid.uuid4()}"
try:
assumed_role_object = sts_client.assume_role(
RoleArn=role_arn,
RoleSessionName=role_session_name,
)
# Extract temporary credentials and metadata
temp_credentials = assumed_role_object["Credentials"]
expiration = temp_credentials["Expiration"]
logging.info(
f"Assumed role: {role_arn}. Temporary credentials valid until: {expiration}"
)
return {
"credentials": {
"AccessKeyId": temp_credentials["AccessKeyId"],
"SecretAccessKey": temp_credentials["SecretAccessKey"],
"SessionToken": temp_credentials["SessionToken"],
},
"expiration": expiration,
"session_name": role_session_name,
}
except ClientError as e:
error_code = e.response["Error"]["Code"]
logging.error(f"Error assuming role {role_arn}: {error_code} - {e}")
return None
```
what do you think about this?
:param aws_role_name: AWS IAM role name. | ||
:param opensearch_domain_arn: ARN of the OpenSearch domain. | ||
""" | ||
self.region = region |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's re-evaluate which class variables are necessary here. I think most of them are unnecessary here.
:return: True if the secret exists, False otherwise. | ||
""" | ||
# Initialize the Secrets Manager client | ||
secretsmanager = boto3.client("secretsmanager", region_name=self.region) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same here, let's use this variable as a class variable.
# Any modifications Copyright OpenSearch Contributors. See | ||
# GitHub history for details. | ||
|
||
# SPDX-License-Identifier: Apache-2.0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why two times?
logger.error(f"An error occurred: {e}") | ||
return False | ||
|
||
def get_secret_arn(self, secret_name: str) -> str: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what do you think about combining these two methods:
def get_secret_details(secret_name: str, fetch_value: bool = False) -> dict:
"""
Retrieve details of a secret from AWS Secrets Manager.
:param secret_name: Name of the secret.
:param fetch_value: Whether to fetch the secret value (default is False).
:return: A dictionary with secret details (ARN and optionally the secret value) or error information.
"""
secretsmanager = boto3.client("secretsmanager", region_name=self.region)
try:
# Describe the secret to get its ARN
secret_details = {}
response = secretsmanager.describe_secret(SecretId=secret_name)
secret_details["ARN"] = response["ARN"]
# Fetch the secret value if requested
if fetch_value:
secret_value_response = secretsmanager.get_secret_value(SecretId=secret_name)
secret_details["SecretValue"] = secret_value_response.get("SecretString")
return secret_details
except ClientError as e:
error_code = e.response["Error"]["Code"]
if error_code == "ResourceNotFoundException":
logger.warning(f"The requested secret {secret_name} was not found")
else:
logger.error(f"An error occurred while fetching secret {secret_name}: {e}")
return {"error": str(e), "error_code": error_code}
```
# Any modifications Copyright OpenSearch Contributors. See | ||
# GitHub history for details. | ||
|
||
# SPDX-License-Identifier: Apache-2.0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why two times?
Description
[Describe what this change achieves]
Issues Resolved
[List any issues this PR will resolve]
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.