Skip to content

Commit

Permalink
✨ add parameters list methods. #20
Browse files Browse the repository at this point in the history
✅ add test for list parameters.
  • Loading branch information
perillaroc committed May 3, 2022
1 parent a6da091 commit d7a5315
Show file tree
Hide file tree
Showing 6 changed files with 218 additions and 10 deletions.
9 changes: 9 additions & 0 deletions takler/core/bunch.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ def find_generated_parameter(self, name: str) -> Optional[Parameter]:
p = self.server_state.find_parameter(name)
return p

def generated_parameters_only(self) -> Dict[str, Parameter]:
return self.server_state.generated_parameters()


class ServerState(BaseModel):
server_parameters: List[Parameter] = []
Expand Down Expand Up @@ -93,3 +96,9 @@ def find_parameter(self, name: str) -> Optional[Parameter]:
return p

return None

def generated_parameters(self) -> Dict[str, Parameter]:
params = dict()
for p in self.server_parameters:
params[p.name] = p
return params
57 changes: 54 additions & 3 deletions takler/core/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def __init__(self, name: str):
self.children = list() # type: List[Node]

# 参数
self.parameters = dict() # type: Dict[str, Parameter]
self.user_parameters = dict() # type: Dict[str, Parameter]

# 触发器
self.trigger_expression = None # type: Optional[Expression]
Expand Down Expand Up @@ -387,7 +387,7 @@ def add_parameter(self, name: str, value: Union[str, float, int, bool]):
Add a ``Parameter`` to this node.
"""
p = Parameter(name=name, value=value)
self.parameters[name] = p
self.user_parameters[name] = p
return p

def find_parameter(self, name: str) -> Optional[Parameter]:
Expand All @@ -406,7 +406,7 @@ def find_user_parameter(self, name: str) -> Optional[Parameter]:
"""
Find user ``parameter`` only in this node.
"""
return self.parameters.get(name, None)
return self.user_parameters.get(name, None)

# @abstractmethod
def find_generated_parameter(self, name: str) -> Optional[Parameter]:
Expand Down Expand Up @@ -442,6 +442,57 @@ def update_generated_parameters(self):
"""
pass

def parameters(self) -> Dict[str, Parameter]:
"""
Return all parameters accessible to this Node.
"""
params = self.parameters_only().copy()

parent_node = self.parent
while parent_node is not None:
parent_params = parent_node.parameters_only().copy()
for key, p in parent_params.items():
if key not in params:
params[key] = p
parent_node = parent_node.parent

bunch = self.get_bunch()
if bunch is not None:
bunch_params = bunch.parameters_only()
for key, p in bunch_params.items():
if key not in params:
params[key] = p

return params

def parameters_only(self) -> Dict[str, Parameter]:
"""
Return all parameters in this Node.
"""
user_params = self.user_parameters_only()
generated_params = self.generated_parameters_only()

params = {
**user_params
}
for key, p in generated_params.items():
if key not in params:
params[key] = p

return params

def user_parameters_only(self) -> Dict[str, Parameter]:
"""
Return user defined parameters in this Node.
"""
return self.user_parameters

def generated_parameters_only(self) -> Dict[str, Parameter]:
"""
Return generated parameters
"""
return dict()

# Node Operations ------------------------------------------------

def requeue(self):
Expand Down
13 changes: 12 additions & 1 deletion takler/core/parameter.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,18 @@
class Parameter(object):
def __init__(self, name: str, value: Optional[Union[str, int, float, bool]] = None):
self.name = name # type: str
self.value = value # type: Optional[Union[str, int, float, bool]]
self._value = value # type: Optional[Union[str, int, float, bool]]

def __repr__(self):
return f"Parameter<{self.name}, {self.value}>"

def __eq__(self, other):
return type(other) == type(self) and other.name == self.name and other.value == self.value

@property
def value(self):
return self._value

@value.setter
def value(self, v: Optional[Union[str, int, float, bool]]):
self._value = v
12 changes: 11 additions & 1 deletion takler/core/task_node.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import functools
import asyncio
from typing import Optional
from typing import Optional, Dict

from pydantic import BaseModel

Expand Down Expand Up @@ -68,6 +68,9 @@ def find_generated_parameter(self, name: str) -> Optional[Parameter]:
def update_generated_parameters(self):
self.generated_parameters.update_parameters()

def generated_parameters_only(self) -> Dict[str, Parameter]:
return self.generated_parameters.generated_parameters()

# Node Operation ----------------------------------------------
# Node operation is used to control the flow.

Expand Down Expand Up @@ -127,6 +130,13 @@ def find_parameter(self, name: str) -> Optional[Parameter]:
else:
return None

def generated_parameters(self) -> Dict[str, Parameter]:
return {
TASK: self.task,
TAKLER_NAME: self.takler_name,
TAKLER_RID: self.takler_rid
}


def task(name: str):
"""
Expand Down
127 changes: 127 additions & 0 deletions tests/core/test_parameter_list.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
import pytest

from takler.core import Parameter, parameter, Bunch


@pytest.fixture
def simple_bunch(simple_flow_objects, simple_flow_2_objects):
bunch = Bunch("nwpc_op")
bunch.add_parameter("TAKLER_HOME", "/home/johndoe/takler_out")

# flow1
with simple_flow_objects["flow1"] as flow1:
bunch.add_flow(flow1)
flow1.add_parameter("TAKLER_HOME", "/home/johndoe/bunch/flow1")
flow1.add_parameter("HH", "00")
flow1.add_parameter("FORECAST_INTERVAL", 3)
flow1.add_parameter("FORECAST_LENGTH", 240)

with simple_flow_objects["container1"] as container1:
container1.add_parameter("CLASS", "serial_op")

with simple_flow_objects["task1"] as task1:
task1.add_parameter("UPLOAD_GRIB2", True)
task1.add_parameter("FORECAST_LENGTH", 120)

with simple_flow_objects["task3"] as task3:
task3.add_parameter("FORECAST_LENGTH", 24)
task3.add_parameter("TASKS", 64)

# flow2
with simple_flow_2_objects["flow2"] as flow2:
flow2.add_parameter("TAKLER_HOME", "/home/johndoe/bunch/flow2")
bunch.add_flow(flow2)

return bunch


def test_get_flow1_task1_parameters(simple_bunch, simple_flow_objects):
task1 = simple_flow_objects["task1"]
task1.update_generated_parameters()
params = task1.parameters()
params_keys = params.keys()
assert list(params_keys) == [
# task1
"UPLOAD_GRIB2",
"FORECAST_LENGTH",
"TASK",
"TAKLER_NAME",
"TAKLER_RID",

# container1
"CLASS",

# flow1
"TAKLER_HOME",
"HH",
"FORECAST_INTERVAL",

# bunch
"TAKLER_HOST",
"TAKLER_PORT",
]

# task1
assert params["UPLOAD_GRIB2"] == Parameter("UPLOAD_GRIB2", True)
assert params["FORECAST_LENGTH"] == Parameter("FORECAST_LENGTH", 120)
assert params["TASK"] == Parameter("TASK", "task1")
assert params["TAKLER_NAME"] == Parameter("TAKLER_NAME", "/flow1/container1/task1")
assert params["TAKLER_RID"] == Parameter("TAKLER_RID", None)

# container1
assert params["CLASS"] == Parameter("CLASS", "serial_op")

# flow1
assert params["TAKLER_HOME"] == Parameter("TAKLER_HOME", "/home/johndoe/bunch/flow1")
assert params["HH"] == Parameter("HH", "00")
assert params["FORECAST_INTERVAL"] == Parameter("FORECAST_INTERVAL", 3)

# bunch
assert params["TAKLER_HOST"] == Parameter("TAKLER_HOST", "localhost")
assert params["TAKLER_PORT"] == Parameter("TAKLER_PORT", "33083")


def test_get_flow1_task3_parameters(simple_bunch, simple_flow_objects):
task3 = simple_flow_objects["task3"]
task3.update_generated_parameters()
params = task3.parameters()
params_keys = params.keys()
assert list(params_keys) == [
# task1
"FORECAST_LENGTH",
"TASKS",
"TASK",
"TAKLER_NAME",
"TAKLER_RID",

# container1
"CLASS",

# flow1
"TAKLER_HOME",
"HH",
"FORECAST_INTERVAL",

# bunch
"TAKLER_HOST",
"TAKLER_PORT",
]

# task1
assert params["FORECAST_LENGTH"] == Parameter("FORECAST_LENGTH", 24)
assert params["TASKS"] == Parameter("TASKS", 64)
assert params["TASK"] == Parameter("TASK", "task3")
assert params["TAKLER_NAME"] == Parameter("TAKLER_NAME", "/flow1/container1/container2/task3")
assert params["TAKLER_RID"] == Parameter("TAKLER_RID", None)

# container1
assert params["CLASS"] == Parameter("CLASS", "serial_op")

# flow1
assert params["TAKLER_HOME"] == Parameter("TAKLER_HOME", "/home/johndoe/bunch/flow1")
assert params["HH"] == Parameter("HH", "00")
assert params["FORECAST_INTERVAL"] == Parameter("FORECAST_INTERVAL", 3)

# bunch
assert params["TAKLER_HOST"] == Parameter("TAKLER_HOST", "localhost")
assert params["TAKLER_PORT"] == Parameter("TAKLER_PORT", "33083")
10 changes: 5 additions & 5 deletions tests/core/test_parameter_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,17 @@ def test_add_parameter(simple_flow_objects):
flow1.add_parameter("NODES", 4)
flow1.add_parameter("TIME_INTERVAL", 0.1)

assert flow1.parameters["ECF_HOME"] == Parameter(name="ECF_HOME", value="/home/johndoe")
assert flow1.parameters["NODES"] == Parameter(name="NODES", value=4)
assert flow1.parameters["TIME_INTERVAL"] == Parameter(name="TIME_INTERVAL", value=0.1)
assert flow1.user_parameters["ECF_HOME"] == Parameter(name="ECF_HOME", value="/home/johndoe")
assert flow1.user_parameters["NODES"] == Parameter(name="NODES", value=4)
assert flow1.user_parameters["TIME_INTERVAL"] == Parameter(name="TIME_INTERVAL", value=0.1)

container1 = simple_flow_objects["container1"]
container1.add_parameter("TASKS", 32)
assert container1.parameters["TASKS"] == Parameter(name="TASKS", value=32)
assert container1.user_parameters["TASKS"] == Parameter(name="TASKS", value=32)

task1 = simple_flow_objects["task1"]
task1.add_parameter("FLAG", True)
assert task1.parameters["FLAG"] == Parameter(name="FLAG", value=True)
assert task1.user_parameters["FLAG"] == Parameter(name="FLAG", value=True)


def test_find_parameter(simple_flow_objects_with_parameter):
Expand Down

0 comments on commit d7a5315

Please sign in to comment.