aiocouchdb 0.8.0 documentation

aiocouchdb.v1.server

Contents

Source code for aiocouchdb.v1.server

# -*- coding: utf-8 -*-
#
# Copyright (C) 2014-2015 Alexander Shorin
# All rights reserved.
#
# This software is licensed as described in the file LICENSE, which
# you should have received as part of this distribution.
#

import asyncio

from aiocouchdb.client import Resource
from aiocouchdb.feeds import EventSourceFeed, JsonFeed

from .authdb import AuthDatabase
from .config import ServerConfig
from .database import Database
from .session import Session


__all__ = (
    'Server',
)


[docs]class Server(object): """Implementation of :ref:`CouchDB Server API <api/server>`.""" #: Default :class:`~aiocouchdb.v1.database.Database` instance class database_class = Database #: Authentication database name authdb_name = '_users' #: Authentication database class authdb_class = AuthDatabase #: Default :class:`~aiocouchdb.v1.config.ServerConfig` instance class config_class = ServerConfig #: Default :class:`~aiocouchdb.v1.session.Session` instance class session_class = Session def __init__(self, url_or_resource='http://localhost:5984', *, authdb_class=None, authdb_name=None, config_class=None, database_class=None, session_class=None): if authdb_class is not None: self.authdb_class = authdb_class if authdb_name is not None: self.authdb_name = authdb_name if config_class is not None: self.config_class = config_class if database_class is not None: self.database_class = database_class if session_class is not None: self.session_class = session_class if isinstance(url_or_resource, str): url_or_resource = Resource(url_or_resource) self.resource = url_or_resource self._authdb = self.authdb_class(self.resource(self.authdb_name), dbname=self.authdb_name) self._config = self.config_class(self.resource) self._session = self.session_class(self.resource) def __getitem__(self, dbname): return self.database_class(self.resource(dbname), dbname=dbname) def __repr__(self): return '<{}.{}({}) object at {}>'.format( self.__module__, self.__class__.__qualname__, self.resource.url, hex(id(self))) @property
[docs] def authdb(self): """Proxy to the :class:`authentication database <aiocouchdb.v1.database.AuthDatabase>` instance.""" return self._authdb
@asyncio.coroutine
[docs] def db(self, dbname, *, auth=None): # pylint: disable=invalid-name """Returns :class:`~aiocouchdb.v1.database.Database` instance against specified database name. If database isn't accessible for provided auth credentials, this method raises :exc:`aiocouchdb.errors.HttpErrorException` with the related response status code. :param str dbname: Database name :param auth: :class:`aiocouchdb.authn.AuthProvider` instance :rtype: :attr:`aiocouchdb.v1.server.Server.database_class` """ db = self[dbname] resp = yield from db.resource.head(auth=auth) if resp.status != 404: yield from resp.maybe_raise_error() yield from resp.read() return db
@asyncio.coroutine
[docs] def info(self, *, auth=None): """Returns server :ref:`meta information and welcome message <api/server/root>`. :param auth: :class:`aiocouchdb.authn.AuthProvider` instance :rtype: dict """ resp = yield from self.resource.get(auth=auth) yield from resp.maybe_raise_error() return (yield from resp.json())
@asyncio.coroutine
[docs] def active_tasks(self, *, auth=None): """Returns list of :ref:`active tasks <api/server/active_tasks>` which runs on server. :param auth: :class:`aiocouchdb.authn.AuthProvider` instance :rtype: list """ resp = yield from self.resource.get('_active_tasks', auth=auth) yield from resp.maybe_raise_error() return (yield from resp.json())
@asyncio.coroutine
[docs] def all_dbs(self, *, auth=None): """Returns list of available :ref:`databases <api/server/all_dbs>` on server. :param auth: :class:`aiocouchdb.authn.AuthProvider` instance :rtype: list """ resp = yield from self.resource.get('_all_dbs', auth=auth) yield from resp.maybe_raise_error() return (yield from resp.json())
@property
[docs] def config(self): """Proxy to the related :class:`~aiocouchdb.v1.server.config_class` instance.""" return self._config
@asyncio.coroutine
[docs] def db_updates(self, *, auth=None, feed_buffer_size=None, feed=None, timeout=None, heartbeat=None): """Emits :ref:`databases events <api/server/db_updates>` for the related server instance. :param auth: :class:`aiocouchdb.authn.AuthProvider` instance :param int feed_buffer_size: Internal buffer size for fetched feed items :param str feed: Feed type :param int timeout: Timeout in milliseconds :param bool heartbeat: Whenever use heartbeats to keep connection alive Depending on feed type returns: - :class:`dict` - for default or ``longpoll`` feed - :class:`aiocouchdb.feeds.JsonFeed` - for ``continuous`` feed - :class:`aiocouchdb.feeds.EventSourceFeed` - for ``eventsource`` feed """ params = {} if feed is not None: params['feed'] = feed if timeout is not None: params['timeout'] = timeout if heartbeat is not None: params['heartbeat'] = heartbeat resp = yield from self.resource.get('_db_updates', auth=auth, params=params) yield from resp.maybe_raise_error() if feed == 'continuous': return JsonFeed(resp, buffer_size=feed_buffer_size) elif feed == 'eventsource': return EventSourceFeed(resp, buffer_size=feed_buffer_size) else: return (yield from resp.json())
@asyncio.coroutine # pylint: disable=W0622
[docs] def log(self, *, bytes=None, offset=None, auth=None): """Returns a chunk of data from the tail of :ref:`CouchDB's log <api/server/log>` file. :param int bytes: Bytes to return :param int offset: Offset in bytes where the log tail should be started :param auth: :class:`aiocouchdb.authn.AuthProvider` instance :rtype: str """ params = {} if bytes is not None: params['bytes'] = bytes if offset is not None: params['offset'] = offset resp = yield from self.resource.get('_log', auth=auth, params=params) yield from resp.maybe_raise_error() return (yield from resp.read()).decode('utf-8')
@asyncio.coroutine # pylint: disable=W0142, W0612, W0622
[docs] def replicate(self, source, target, *, # pylint: disable=R0914 auth=None, authobj=None, cancel=None, continuous=None, create_target=None, doc_ids=None, filter=None, headers=None, proxy=None, query_params=None, since_seq=None, checkpoint_interval=None, connection_timeout=None, http_connections=None, retries_per_request=None, socket_options=None, use_checkpoints=None, worker_batch_size=None, worker_processes=None): """:ref:`Runs a replication <api/server/replicate>` from ``source`` to ``target``. :param str source: Source database name or URL :param str target: Target database name or URL :param auth: :class:`aiocouchdb.authn.AuthProvider` instance (don't confuse with ``authobj`` which belongs to replication options) :param dict authobj: Authentication object for the target database :param bool cancel: Cancels active replication :param bool continuous: Runs continuous replication :param bool create_target: Creates target database if it not exists :param list doc_ids: List of specific document ids to replicate :param str filter: Filter function name :param dict headers: Custom replication request headers :param str proxy: Proxy server URL :param dict query_params: Custom query parameters for filter function :param since_seq: Start replication from specified sequence number :param int checkpoint_interval: Tweaks `checkpoint_interval`_ option :param int connection_timeout: Tweaks `connection_timeout`_ option :param int http_connections: Tweaks `http_connections`_ option :param int retries_per_request: Tweaks `retries_per_request`_ option :param str socket_options: Tweaks `socket_options`_ option :param bool use_checkpoints: Tweaks `use_checkpoints`_ option :param int worker_batch_size: Tweaks `worker_batch_size`_ option :param int worker_processes: Tweaks `worker_processes`_ option :rtype: dict .. _checkpoint_interval: http://docs.couchdb.org/en/latest/config/replicator.html#replicator/checkpoint_interval .. _connection_timeout: http://docs.couchdb.org/en/latest/config/replicator.html#replicator/connection_timeout .. _http_connections: http://docs.couchdb.org/en/latest/config/replicator.html#replicator/http_connections .. _retries_per_request: http://docs.couchdb.org/en/latest/config/replicator.html#replicator/retries_per_request .. _socket_options: http://docs.couchdb.org/en/latest/config/replicator.html#replicator/socket_options .. _use_checkpoints: http://docs.couchdb.org/en/latest/config/replicator.html#replicator/use_checkpoints .. _worker_batch_size: http://docs.couchdb.org/en/latest/config/replicator.html#replicator/worker_batch_size .. _worker_processes: http://docs.couchdb.org/en/latest/config/replicator.html#replicator/worker_processes """ params = dict((key, value) for key, value in locals().items() if (key not in {'self', 'source', 'target', 'auth'} and value is not None)) if authobj is not None: params['auth'] = params.pop('authobj') doc = {'source': source, 'target': target} doc.update(params) resp = yield from self.resource.post('_replicate', auth=auth, data=doc) yield from resp.maybe_raise_error() return (yield from resp.json())
@asyncio.coroutine
[docs] def restart(self, *, auth=None): """:ref:`Restarts <api/server/restart>` server instance. :param auth: :class:`aiocouchdb.authn.AuthProvider` instance :rtype: dict """ resp = yield from self.resource.post('_restart', auth=auth) yield from resp.maybe_raise_error() return (yield from resp.json())
@property
[docs] def session(self): """Proxy to the related :class:`~aiocouchdb.v1.server.Server.session_class` instance.""" return self._session
@asyncio.coroutine # pylint: disable=W0622
[docs] def stats(self, metric=None, *, auth=None, flush=None, range=None): """Returns :ref:`server statistics <api/server/stats>`. :param str metric: Metrics name in format ``group/name``. For instance, ``httpd/requests``. If omitted, all metrics will be returned :param bool flush: If ``True``, collects samples right for this request :param int range: `Sampling range`_ :param auth: :class:`aiocouchdb.authn.AuthProvider` instance :rtype: dict .. _Sampling range: http://docs.couchdb.org/en/latest/config/misc.html#stats/samples """ path = ['_stats'] params = {} if metric is not None: if '/' in metric: path.extend(metric.split('/', 1)) else: raise ValueError('invalid metric name. try "httpd/requests"') if flush is not None: params['flush'] = flush if range is not None: params['range'] = range resource = self.resource(*path) resp = yield from resource.get(auth=auth, params=params) yield from resp.maybe_raise_error() return (yield from resp.json())
@asyncio.coroutine
[docs] def uuids(self, *, auth=None, count=None): """Returns :ref:`UUIDs <api/server/uuids>` generated on server. :param int count: Amount of UUIDs to generate :param auth: :class:`aiocouchdb.authn.AuthProvider` instance :rtype: list """ params = {} if count is not None: params['count'] = count resp = yield from self.resource.get('_uuids', auth=auth, params=params) yield from resp.maybe_raise_error() return (yield from resp.json())['uuids']

Contents