Source code for clusterjob.utils

"""Collection of utility functions"""
from __future__ import absolute_import
import os
import stat
import sys
import logging
import subprocess as sp
import pprint
import re
import json
try:
    from shlex import quote
except ImportError:
    from pipes import quote

CMD_RESPONSE_ENCODING = 'utf-8'


def set_executable(filename):
    """Set the executable bit on the given filename"""
    st = os.stat(filename)
    os.chmod(filename, st.st_mode | stat.S_IEXEC)

def write_file(filename, data):
    """Write data to the file with the given filename"""
    with open(filename, 'w') as out_fh:
        out_fh.write(data)

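A minimal usage sketch (not part of the module source; the file name
``job.sh`` and the script content are made up for illustration), showing how
``write_file`` and ``set_executable`` are typically combined to produce a
runnable job script::

    import os
    from clusterjob.utils import write_file, set_executable

    write_file('job.sh', "#!/bin/bash\necho Hello\n")
    set_executable('job.sh')             # same effect as `chmod u+x job.sh`
    assert os.access('job.sh', os.X_OK)  # the user-executable bit is now set
    os.unlink('job.sh')                  # clean up
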
def split_seq(seq, n_chunks):
    """Split the given sequence into `n_chunks`. Suitable for distributing
    an array of jobs over a fixed number of workers.

    >>> split_seq([1,2,3,4,5,6], 3)
    [[1, 2], [3, 4], [5, 6]]
    >>> split_seq([1,2,3,4,5,6], 2)
    [[1, 2, 3], [4, 5, 6]]
    >>> split_seq([1,2,3,4,5,6,7], 3)
    [[1, 2], [3, 4, 5], [6, 7]]
    """
    newseq = []
    splitsize = 1.0/n_chunks*len(seq)
    for i in range(n_chunks):
        newseq.append(
            seq[int(round(i*splitsize)):int(round((i+1)*splitsize))])
    return newseq

def read_file(filename):
    """
    Return the contents of the file with the given filename as a string

    >>> write_file('read_write_file.txt', 'Hello World')
    >>> read_file('read_write_file.txt')
    'Hello World'
    >>> os.unlink('read_write_file.txt')
    """
    with open(filename) as in_fh:
        return in_fh.read()

def upload_file(localfile, remote, remotefile, scp='scp'):
    """Run ``{scp} {localfile} {remote}:{remotefile}``

    Parameters:
        localfile (str): relative or absolute path to a local file
        remote (str): Host on which to put the file
        remotefile (str): remote path where to put the file. May start with
            '~' to indicate the home directory.
        scp (str): the scp executable. If not a full path, the executable
            must be in ``$PATH``.

    Raises:
        subprocess.CalledProcessError: if the call to `scp` fails.
    """
    sp.check_output(
        [scp, localfile, remote+':'+remotefile], stderr=sp.STDOUT)

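A hedged sketch of a typical ``upload_file`` call (the host
``user@cluster.example.com`` and the remote path are hypothetical, and the
call assumes password-less scp access to that host)::

    from clusterjob.utils import upload_file, write_file

    write_file('job.sh', "#!/bin/bash\necho Hello\n")
    # runs `scp job.sh user@cluster.example.com:~/jobs/job.sh`
    upload_file('job.sh', 'user@cluster.example.com', '~/jobs/job.sh')
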
def run_cmd(cmd, remote, rootdir='', workdir='', ignore_exit_code=False,
        ssh='ssh'):
    r'''Run the given cmd in the given workdir, either locally or remotely,
    and return the combined stdout/stderr

    Parameters:
        cmd (list of str or str): Command to execute, as a list consisting
            of the command and its options. Alternatively, the command can
            be given as a single string, which will then be executed as a
            shell command. Only use shell commands when necessary, e.g. when
            the command involves a pipe.
        remote (None or str): If None, run the command locally. Otherwise,
            run on the given host (via SSH)
        rootdir (str, optional): Local or remote root directory. The
            `workdir` variable is taken relative to `rootdir`. If not
            specified, effectively the current working directory is used as
            the root for local commands, and the home directory for remote
            commands. Note that `~` may be used to indicate the home
            directory locally or remotely.
        workdir (str, optional): Local or remote directory from which to run
            the command, relative to `rootdir`. If `rootdir` is empty, `~`
            may be used to indicate the home directory.
        ignore_exit_code (boolean, optional): By default,
            `subprocess.CalledProcessError` will be raised if the call has
            an exit code other than 0. This exception can be suppressed by
            passing `ignore_exit_code=True`.
        ssh (str, optional): The executable to be used for ssh. If not a
            full path, the executable must be in ``$PATH``.

    Example:

        >>> import tempfile, os, shutil
        >>> tempfolder = tempfile.mkdtemp()
        >>> scriptfile = os.path.join(tempfolder, 'test.sh')
        >>> with open(scriptfile, 'w') as script_fh:
        ...     script_fh.writelines(["#!/bin/bash\n", "echo Hello $1\n"])
        >>> set_executable(scriptfile)
        >>> run_cmd(['./test.sh', 'World'], remote=None, workdir=tempfolder)
        'Hello World\n'
        >>> run_cmd("./test.sh World | tr '[:upper:]' '[:lower:]'",
        ...         remote=None, workdir=tempfolder)
        'hello world\n'
        >>> shutil.rmtree(tempfolder)
    '''
    logger = logging.getLogger(__name__)
    workdir = os.path.join(rootdir, workdir)
    if type(cmd) in [list, tuple]:
        use_shell = False
    else:
        cmd = str(cmd)
        use_shell = True
    try:
        if remote is None:
            # run locally
            workdir = os.path.expanduser(workdir)
            if use_shell:
                logger.debug("COMMAND: %s", cmd)
            else:
                logger.debug("COMMAND: %s",
                             " ".join([quote(part) for part in cmd]))
            if workdir == '':
                response = sp.check_output(cmd, stderr=sp.STDOUT,
                                           shell=use_shell)
            else:
                response = sp.check_output(cmd, stderr=sp.STDOUT,
                                           cwd=workdir, shell=use_shell)
        else:
            # run remotely
            if not use_shell:
                cmd = " ".join(cmd)
            if workdir == '':
                cmd = [ssh, remote, cmd]
            else:
                cmd = [ssh, remote, 'cd %s && %s' % (workdir, cmd)]
            logger.debug("COMMAND: %s",
                         " ".join([quote(part) for part in cmd]))
            response = sp.check_output(cmd, stderr=sp.STDOUT)
    except sp.CalledProcessError as e:
        if ignore_exit_code:
            response = e.output
        else:
            raise
    if sys.version_info >= (3, 0):
        # For Python 3, we should return a unicode string, so that the
        # backends can safely assume that string operations such as regex
        # matching are possible.
        response = response.decode(CMD_RESPONSE_ENCODING)
    if logger.getEffectiveLevel() <= logging.DEBUG:
        if "\n" in response:
            if len(response.splitlines()) == 1:
                logger.debug("RESPONSE: %s", response)
            else:
                logger.debug("RESPONSE: ---\n%s\n---", response)
        else:
            logger.debug("RESPONSE: '%s'", response)
    return response

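The doctest above only exercises the local code path. The following sketch
shows remote invocations (the host ``user@cluster.example.com`` and the
``~/jobs`` folder are hypothetical, and password-less SSH access is
assumed)::

    from clusterjob.utils import run_cmd

    # effectively runs `ssh user@cluster.example.com 'cd ~/jobs && ls'`
    # and returns the combined stdout/stderr as a string
    listing = run_cmd(['ls'], remote='user@cluster.example.com',
                      workdir='~/jobs')

    # a shell command (here: a pipe); ignore_exit_code=True suppresses
    # CalledProcessError if the command exits with a non-zero code
    out = run_cmd("grep pattern *.log | head -n 5",
                  remote='user@cluster.example.com', workdir='~/jobs',
                  ignore_exit_code=True)
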
def _wrap_run_cmd(jsonfile, mode='replay'):
    """Wrapper around :func:`run_cmd` for testing, using a record-replay
    model
    """
    logger = logging.getLogger(__name__)
    records = []
    counter = 0
    json_opts = {'indent': 2, 'separators': (',', ': '), 'sort_keys': True}

    def run_cmd_record(*args, **kwargs):
        response = run_cmd(*args, **kwargs)
        records.append(
            {'args': args, 'kwargs': kwargs, 'response': response})
        with open(jsonfile, 'w') as out_fh:
            json.dump(records, out_fh, **json_opts)
        return response

    def run_cmd_replay(*args, **kwargs):
        record = records.pop(0)
        logger.debug("cached run_cmd, args=%s, kwargs=%s"
                     % (str(args), str(kwargs)))
        assert list(record['args']) == list(args), \
            "run_cmd call #%d: Obtained args: '%s'; Expected args: '%s'" \
            % (counter+1, str(args), str(record['args']))
        assert record['kwargs'] == kwargs, \
            "run_cmd call #%d: Obtained kwargs: '%s'; Expected kwargs: '%s'" \
            % (counter+1, str(kwargs), str(record['kwargs']))
        response = record['response']
        if "\n" in response:
            if len(response.splitlines()) == 1:
                logger.debug("cached response: %s", response)
            else:
                logger.debug("cached response: ---\n%s\n---", response)
        else:
            logger.debug("cached response: '%s'", response)
        return response

    if mode == 'replay':
        with open(jsonfile) as in_fh:
            records = json.load(in_fh)
        return run_cmd_replay
    elif mode == 'record':
        return run_cmd_record
    else:
        raise ValueError("Invalid mode")

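A sketch of how the record-replay wrapper might be used in a test suite (the
file name ``run_cmd.json`` is hypothetical; in real tests the returned
function would typically be monkeypatched in place of :func:`run_cmd`)::

    from clusterjob import utils

    # First pass: record every call and its response to a JSON file
    recorded = utils._wrap_run_cmd('run_cmd.json', mode='record')
    recorded(['echo', 'hello'], remote=None)

    # Later passes: replay the stored responses without running anything;
    # the wrapper asserts that args/kwargs match the recorded calls
    replayed = utils._wrap_run_cmd('run_cmd.json', mode='replay')
    assert replayed(['echo', 'hello'], remote=None) == 'hello\n'
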
def time_to_seconds(time_str):
    """Convert a string describing a time duration into seconds. The
    supported formats are::

        minutes
        minutes:seconds
        hours:minutes:seconds
        days-hours
        days-hours:minutes
        days-hours:minutes:seconds
        days:hours:minutes:seconds

    Raises:
        ValueError: if `time_str` has an invalid format.

    Examples:

        >>> time_to_seconds('10')
        600
        >>> time_to_seconds('10:00')
        600
        >>> time_to_seconds('10:30')
        630
        >>> time_to_seconds('1:10:30')
        4230
        >>> time_to_seconds('1-1:10:30')
        90630
        >>> time_to_seconds('1-0')
        86400
        >>> time_to_seconds('1-10')
        122400
        >>> time_to_seconds('1-1:10')
        90600
        >>> time_to_seconds('1-1:10:30')
        90630
        >>> time_to_seconds('1:1:10:30')
        90630
        >>> time_to_seconds('1 1:10:30')
        Traceback (most recent call last):
        ...
        ValueError: '1 1:10:30' has invalid pattern
    """
    patterns = [
        re.compile(r'^(?P<hours>\d+):(?P<minutes>\d+):(?P<seconds>\d+)$'),
        re.compile(r'^(?P<days>\d+)-(?P<hours>\d+)$'),
        re.compile(r'^(?P<minutes>\d+)$'),
        re.compile(r'^(?P<minutes>\d+):(?P<seconds>\d+)$'),
        re.compile(r'^(?P<days>\d+)-(?P<hours>\d+):(?P<minutes>\d+)$'),
        re.compile(
            r'^(?P<days>\d+)-(?P<hours>\d+):(?P<minutes>\d+):(?P<seconds>\d+)$'),
        re.compile(
            r'^(?P<days>\d+):(?P<hours>\d+):(?P<minutes>\d+):(?P<seconds>\d+)$'),
    ]
    seconds = 0
    for pattern in patterns:
        match = pattern.match(str(time_str).strip())
        if match:
            if 'seconds' in match.groupdict():
                seconds += int(match.group('seconds'))
            if 'minutes' in match.groupdict():
                seconds += 60*int(match.group('minutes'))
            if 'hours' in match.groupdict():
                seconds += 3600*int(match.group('hours'))
            if 'days' in match.groupdict():
                seconds += 86400*int(match.group('days'))
            return seconds
    raise ValueError("'%s' has invalid pattern" % time_str)

def mkdir(name, mode=0o750):
    """Implementation of ``mkdir -p``: Creates a folder with the given
    `name` and the given permissions (`mode`)

    * Create missing parent folders
    * Do nothing if the folder with the given `name` already exists
    * Raise `OSError` if there is already a file with the given `name`
    """
    if os.path.isdir(name):
        pass
    elif os.path.isfile(name):
        raise OSError("A file with the same name as the desired "
                      "dir, '%s', already exists." % name)
    else:
        os.makedirs(name, mode)
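
A brief sketch of the ``mkdir -p`` behavior (the folder names below are made
up for illustration)::

    import os
    import shutil
    from clusterjob.utils import mkdir

    mkdir('data/runs/run01')    # creates missing parent folders as needed
    mkdir('data/runs/run01')    # calling it again is a silent no-op
    assert os.path.isdir('data/runs/run01')
    shutil.rmtree('data')       # clean up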