Source code for dcar.auth

"""Authentication to a message bus.

Supported mechanisms:

* EXTERNAL
* DBUS_COOKIE_SHA1
* ANONYMOUS

See: `Authentication Protocol
<https://dbus.freedesktop.org/doc/dbus-specification.html#auth-protocol>`_
"""

import binascii
import hashlib
import os
import secrets
import stat

from .errors import AuthenticationError

_all_ = ['authenticate']

COOKIE_DIR = os.path.expanduser(b'~/.dbus-keyrings')


def _external(sock):
    sock.sendall(b'AUTH EXTERNAL %b\r\n' % _auth_id())
    return _recv_line(sock)


def _dbus_cookie_sha1(sock):
    mode = os.stat(COOKIE_DIR).st_mode
    if mode & stat.S_IRWXG or mode & stat.S_IRWXO:
        return  # if group/others have permissions don't use it
    sock.sendall(b'AUTH DBUS_COOKIE_SHA1 %b\r\n' % _auth_id())
    reply = _recv_line(sock)
    if reply[0] == b'DATA':
        cookie_ctx, cookie_id, chall_str =\
            bytes.fromhex(reply[1].decode('ascii')).split()
        cookie_file = os.path.join(COOKIE_DIR, cookie_ctx)
        with open(cookie_file, 'br') as fh:
            for id_, _, c in (line.split() for line in fh):
                if id_ == cookie_id:
                    cookie = c
                    break
            else:
                cookie = None
        if cookie:
            client_chall = secrets.token_hex(16).encode('ascii')
            s = b':'.join([chall_str, client_chall, cookie])
            s = hashlib.sha1(s).hexdigest()
            s = b' '.join([client_chall, s.encode('ascii')])
            sock.sendall(b'DATA %b\r\n' % s.hex().encode('ascii'))
            return _recv_line(sock)


def _anonymous(sock):
    sock.sendall(b'AUTH ANONYMOUS\r\n')
    return _recv_line(sock)


auth_mechs = {
    b'EXTERNAL': _external,
    b'DBUS_COOKIE_SHA1': _dbus_cookie_sha1,
    b'ANONYMOUS': _anonymous,
}


[docs]def authenticate(sock, unix_fds): """Authenticate to a message bus. The passing of unix file descriptors will only be negotiated if ``unix_fds`` is ``True``. :param socket sock: a connected socket :param bool unix_fds: if the current :class:`~dcar.transports.Transport` supports the passing of unix file descriptors this must be ``True`` :return: the GUID of the server and a :class:`bool` that indicates whether unix file descriptor passing is possible (``True``) or not (``False``) :rtype: str, bool :raises ~dcar.AuthenticationError: if authentication failed """ sock.sendall(b'\0AUTH\r\n') auth_reply = _recv_line(sock) if auth_reply[0] == b'REJECTED': for auth_mech, func in auth_mechs.items(): if auth_mech not in auth_reply[1:]: continue reply = func(sock) if reply is None: sock.sendall(b'CANCEL\r\n') _recv_line(sock) # read REJECTED reply if reply and reply[0] == b'OK': guid = reply[1] if unix_fds: sock.sendall(b'NEGOTIATE_UNIX_FD\r\n') reply = _recv_line(sock) unix_fds = reply[0] == b'AGREE_UNIX_FD' sock.sendall(b'BEGIN\r\n') return guid.decode(), unix_fds else: raise AuthenticationError('no supported auth mech in %r' % b' '.join(reply[1:])) raise AuthenticationError('unexpected reply: %r' % b' '.join(auth_reply))
def _recv_line(sock): line = b'' while True: line += sock.recv(1024) if line.endswith(b'\r\n'): break return line.split() def _auth_id(): return binascii.hexlify(str(os.getuid()).encode())