aiocouchdb 0.8.0 documentation

aiocouchdb.authn

Contents

Source code for aiocouchdb.authn

# -*- coding: utf-8 -*-
#
# Copyright (C) 2014-2015 Alexander Shorin
# All rights reserved.
#
# This software is licensed as described in the file LICENSE, which
# you should have received as part of this distribution.
#

import abc
import base64
import hashlib
import hmac
import http.cookies
from collections import namedtuple

from .hdrs import (
    AUTHORIZATION,
    COOKIE,
    X_AUTH_COUCHDB_ROLES,
    X_AUTH_COUCHDB_TOKEN,
    X_AUTH_COUCHDB_USERNAME
)


__all__ = (
    'BasicAuthProvider',
    'BasicAuthCredentials',
    'CookieAuthProvider',
    'OAuthProvider',
    'OAuthCredentials',
    'ProxyAuthProvider',
    'ProxyAuthCredentials'
)


#: BasicAuth credentials
BasicAuthCredentials = namedtuple('BasicAuthCredentials', [
    'username', 'password'])

#: OAuth credentials
OAuthCredentials = namedtuple('OAuthCredentials', [
    'consumer_key', 'consumer_secret', 'resource_key', 'resource_secret'])

#: ProxyAuth credentials
ProxyAuthCredentials = namedtuple('ProxyAuthCredentials', [
    'username', 'roles', 'secret'])


class AuthProvider(object, metaclass=abc.ABCMeta):
    """Abstract authentication provider class."""

    @abc.abstractmethod
    def reset(self):
        """Resets provider instance to default state."""
        raise NotImplementedError  # pragma: no cover

    @abc.abstractmethod
    def credentials(self):
        """Returns authentication credentials if any."""
        raise NotImplementedError  # pragma: no cover

    @abc.abstractmethod
    def set_credentials(self, *args, **kwargs):
        """Sets authentication credentials."""
        raise NotImplementedError  # pragma: no cover

    @abc.abstractmethod
    def sign(self, url, headers):
        """Applies authentication routines on further request. Mostly used
        to set right `Authorization` header or cookies to pass the challenge.

        :param str url: Request URL
        :param dict headers: Request headers
        """
        raise NotImplementedError  # pragma: no cover

    @abc.abstractmethod
    def update(self, response):
        """Updates provider routines from the HTTP response data.

        :param response: :class:`aiocouchdb.client.HttpResponse` instance
        """
        raise NotImplementedError  # pragma: no cover


class NoAuthProvider(AuthProvider):
    """Dummy provider to apply no authentication routines."""

    def reset(self):
        pass  # pragma: no cover

    def credentials(self):
        pass  # pragma: no cover

    def set_credentials(self):
        pass  # pragma: no cover

    def sign(self, url, headers):
        pass  # pragma: no cover

    def update(self, response):
        pass  # pragma: no cover


[docs]class BasicAuthProvider(AuthProvider): """Provides authentication via BasicAuth method.""" _auth_header = None _credentials = None def __init__(self, name=None, password=None): if name or password: self.set_credentials(name, password)
[docs] def reset(self): """Resets provider instance to default state.""" self._auth_header = None self._credentials = None
[docs] def credentials(self): """Returns authentication credentials. :rtype: :class:`aiocouchdb.authn.BasicAuthCredentials` """ return self._credentials
[docs] def set_credentials(self, name, password): """Sets authentication credentials. :param str name: Username :param str password: User's password """ if name and password: self._credentials = BasicAuthCredentials(name, password) elif not name: raise ValueError("Basic Auth username is missing") elif not password: raise ValueError("Basic Auth password is missing")
[docs] def sign(self, url, headers): """Adds BasicAuth header to ``headers``. :param str url: Request URL :param dict headers: Request headers """ if self._auth_header is None: if self._credentials is None: raise ValueError('Basic Auth credentials was not specified') token = base64.b64encode( ('%s:%s' % self._credentials).encode('utf8')) self._auth_header = 'Basic %s' % (token.strip().decode('utf8')) headers[AUTHORIZATION] = self._auth_header
def update(self, response): pass # pragma: no cover
[docs]class CookieAuthProvider(AuthProvider): """Provides authentication by cookies.""" _cookies = None
[docs] def reset(self): """Resets provider instance to default state.""" self._cookies = None
def credentials(self): # Reserved for future use. pass # pragma: no cover def set_credentials(self, name, password): # Reserved for future use. pass # pragma: no cover
[docs] def sign(self, url, headers): """Adds cookies to provided ``headers``. If ``headers`` already contains any cookies, they would be merged with instance ones. :param str url: Request URL :param dict headers: Request headers """ if self._cookies is None: return cookie = http.cookies.SimpleCookie() if COOKIE in headers: cookie.load(headers.get(COOKIE, '')) del headers[COOKIE] for name, value in self._cookies.items(): if isinstance(value, http.cookies.Morsel): # use dict method because SimpleCookie class modifies value dict.__setitem__(cookie, name, value) else: cookie[name] = value headers[COOKIE] = cookie.output(header='', sep=';').strip()
[docs] def update(self, response): """Updates cookies from the response. :param response: :class:`aiocouchdb.client.HttpResponse` instance """ if response.cookies: self._cookies = response.cookies
[docs]class OAuthProvider(AuthProvider): """Provides authentication via OAuth1. Requires ``oauthlib`` package.""" _credentials = None def __init__(self, *, consumer_key=None, consumer_secret=None, resource_key=None, resource_secret=None): from oauthlib import oauth1 self._oauth1 = oauth1 self.set_credentials(consumer_key=consumer_key, consumer_secret=consumer_secret, resource_key=resource_key, resource_secret=resource_secret)
[docs] def reset(self): """Resets provider instance to default state.""" self._credentials = None
[docs] def credentials(self): """Returns OAuth credentials. :rtype: :class:`aiocouchdb.authn.OAuthCredentials` """ return self._credentials
[docs] def set_credentials(self, *, consumer_key=None, consumer_secret=None, resource_key=None, resource_secret=None): """Sets OAuth credentials. Currently, all keyword arguments are required for successful auth. :param str consumer_key: Consumer key (consumer token) :param str consumer_secret: Consumer secret :param str resource_key: Resource key (oauth token) :param str resource_secret: Resource secret (oauth token secret) """ creds = (consumer_key, consumer_secret, resource_key, resource_secret) if not all(creds): return self._credentials = OAuthCredentials(*creds)
[docs] def sign(self, url, headers): """Adds OAuth1 signature to ``headers``. :param str url: Request URL :param dict headers: Request headers """ if self._credentials is None: raise ValueError('OAuth credentials was not specified') client = self._oauth1.Client( client_key=self._credentials.consumer_key, client_secret=self._credentials.consumer_secret, resource_owner_key=self._credentials.resource_key, resource_owner_secret=self._credentials.resource_secret, signature_type=self._oauth1.SIGNATURE_TYPE_AUTH_HEADER) _, oauth_headers, _ = client.sign(url) headers[AUTHORIZATION] = oauth_headers['Authorization']
def update(self, response): pass # pragma: no cover
[docs]class ProxyAuthProvider(AuthProvider): """Provides CouchDB proxy authentication methods.""" _credentials = None #: Controls the name of header used to specify CouchDB username x_auth_username = X_AUTH_COUCHDB_USERNAME #: Controls the name of header used to specify list of CouchDB user roles x_auth_roles = X_AUTH_COUCHDB_ROLES #: Controls the name of header used to provide authentication token x_auth_token = X_AUTH_COUCHDB_TOKEN def __init__(self, username=None, roles=None, secret=None, *, x_auth_username=None, x_auth_roles=None, x_auth_token=None): if x_auth_username is not None: self.x_auth_username = x_auth_username if x_auth_roles is not None: self.x_auth_roles = x_auth_roles if x_auth_token is not None: self.x_auth_token = x_auth_token if username or roles or secret: self.set_credentials(username, roles, secret)
[docs] def reset(self): """Resets provider instance to default state.""" self._credentials = None
[docs] def credentials(self): """Returns three-element tuple of defined username, roles and secret.""" return self._credentials
[docs] def set_credentials(self, username, roles=None, secret=None): """Sets ProxyAuth credentials. :param str username: CouchDB username :param list roles: List of username roles :param str secret: ProxyAuth secret. Should match the one which defined on target CouchDB server. """ if not username: raise ValueError('Proxy Auth username should have non-empty value') self._credentials = ProxyAuthCredentials(username, roles, secret)
[docs] def sign(self, url, headers): """Adds ProxyAuth credentials to ``headers``. :param str url: Request URL :param dict headers: Request headers """ creds = self._credentials if creds is None or not creds.username: raise ValueError('Proxy Auth username is missing') else: headers[self.x_auth_username] = creds.username if creds.roles is not None: headers[self.x_auth_roles] = ','.join(creds.roles) if creds.secret is not None: headers[self.x_auth_token] = hmac.new( creds.secret.encode('utf-8'), creds.username.encode('utf-8'), hashlib.sha1).hexdigest()
def update(self, response): pass # pragma: no cover

Contents