
feat: Dagster Data pipeline #798

Closed · wants to merge 18 commits

Conversation

yan91083 (Contributor) commented:

There are five assets:

  • model_predict: choose a model and language, and run prediction over all three files.
  • matching: after prediction, choose a model and language to analyze how the predictions match the groundtruth.
  • tabby_eval_result: generate a report (CSV file) covering all models and languages.
  • tabby_dataset: read the CSV into a dataframe.
  • tabby_jupyter: a Jupyter notebook to present the result.

@wsxiaoys (Member) left a comment:

First round of feedback, focusing on the prediction part.

"pandas"
)
.copy_local_file(local_path="/tmp/tabby_model_id", remote_path="/tmp/tabby_model_id")
.run_function(download_model)
wsxiaoys (Member):

Use https://modal.com/docs/reference/modal.Image#env to pass MODEL_ID as an environment variable; in download_model you can then read the model id with os.environ.get("MODEL_ID").

wsxiaoys (Member):

.env("MODEL_ID", os.environ.get("MODEL_ID"))

```python
)
.dockerfile_commands("ENTRYPOINT []")
.pip_install(
    "git+https://github.com/TabbyML/tabby.git#egg=tabby-python-client&subdirectory=experimental/eval/tabby-python-client",
```
wsxiaoys (Member):

Embed this directory inside tabby-data-pipeline, use https://modal.com/docs/reference/modal.Image#copy_local_dir to copy it in, and run pip install.
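A minimal sketch of the suggested layout (the local and remote paths are assumptions):

```python
import modal

# Vendor the client directory into the image and install it at build time,
# instead of pip-installing from the git URL.
image = (
    modal.Image.debian_slim()
    .copy_local_dir("tabby-python-client", "/src/tabby-python-client")
    .run_commands("pip install /src/tabby-python-client")
)
```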


```python
my_env = os.environ.copy()
my_env["TABBY_DISABLE_USAGE_COLLECTION"] = "1"
MODEL_ID = os.popen("cat /tmp/tabby_model_id").read().strip()
```
wsxiaoys (Member):

ditto

```python
from tabby_python_client.api.v1 import health

resp = await health.asyncio(client=self.client)
return resp.to_dict()
```
wsxiaoys (Member):

just return resp?

```python
return resp.to_dict()

@method()
async def complete(self, language, crossfile_context, index, row):
```
wsxiaoys (Member) commented on Nov 15, 2023:

Add a type annotation for every argument. For row, define a named tuple: https://docs.python.org/3/library/typing.html#typing.NamedTuple
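A hedged sketch of the suggested annotations (field names and types are assumptions; an Optional field anticipates the dynamic-column question discussed below):

```python
from typing import NamedTuple, Optional

class CompletionRow(NamedTuple):
    # Hypothetical columns for illustration; prediction is Optional because
    # it only exists after the first run has written it back to the file.
    prompt: str
    groundtruth: str
    prediction: Optional[str] = None

class ModelShim:
    # Mirrors the Modal method signature above, minus the @method decorator.
    async def complete(self, language: str, crossfile_context: str,
                       index: int, row: CompletionRow) -> Optional[str]:
        ...
```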

yan91083 (Contributor, author):

Do you want all column names of the dataframe listed in the namedtuple?

wsxiaoys (Member):

Only the columns used for input/output.

yan91083 (Contributor, author):

What if there's a dynamic column? For example, "prediction" doesn't exist the first time you run the file; it is added afterwards, and on the next run we need to pass it through row.

```python
df = pd.DataFrame(objs)

outputs = await asyncio.gather(*[model.complete.remote.aio(language, crossfile_context, index, row)
                                 for index, row in df.iterrows()])
```
wsxiaoys (Member):

Is it still necessary to run this in chunks?



```python
@stub.local_entrypoint()
async def main(language, file):
```
wsxiaoys (Member):

Add type hints here as well.
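For instance (a hedged sketch; stub refers to the Modal stub already defined in this file):

```python
@stub.local_entrypoint()
async def main(language: str, file: str):
    ...
```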

wsxiaoys (Member):

Put the prediction code under the modal directory.

```diff
 LAUNCH_FLAGS = ["serve", "--model", MODEL_ID, "--port", "8000", "--device", "cuda"]
 self.launcher = subprocess.Popen(["/opt/tabby/bin/tabby"] + LAUNCH_FLAGS, env=my_env)
 self.client = Client("http://127.0.0.1:8000", timeout=240)

 # Poll until webserver at 127.0.0.1:8000 accepts connections before running inputs.
 def webserver_ready():
     try:
-        socket.create_connection(("127.0.0.1", 8000), timeout=1).close()
+        socket.create_connection(("127.0.0.1", 8000), timeout=30).close()
```
wsxiaoys (Member):

Suggested change:

```diff
-        socket.create_connection(("127.0.0.1", 8000), timeout=30).close()
+        socket.create_connection(("127.0.0.1", 8000), timeout=1).close()
```

Line 91 already contains retry logic; there is no need to increase the timeout.

```diff
@@ -111,15 +115,10 @@ async def complete(self, language, crossfile_context, index, row):
     from tabby_python_client import errors
     import pandas as pd

     if 'prediction' in row and not pd.isnull(row['prediction']):
         # if prediction exists, just skip
```
wsxiaoys (Member):

If the prediction already exists, you can simply skip calling the complete function.
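A hedged sketch of that refactor, reusing the names from the earlier gather snippet (df, model, language, crossfile_context) and assuming it runs inside the existing async entrypoint:

```python
import asyncio
import pandas as pd

# Dispatch only the rows that do not yet have a prediction, so complete()
# never needs to check for one itself.
pending = [(index, row) for index, row in df.iterrows()
           if "prediction" not in row or pd.isnull(row["prediction"])]
outputs = await asyncio.gather(
    *[model.complete.remote.aio(language, crossfile_context, index, row)
      for index, row in pending]
)
```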


```python
for file in ['line_completion.jsonl', 'line_completion_rg1_bm25.jsonl', 'line_completion_oracle_bm25.jsonl']:
```
wsxiaoys (Member):

Please extract a function for this, e.g. read_pandas_frame.

yan91083 (Contributor, author):

I'm not sure what this means. Extract a function for the for loop, or a function that reads all three files?

wsxiaoys (Member):

Either interface is fine; it's just good to split the main function into smaller chunks for better readability.
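One possible extraction (a sketch; the function name comes from the review, and the jsonl-reading body is an assumption):

```python
import pandas as pd

def read_pandas_frame(file: str) -> pd.DataFrame:
    # Each evaluation file is JSON Lines, one object per row.
    return pd.read_json(file, lines=True)
```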

@yan91083 changed the title from "Dagster Data pipeline" to "feat: Dagster Data pipeline" on Nov 25, 2023
wsxiaoys (Member):

This directory should be listed in .gitignore.


```python
context.add_output_metadata(metadata={"model_id": MetadataValue.md(model_id)})

files = 'line_completion.jsonl, line_completion_rg1_bm25.jsonl, line_completion_oracle_bm25.jsonl'
```
wsxiaoys (Member):

Where are these files? Should they be added as assets?



```python
model = model_id.split("/")[-1]
for file in ["line_completion.jsonl", "line_completion_rg1_bm25.jsonl", "line_completion_oracle_bm25.jsonl"]:
```
wsxiaoys (Member):

Shouldn't each file itself be an asset?
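A hedged sketch of that layout, with one Dagster asset per jsonl file (the read logic is an assumption):

```python
import pandas as pd
from dagster import asset

@asset
def line_completion() -> pd.DataFrame:
    return pd.read_json("line_completion.jsonl", lines=True)

@asset
def line_completion_rg1_bm25() -> pd.DataFrame:
    return pd.read_json("line_completion_rg1_bm25.jsonl", lines=True)

@asset
def line_completion_oracle_bm25() -> pd.DataFrame:
    return pd.read_json("line_completion_oracle_bm25.jsonl", lines=True)
```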

```python
    Int,
    file_relative_path
)
from . import analyze, create_csv
```
wsxiaoys (Member):

There's no need to extract utility functions. You could just define each asset as a Python function and organize them in individual Python files.

yan91083 (Contributor, author):

They are individual Python files, but I have to import them and call them in the assets.

wsxiaoys (Member):

They don't have to be; they can be defined as assets directly.
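A hedged sketch of defining the logic directly as an asset instead of importing it from a utility module (the body is an assumption):

```python
from dagster import asset

@asset
def matching(context) -> None:
    # The analysis previously living in analyze() moves into the asset body.
    context.log.info("matching predictions against groundtruth")
    ...
```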

```diff
@@ -0,0 +1,2 @@
+tmp*
+tabby_data_pipeline.egg-info
```
wsxiaoys (Member):

You still need to remove the directory from this commit.

wsxiaoys (Member):

Don't remove this file.

wsxiaoys (Member):

This should be in .gitignore as well.

@wsxiaoys closed this on Oct 18, 2024