Computational Pipelines with pydoit¶
pydoit is an excellent tool for describing computational pipelines. It is similar to make, but much more flexible.
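For orientation, a pydoit pipeline is defined by Python functions whose names start with task_, each returning (or yielding) dictionaries that describe actions, targets, and dependencies. A minimal, self-contained sketch (the file hello.txt is purely illustrative, not part of the example below):

from pathlib import Path

def task_hello():
    """Write a greeting to hello.txt."""
    def write_hello(targets):
        # pydoit passes the declared targets into a Python action on request
        Path(targets[0]).write_text('Hello, world!\n')
    return {
        'actions': [write_hello],
        'targets': ['hello.txt'],
        'uptodate': [True],  # up to date as long as the target exists
    }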
The following is a real-life example of how to combine pydoit
with a
cluster scheduling system. In this example, we do some quantum optimal control
(the task of steering a quantum system in some desired way)
using the QDYN Fortran library (without going into any details about the
underlying physics). QDYN provides two tools:
qdyn_prop_traj [options] <rf>
    reads a bunch of input data from files in the "runfolder" <rf>, simulates the dynamics of the quantum system, and writes new analysis data into the runfolder
qdyn_optimize [options] <rf>
    runs an iterative optimization scheme on the controls stored in the runfolder. In each iteration, the evolution of the quantum system gets closer to some desired target. The program performs a fixed number of these iterations, and then writes the "optimized" controls to files in the runfolder
Both of these programs are parallelized using MPI (see also the section about Parallelization Paradigms).
The entire chain of computations is expressed in a table:
from io import StringIO

import pandas as pd

params_data_str = r'''
#  T  lambda_a  n_trajs  iter_stop
  10     0.001       10         10
  10    0.0005       10         15
  10    0.0001       10         30
  20     0.001       10         20
  50     0.001       10         30
  70     0.001       10         30
'''

params_df = pd.read_fwf(
    StringIO(params_data_str), comment='#', header=None,
    names=['T', 'lambda_a', 'n_trajs', 'iter_stop'])
We want to look at the control problem for different physical durations T of the process. The parameter lambda_a is a "step width" of the optimization algorithm, n_trajs is the number of parallel MPI processes we can use, and iter_stop is the number of iterations of the control algorithm.
Our computation pipeline contains four actions:
- For each duration T in the table, create a unique runfolder and write the appropriate input data to files in this folder
- For the different values of iter_stop for any given T, modify the data in the runfolder to reflect the corresponding lambda_a
- Submit a job to the cluster that runs qdyn_optimize until iter_stop is reached
- Finally, after all optimizations are done, for each runfolder (i.e., each unique T), submit a job to the cluster that runs qdyn_prop_traj in order to analyze the result of the optimization
Thus, based on the above table, the following specific steps are required:
- Generate input data for T=10 in the runfolder ./data/rf10
- Generate input data for T=20 in the runfolder ./data/rf20
- Generate input data for T=50 in the runfolder ./data/rf50
- Generate input data for T=70 in the runfolder ./data/rf70
- Modify the data in ./data/rf10 to do an optimization with step width 0.001, stopping after 10 iterations
- Modify the data in ./data/rf20 to do an optimization with step width 0.001, stopping after 20 iterations
- Modify the data in ./data/rf50 to do an optimization with step width 0.001, stopping after 30 iterations
- Modify the data in ./data/rf70 to do an optimization with step width 0.001, stopping after 30 iterations
- Submit a job to the cluster that runs qdyn_optimize [options] ./data/rf10
- Submit a job to the cluster that runs qdyn_optimize [options] ./data/rf20
- Submit a job to the cluster that runs qdyn_optimize [options] ./data/rf50
- Submit a job to the cluster that runs qdyn_optimize [options] ./data/rf70
- After the submitted job for ./data/rf10 finishes, modify the data in that runfolder to continue the optimization with step width 0.0005, stopping after iteration 15
- Submit another job to the cluster that runs qdyn_optimize [options] ./data/rf10
- After that job finishes, modify the data in that runfolder to continue the optimization with step width 0.0001, stopping after iteration 30
- Submit another job to the cluster that runs qdyn_optimize [options] ./data/rf10
- After the submitted optimization jobs for ./data/rf10 finish, submit a job that runs qdyn_prop_traj [options] ./data/rf10
- After the submitted optimization jobs for ./data/rf20 finish, submit a job that runs qdyn_prop_traj [options] ./data/rf20
- After the submitted optimization jobs for ./data/rf50 finish, submit a job that runs qdyn_prop_traj [options] ./data/rf50
- After the submitted optimization jobs for ./data/rf70 finish, submit a job that runs qdyn_prop_traj [options] ./data/rf70
Actions¶
The first two of the actions in our pipeline (generating the input data, and modifying that data for a specific value of lambda_a/iter_stop) do not involve clusterjob. These are handled by two Python routines:

write_model(rf, T, lambda_a, iter_stop)
    write the input data to the runfolder rf
update_config(rf, lambda_a, iter_stop)
    update the config file in the runfolder with new data
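These two routines are problem-specific, and their implementation is not shown here. As a minimal sketch, assuming (purely for illustration) that the config file is in INI format with an [oct] section; the real write_model generates far more input data for QDYN:

import os
from configparser import ConfigParser

def write_model(rf, T, lambda_a, iter_stop):
    """Write the initial input data for duration T to the runfolder rf."""
    os.makedirs(rf, exist_ok=True)
    config = ConfigParser()
    config['oct'] = {  # hypothetical config layout
        'T': str(T),
        'lambda_a': str(lambda_a),
        'iter_stop': str(int(iter_stop)),
    }
    with open(os.path.join(rf, 'config'), 'w') as fh:
        config.write(fh)
    # ... also write the remaining model data (Hamiltonian, guess controls)

def update_config(rf, lambda_a, iter_stop):
    """Update lambda_a and iter_stop in an existing config file."""
    config = ConfigParser()
    config.read(os.path.join(rf, 'config'))
    config['oct']['lambda_a'] = str(lambda_a)
    config['oct']['iter_stop'] = str(int(iter_stop))
    with open(os.path.join(rf, 'config'), 'w') as fh:
        config.write(fh)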
For the remaining two actions, we use routines that generate and submit job scripts. We set this up as
import clusterjob
clusterjob.JobScript.read_defaults('cluster.ini')
where the configuration file cluster.ini
contains:
[Attributes]
backend = slurm
cache_folder = ./data/cache
module_load =
    module load intel
    module load mpi

[Resources]
nodes = 1
threads = 1
mem = 10000
This is for running the pipeline on a single workstation with the SLURM scheduler installed. With a different configuration file, we could use a different scheduler or submit to a remote cluster with more compute nodes.
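For example, a hypothetical configuration for submitting over SSH to a remote PBS cluster might look like the following (the hostname and paths are placeholders; see the clusterjob documentation for the full list of supported attributes):

[Attributes]
backend = pbs
remote = login.cluster.example.com
rootdir = ~/jobs
cache_folder = ./data/cache
module_load =
    module load intel
    module load mpi

[Resources]
nodes = 1
threads = 1
mem = 10000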
The action routines now are:
import os
from os.path import join
from textwrap import dedent

def submit_optimization(rf, n_trajs, task):
    body = dedent(r'''
    {module_load}
    cd {rf}
    OMP_NUM_THREADS=1 mpirun -n {n_trajs} qdyn_optimize --n-trajs={n_trajs} \
        --J_T=J_T_sm .
    ''')
    taskname = "oct_%s" % task.name.replace(":", '_')
    jobscript = clusterjob.JobScript(
        body=body, filename=join(rf, 'oct.slr'),
        jobname=taskname, nodes=1, ppn=int(n_trajs), threads=1,
        stdout=join(rf, 'oct.log'))
    jobscript.rf = rf
    jobscript.n_trajs = str(int(n_trajs))
    run = jobscript.submit(cache_id=taskname)
    run.dump(join(rf, 'oct.job.dump'))

def submit_propagation(rf, n_trajs):
    body = dedent(r'''
    {module_load}
    cd {rf}
    OMP_NUM_THREADS=1 mpirun -n {n_trajs} qdyn_prop_traj --n-trajs={n_trajs} \
        --use-oct-pulses --write-final-state=state_final.dat .
    ''')
    taskname = "prop_" + os.path.split(rf)[-1]
    jobscript = clusterjob.JobScript(
        body=body, filename=join(rf, 'prop.slr'),
        jobname=taskname, nodes=1, ppn=int(n_trajs), threads=1,
        stdout=join(rf, 'prop.log'))
    jobscript.rf = rf
    jobscript.n_trajs = str(int(n_trajs))
    run = jobscript.submit(cache_id=taskname, force=True)
    run.dump(join(rf, 'prop.job.dump'))
Both of these routines store a dump of the submitted job to a file inside the runfolder. The {module_load}, {rf}, and {n_trajs} placeholders in the body are filled in by clusterjob from the corresponding JobScript attributes. We then have another action that polls the scheduler, waiting for the job to finish successfully (it returns False if the job itself failed, which pydoit interprets as a failed task):
def wait_for_clusterjob(dumpfile):
    try:
        run = clusterjob.AsyncResult.load(dumpfile)
        run.wait()
        os.unlink(dumpfile)
        return run.successful()
    except OSError:
        # dump file was already removed in an earlier execution
        pass
Tasks¶
We now build the pipeline of pydoit tasks using the above actions and the information in the params_df table. For convenience, we identify the appropriate runfolder for each row in params_df as
def runfolder(row):
    return './data/rf%d' % row['T']
First, we create a runfolder for each unique value of T:
def task_create_runfolder():
    jobs = {}
    for ind, row in params_df.iterrows():
        rf = runfolder(row)
        if rf in jobs:
            # only one task per runfolder, not per row!
            continue
        jobs[rf] = {
            'name': str(rf),
            'actions': [
                (write_model, [], dict(
                    rf=rf, T=row['T'], lambda_a=row['lambda_a'],
                    iter_stop=int(row['iter_stop'])))],
            'targets': [join(rf, 'config')],
            'uptodate': [True, ],  # up to date if target exists
        }
    for job in jobs.values():
        yield job
Next, we have a task that updates the config file data as necessary.
from collections import defaultdict

def task_update_runfolder():
    rf_jobs = defaultdict(list)
    for ind, row in params_df.iterrows():
        rf = runfolder(row)
        # we only update the config after any earlier optimization has finished
        task_dep = ['wait_for_optimization:%s' % ind2 for ind2 in rf_jobs[rf]]
        rf_jobs[rf].append(ind)
        yield {
            'name': str(ind),
            'actions': [
                (update_config, [], dict(
                    rf=rf, lambda_a=row['lambda_a'],
                    iter_stop=int(row['iter_stop'])))],
            'file_dep': [join(rf, 'config')],
            'uptodate': [False, ],  # always run task
            'task_dep': task_dep}
The crucial part of this is the task dependency: we only update the data after any earlier optimization in the same runfolder has finished (the wait_for_optimization task will be defined below). There is an implicit dependence on task_create_runfolder through the existence of the file 'config' inside the runfolder.

In order to run the optimization, we have one task to run the submit_optimization action.
def task_submit_optimization():
    rf_jobs = defaultdict(list)
    for ind, row in params_df.iterrows():
        rf = runfolder(row)
        task_dep = ['wait_for_optimization:%s' % ind2 for ind2 in rf_jobs[rf]]
        task_dep.append('update_runfolder:%s' % ind)
        rf_jobs[rf].append(ind)  # track earlier jobs in the same runfolder
        yield {
            'name': str(ind),
            'actions': [
                (submit_optimization, [rf, ], dict(n_trajs=row['n_trajs']))],
            # 'task' keyword arg is added automatically
            'task_dep': task_dep,
            'uptodate': [(pulses_uptodate, [], {'rf': rf}), ],
        }
Again, we only start an optimization after each earlier optimization in the same runfolder has finished. This relies on a task that simply waits for the submitted job to finish.

It is worth noting that we define the pipeline primarily using task dependencies, not file dependencies. We have a custom routine pulses_uptodate that checks whether an optimization needs to be run.
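The implementation of pulses_uptodate is not shown in this example. A minimal sketch, assuming the check can be based on file modification times (the real routine would inspect the data that qdyn_optimize writes to the runfolder):

from os.path import getmtime, isfile, join

def pulses_uptodate(rf):
    """Return True if the optimized pulse in rf is newer than the config
    file, i.e. the optimization has already run for the current settings."""
    config = join(rf, 'config')
    oct_pulse = join(rf, 'pulse1.oct.dat')  # written by qdyn_optimize
    if not (isfile(config) and isfile(oct_pulse)):
        return False
    return getmtime(oct_pulse) >= getmtime(config)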
Task dependencies are a feature that is unique to pydoit (in comparison to make, which defines targets entirely through files). That being said, we use task dependencies here only because we want to dynamically change the data in the runfolder between tasks. In our example, the reason for this is that write_model generates a large amount of data in the runfolder, whereas update_config only makes a very small modification. In a situation where the pipeline can be expressed through file dependencies (if later tasks do not modify the data from earlier tasks), it is often more straightforward to do so.
def task_wait_for_optimization():
    for ind, row in params_df.iterrows():
        rf = runfolder(row)
        yield {
            'name': str(ind),
            'task_dep': ['submit_optimization:%d' % ind],
            'actions': [
                (wait_for_clusterjob, [join(rf, 'oct.job.dump')], {}), ]}
The propagation is handled separately and independently of the optimization. Again, we have two tasks: one to submit the propagation, and one to wait for the submitted job to finish.
def task_submit_propagation():
    jobs = {}
    for ind, row in params_df.iterrows():
        rf = runfolder(row)
        jobs[rf] = {
            'name': str(rf),
            'actions': [
                (submit_propagation, [rf, ], dict(n_trajs=row['n_trajs']))],
            'file_dep': [join(rf, 'pulse1.oct.dat'), ], }
    for job in jobs.values():
        yield job

def task_wait_for_propagation():
    jobs = {}
    for ind, row in params_df.iterrows():
        rf = runfolder(row)
        jobs[rf] = {
            'name': str(rf),
            'task_dep': ['submit_propagation:%s' % rf],
            'actions': [
                (wait_for_clusterjob, [join(rf, 'prop.job.dump')], {}), ]}
    for job in jobs.values():
        yield job
Running the pipeline¶
It is often convenient to use the Jupyter notebook to define the pipeline; in this case, we use the %doit
magic to run it:
from doit.tools import register_doit_as_IPython_magic
register_doit_as_IPython_magic()
Then,
%doit -n 4 wait_for_optimization
produces:
. create_runfolder:./data/rf10
. create_runfolder:./data/rf20
. create_runfolder:./data/rf50
. create_runfolder:./data/rf70
. update_runfolder:0
. update_runfolder:3
. update_runfolder:4
. submit_optimization:0
. update_runfolder:5
. submit_optimization:3
. submit_optimization:5
. submit_optimization:4
. wait_for_optimization:0
. wait_for_optimization:3
. wait_for_optimization:5
. wait_for_optimization:4
. update_runfolder:1
. submit_optimization:1
. wait_for_optimization:1
. update_runfolder:2
. submit_optimization:2
. wait_for_optimization:2
After that, we run the propagation as
%doit -n 4 wait_for_propagation
resulting in:
. submit_propagation:./data/rf10
. submit_propagation:./data/rf20
. submit_propagation:./data/rf50
. submit_propagation:./data/rf70
. wait_for_propagation:./data/rf10
. wait_for_propagation:./data/rf20
. wait_for_propagation:./data/rf50
. wait_for_propagation:./data/rf70
Calling doit with 4 parallel processes (-n 4) can provide a small speedup: by having split our tasks into "submission" and "wait" parts, we already largely have an asynchronous pipeline (submit_optimization finishes immediately). However, with the additional parallelization, we create all the runfolders in parallel, and we also monitor the scheduler for several jobs at the same time (wait_for_optimization runs in parallel, instead of in series).
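As an aside, nothing here depends on the notebook: if the same task definitions live in a regular dodo.py file, the pipeline runs from the command line with the standard doit command (not part of the original example):

doit -n 4 wait_for_optimization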
If the pipeline is run again after it finishes, only the actions of the update_runfolder tasks are actually executed; pydoit recognizes everything else as "up-to-date". If we add or modify rows in params_df, re-running the pipeline will only add the missing data.
Moreover, the caching feature of clusterjob ensures that we could kill pydoit, or the notebook containing the pipeline, at any time. If we then were to re-execute it at some later time, clusterjob would pick up the already-submitted jobs.