Skip to content

Commit

Permalink
Merge pull request #128 from PrefectHQ/generate-subtasks
Browse files Browse the repository at this point in the history
Add generate_subtasks
  • Loading branch information
jlowin authored Jun 18, 2024
2 parents f6baa93 + e16e456 commit a7ee2a2
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 115 deletions.
17 changes: 17 additions & 0 deletions src/controlflow/core/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,23 @@ def mark_skipped(self):
return f"{self.friendly_name()} marked skipped by {agent.name}."
return f"{self.friendly_name()} marked skipped."

def generate_subtasks(self, instructions: str = None, agent: Agent = None):
"""
Generate subtasks for this task based on the provided instructions.
Subtasks can reuse the same tools and agents as this task.
"""
from controlflow.planning.plan import create_plan

# enter a context to set the parent task
with self:
create_plan(
self.objective,
instructions=instructions,
planning_agent=agent,
agents=self.agents,
tools=self.tools,
)


def generate_result_schema(result_type: type[T]) -> type[T]:
result_schema = None
Expand Down
34 changes: 7 additions & 27 deletions src/controlflow/planning/plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,24 +32,13 @@ class PlanTask(ControlFlowModel):
)


class Plan(ControlFlowModel):
objective: str = Field(description="The overall objective of the plan.")
instructions: Optional[str] = Field(
None, description="Any optional instructions for carrying out the plan."
)
tasks: list[PlanTask] = Field(
[],
description="The tasks that make up the plan.",
)


def plan(
def create_plan(
objective: str,
instructions: str = None,
planning_agent: Agent = None,
agents: list[Agent] = None,
tools: list[Union[callable, Tool]] = None,
) -> Task:
) -> list[Task]:
"""
Given an objective and instructions for achieving it, generate a plan for
completing the objective. Each step of the plan will be turned into a task
Expand All @@ -71,10 +60,6 @@ def plan(
Each task should be a discrete, actionable step that contributes to the overall objective.
## Objective + Instructions
This is the main objective of your plan. Ultimately it will be turned into a parent task of all the plan tasks you create.
## Tasks
- Use `depends_on` to indicate which tasks must be completed before others can start. Tasks can only depend on tasks that come before them in your plan.
- Use `parent` to indicate tasks that are subtasks of others.
Expand All @@ -86,30 +71,25 @@ def plan(
plan_tools=tool_dict,
),
agents=[planning_agent] if planning_agent else None,
result_type=Plan,
result_type=list[PlanTask],
)

# create a new flow to avoid polluting the main flow's history
with Flow():
task.run()

plan: Plan = task.result
plan: list[PlanTask] = task.result

parent_task = Task(
objective=plan.objective,
instructions=plan.instructions,
agents=[planning_agent] if planning_agent else None,
)
task_ids = {}

for t in plan.tasks:
for t in plan:
task_ids[t.id] = Task(
objective=t.objective,
instructions=t.instructions,
depends_on=[task_ids[i] for i in t.depends_on],
parent=parent_task if t.parent is None else task_ids[t.parent],
parent=task_ids[t.parent] if t.parent else None,
agents=[agent_dict[i] for i in t.agents] if t.agents else None,
tools=[tool_dict[i] for i in t.tools],
)

return parent_task
return list(task_ids.values())
Empty file.
88 changes: 0 additions & 88 deletions tests/core/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,91 +311,3 @@ def test_task_with_subtasks_and_dependencies_graph(self):
and edge.type == EdgeType.SUBTASK
for edge in graph.edges
)


# @pytest.mark.usefixtures("mock_run")
# class TestTaskRun:
# def test_run_task_max_iterations(self, mock_run: AsyncMock):
# task = Task(objective="Say hello")

# with Flow():
# with pytest.raises(ValueError):
# task.run()

# assert mock_run.await_count == 3

# def test_run_task_mark_successful(self, mock_run: AsyncMock):
# task = Task(objective="Say hello")

# def mark_complete():
# task.mark_successful()

# mock_run.side_effect = mark_complete
# with Flow():
# result = task.run()
# assert task.is_successful()
# assert result is None

# def test_run_task_mark_successful_with_result(self, mock_run: AsyncMock):
# task = Task(objective="Say hello", result_type=int)

# def mark_complete():
# task.mark_successful(result=42)

# mock_run.side_effect = mark_complete
# with Flow():
# result = task.run()
# assert task.is_successful()
# assert result == 42

# def test_run_task_mark_failed(self, mock_run: AsyncMock):
# task = Task(objective="Say hello")

# def mark_complete():
# task.mark_failed(message="Failed to say hello")

# mock_run.side_effect = mark_complete
# with Flow():
# with pytest.raises(ValueError):
# task.run()
# assert task.is_failed()
# assert task.error == "Failed to say hello"

# def test_run_task_outside_flow(self, mock_run: AsyncMock):
# task = Task(objective="Say hello")

# def mark_complete():
# task.mark_successful()

# mock_run.side_effect = mark_complete
# result = task.run()
# assert task.is_successful()
# assert result is None

# def test_run_task_outside_flow_fails_if_strict_flows_enforced(
# self, mock_run: AsyncMock
# ):
# task = Task(objective="Say hello")

# with temporary_settings(strict_flow_context=True):
# with pytest.raises(ValueError):
# task.run()

# def test_task_run_once_outside_flow_fails(self, mock_run: AsyncMock):
# task = Task(objective="Say hello")

# with pytest.raises(ValueError):
# task.run_once()

# def test_task_run_once_with_passed_flow(self, mock_run: AsyncMock):
# task = Task(objective="Say hello")

# def mark_complete():
# task.mark_successful()

# mock_run.side_effect = mark_complete
# flow = Flow()
# while task.is_incomplete():
# task.run_once(flow=flow)
# assert task.is_successful()
# assert task.result is None

0 comments on commit a7ee2a2

Please sign in to comment.