Source code for dcar.marshal

"""Marshal/unmarshal D-Bus Messages.

See:

* :ref:`ref-types-table`
* `Summary of types
  <https://dbus.freedesktop.org/doc/dbus-specification.html#idm495>`_
* `Marshalling (Wire Format)
  <https://dbus.freedesktop.org/doc/dbus-specification.html
  #message-protocol-marshaling>`_

.. note::
   Instances of the type classes should be used through the
   :data:`types` mapping which uses D-Bus type codes as keys.
"""

import array
import struct

from .const import MAX_ARRAY_LEN
from .errors import TooLongError, MessageError
from .signature import Signature
from .validate import validate_object_path, validate_signature

__all__ = ['types', 'marshal', 'unmarshal']


[docs]class Type: """Base class. :param str name: type name """ def __init__(self, name): self.name = name
[docs] def marshal(self, raw, data, signature=None): """Marshal an object of this type. The data must be of an appropriate type according to the column *Python IN* in the :ref:`types summary <ref-types-table>`. The ``signature`` parameter is only used in the classes :class:`Array`, :class:`Struct`, and :class:`DictEntry`. :param RawData raw: raw message data :param data: data to be marshalled :param signature: see :class:`~dcar.signature.Signature` :raises ~dcar.MessageError: if the data could not be marshalled """ raise NotImplementedError
[docs] def unmarshal(self, raw, signature=None): """Unmarshal an object of this type. The returned data will be of a type according to the column *Python OUT* in the :ref:`types summary <ref-types-table>`. The ``signature`` parameter is only used in the classes :class:`Array`, :class:`Struct`, and :class:`DictEntry`. :param RawData raw: raw message data :param signature: see :class:`~dcar.signature.Signature` :return: unmarshalled data :raises ~dcar.MessageError: if the data could not be unmarshalled """ raise NotImplementedError
def __repr__(self): signature = [k for k in types if types[k] is self][0] return '<%s:%s:%r>' % (self.__class__.__name__, self.name, signature)
[docs]class Fixed(Type): """Class for fixed types. :param str name: type name :param str struct_code: code from :mod:`struct` module """ def __init__(self, name, struct_code): super().__init__(name) self.code = struct_code self.size = struct.calcsize(struct_code) self.alignment = self.size
[docs] def marshal(self, raw, data, signature=None): raw.write_padding(self.alignment) try: raw.write(struct.pack(raw.byteorder.code + self.code, data)) except struct.error as ex: raise MessageError('marshal %s %r: %s' % (self.name, data, ex)) from ex
[docs] def unmarshal(self, raw, signature=None): raw.skip_padding(self.alignment) try: value = raw.read(self.size) return struct.unpack(raw.byteorder.code + self.code, value)[0] except struct.error as ex: raise MessageError('unmarshal %s %r: %s' % (self.name, value, ex)) from ex
[docs]class Boolean(Fixed): """Class for booleans.""" def __init__(self): super().__init__('BOOLEAN', 'I')
[docs] def marshal(self, raw, data, signature=None): if not isinstance(data, bool): raise MessageError('marshal %s: %r is not a valid boolean' % (self.name, data)) super().marshal(raw, data, signature)
[docs] def unmarshal(self, raw, signature=None): value = super().unmarshal(raw, signature) if value not in (0, 1): raise MessageError('unmarshal %s: %r is not a valid boolean' % (self.name, value)) return bool(value)
[docs]class UnixFd(Fixed): """Class for unix file descriptors.""" def __init__(self): super().__init__('UNIX_FD', 'I')
[docs] def marshal(self, raw, data, signature=None): if isinstance(data, int): idx = raw.add_unix_fd(data) elif hasattr(data, 'fileno'): idx = raw.add_unix_fd(data.fileno()) else: raise MessageError( 'marshal %s: %r is not a valid file (descriptor)' % (self.name, data)) super().marshal(raw, idx, signature)
[docs] def unmarshal(self, raw, signature=None): idx = super().unmarshal(raw, signature) if not raw.unix_fds or idx >= len(raw.unix_fds): raise MessageError( 'unmarshal %s: %d is not a valid index for fds' % (self.name, idx)) return raw.unix_fds[idx]
# mapping: dbus type code -> Fixed instance types = { 'y': Fixed('BYTE', 'B'), 'b': Boolean(), 'n': Fixed('INT16', 'h'), 'q': Fixed('UINT16', 'H'), 'i': Fixed('INT32', 'i'), 'u': Fixed('UINT32', 'I'), 'x': Fixed('INT64', 'q'), 't': Fixed('UINT64', 'Q'), 'd': Fixed('DOUBLE', 'd'), 'h': UnixFd(), }
[docs]class StringLike(Type): """Class for string like types. :param str name: type name :param str len_type: D-Bus type code for the length field :param validate_func: a ``validate_*`` function from the :mod:`validate` module or ``None`` """ def __init__(self, name, len_type, validate_func): super().__init__(name) self.len_type = types[len_type] self.alignment = self.len_type.alignment self.validate_func = validate_func
[docs] def marshal(self, raw, data, signature=None): if self.validate_func: self.validate_func(data) raw.write_padding(self.alignment) pos = raw.tell() raw.write_nul_bytes(self.len_type.size) # placeholder for length try: # encoding=UTF-8, errors=strict d = data.encode() except UnicodeError as ex: raise MessageError('marshal %s %r: %s' % (self.name, data, ex)) from ex raw.write(d) raw.write_nul_bytes(1) raw.set_value(pos, self.len_type, len(d)) # set length
[docs] def unmarshal(self, raw, signature=None): length = self.len_type.unmarshal(raw) try: # encoding=UTF-8, errors=strict value = raw.read(length) s = value.decode() except UnicodeError as ex: raise MessageError('unmarshal %s %r: %s' % (self.name, value, ex)) from ex if self.validate_func: self.validate_func(s) b = raw.read(1) if not b or b[0]: raise MessageError('no NUL byte after string: %s' % b) return s
# mapping: dbus type code -> StringLike instance types.update({ 's': StringLike('STRING', 'u', None), 'o': StringLike('OBJECT_PATH', 'u', validate_object_path), 'g': StringLike('SIGNATURE', 'y', validate_signature), })
[docs]class Container(Type): """Base class for container types. :param str name: type name :param int alignment: the type's alignment """ def __init__(self, name, alignment): super().__init__(name) self.alignment = alignment def __repr__(self): signature = [k for k in types if types[k] is self][0] return '<Container:%s:%r>' % (self.name, signature)
[docs]class Variant(Container): """Class for a D-Bus Variant.""" def __init__(self): super().__init__('VARIANT', 1)
[docs] def marshal(self, raw, data, signature=None): with raw.nesting_depth(): if len(data) != 2: raise MessageError('variant Python tuple must have 2 elements') sig = Signature(data[0]) if len(sig) != 1: raise MessageError( 'variant signature must be a single complete type') types['g'].marshal(raw, str(sig)) marshal(raw, (data[1],), sig)
[docs] def unmarshal(self, raw, signature=None): with raw.nesting_depth(): sig = Signature(types['g'].unmarshal(raw)) if len(sig) != 1: raise MessageError( 'variant signature must be a single complete type') return unmarshal(raw, sig)[0]
[docs]class Array(Container): """Class for a D-Bus Array.""" def __init__(self): super().__init__('ARRAY', types['u'].alignment) self.len_type = types['u']
[docs] def marshal(self, raw, data, signature): with raw.nesting_depth(): type_code = signature[0][0] if type_code == 'e' and isinstance(data, dict): data = list(data.items()) elif not isinstance(data, (list, array.array)): raise MessageError('wrong Python type for array: %r' % type(data)) raw.write_padding(self.alignment) pos = raw.tell() raw.write_nul_bytes(self.len_type.size) # placeholder for length el_type = types[type_code] raw.write_padding(el_type.alignment) start_pos = raw.tell() for v in data: el_type.marshal(raw, v, signature[0][1]) length = raw.tell() - start_pos if length > MAX_ARRAY_LEN: raise TooLongError('array too long: %d bytes' % length) raw.set_value(pos, self.len_type, length) # set length
[docs] def unmarshal(self, raw, signature): with raw.nesting_depth(): type_code = signature[0][0] raw.skip_padding(self.alignment) length = self.len_type.unmarshal(raw) if length > MAX_ARRAY_LEN: raise TooLongError('array too long: %d bytes' % length) el_type = types[type_code] raw.skip_padding(el_type.alignment) end_pos = raw.tell() + length lst = [] while raw.tell() < end_pos: lst.append(el_type.unmarshal(raw, signature[0][1])) if type_code == 'e': return dict(lst) return lst
[docs]class Struct(Container): """Class for a D-Bus Struct.""" def __init__(self, name='STRUCT'): super().__init__(name, 8)
[docs] def marshal(self, raw, data, signature): with raw.nesting_depth(): if not isinstance(data, tuple): raise MessageError('tuple expected not %r' % type(data)) raw.write_padding(self.alignment) marshal(raw, data, signature)
[docs] def unmarshal(self, raw, signature): with raw.nesting_depth(): raw.skip_padding(self.alignment) return unmarshal(raw, signature)
[docs]class DictEntry(Struct): """Class for a D-Bus DictEntry.""" def __init__(self): super().__init__('DICT_ENTRY')
# mapping: dbus type code -> Container instance types.update({'v': Variant(), 'a': Array()}) # dbus type codes for signatures type_codes = set(types) # mapping: dbus type code -> Container instance # (r and e not used in signatures) types.update({'r': Struct(), 'e': DictEntry()})
[docs]def marshal(raw, data, signature): """Marshal objects. The elements of the data tuple must be of appropriate types according to the column *Python IN* in the :ref:`types summary <ref-types-table>`. :param RawData raw: raw message data :param tuple data: tuple of data to be marshalled :param signature: see :class:`~dcar.signature.Signature` :raises ~dcar.MessageError: if the data could not be marshalled """ signature = _signature(signature) if len(signature) != len(data): raise MessageError('length: signature %d, args %d' % (len(signature), len(data))) for (sig, d) in zip(signature, data): t, s = sig types[t].marshal(raw, d, s)
[docs]def unmarshal(raw, signature): """Unmarshal objects. The elements of the returned tuple will be of types according to the column *Python OUT* in the :ref:`types summary <ref-types-table>`. :param RawData raw: raw message data :param signature: see :class:`~dcar.signature.Signature` :return: tuple of unmarshalled data :rtype: tuple :raises ~dcar.MessageError: if the data could not be unmarshalled """ signature = _signature(signature) data = [] for t, s in signature: data.append(types[t].unmarshal(raw, s)) return tuple(data)
def _signature(signature): if isinstance(signature, str): return Signature(signature) if signature is None: return Signature('') return signature