Skip to content

Commit

Permalink
✨ add generated parameters for Bunch, Flow and Task. #20
Browse files Browse the repository at this point in the history
  • Loading branch information
perillaroc committed Apr 30, 2022
1 parent 9636f17 commit 206e804
Show file tree
Hide file tree
Showing 7 changed files with 173 additions and 6 deletions.
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
'lark',
'grpcio',
'typer',
'pydantic'
],

extras_require={
Expand Down
4 changes: 4 additions & 0 deletions takler/constant.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@


DEFAULT_HOST = "localhost"
DEFAULT_PORT = "33083"
55 changes: 53 additions & 2 deletions takler/core/bunch.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,31 @@
from typing import Optional, Dict, Union
from typing import Optional, Dict, Union, List

from pydantic import BaseModel, validator

from takler import constant

from .node_container import NodeContainer
from .flow import Flow
from .node import Node
from .parameter import (
Parameter, TAKLER_HOST, TAKLER_PORT, TAKLER_HOME
)


class Bunch(NodeContainer):
def __init__(self, name: str = ""):
def __init__(self, name: str = "", port: str = None):
super(Bunch, self).__init__(name=name)
self.flows = dict() # type: Dict[str, Flow]
self.server_state = ServerState(port=port) # type: ServerState
self.server_state.setup()

# Flow ------------------------------------------------

def add_flow(self, flow: Union[Flow, str]) -> Flow:
if isinstance(flow, str):
flow = Flow(name=flow)
self.flows[flow.name] = flow
flow.bunch = self
return flow

def find_flow(self, name: str) -> Optional[Flow]:
Expand All @@ -32,6 +44,8 @@ def delete_flow(self, flow: Union[str, Flow]) -> Flow:

return flow

# Node access -----------------------------------------------

def find_node(self, node_path: str) -> Optional[Node]:
if not Node.check_absolute_node_path(node_path):
raise ValueError(f"absolute node path is illegal: {node_path}")
Expand All @@ -42,3 +56,40 @@ def find_node(self, node_path: str) -> Optional[Node]:
if a_flow is None:
return None
return a_flow.find_node(node_path)

# Parameter ------------------------------------------------

def find_generated_parameter(self, name: str) -> Optional[Parameter]:
p = self.server_state.find_parameter(name)
return p


class ServerState(BaseModel):
server_parameters: List[Parameter] = []
host: str = constant.DEFAULT_HOST
port: Optional[str] = constant.DEFAULT_PORT

class Config:
arbitrary_types_allowed = True
validate_assignment = True

@validator("port")
def set_port(cls, p):
if p is None:
p = constant.DEFAULT_PORT
return p

def setup(self):
self.setup_default_server_parameters()

def setup_default_server_parameters(self):
self.server_parameters.append(Parameter(TAKLER_HOST, self.host))
self.server_parameters.append(Parameter(TAKLER_PORT, self.port))
self.server_parameters.append(Parameter(TAKLER_HOME, "."))

def find_parameter(self, name: str) -> Optional[Parameter]:
for p in self.server_parameters:
if p.name == name:
return p

return None
25 changes: 24 additions & 1 deletion takler/core/flow.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,31 @@
from .task_node import Task
from typing import TYPE_CHECKING, Optional

from .node_container import NodeContainer
from .parameter import Parameter

if TYPE_CHECKING:
from .bunch import Bunch


class Flow(NodeContainer):
def __init__(self, name: str):
super(Flow, self).__init__(name)

self.bunch = None # type: Optional[Bunch]

# Node access --------------------------------------

def get_bunch(self): # type: () -> Optional[Bunch]
return self.bunch

# Parameter ----------------------------------------

def find_parent_parameter(self, name: str) -> Optional[Parameter]:
p = super(Flow, self).find_parent_parameter(name)
if p is not None:
return p

if self.bunch is None:
return None

return self.bunch.find_parent_parameter(name)
45 changes: 42 additions & 3 deletions takler/core/node.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
from __future__ import annotations

from typing import Union, List, Optional, Dict
from typing import Union, List, Optional, Dict, TYPE_CHECKING
from pathlib import PurePosixPath
from collections import defaultdict
from abc import ABC, abstractmethod

from .state import State, NodeStatus
from .parameter import Parameter
from .expression import Expression

if TYPE_CHECKING:
from .bunch import Bunch


# def compute_node_status(node: Node, immediate: bool) -> NodeStatus:
# """
Expand Down Expand Up @@ -70,7 +74,7 @@ def compute_most_significant_status(nodes: List[Node], immediate: bool) -> NodeS
return NodeStatus.unknown


class Node(object):
class Node(ABC):
def __init__(self, name: str):
# 树形
self.name = name # type: str
Expand Down Expand Up @@ -184,6 +188,12 @@ def get_root(self) -> Node:
root = root.parent
return root

def get_bunch(self) -> Optional[Bunch]:
if self.parent is not None:
return self.parent.get_bunch()
else:
return None

def find_node(self, a_path: str) -> Optional[Node]:
"""
use node path to find a node.
Expand Down Expand Up @@ -384,8 +394,27 @@ def find_parameter(self, name: str) -> Optional[Parameter]:
"""
Find a ``parameter`` only in this node.
"""
p = self.find_user_parameter(name)
if p is not None:
return p

p = self.find_generated_parameter(name)
if p is not None:
return p

def find_user_parameter(self, name: str) -> Optional[Parameter]:
"""
Find user ``parameter`` only in this node.
"""
return self.parameters.get(name, None)

# @abstractmethod
def find_generated_parameter(self, name: str) -> Optional[Parameter]:
"""
Find generated ``parameter`` only in this node.
"""
return None

def find_parent_parameter(self, name: str) -> Optional[Parameter]:
"""
Find a ``Parameter`` up along the node tree.
Expand All @@ -401,7 +430,17 @@ def find_parent_parameter(self, name: str) -> Optional[Parameter]:
return p
parent_node = parent_node.parent

return None
bunch = self.get_bunch()
if bunch is None:
return None

return bunch.find_parameter(name)

def update_generated_parameters(self):
"""
Update generated parameters for this node.
"""
pass

# Node Operations ------------------------------------------------

Expand Down
1 change: 1 addition & 0 deletions takler/core/parameter.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# Task level
TASK = "TASK"
TAKLER_NAME = "TAKLER_NAME"
TAKLER_RID = "TAKLER_RID"


class Parameter(object):
Expand Down
48 changes: 48 additions & 0 deletions takler/core/task_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,14 @@
import asyncio
from typing import Optional

from pydantic import BaseModel

from .node import Node
from .state import NodeStatus
from .parameter import (
Parameter,
TASK, TAKLER_NAME, TAKLER_RID
)
from ._logger import logger


Expand All @@ -12,12 +18,17 @@ def __init__(self, name: str):
super(Task, self).__init__(name)
self.task_id = None # type: Optional[str]

self.generated_parameters = TaskNodeGeneratedParameters(node=self) # type: TaskNodeGeneratedParameters

def __repr__(self):
return f"Task {self.name}"

# State management --------------------------------------------

def computed_status(self, immediate: bool) -> NodeStatus:
"""
The status of a task is its own status.
"""
return self.state.node_status

def swim_status_change(self):
Expand Down Expand Up @@ -46,8 +57,17 @@ def resolve_dependencies(self) -> bool:

# run jobs
self.run()

return True

# Parameter ---------------------------------------------------

def find_generated_parameter(self, name: str) -> Optional[Parameter]:
return self.generated_parameters.find_parameter(name)

def update_generated_parameters(self):
self.generated_parameters.update_parameters()

# Node Operation ----------------------------------------------
# Node operation is used to control the flow.

Expand Down Expand Up @@ -80,6 +100,34 @@ def abort(self):
self.handle_status_change()


class TaskNodeGeneratedParameters(BaseModel):
node: Task
task: Parameter = Parameter(TASK, None)
takler_name: Parameter = Parameter(TAKLER_NAME, None)
takler_rid: Parameter = Parameter(TAKLER_RID, None)

class Config:
arbitrary_types_allowed = True

def update_parameters(self):
"""
Update generated parameters from task node's attrs.
"""
self.task.value = self.node.name
self.takler_name.value = self.node.node_path
self.takler_rid.value = self.node.task_id

def find_parameter(self, name: str) -> Optional[Parameter]:
if name == TASK:
return self.task
elif name == TAKLER_NAME:
return self.takler_name
elif name == TAKLER_RID:
return self.takler_rid
else:
return None


def task(name: str):
"""
Decorator to create inline task.
Expand Down

0 comments on commit 206e804

Please sign in to comment.