Factorization machines (FM) is a predictor model that estimates parameters under the high sparsity. The model combines advantages of SVM and applies a factorized parameters instead of dense parametrization like in SVM [2]. FM is a supervised learning algorithm and can be used in classification, regression, and recommendation system tasks in machine learning. PySpark MLLib API provides a FMClassifier class to classify binary data with factorization machines method.
In
this tutorial, you'll briefly learn how to train and classify
binary classification data by using PySpark FMClassifier model. The
tutorial
covers:
- Preparing the data
- Prediction and accuracy check
- Source code listing
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.ml.classification import FMClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
from sklearn.metrics import confusion_matrix
from sklearn.datasets import load_breast_cancer
from pandas import DataFrame, Series
For a teaching purpose I use a simple dataset in this tutorial. If you want to evaluate the real performance of FMClassifier model, you need to prepare a large sparse dataset.
We use Breast Cancer dataset to perform binary classification and it can be easily
loaded from the Scikit-learn dataset module. Below code explains how to
load
dataset and transform it into the pandas data frame type.
Next, we'll define SqlConext and create data frame by using df_bc data. You can check the data frame schema.
bc = load_breast_cancer()
df_bc = DataFrame(bc.data, columns=bc.feature_names)
df_bc['label'] = Series(bc.target)
print(df_bc.head())
mean radius mean texture ... worst fractal dimension label
0 17.99 10.38 ... 0.11890 0
1 20.57 17.77 ... 0.08902 0
2 19.69 21.25 ... 0.08758 0
3 11.42 20.38 ... 0.17300 0
4 20.29 14.34 ... 0.07678 0
[5 rows x 31 columns]
Next, we'll define SqlConext and create data frame by using df_bc data. You can check the data frame schema.
sc = SparkContext().getOrCreate()
sqlContext = SQLContext(sc)
data = sqlContext.createDataFrame(df_bc)
print(data.printSchema())
root
|-- mean radius: double (nullable = true)
|-- mean texture: double (nullable = true)
|-- mean perimeter: double (nullable = true)
|-- mean area: double (nullable = true)
|-- mean smoothness: double (nullable = true)
|-- mean compactness: double (nullable = true)
|-- mean concavity: double (nullable = true)
|-- mean concave points: double (nullable = true)
|-- mean symmetry: double (nullable = true)
|-- mean fractal dimension: double (nullable = true)
|-- radius error: double (nullable = true)
|-- texture error: double (nullable = true)
|-- perimeter error: double (nullable = true)
|-- area error: double (nullable = true)
|-- smoothness error: double (nullable = true)
|-- compactness error: double (nullable = true)
|-- concavity error: double (nullable = true)
|-- concave points error: double (nullable = true)
|-- symmetry error: double (nullable = true)
|-- fractal dimension error: double (nullable = true)
|-- worst radius: double (nullable = true)
|-- worst texture: double (nullable = true)
|-- worst perimeter: double (nullable = true)
|-- worst area: double (nullable = true)
|-- worst smoothness: double (nullable = true)
|-- worst compactness: double (nullable = true)
|-- worst concavity: double (nullable = true)
|-- worst concave points: double (nullable = true)
|-- worst symmetry: double (nullable = true)
|-- worst fractal dimension: double (nullable = true)
|-- label: long (nullable = true)
To combine all feature data and separate 'label' data in a dataset, we use VectorAssembler.
features = bc.feature_names
va = VectorAssembler(inputCols = features, outputCol='features')
va_df = va.transform(data)
va_df = va_df.select(['features', 'label'])
va_df.show(3)
+--------------------+-----+
| features|label|
+--------------------+-----+
|[17.99,10.38,122....| 0|
|[20.57,17.77,132....| 0|
|[19.69,21.25,130....| 0|
+--------------------+-----+
only showing top 3 rows
Next, we'll split data into the train and test parts.
# split data into train and test
(train, test) = va_df.randomSplit([0.9, 0.1])
Prediction and Accuracy Check
We'll define the factorization machines model by using the FMClassifier
class and fit the model on train data. Here, we'll set 0.001 into the stepSize parameter. To predict test data, we can use trasnform() method.
# training
fmc = FMClassifier(labelCol="label", stepSize=0.001)
fmc = fmc.fit(train)
# prediction
pred = fmc.transform(test)
pred.show(3)
+--------------------+-----+--------------------+--------------------+----------+
| features|label| rawPrediction| probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[13.03,18.42,82.6...| 1|[-15.744299067728...|[1.45324129618984...| 1.0|
|[13.08,15.71,85.6...| 1|[-10.213107316564...|[3.66849487735132...| 1.0|
|[13.28,20.28,87.3...| 0|[19.3255629862012...|[0.99999999595410...| 0.0|
+--------------------+-----+--------------------+--------------------+----------+
only showing top 3 rows
After
predicting test data, we'll check the prediction accuracy. Here, we can
use MulticlassClassificationEvaluator. Confusion matrix can be created
by using confusion_matrix
function of sklearn.metrics module.
evaluator=MulticlassClassificationEvaluator(metricName="accuracy")
acc = evaluator.evaluate(pred)
print("Prediction Accuracy: ", acc)
y_pred=pred.select("prediction").collect()
y_orig=pred.select("label").collect()
cm = confusion_matrix(y_orig, y_pred)
print("Confusion Matrix:")
print(cm)
Prediction Accuracy: 0.9642857142857143
Confusion Matrix:
[[21 1]
[ 1 33]]
Finally, we'll stop spark context session.
# Stop session
sc.stop()
In this tutorial, we've briefly learned how to fit and classify data by using PySpark FMClassifier class. The full
source code is listed below.
Source code listing
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.ml.classification import FMClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
from sklearn.metrics import confusion_matrix
from sklearn.datasets import load_breast_cancer
from pandas import DataFrame, Series
bc = load_breast_cancer()
df_bc = DataFrame(bc.data, columns=bc.feature_names)
df_bc['label'] = Series(bc.target)
print(df_bc.head())
sc = SparkContext().getOrCreate()
sqlContext = SQLContext(sc)
data = sqlContext.createDataFrame(df_bc)
print(data.printSchema())
features = bc.feature_names
va = VectorAssembler(inputCols = features, outputCol='features')
va_df = va.transform(data)
va_df = va_df.select(['features', 'label'])
va_df.show(3)
# split data into train and test
(train, test) = va_df.randomSplit([0.9, 0.1])
# training
fmc = FMClassifier(labelCol="label", stepSize=0.001)
fmc = fmc.fit(train)
# prediction
pred = fmc.transform(test)
pred.show(3)
# accucary check
evaluator=MulticlassClassificationEvaluator(metricName="accuracy")
acc = evaluator.evaluate(pred)
print("Prediction Accuracy: ", acc)
# confusion matrix
y_pred=pred.select("prediction").collect()
y_orig=pred.select("label").collect()
cm = confusion_matrix(y_orig, y_pred)
print("Confusion Matrix:")
print(cm)
sc.stop()
References:
No comments:
Post a Comment