diff --git a/autogpt/commands/file_operations.py b/autogpt/commands/file_operations.py index c54fb0547c0a..9563883e4170 100644 --- a/autogpt/commands/file_operations.py +++ b/autogpt/commands/file_operations.py @@ -1,9 +1,10 @@ """File operations for AutoGPT""" from __future__ import annotations +import hashlib import os import os.path -from typing import Generator +from typing import Dict, Generator, Literal, Tuple import requests from colorama import Back, Fore @@ -17,31 +18,96 @@ CFG = Config() +Operation = Literal["write", "append", "delete"] -def check_duplicate_operation(operation: str, filename: str) -> bool: - """Check if the operation has already been performed on the given file + +def text_checksum(text: str) -> str: + """Get the hex checksum for the given text.""" + return hashlib.md5(text.encode("utf-8")).hexdigest() + + +def operations_from_log(log_path: str) -> Generator[Tuple[Operation, str, str | None]]: + """Parse the file operations log and return a tuple containing the log entries""" + try: + log = open(log_path, "r", encoding="utf-8") + except FileNotFoundError: + return + + for line in log: + line = line.replace("File Operation Logger", "").strip() + if not line: + continue + operation, tail = line.split(": ", maxsplit=1) + operation = operation.strip() + if operation in ("write", "append"): + try: + path, checksum = (x.strip() for x in tail.rsplit(" #", maxsplit=1)) + except ValueError: + path, checksum = tail.strip(), None + yield (operation, path, checksum) + elif operation == "delete": + yield (operation, tail.strip(), None) + + log.close() + + +def file_operations_state(log_path: str) -> Dict: + """Iterates over the operations log and returns the expected state. + + Parses a log file at CFG.file_logger_path to construct a dictionary that maps + each file path written or appended to its checksum. Deleted files are removed + from the dictionary. + + Returns: + A dictionary mapping file paths to their checksums. + + Raises: + FileNotFoundError: If CFG.file_logger_path is not found. + ValueError: If the log file content is not in the expected format. + """ + state = {} + for operation, path, checksum in operations_from_log(log_path): + if operation in ("write", "append"): + state[path] = checksum + elif operation == "delete": + del state[path] + return state + + +def is_duplicate_operation( + operation: Operation, filename: str, checksum: str | None = None +) -> bool: + """Check if the operation has already been performed Args: - operation (str): The operation to check for - filename (str): The name of the file to check for + operation: The operation to check for + filename: The name of the file to check for + checksum: The checksum of the contents to be written Returns: - bool: True if the operation has already been performed on the file + True if the operation has already been performed on the file """ - log_content = read_file(CFG.file_logger_path) - log_entry = f"{operation}: {filename}\n" - return log_entry in log_content + state = file_operations_state(CFG.file_logger_path) + if operation == "delete" and filename not in state: + return True + if operation == "write" and state.get(filename) == checksum: + return True + return False -def log_operation(operation: str, filename: str) -> None: +def log_operation(operation: str, filename: str, checksum: str | None = None) -> None: """Log the file operation to the file_logger.txt Args: - operation (str): The operation to log - filename (str): The name of the file the operation was performed on + operation: The operation to log + filename: The name of the file the operation was performed on + checksum: The checksum of the contents to be written """ - log_entry = f"{operation}: {filename}\n" - append_to_file(CFG.file_logger_path, log_entry, should_log=False) + log_entry = f"{operation}: {filename}" + if checksum is not None: + log_entry += f" #{checksum}" + logger.debug(f"Logging file operation: {log_entry}") + append_to_file(CFG.file_logger_path, f"{log_entry}\n", should_log=False) def split_file( @@ -90,8 +156,8 @@ def read_file(filename: str) -> str: with open(filename, "r", encoding="utf-8") as f: content = f.read() return content - except Exception as e: - return f"Error: {str(e)}" + except Exception as err: + return f"Error: {err}" def ingest_file( @@ -124,8 +190,8 @@ def ingest_file( memory.add(memory_to_add) logger.info(f"Done ingesting {num_chunks} chunks from {filename}.") - except Exception as e: - logger.info(f"Error while ingesting file '{filename}': {str(e)}") + except Exception as err: + logger.info(f"Error while ingesting file '{filename}': {err}") @command("write_to_file", "Write to file", '"filename": "", "text": ""') @@ -139,17 +205,18 @@ def write_to_file(filename: str, text: str) -> str: Returns: str: A message indicating success or failure """ - if check_duplicate_operation("write", filename): + checksum = text_checksum(text) + if is_duplicate_operation("write", filename, checksum): return "Error: File has already been updated." try: directory = os.path.dirname(filename) os.makedirs(directory, exist_ok=True) with open(filename, "w", encoding="utf-8") as f: f.write(text) - log_operation("write", filename) + log_operation("write", filename, checksum) return "File written to successfully." - except Exception as e: - return f"Error: {str(e)}" + except Exception as err: + return f"Error: {err}" @command( @@ -169,15 +236,17 @@ def append_to_file(filename: str, text: str, should_log: bool = True) -> str: try: directory = os.path.dirname(filename) os.makedirs(directory, exist_ok=True) - with open(filename, "a") as f: + with open(filename, "a", encoding="utf-8") as f: f.write(text) if should_log: - log_operation("append", filename) + with open(filename, "r", encoding="utf-8") as f: + checksum = text_checksum(f.read()) + log_operation("append", filename, checksum=checksum) return "Text appended successfully." - except Exception as e: - return f"Error: {str(e)}" + except Exception as err: + return f"Error: {err}" @command("delete_file", "Delete file", '"filename": ""') @@ -190,14 +259,14 @@ def delete_file(filename: str) -> str: Returns: str: A message indicating success or failure """ - if check_duplicate_operation("delete", filename): + if is_duplicate_operation("delete", filename): return "Error: File has already been deleted." try: os.remove(filename) log_operation("delete", filename) return "File deleted successfully." - except Exception as e: - return f"Error: {str(e)}" + except Exception as err: + return f"Error: {err}" @command("list_files", "List Files in Directory", '"directory": ""') @@ -266,7 +335,7 @@ def download_file(url, filename): spinner.update_message(f"{message} {progress}") return f'Successfully downloaded and locally stored file: "{filename}"! (Size: {readable_file_size(downloaded_size)})' - except requests.HTTPError as e: - return f"Got an HTTP Error whilst trying to download file: {e}" - except Exception as e: - return "Error: " + str(e) + except requests.HTTPError as err: + return f"Got an HTTP Error whilst trying to download file: {err}" + except Exception as err: + return f"Error: {err}" diff --git a/tests/unit/test_file_operations.py b/tests/unit/test_file_operations.py index e7ba2f719006..fb8300d47987 100644 --- a/tests/unit/test_file_operations.py +++ b/tests/unit/test_file_operations.py @@ -2,25 +2,19 @@ This set of unit tests is designed to test the file operations that autoGPT has access to. """ +import hashlib import os +import re +from io import TextIOWrapper from pathlib import Path -from tempfile import gettempdir import pytest +from pytest_mock import MockerFixture -from autogpt.commands.file_operations import ( - append_to_file, - check_duplicate_operation, - delete_file, - download_file, - list_files, - log_operation, - read_file, - split_file, - write_to_file, -) +import autogpt.commands.file_operations as file_ops from autogpt.config import Config from autogpt.utils import readable_file_size +from autogpt.workspace import Workspace @pytest.fixture() @@ -29,66 +23,186 @@ def file_content(): @pytest.fixture() -def test_file(workspace, file_content): - test_file = str(workspace.get_path("test_file.txt")) - with open(test_file, "w") as f: - f.write(file_content) - return test_file +def test_file_path(config, workspace: Workspace): + return workspace.get_path("test_file.txt") @pytest.fixture() -def test_directory(workspace): - return str(workspace.get_path("test_directory")) +def test_file(test_file_path: Path): + file = open(test_file_path, "w") + yield file + if not file.closed: + file.close() @pytest.fixture() -def test_nested_file(workspace): - return str(workspace.get_path("nested/test_file.txt")) +def test_file_with_content_path(test_file: TextIOWrapper, file_content): + test_file.write(file_content) + test_file.close() + file_ops.log_operation( + "write", test_file.name, file_ops.text_checksum(file_content) + ) + return Path(test_file.name) + +@pytest.fixture() +def test_directory(config, workspace: Workspace): + return workspace.get_path("test_directory") -def test_check_duplicate_operation(config, test_file): - log_operation("write", test_file) - assert check_duplicate_operation("write", test_file) is True + +@pytest.fixture() +def test_nested_file(config, workspace: Workspace): + return workspace.get_path("nested/test_file.txt") + + +def test_file_operations_log(test_file: TextIOWrapper): + log_file_content = ( + "File Operation Logger\n" + "write: path/to/file1.txt #checksum1\n" + "write: path/to/file2.txt #checksum2\n" + "write: path/to/file3.txt #checksum3\n" + "append: path/to/file2.txt #checksum4\n" + "delete: path/to/file3.txt\n" + ) + test_file.write(log_file_content) + test_file.close() + + expected = [ + ("write", "path/to/file1.txt", "checksum1"), + ("write", "path/to/file2.txt", "checksum2"), + ("write", "path/to/file3.txt", "checksum3"), + ("append", "path/to/file2.txt", "checksum4"), + ("delete", "path/to/file3.txt", None), + ] + assert list(file_ops.operations_from_log(test_file.name)) == expected + + +def test_file_operations_state(test_file: TextIOWrapper): + # Prepare a fake log file + log_file_content = ( + "File Operation Logger\n" + "write: path/to/file1.txt #checksum1\n" + "write: path/to/file2.txt #checksum2\n" + "write: path/to/file3.txt #checksum3\n" + "append: path/to/file2.txt #checksum4\n" + "delete: path/to/file3.txt\n" + ) + test_file.write(log_file_content) + test_file.close() + + # Call the function and check the returned dictionary + expected_state = { + "path/to/file1.txt": "checksum1", + "path/to/file2.txt": "checksum4", + } + assert file_ops.file_operations_state(test_file.name) == expected_state + + +def test_is_duplicate_operation(config, mocker: MockerFixture): + # Prepare a fake state dictionary for the function to use + state = { + "path/to/file1.txt": "checksum1", + "path/to/file2.txt": "checksum2", + } + mocker.patch.object(file_ops, "file_operations_state", lambda _: state) + + # Test cases with write operations + assert ( + file_ops.is_duplicate_operation("write", "path/to/file1.txt", "checksum1") + is True + ) + assert ( + file_ops.is_duplicate_operation("write", "path/to/file1.txt", "checksum2") + is False + ) + assert ( + file_ops.is_duplicate_operation("write", "path/to/file3.txt", "checksum3") + is False + ) + # Test cases with append operations + assert ( + file_ops.is_duplicate_operation("append", "path/to/file1.txt", "checksum1") + is False + ) + # Test cases with delete operations + assert file_ops.is_duplicate_operation("delete", "path/to/file1.txt") is False + assert file_ops.is_duplicate_operation("delete", "path/to/file3.txt") is True # Test logging a file operation -def test_log_operation(test_file, config): - file_logger_name = config.file_logger_path - if os.path.exists(file_logger_name): - os.remove(file_logger_name) +def test_log_operation(config: Config): + file_ops.log_operation("log_test", "path/to/test") + with open(config.file_logger_path, "r", encoding="utf-8") as f: + content = f.read() + assert f"log_test: path/to/test\n" in content + + +def test_text_checksum(file_content: str): + checksum = file_ops.text_checksum(file_content) + different_checksum = file_ops.text_checksum("other content") + assert re.match(r"^[a-fA-F0-9]+$", checksum) is not None + assert checksum != different_checksum - log_operation("log_test", test_file) - with open(config.file_logger_path, "r") as f: + +def test_log_operation_with_checksum(config: Config): + file_ops.log_operation("log_test", "path/to/test", checksum="ABCDEF") + with open(config.file_logger_path, "r", encoding="utf-8") as f: content = f.read() - assert f"log_test: {test_file}" in content + assert f"log_test: path/to/test #ABCDEF\n" in content # Test splitting a file into chunks def test_split_file(): content = "abcdefghij" - chunks = list(split_file(content, max_length=4, overlap=1)) + chunks = list(file_ops.split_file(content, max_length=4, overlap=1)) expected = ["abcd", "defg", "ghij"] assert chunks == expected -def test_read_file(test_file, file_content): - content = read_file(test_file) +def test_read_file(test_file_with_content_path: Path, file_content): + content = file_ops.read_file(test_file_with_content_path) assert content == file_content -def test_write_to_file(config, test_nested_file): +def test_write_to_file(test_file_path: Path): new_content = "This is new content.\n" - write_to_file(test_nested_file, new_content) - with open(test_nested_file, "r") as f: + file_ops.write_to_file(str(test_file_path), new_content) + with open(test_file_path, "r", encoding="utf-8") as f: content = f.read() assert content == new_content -def test_append_to_file(test_nested_file): +def test_write_file_logs_checksum(config: Config, test_file_path: Path): + new_content = "This is new content.\n" + new_checksum = file_ops.text_checksum(new_content) + file_ops.write_to_file(str(test_file_path), new_content) + with open(config.file_logger_path, "r", encoding="utf-8") as f: + log_entry = f.read() + assert log_entry == f"write: {test_file_path} #{new_checksum}\n" + + +def test_write_file_fails_if_content_exists(test_file_path: Path): + new_content = "This is new content.\n" + file_ops.log_operation( + "write", + str(test_file_path), + checksum=file_ops.text_checksum(new_content), + ) + result = file_ops.write_to_file(str(test_file_path), new_content) + assert result == "Error: File has already been updated." + + +def test_write_file_succeeds_if_content_different(test_file_with_content_path: Path): + new_content = "This is different content.\n" + result = file_ops.write_to_file(str(test_file_with_content_path), new_content) + assert result == "File written to successfully." + + +def test_append_to_file(test_nested_file: Path): append_text = "This is appended text.\n" - write_to_file(test_nested_file, append_text) + file_ops.write_to_file(test_nested_file, append_text) - append_to_file(test_nested_file, append_text) + file_ops.append_to_file(test_nested_file, append_text) with open(test_nested_file, "r") as f: content_after = f.read() @@ -96,24 +210,45 @@ def test_append_to_file(test_nested_file): assert content_after == append_text + append_text -def test_delete_file(config, test_file): - delete_file(test_file) - assert os.path.exists(test_file) is False - assert delete_file(test_file) == "Error: File has already been deleted." +def test_append_to_file_uses_checksum_from_appended_file( + config: Config, test_file_path: Path +): + append_text = "This is appended text.\n" + file_ops.append_to_file(test_file_path, append_text) + file_ops.append_to_file(test_file_path, append_text) + with open(config.file_logger_path, "r", encoding="utf-8") as f: + log_contents = f.read() + + digest = hashlib.md5() + digest.update(append_text.encode("utf-8")) + checksum1 = digest.hexdigest() + digest.update(append_text.encode("utf-8")) + checksum2 = digest.hexdigest() + assert log_contents == ( + f"append: {test_file_path} #{checksum1}\n" + f"append: {test_file_path} #{checksum2}\n" + ) + + +def test_delete_file(test_file_with_content_path: Path): + result = file_ops.delete_file(str(test_file_with_content_path)) + assert result == "File deleted successfully." + assert os.path.exists(test_file_with_content_path) is False -def test_delete_missing_file(test_file): - os.remove(test_file) +def test_delete_missing_file(config): + filename = "path/to/file/which/does/not/exist" + # confuse the log + file_ops.log_operation("write", filename, checksum="fake") try: - os.remove(test_file) - except FileNotFoundError as e: - error_string = str(e) - assert error_string in delete_file(test_file) + os.remove(filename) + except FileNotFoundError as err: + assert str(err) in file_ops.delete_file(filename) return - assert True, "Failed to test delete_file" + assert False, f"Failed to test delete_file; {filename} not expected to exist" -def test_list_files(config, workspace, test_directory): +def test_list_files(workspace: Workspace, test_directory: Path): # Case 1: Create files A and B, search for A, and ensure we don't return A and B file_a = workspace.get_path("file_a.txt") file_b = workspace.get_path("file_b.txt") @@ -131,7 +266,7 @@ def test_list_files(config, workspace, test_directory): with open(os.path.join(test_directory, file_a.name), "w") as f: f.write("This is file A in the subdirectory.") - files = list_files(str(workspace.root)) + files = file_ops.list_files(str(workspace.root)) assert file_a.name in files assert file_b.name in files assert os.path.join(Path(test_directory).name, file_a.name) in files @@ -144,26 +279,28 @@ def test_list_files(config, workspace, test_directory): # Case 2: Search for a file that does not exist and make sure we don't throw non_existent_file = "non_existent_file.txt" - files = list_files("") + files = file_ops.list_files("") assert non_existent_file not in files -def test_download_file(): +def test_download_file(config, workspace: Workspace): url = "https://github.com/Significant-Gravitas/Auto-GPT/archive/refs/tags/v0.2.2.tar.gz" - local_name = os.path.join(gettempdir(), "auto-gpt.tar.gz") + local_name = workspace.get_path("auto-gpt.tar.gz") size = 365023 readable_size = readable_file_size(size) assert ( - download_file(url, local_name) + file_ops.download_file(url, local_name) == f'Successfully downloaded and locally stored file: "{local_name}"! (Size: {readable_size})' ) assert os.path.isfile(local_name) is True assert os.path.getsize(local_name) == size url = "https://github.com/Significant-Gravitas/Auto-GPT/archive/refs/tags/v0.0.0.tar.gz" - assert "Got an HTTP Error whilst trying to download file" in download_file( + assert "Got an HTTP Error whilst trying to download file" in file_ops.download_file( url, local_name ) url = "https://thiswebsiteiswrong.hmm/v0.0.0.tar.gz" - assert "Failed to establish a new connection:" in download_file(url, local_name) + assert "Failed to establish a new connection:" in file_ops.download_file( + url, local_name + )