pipeasyspark 0.2.1
pipeasy-spark
An easy way to define preprocessing data pipelines for pyspark.
Free software: MIT license
Documentation: https://pipeasy-spark.readthedocs.io.
Verified compatibility:
python 3.5, 3.6
pyspark 2.1 -> 2.4
Documentation
https://pipeasy-spark.readthedocs.io/en/latest/
Installation
pip install pipeasy-spark
Usage
The goal of this package is to easily create a pyspark.ml.Pipeline instance that can
transform the columns of a pyspark.sql.DataFrame in order to prepare it for a regressor or classifier.
Assuming we have the titanic dataset as a DataFrame:
df = titanic.select('Survived', 'Sex', 'Age').dropna()
df.show(2)
# +--------+------+----+
# |Survived| Sex| Age|
# +--------+------+----+
# | 0| male|22.0|
# | 1|female|38.0|
# +--------+------+----+
A basic transformation pipeline can be created as follows. We define for each
column of the dataframe a list of transformers that are applied sequentially.
Each transformer is an instance of a transformer from pyspark.ml.feature.
Notice that we do not provide the parameters inputCol or outputCol to
these transformers.
from pipeasy_spark import build_pipeline
from pyspark.ml.feature import (
    StringIndexer,
    OneHotEncoderEstimator,
    VectorAssembler,
    StandardScaler,
)
pipeline = build_pipeline(column_transformers={
    # 'Survived': this variable is not modified, it can also be omitted from the dict
    'Survived': [],
    'Sex': [StringIndexer(), OneHotEncoderEstimator(dropLast=False)],
    # 'Age': a VectorAssembler must be applied before the StandardScaler
    # as the latter only accepts vectors as input.
    'Age': [VectorAssembler(), StandardScaler()]
})
trained_pipeline = pipeline.fit(df)
trained_pipeline.transform(df).show(2)
# +--------+-------------+--------------------+
# |Survived| Sex| Age|
# +--------+-------------+--------------------+
# | 0|(2,[0],[1.0])|[1.5054181442954726]|
# | 1|(2,[1],[1.0])| [2.600267703783089]|
# +--------+-------------+--------------------+
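The transformers above can be declared without inputCol or outputCol because a pipeline builder can derive them from the dictionary keys. The sketch below illustrates that idea with plain Python stand-ins. FakeTransformer, wire_stages and the "__step" naming scheme are hypothetical and used only for illustration; this is not the actual implementation of build_pipeline.

```python
class FakeTransformer:
    """Hypothetical stand-in for a pyspark.ml.feature transformer."""

    def __init__(self, name):
        self.name = name
        self.input_col = None
        self.output_col = None


def wire_stages(column_transformers):
    """Flatten {column: [transformers]} into a list of wired stages.

    Assumption of this sketch: each transformer writes to a temporary
    column, which becomes the input of the next transformer in the list.
    The returned mapping records which temporary column holds the final
    result for each original column (a real implementation would still
    have to rename it back).
    """
    stages = []
    final_columns = {}
    for column, transformers in column_transformers.items():
        current = column
        for step, transformer in enumerate(transformers):
            target = "{}__step{}".format(column, step)
            transformer.input_col = current
            transformer.output_col = target
            stages.append(transformer)
            current = target
        final_columns[column] = current
    return stages, final_columns


stages, final_columns = wire_stages({
    "Survived": [],
    "Sex": [FakeTransformer("indexer"), FakeTransformer("encoder")],
    "Age": [FakeTransformer("assembler"), FakeTransformer("scaler")],
})
for stage in stages:
    print(stage.name, stage.input_col, "->", stage.output_col)
```

Chaining through temporary columns keeps every stage's output name distinct from its input name, which is why the user-facing dictionary only needs the original column names.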
This preprocessing pipeline can be included in a full prediction pipeline:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
full_pipeline = Pipeline(stages=[
    pipeline,
    # all the features have to be assembled in a single column:
    VectorAssembler(inputCols=['Sex', 'Age'], outputCol='features'),
    LogisticRegression(featuresCol='features', labelCol='Survived')
])
trained_predictor = full_pipeline.fit(df)
trained_predictor.transform(df).show(2)
# +--------+-------------+--------------------+--------------------+--------------------+--------------------+----------+
# |Survived| Sex| Age| features| rawPrediction| probability|prediction|
# +--------+-------------+--------------------+--------------------+--------------------+--------------------+----------+
# | 0|(2,[0],[1.0])|[1.5054181442954726]|[1.0,0.0,1.505418...|[2.03811507112527...|[0.88474119316485...| 0.0|
# | 1|(2,[1],[1.0])| [2.600267703783089]|[0.0,1.0,2.600267...|[-0.7360149659890...|[0.32387617489886...| 1.0|
# +--------+-------------+--------------------+--------------------+--------------------+--------------------+----------+
As of this writing, these are the transformers from pyspark.ml.feature that are supported:
[
Binarizer(threshold=0.0, inputCol=None, outputCol=None),
BucketedRandomProjectionLSH(inputCol=None, outputCol=None, seed=None, numHashTables=1, bucketLength=None),
Bucketizer(splits=None, inputCol=None, outputCol=None, handleInvalid='error'),
CountVectorizer(minTF=1.0, minDF=1.0, vocabSize=262144, binary=False, inputCol=None, outputCol=None),
DCT(inverse=False, inputCol=None, outputCol=None),
ElementwiseProduct(scalingVec=None, inputCol=None, outputCol=None),
FeatureHasher(numFeatures=262144, inputCols=None, outputCol=None, categoricalCols=None),
HashingTF(numFeatures=262144, binary=False, inputCol=None, outputCol=None),
IDF(minDocFreq=0, inputCol=None, outputCol=None),
Imputer(strategy='mean', missingValue=nan, inputCols=None, outputCols=None),
IndexToString(inputCol=None, outputCol=None, labels=None),
MaxAbsScaler(inputCol=None, outputCol=None),
MinHashLSH(inputCol=None, outputCol=None, seed=None, numHashTables=1),
MinMaxScaler(min=0.0, max=1.0, inputCol=None, outputCol=None),
NGram(n=2, inputCol=None, outputCol=None),
Normalizer(p=2.0, inputCol=None, outputCol=None),
OneHotEncoder(dropLast=True, inputCol=None, outputCol=None),
OneHotEncoderEstimator(inputCols=None, outputCols=None, handleInvalid='error', dropLast=True),
PCA(k=None, inputCol=None, outputCol=None),
PolynomialExpansion(degree=2, inputCol=None, outputCol=None),
QuantileDiscretizer(numBuckets=2, inputCol=None, outputCol=None, relativeError=0.001, handleInvalid='error'),
RegexTokenizer(minTokenLength=1, gaps=True, pattern='\\s+', inputCol=None, outputCol=None, toLowercase=True),
StandardScaler(withMean=False, withStd=True, inputCol=None, outputCol=None),
StopWordsRemover(inputCol=None, outputCol=None, stopWords=None, caseSensitive=False),
StringIndexer(inputCol=None, outputCol=None, handleInvalid='error', stringOrderType='frequencyDesc'),
Tokenizer(inputCol=None, outputCol=None),
VectorAssembler(inputCols=None, outputCol=None),
VectorIndexer(maxCategories=20, inputCol=None, outputCol=None, handleInvalid='error'),
VectorSizeHint(inputCol=None, size=None, handleInvalid='error'),
VectorSlicer(inputCol=None, outputCol=None, indices=None, names=None),
Word2Vec(vectorSize=100, minCount=5, numPartitions=1, stepSize=0.025, maxIter=1, seed=None, inputCol=None, outputCol=None, windowSize=5, maxSentenceLength=1000)
]
These are not supported as it is not possible to specify the input column(s):
[
ChiSqSelector(numTopFeatures=50, featuresCol='features', outputCol=None, labelCol='label', selectorType='numTopFeatures', percentile=0.1, fpr=0.05, fdr=0.05, fwe=0.05),
LSHParams(),
RFormula(formula=None, featuresCol='features', labelCol='label', forceIndexLabel=False, stringIndexerOrderType='frequencyDesc', handleInvalid='error'),
SQLTransformer(statement=None)
]
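The difference between the two lists comes down to whether a transformer exposes an input-column parameter. A rough way to express that criterion is sketched below. accepts_input_column is a hypothetical helper, the stand-in classes only mimic the setter names that pyspark uses (setInputCol, setInputCols, setStatement), and the package itself may determine support differently.

```python
def accepts_input_column(transformer):
    """Return True if the object exposes an input-column setter."""
    return any(
        callable(getattr(transformer, setter, None))
        for setter in ("setInputCol", "setInputCols")
    )


class SingleInput:
    """Stand-in mimicking a transformer with one input column."""

    def setInputCol(self, value):
        self.inputCol = value
        return self


class MultiInput:
    """Stand-in mimicking a transformer with several input columns."""

    def setInputCols(self, value):
        self.inputCols = value
        return self


class StatementOnly:
    """Stand-in mimicking a transformer without any input column."""

    def setStatement(self, value):
        self.statement = value
        return self


for obj in (SingleInput(), MultiInput(), StatementOnly()):
    print(type(obj).__name__, accepts_input_column(obj))
```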
Contributing
In order to contribute to the project, here is how you should configure your local development environment:
download the code and create a local virtual environment with the required dependencies:
# getting the project
$ git clone git@github.com:Quantmetry/pipeasy-spark.git
$ cd pipeasy-spark
# creating the virtual environment and activating it
$ make install_dev
$ source .venv/bin/activate
(.venv) $ make test
# start the demo notebook
(.venv) $ make notebook
Note: the make install step installs the package in editable mode into the local virtual environment.
This step is required as it also installs the package dependencies as they are listed in the setup.py file.
make sure that Java is correctly installed. Download
Java and install it. Then you should set the JAVA_HOME environment variable. For instance you can add the
following line to your ~/.bash_profile:
# ~/.bash_profile -> this depends on your platform
# note that the actual path might change for you
export JAVA_HOME="/Library/Internet Plug-Ins/JavaAppletPlugin.plugin/Contents/Home/"
Note: we did not install Spark itself. Having a valid Java Runtime Environment installed and installing
pyspark (done when installing the package's dependencies) is enough to run the tests.
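Before running the tests, it can help to confirm that the Java setup described above is visible from Python. The snippet below is a minimal sketch using only the standard library; check_java_setup is a hypothetical helper that is not part of the project, and it only checks that JAVA_HOME points to an existing directory and that a java executable is on the PATH.

```python
import os
import shutil


def check_java_setup(environ=None):
    """Report whether JAVA_HOME is set and a java executable is on PATH."""
    environ = os.environ if environ is None else environ
    java_home = environ.get("JAVA_HOME")
    return {
        "java_home": java_home,
        # True only if JAVA_HOME is set and points to an existing directory
        "java_home_exists": bool(java_home) and os.path.isdir(java_home),
        # True if a `java` executable can be found on the current PATH
        "java_on_path": shutil.which("java") is not None,
    }


if __name__ == "__main__":
    for key, value in check_java_setup().items():
        print("{}: {}".format(key, value))
```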
run the tests of the project (you need to activate your local virtual environment):
$ source .venv/bin/activate
# this runs the tests/ with the current python version
(.venv) $ make test
# check that the developed code follows the standards
# (we use flake8 as a linting engine, it is configured in the tox.ini file)
(.venv) $ make lint
# run the tests for several python versions + run linting step
(.venv) $ tox
Note: the continuous integration process (run by TravisCI) performs the last operation ($ tox). Consequently you should make sure that this step is successful on your machine before pushing new code to the repository. However you might not have all python versions installed on your local machine; this is ok in most cases.
(optional) configure your text editor. Because the tests include a linting step, it is convenient to add this linting to your
editor. For instance you can use VSCode with the Python extension and add linting with flake8 in the settings.
It is a good idea to use the interpreter from your local virtual environment as the python interpreter for linting (and code completion etc.).
flake8 configuration options are specified in the tox.ini file.
Notes on setting up the project
I set up the project using this cookiecutter project
create a local virtual environment and activate it
install requirements_dev.txt
$ python3 -m venv .venv
$ source .venv/bin/activate
(.venv) $ pip install -r requirements_dev.txt
I update my vscode config to use this virtual env as the python interpreter for this project (doc)
(the modified file is <my_repo>/.vscode/settings.json)
I update the Makefile to be able to run tests (this way I don't have to mess with the PYTHONPATH):
# before
test: ## run tests quickly with the default Python
	py.test
# after
test: ## run tests quickly with the default Python
	python -m pytest tests/
I can now run (when inside my local virtual environment):
(.venv) $ make test
I can also run tox which runs the tests against several python versions:
(.venv) $ tox
py27: commands succeeded
ERROR: py34: InterpreterNotFound: python3.4
ERROR: py35: InterpreterNotFound: python3.5
py36: commands succeeded
flake8: commands succeeded
I log into Travis with my GitHub account. I can see and configure the builds for this repository (I have admin rights on the repo).
I can trigger a build without pushing to the repository (More Options / Trigger Build). Everything runs fine!
I push this to a new branch: Travis triggers tests on this branch (even without creating a pull request).
The tests fail because I changed README.rst to README.md. I need to also change this in setup.py.
I create an account on pypi.org and link it to the current project
(documentation)
$ brew install travis
$ travis encrypt ****** --add deploy.password
This modifies the .travis.yml file. I customize it slightly because the package name is wrong:
# .travis.yml
deploy:
  on:
    tags: true
    # the repo was not correct:
    repo: Quantmetry/pipeasy-spark
I update the version and push a tag:
$ bumpversion patch
$ git push --tags
$ git push
I can indeed see the tag (and an associated release) on the GitHub interface. However Travis does not deploy
on this commit. This is the intended behaviour. By default
Travis deploys only on the master branch.
I set up an account on readthedocs.io. Selecting the repo is enough to have the documentation online!
When I merge to master Travis launches a build but says it will not deploy
(see this build for instance). However the library
was indeed deployed to pypi: I can pip install it.
Note: when Omar pushed new commits, Travis did not report their status.
I update setup.py to add the dependency on pyspark. I also slightly modify the development setup (see the Contributing section above).
History
0.2.0 (2019-01-06)
First usable version of the package. We decided on the api:
pipeasy_spark.build_pipeline(column_transformers={'column': []}) is the core function
where you can define a list of transformers for each column.
pipeasy_spark.build_pipeline_by_dtypes(df, string_transformers=[]) allows
you to define a list of transformers for two types of columns: string_ and numeric_.
pipeasy_spark.build_default_pipeline(df, exclude_columns=['target']) builds a default
transformer for the df dataframe.
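The idea behind build_pipeline_by_dtypes can be illustrated by grouping columns according to the type strings that df.dtypes returns. The helper below is a hypothetical sketch that works on a plain list of (name, type) pairs; split_columns_by_dtype and the set of types treated as numeric are assumptions, and the package may classify columns differently.

```python
# Assumption: these Spark SQL type names are the ones treated as numeric.
NUMERIC_TYPES = {"tinyint", "smallint", "int", "bigint", "float", "double"}


def split_columns_by_dtype(dtypes):
    """Group column names into string, numeric and other columns.

    `dtypes` is a list of (column_name, type_string) pairs, in the same
    shape as the `dtypes` attribute of a pyspark DataFrame.
    """
    groups = {"string": [], "numeric": [], "other": []}
    for name, dtype in dtypes:
        if dtype == "string":
            groups["string"].append(name)
        elif dtype in NUMERIC_TYPES or dtype.startswith("decimal"):
            groups["numeric"].append(name)
        else:
            groups["other"].append(name)
    return groups


dtypes = [("Survived", "int"), ("Sex", "string"), ("Age", "double")]
print(split_columns_by_dtype(dtypes))
```

Each group could then be mapped to its own list of transformers, which is the kind of dictionary that build_pipeline expects.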
0.1.2 (2018-10-12)
I am still learning how all these tools interact with each other
0.1.1 (2018-10-12)
First release on PyPI.