[WIP] Refactor async feature for improved performance #44

Open · wants to merge 1 commit into master
2 changes: 1 addition & 1 deletion .idea/misc.xml


2 changes: 1 addition & 1 deletion .idea/monkeyFunctions.iml


53 changes: 53 additions & 0 deletions examples/async_tasks/main.py
@@ -0,0 +1,53 @@
import asyncio
import os
from time import time
from typing import AsyncIterable

import openai
from dotenv import load_dotenv

from monkey_patch.monkey import Monkey as monkey

load_dotenv()
openai.api_key = os.getenv("OPENAI_API_KEY")


@monkey.patch
async def iter_presidents() -> AsyncIterable[str]:
"""List the presidents of the United States"""


@monkey.patch
async def iter_prime_ministers() -> AsyncIterable[str]:
"""List the prime ministers of the UK"""


@monkey.patch
async def tell_me_more_about(topic: str) -> str:
""""""


async def describe_presidents():
# For each president listed, generate a description concurrently
start_time = time()
print(start_time)
tasks = []
    async for president in iter_presidents():
        print(f"Generating description for {president}")
        tasks.append(asyncio.create_task(tell_me_more_about(president)))

    descriptions = await asyncio.gather(*tasks)

    print(f"Generated {len(descriptions)} descriptions in {time() - start_time} seconds")
    return descriptions


def main():
    asyncio.run(describe_presidents())

if __name__ == '__main__':
main()
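
For reference, the concurrency pattern the example relies on can be reproduced without the monkey.patch machinery. A minimal self-contained sketch in plain asyncio, where asyncio.sleep stands in for the model call and every name is illustrative:

import asyncio
from time import time

async def describe(name: str) -> str:
    await asyncio.sleep(1)  # stand-in for a slow model call
    return f"{name}: ..."

async def run_batch():
    start = time()
    tasks = [asyncio.create_task(describe(n)) for n in ("Washington", "Adams", "Jefferson")]
    descriptions = await asyncio.gather(*tasks)  # the three calls overlap
    print(f"Generated {len(descriptions)} descriptions in {time() - start:.1f} seconds")

asyncio.run(run_batch())

Because the three tasks overlap, the batch completes in roughly one second rather than three.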
3 changes: 2 additions & 1 deletion requirements.txt
@@ -4,4 +4,5 @@ numpy~=1.24.4
python-dotenv==1.0.0
bitarray==2.8.2
pydantic==2.4.2
fastapi~=0.104.0
httpx
Binary file modified src/monkey_patch/__pycache__/monkey.cpython-311.pyc
Binary file not shown.
Binary file modified src/monkey_patch/__pycache__/register.cpython-311.pyc
Binary file not shown.
Binary file modified src/monkey_patch/__pycache__/repair.cpython-311.pyc
Binary file not shown.
Binary file modified src/monkey_patch/__pycache__/utils.cpython-311.pyc
Binary file not shown.
Binary file not shown.
143 changes: 136 additions & 7 deletions src/monkey_patch/language_models/language_modeler.py
@@ -1,24 +1,55 @@
import io
import json
from typing import Any, AsyncIterable, get_args

import ijson

from monkey_patch.language_models.openai_api import Openai_API
from monkey_patch.models.function_description import FunctionDescription
from monkey_patch.models.language_model_output import LanguageModelOutput
from monkey_patch.utils import approximate_token_count
from monkey_patch.validator import Validator

INSTRUCTION = "You are given below a function description and input data. The function description of what the " \
"function must carry out can be found in the Function section, with input and output type hints. The " \
"input data can be found in Input section. Using the function description, apply the function to the " \
"Input and return a valid output type, that is acceptable by the output_class_definition and " \
"output_class_hint. Return None if you can't apply the function to the input or if the output is " \
"optional and the correct output is None.\nINCREDIBLY IMPORTANT: Only output a JSON-compatible string " \
"in the correct response format. If there are no inputs, but a defined output, you must follow the " \
"instructions of the docstring and generate an output. "

SYSTEM = f"You are a skillful and accurate language model, who applies a described function on input data. Make sure " \
f"the function is applied accurately and correctly and the outputs follow the output type hints and are " \
f"valid outputs given the output types. "

REPAIR = "Below are an outputs of a function applied to inputs, which failed type validation. The input to the " \
"function is brought out in the INPUT section and function description is brought out in the FUNCTION " \
"DESCRIPTION section. Your task is to apply the function to the input and return a correct output in the " \
"right type. The FAILED EXAMPLES section will show previous outputs of this function applied to the data, " \
"which failed type validation and hence are wrong outputs. Using the input and function description output " \
"the accurate output following the output_class_definition and output_type_hint attributes of the function " \
"description, which define the output type. Make sure the output is an accurate function output and in the " \
"correct type. Return None if you can't apply the function to the input or if the output is optional and the " \
"correct output is None. "


class LanguageModel(object):
def __init__(self, generation_token_limit = 512) -> None:
self.instruction = "You are given below a function description and input data. The function description of what the function must carry out can be found in the Function section, with input and output type hints. The input data can be found in Input section. Using the function description, apply the function to the Input and return a valid output type, that is acceptable by the output_class_definition and output_class_hint. Return None if you can't apply the function to the input or if the output is optional and the correct output is None.\nINCREDIBLY IMPORTANT: Only output a JSON-compatible string in the correct response format."
self.system_message = f"You are a skillful and accurate language model, who applies a described function on input data. Make sure the function is applied accurately and correctly and the outputs follow the output type hints and are valid outputs given the output types."

self.instruction = INSTRUCTION
self.system_message = SYSTEM
self.instruction_token_count = approximate_token_count(self.instruction)
self.system_message_token_count = approximate_token_count(self.system_message)
self.api_models = {"openai": Openai_API()}
self.repair_instruction = "Below are an outputs of a function applied to inputs, which failed type validation. The input to the function is brought out in the INPUT section and function description is brought out in the FUNCTION DESCRIPTION section. Your task is to apply the function to the input and return a correct output in the right type. The FAILED EXAMPLES section will show previous outputs of this function applied to the data, which failed type validation and hence are wrong outputs. Using the input and function description output the accurate output following the output_class_definition and output_type_hint attributes of the function description, which define the output type. Make sure the output is an accurate function output and in the correct type. Return None if you can't apply the function to the input or if the output is optional and the correct output is None."
self.repair_instruction = REPAIR
self.generation_length = generation_token_limit
self.models = {"gpt-4":{"token_limit": 8192 - self.generation_length, "type": "openai"},
"gpt-4-32k": {"token_limit": 32768 - self.generation_length, "type": "openai"}
} # models and token counts
self.validator = Validator()


def generate(self, args, kwargs, function_modeler, function_description, llm_parameters = {}) -> LanguageModelOutput:
"""
The main generation function, given the args, kwargs, function_modeler, function description and model type, generate a response and check if the datapoint can be saved to the finetune dataset
"""
@@ -30,8 +61,17 @@ def generate(self, args, kwargs, function_modeler, function_description, llm_parameters = {}):
model_type = self.get_teacher_model_type(model)
choice = self.synthesise_answer(prompt, model, model_type, llm_parameters)

output = LanguageModelOutput(choice, save_to_finetune, is_distilled_model)

# Create the object from the output of the language model
instantiated = args.get_object_from_output(function_description,
args,
kwargs,
output,
self.validator)


return instantiated

def synthesise_answer(self, prompt, model, model_type, llm_parameters):
"""
@@ -40,6 +80,95 @@ def synthesise_answer(self, prompt, model, model_type, llm_parameters):
if model_type == "openai":
return self.api_models[model_type].generate(model, self.system_message, prompt, **llm_parameters)

    async def generate_async(self, args, kwargs,
                             function_modeler,
                             function_description: FunctionDescription,
                             llm_parameters={}) -> AsyncIterable[LanguageModelOutput]:
        """
        The main async generation function. Given the args, kwargs, function_modeler, function description and
        model type, stream a response and yield each output that passes type validation.
        """
prompt, model, save_to_finetune, is_distilled_model = self.get_generation_case(args,
kwargs,
function_modeler,
function_description)
if is_distilled_model:
model_type = self.get_distillation_model_type(model)
else:
model_type = self.get_teacher_model_type(model)

buffer = ""
async for choice in self.synthesise_answer_async(prompt, model, model_type, llm_parameters):
delta = choice.get('choices', [{}])[0].get('delta', {})
content_chunk = delta.get('content', '')
buffer += content_chunk

if not buffer:
continue

# Convert set representation to JSON-compatible list
#if buffer.startswith("{'") and "', '" in buffer or buffer.startswith('{"') and '", "' in buffer:
# buffer = '[' + buffer[1:]

# Use ijson to parse buffer as a stream
try:
parser = ijson.parse(io.StringIO(buffer))

stack = []
key = None
for prefix, event, value in parser:
if event == 'map_key':
key = value
elif event in ('start_map', 'start_array'):
new_obj = [] if event == 'start_array' else {}
if stack:
parent_key, parent_obj = stack[-1]
if isinstance(parent_obj, list):
parent_obj.append(new_obj)
elif parent_key is not None:
parent_obj[parent_key] = new_obj
                            stack.append((key, new_obj))  # push the current key together with the new container
elif event in ('end_map', 'end_array'):
key, obj = stack.pop()
# Handle the case where obj is a list of strings and we are at the top level
if not stack and isinstance(obj, list) and all(isinstance(x, str) for x in obj):
for item in obj:
is_instantiable = self.validator.check_type(item,
function_description.output_class_definition)
if is_instantiable:
output = LanguageModelOutput(item, save_to_finetune, is_distilled_model)
yield output
buffer = "" # Reset buffer for next object
elif prefix:
parent_key, current_obj = stack[-1]
if isinstance(current_obj, list):
# Check if we are at the top level and handling a list of strings
if len(stack) == 1 and isinstance(value, str):
output_type_args = get_args(function_description.output_type_hint)
if output_type_args:
output_type_arg = output_type_args[0]
else:
output_type_arg = Any
is_instantiable = self.validator.check_type(value, output_type_arg)
if is_instantiable:
output = LanguageModelOutput(value, save_to_finetune, is_distilled_model)
yield output
buffer = "[" + buffer[len(json.dumps(value))+2:].lstrip(', ')
else:
current_obj.append(value)
else:
current_obj[key] = value

            except ijson.JSONError:
                # Not enough data yet for a complete JSON value; keep the buffer and read more chunks
                pass



async def synthesise_answer_async(self, prompt, model, model_type, llm_parameters):
if model_type == "openai":
async for chunk in self.api_models[model_type].generate_async(model, self.system_message, prompt, **llm_parameters):
yield chunk

def get_distillation_model_type(self, model):
"""
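
The incremental parsing above is easier to follow in isolation. A minimal sketch of the ijson event stream that generate_async consumes, with a deliberately truncated buffer standing in for a partially received model response (the values are illustrative):

import io
import ijson

buffer = '["Washington", "Adams", "Jeffe'  # stream cut off mid-element

try:
    # ijson.parse yields a (prefix, event, value) tuple for every completed
    # element, so the two finished strings arrive as ('item', 'string', ...)
    for prefix, event, value in ijson.parse(io.StringIO(buffer)):
        print(prefix, event, value)
except ijson.JSONError:
    # raised at the truncated tail; the caller keeps its buffer and retries
    # once the next chunk arrives, exactly as generate_async does
    pass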
36 changes: 35 additions & 1 deletion src/monkey_patch/language_models/openai_api.py
@@ -1,3 +1,6 @@
import os

import httpx
import openai

# import abstract base class
@@ -36,4 +39,35 @@ def generate(self, model, system_message, prompt, **kwargs):
presence_penalty=presence_penalty
)
choice = response.choices[0].message.content.strip("'")
return choice

async def generate_async(self, model, system_message, prompt, **kwargs):
temperature = kwargs.get("temperature", 0)
top_p = kwargs.get("top_p", 1)
frequency_penalty = kwargs.get("frequency_penalty", 0)
presence_penalty = kwargs.get("presence_penalty", 0)

messages = [
{
"role": "system",
"content": system_message
},
{
"role": "user",
"content": prompt
}
]

        # Use the async endpoint and stream the completion so chunks can be
        # yielded as they arrive instead of blocking the event loop
        response = await openai.ChatCompletion.acreate(
            model=model,
            messages=messages,
            temperature=temperature,
            max_tokens=512,
            top_p=top_p,
            frequency_penalty=frequency_penalty,
            presence_penalty=presence_penalty,
            stream=True
        )

        async for chunk in response:
            yield chunk
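
For completeness, a hedged sketch of how a caller might drain this stream and reassemble the text. The model name and prompts are placeholders, and it assumes OPENAI_API_KEY has been set:

import asyncio

async def demo():
    api = Openai_API()
    text = ""
    async for chunk in api.generate_async("gpt-4", "You are terse.",
                                          "List three US presidents as a JSON array."):
        # each chunk carries an incremental delta, mirroring what
        # LanguageModel.generate_async accumulates into its buffer
        delta = chunk.get("choices", [{}])[0].get("delta", {})
        text += delta.get("content", "")
    print(text)

asyncio.run(demo())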