Merge branch 'main' into mlflow
andrii-i authored May 15, 2024
2 parents 561d695 + 7cf42f9 commit e59cfe1
Showing 12 changed files with 209 additions and 132 deletions.
84 changes: 55 additions & 29 deletions conftest.py
@@ -1,27 +1,73 @@
import os
from pathlib import Path

import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import Session, sessionmaker

from jupyter_scheduler.orm import create_session, create_tables
from jupyter_scheduler.orm import Base
from jupyter_scheduler.scheduler import Scheduler
from jupyter_scheduler.tests.mocks import MockEnvironmentManager

pytest_plugins = ("jupyter_server.pytest_plugin",)
pytest_plugins = ("jupyter_server.pytest_plugin", "pytest_jupyter.jupyter_server")

HERE = Path(__file__).parent.resolve()
DB_FILE_PATH = f"{HERE}/jupyter_scheduler/tests/testdb.sqlite"
DB_URL = f"sqlite:///{DB_FILE_PATH}"

TEST_ROOT_DIR = f"{HERE}/jupyter_scheduler/tests/test_root_dir"
@pytest.fixture(scope="session")
def static_test_files_dir() -> Path:
return Path(__file__).parent.resolve() / "jupyter_scheduler" / "tests" / "static"


@pytest.fixture
def jp_server_config(jp_server_config):
def jp_scheduler_root_dir(tmp_path: Path) -> Path:
root_dir = tmp_path / "workspace_root"
root_dir.mkdir()
return root_dir


@pytest.fixture
def jp_scheduler_output_dir(jp_scheduler_root_dir: Path) -> Path:
output_dir = jp_scheduler_root_dir / "jobs"
output_dir.mkdir()
return output_dir


@pytest.fixture
def jp_scheduler_staging_dir(jp_data_dir: Path) -> Path:
staging_area = jp_data_dir / "scheduler_staging_area"
staging_area.mkdir()
return staging_area


@pytest.fixture
def jp_scheduler_db_url(jp_scheduler_staging_dir: Path) -> str:
db_file_path = jp_scheduler_staging_dir / "scheduler.sqlite"
return f"sqlite:///{db_file_path}"


@pytest.fixture
def jp_scheduler_db(jp_scheduler_db_url):
engine = create_engine(jp_scheduler_db_url, echo=False)
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()
yield session
session.close()


@pytest.fixture
def jp_scheduler(jp_scheduler_db_url, jp_scheduler_root_dir, jp_scheduler_db):
return Scheduler(
db_url=jp_scheduler_db_url,
root_dir=str(jp_scheduler_root_dir),
environments_manager=MockEnvironmentManager(),
)


@pytest.fixture
def jp_server_config(jp_scheduler_db_url, jp_server_config):
return {
"ServerApp": {"jpserver_extensions": {"jupyter_scheduler": True}},
"SchedulerApp": {
"db_url": DB_URL,
"db_url": jp_scheduler_db_url,
"drop_tables": True,
"environment_manager_class": "jupyter_scheduler.tests.mocks.MockEnvironmentManager",
},
@@ -30,23 +76,3 @@ def jp_server_config(jp_server_config):
},
"Scheduler": {"task_runner_class": "jupyter_scheduler.tests.mocks.MockTaskRunner"},
}


@pytest.fixture(autouse=True)
def setup_db():
create_tables(DB_URL, True)
yield
if os.path.exists(DB_FILE_PATH):
os.remove(DB_FILE_PATH)


@pytest.fixture
def jp_scheduler_db():
return create_session(DB_URL)


@pytest.fixture
def jp_scheduler():
return Scheduler(
db_url=DB_URL, root_dir=str(TEST_ROOT_DIR), environments_manager=MockEnvironmentManager()
)
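
Note (added for clarity, not part of the commit): the new conftest.py replaces the module-level DB_URL constant and the autouse setup_db teardown with a chain of per-test fixtures built on pytest's tmp_path and jp_data_dir, so each test gets its own SQLite file and ORM session with no manual cleanup. A minimal, self-contained sketch of that pattern follows; Base, db_url, and db_session are illustrative stand-ins, not the project's actual names.

from pathlib import Path

import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()  # stand-in for jupyter_scheduler.orm.Base in this sketch


@pytest.fixture
def db_url(tmp_path: Path) -> str:
    # tmp_path is unique per test, so each test gets its own SQLite file.
    return f"sqlite:///{tmp_path / 'scheduler.sqlite'}"


@pytest.fixture
def db_session(db_url: str):
    # Create the schema, hand the test a live session, then close it afterwards.
    engine = create_engine(db_url, echo=False)
    Base.metadata.create_all(engine)
    session = sessionmaker(bind=engine)()
    yield session
    session.close()

Because every fixture derives from a per-test temporary directory, tests no longer share state through a checked-in testdb.sqlite file, which is what allowed the old os.remove-based teardown to be dropped.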
75 changes: 46 additions & 29 deletions jupyter_scheduler/tests/test_execution_manager.py
@@ -1,43 +1,60 @@
import shutil
from pathlib import Path
from typing import Tuple

import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from conftest import DB_URL
from jupyter_scheduler.executors import DefaultExecutionManager
from jupyter_scheduler.orm import Job

JOB_ID = "69856f4e-ce94-45fd-8f60-3a587457fce7"
NOTEBOOK_NAME = "side_effects.ipynb"
SIDE_EFECT_FILE_NAME = "output_side_effect.txt"

NOTEBOOK_DIR = Path(__file__).resolve().parent / "test_staging_dir" / "job-4"
NOTEBOOK_PATH = NOTEBOOK_DIR / NOTEBOOK_NAME
SIDE_EFFECT_FILE = NOTEBOOK_DIR / SIDE_EFECT_FILE_NAME
@pytest.fixture
def staging_dir_with_side_effects(
static_test_files_dir, jp_scheduler_staging_dir
) -> Tuple[Path, Path]:
notebook_file_path = static_test_files_dir / "side_effects.ipynb"
side_effect_file_path = static_test_files_dir / "output_side_effect.txt"
job_staging_dir = jp_scheduler_staging_dir / "job-4"

job_staging_dir.mkdir()
shutil.copy2(notebook_file_path, job_staging_dir)
shutil.copy2(side_effect_file_path, job_staging_dir)

return (notebook_file_path, side_effect_file_path)


@pytest.fixture
def load_job(jp_scheduler_db):
with jp_scheduler_db() as session:
job = Job(
runtime_environment_name="abc",
input_filename=NOTEBOOK_NAME,
job_id=JOB_ID,
)
session.add(job)
session.commit()


def test_add_side_effects_files(jp_scheduler_db, load_job):
def side_effects_job_record(staging_dir_with_side_effects, jp_scheduler_db) -> str:
notebook_name = staging_dir_with_side_effects[0].name
job = Job(
runtime_environment_name="abc",
input_filename=notebook_name,
)
jp_scheduler_db.add(job)
jp_scheduler_db.commit()

return job.job_id


def test_add_side_effects_files(
side_effects_job_record,
staging_dir_with_side_effects,
jp_scheduler_root_dir,
jp_scheduler_db_url,
jp_scheduler_db,
):
job_id = side_effects_job_record
staged_notebook_file_path = staging_dir_with_side_effects[0]
staged_notebook_dir = staged_notebook_file_path.parent
side_effect_file_name = staging_dir_with_side_effects[1].name

manager = DefaultExecutionManager(
job_id=JOB_ID,
root_dir=str(NOTEBOOK_DIR),
db_url=DB_URL,
staging_paths={"input": str(NOTEBOOK_PATH)},
job_id=job_id,
root_dir=jp_scheduler_root_dir,
db_url=jp_scheduler_db_url,
staging_paths={"input": staged_notebook_file_path},
)
manager.add_side_effects_files(str(NOTEBOOK_DIR))
manager.add_side_effects_files(staged_notebook_dir)

with jp_scheduler_db() as session:
job = session.query(Job).filter(Job.job_id == JOB_ID).one()
assert SIDE_EFECT_FILE_NAME in job.packaged_files
job = jp_scheduler_db.query(Job).filter(Job.job_id == job_id).one()
assert side_effect_file_name in job.packaged_files
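
Note (added for clarity, not part of the commit): the rewritten test no longer hard-codes JOB_ID; the side_effects_job_record fixture inserts a Job row and reads back the id that the ORM generates on commit. A self-contained sketch of that idea, with a local stand-in Job model whose id default is assumed to mirror the real model's behavior:

import uuid

import pytest
from sqlalchemy import Column, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class Job(Base):  # stand-in for jupyter_scheduler.orm.Job in this sketch
    __tablename__ = "jobs"
    job_id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
    input_filename = Column(String(256))


@pytest.fixture
def job_id() -> str:
    engine = create_engine("sqlite://")  # in-memory database for the sketch
    Base.metadata.create_all(engine)
    session = sessionmaker(bind=engine)()
    job = Job(input_filename="side_effects.ipynb")
    session.add(job)
    session.commit()  # the column default fires on INSERT, so job.job_id is now populated
    generated_id = job.job_id
    session.close()
    return generated_id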
107 changes: 65 additions & 42 deletions jupyter_scheduler/tests/test_job_files_manager.py
@@ -60,65 +60,88 @@ async def test_copy_from_staging():
)


HERE = Path(__file__).parent.resolve()
OUTPUTS_DIR = os.path.join(HERE, "test_files_output")
@pytest.fixture
def staging_dir_with_notebook_job(static_test_files_dir, jp_scheduler_staging_dir):
staging_dir = jp_scheduler_staging_dir / "job-1"
job_filenames = ["helloworld-1.ipynb", "helloworld-1.html", "helloworld.ipynb"]

staged_job_files = []
staging_dir.mkdir()
for job_filename in job_filenames:
staged_job_file = shutil.copy2(static_test_files_dir / job_filename, staging_dir)
staged_job_files.append(staged_job_file)

return staged_job_files


@pytest.fixture
def clear_outputs_dir():
yield
shutil.rmtree(OUTPUTS_DIR)
# rmtree() is not synchronous; wait until it has finished running
while os.path.isdir(OUTPUTS_DIR):
time.sleep(0.01)


@pytest.mark.parametrize(
"output_formats, output_filenames, staging_paths, output_dir, redownload",
[
(
["ipynb", "html"],
{
def staging_dir_with_tar_job(static_test_files_dir, jp_scheduler_staging_dir):
staging_dir = jp_scheduler_staging_dir / "job-2"
job_tar_file = static_test_files_dir / "helloworld.tar.gz"

staging_dir.mkdir()
staged_tar_file = shutil.copy2(job_tar_file, staging_dir)

return staged_tar_file


@pytest.fixture
def downloader_parameters(
staging_dir_with_notebook_job,
staging_dir_with_tar_job,
request,
jp_scheduler_output_dir,
):
job_1_ipynb_file_path, job_1_html_file_path, job_1_input_file_path = (
staging_dir_with_notebook_job
)
job_2_tar_file_path = staging_dir_with_tar_job
index = request.param
parameters = [
{
"output_formats": ["ipynb", "html"],
"output_filenames": {
"ipynb": "job-1/helloworld-out.ipynb",
"html": "job-1/helloworld-out.html",
"input": "job-1/helloworld-input.ipynb",
},
{
"ipynb": os.path.join(HERE, "test_staging_dir", "job-1", "helloworld-1.ipynb"),
"html": os.path.join(HERE, "test_staging_dir", "job-1", "helloworld-1.html"),
"input": os.path.join(HERE, "test_staging_dir", "job-1", "helloworld.ipynb"),
"staging_paths": {
"ipynb": job_1_ipynb_file_path,
"html": job_1_html_file_path,
"input": job_1_input_file_path,
},
OUTPUTS_DIR,
False,
),
(
["ipynb", "html"],
{
"output_dir": jp_scheduler_output_dir,
"redownload": False,
},
{
"output_formats": ["ipynb", "html"],
"output_filenames": {
"ipynb": "job-2/helloworld-1.ipynb",
"html": "job-2/helloworld-1.html",
"input": "job-2/helloworld.ipynb",
},
{
"tar.gz": os.path.join(HERE, "test_staging_dir", "job-2", "helloworld.tar.gz"),
"staging_paths": {
"tar.gz": job_2_tar_file_path,
"ipynb": "job-2/helloworld-1.ipynb",
"html": "job-2/helloworld-1.html",
"input": "job-2/helloworld.ipynb",
},
OUTPUTS_DIR,
False,
),
],
)
def test_downloader_download(
clear_outputs_dir, output_formats, output_filenames, staging_paths, output_dir, redownload
):
downloader = Downloader(
output_formats=output_formats,
output_filenames=output_filenames,
staging_paths=staging_paths,
output_dir=output_dir,
redownload=redownload,
"output_dir": jp_scheduler_output_dir,
"redownload": False,
},
]
return parameters[index]


@pytest.mark.parametrize("downloader_parameters", [0, 1], indirect=True)
def test_downloader_download(downloader_parameters):
output_formats, output_filenames, staging_paths, output_dir = (
downloader_parameters["output_formats"],
downloader_parameters["output_filenames"],
downloader_parameters["staging_paths"],
downloader_parameters["output_dir"],
)
downloader = Downloader(**downloader_parameters)
downloader.download()

assert os.path.exists(output_dir)
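
Note (added for clarity, not part of the commit): the parametrized cases are now routed through the downloader_parameters fixture via indirect parametrization, so each case can reference other fixtures (staging directories, output dir) instead of module-level paths. A minimal sketch of that pattern with hypothetical names (downloader_kwargs and the case dictionaries are illustrative only):

import pytest


@pytest.fixture
def downloader_kwargs(request, tmp_path):
    # request.param is the index supplied by @pytest.mark.parametrize(..., indirect=True).
    cases = [
        {"output_formats": ["ipynb"], "output_dir": str(tmp_path / "out-a"), "redownload": False},
        {"output_formats": ["ipynb", "html"], "output_dir": str(tmp_path / "out-b"), "redownload": True},
    ]
    return cases[request.param]


@pytest.mark.parametrize("downloader_kwargs", [0, 1], indirect=True)
def test_kwargs_are_built_per_case(downloader_kwargs):
    # Each parametrized run receives one prepared dictionary from the fixture.
    assert "output_formats" in downloader_kwargs
    assert downloader_kwargs["redownload"] in (False, True)

The benefit of the indirect route is that parameter construction can use per-test fixtures such as tmp_path, which plain parametrize values cannot.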
