Skip to content

Commit

Permalink
models: replace workflow.run_number with generation and restart number
Browse files Browse the repository at this point in the history
Change the workflow table to split the run_number into two integers: one
referring to the generation of the workflows, and the other one
referring to the restart number, thus removing the limit of 9 restarts.

Closes reanahub#186.
  • Loading branch information
giuseppe-steduto committed Oct 11, 2023
1 parent 6271ecd commit ff651aa
Show file tree
Hide file tree
Showing 7 changed files with 227 additions and 45 deletions.
1 change: 1 addition & 0 deletions AUTHORS.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ The list of contributors in alphabetical order:
- `Camila Diaz <https://orcid.org/0000-0001-5543-797X>`_
- `Diego Rodriguez <https://orcid.org/0000-0003-0649-2002>`_
- `Dinos Kousidis <https://orcid.org/0000-0002-4914-4289>`_
- `Giuseppe Steduto <https://orcid.org/0009-0002-1258-8553>`_
- `Jan Okraska <https://orcid.org/0000-0002-1416-3244>`_
- `Leticia Wanderley <https://orcid.org/0000-0003-4649-6630>`_
- `Marco Donadoni <https://orcid.org/0000-0003-2922-5505>`_
Expand Down
4 changes: 4 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
Changes
=======

Version 0.9.3 (UNRELEASED)
--------------------------
- Replaces ``run_number`` column of the ``Workflow`` table with two new columns ``generation_number`` and ``restart_number``, to allow for more than 9 restarts.

Version 0.9.2 (2023-09-26)
--------------------------

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
"""Separate run number into generation and restart number.
Revision ID: b85c3e601de4
Revises: 377cfbfccf75
Create Date: 2023-10-02 12:08:18.292490
"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = "b85c3e601de4"
down_revision = "377cfbfccf75"
branch_labels = None
depends_on = None


def upgrade():
"""Upgrade to b85c3e601de4 revision."""
# Add new columns (generation_number, restart_number)
op.add_column(
"workflow", sa.Column("generation_number", sa.Integer()), schema="__reana"
)
op.add_column(
"workflow",
sa.Column("restart_number", sa.Integer(), default=0),
schema="__reana",
)

# Data migration (split run_number into generation_number and restart_number)
op.get_bind().execute(
sa.text(
"UPDATE __reana.workflow"
" SET generation_number = FLOOR(run_number), "
" restart_number = (run_number - FLOOR(run_number)) * 10"
),
)

# Delete old constraint
op.drop_constraint("_user_workflow_run_uc", "workflow", schema="__reana")

# Drop old run_number column
op.drop_column("workflow", "run_number", schema="__reana")

# Add new constraint (the primary key is not run_number anymore, but with generation and restart number
op.create_unique_constraint(
"_user_workflow_run_uc",
"workflow",
["name", "owner_id", "generation_number", "restart_number"],
schema="__reana",
)

# Update restart_number for workflows that have been restarted more than 10 times
# (thus erroneously having the following generation_number), in case some of them
# were created before the limit on 9 restarts was introduced.
op.get_bind().execute(
sa.text(
"""
UPDATE __reana.workflow AS w
SET
generation_number = to_be_updated.new_generation_number,
restart_number = (w.restart_number + (w.generation_number - to_be_updated.new_generation_number) * 10)
FROM (
SELECT MIN(w1.generation_number) - 1 AS new_generation_number, w1.workspace_path
FROM __reana.workflow w1
WHERE w1.restart AND w1.restart_number = 0
GROUP BY w1.workspace_path
) AS to_be_updated
WHERE w.workspace_path = to_be_updated.workspace_path
"""
),
)


def downgrade():
"""Downgrade to 377cfbfccf75 revision."""
# Revert constraint
op.drop_constraint("_user_workflow_run_uc", "workflow", schema="__reana")

# Add old run_number column back
op.add_column("workflow", sa.Column("run_number", sa.Float()), schema="__reana")

# Check that there are no workflows discarded more than 10 times
# This is because of the way the info about restarts is stored in
# the run_number column (see https://github.com/reanahub/reana-db/issues/186)
restarted_ten_times = (
op.get_bind()
.execute("SELECT COUNT(*) FROM __reana.workflow WHERE restart_number >= 10")
.fetchone()[0]
)
if restarted_ten_times != 0:
raise ValueError(
"Cannot migrate database because some workflows have been restarted 10 or more times,"
" and the previous database revision only supports up to 9 restarts."
" If you want to downgrade, you should manually delete them."
)

# Data migration (combine generation_number and restart_number back to run_number)
op.get_bind().execute(
"UPDATE __reana.workflow SET run_number=generation_number+(restart_number * 1.0 /10)"
)

# Drop new columns
op.drop_column("workflow", "generation_number", schema="__reana")
op.drop_column("workflow", "restart_number", schema="__reana")

# Restore old constraint
op.create_unique_constraint(
"_user_workflow_run_uc",
"workflow",
["name", "owner_id", "run_number"],
schema="__reana",
)
85 changes: 51 additions & 34 deletions reana_db/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import uuid
from datetime import datetime
from functools import reduce
from typing import Dict, List
from typing import Dict, List, Tuple

from reana_commons.config import (
MQ_MAX_PRIORITY,
Expand Down Expand Up @@ -60,6 +60,7 @@
from reana_db.utils import (
build_workspace_path,
store_workflow_disk_quota,
split_run_number,
update_users_cpu_quota,
update_users_disk_quota,
update_workflow_cpu_quota,
Expand Down Expand Up @@ -459,7 +460,8 @@ class Workflow(Base, Timestamp, QuotaBase):
run_started_at = Column(DateTime)
run_finished_at = Column(DateTime)
run_stopped_at = Column(DateTime)
_run_number = Column("run_number", Float)
generation_number = Column(Integer)
restart_number = Column(Integer, default=0)
job_progress = Column(JSONType, default=dict)
workspace_path = Column(String)
restart = Column(Boolean, default=False)
Expand Down Expand Up @@ -487,7 +489,11 @@ class Workflow(Base, Timestamp, QuotaBase):

__table_args__ = (
UniqueConstraint(
"name", "owner_id", "run_number", name="_user_workflow_run_uc"
"name",
"owner_id",
"generation_number",
"restart_number",
name="_user_workflow_run_uc",
),
{"schema": "__reana"},
)
Expand Down Expand Up @@ -527,7 +533,9 @@ def __init__(
self.git_repo = git_repo
self.git_provider = git_provider
self.restart = restart
self._run_number = self.assign_run_number(run_number)
self.generation_number, self.restart_number = self.get_new_run_number(
run_number
)
self.workspace_path = workspace_path or build_workspace_path(
self.owner_id, self.id_
)
Expand All @@ -537,54 +545,65 @@ def __repr__(self):
"""Workflow string representation."""
return "<Workflow %r>" % self.id_

@hybrid_property
def run_number(self):
def run_number(self) -> str:
"""Property of run_number."""
if self._run_number.is_integer():
return int(self._run_number)
return self._run_number

@run_number.expression
def run_number(cls):
return func.abs(cls._run_number)
if self.restart_number != 0:
return f"{self.generation_number}.{self.restart_number}"
return str(self.generation_number)

def assign_run_number(self, run_number):
"""Assing run number."""
def _get_last_workflow(self, run_number):
"""Fetch the last workflow restart given a certain run number."""
from .database import Session

if run_number:
generation_number, restart_number = split_run_number(run_number)
last_workflow = (
Session.query(Workflow)
.filter(
Workflow.name == self.name,
Workflow.run_number >= int(run_number),
Workflow.run_number < int(run_number) + 1,
Workflow.generation_number == generation_number,
Workflow.owner_id == self.owner_id,
)
.order_by(Workflow.run_number.desc())
.order_by(
Workflow.generation_number.desc(), Workflow.restart_number.desc()
)
.first()
)
else:
last_workflow = (
Session.query(Workflow)
.filter_by(name=self.name, restart=False, owner_id=self.owner_id)
.order_by(Workflow.run_number.desc())
.order_by(
Workflow.generation_number.desc(), Workflow.restart_number.desc()
)
.first()
)
if last_workflow and self.restart:
# FIXME: remove the limit of nine restarts when we fix the way in which
# we save `run_number` in the DB
num_restarts = round(last_workflow.run_number * 10) % 10
if num_restarts == LIMIT_RESTARTS:
return last_workflow

def get_new_run_number(self, run_number) -> Tuple[int, int]:
"""Return the generation and restart numbers for a new workflow.
Return a tuple where the first element is the generation number and the
second element is the restart number.
"""
last_workflow = self._get_last_workflow(run_number)

if not last_workflow:
if self.restart:
raise REANAValidationError(
f"Cannot restart a workflow more than {LIMIT_RESTARTS} times"
"Cannot restart a workflow that has not been run before."
)
return round(last_workflow.run_number + 0.1, 1)
return 1, 0 # First generation, no restart

else:
if not last_workflow:
return 1
if not self.restart:
generation_number = last_workflow.generation_number + 1
restart_number = 0
else:
return last_workflow.run_number + 1
generation_number = last_workflow.generation_number
restart_number = last_workflow.restart_number + 1

return generation_number, restart_number

def get_input_parameters(self):
"""Return workflow parameters."""
Expand All @@ -604,7 +623,7 @@ def get_owner_access_token(self):

def get_full_workflow_name(self):
"""Return full workflow name including run number."""
return "{}.{}".format(self.name, str(self.run_number))
return "{}.{}".format(self.name, self.run_number)

def get_workspace_disk_usage(self, summarize=False, search=None):
"""Retrieve disk usage information of a workspace."""
Expand Down Expand Up @@ -643,15 +662,13 @@ def get_all_restarts(self):
"""Get all the restarts of this workflow, including the original workflow.
Returns all the restarts of this workflow, that is all the workflows that have
the same name and the same run number (up to the dot). This includes the
the same name and the same generation number. This includes the
original workflow, as well as all the following restarts.
"""
run_number = int(self.run_number)
restarts = Workflow.query.filter(
Workflow.name == self.name,
Workflow.owner_id == self.owner_id,
Workflow.run_number >= run_number,
Workflow.run_number < run_number + 1,
Workflow.generation_number == self.generation_number,
)
return restarts

Expand Down
30 changes: 24 additions & 6 deletions reana_db/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ def build_workspace_path(user_id, workflow_id=None, workspace_root_path=None):
return workspace_path


def split_run_number(run_number):
"""Split run number into generation and restart numbers."""
run_number = str(run_number)
if "." in run_number:
return tuple(map(int, run_number.split(".", maxsplit=1)))
return int(run_number), 0


def _get_workflow_with_uuid_or_name(uuid_or_name, user_uuid):
"""Get Workflow from database with uuid or name.
Expand Down Expand Up @@ -128,21 +136,31 @@ def _get_workflow_with_uuid_or_name(uuid_or_name, user_uuid):
return _get_workflow_by_name(workflow_name, user_uuid)

# `run_number` was specified.
# Check `run_number` is valid.
try:
run_number = float(run_number)
generation_number, restart_number = run_number.split(".", maxsplit=1)
except ValueError:
# There were not enough dot-separated substrings, so probably
# the `restart_number` was not specified.
generation_number = run_number
restart_number = 0

# Check `run_number` and `restart_number` are valid.
try:
generation_number = int(generation_number)
restart_number = int(restart_number)
except ValueError:
# `uuid_or_name` was split, so it is a dot-separated string
# `uuid_or_name` was split, so it is a dot-separated string,
# but it didn't contain a valid `run_number`.
# Assume that this dot-separated string is the name of
# the workflow and search with it.
return _get_workflow_by_name(uuid_or_name, user_uuid)

# `run_number` is valid.
# Search by `run_number` since it is a primary key.
# Search by `generation_number` and `restart_number`, since it is a primary key.
workflow = Workflow.query.filter(
Workflow.name == workflow_name,
Workflow.run_number == run_number,
Workflow.generation_number == generation_number,
Workflow.restart_number == restart_number,
Workflow.owner_id == user_uuid,
).one_or_none()
if not workflow:
Expand All @@ -169,7 +187,7 @@ def _get_workflow_by_name(workflow_name, user_uuid):
Workflow.query.filter(
Workflow.name == workflow_name, Workflow.owner_id == user_uuid
)
.order_by(Workflow.run_number.desc())
.order_by(Workflow.generation_number.desc(), Workflow.restart_number.desc())
.first()
)
if not workflow:
Expand Down
Loading

0 comments on commit ff651aa

Please sign in to comment.