riko: A stream processing engine modeled after Yahoo! Pipes
|travis| |versions| |pypi|
Index
`Introduction`_ | `Requirements`_ | `Word Count`_ | `Motivation`_ | `Usage`_ |
`Installation`_ | `Design Principles`_ | `Scripts`_ | `Command-line Interface`_ |
`Contributing`_ | `Credits`_ | `More Info`_ | `Project Structure`_ | `License`_
Introduction
riko is a pure Python library_ for analyzing and processing streams of
structured data. riko has synchronous_ and asynchronous_ APIs, supports
parallel execution, and is well suited for processing RSS feeds [#]_. riko
also supplies a `command-line interface`_ for executing flows, i.e., stream
processors aka workflows.
With riko, you can

- Read csv/xml/json/html files
- Create text and data based flows via modular pipes_
- Parse, extract, and process RSS/Atom feeds
- Create awesome mashups [#]_, APIs, and maps
- Perform `parallel processing`_ via cpus/processors or threads
- and much more...


Notes
^^^^^

.. [#] `Really Simple Syndication`_
.. [#] `Mashup (web application hybrid)`_
Requirements
riko has been tested and is known to work on Python 3.7, 3.8, and 3.9; and PyPy3.7.

Optional Dependencies
^^^^^^^^^^^^^^^^^^^^^

========================  ===================  ===========================
Feature                   Dependency           Installation
========================  ===================  ===========================
Async API                 Twisted_             pip install riko[async]
Accelerated xml parsing   lxml_ [#]_           pip install riko[xml]
Accelerated feed parsing  speedparser_ [#]_    pip install riko[xml]
========================  ===================  ===========================


Notes
^^^^^

.. [#] If lxml isn't present, riko will default to the builtin Python xml parser
.. [#] If speedparser isn't present, riko will default to feedparser
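
A fallback like the ones described in the notes above is commonly written as a guarded import. The sketch below illustrates that general pattern with lxml and the standard library; it is an assumption about the approach, not riko's actual code, and ``PARSER`` and the sample XML are made up for illustration.

.. code-block:: python

    # Assumption: generic optional-dependency pattern, not riko's internals.
    try:
        from lxml import etree  # accelerated parser, used when installed
        PARSER = 'lxml'
    except ImportError:
        from xml.etree import ElementTree as etree  # builtin fallback
        PARSER = 'builtin'

    # Both libraries expose a compatible `fromstring`/`find` interface.
    root = etree.fromstring('<feed><title>riko</title></feed>')
    title = root.find('title').text
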
Word Count
In this example, we use several pipes_ to count the words on a webpage.

.. code-block:: python

    >>> ### Create a SyncPipe flow ###
    >>> #
    >>> # `SyncPipe` is a convenience class that creates chainable flows
    >>> # and allows for parallel processing.
    >>> from riko.collections import SyncPipe
    >>>
    >>> ### Set the pipe configurations ###
    >>> #
    >>> # Notes:
    >>> #   1. the `detag` option will strip all html tags from the result
    >>> #   2. fetch the text contained inside the 'body' tag of the hackernews
    >>> #      homepage
    >>> #   3. replace newlines with spaces and assign the result to 'content'
    >>> #   4. tokenize the resulting text using whitespace as the delimiter
    >>> #   5. count the number of times each token appears
    >>> #   6. obtain the raw stream
    >>> #   7. extract the first word and its count
    >>> #   8. extract the second word and its count
    >>> #   9. extract the third word and its count
    >>> url = 'https://news.ycombinator.com/'
    >>> fetch_conf = {
    ...     'url': url, 'start': '<body>', 'end': '</body>', 'detag': True}  # 1
    >>>
    >>> replace_conf = {
    ...     'rule': [
    ...         {'find': '\r\n', 'replace': ' '},
    ...         {'find': '\n', 'replace': ' '}]}
    >>>
    >>> flow = (
    ...     SyncPipe('fetchpage', conf=fetch_conf)  # 2
    ...         .strreplace(conf=replace_conf, assign='content')  # 3
    ...         .tokenizer(conf={'delimiter': ' '}, emit=True)  # 4
    ...         .count(conf={'count_key': 'content'}))  # 5
    >>>
    >>> stream = flow.output  # 6
    >>> next(stream)  # 7
    {"'sad": 1}
    >>> next(stream)  # 8
    {'(': 28}
    >>> next(stream)  # 9
    {'(1999)': 1}

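
For comparison, the same normalize, tokenize, and count steps can be sketched with the standard library alone. This is not how riko implements its pipes; ``text`` is a made-up sample standing in for the fetched page.

.. code-block:: python

    from collections import Counter

    # Made-up sample text standing in for the detagged page body.
    text = 'the quick brown fox\r\njumps over the lazy dog\nthe end'

    # Replace newlines with spaces, as the `strreplace` rules do above.
    content = text.replace('\r\n', ' ').replace('\n', ' ')

    # Split on spaces and drop empty tokens, then count each token.
    tokens = [token for token in content.split(' ') if token]
    counts = Counter(tokens)

Here ``counts['the']`` is 3, since the word appears once on each line of the sample.
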
Motivation

Why I built riko
^^^^^^^^^^^^^^^^

Yahoo! Pipes [#]_ was a user-friendly web application used to aggregate,
manipulate, and mash up content from around the web.

Wanting to create custom pipes, I came across pipe2py_, which translated a
Yahoo! Pipe into Python code. pipe2py suited my needs at the time but was
unmaintained and lacked asynchronous and parallel processing.

riko addresses the shortcomings of pipe2py but drops support for importing
Yahoo! Pipes JSON workflows. riko contains ~40 built-in_ modules, aka pipes,
that allow you to programmatically perform most of the tasks Yahoo! Pipes
allowed.

Why you should use riko
^^^^^^^^^^^^^^^^^^^^^^^

riko offers several benefits over, and differences from, other stream
processing applications such as Huginn, Flink, Spark, and Storm [#]_. Namely:

- a small footprint (CPU and memory usage)
- native RSS/Atom support
- simple installation and usage
- a pure Python library with pypy_ support
- builtin modular pipes to filter, sort, and modify streams

The tradeoffs riko makes in return are:

- not distributed (can't run on a cluster of servers)
- no GUI for creating flows
- doesn't continually monitor streams for new data
- can't react to specific events
- iterator (pull) based so streams only support a single consumer [#]_

The following table summarizes these observations:

=======  ===========  =========  =====  ===========  =====  ========  ========  ===========
library  Stream Type  Footprint  RSS    simple [#]_  async  parallel  CEP [#]_  distributed
=======  ===========  =========  =====  ===========  =====  ========  ========  ===========
riko     pull         small      √      √            √      √
pipe2py  pull         small      √      √
Huginn   push         med        √                   [#]_   √         √
Others   push         large      [#]_   [#]_         [#]_   √         √         √
=======  ===========  =========  =====  ===========  =====  ========  ========  ===========

For more detailed information, please check out the FAQ_.
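
The single-consumer limitation of pull-based streams listed above can be seen with any Python iterator. The sketch below uses a hypothetical ``make_stream`` generator and ``itertools.tee`` from the standard library as a generic workaround; it does not use riko's split module.

.. code-block:: python

    from itertools import tee

    def make_stream():
        """Hypothetical stream: lazily yield dictionaries, one per item."""
        for n in range(3):
            yield {'id': n}

    # A plain iterator can only be consumed once.
    stream = make_stream()
    first_pass = [item['id'] for item in stream]
    second_pass = [item['id'] for item in stream]  # empty: already exhausted

    # `tee` buffers items so that two consumers can iterate independently.
    left, right = tee(make_stream())
    left_ids = [item['id'] for item in left]
    right_ids = [item['id'] for item in right]

Note that ``tee`` keeps unconsumed items in memory, so it suits consumers that advance at a similar pace.
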

Notes
^^^^^

.. [#] Yahoo discontinued Yahoo! Pipes in 2015, but you can view `what remains`_
.. [#] Huginn, Flink, Spark, and Storm
.. [#] You can mitigate this via the split_ module
.. [#] Doesn't depend on outside services like MySQL, Kafka, YARN, ZooKeeper, or Mesos
.. [#] `Complex Event Processing`_
.. [#] Huginn doesn't appear to make `async web requests`_
.. [#] Many libraries can't parse RSS streams without the use of 3rd party libraries
.. [#] While most libraries offer a local mode, many require integrating with a data ingestor (e.g., Flume/Kafka) to do anything useful
.. [#] I can't find evidence that these libraries offer an async API (and apparently `Spark doesn't`_)
Usage
riko is intended to be used directly as a Python library.

Usage Index
^^^^^^^^^^^

- `Fetching feeds`_
- `Synchronous processing`_
- `Parallel processing`_
- `Asynchronous processing`_
- `Cookbook`_


Fetching feeds
^^^^^^^^^^^^^^

riko can fetch RSS feeds from both local and remote filepaths via "source"
pipes. Each "source" pipe returns a stream, i.e., an iterator of
dictionaries, aka items.

.. code-block:: python

    >>> from riko.modules import fetch, fetchsitefeed
    >>>
    >>> ### Fetch an RSS feed ###
    >>> stream = fetch.pipe(conf={'url': 'https://news.ycombinator.com/rss'})
    >>>
    >>> ### Fetch the first RSS feed found ###
    >>> stream = fetchsitefeed.pipe(conf={'url': 'http://arstechnica.com/rss-feeds/'})
    >>>
    >>> ### View the fetched RSS feed(s) ###
    >>> #
    >>> # Note: regardless of how you fetch an RSS feed, it will have the same
    >>> # structure
    >>> item = next(stream)
    >>> item.keys()
    dict_keys(['title_detail', 'author.uri', 'tags', 'summary_detail', 'author_detail',
               'author.name', 'y:published', 'y:title', 'content', 'title', 'pubDate',
               'guidislink', 'id', 'summary', 'dc:creator', 'authors', 'published_parsed',
               'links', 'y:id', 'author', 'link', 'published'])
    >>> item['title'], item['author'], item['id']
    ('Gravity doesn’t care about quantum spin',
     'Chris Lee',
     'http://arstechnica.com/?p=924009')

Please see the FAQ_ for a complete list of supported `file types`_ and
protocols. Please see Fetching data and feeds for more examples.
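
To make the notion of a stream concrete, here is a minimal standard-library sketch of a source that yields one dictionary per feed item. The ``source`` function and the ``RSS`` sample are hypothetical and far simpler than riko's fetch pipes, which return many more keys per item.

.. code-block:: python

    from xml.etree import ElementTree

    # Made-up two-item feed used only for this illustration.
    RSS = """<rss version="2.0"><channel>
    <item><title>First</title><link>https://example.com/1</link></item>
    <item><title>Second</title><link>https://example.com/2</link></item>
    </channel></rss>"""

    def source(xml_text):
        """Hypothetical source: lazily yield one dict per <item> element."""
        root = ElementTree.fromstring(xml_text)
        for element in root.iter('item'):
            yield {child.tag: child.text for child in element}

    stream = source(RSS)  # an iterator of dictionaries
    item = next(stream)   # pull the first item only
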

Synchronous processing
^^^^^^^^^^^^^^^^^^^^^^

riko can modify streams via the ~40 built-in_ pipes.
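
Conceptually, chaining pipes resembles composing generator functions, where each stage pulls items from the previous one. The sketch below is plain Python with hypothetical ``keep_dotcom`` and ``sort_by_title`` helpers, not riko's API; it also shows why a sort stage cannot be lazy.

.. code-block:: python

    def keep_dotcom(stream):
        """Lazily pass through items whose link contains '.com'."""
        for item in stream:
            if '.com' in item['link']:
                yield item

    def sort_by_title(stream):
        """Sorting must consume the whole stream before yielding anything."""
        yield from sorted(stream, key=lambda item: item['title'])

    # Made-up items standing in for a fetched feed.
    items = [
        {'title': 'b', 'link': 'https://example.com/b'},
        {'title': 'c', 'link': 'https://example.org/c'},
        {'title': 'a', 'link': 'https://example.com/a'},
    ]
    flow = sort_by_title(keep_dotcom(iter(items)))
    titles = [item['title'] for item in flow]

Because ``sorted`` reads every item up front, a sort stage holds the full filtered stream in memory, whereas the filter stage handles one item at a time.
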
.. code-block:: python
>>> from riko.collections import SyncPipe
>>>
>>> ### Set the pipe configurations ###
>>> fetch_conf = {'url': 'https://news.ycombinator.com/rss'}
>>> filter_rule = {'field': 'link', 'op': 'contains', 'value': '.com'}
>>> xpath = '/html/body/center/table/tr[3]/td/table[2]/tr[1]/td/table/tr/td[3]/span/span'
>>> xpath_conf = {'url': {'subkey': 'comments'}, 'xpath': xpath}
>>>
>>> ### Create a SyncPipe flow ###
>>> #
>>> # `SyncPipe` is a convenience class that creates chainable flows
>>> # and allows for parallel processing.
>>> #
>>> # The following flow will:
>>> # 1. fetch the hackernews RSS feed
>>> # 2. filter for items with '.com' in the link
>>> # 3. sort the items ascending by title
>>> # 4. fetch the first comment from each item
>>> # 5. flatten the result into one raw stream
>>> # 6. extract the first item's content
>>> #
>>> # Note: sorting is not lazy so take caution when using this pipe
>>>
>>> flow = (
... SyncPipe('fetch', conf=fetch_conf) # 1
... .filter(conf={'rule': filter_rule}) # 2
... .sort(conf={'rule': {'sort_key': 'title'}}) # 3
... .xpathfetchpage(conf=xpath_conf)) # 4
>>>
>>> stream = flow.output # 5
>>>
