Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[MRG] add mle kaggle pipeline #191

Merged
merged 1 commit into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 48 additions & 5 deletions mle/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,11 @@ def start(ctx, mode, model):
elif mode == 'report':
# Report mode
return ctx.invoke(report, model=model, visualize=False)
elif mode == 'kaggle':
# Kaggle mode
return ctx.invoke(kaggle, model=model)
else:
raise ValueError("Invalid mode. Supported modes: 'baseline', 'report'.")
raise ValueError("Invalid mode. Supported modes: 'baseline', 'report', 'kaggle'.")


@cli.command()
Expand Down Expand Up @@ -108,6 +111,33 @@ def report(ctx, repo, model, user, visualize):
return workflow.report(os.getcwd(), repo, user, model)


@cli.command()
@click.option('--model', default=None, help='The model to use for the chat.')
def kaggle(model):
    """
    kaggle: kaggle competition workflow.
    """
    if not check_config(console):
        return

    config = get_config()
    integration = config.setdefault("integration", {})

    # First run: no stored Kaggle credentials yet — ask and persist them.
    if "kaggle" not in integration:
        from mle.integration.kaggle import kaggle_login
        username, key = kaggle_login()
        integration["kaggle"] = {
            "key": key,
            "username": username,
        }
        write_config(config)

    credentials = config["integration"]["kaggle"]
    return workflow.kaggle(
        os.getcwd(),
        model,
        credentials.get("username"),
        credentials.get("key"),
    )


@cli.command()
def chat():
"""
Expand Down Expand Up @@ -199,7 +229,8 @@ def new(name):


@cli.command()
def integrate():
@click.option('--reset', is_flag=True, help='Reset the integration')
def integrate(reset):
"""
integrate: integrate the third-party extensions.
"""
Expand All @@ -212,11 +243,11 @@ def integrate():

platform = questionary.select(
"Which platform do you want to integrate?",
choices=['GitHub', 'Google Calendar']
choices=['GitHub', 'Google Calendar', 'Kaggle']
).ask()

if platform == "GitHub":
if config.get("integration").get("github"):
if not reset and config.get("integration").get("github"):
print("GitHub is already integrated.")
else:
token = questionary.password(
Expand All @@ -230,11 +261,23 @@ def integrate():

elif platform == "Google Calendar":
from mle.integration.google_calendar import google_calendar_login
if get_config().get("integration").get("google_calendar"):
if not reset and get_config().get("integration").get("google_calendar"):
print("Google Calendar is already integrated.")
else:
token = google_calendar_login()
config["integration"]["google_calendar"] = {
"token": pickle.dumps(token, fix_imports=False),
}
write_config(config)

elif platform == "Kaggle":
from mle.integration.kaggle import kaggle_login
if not reset and get_config().get("integration").get("kaggle"):
print("Kaggle is already integrated.")
else:
username, key = kaggle_login()
config["integration"]["kaggle"] = {
"key": key,
"username": username,
}
write_config(config)
21 changes: 12 additions & 9 deletions mle/function/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,16 @@ def preview_csv_data(path: str, limit_rows: int = 1) -> str:
:param limit_rows: the number of rows to preview.
:return: the sample dataset with metadata as a string.
"""
df = pd.read_csv(path)
num_rows = len(df)
columns = ', '.join(df.columns)
df_limited = df.head(limit_rows)
data_dict_list = df_limited.to_dict(orient='records')
data_dict_str = "\n".join([str(record) for record in data_dict_list])
try:
df = pd.read_csv(path)
num_rows = len(df)
columns = ', '.join(df.columns)
df_limited = df.head(limit_rows)
data_dict_list = df_limited.to_dict(orient='records')
data_dict_str = "\n".join([str(record) for record in data_dict_list])

return textwrap.dedent(f"""
Data file: {path}\nNumber of all rows: {num_rows}\nAll columns: {columns}\nData example:\n{data_dict_str}
""").strip()
return textwrap.dedent(f"""
Data file: {path}\nNumber of all rows: {num_rows}\nAll columns: {columns}\nData example:\n{data_dict_str}
""").strip()
except Exception as e:
return f"cannot read csv data: {e}"
1 change: 1 addition & 0 deletions mle/integration/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
from .github import GitHubIntegration
from .google_calendar import GoogleCalendarIntegration, google_calendar_login
from .kaggle import KaggleIntegration, kaggle_login
108 changes: 108 additions & 0 deletions mle/integration/kaggle.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
import os
import json
import requests
import importlib
import questionary

from zipfile import ZipFile


def kaggle_login():
    """
    Kaggle login by retrieving the username and API key.

    First looks for a local token file `~/.kaggle/kaggle.json`; if it is
    missing, malformed, or the user declines to reuse it, falls back to
    asking for the credentials interactively.

    :returns: A tuple containing the Kaggle username and API key.
    """
    kaggle_file = os.path.join(os.path.expanduser("~"), ".kaggle", "kaggle.json")

    try:
        # login by `~/.kaggle/kaggle.json`
        with open(kaggle_file, "r") as f:
            kaggle_data = json.load(f)
        if questionary.confirm(
            f"Find the kaggle token in `{kaggle_file}` "
            f"(username: {kaggle_data['username']}).\n"
            "Would you like to integrate this token?"
        ).ask():
            return kaggle_data["username"], kaggle_data["key"]
    except (OSError, json.JSONDecodeError, KeyError):
        # BUGFIX: the original `try/.../finally: pass` handled nothing, so a
        # missing or malformed kaggle.json raised instead of falling through
        # to the manual-input path below.
        pass

    # login by manual input token
    username = questionary.text("What is your Kaggle username?").ask()
    key = questionary.password("What is your Kaggle key?").ask()
    return username, key


class KaggleIntegration:
    """Thin wrapper around the official `kaggle` API client."""

    def __init__(self, username: str, token: str):
        """
        Initializes KaggleIntegration with the provided credentials.
        :param username: Kaggle username.
        :param token: Kaggle API key.
        :raises ImportError: if the `kaggle` package is not installed.
        """
        # The kaggle client reads its credentials from the environment.
        os.environ["KAGGLE_USERNAME"] = username
        os.environ["KAGGLE_KEY"] = token

        # BUGFIX: `import importlib` does not guarantee the `util` submodule
        # is loaded; import it explicitly before using find_spec.
        import importlib.util

        dependency = "kaggle"
        spec = importlib.util.find_spec(dependency)
        if spec is not None:
            self.client = importlib.import_module(dependency).api
        else:
            raise ImportError(
                "It seems you didn't install kaggle. In order to enable the Kaggle related features, "
                "please make sure kaggle Python package has been installed. "
                "More information, please refer to: https://www.kaggle.com/docs/api"
            )

    def list_competition(self):
        """
        Lists all Kaggle competitions.
        :return: A tuple containing references of all competitions.
        """
        competitions = self.client.competitions_list()
        return tuple(comp.ref for comp in competitions)

    def download_competition_dataset(
        self, competition: str, download_dir: str = "./data"
    ):
        """
        Downloads and extracts the dataset for a specific competition.
        :param competition: The URL or name of the Kaggle competition.
        :param download_dir: Directory to save the downloaded files. Defaults to './data'.
        :return: The directory where the dataset has been downloaded and extracted.
        """
        # Accept a full competition URL and reduce it to the competition slug.
        if competition.startswith("https://www.kaggle.com/competitions/"):
            competition = competition.split("/")[-1]

        os.makedirs(download_dir, exist_ok=True)
        self.client.competition_download_files(competition, path=download_dir)

        # Unzip downloaded files
        for file in os.listdir(download_dir):
            if file.endswith(".zip"):
                with ZipFile(os.path.join(download_dir, file), "r") as zip_ref:
                    zip_ref.extractall(download_dir)
        return download_dir

    def get_competition_overview(self, competition: str):
        """
        Fetches competition content using Jina Reader and returns it as a dictionary.
        :param competition: The URL or name of the Kaggle competition.
        :return: dict with keys 'url', 'overview' and 'data'; sections that
            could not be fetched after 3 attempts are empty strings.
        """
        SECTIONS = ["overview", "data"]
        text_dict = {}
        for section in SECTIONS:
            for _ in range(3):  # Retry up to 3 times if the request fails
                try:
                    reader_url = f"https://r.jina.ai/{competition}/{section}"
                    response = requests.get(reader_url)
                    response.raise_for_status()
                    text_dict[section] = response.text
                    # BUGFIX: stop retrying once the fetch succeeded; the
                    # original looped 3 times even after success.
                    break
                except requests.exceptions.RequestException:
                    # BUGFIX: retry on connection errors/timeouts too, not
                    # only on HTTP status errors.
                    continue
        return {
            "url": competition,
            # BUGFIX: .get avoids a KeyError when every retry failed.
            "overview": text_dict.get("overview", ""),
            "data": text_dict.get("data", ""),
        }
1 change: 1 addition & 0 deletions mle/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ def query(self, chat_history, **kwargs):
if len(search_attempts) > 3:
parameters['function_call'] = "none"
result = get_function(function_name)(**arguments)
chat_history.append({"role": "assistant", "function_call": dict(resp.function_call)})
chat_history.append({"role": "function", "content": result, "name": function_name})
return self.query(chat_history, **parameters)
else:
Expand Down
1 change: 1 addition & 0 deletions mle/workflow/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
from .baseline import baseline
from .report import report
from .kaggle import kaggle
100 changes: 100 additions & 0 deletions mle/workflow/kaggle.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
"""
Kaggle Mode: the mode to generate ML pipeline for kaggle competitions.
"""
import os
import questionary
from rich.console import Console
from mle.model import load_model
from mle.utils import ask_text, WorkflowCache
from mle.agents import CodeAgent, DebugAgent, AdviseAgent, PlanAgent
from mle.integration import KaggleIntegration


def kaggle(work_dir: str, model=None, kaggle_username=None, kaggle_token=None):
    """
    The workflow of the kaggle mode.

    Runs a 5-step, resumable pipeline: pick a competition, fetch its
    overview, get an advisor report, generate a coding plan, then code
    (and optionally auto-debug) each planned task. Progress is persisted
    in a WorkflowCache under `work_dir` so an interrupted run can resume.

    :param work_dir: working directory used for the cache and generated code.
    :param model: model identifier passed to load_model (None = default).
    :param kaggle_username: Kaggle account username for the API client.
    :param kaggle_token: Kaggle API key for the API client.
    """
    console = Console()
    cache = WorkflowCache(work_dir)
    model = load_model(work_dir, model)
    # NOTE(review): this local deliberately shadows the function name.
    kaggle = KaggleIntegration(kaggle_username, kaggle_token)

    # Offer to roll back to an earlier step when a previous run left caches.
    if not cache.is_empty():
        step = ask_text(f"MLE has finished the following steps: \n{cache}\n"
                        f"You can pick a step from 1 to {cache.current_step()} to resume\n"
                        "(or ENTER to continue the workflow)")
        if step:
            step = int(step)
            for i in range(step, cache.current_step() + 1):
                cache.remove(i)  # remove the stale step caches

    # ask for the kaggle competition
    with cache(step=1, name="ask for the kaggle competition") as ca:
        competition = ca.resume("competition")
        dataset = ca.resume("dataset")
        if competition is None or dataset is None:
            competition = questionary.select(
                "Please select a Kaggle competition to join:",
                choices=kaggle.list_competition()
            ).ask()
            with console.status("MLE Agent is downloading the kaggle competition dataset..."):
                dataset = kaggle.download_competition_dataset(
                    competition, os.path.join(os.getcwd(), 'data'))
            ca.store("competition", competition)
            ca.store("dataset", dataset)

    # ask for the user requirement
    with cache(step=2, name="get the competition overview from kaggle") as ca:
        ml_requirement = ca.resume("ml_requirement")
        if ml_requirement is None:
            with console.status("MLE Agent is fetching the kaggle competition overview..."):
                overview = kaggle.get_competition_overview(competition)
            ml_requirement = f"Finish a kaggle competition: {overview}"
            ca.store("ml_requirement", ml_requirement)

    # advisor agent gives suggestions in a report
    with cache(step=3, name="MLE advisor agent provides a high-level report") as ca:
        advisor_report = ca.resume("advisor_report")
        if advisor_report is None:
            advisor = AdviseAgent(model, console)
            advisor_report = advisor.interact(
                f"[green]User Requirement:[/green] {ml_requirement}\n"
                f"Dataset is downloaded in path: {dataset}"
            )
            ca.store("advisor_report", advisor_report)

    # plan agent generates the coding plan
    with cache(step=4, name="MLE plan agent generates a dev plan") as ca:
        coding_plan = ca.resume("coding_plan")
        if coding_plan is None:
            planner = PlanAgent(model, console)
            coding_plan = planner.interact(advisor_report)
            ca.store("coding_plan", coding_plan)

    # code agent codes the tasks and debug with the debug agent
    with cache(step=5, name="MLE code&debug agents start to work") as ca:
        coder = CodeAgent(model, work_dir, console)
        coder.read_requirement(advisor_report)
        debugger = DebugAgent(model, console)

        is_auto_mode = questionary.confirm(
            "MLE developer is about to start to code.\n"
            "Choose to debug or not (If no, MLE agent will only focus on coding tasks,"
            " and you have to run and debug the code yourself)?"
        ).ask()

        for current_task in coding_plan.get('tasks'):
            code_report = coder.interact(current_task)
            # presumably a string flag ('true'/'True') — TODO confirm agent output
            is_debugging = code_report.get('debug')

            if is_auto_mode:
                # Debug loop: run/analyze the generated code until the
                # debugger reports success (or debugging was not requested).
                while True:
                    if is_debugging == 'true' or is_debugging == 'True':
                        with console.status("MLE Debug Agent is executing and debugging the code..."):
                            debug_report = debugger.analyze(code_report)
                        if debug_report.get('status') == 'success':
                            break
                        else:
                            code_report = coder.debug(current_task, debug_report)
                    else:
                        break
Loading