Source code for aiocouchdb.feeds
# -*- coding: utf-8 -*-
#
# Copyright (C) 2014-2015 Alexander Shorin
# All rights reserved.
#
# This software is licensed as described in the file LICENSE, which
# you should have received as part of this distribution.
#
import asyncio
import json
from aiohttp.helpers import parse_mimetype
from .hdrs import CONTENT_TYPE
__all__ = (
'Feed',
'JsonFeed',
'ViewFeed',
'ChangesFeed',
'LongPollChangesFeed',
'ContinuousChangesFeed',
'EventSourceFeed',
'EventSourceChangesFeed'
)
[docs]class Feed(object):
"""Wrapper over :class:`HttpResponse` content to stream continuous response
by emitted chunks."""
#: Limits amount of items feed would fetch and keep for further iteration.
buffer_size = 0
def __init__(self, resp, *, loop=None, buffer_size=buffer_size):
self._active = True
self._exc = None
self._queue = asyncio.Queue(maxsize=buffer_size or self.buffer_size,
loop=loop)
self._resp = resp
ctype = resp.headers.get(CONTENT_TYPE, '').lower()
*_, params = parse_mimetype(ctype)
self._encoding = params.get('charset', 'utf-8') # pylint: disable=E1101
asyncio.Task(self._loop(), loop=loop)
@asyncio.coroutine
def _loop(self):
try:
while not self._resp.content.at_eof() and self._active:
chunk = yield from self._resp.content.read()
if not chunk or chunk == b'\n': # ignore heartbeats
continue
yield from self._queue.put(chunk)
except Exception as exc:
self._exc = exc
self.close(True)
else:
self.close()
@asyncio.coroutine
[docs] def next(self):
"""Emits the next response chunk or ``None`` is feed is empty.
:rtype: bytearray
"""
if not self.is_active():
if self._exc is not None:
raise self._exc from None # pylint: disable=raising-bad-type
return None
chunk = yield from self._queue.get()
if chunk is None:
# in case of race condition, raising an error should have more
# priority then returning stop signal
if self._exc is not None:
raise self._exc from None # pylint: disable=raising-bad-type
return chunk
[docs] def is_active(self):
"""Checks if the feed is still able to emit any data.
:rtype: bool
"""
return self._active or not self._queue.empty()
[docs] def close(self, force=False):
"""Closes feed and the related request connection. Closing feed doesnt
means that all
:param bool force: In case of True, close connection instead of release.
See :meth:`aiohttp.client.ClientResponse.close` for
the details
"""
self._active = False
self._resp.close(force=force)
# put stop signal into queue to break waiting loop on queue.get()
self._queue.put_nowait(None)
[docs]class JsonFeed(Feed):
"""As :class:`Feed`, but for chunked JSON response. Assumes that each
received chunk is valid JSON object and decodes them before emit."""
@asyncio.coroutine
[docs] def next(self):
"""Decodes feed chunk with JSON before emit it.
:rtype: dict
"""
chunk = yield from super().next()
if chunk is not None:
return json.loads(chunk.decode(self._encoding))
[docs]class ViewFeed(Feed):
"""Like :class:`JsonFeed`, but uses CouchDB view response specifics."""
_total_rows = None
_offset = None
_update_seq = None
@asyncio.coroutine
[docs] def next(self):
"""Emits view result row.
:rtype: dict
"""
chunk = yield from super().next()
if chunk is None:
return chunk
chunk = chunk.decode(self._encoding).strip('\r\n,')
if chunk.startswith('{"total_rows"'):
chunk += ']}'
event = json.loads(chunk)
self._total_rows = event['total_rows']
self._offset = event.get('offset')
return (yield from self.next())
elif chunk.startswith(('{"rows"', ']}')):
return (yield from self.next())
else:
return json.loads(chunk)
@property
[docs] def offset(self):
"""Returns view results offset."""
return self._offset
@property
[docs] def total_rows(self):
"""Returns total rows in view."""
return self._total_rows
@property
[docs] def update_seq(self):
"""Returns update sequence for a view."""
return self._update_seq
[docs]class EventSourceFeed(Feed):
"""Handles `EventSource`_ response following the W3.org spec with single
exception: it expects field `data` to contain valid JSON value.
.. _EventSource: http://www.w3.org/TR/eventsource/
"""
@asyncio.coroutine
[docs] def next(self):
"""Emits decoded EventSource event.
:rtype: dict
"""
chunk = (yield from super().next())
if chunk is None:
return chunk
chunk = chunk.decode(self._encoding)
event = {}
data = event['data'] = []
for line in chunk.splitlines():
if not line:
break
if line.startswith(':'):
# If the line starts with a U+003A COLON character (:)
# Ignore the line.
continue
if ':' not in line:
# Otherwise, the string is not empty but does not contain
# a U+003A COLON character (:)
# Process the field using the steps described below, using
# the whole line as the field name, and the empty string as
# the field value.
field, value = line, ''
else:
# If the line contains a U+003A COLON character (:)
# Collect the characters on the line before the first
# U+003A COLON character (:), and let field be that string.
#
# Collect the characters on the line after the first U+003A
# COLON character (:), and let value be that string.
# If value starts with a U+0020 SPACE character, remove it
# from value.
#
# Process the field using the steps described below, using
# field as the field name and value as the field value.
field, value = line.split(':', 1)
if value.startswith(' '):
value = value[1:]
if field in ('id', 'event'):
event[field] = value
elif field == 'data':
# If the field name is "data":
# Append the field value to the data buffer,
# then append a single U+000A LINE FEED (LF) character
# to the data buffer.
data.append(value)
data.append('\n')
elif field == 'retry':
# If the field name is "retry":
# If the field value consists of only ASCII digits,
# then interpret the field value as an integer in base ten.
event[field] = int(value)
else:
# Otherwise: The field is ignored.
continue # pragma: no cover
data = ''.join(data).strip()
event['data'] = json.loads(data) if data else None
return event
[docs]class ChangesFeed(Feed):
"""Processes database changes feed."""
_last_seq = None
@asyncio.coroutine
[docs] def next(self):
"""Emits the next event from changes feed.
:rtype: dict
"""
chunk = yield from super().next()
if chunk is None:
return chunk
if chunk.startswith((b'{"results"', b'\n]')):
return (yield from self.next())
event = json.loads(chunk.strip(b',').decode(self._encoding))
self._last_seq = event['seq']
return event
@property
[docs] def last_seq(self):
"""Returns last emitted sequence number.
:rtype: int
"""
return self._last_seq
[docs]class LongPollChangesFeed(ChangesFeed):
"""Processes long polling database changes feed."""
[docs]class ContinuousChangesFeed(ChangesFeed, JsonFeed):
"""Processes continuous database changes feed."""
@asyncio.coroutine
[docs] def next(self):
"""Emits the next event from changes feed.
:rtype: dict
"""
event = yield from JsonFeed.next(self)
if event is None:
return None
if 'last_seq' in event:
self._last_seq = event['last_seq']
return (yield from self.next())
self._last_seq = event['seq']
return event
[docs]class EventSourceChangesFeed(ChangesFeed, EventSourceFeed):
"""Process event source database changes feed.
Similar to :class:`EventSourceFeed`, but includes specifics for changes feed
and emits events in the same format as others :class:`ChangesFeed` does.
"""
@asyncio.coroutine
[docs] def next(self):
"""Emits the next event from changes feed.
:rtype: dict
"""
event = (yield from EventSourceFeed.next(self))
if event is None:
return event
if event.get('event') == 'heartbeat':
return (yield from self.next())
if 'id' in event:
self._last_seq = int(event['id'])
return event['data']