Python library for creating DAGman job files for HTCondor

MichaelLampe/pydagman

pydagman

Python library for programmatic creation of DAGman job files for Condor

Usage

To work with the package, put it somewhere on your Python path and import the Dagfile and Job classes:

from pydagman.dagfile import Dagfile
from pydagman.job import Job

Create a Dagfile object:

mydag = Dagfile()

Create a job. You must specify the path to your submit file. Optionally, you can also specify a name; otherwise, the library automatically generates a UUID to serve as the job name.

job1 = Job('job1.submit', 'job1')
# OR
job1 = Job('job1.submit') # Job name will be automatically generated uuid
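The automatic naming can be pictured roughly as follows. This is a minimal sketch, not pydagman's code: the helper `default_job_name` is hypothetical, and it assumes the generated name is simply the string form of `uuid.uuid4()`.

```python
import uuid
from typing import Optional


def default_job_name(name: Optional[str] = None) -> str:
    """Hypothetical helper: return the given name, or a fresh UUID string."""
    if name is not None:
        return name
    # uuid4() is random, so two unnamed jobs will practically never collide
    return str(uuid.uuid4())


print(default_job_name('job1'))  # job1
print(default_job_name())        # a new random UUID on every call
```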

The Job.noop attribute controls whether the job is actually run. Job.noop is False by default.

job1 = Job('job1.submit')
job1.noop = True
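In DAGMan's own syntax, a no-op node is a JOB line ending in the NOOP keyword, and DAGMan does not submit that node's job to HTCondor. The README does not show what pydagman writes for a no-op job, so the function below is only a hypothetical illustration of how such a flag could map onto the JOB line.

```python
def job_line(name: str, submit_file: str, noop: bool = False) -> str:
    """Hypothetical renderer for a DAGMan JOB line."""
    line = f"JOB {name} {submit_file}"
    if noop:
        # DAGMan's NOOP keyword: the node's job is not submitted to HTCondor
        line += " NOOP"
    return line


print(job_line('job1', 'job1.submit'))             # JOB job1 job1.submit
print(job_line('job1', 'job1.submit', noop=True))  # JOB job1 job1.submit NOOP
```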

Jobs can have variables (VARS in the dagfile) that will be injected into the submit file.

job1.add_var('mem', '20G')
job1.add_var('outdir', '/tmp/job1')
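Judging from the example output at the end of this README, each variable becomes one `VARS` line with a double-quoted value, and the submit file can then refer to it as a macro such as `$(mem)`. A hypothetical renderer for that format (not pydagman's actual code) might look like this:

```python
def vars_lines(name: str, variables: dict[str, str]) -> list[str]:
    """Hypothetical renderer: one VARS line per variable, value in double quotes."""
    # dicts keep insertion order, so lines come out in the order vars were added
    return [f'VARS {name} {key}="{value}"' for key, value in variables.items()]


for line in vars_lines('JOB1', {'mem': '20G', 'cpus': '12'}):
    print(line)
# VARS JOB1 mem="20G"
# VARS JOB1 cpus="12"
```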

Jobs can also have pre and post scripts. Job.add_pre and Job.add_post take the path to the script as the first argument, optionally followed by as many additional arguments as the script needs.

job1.add_pre('pre_stage1.sh') # No additional arguments
job1.add_post('post_stage1.sh', '/tmp/job1', '/tmp/job2') # Expands to 'post_stage1.sh /tmp/job1 /tmp/job2'
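The example output below renders a pre script as `SCRIPT PRE <job> <script> <args...>`. Assuming post scripts follow the same pattern with `POST` (which is also DAGMan's syntax), a hypothetical renderer could be sketched as:

```python
def script_line(kind: str, name: str, script: str, *args: str) -> str:
    """Hypothetical renderer for SCRIPT PRE/POST lines."""
    if kind not in ('PRE', 'POST'):
        raise ValueError(f"kind must be 'PRE' or 'POST', not {kind!r}")
    # The script path comes first, followed by any arguments, space-separated
    return ' '.join(['SCRIPT', kind, name, script, *args])


print(script_line('PRE', 'job1', 'pre_stage1.sh'))
# SCRIPT PRE job1 pre_stage1.sh
print(script_line('POST', 'job1', 'post_stage1.sh', '/tmp/job1', '/tmp/job2'))
# SCRIPT POST job1 post_stage1.sh /tmp/job1 /tmp/job2
```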

Job.pre_skip causes the entire node to complete successfully, without running the job or its POST script, if the PRE script exits with the given exit code:

job1.pre_skip("1")

Of course, you can also specify an arbitrary number of parent/child relationships between jobs. Job.add_parent checks that the parent isn't already in the child job's parent list; if it is, it raises DuplicateParentError. The library could simply ignore the duplicate call, but since a duplicate almost certainly indicates a logic error in the calling program, it fails early instead.

job2 = Job('job2.submit')
job2.add_parent(job1)
job2.add_parent(job1) # Adding the same parent a second time raises DuplicateParentError
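The fail-early check can be sketched in a few lines. The `Node` class here is a hypothetical, stripped-down stand-in for pydagman's Job, and this `DuplicateParentError` is defined locally for illustration; only the behaviour described above is modelled.

```python
class DuplicateParentError(Exception):
    """Raised when the same parent is added to a job twice."""


class Node:
    """Hypothetical, stripped-down stand-in for pydagman's Job."""

    def __init__(self, name: str):
        self.name = name
        self.parents: list["Node"] = []

    def add_parent(self, parent: "Node") -> None:
        # Fail early: a repeated parent almost certainly signals a caller bug
        if parent in self.parents:
            raise DuplicateParentError(
                f"{parent.name} is already a parent of {self.name}")
        self.parents.append(parent)


job1, job2 = Node('job1'), Node('job2')
job2.add_parent(job1)
try:
    job2.add_parent(job1)
except DuplicateParentError as err:
    print(err)  # job1 is already a parent of job2
```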

Once you have configured each job's variables, pre/post scripts, and parent/child relationships, add the jobs to the dagfile with Dagfile.add_job:

mydag.add_job(job1)
mydag.add_job(job2)

Dagfile.add_job also checks for circular dependencies and raises CircularDependencyError if it finds one:

job3 = Job('job3.submit')
job4 = Job('job4.submit')
job3.add_parent(job4)
job4.add_parent(job3) # We now have a circular dependency between job3 and job4, but they are not checked until the jobs are added to the dagfile
mydag.add_job(job3) # No error here, since we've only added job3 to the dagfile so far
mydag.add_job(job4) # Now CircularDependencyError is raised, because job3 already lists job4 as its parent
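The README does not say how pydagman detects cycles. One standard way that reproduces the behaviour above is a depth-first search over parent links that only follows edges between jobs already in the DAG. The sketch below uses that technique with hypothetical `Node`, `Dag` and `has_cycle` names; it is not the library's implementation.

```python
class CircularDependencyError(Exception):
    """Raised when adding a job would close a dependency cycle."""


class Node:
    """Hypothetical stand-in for a job: a name plus a list of parent nodes."""

    def __init__(self, name: str):
        self.name = name
        self.parents: list["Node"] = []


class Dag:
    """Hypothetical stand-in for Dagfile that checks for cycles in add_job."""

    def __init__(self):
        self.jobs: list[Node] = []

    def add_job(self, job: Node) -> None:
        # Only edges between jobs already in the DAG (plus the new one) count,
        # which is why adding job3 alone does not trigger an error.
        if has_cycle(self.jobs + [job]):
            raise CircularDependencyError(f"adding {job.name} creates a cycle")
        self.jobs.append(job)


def has_cycle(jobs: list[Node]) -> bool:
    """Depth-first search over parent links, restricted to the given jobs."""
    members = set(jobs)
    on_path: set[Node] = set()   # nodes on the current DFS path
    finished: set[Node] = set()  # nodes whose ancestors are fully explored

    def visit(node: Node) -> bool:
        on_path.add(node)
        for parent in node.parents:
            if parent not in members or parent in finished:
                continue
            # Meeting a node that is still on the path means we looped back
            if parent in on_path or visit(parent):
                return True
        on_path.discard(node)
        finished.add(node)
        return False

    return any(job not in finished and visit(job) for job in jobs)


job3, job4 = Node('job3'), Node('job4')
job3.parents.append(job4)
job4.parents.append(job3)

mydag = Dag()
mydag.add_job(job3)      # fine: job4 is not in the DAG yet
try:
    mydag.add_job(job4)  # closes the cycle job3 -> job4 -> job3
except CircularDependencyError as err:
    print(err)           # adding job4 creates a cycle
```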

You can use job.Job.add_category and dagfile.Dagfile.set_maxjobs to limit how many jobs in a given category run concurrently:

job3 = Job('job3.submit')
job3.add_category("BigCPU")
mydag.add_job(job3)
mydag.set_maxjobs("BigCPU", 24)

Dagfile.abort_dag_on adds a condition instructing DAGman to abort the entire workflow if the job exits with a given exit code. An alternate return value for the whole workflow can also be specified:

mydag.abort_dag_on(job3.name, "3")
mydag.abort_dag_on(job3.name, "3", "1") # The return code for the dag workflow is now 1 instead of 3

The state of a job is saved to the dagfile object when add_job is called. Therefore, adding further attributes to a job (vars, pre/post scripts, parents) after it has been added to a dagfile object has no effect:

job5 = Job('job5.submit')
mydag.add_job(job5)
job5.add_var('mem', '20G') # This has no effect since the job was already added to the dagfile object
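One way to get this snapshot behaviour is to copy the job inside add_job. The sketch below uses `copy.deepcopy`, and its class names `MiniJob` and `SnapshotDag` are made up for illustration; pydagman may capture the state differently (for example by rendering the job's lines immediately), with the same visible effect.

```python
import copy


class MiniJob:
    """Hypothetical minimal job that only holds variables."""

    def __init__(self, name: str):
        self.name = name
        self.vars: dict[str, str] = {}

    def add_var(self, key: str, value: str) -> None:
        self.vars[key] = value


class SnapshotDag:
    """Hypothetical dagfile that stores a copy of each job when it is added."""

    def __init__(self):
        self.jobs: list[MiniJob] = []

    def add_job(self, job: MiniJob) -> None:
        # Deep-copy so later changes to the caller's object are not seen here
        self.jobs.append(copy.deepcopy(job))


mydag = SnapshotDag()
job5 = MiniJob('job5')
mydag.add_job(job5)
job5.add_var('mem', '20G')  # changes only the caller's object
print(mydag.jobs[0].vars)   # {}
```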

Example with output

# my_workflow.py
from pydagman.dagfile import Dagfile
from pydagman.job import Job

mydag = Dagfile()

job1 = Job('job1.submit', 'JOB1')
job1.add_var('mem', '20G')
job1.add_var('cpus', '12')
job1.retry(4)
mydag.add_job(job1)

job2 = Job('job2.submit', 'JOB2')
job2.add_var('mem', '4G')
job2.add_var('cpus', '2')
job2.retry(2)
job2.add_parent(job1)
mydag.add_job(job2)

job3 = Job('job3.submit', 'JOB3')
job3.add_pre('job3_pre.sh', '/tmp/JOB2', '/tmp/JOB3')
job3.add_var('mem', '4G')
job3.add_var('cpus', '2')
job3.retry(2)
job3.add_parent(job2)
mydag.add_job(job3)

job4 = Job('job4.submit', 'JOB4')
job4.add_pre('job4_pre.sh', '/tmp/JOB2', '/tmp/JOB4')
job4.add_var('mem', '4G')
job4.add_var('cpus', '2')
job4.retry(2)
job4.add_parent(job2)
mydag.add_job(job4)

mydag.save('my_workflow.dag')

The resulting file:

# my_workflow.dag
JOB JOB1 job1.submit
VARS JOB1 mem="20G"
VARS JOB1 cpus="12"
RETRY JOB1 4

JOB JOB2 job2.submit
VARS JOB2 mem="4G"
VARS JOB2 cpus="2"
RETRY JOB2 2

JOB JOB3 job3.submit
SCRIPT PRE JOB3 job3_pre.sh /tmp/JOB2 /tmp/JOB3
VARS JOB3 mem="4G"
VARS JOB3 cpus="2"
RETRY JOB3 2

JOB JOB4 job4.submit
SCRIPT PRE JOB4 job4_pre.sh /tmp/JOB2 /tmp/JOB4
VARS JOB4 mem="4G"
VARS JOB4 cpus="2"
RETRY JOB4 2

PARENT JOB1 CHILD JOB2
PARENT JOB2 CHILD JOB3
PARENT JOB2 CHILD JOB4
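To sanity-check a generated file, its PARENT/CHILD lines can be read back and ordered with the standard library's `graphlib` (Python 3.9+). The helper `dependency_order` is hypothetical and not part of pydagman; it reads JOB lines and PARENT ... CHILD ... lines and ignores everything else. For a real file, pass it the contents of my_workflow.dag.

```python
from graphlib import TopologicalSorter


def dependency_order(dag_text: str) -> list[str]:
    """Hypothetical helper: list the nodes of a DAG file in dependency order."""
    graph: dict[str, set[str]] = {}
    for raw in dag_text.splitlines():
        tokens = raw.split()
        if tokens[:1] == ['JOB'] and len(tokens) >= 2:
            graph.setdefault(tokens[1], set())
        elif tokens[:1] == ['PARENT'] and 'CHILD' in tokens:
            split = tokens.index('CHILD')
            parents, children = tokens[1:split], tokens[split + 1:]
            for child in children:
                # graphlib maps each node to the set of nodes it depends on
                graph.setdefault(child, set()).update(parents)
    # static_order() raises graphlib.CycleError if the dependencies form a loop
    return list(TopologicalSorter(graph).static_order())


sample = """\
JOB JOB1 job1.submit
JOB JOB2 job2.submit
JOB JOB3 job3.submit
JOB JOB4 job4.submit
PARENT JOB1 CHILD JOB2
PARENT JOB2 CHILD JOB3
PARENT JOB2 CHILD JOB4
"""
print(dependency_order(sample))  # JOB1, then JOB2, then JOB3 and JOB4
```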
