Skip to content

Commit

Permalink
Merge pull request #695 from PrefectHQ/long-app
Browse files Browse the repository at this point in the history
parent app for slackbot
  • Loading branch information
zzstoatzz authored Dec 19, 2023
2 parents b17567b + 636deec commit e0b4803
Show file tree
Hide file tree
Showing 17 changed files with 497 additions and 218 deletions.
Empty file removed cookbook/apps/__init__.py
Empty file.
15 changes: 0 additions & 15 deletions cookbook/apps/agent.py

This file was deleted.

50 changes: 0 additions & 50 deletions cookbook/apps/chatbot.py

This file was deleted.

27 changes: 0 additions & 27 deletions cookbook/apps/multi_agent.py

This file was deleted.

27 changes: 0 additions & 27 deletions cookbook/apps/todo.py

This file was deleted.

2 changes: 1 addition & 1 deletion cookbook/flows/github_digest.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ async def daily_github_digest(username: str, gh_token_secret_name: str):
if __name__ == "__main__":
import asyncio

marvin.settings.llm_model = "gpt-4"
marvin.settings.openai.chat.completions.model = "gpt-4"

asyncio.run(
daily_github_digest(username="zzstoatzz", gh_token_secret_name="github-token")
Expand Down
154 changes: 154 additions & 0 deletions cookbook/slackbot/parent_app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
import asyncio
from contextlib import asynccontextmanager

from fastapi import FastAPI
from marvin import ai_fn
from marvin.beta.assistants import Assistant
from marvin.beta.assistants.applications import AIApplication
from marvin.kv.json_block import JSONBlockKV
from marvin.utilities.logging import get_logger
from prefect.events import Event, emit_event
from prefect.events.clients import PrefectCloudEventSubscriber
from prefect.events.filters import EventFilter
from pydantic import confloat
from typing_extensions import TypedDict
from websockets.exceptions import ConnectionClosedError


class Lesson(TypedDict):
relevance: confloat(ge=0, le=1)
heuristic: str | None


@ai_fn(model="gpt-3.5-turbo-1106")
def take_lesson_from_interaction(
transcript: str, assistant_instructions: str
) -> Lesson:
"""You are an expert counselor, and you are teaching Marvin how to be a better assistant.
Here is the transcript of an interaction between Marvin and a user:
{{ transcript }}
... and here is the stated purpose of the assistant:
{{ assistant_instructions }}
how directly relevant to the assistant's purpose is this interaction?
- if not at all, relevance = 0 & heuristic = None. (most of the time)
- if very, relevance >= 0.5, <1 & heuristic = "1 SHORT SENTENCE (max) summary of a generalizable lesson".
"""


logger = get_logger("PrefectEventSubscriber")


def excerpt_from_event(event: Event) -> str:
"""Create an excerpt from the event - TODO jinja this"""
user_name = event.payload.get("user").get("name")
user_id = event.payload.get("user").get("id")
user_message = event.payload.get("user_message")
ai_response = event.payload.get("ai_response")

return (
f"{user_name} ({user_id}) said: {user_message}"
f"\n\nMarvin (the assistant) responded with: {ai_response}"
)


async def update_parent_app_state(app: AIApplication, event: Event):
event_excerpt = excerpt_from_event(event)
lesson = take_lesson_from_interaction(
event_excerpt, event.payload.get("ai_instructions")
)
if lesson["relevance"] >= 0.5 and lesson["heuristic"] is not None:
experience = f"transcript: {event_excerpt}\n\nlesson: {lesson['heuristic']}"
logger.debug_kv("💡 Learned lesson from excerpt", experience, "green")
await app.default_thread.add_async(experience)
logger.debug_kv("📝", "Updating parent app state", "green")
await app.default_thread.run_async(app)
else:
logger.debug_kv("🥱 ", "nothing special", "green")
user_id = event.payload.get("user").get("id")
current_user_state = await app.state.read(user_id)
await app.state.write(
user_id,
{
**current_user_state,
"n_interactions": current_user_state["n_interactions"] + 1,
},
)


async def learn_from_child_interactions(
app: AIApplication, event_name: str | None = None
):
if event_name is None:
event_name = "marvin.assistants.SubAssistantRunCompleted"

logger.debug_kv("👂 Listening for", event_name, "green")
while not sum(map(ord, "vogon poetry")) == 42:
try:
async with PrefectCloudEventSubscriber(
filter=EventFilter(event=dict(name=[event_name]))
) as subscriber:
async for event in subscriber:
logger.debug_kv("📬 Received event", event.event, "green")
await update_parent_app_state(app, event)
except Exception as e:
if isinstance(e, ConnectionClosedError):
logger.debug_kv("🚨 Connection closed, reconnecting...", "red")
else: # i know, i know
logger.debug_kv("🚨", str(e), "red")


parent_assistant_options = dict(
instructions=(
"Your job is learn from the interactions of data engineers (users) and Marvin (a growing AI assistant)."
" You'll receive excerpts of these interactions (which are in the Prefect Slack workspace) as they occur."
" Your notes will be provided to Marvin when it interacts with users. Notes should be stored for each user"
" with the user's id as the key. The user id will be shown in the excerpt of the interaction."
" The user profiles (values) should include at least: {name: str, notes: list[str], n_interactions: int}."
" Keep NO MORE THAN 3 notes per user, but you may curate/update these over time for Marvin's maximum benefit."
" Notes must be 2 sentences or less, and must be concise and focused primarily on users' data engineering needs."
" Notes should not directly mention Marvin as an actor, they should be generally useful observations."
),
state=JSONBlockKV(block_name="marvin-parent-app-state"),
)


@asynccontextmanager
async def lifespan(app: FastAPI):
with AIApplication(name="Marvin", **parent_assistant_options) as marvin:
app.state.marvin = marvin
task = asyncio.create_task(learn_from_child_interactions(marvin))
yield
task.cancel()
try:
await task
except asyncio.exceptions.CancelledError:
get_logger("PrefectEventSubscriber").debug_kv(
"👋", "Stopped listening for child events", "red"
)

app.state.marvin = None


def emit_assistant_completed_event(
child_assistant: Assistant, parent_app: AIApplication, payload: dict
) -> Event:
event = emit_event(
event="marvin.assistants.SubAssistantRunCompleted",
resource={
"prefect.resource.id": child_assistant.id,
"prefect.resource.name": "child assistant",
"prefect.resource.role": "assistant",
},
related=[
{
"prefect.resource.id": parent_app.id,
"prefect.resource.name": "parent assistant",
"prefect.resource.role": "assistant",
}
],
payload=payload,
)
return event
Loading

0 comments on commit e0b4803

Please sign in to comment.