Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 23 additions & 4 deletions src/goose/toolkit/developer.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from pathlib import Path
from subprocess import CompletedProcess, run
from typing import List
from typing import List, Dict
import os
from goose.utils.check_shell_command import is_dangerous_command

from exchange import Message
Expand Down Expand Up @@ -30,6 +31,10 @@ class Developer(Toolkit):
We also include some default shell strategies in the prompt, such as using ripgrep
"""

def __init__(self, *args: object, **kwargs: Dict[str, object]) -> None:
super().__init__(*args, **kwargs)
self.timestamps: Dict[str, float] = {}

def system(self) -> str:
"""Retrieve system configuration details for developer"""
hints_path = Path(".goosehints")
Expand Down Expand Up @@ -65,7 +70,7 @@ def update_plan(self, tasks: List[dict]) -> List[dict]:
table.add_column("Status", justify="left")

# Mapping of statuses to emojis for better visual representation in the table.
emoji = {"planned": "⏳", "complete": "✅", "failed": "❌", "in-progress": "🕓"}
emoji = {"planned": "⏳", "complete": "✅", "failed": "❌", "in-progress": "🕑"}
for i, entry in enumerate(tasks):
table.add_row(str(i), entry["description"], emoji[entry["status"]])

Expand Down Expand Up @@ -124,6 +129,8 @@ def read_file(self, path: str) -> str:
language = get_language(path)
content = Path(path).expanduser().read_text()
self.notifier.log(Panel.fit(Markdown(f"```\ncat {path}\n```"), box=box.MINIMAL))
# Record the last read timestamp
self.timestamps[path] = os.path.getmtime(path)
return f"```{language}\n{content}\n```"

@tool
Expand Down Expand Up @@ -183,12 +190,24 @@ def write_file(self, path: str, content: str) -> str:
# this method is dynamically attached to functions in the Goose framework
self.notifier.log(Panel.fit(Markdown(md), title=path))

# Prepare the path and create any necessary parent directories
_path = Path(path)
if path in self.timestamps:
last_read_timestamp = self.timestamps.get(path, 0.0)
current_timestamp = os.path.getmtime(path)
if current_timestamp > last_read_timestamp:
raise RuntimeError(
f"File '{path}' has been modified since it was last read."
+ " Read the file to incorporate changes or update your plan."
)

# Prepare the path and create any necessary parent directories
_path.parent.mkdir(parents=True, exist_ok=True)

# Write the content to the file
_path.write_text(content)

# Update the last read timestamp after writing to the file
self.timestamps[path] = os.path.getmtime(path)

# Return a success message
return f"Succesfully wrote to {path}"
return f"Successfully wrote to {path}"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

21 changes: 21 additions & 0 deletions tests/toolkit/test_developer.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,24 @@ def test_write_file(temp_dir, developer_toolkit):
content = "Hello World"
developer_toolkit.write_file(test_file.as_posix(), content)
assert test_file.read_text() == content


def test_write_file_prevent_write_if_changed(temp_dir, developer_toolkit):
test_file = temp_dir / "test.txt"
content = "Hello World"
updated_content = "Hello Universe"

# Initial write to record the timestamp
developer_toolkit.write_file(test_file.as_posix(), content)
developer_toolkit.read_file(test_file.as_posix())

import time

# Modify file externally to simulate change
time.sleep(1)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can use os.utime(test_file, (mtime, mtime)) to set the file time instead of sleep

test_file.write_text(updated_content)

# Try to write through toolkit and check for the raised exception
with pytest.raises(RuntimeError, match="has been modified"):
developer_toolkit.write_file(test_file.as_posix(), content)
assert test_file.read_text() == updated_content