# Source code for salmagundi.files

"""File utilities.

If a file descriptor is given for the parameter ``file`` it will be closed
after reading from/writing to the file.

For a list of supported encodings see
:ref:`Standard Encodings in module codecs <python:standard-encodings>`.
The default encoding is platform dependent.

For a list of error handlers see
:ref:`Error Handlers in module codecs <python:error-handlers>`.
The default error handler is ``'strict'``.
"""

import datetime
import os
import time

# Public API of this module (the names exported by a star-import).
__all__ = [
    'append_all',
    'append_lines',
    'copyfile',
    'on_same_dev',
    'read_all',
    'read_lines',
    'touch',
    'write_all',
    'write_lines',
]


def read_all(file, binary=False, encoding=None, errors=None):
    """Return the complete content of a file.

    :param file: path to file or file descriptor
    :type file: :term:`path-like object` or int
    :param bool binary: if ``True`` the file is read in binary mode and
                        ``bytes`` are returned, otherwise ``str``
    :param str encoding: name of the encoding (ignored if ``binary=True``)
    :param str errors: error handler (ignored if ``binary=True``)
    :return: the file content
    :rtype: bytes or str
    :raises OSError: on I/O failure

    .. versionchanged:: 0.6.0 Add parameter ``errors``
    """
    if binary:
        # Text-only options are dropped for a binary stream.
        open_kwargs = {'mode': 'rb', 'encoding': None, 'errors': None}
    else:
        open_kwargs = {'mode': 'rt', 'encoding': encoding, 'errors': errors}
    with open(file, **open_kwargs) as stream:
        content = stream.read()
    return content
def read_lines(file, predicate=None, encoding=None, errors=None):
    """Return the content of a file as a list of lines.

    The line breaks are stripped from the returned lines.

    An optional ``predicate`` acts as a filter: it is called with each
    line (already without its line break) and only the lines for which it
    returns a true value end up in the result.

    :param file: path to file or file descriptor
    :type file: :term:`path-like object` or int
    :param predicate: predicate function
    :type predicate: callable(str)
    :param str encoding: name of the encoding
    :param str errors: error handler
    :return: list of lines
    :rtype: list(str)
    :raises OSError: on I/O failure

    .. versionchanged:: 0.6.0 Add parameter ``errors``
    """
    with open(file, encoding=encoding, errors=errors) as stream:
        # Lazily strip the trailing newline; consumed before the file closes.
        stripped = (raw.rstrip('\n') for raw in stream)
        if predicate:
            return [text for text in stripped if predicate(text)]
        return list(stripped)
def write_all(file, content, binary=False, encoding=None, errors=None):
    """Write the content to a file, replacing any previous content.

    :param file: path to file or file descriptor
    :type file: :term:`path-like object` or int
    :param content: file content
    :type content: bytes or str
    :param bool binary: if ``True`` the content must be ``bytes`` else ``str``
    :param str encoding: name of the encoding (ignored if ``binary=True``)
    :param str errors: error handler (ignored if ``binary=True``)
    :return: number of bytes or characters written
    :rtype: int
    :raises OSError: on I/O failure

    .. versionchanged:: 0.6.0 Add parameter ``errors``
    """
    if binary:
        # Text-only options are dropped for a binary stream.
        open_mode, enc, err = 'wb', None, None
    else:
        open_mode, enc, err = 'wt', encoding, errors
    with open(file, mode=open_mode, encoding=enc, errors=err) as stream:
        written = stream.write(content)
    return written
def write_lines(file, lines, encoding=None, errors=None):
    """Write the lines to a file, replacing any previous content.

    The lines are joined with ``'\\n'`` and a final newline is written
    after the last line. This final newline is written even if ``lines``
    is empty.

    :param file: path to file or file descriptor
    :type file: :term:`path-like object` or int
    :param list(str) lines: list of strings w/o newline
    :param str encoding: name of the encoding
    :param str errors: error handler
    :return: number of characters written
    :rtype: int
    :raises OSError: on I/O failure

    .. versionchanged:: 0.6.0 Add parameter ``errors``
    """
    with open(file, mode='w', encoding=encoding, errors=errors) as stream:
        # Body first, then the terminating newline.
        pieces = ('\n'.join(lines), '\n')
        return sum(stream.write(piece) for piece in pieces)
def append_all(file, content, binary=False, encoding=None, errors=None):
    """Append the content to the end of a file.

    :param file: path to file or file descriptor
    :type file: :term:`path-like object` or int
    :param content: file content
    :type content: bytes or str
    :param bool binary: if ``True`` the content must be ``bytes`` else ``str``
    :param str encoding: name of the encoding (ignored if ``binary=True``)
    :param str errors: error handler (ignored if ``binary=True``)
    :return: number of bytes or characters written
    :rtype: int
    :raises OSError: on I/O failure

    .. versionchanged:: 0.6.0 Add parameter ``errors``
    """
    if binary:
        # Text-only options are dropped for a binary stream.
        open_mode, enc, err = 'ab', None, None
    else:
        open_mode, enc, err = 'at', encoding, errors
    with open(file, mode=open_mode, encoding=enc, errors=err) as stream:
        written = stream.write(content)
    return written
def append_lines(file, lines, encoding=None, errors=None):
    """Append the lines to the end of a file.

    The lines are joined with ``'\\n'`` and a final newline is written
    after the last line. This final newline is written even if ``lines``
    is empty.

    :param file: path to file or file descriptor
    :type file: :term:`path-like object` or int
    :param list(str) lines: list of strings w/o newline
    :param str encoding: name of the encoding
    :param str errors: error handler
    :return: number of characters written
    :rtype: int
    :raises OSError: on I/O failure

    .. versionchanged:: 0.6.0 Add parameter ``errors``
    """
    with open(file, mode='a', encoding=encoding, errors=errors) as stream:
        # Body first, then the terminating newline.
        pieces = ('\n'.join(lines), '\n')
        return sum(stream.write(piece) for piece in pieces)
def touch(filepath, new_time=None, atime=True, mtime=True, create=True):
    """Change file timestamps.

    The ``new_time`` parameter may be:

    ====================  ===
    ``None``              the current time will be used
    ``int`` or ``float``  seconds since the epoch
    ``datetime``          from module :mod:`datetime`
    ``struct_time``       from module :mod:`time`
    ``path-like object``  path to a file which timestamps should be used
    ====================  ===

    ``new_time`` is validated and resolved before ``filepath`` is touched,
    so a wrong type or a missing reference file does not leave a newly
    created empty file behind.

    :param filepath: the file for which the timestamps should be changed
    :type filepath: :term:`path-like object`
    :param new_time: the new time (see above for more details)
    :param bool atime: if ``True`` change access time
    :param bool mtime: if ``True`` change modification time
    :param bool create: if ``True`` an empty file will be created if it
                        does not exist
    :raises FileNotFoundError: if ``filepath`` does not exist and
                               ``create=False`` or the reference file for
                               ``new_time`` does not exist
    :raises TypeError: if ``new_time`` is of wrong type

    .. versionadded:: 0.5.0
    """
    # Resolve the requested timestamps first: this may raise TypeError or
    # FileNotFoundError and must happen before any file is created.
    if new_time is None:
        atime_s = mtime_s = time.time()
    elif isinstance(new_time, (int, float)):
        atime_s = mtime_s = new_time
    elif isinstance(new_time, datetime.datetime):
        atime_s = mtime_s = new_time.timestamp()
    elif isinstance(new_time, time.struct_time):
        atime_s = mtime_s = time.mktime(new_time)
    elif isinstance(new_time, (str, bytes, os.PathLike)):
        reference = os.stat(new_time)
        atime_s = reference.st_atime
        mtime_s = reference.st_mtime
    else:
        raise TypeError('wrong type for argument new_time')

    if create and not os.path.exists(filepath):
        # Append mode creates a missing file but, unlike 'w', does not
        # truncate it should it have appeared after the existence check.
        with open(filepath, 'a'):
            pass

    # A single stat call supplies the timestamp that is to be kept; it
    # raises FileNotFoundError if the file is missing and was not created.
    current = os.stat(filepath)
    os.utime(filepath, (atime_s if atime else current.st_atime,
                        mtime_s if mtime else current.st_mtime))
def on_same_dev(file1, file2):
    """Tell whether two files reside on the same device/partition.

    Both arguments may refer to directories as well as to files.

    :param file1: path to file or file descriptor
    :type file1: :term:`path-like object` or int
    :param file2: path to file or file descriptor
    :type file2: :term:`path-like object` or int
    :return: ``True`` if both files are on the same device/partition
    :rtype: bool
    """
    # The device id reported by stat() identifies the device of a file.
    return os.stat(file1).st_dev == os.stat(file2).st_dev
_COPY_CHUNK_SIZE = 16 * 1024  # number of bytes copied per loop iteration


def copyfile(src, dst, callback=None, cancel_evt=None):
    r"""Copy a file.

    The progress of a long running copy process can be monitored and the
    process can be cancelled.

    The ``callback``, if given, must be a callable that takes two
    parameters:

    - number of the copied bytes
    - size of the source file

    It is called after each chunk that has been written.

    If ``cancel_evt`` is given and set, copying stops after the current
    chunk; the partially written destination file is left in place.

    ::

      def cb(i, t):
          print('\r%d / %d (%.1f%%)' % (i, t, i / t * 100),
                end='', flush=True)

      evt = threading.Event()
      print('Start', end='', flush=True)
      try:
          copyfile('/path/to/source/file', '/path/to/destination/file',
                   cb, evt)
          print()
      except KeyboardInterrupt:
          evt.set()
          print('\rCancelled \n')

    :param src: source filepath
    :type src: :term:`path-like object`
    :param dst: destination filepath (not a directory)
    :type dst: :term:`path-like object`
    :param callback: callback function or ``None`` for no progress reports
    :param threading.Event cancel_evt: if set the process will be cancelled;
                                       ``None`` disables cancellation
    :raises OSError: if the file could not be copied

    .. versionadded:: 0.5.0
    """
    file_size = os.path.getsize(src)
    copied = 0
    with open(src, 'rb') as ifh, open(dst, 'wb') as ofh:
        while True:
            chunk = ifh.read(_COPY_CHUNK_SIZE)
            if not chunk:
                # End of the source file reached.
                break
            copied += ofh.write(chunk)
            if callback:
                callback(copied, file_size)
            # Checked after the write, so at least one chunk is copied
            # even if the event is already set.
            if cancel_evt and cancel_evt.is_set():
                break