"""Abstraction for job scripts and cluster schedulers, for a variety of
scheduling backends (e.g., SLURM, PBS/TORQUE, ...)
Note:
To see debug messages, set::
import logging
logging.basicConfig(level=logging.DEBUG)
"""
from __future__ import absolute_import

__version__ = "2.0.0-dev"

import os
import tempfile
import re
try:
    import cPickle as pickle
except ImportError:
    import pickle
try:
    # Python 2
    from ConfigParser import SafeConfigParser
    from ConfigParser import Error as ConfigParserError
except ImportError:
    # Python 3
    from configparser import ConfigParser as SafeConfigParser
    from configparser import Error as ConfigParserError
import subprocess as sp
from glob import glob
from textwrap import dedent
from collections import OrderedDict, defaultdict
import logging
import importlib
import pprint
import pkgutil
import time

from .backends import ClusterjobBackend, ResourcesNotSupportedError
from .backends.lpbs import LPbsBackend
from .backends.lsf import LsfBackend
from .backends.pbs import PbsBackend
from .backends.pbspro import PbsProBackend
from .backends.sge import SgeBackend
from .backends.slurm import SlurmBackend
from .status import (STATUS_CODES, COMPLETED, FAILED, CANCELLED, PENDING,
                     str_status)
from .utils import (set_executable, run_cmd, upload_file, mkdir,
                    time_to_seconds)
# Instances of all built-in backends. The _init_default_backends class
# decorator registers each of them with JobScript, in this order.
_BACKENDS = [
    LPbsBackend(),
    LsfBackend(),
    PbsBackend(),
    PbsProBackend(),
    SgeBackend(),
    SlurmBackend(),
]
def _init_with_read_defaults(cls):
"""Class decorator that calls the read_defaults class method in order to
set default values for class attributes"""
cls.read_defaults(filename=None)
return cls
def _init_default_backends(cls):
    """Class decorator that registers all built-in backends.

    Every backend instance in the module-level `_BACKENDS` list is passed,
    in order, to ``cls.register_backend``; `cls` itself is returned.
    """
    register = cls.register_backend
    for builtin_backend in _BACKENDS:
        register(builtin_backend)
    return cls
@_init_default_backends
@_init_with_read_defaults
class JobScript(object):
    """Encapsulation of a job script

    Arguments:
        body (str): Body template for the jobscript as multiline string.
            Will be stored in the `body` instance attribute, and processed by
            the :meth:`render_script` method before execution.
        jobname (str): Name of the job. Will be stored in the
            `resources['jobname']` instance attribute.
        aux_scripts (dict(str=>str), optional): dictionary of auxiliary
            scripts, to be stored (as a copy) in the `aux_scripts` attribute.

    Keyword arguments (`kwargs`) that correspond to :ref:`known attributes
    <Class/Instance Attributes>` set the value of that (instance) attribute.
    Any other keyword arguments are stored as entries in the `resources`
    attribute, to be processed by the backend. The following
    _`keyword arguments` set resource specifications that should be handled
    by any backend (or, the backend should raise a
    :exc:`~clusterjob.backends.ResourcesNotSupportedError`).

    Keyword Arguments:
        queue (str): Name of queue/partition to which to submit the job
        time (str): Maximum runtime.
            See :func:`~clusterjob.utils.time_to_seconds` for
            acceptable formats.
        nodes (int): Number of nodes on which to run. Depending on the
            configuration of the scheduler, if the number of used
            cores per node is smaller than the number of CPU cores
            on a physical node, multiple jobs may or may not be
            placed on the same *physical* node.
        ppn (int): (MPI) processes to run per node. The total number of MPI
            processes will be ``nodes*ppn``. Note that `ppn` is
            *not* the same as the `ppn` keyword in PBS/TORQUE (which
            refers to the total number of CPU cores used per node).
        threads (int): Number of OpenMP threads, or subprocesses, spawned per
            process. The total number of CPU cores used per
            node will be ``ppn*threads``.
        mem (int): Required memory, per node in MB
        stdout (str): Name of file to which to write the job's stdout
        stderr (str): Name of file to which to write the job's stderr

    The above list constitutes the simplified resource model supported by the
    `clusterjob` package, as a lowest common denominator of various scheduling
    systems. Other keyword arguments can be used, but they will be
    backend-specific, and may or may not be handled correctly. In the default
    SLURM backend, any keyword arguments not in the above list are transformed
    directly to arguments for ``sbatch``, where single-letter argument names
    are prepended with ``-``, and multi-letter argument names with ``--``. An
    argument with boolean values is passed without any value iff the value is
    True::

        contiguous=True -> --contiguous
        dependency='after:12454' -> --dependency=after:12454
        F='nodefile.txt' -> -F nodefile.txt

    All backends are encouraged to implement a similar behavior, to handle
    arbitrary resource requirements. Note that an alternative (and preferred)
    way of setting properties (especially backend-specific ones) is through the
    :meth:`read_settings` method.

    .. rubric:: _`Class Attributes`

    The following class attributes cannot be shadowed by instance attributes
    of the same name (attempting to do so raises an `AttributeError`)

    Class Attributes:
        cache_folder (str or None): Local folder in which to cache the
            :class:`AsyncResult` instances resulting from job submission. If
            None (default), caching is disabled.
        cache_prefix (str): Prefix for cache filenames. If caching is enabled,
            jobs will be stored inside `cache_folder` in a file
            `cache_prefix`.`cache_id`.cache, where `cache_id` is defined in the
            `submit` method.
        resources (OrderedDict): Dictionary of *default* resource requirements.
            Modifying the `resources` class attribute affects the default
            resources for all future instantiations.

    Note:
        The preferred way to set these class attributes is through the
        :meth:`read_defaults` class method.

    .. rubric:: _`Class/Instance Attributes`

    The following are class attributes, with the expectation that
    they may be shadowed by instance attributes of the same name.

    Attributes:
        backend (str): Name of backend, must be an element in
            :attr:`JobScript.backends`. That is, if `backend` does not refer to
            one of the default backends, the :meth:`register_backend` class
            method must be used to register the backend before any job may use
            it. Defaults to 'slurm'.
        shell (str): Shell that is used to execute the job script. Defaults to
            ``/bin/bash``.
        remote (str or None): Remote server on which to execute submit
            commands. If None (default), submit locally.
        rootdir (str): Root directory for `workdir`, locally or remote. Defaults
            to ``'.'``, i.e., the current working directory. The `rootdir` is
            guaranteed not to have a trailing slash (except for the
            filesystem root ``'/'`` itself).
        workdir (str): Work directory (local or remote) in which the job script
            file will be placed, and from which the submission command will be
            called. Relative to `rootdir`. Defaults to ``'.'`` (current working
            directory). The `workdir` is guaranteed not to have a trailing
            slash (except for ``'/'`` itself).
        filename (str or None): Name of file to which the job script will be
            written (inside `rootdir`/`workdir`). If None (default), the
            filename will be set from the job name (`resources['jobname']`
            attribute) together with a backend-specific file extension.
        prologue (str): Multiline shell script that will be executed *locally*
            in the current working directory before submitting the job. Before
            running, the script will be rendered using the
            :meth:`render_script` method. An empty prologue is not run.
        epilogue (str): Multiline shell script that will be executed *locally*
            in the current working directory the first time that the job is
            known to have finished. It will be rendered using the
            :meth:`render_script` method at the time that the job is
            submitted. Its execution will be handled by the
            :class:`AsyncResult` object resulting from the job submission. The
            main purpose of the epilogue script is to move data from a remote
            cluster upon completion of the job. An empty epilogue is not
            passed on to the :class:`AsyncResult`.
        max_sleep_interval (int): Upper limit for the number of seconds to
            sleep between polling the status of a submitted job.
        ssh (str): The executable to use for ssh. If not a full path, must be
            in the ``$PATH``.
        scp (str): The executable to use for scp. If not a full path, must be
            in the ``$PATH``.

    This allows defining defaults for all jobs by setting the class attribute,
    and overriding them for specific jobs by setting the instance attribute.
    For example,

    >>> jobscript = JobScript(body='echo "Hello"', jobname='test')
    >>> jobscript.shell = '/bin/sh'

    sets the shell for only this specific jobscript, whereas

    >>> JobScript.shell = '/bin/sh'

    sets the class attribute, and thus the default shell for *all* JobScript
    instances, both existing and future ones:

    >>> job1 = JobScript(body='echo "Hello"', jobname='test1')
    >>> job2 = JobScript(body='echo "Hello"', jobname='test2')
    >>> assert job1.shell == job2.shell == '/bin/sh' # class attribute
    >>> JobScript.shell = '/bin/bash'
    >>> assert job1.shell == job2.shell == '/bin/bash' # class attribute
    >>> job1.shell = '/bin/sh'
    >>> assert job1.shell == '/bin/sh' # instance attribute
    >>> assert job2.shell == '/bin/bash' # class attribute

    Note:
        * The preferred way to set these attributes as class attributes (i.e.,
          to provide defaults for any instance) is through
          the :meth:`read_defaults` class method. To set them as instance
          attributes, or to set values in the `resources` instance attribute
          defined below, the :meth:`read_settings` method should be used.
        * A common purpose of the `prologue` and `epilogue` scripts is
          to move data to a remote cluster, e.g. via the `prologue` commands::

              ssh {remote} 'mkdir -p {rootdir}/{workdir}'
              rsync -av {workdir}/ {remote}:{rootdir}/{workdir}

    .. rubric:: _`Instance Attributes`

    The following attributes are local to any `JobScript` instance, and are set
    automatically during instantiation.

    Attributes:
        body (str): Multiline string of shell commands. Should not contain
            backend-specific resource headers. Before submission, it will be
            rendered using the :meth:`render_script` method.
        resources (dict): Dictionary of submission options describing resource
            requirements. Set on instantiation, based on the default values in
            the `resources` class attribute and the keyword arguments passed to
            the instantiator.
        aux_scripts (dict(str=>str)): Dictionary mapping filenames to script
            bodies for any auxiliary scripts. As the main job script (`body`)
            is written during submission, any script defined in this
            dictionary will also be rendered using the :meth:`render_script`
            method and will be written in the same folder as the main script.
            While generally not needed, auxiliary scripts may be useful in
            structuring a large job.

    Example:

        >>> body = r'''
        ... echo "####################################################"
        ... echo "Job id: $CLUSTERJOB_ID"
        ... echo "Job name: $CLUSTERJOB_WORKDIR"
        ... echo "Job started on" `hostname` `date`
        ... echo "Current directory:" `pwd`
        ... echo "####################################################"
        ...
        ... echo "####################################################"
        ... echo "Full Environment:"
        ... printenv
        ... echo "####################################################"
        ...
        ... sleep 90
        ...
        ... echo "Job Finished: " `date`
        ... exit 0
        ... '''
        >>> jobscript = JobScript(body, backend='slurm', jobname='printenv',
        ... queue='test', time='00:05:00', nodes=1, threads=1, mem=100,
        ... stdout='printenv.out', stderr='printenv.err')
        >>> print(jobscript)
        #!/bin/bash
        #SBATCH --job-name=printenv
        #SBATCH --mem=100
        #SBATCH --nodes=1
        #SBATCH --partition=test
        #SBATCH --error=printenv.err
        #SBATCH --output=printenv.out
        #SBATCH --cpus-per-task=1
        #SBATCH --time=00:05:00
        <BLANKLINE>
        echo "####################################################"
        echo "Job id: $SLURM_JOB_ID"
        echo "Job name: $SLURM_SUBMIT_DIR"
        echo "Job started on" `hostname` `date`
        echo "Current directory:" `pwd`
        echo "####################################################"
        <BLANKLINE>
        echo "####################################################"
        echo "Full Environment:"
        printenv
        echo "####################################################"
        <BLANKLINE>
        sleep 90
        <BLANKLINE>
        echo "Job Finished: " `date`
        exit 0
        <BLANKLINE>

    Note:
        The fact that arbitrary attributes can be added to an existing
        object can be exploited to define arbitrary template variables in
        the job script:

        >>> body = r'''
        ... echo {myvar}
        ... '''
        >>> jobscript = JobScript(body, jobname='myvar_test')
        >>> jobscript.myvar = 'Hello'
        >>> print(jobscript)
        #!/bin/bash
        #SBATCH --job-name=myvar_test
        <BLANKLINE>
        echo Hello
        <BLANKLINE>
    """

    # The following class attributes are fall-backs for intended instance
    # attributes. That is, if there is an instance attribute of the same name
    # shadowing the class attribute, the instance attribute is used in any
    # context.
    _attributes = {
        'backend': 'slurm',
        'shell': '/bin/bash',
        'remote': None,
        'rootdir': '.',
        'workdir': '.',
        'filename': None,
        'prologue': '',
        'epilogue': '',
        'max_sleep_interval': 900,
        'ssh': 'ssh',
        'scp': 'scp',
    }

    # The following are genuine class attributes. Trying to create an
    # instance attribute of the same name raises an AttributeError (see
    # __setattr__).
    _protected_attributes = {
        '_backends': {},
        'cache_folder': None,
        'cache_prefix': 'clusterjob',
        '_cache_counter': 0,
        '_run_cmd': staticmethod(run_cmd),  # for easy mocking
        '_upload_file': staticmethod(upload_file),  # for easy mocking
    }

    # The class attributes listed in _attributes and _protected_attributes are
    # created and initialized by the _init_with_read_defaults class decorator,
    # which runs the `read_defaults` class method with `filename=None` after
    # the class definition was processed.

    # The `resources` class attribute is copied into an instance attribute on
    # every instantiation.
    resources = OrderedDict()

    @classmethod
    def register_backend(cls, backend, name=None):
        """Register a new backend.

        Arguments:
            backend (clusterjob.backends.ClusterjobBackend): The backend to
                register. After registration, the `backend` attribute of a
                `JobScript` instance may then refer to the backend by name.
            name (str): The name under which to register the backend. If not
                given, use the name defined in the `backend`'s `name`
                attribute. If given, that attribute is updated with `name`,
                to ensure that the name under which the backend is registered
                and the `backend`'s internal `name` attribute are the same.

        Raises:
            TypeError: if backend is not an instance of ClusterjobBackend, or
                does not implement the backend interface correctly
            AttributeError: if backend does not have the attributes `name` and
                `extension`
        """
        if not isinstance(backend, ClusterjobBackend):
            raise TypeError("backend must be an instance of ClusterjobBackend")
        registered_name = backend.name if name is None else name
        # Compare against str() instead of isinstance(..., str), so that text
        # values that compare equal to their str() conversion (e.g. Python 2
        # `unicode`) are still accepted.
        if backend.extension != str(backend.extension):
            raise TypeError("backend "+registered_name+" must have extension "
                            "attribute of type str")
        # Only rename the backend once it has passed validation, so that a
        # failed registration leaves the backend object untouched.
        if name is not None:
            backend.name = name
        cls._backends[registered_name] = backend

    @classmethod
    def clear_cache_folder(cls):
        """Remove all files in the :attr:`cache_folder`.

        Does nothing if caching is disabled (`cache_folder` is None).
        Subdirectories of the cache folder are left in place (they cannot be
        removed with `os.unlink`).
        """
        if cls.cache_folder is None:
            return
        for path in glob(os.path.join(cls.cache_folder, '*')):
            if os.path.isdir(path) and not os.path.islink(path):
                continue
            os.unlink(path)

    def __init__(self, body, jobname, aux_scripts=None, **kwargs):
        # per-instance copy of the default resources
        self.resources = self.__class__.resources.copy()
        self.resources['jobname'] = str(jobname)
        self.body = str(body)
        self.aux_scripts = {}
        if aux_scripts is not None:
            # store a copy, so that later changes to the caller's dict do not
            # silently alter this job
            self.aux_scripts = dict(aux_scripts)
        # Process the kwargs in sorted order to guarantee a stable behavior,
        # independent of the order in which they were passed.
        for kw in sorted(kwargs):
            if kw in self.__class__._attributes:
                # We define an instance attribute that shadows the underlying
                # class attribute
                setattr(self, kw, kwargs[kw])
            else:
                self.resources[kw] = kwargs[kw]

    @property
    def backends(self):
        """List of names of registered backends"""
        return sorted(self._backends.keys())

    def __setattr__(self, name, value):
        """Set attributes while preventing shadowing the "genuine" class
        attributes by raising an AttributeError. Perform some checks on the
        value, raising a ValueError if necessary (see `_sanitize_attr`)."""
        if name in self.__class__._protected_attributes:
            raise AttributeError("'%s' can only be set as a class attribute"
                                 % name)
        self.__dict__[name] = self._sanitize_attr(name, value)

    @classmethod
    def _sanitize_attr(cls, name, value):
        """Check and normalize `value` before it is stored as attribute `name`.

        * 'backend': must be the name of a registered backend (else
          ValueError)
        * 'rootdir', 'workdir': surrounding whitespace and all trailing
          slashes are removed; a path consisting only of slashes becomes '/'
        * 'prologue', 'epilogue': must not be None (else ValueError); the
          value is dedented and stripped

        Any other value is returned unchanged.
        """
        if name == 'backend':
            if value not in cls._backends:
                raise ValueError("Unknown backend %s" % value)
        elif name in ['rootdir', 'workdir']:
            value = value.strip()
            # strip *all* trailing slashes, but keep the filesystem root '/'
            # (an empty string would turn it into a relative path)
            value = value.rstrip('/') or value[:1]
        elif name in ['prologue', 'epilogue']:
            if value is None:
                raise ValueError('prologue and epilogue must be strings, '
                                 'not None')
            value = dedent(value).strip()
        return value

    @classmethod
    def read_defaults(cls, filename=None):
        """Set class attributes from the INI file with the given file name

        The file must be in the format specified in
        https://docs.python.org/3.5/library/configparser.html#supported-ini-file-structure
        with the default ConfigParser settings, except that all keys are case
        sensitive. It must contain one or both of the sections
        "Attributes" and "Resources". The key-value pairs in the "Attributes"
        section are set as class attributes, whereas the key-value pairs in
        the "Resources" section are set as keys and values in the `resources`
        class attribute. Values are read as strings, except for the
        `max_sleep_interval` attribute and the `nodes`, `threads`, and `mem`
        resources, which are read as integers.

        All keys in the "Attributes" section must start with a letter, and
        must consist only of letters, numbers, and underscores. Keys in the
        "Resources" section can be arbitrary strings. The key names
        'resources' and 'backends' may not be used. An example for a valid
        config file is::

            [Attributes]
            remote = login.cluster.edu
            prologue =
                ssh {remote} 'mkdir -p {rootdir}/{workdir}'
                rsync -av {workdir}/ {remote}:{rootdir}/{workdir}
            epilogue = rsync -av {remote}:{rootdir}/{workdir}/ {workdir}
            rootdir = ~/jobs/
            # the following is a new attribute
            text = Hello World

            [Resources]
            queue = exec
            nodes = 1
            threads = 1
            mem = 10

        If no filename is given, reset all class attributes to their initial
        value, and delete any attributes that do not exist by default. This
        restores the JobScript class to a pristine state. Backends registered
        via :meth:`register_backend` are kept.
        """
        logger = logging.getLogger(__name__)

        def attr_setter(key, val):
            if not re.match(r'^[a-zA-Z]\w*$', key):
                raise ConfigParserError(("Key '%s' is invalid. Keys "
                    "must be valid attribute names, i.e., they must match "
                    r"the regular expression '^[a-zA-Z]\w*$'") % key)
            val = cls._sanitize_attr(key, val)
            logger.debug("Set class attribute %s = %s", key, val)
            setattr(cls, key, val)

        def rsrc_setter(key, val):
            logger.debug("Set class resources key %s = %s", key, val)
            cls.resources[key] = val

        if filename is not None:
            cls._read_inifile(filename, attr_setter, rsrc_setter)
            return

        # restore the original class attributes
        known_attrs = set(cls._attributes)
        known_attrs.update(cls._protected_attributes)
        known_attrs.add('backends')
        for attr in list(cls.__dict__.keys()):
            if ((not attr.startswith('_'))
                    and (attr not in known_attrs)
                    and (not callable(getattr(cls, attr)))):
                logger.debug("Removing class attribute '%s'", attr)
                delattr(cls, attr)
        for attr in cls._attributes:
            logger.debug("Set class attribute '%s' to original value '%s'",
                         attr, cls._attributes[attr])
            setattr(cls, attr, cls._attributes[attr])
        for attr in cls._protected_attributes:
            # For the '_backends' attribute, the setattr below sets
            # cls._backends to a *reference* to
            # cls._protected_attributes['_backends'], not a copy. As a
            # consequence, any call to register_backend will modify both
            # locations, and we don't lose registered backends when
            # resetting.
            if attr == '_backends':
                logger.debug("Keeping known backends: %s", list(
                    cls._protected_attributes['_backends'].keys()))
            else:
                logger.debug("Set class attribute '%s' to original value "
                             "'%s'", attr, cls._protected_attributes[attr])
            setattr(cls, attr, cls._protected_attributes[attr])
        cls.resources = OrderedDict()
        logger.debug("Set class attribute 'resources' to original value "
                     "OrderedDict()")

    def read_settings(self, filename):
        """Set instance attributes from the INI file with the given file name

        This method behaves exactly like the :meth:`read_defaults` class
        method, but instead of setting class attributes, it sets instance
        attributes ("Attributes" section in the INI file), and instead of
        setting values in :attr:`JobScript.resources`, it sets values in the
        instance's `resources` dictionary ("Resources" section in the INI
        file).
        """
        logger = logging.getLogger(__name__)

        def attr_setter(key, val):
            if not re.match(r'^[a-zA-Z]\w*$', key):
                raise ConfigParserError(("Key '%s' is invalid. Keys "
                    "must be valid attribute names, i.e., they must match "
                    r"the regular expression '^[a-zA-Z]\w*$'") % key)
            logger.debug("Set instance attribute %s = %s", key, val)
            # goes through __setattr__, i.e. protected names are rejected and
            # values are sanitized
            setattr(self, key, val)

        def rsrc_setter(key, val):
            logger.debug("Set instance resource key %s = %s", key, val)
            self.resources[key] = val

        self._read_inifile(filename, attr_setter, rsrc_setter)

    @staticmethod
    def _read_inifile(filename, attr_setter, rsrc_setter):
        """Parse the INI file `filename` and hand every key/value pair to
        `attr_setter` ("Attributes" section) or `rsrc_setter` ("Resources"
        section).

        Raises:
            ConfigParserError: if the file has no sections, an unknown
                section, or uses one of the forbidden keys 'resources' or
                'backends'
        """
        logger = logging.getLogger(__name__)
        config = SafeConfigParser()
        config.optionxform = str  # keys are case sensitive
        # `readfp` was deprecated in Python 3.2 and removed in 3.12; Python 2
        # only provides `readfp`.
        try:
            read_file = config.read_file
        except AttributeError:
            read_file = config.readfp
        with open(filename) as in_fh:
            read_file(in_fh)
        setters = {  # section name => where to store keys/values
            'Attributes': attr_setter,
            'Resources': rsrc_setter,
        }
        readers = {
            # for values that are not strings, we must specify a reader
            'Attributes': defaultdict(
                lambda: config.get,
                {'max_sleep_interval': config.getint, }),
            'Resources': defaultdict(
                lambda: config.get,
                {'nodes': config.getint,
                 'threads': config.getint,
                 'mem': config.getint, }),
        }
        allowed_sections = sorted(setters.keys())
        if len(config.sections()) == 0:
            raise ConfigParserError("Inifile must contain at least one "
                                    "of the sections "+str(allowed_sections))
        illegal_keys = ['resources', 'backends']
        for section in config.sections():
            logger.debug("Processing section %s in %s", section, filename)
            if section not in allowed_sections:
                raise ConfigParserError("Invalid section '%s' in %s. "
                                        "Allowed sections are %s"
                                        % (section, filename,
                                           allowed_sections))
            for key, __ in config.items(section=section):
                if key in illegal_keys:
                    raise ConfigParserError("Keys %s are not allowed"
                                            % str(illegal_keys))
                setters[section](key, readers[section][key](section, key))

    def _default_filename(self):
        """If self.filename is None, attempt to set it from the jobname and
        the backend's file extension"""
        if self.filename is None:
            if 'jobname' in self.resources:
                self.filename = "%s.%s" % (
                    self.resources['jobname'],
                    self._backends[self.backend].extension)

    def render_script(self, scriptbody, jobscript=False):
        """Render the body of a script. This brings both the main `body`, as
        well as the `prologue`, `epilogue`, and any auxiliary scripts into the
        final form in which they will be executed.

        Rendering proceeds in the following steps:

        * Add a "shbang" (e.g. ``#!/bin/bash``) based on the `shell` attribute
          if the `scriptbody` does not yet have a shbang on the first line
          (otherwise the existing shbang will remain). In either case, the
          shbang is the very first line of the rendered script.
        * If rendering the body of a JobScript (`jobscript=True`), add
          backend-specific resource headers (based on the `resources`
          attribute), directly after the shbang
        * Map environment variables to their corresponding scheduler-specific
          version, using the backend's
          :meth:`~clusterjob.backends.ClusterjobBackend.replace_body_vars`
          method. Note that the prologue and epilogue will not be run by a
          scheduler, and thus will not have access to the same environment
          variables as a job script.
        * Format each line with known attributes (see
          https://docs.python.org/3.5/library/string.html#formatspec).
          In order of precedence (highest to lowest), the following keys will
          be replaced:

          - keys in the `resources` attribute
          - instance attributes
          - class attributes

        Raises:
            KeyError: if the script contains a placeholder for which there is
                no attribute or resource entry
        """
        rendered_lines = []
        backend = self._backends[self.backend]
        # add the resource headers
        if jobscript:
            rendered_lines.extend(backend.resource_headers(self))
        # apply environment variable mappings
        scriptbody = backend.replace_body_vars(scriptbody)
        # apply attribute mappings (later updates take precedence)
        mappings = dict(self.__class__.__dict__)
        mappings.update(self.__dict__)
        mappings.update(self.resources)
        for line_index, line in enumerate(scriptbody.split("\n")):
            try:
                rendered_line = line.format(**mappings)
            except KeyError as exc:
                key = str(exc)[1:-1]  # stripping out quotes
                raise KeyError("The scriptbody contains a formatting "
                               "placeholder '{"+key+"}', but there is no "
                               "matching attribute or resource entry")
            if line_index == 0:
                if line.startswith("#!"):
                    # An existing shbang must be the first line of the file,
                    # i.e. it goes in front of any resource headers.
                    rendered_lines.insert(0, rendered_line)
                    continue
                # add shbang for file that does not have one
                rendered_lines.insert(0, "#!%s" % self.shell)
            rendered_lines.append(rendered_line)
        return "\n".join(rendered_lines)

    def __str__(self):
        """String representation of the job, i.e., the fully rendered
        jobscript"""
        return self.render_script(self.body, jobscript=True)

    def write(self, filename=None):
        """Write out the fully rendered jobscript to file. If filename is not
        None, write to the given *local* file. Otherwise, write to the local or
        remote file specified in the filename attribute, in the folder
        specified by the rootdir and workdir attributes. The folder will be
        created if it does not exist already. A '~' in `filename` will be
        expanded to the user's home directory (for local files only).

        Raises:
            ValueError: if no filename is given and none can be derived from
                the `filename` attribute or the job name.
        """
        remote = self.remote
        if filename is None:
            self._default_filename()
            # check before os.path.join, which would fail with a TypeError
            if self.filename is None:
                raise ValueError("filename not given")
            filename = os.path.join(self.rootdir, self.workdir,
                                    self.filename)
        else:
            remote = None
        if remote is None:
            filename = os.path.expanduser(filename)
        self._write_script(str(self), filename, remote)

    def _write_script(self, scriptbody, filename, remote):
        """Write `scriptbody` to the executable file `filename`, locally if
        `remote` is None, otherwise on the host `remote` (via a local
        temporary file that is uploaded and then removed). The folder
        containing `filename` is created first, if necessary."""
        folder = os.path.dirname(filename)
        if folder:
            self._run_cmd(['mkdir', '-p', folder], remote,
                          ignore_exit_code=False, ssh=self.ssh)
        if remote is None:
            with open(filename, 'w') as run_fh:
                run_fh.write(scriptbody)
            set_executable(filename)
            return
        run_fh = tempfile.NamedTemporaryFile('w', delete=False)
        tempfilename = run_fh.name
        # the temporary file is removed even if writing, chmod, or the upload
        # fail
        try:
            with run_fh:
                run_fh.write(scriptbody)
            set_executable(tempfilename)
            self._upload_file(tempfilename, remote, filename, scp=self.scp)
        finally:
            os.unlink(tempfilename)

    def _run_prologue(self):
        """Render and run the prologue script locally, if there is one.

        An empty (or None) prologue is skipped, instead of executing a script
        that would consist only of a shbang line.

        Raises:
            subprocess.CalledProcessError: if the prologue exits with a
                non-zero exit code (the error is logged before re-raising)
        """
        if not self.prologue:
            return
        prologue = self.render_script(self.prologue)
        prologue_fh = tempfile.NamedTemporaryFile('w', delete=False)
        tempfilename = prologue_fh.name
        try:
            # the file must be closed before it can be executed
            with prologue_fh:
                prologue_fh.write(prologue)
            set_executable(tempfilename)
            try:
                sp.check_output([tempfilename, ], stderr=sp.STDOUT)
            except sp.CalledProcessError as e:
                logger = logging.getLogger(__name__)
                logger.error("Prologue script did not exit cleanly.\n"
                             "CWD: %s\n"
                             "prologue: ---\n%s\n---\n"
                             "response: ---\n%s\n---",
                             os.getcwd(), self.prologue, e.output)
                raise
        finally:
            os.unlink(tempfilename)

    def submit(self, block=False, cache_id=None, force=False, retry=True):
        """Run the :attr:`prologue` script (if defined), then submit the job to
        a local or remote scheduler.

        Parameters
        ----------
        block: boolean, optional
            If `block` is True, wait until the job is finished, and return the
            exit status code (see :mod:`clusterjob.status`). Otherwise, return
            an :class:`AsyncResult` object.
        cache_id: str or None, optional
            An ID uniquely defining the submission, used as identifier for the
            cached :class:`AsyncResult` object. If not given, the `cache_id` is
            determined internally. If an :class:`AsyncResult` with a matching
            `cache_id` is present in the :attr:`cache_folder`, nothing is
            submitted to the scheduler, and the cached :class:`AsyncResult`
            object is returned. The :attr:`prologue` script is not re-run when
            recovering a cached result.
        force: boolean, optional
            If True, discard any existing cached :class:`AsyncResult` object,
            ensuring that the job is sent to the scheduler.
        retry: boolean, optional
            If True, and the existing cached :class:`AsyncResult` indicates
            that the job finished with an error (``CANCELLED``/``FAILED``),
            resubmit the job, discard the cache and return a fresh
            :class:`AsyncResult` object
        """
        logger = logging.getLogger(__name__)
        if self.remote is None:
            logger.info("Submitting job %s locally",
                        self.resources['jobname'])
        else:
            logger.info("Submitting job %s on %s",
                        self.resources['jobname'], self.remote)

        submitted = False
        if cache_id is None:
            # the counter is stored on JobScript itself
            JobScript._cache_counter += 1
            cache_id = str(JobScript._cache_counter)
        else:
            cache_id = str(cache_id)
        cache_file = None
        backend = self._backends[self.backend]

        # try to recover a cached AsyncResult
        if self.cache_folder is not None:
            mkdir(self.cache_folder)
            cache_file = os.path.join(
                self.cache_folder,
                "%s.%s.cache" % (self.cache_prefix, cache_id))
            if os.path.isfile(cache_file):
                if force:
                    try:
                        os.unlink(cache_file)
                    except OSError:
                        pass
                else:
                    logger.debug("Reloading AsyncResult from %s", cache_file)
                    ar = AsyncResult.load(cache_file, backend=backend)
                    submitted = True
                    if ar._status >= CANCELLED:
                        if retry:
                            logger.debug("Cached run %s, resubmitting",
                                         str_status[ar._status])
                            os.unlink(cache_file)
                            submitted = False

        if not submitted:
            for filename in self.aux_scripts:
                self._write_script(
                    scriptbody=self.render_script(self.aux_scripts[filename]),
                    filename=os.path.join(self.rootdir, self.workdir,
                                          filename),
                    remote=self.remote)
            job_id = None
            try:
                self.write()
                self._run_prologue()
                cmd = backend.cmd_submit(self)
                response = self._run_cmd(cmd, self.remote, self.rootdir,
                                         self.workdir, ignore_exit_code=True,
                                         ssh=self.ssh)
                job_id = backend.get_job_id(response)
                if job_id is None:
                    logger.error("Failed to submit job")
                    status = FAILED
                else:
                    logger.info("Job ID: %s", job_id)
                    status = PENDING
            except (sp.CalledProcessError, ResourcesNotSupportedError) as e:
                logger.error("Failed to submit job: %s", e)
                status = FAILED

            ar = AsyncResult(backend=backend)
            ar.ssh = self.ssh
            ar.scp = self.scp
            ar.remote = self.remote
            ar.cache_file = cache_file
            # poll at ~1/10 of the requested runtime, at least every 10
            # seconds, but never less often than self.max_sleep_interval
            try:
                ar.max_sleep_interval \
                    = int(time_to_seconds(self.resources['time']) / 10)
                if ar.max_sleep_interval < 10:
                    ar.max_sleep_interval = 10
            except KeyError:
                ar.max_sleep_interval = self.max_sleep_interval
            if self.max_sleep_interval < ar.max_sleep_interval:
                ar.max_sleep_interval = self.max_sleep_interval
            ar._status = status
            ar.job_id = job_id
            # An empty epilogue would render to a bare shbang script; leave
            # ar.epilogue at its default (None) instead.
            if self.epilogue:
                ar.epilogue = self.render_script(self.epilogue)

        if block:
            result = ar.get()
        else:
            result = ar
        ar.dump()
        return result
class AsyncResult(object):
    """Result of submitting a jobscript

    Arguments:
        backend (clusterjob.backends.ClusterjobBackend): Value for the
            :attr:`backend` attribute

    Attributes:
        remote (str or None): The remote host on which the job is running.
            Passwordless ssh must be set up to reach the remote. A value of
            None indicates that the job is running locally
        cache_file (str or None): The full path and name of the file to be used
            to cache the `AsyncResult` object. The cache file will be written
            automatically anytime a change in status is detected
        backend (clusterjob.backends.ClusterjobBackend): A reference to the
            backend instance under which the job is running
        max_sleep_interval (int): Upper limit for the number of seconds to
            sleep between polls to the cluster scheduling systems when waiting
            for the Job to finish
        job_id (str): The Job ID assigned by the cluster scheduler
        epilogue (str or None): Multiline script to be run once when the
            status changes from "running" (pending/running) to "not running"
            (completed, canceled, failed). The contents of this variable will
            be written to a temporary file as is, and executed as a script in
            the current working directory. None by default.
        ssh (str): The executable to use for ssh. If not a full path, must be
            in the ``$PATH``.
        scp (str): The executable to use for scp. If not a full path, must be
            in the ``$PATH``.
    """

    _run_cmd = staticmethod(run_cmd)

    # Setting the sleep_interval < 1 can have some very problematic
    # consequences, so we build in a safety net. For testing, we can still
    # get around this under the assumption that we know exactly what we're
    # doing!
    _min_sleep_interval = 1

    def __init__(self, backend):
        if not isinstance(backend, ClusterjobBackend):
            raise TypeError("backend must be an instance of ClusterjobBackend")
        self.remote = None
        self.cache_file = None
        self.backend = backend
        self.max_sleep_interval = 160
        self.job_id = ''
        self._status = CANCELLED
        self.epilogue = None
        self.ssh = 'ssh'
        self.scp = 'scp'

    @property
    def status(self):
        """Return the job status as one of the codes defined in the
        `clusterjob.status` module.

        If the job is not yet known to have finished, communicate with the
        cluster to determine the job's status: first via the backend's
        "unfinished" status query, and, if that does not yield a status, via
        the "finished" query. If the status changed, the cache file is
        updated, and if the job is now finished, the epilogue is run.

        Raises:
            ValueError: if the backend reports a status that is not in
                `STATUS_CODES` (the previously known status is kept)
        """
        if self._status >= COMPLETED:
            return self._status
        cmd = self.backend.cmd_status(self, finished=False)
        response = self._run_cmd(cmd, self.remote, ignore_exit_code=True,
                                 ssh=self.ssh)
        status = self.backend.get_status(response, finished=False)
        if status is None:
            cmd = self.backend.cmd_status(self, finished=True)
            response = self._run_cmd(cmd, self.remote,
                                     ignore_exit_code=True, ssh=self.ssh)
            status = self.backend.get_status(response, finished=True)
        # validate before storing, so that an invalid response does not
        # corrupt the stored status
        if status not in STATUS_CODES:
            raise ValueError("Invalid status code %s" % (status, ))
        prev_status = self._status
        self._status = status
        if prev_status != status:
            if status >= COMPLETED:
                self.run_epilogue()
            self.dump()
        return self._status

    def get(self, timeout=None):
        """Return the job's status code, waiting for the job to finish first
        if it has not finished yet.

        `timeout` is passed on to :meth:`wait`; the status returned after
        waiting is whatever :attr:`status` reports at that point.
        """
        status = self.status
        if status >= COMPLETED:
            return status
        self.wait(timeout)
        return self.status

    def dump(self, cache_file=None):
        """Pickle the state of the `AsyncResult` to the file `cache_file`,
        defaulting to ``self.cache_file``.

        An explicitly given `cache_file` becomes the new ``self.cache_file``.
        If neither is set, nothing is written. The backend is stored by
        name only (see :meth:`load`).
        """
        if cache_file is None:
            cache_file = self.cache_file
        if cache_file is None:
            return
        self.cache_file = cache_file
        with open(cache_file, 'wb') as pickle_fh:
            pickle.dump(
                (self.remote, self.backend.name, self.max_sleep_interval,
                 self.job_id, self._status, self.epilogue, self.ssh,
                 self.scp),
                pickle_fh)
[docs] @classmethod
def load(cls, cache_file, backend=None):
"""Instantiate AsyncResult from dumped `cache_file`.
This is the inverse of :meth:`dump`.
Parameters
----------
cache_file: str
Name of file from which the run should be read.
backend: clusterjob.backends.ClusterjobBackend or None
The backend instance for the job. If None, the backend will be
determined by the *name* of the dumped job's backend.
"""
with open(cache_file, 'rb') as pickle_fh:
(remote, backend_name, max_sleep_interval, job_id, status,
epilogue, ssh, scp) = pickle.load(pickle_fh)
if backend is None:
backend = JobScript._backends[backend_name]
ar = cls(backend)
(ar.remote, ar.max_sleep_interval, ar.job_id, ar._status, ar.epilogue,
ar.ssh, ar.scp) \
= (remote, max_sleep_interval, job_id, status, epilogue, ssh, scp)
ar.cache_file = cache_file
return ar
[docs] def wait(self, timeout=None):
"""Wait until the result is available or until roughly timeout seconds
pass."""
logger = logging.getLogger(__name__)
if int(self.max_sleep_interval) < int(self._min_sleep_interval):
self.max_sleep_interval = int(self._min_sleep_interval)
t0 = time.time()
sleep_seconds = min(5, self.max_sleep_interval)
status = self.status
prev_status = status
while status < COMPLETED:
logger.debug("sleep for %d seconds", sleep_seconds)
time.sleep(sleep_seconds)
if 2*sleep_seconds <= self.max_sleep_interval:
sleep_seconds *= 2
if timeout is not None:
if int(time.time() - t0) > int(timeout):
return
status = self.status
if status != prev_status:
sleep_seconds = min(5, self.max_sleep_interval)
prev_status = status
[docs] def ready(self):
"""Return whether the job has completed."""
return (self.status >= COMPLETED)
[docs] def successful(self):
"""Return True if the job finished with a COMPLETED status, False if it
finished with a CANCELLED or FAILED status. Raise an `AssertionError`
if the job has not completed"""
status = self.status
assert status >= COMPLETED, "status is %s" % status
return (self.status == COMPLETED)
[docs] def cancel(self):
"""Instruct the cluster to cancel the running job. Has no effect if
job is not running"""
if self.status > COMPLETED:
return
cmd = self.backend.cmd_cancel(self)
self._run_cmd(cmd, self.remote, ignore_exit_code=True, ssh=self.ssh)
self._status = CANCELLED
self.dump()
[docs] def run_epilogue(self):
"""Run the epilogue script in the current working directory.
raises:
subprocess.CalledProcessError: if the script does not finish with
exit code zero.
"""
logger = logging.getLogger(__name__)
if self.epilogue is not None:
with tempfile.NamedTemporaryFile('w', delete=False) as epilogue_fh:
epilogue_fh.write(self.epilogue)
tempfilename = epilogue_fh.name
set_executable(tempfilename)
try:
sp.check_output( [tempfilename, ], stderr=sp.STDOUT)
except sp.CalledProcessError as e:
logger.error(dedent(r'''
Epilogue script did not exit cleanly.
CWD: {cwd}
epilogue: ---
{epilogue}
---
response: ---
{response}
---
''').format(cwd=os.getcwd(), epilogue=self.epilogue,
response=e.output))
raise
finally:
os.unlink(tempfilename)