Skip to content

Commit

Permalink
log jobs and job defs in mlflow
Browse files Browse the repository at this point in the history
  • Loading branch information
andrii-i committed May 14, 2024
1 parent dad6c5f commit 5cb4107
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 64 deletions.
24 changes: 15 additions & 9 deletions jupyter_scheduler/executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,15 +139,19 @@ def execute(self):
kernel_name=nb.metadata.kernelspec["name"], store_widget_state=True, cwd=staging_dir
)

try:
ep.preprocess(nb, {"metadata": {"path": staging_dir}})
except CellExecutionError as e:
raise e
finally:
if getattr(job, "mlflow_logging", False):
self.log_to_mlflow(job, nb)
self.add_side_effects_files(staging_dir)
self.create_output_files(job, nb)
mlflow.set_tracking_uri(MLFLOW_SERVER_URI)
with mlflow.start_run(run_id=job.mlflow_run_id):
try:
ep.preprocess(nb, {"metadata": {"path": staging_dir}})
if job.parameters:
mlflow.log_params(job.parameters)
except CellExecutionError as e:
raise e
finally:
if getattr(job, "mlflow_logging", False):
self.log_to_mlflow(job, nb)
self.add_side_effects_files(staging_dir)
self.create_output_files(job, nb)

def add_side_effects_files(self, staging_dir: str):
"""Scan for side effect files potentially created after input file execution and update the job's packaged_files with these files"""
Expand Down Expand Up @@ -175,8 +179,10 @@ def create_output_files(self, job: DescribeJob, notebook_node):
for output_format in job.output_formats:
cls = nbconvert.get_exporter(output_format)
output, _ = cls().from_notebook_node(notebook_node)
output_path = self.staging_paths[output_format]
with fsspec.open(self.staging_paths[output_format], "w", encoding="utf-8") as f:
f.write(output)
mlflow.log_artifact(output_path)

def log_to_mlflow(self, job, nb):
mlflow.set_tracking_uri(MLFLOW_SERVER_URI)
Expand Down
62 changes: 10 additions & 52 deletions jupyter_scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,10 @@
import os
import random
import shutil
import signal
import subprocess
from typing import Dict, List, Optional, Type, Union
import sys
from uuid import uuid4
import subprocess

import fsspec
import mlflow
import psutil
from jupyter_core.paths import jupyter_data_dir
from jupyter_server.transutils import _i18n
Expand Down Expand Up @@ -50,10 +46,6 @@
create_output_filename,
)

MLFLOW_SERVER_HOST = "127.0.0.1"
MLFLOW_SERVER_PORT = "5000"
MLFLOW_SERVER_URI = f"http://{MLFLOW_SERVER_HOST}:{MLFLOW_SERVER_PORT}"


class BaseScheduler(LoggingConfigurable):
"""Base class for schedulers. A default implementation
Expand Down Expand Up @@ -409,31 +401,20 @@ class Scheduler(BaseScheduler):
task_runner = Instance(allow_none=True, klass="jupyter_scheduler.task_runner.BaseTaskRunner")

def start_mlflow_server(self):
mlflow_process = subprocess.Popen(
subprocess.Popen(
[
"mlflow",
"server",
"--backend-store-uri",
"./mlruns",
"--default-artifact-root",
"./mlartifacts",
"--host",
MLFLOW_SERVER_HOST,
"0.0.0.0",
"--port",
MLFLOW_SERVER_PORT,
],
preexec_fn=os.setsid,
"5000",
]
)
mlflow.set_tracking_uri(MLFLOW_SERVER_URI)
return mlflow_process

def stop_mlflow_server(self):
if self.mlflow_process is not None:
os.killpg(os.getpgid(self.mlflow_process.pid), signal.SIGTERM)
self.mlflow_process.wait()
self.mlflow_process = None
print("MLFlow server stopped")

def mlflow_signal_handler(self, signum, frame):
print("Shutting down MLFlow server")
self.stop_mlflow_server()
sys.exit(0)

def __init__(
self,
Expand All @@ -450,9 +431,7 @@ def __init__(
if self.task_runner_class:
self.task_runner = self.task_runner_class(scheduler=self, config=config)

self.mlflow_process = self.start_mlflow_server()
signal.signal(signal.SIGINT, self.mlflow_signal_handler)
signal.signal(signal.SIGTERM, self.mlflow_signal_handler)
self.start_mlflow_server()

@property
def db_session(self):
Expand Down Expand Up @@ -502,21 +481,6 @@ def create_job(self, model: CreateJob) -> str:
if not model.output_formats:
model.output_formats = []

mlflow_client = mlflow.MlflowClient()

if model.job_definition_id and model.mlflow_experiment_id:
experiment_id = model.mlflow_experiment_id
else:
experiment_id = mlflow_client.create_experiment(f"{model.input_filename}-{uuid4()}")
model.mlflow_experiment_id = experiment_id
input_file_path = os.path.join(self.root_dir, model.input_uri)
mlflow.log_artifact(input_file_path, "input")

mlflow_run = mlflow_client.create_run(
experiment_id=experiment_id, run_name=f"{model.input_filename}-{uuid4()}"
)
model.mlflow_run_id = mlflow_run.info.run_id

job = Job(**model.dict(exclude_none=True, exclude={"input_uri"}))

session.add(job)
Expand Down Expand Up @@ -664,12 +628,6 @@ def create_job_definition(self, model: CreateJobDefinition) -> str:
if not self.file_exists(model.input_uri):
raise InputUriError(model.input_uri)

mlflow_client = mlflow.MlflowClient()
experiment_id = mlflow_client.create_experiment(f"{model.input_filename}-{uuid4()}")
model.mlflow_experiment_id = experiment_id
input_file_path = os.path.join(self.root_dir, model.input_uri)
mlflow.log_artifact(input_file_path, "input")

job_definition = JobDefinition(**model.dict(exclude_none=True, exclude={"input_uri"}))
session.add(job_definition)
session.commit()
Expand Down
10 changes: 7 additions & 3 deletions src/mainviews/create-job.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,6 @@ export function CreateJob(props: ICreateJobProps): JSX.Element {

const handleInputChange = (event: ChangeEvent<HTMLInputElement>) => {
const target = event.target;

const parameterNameIdx = parameterNameMatch(target.name);
const parameterValueIdx = parameterValueMatch(target.name);
const newParams = props.model.parameters || [];
Expand Down Expand Up @@ -323,7 +322,10 @@ export function CreateJob(props: ICreateJobProps): JSX.Element {
idempotency_token: props.model.idempotencyToken,
tags: props.model.tags,
runtime_environment_parameters: props.model.runtimeEnvironmentParameters,
package_input_folder: props.model.packageInputFolder
package_input_folder: props.model.packageInputFolder,
mlflow_logging: props.model.mlflowLogging,
mlflow_experiment_id: props.model.mlflowExperimentId,
mlflow_run_id: props.model.mlflowRunId
};

if (props.model.parameters !== undefined) {
Expand Down Expand Up @@ -372,7 +374,9 @@ export function CreateJob(props: ICreateJobProps): JSX.Element {
runtime_environment_parameters: props.model.runtimeEnvironmentParameters,
schedule: props.model.schedule,
timezone: props.model.timezone,
package_input_folder: props.model.packageInputFolder
package_input_folder: props.model.packageInputFolder,
mlflow_logging: props.model.mlflowLogging,
mlflow_experiment_id: props.model.mlflowExperimentId
};

if (props.model.parameters !== undefined) {
Expand Down

0 comments on commit 5cb4107

Please sign in to comment.