Source code for salmagundi.utils

"""Utilities.

.. versionadded:: 0.5.0
"""

import errno
import os
import socket
import sys
import string
import tempfile
import textwrap
from contextlib import contextmanager, suppress

from ._stopwatch import StopWatch, StopWatchError

__all__ = ['AlreadyRunning', 'StopWatch', 'StopWatchError', 'check_bytes_like',
           'check_path_like', 'check_type', 'docopt_helper',
           'ensure_single_instance', 'sys_exit']


[docs]def check_type(obj, classinfo, name='object', msg=None): """Check the type of an object. >>> utils.check_type(1, str, 'num') Traceback (most recent call last): ... TypeError: the type of 'num' must be 'str', got 'int' >>> utils.check_type(1, (str, float), 'num') Traceback (most recent call last): ... TypeError: the type of 'num' must be one of 'str, float', got 'int' >>> utils.check_type(1, str, msg='wrong type for num') Traceback (most recent call last): ... TypeError: wrong type for num :param object obj: the object :param classinfo: see :func:`isinstance` :param str name: name shown in the exception message :param str msg: message for the exception :raises TypeError: if the check fails """ if not isinstance(obj, classinfo): if not msg: if isinstance(classinfo, tuple): msg = ('the type of %r must be one of %r, got %r' % (name, ', '.join(map(lambda x: x.__name__, classinfo)), obj.__class__.__name__)) else: msg = ('the type of %r must be %r, got %r' % (name, classinfo.__name__, obj.__class__.__name__)) raise TypeError(msg)
[docs]def check_path_like(obj, name='object', msg=None): """Check if an object is a :term:`path-like object`. :param object obj: the object :param str name: name shown in the exception message :param str msg: alternative message :raises TypeError: if the check fails """ if not msg: msg = ('%r must be a path-like object, got %r' % (name, obj.__class__.__name__)) check_type(obj, (str, bytes, os.PathLike), None, msg)
[docs]def check_bytes_like(obj, name='object', msg=None): """Check if an object is a :term:`bytes-like object`. :param object obj: the object :param str name: name shown in the exception message :param str msg: alternative message :raises TypeError: if the check fails """ if not msg: msg = ('%r must be a bytes-like object, got %r' % (name, obj.__class__.__name__)) try: memoryview(obj) except TypeError: raise TypeError(msg) from None
[docs]def docopt_helper(text, *, name=None, version=None, version_str=None, argv=None, help=True, options_first=False, converters=None, err_code=1, **kwargs): """Helper function for `docopt <https://pypi.org/project/docopt/>`_. The ``name`` defaults to ``os.path.basename(sys.argv[0])``. If ``version`` is a :class:`tuple` it will be converted to a string with ``'.'.join(map(str, version))``. If ``version_str`` is set it will be printed if called with ``--version``. Else if ``version`` is set the resulting string will be ``name + ' ' + version``. Within the help message string substitution is supported with :ref:`template strings <python:template-strings>`. The placeholder identifiers ``name``, ``version`` and ``version_str`` are always available; more can be added with ``kwargs``. If the help message is indented it will be dedented so that the least indented lines line up with the left edge of the display. .. |docopt_api| replace:: docopt.docopt() .. _docopt_api: https://pypi.org/project/docopt/#api .. _ref-convs: The optional argument ``converters`` is a mapping with the same keys as in the dictionary returned by |docopt_api|_. The values are callables which take one argument of an appropriate type and return a value of the desired type. It is not required to provide a converter for every option, argument and command. If a value cannot be converted the converter should raise a :exc:`ValueError`. Example (naval_fate.py): .. literalinclude:: /_files/docopt_helper_example.py :language: python3 .. literalinclude:: /_files/docopt_helper_example.txt :language: none :emphasize-lines: 26,29,30 :param str text: help message :param str name: name of program/script :param version: version :type version: str or tuple :param str version_str: version string :param argv: see: |docopt_api|_ :type argv: list(str) :param bool option_first: see: |docopt_api|_ :param bool help: see: |docopt_api|_ :param dict converters: see :ref:`above <ref-convs>` :param int err_code: exit status code if an error occurs :param kwargs: additional values for substitution in the help message :return: result of |docopt_api|_ :rtype: dict :raises SystemExit: if program was invoked with incorrect arguments or a converter function raised a :exc:`ValueError` .. versionadded:: 0.10.0 .. versionchanged:: 0.11.0 Add parameter ``err_code`` """ from . import _docopt if name is None: name = os.path.basename(sys.argv[0]) if isinstance(version, tuple): version = '.'.join(map(str, version)) if version_str is None and version: version_str = f'{name} {version}' mapping = dict(name=name, version=version, version_str=version_str) try: arguments = _docopt.docopt( string.Template( textwrap.dedent(text)).substitute(mapping, **kwargs), version=version_str, argv=argv, help=help, options_first=options_first) except SystemExit as ex: sys_exit(ex, err_code) if converters: for key, conv in converters.items(): try: arguments[key] = conv(arguments[key]) except ValueError as ex: sys_exit(f'error in {key!r}: {ex}', err_code) return arguments
[docs]def sys_exit(arg=None, code=None, *, logger=None): """Exit from Python. If ``code`` is not an integer, this function calls :func:`sys.exit` with ``arg`` as its argument. Otherwise ``arg`` will be printed to :data:`sys.stderr` if it is not ``None`` and :func:`sys.exit` will be called with ``code`` as its argument. If ``logger`` is set, the message, if any, will be logged with level ``CRITICAL`` instead of printing it to :data:`sys.stderr`. :param arg: see: :func:`sys.exit` :param code: exit code (ignored if not an :class:`int`) :type code: int or None :param logger: a logger :type logger: logging.Logger :raises SystemExit: .. versionadded:: 0.11.0 """ if not isinstance(code, int): code = None if code is None: if logger and arg is not None and not isinstance(arg, int): logger.critical(str(arg)) sys.exit(1) else: sys.exit(arg) elif arg is not None: if logger: logger.critical(str(arg)) else: print(arg, file=sys.stderr) sys.exit(code)
[docs]class AlreadyRunning(Exception): """Raised by :func:`ensure_single_instance`."""
[docs]@contextmanager def ensure_single_instance(lockname=None, *, lockdir=None, # noqa: C901 extra=None, err_code=1, err_msg=None, use_socket=False): """Make sure that only one instance of the program/script is running. The result of this function can be used as a context manager in `with` statements or as a decorator. .. code-block:: python def main(): ... if __name__ == '__main__': with ensure_single_instance(): main() # is equivalent to: @ensure_single_instance() def main(): ... if __name__ == '__main__': main() If ``lockname`` is not set the name will be constructed from the absolute path of the program/script and the lock file will be created in the ``lockdir`` (which defaults to the temporary directory). On Linux ,if ``use_socket=True``, an abstract domain socket will be used instead of a lock file and the name of the socket will be the value of ``lockname``. This function should work on Windows and any platform that supports :mod:`fcntl` but it is only tested on Linux. The user running the program/script must have the permissions to create and delete the lock file. If the program/script will be run by multiple users the single instance restriction can be per user or system wide. The temporary directory on Windows is normally user specific; on unix-like systems it is normally one directory for all users. To create a user specific lock name the ``extra`` argument can be used: .. code-block:: python import getpass from salmagundi.utils import ensure_single_instance with ensure_single_instance(extra=getpass.getuser()): ... :param str lockname: user defined lock name :param lockdir: user defined directory for lock files (must exist and the path must be absolute; ignored if ``use_socket=True``) :type lockdir: :term:`path-like object` :param str extra: will be appended to lock name :param err_code: exit status code if another instance is running (if set to ``None`` :exc:`AlreadyRunning` will be raised instead of :exc:`SystemExit`) :type err_code: int or None :param err_msg: error message (if ``None`` it defaults to ``f'already running: {sys.argv[0]}'``) :type err_msg: str or None :param bool use_socket: if true an abstract domain socket is used (**Linux only**) :raises SystemExit: if another instance is running and ``err_code`` is not None :raises AlreadyRunning: if another instance is running and ``err_code`` is None :raises RuntimeError: if ``lockdir`` is not absolute or ``use_socket=True`` and platform is not Linux :raises OSError: if the lock file could not be created/deleted .. versionadded:: 0.11.0 .. versionchanged:: 0.11.2 Rename parameter ``lockfile`` to ``lockname`` """ if err_code is not None and not isinstance(err_code, int): err_code = 1 if not lockname: lockname = os.path.abspath( sys.argv[0]).translate(str.maketrans(r'\/.: ', '-----')).strip('-') if extra: lockname += '-' + extra if use_socket: if not sys.platform.startswith('linux'): raise RuntimeError( 'abstract domain sockets only supported on Linux') sock = socket.socket(socket.AF_UNIX) try: sock.bind('\0' + lockname) yield except OSError as ex: if ex.errno == errno.EADDRINUSE: _already_running(err_code, err_msg) raise finally: sock.close() else: if lockdir and not os.path.isabs(lockdir): raise RuntimeError('lockdir path must be absolute') if not lockdir: lockdir = tempfile.gettempdir() lockfile = os.path.join(lockdir, lockname + '.lock') if sys.platform == 'win32': try: os.remove(lockfile) except FileNotFoundError: pass except OSError as ex: if ex.winerror == 32: # ERROR_SHARING_VIOLATION _already_running(err_code, err_msg) raise try: fd = os.open(lockfile, os.O_CREAT | os.O_EXCL | os.O_WRONLY) try: yield finally: os.close(fd) os.remove(lockfile) except FileExistsError: # another process was faster _already_running(err_code, err_msg) else: import fcntl if os.path.exists(lockfile): flags = os.O_WRONLY else: flags = os.O_CREAT | os.O_WRONLY mask = os.umask(0) try: fd = os.open(lockfile, flags, 0o222) finally: os.umask(mask) try: fcntl.lockf(fd, fcntl.LOCK_EX | fcntl.LOCK_NB) try: yield finally: fcntl.lockf(fd, fcntl.LOCK_UN) os.close(fd) with suppress(PermissionError): os.remove(lockfile) except OSError as ex: if ex.errno in (errno.EACCES, errno.EAGAIN): _already_running(err_code, err_msg) raise
def _already_running(err_code, err_msg): if err_msg is None: err_msg = f'already running: {sys.argv[0]}' if err_code is None: raise AlreadyRunning(err_msg) from None else: sys_exit(err_msg or None, err_code)