Source code for dcar.raw

"""Raw message data."""

import io
from contextlib import contextmanager

from .const import MAX_MESSAGE_LEN, MAX_MSG_UNIX_FDS, MAX_VARIANT_NESTING_DEPTH
from .errors import MessageError, TooLongError

__all__ = ['RawData']


[docs]class RawData(io.BytesIO): """Raw messge data.""" def __init__(self, initial_bytes=b''): super().__init__(initial_bytes) self.byteorder = None self._unix_fds = [] self._nesting_depth = 0 @property def unix_fds(self): """Return list with unix file descriptors.""" return self._unix_fds @unix_fds.setter def unix_fds(self, fds): """Set list with unix file descriptors.""" if len(fds) > MAX_MSG_UNIX_FDS: raise TooLongError('too many unix fds: %d' % len(fds)) self._unix_fds = fds
[docs] def write(self, b): """Write bytes.""" n = super().write(b) if self.tell() > MAX_MESSAGE_LEN: raise TooLongError('message too long: %d bytes' % self.tell()) return n
[docs] def write_nul_bytes(self, n): """Write n NUL bytes.""" self.write(b'\x00' * n)
[docs] def write_padding(self, alignment): """Write padding bytes.""" self.write_nul_bytes(self._padding_len(alignment))
[docs] def skip_padding(self, alignment): """Skip padding bytes.""" b = self.read(self._padding_len(alignment)) if any(list(b)): raise MessageError('none-NUL byte in padding: %s' % b)
[docs] def set_value(self, pos, fixed_type, value): """Set value at position pos.""" self.seek(pos) fixed_type.marshal(self, value) self.seek(0, io.SEEK_END)
[docs] def add_unix_fd(self, fd): """Add unix file descriptor.""" if fd in self._unix_fds: return self._unix_fds.index(fd) else: self._unix_fds.append(fd) fd_cnt = len(self._unix_fds) if fd_cnt > MAX_MSG_UNIX_FDS: raise TooLongError('too many unix fds: %d' % fd_cnt) return fd_cnt - 1
def _padding_len(self, alignment): x = self.tell() % alignment if x: return alignment - x return 0 def __repr__(self): return '<%s: byteorder=%s>' % (self.__class__.__name__, self.byteorder.name if self.byteorder is not None else None)
[docs] @contextmanager def nesting_depth(self): """Context manager for checking the nesting depth of variants.""" self._nesting_depth += 1 if self._nesting_depth > MAX_VARIANT_NESTING_DEPTH: raise MessageError('nesting depth > %d' % MAX_VARIANT_NESTING_DEPTH) try: yield finally: self._nesting_depth -= 1