PyFunctional
Python library for creating data pipelines with chained functional programming
Features
PyFunctional makes creating data pipelines easy by using chained functional operators. Here are a
few examples of what it can do:
- Chained operators: seq(1, 2, 3).map(lambda x: x * 2).reduce(lambda x, y: x + y)
- Expressive and feature complete API
- Read and write text, csv, json, jsonl, sqlite, gzip, bz2, and lzma/xz files
- Parallelize "embarrassingly parallel" operations like map easily
- Complete documentation, rigorous unit test suite, 100% test coverage, and CI which provide robustness
PyFunctional's API takes inspiration from Scala collections, Apache Spark RDDs, and Microsoft
LINQ.
Table of Contents
- Installation
- Examples
- Writing to Files
- Parallel Execution
- Github Shortform Documentation
- Contributing and Bug Fixes
- Changelog
Installation
PyFunctional is available on PyPI and can be
installed by running:
# Install from command line
$ pip install pyfunctional
Then, in Python, run: from functional import seq
Examples
PyFunctional is useful for many tasks, and can natively open several common file types. Here
are a few examples of what you can do.
Simple Example
from functional import seq
seq(1, 2, 3, 4)\
.map(lambda x: x * 2)\
.filter(lambda x: x > 4)\
.reduce(lambda x, y: x + y)
# 14
# or if you don't like backslash continuation
(seq(1, 2, 3, 4)
.map(lambda x: x * 2)
.filter(lambda x: x > 4)
.reduce(lambda x, y: x + y)
)
# 14
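For comparison, the same computation can be written with Python's built-in map and filter plus functools.reduce. This plain-Python sketch only illustrates what the chain above computes; it is not how PyFunctional is implemented.

```python
from functools import reduce

# Same pipeline as above, using only the standard library
doubled = map(lambda x: x * 2, [1, 2, 3, 4])   # 2, 4, 6, 8
large = filter(lambda x: x > 4, doubled)       # 6, 8
total = reduce(lambda x, y: x + y, large)      # 6 + 8
print(total)  # 14
```

Each intermediate step needs its own name (or nested calls that read inside-out), which is the bookkeeping the chained style avoids.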
Streams, Transformations and Actions
PyFunctional has three types of functions:
- Streams: read data for use by the collections API.
- Transformations: transform data from streams with functions such as map, flat_map, and filter.
- Actions: cause a series of transformations to evaluate to a concrete value. to_list, reduce, and to_dict are examples of actions.
In the expression seq(1, 2, 3).map(lambda x: x * 2).reduce(lambda x, y: x + y), seq is the
stream, map is the transformation, and reduce is the action.
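To make the three roles concrete, here is a toy pipeline built on Python generators. MiniSeq is a hypothetical class written for illustration, not PyFunctional's implementation; in this sketch transformations only wrap an iterator, and nothing runs until an action consumes it.

```python
import functools


class MiniSeq:
    """Hypothetical toy pipeline, not PyFunctional's implementation."""

    def __init__(self, iterable=()):
        # Stream: wrap the source data in an iterator
        self._it = iter(iterable)

    # Transformations: wrap the iterator in a generator; nothing runs yet
    def map(self, func):
        return MiniSeq(func(x) for x in self._it)

    def filter(self, pred):
        return MiniSeq(x for x in self._it if pred(x))

    # Actions: consume the iterator and return a concrete value
    def to_list(self):
        return list(self._it)

    def reduce(self, func):
        return functools.reduce(func, self._it)


calls = []


def double(x):
    calls.append(x)  # record each time the function actually runs
    return x * 2


pipeline = MiniSeq([1, 2, 3]).map(double)
before = len(calls)                           # 0: map has not run yet
result = pipeline.reduce(lambda x, y: x + y)  # the action drives the generator
print(before, result, calls)                  # 0 12 [1, 2, 3]
```

Unlike a full library, this toy can be consumed only once, because its underlying generator is exhausted by the first action.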
Filtering a list of account transactions
from functional import seq
from collections import namedtuple
Transaction = namedtuple('Transaction', 'reason amount')
transactions = [
Transaction('github', 7),
Transaction('food', 10),
Transaction('coffee', 5),
Transaction('digitalocean', 5),
Transaction('food', 5),
Transaction('riotgames', 25),
Transaction('food', 10),
Transaction('amazon', 200),
Transaction('paycheck', -1000)
]
# Using the Scala/Spark inspired APIs
food_cost = seq(transactions)\
.filter(lambda x: x.reason == 'food')\
.map(lambda x: x.amount).sum()
# Using the LINQ inspired APIs
food_cost = seq(transactions)\
.where(lambda x: x.reason == 'food')\
.select(lambda x: x.amount).sum()
# Using PyFunctional with fn
from fn import _
food_cost = seq(transactions).filter(_.reason == 'food').map(_.amount).sum()
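For reference, the same total in plain Python uses a single generator expression. With the transactions listed above, all three PyFunctional variants and this version add up the three food entries.

```python
from collections import namedtuple

Transaction = namedtuple('Transaction', 'reason amount')
transactions = [
    Transaction('github', 7),
    Transaction('food', 10),
    Transaction('coffee', 5),
    Transaction('digitalocean', 5),
    Transaction('food', 5),
    Transaction('riotgames', 25),
    Transaction('food', 10),
    Transaction('amazon', 200),
    Transaction('paycheck', -1000),
]

# Keep food transactions, take their amounts, and add them up
food_cost = sum(t.amount for t in transactions if t.reason == 'food')
print(food_cost)  # 25
```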
Aggregates and Joins
The account transactions example could easily be done in pure Python using list comprehensions. To
show some of the things PyFunctional excels at, take a look at a couple of word count examples.
words = 'I dont want to believe I want to know'.split(' ')
seq(words).map(lambda word: (word, 1)).reduce_by_key(lambda x, y: x + y)
# [('dont', 1), ('I', 2), ('to', 2), ('know', 1), ('want', 2), ('believe', 1)]
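reduce_by_key groups (key, value) pairs by key and folds each group's values with the given function. The helper below is a hypothetical stdlib sketch of that behavior, written to show the semantics rather than to mirror the library's code.

```python
def reduce_by_key(pairs, func):
    """Hypothetical sketch: fold together the values that share a key."""
    result = {}
    for key, value in pairs:
        if key in result:
            result[key] = func(result[key], value)
        else:
            result[key] = value
    return list(result.items())


words = 'I dont want to believe I want to know'.split(' ')
counts = reduce_by_key(((word, 1) for word in words), lambda x, y: x + y)
print(counts)
# [('I', 2), ('dont', 1), ('want', 2), ('to', 2), ('believe', 1), ('know', 1)]
```

This sketch returns keys in first-seen order, while the output shown above lists them in a different order, so code that consumes reduce_by_key results should not rely on ordering.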
In the next example we have chat logs formatted as JSON Lines (jsonl), which contain messages and metadata. A jsonl file has one valid JSON document on each line. Below are a few lines from examples/chat_logs.jsonl.
{"message":"hello anyone there?","date":"10/09","user":"bob"}
{"message":"need some help with a program","date":"10/09","user":"bob"}
{"message":"sure thing. What do you need help with?","date":"10/09","user":"dave"}
from operator import add
import re
messages = seq.jsonl('examples/chat_logs.jsonl')
# Split words on space and normalize before doing word count
def extract_words(message):
    return re.sub('[^0-9a-z ]+', '', message.lower()).split(' ')
word_counts = messages\
.map(lambda log: extract_words(log['message']))\
.flatten().map(lambda word: (word, 1))\
.reduce_by_key(add).order_by(lambda x: x[1])
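The same steps can be traced with the standard library on the three sample lines shown above, inlined here so the sketch runs without the file: parse each line as JSON, normalize and split the message, count words, then sort ascending by count as order_by(lambda x: x[1]) does. collections.Counter stands in for map plus reduce_by_key.

```python
import json
import re
from collections import Counter

# The three sample lines from examples/chat_logs.jsonl shown above
raw = '''{"message":"hello anyone there?","date":"10/09","user":"bob"}
{"message":"need some help with a program","date":"10/09","user":"bob"}
{"message":"sure thing. What do you need help with?","date":"10/09","user":"dave"}'''

# JSON Lines: each non-empty line is its own JSON document
messages = [json.loads(line) for line in raw.splitlines() if line.strip()]


def extract_words(message):
    # Lowercase, drop anything that is not a digit, letter, or space, split on spaces
    return re.sub('[^0-9a-z ]+', '', message.lower()).split(' ')


# flatten + count, then sort ascending by count (sorted() is stable)
counts = Counter(word for log in messages for word in extract_words(log['message']))
word_counts = sorted(counts.items(), key=lambda x: x[1])
print(word_counts[-3:])  # [('need', 2), ('help', 2), ('with', 2)]
```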
Next, let's continue that example but introduce a JSON database of users from examples/users.json.
The previous example showed how PyFunctional can do word counts; this one shows how PyFunctional
can join different data sources.
# First read the json file
users = seq.json('examples/users.json')
#[('sarah',{'date_created':'08/08','news_email':True,'email':'sarah@gmail.com'}),...]
email_domains = users.map(lambda u: u[1]['email'].split('@')[1]).distinct()
# ['yahoo.com', 'python.org', 'gmail.com']
# Join users with their messages
message_tuples = messages.group_by(lambda m: m['user'])
data = users.inner_join(message_tuples)
# [('sarah',
# (
# {'date_created':'08/08','news_email':True,'email':'sarah@gmail.com'},
# [{'date':'10/10','message':'what is a...','user':'sarah'}...]
# )
# ),...]
# From here you can imagine doing more complex analysis
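The group_by and inner_join steps can be sketched in plain Python to show the shapes involved. The users and messages below are made-up sample data, and inner_join here is a hypothetical helper: a key survives only if it appears on both sides, so a user with no messages and a message author with no user record are both dropped.

```python
from collections import defaultdict

# Made-up sample data in the shapes shown above
users = [
    ('sarah', {'email': 'sarah@example.com'}),
    ('bob', {'email': 'bob@example.com'}),
]
messages = [
    {'user': 'bob', 'message': 'hello anyone there?'},
    {'user': 'bob', 'message': 'need some help with a program'},
    {'user': 'dave', 'message': 'sure thing. What do you need help with?'},
]

# group_by analogue: (user, [messages from that user])
grouped = defaultdict(list)
for m in messages:
    grouped[m['user']].append(m)
message_tuples = list(grouped.items())


def inner_join(left, right):
    """Hypothetical helper: pair values whose keys appear on both sides."""
    right_by_key = defaultdict(list)
    for key, value in right:
        right_by_key[key].append(value)
    return [(key, (lv, rv))
            for key, lv in left
            for rv in right_by_key.get(key, [])]


data = inner_join(users, message_tuples)
print([(user, len(msgs)) for user, (info, msgs) in data])  # [('bob', 2)]
```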
CSV, Aggregate Functions, and Set functions
In examples/camping_purchases.csv there is a list of camping purchases. Let's do some cost analysis and compare it to the required camping gear list stored in examples/gear_list.txt.
purchases = seq.csv('examples/camping_purchases.csv')
total_cost = purchases.select(lambda row: int(row[2])).sum()
# 1275
most_expensive_item = purchases.max_by(lambda row: int(row[2]))
# ['4', 'sleeping bag', ' 350']
purchased_list = purchases.select(lambda row: row[1])
gear_list = seq.open('examples/gear_list.txt').map(lambda row: row.strip())
missing_gear = gear_list.difference(purchased_list)
# ['water bottle','gas','toilet paper','lighter','spoons','sleeping pad',...]
In addition to the aggregate functions shown above (sum and max_by), there are many more.
Similarly, there are several more set-like functions in addition to difference.
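The same analysis can be sketched with the csv module. The rows below are made-up, assuming the id, item, cost layout suggested by the max_by output above (including the leading space before the cost, which int() tolerates). The missing-gear step is a set-style difference; this sketch keeps the gear-list order, which a set-based difference need not do.

```python
import csv
import io

# Made-up rows in the assumed id, item, cost layout
purchases_csv = io.StringIO('1,tent, 300\n2,stove, 100\n3,sleeping bag, 350\n')
purchases = list(csv.reader(purchases_csv))

# Aggregates: int() ignores surrounding whitespace such as ' 350'
total_cost = sum(int(row[2]) for row in purchases)
most_expensive_item = max(purchases, key=lambda row: int(row[2]))

# Set-style difference: gear not present among purchased items
gear_list = ['tent', 'stove', 'lighter', 'spoons']
purchased = {row[1] for row in purchases}
missing_gear = [item for item in gear_list if item not in purchased]

print(total_cost, most_expensive_item, missing_gear)
# 750 ['3', 'sleeping bag', ' 350'] ['lighter', 'spoons']
```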
Reading/Writing SQLite3
PyFunctional can read and write to SQLite3 database files. In the example below, users are read
from examples/users.db which stores them as rows with columns id:Int and name:String.
db_path = 'examples/users.db'
users = seq.sqlite3(db_path, 'select * from user').to_list()
# [(1, 'Tom'), (2, 'Jack'), (3, 'Jane'), (4, 'Stephan')]
sorted_users = seq.sqlite3(db_path, 'select * from user order by name').to_list()
# [(2, 'Jack'), (3, 'Jane'), (4, 'Stephan'), (1, 'Tom')]
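The same queries can be run with Python's built-in sqlite3 module, which is a handy way to check what the seq.sqlite3 calls should return. This sketch assumes the table contents listed above and recreates them in an in-memory database instead of reading examples/users.db.

```python
import sqlite3

# Throwaway in-memory database with the schema and rows described above
conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE user (id INT, name TEXT)')
conn.executemany('INSERT INTO user (id, name) VALUES (?, ?)',
                 [(1, 'Tom'), (2, 'Jack'), (3, 'Jane'), (4, 'Stephan')])

# Each row comes back as a tuple, matching the shape shown above
users = conn.execute('SELECT * FROM user').fetchall()
sorted_users = conn.execute('SELECT * FROM user ORDER BY name').fetchall()
conn.close()

print(sorted_users)  # [(2, 'Jack'), (3, 'Jane'), (4, 'Stephan'), (1, 'Tom')]
```

Without ORDER BY, SQL does not promise a row order, so only the sorted query has a guaranteed result.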
Writing to a SQLite3 database is similarly easy:
import sqlite3
from collections import namedtuple
from functional import seq
with sqlite3.connect(':memory:') as conn:
    conn.execute('CREATE TABLE user (id INT, name TEXT)')
    conn.commit()
    User = namedtuple('User', 'id name')
    # Write using a specific query
    seq([(1, 'pedro'), (2, 'fritz')]).to_sqlite3(conn, 'INSERT INTO user (id, name) VALUES (?, ?)')
    # Write by inserting values positionally from a tuple/list into named table
    seq([(3, 'sam'), (4, 'stan')]).to_sqlite3(conn, 'user')
    # Write by inferring schema from namedtuple
    seq([User(name='tom', id=5), User(name='keiga', id=6)]).to_sqlite3(conn, 'user')
    # Write by inferring schema from dict
    seq([dict(name='david', id=7), dict(name='jordan', id=8)]).to_sqlite3(conn, 'user')
    # Read everything back to make sure it wrote correctly
    print(list(conn.execute('SELECT * FROM user')))
    # [(1, 'pedro'), (2, 'fritz'), (3, 'sam'), (4, 'stan'), (5, 'tom'), (6, 'keiga'), (7, 'david'), (8, 'jordan')]
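The comments above say to_sqlite3 can infer the schema from a namedtuple or a dict. One plausible way to map such records onto SQL is to take column names from the namedtuple's _fields or the dict's keys. The insert_records helper below is hypothetical and is not how to_sqlite3 is implemented; it only illustrates why field order in the record does not matter once columns are named.

```python
import sqlite3
from collections import namedtuple

User = namedtuple('User', 'id name')


def insert_records(conn, table, records):
    """Hypothetical helper: build an INSERT from namedtuple fields or dict keys."""
    for record in records:
        if hasattr(record, '_fields'):   # namedtuple: names come from _fields
            columns, values = record._fields, tuple(record)
        else:                            # dict: names come from the keys
            columns, values = tuple(record.keys()), tuple(record.values())
        placeholders = ', '.join('?' for _ in columns)
        # Table/column names are interpolated, so only use trusted identifiers
        sql = f'INSERT INTO {table} ({", ".join(columns)}) VALUES ({placeholders})'
        conn.execute(sql, values)


conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE user (id INT, name TEXT)')
insert_records(conn, 'user', [User(name='tom', id=5), dict(name='david', id=7)])
rows = conn.execute('SELECT * FROM user ORDER BY id').fetchall()
conn.close()
print(rows)  # [(5, 'tom'), (7, 'david')]
```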
Writing to files
Just as PyFunctional can read from csv, json, jsonl, sqlite3, and text files, it can
also write them. For complete API documentation see the collections API table or the official docs.
Compressed Files
PyFunctional will auto-detect files compressed with gzip, lzma/xz, and bz2. It does this by
examining the first several bytes of the file to determine whether it is compressed, so reading
compressed files requires no code changes.
To write compressed files, every to_ function has a parameter compression which can be set to
the default None for no compression, gzip or gz for gzip compression, lzma or xz for lzma
compression, and bz2 for bz2 compression.
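Detection from the first bytes works because each format starts with a well-known magic number: 1f 8b for gzip, "BZh" for bz2, and fd 37 7a 58 5a 00 for the xz container. The sketch below is a hypothetical stdlib helper, open_maybe_compressed, that sniffs those bytes; it is not PyFunctional's exact logic, and it writes its test files with the standard gzip, bz2, and lzma openers rather than the compression parameter.

```python
import bz2
import gzip
import lzma
import os
import tempfile

# Magic numbers at the start of each compressed format
MAGIC = [
    (b'\x1f\x8b', gzip.open),       # gzip
    (b'BZh', bz2.open),             # bz2
    (b'\xfd7zXZ\x00', lzma.open),   # xz container (lzma module default)
]


def open_maybe_compressed(path):
    """Hypothetical helper: sniff the first bytes and pick a decompressor."""
    with open(path, 'rb') as f:
        head = f.read(6)
    for magic, opener in MAGIC:
        if head.startswith(magic):
            return opener(path, 'rt', encoding='utf-8')
    return open(path, 'rt', encoding='utf-8')  # not compressed


results = {}
with tempfile.TemporaryDirectory() as tmp:
    writers = [('plain.txt', open), ('data.gz', gzip.open),
               ('data.bz2', bz2.open), ('data.xz', lzma.open)]
    for name, writer in writers:
        path = os.path.join(tmp, name)
        with writer(path, 'wt', encoding='utf-8') as f:
            f.write('hello\n')
        # The same reader handles every file; only the leading bytes are checked
        with open_maybe_compressed(path) as f:
            results[name] = f.read()

print(results)
```

Because only the leading bytes are inspected, the file extension plays no role in this sketch.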
Parallel Execution
The only change required to enable parallelism is to use from functional import pseq instead of
from functional import seq, and then use pseq wherever you would use seq. The following
operations are run in para
