Source code for livebridge.base.sources

# -*- coding: utf-8 -*-
#
# Copyright 2016 dpa-infocom GmbH
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from livebridge.components import get_db_client


logger = logging.getLogger(__name__)


class BaseSource(object):
    """Base class for sources."""
    __module__ = "livebridge.base"

    type = ""
    mode = ""

[docs] def __init__(self, *, config={}, **kwargs): """Base constructor for sources. :param config: Configuration passed from the control file. """ pass
@property def _db(self): """Database client for accessing storage. :returns: :class:`livebridge.storages.base.BaseStorage` """ if not hasattr(self, "_db_client") or getattr(self, "_db_client") is None: self._db_client = get_db_client() return self._db_client
[docs] async def filter_new_posts(self, source_id, post_ids): """Filters ist of post_id for new ones. :param source_id: id of the source :type string: :param post_ids: list of post ids :type list: :returns: list of unknown post ids.""" new_ids = [] try: db_client = self._db posts_in_db = await db_client.get_known_posts(source_id, post_ids) new_ids = [p for p in post_ids if p not in posts_in_db] except Exception as exc: logger.error("Error when filtering for new posts {} {}".format(source_id, post_ids)) logger.exception(exc) return new_ids
[docs] async def get_last_updated(self, source_id): """Returns latest update-timestamp from storage for source. :param source_id: id of the source (source_id, ticker_id, blog_id pp) :type string: :returns: :py:class:`datetime.datetime` object of latest update datetime in db.""" last_updated = await self._db.get_last_updated(source_id) logger.info("LAST UPDATED: {} {}".format(last_updated, self)) return last_updated
[docs]class PollingSource(BaseSource): """Base class for sources which are getting polled. Any custom adapter source, which \ should get polled, has to be inherited from this base class.""" mode = "polling"
[docs] async def poll(self): """Method has to be implemented by the concrete inherited source class. :func:`poll` gets called by the interval defined by environment var *POLLING_INTERVALL*. The inheriting class has to implement the actual poll request for the source in this method. :return: list of new posts""" raise NotImplementedError("Method 'poll' not implemented.")
[docs] async def stop(self): """Method can be implemented by the concrete inherited source class. By implementing this method, the source class is able to handle the shutdown event explicitly.""" pass
[docs]class StreamingSource(BaseSource): """Base class for streaming sources. Any custom adapter source, which is using a websocket, SSE or\ any other stream as source has to be inherited from this base class.""" mode = "streaming"
[docs] async def listen(self, callback): """Method has to be implemented by the concrete inherited source class. A websocket connection has to be opened and given *callback* method has to be called with the new post as argument. :param callback: Callback method which has to be called with list of new posts. :return: True""" raise NotImplementedError("Method 'listen' not implemented.")
[docs] async def stop(self): """Method has to be implemented by the concrete inherited source class. By calling this method, the websocket-connection has to be stopped. :return: True""" raise NotImplementedError("Method 'stop' not implemented.")