The PySpark MLlib API provides a DecisionTreeClassifier model to implement classification with the decision tree method. The decision tree is a well-known and powerful supervised machine learning algorithm that can be used for both classification and regression tasks. It is a tree-like, top-down learning method that extracts rules from the training data; the branches of the tree represent decision outcomes.
In this tutorial, we'll briefly learn how to fit and classify data by using the PySpark DecisionTreeClassifier. The tutorial covers:
- Preparing the data
- Prediction and accuracy check
- Source code listing
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
from sklearn.metrics import confusion_matrix
from sklearn.datasets import load_iris
import pandas as pd
Preparing the data
We use the Iris dataset to perform classification; it can be easily loaded from the Scikit-learn datasets module. The code below shows how to load the dataset and transform it into a pandas DataFrame.
iris = load_iris()
df_iris = pd.DataFrame(iris.data, columns=iris.feature_names)
df_iris['label'] = pd.Series(iris.target)
print(df_iris.head())
sepal length (cm) sepal width (cm) ... petal width (cm) label
0 5.1 3.5 ... 0.2 0
1 4.9 3.0 ... 0.2 0
2 4.7 3.2 ... 0.2 0
3 4.6 3.1 ... 0.2 0
4 5.0 3.6 ... 0.2 0
Next, we'll define the SQLContext and create a Spark DataFrame from the df_iris data.
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)
data = sqlContext.createDataFrame(df_iris)
data.printSchema()
root
|-- sepal length (cm): double (nullable = true)
|-- sepal width (cm): double (nullable = true)
|-- petal length (cm): double (nullable = true)
|-- petal width (cm): double (nullable = true)
|-- label: long (nullable = true)
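Note that SQLContext is the legacy entry point kept for backward compatibility; since Spark 2.0, SparkSession is the preferred way to create DataFrames. A minimal equivalent sketch (the app name is arbitrary):

from pyspark.sql import SparkSession

# Alternative entry point (Spark 2.0+); produces an equivalent DataFrame
spark = SparkSession.builder.appName("dtc-iris").getOrCreate()
data = spark.createDataFrame(df_iris)
data.printSchema()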
To combine all feature columns into a single vector column and keep the 'label' column separate, we use VectorAssembler.
features = iris.feature_names
va = VectorAssembler(inputCols = features, outputCol='features')
va_df = va.transform(data)
va_df = va_df.select(['features', 'label'])
va_df.show(3)
+-----------------+-----+
| features|label|
+-----------------+-----+
|[5.1,3.5,1.4,0.2]| 0|
|[4.9,3.0,1.4,0.2]| 0|
|[4.7,3.2,1.3,0.2]| 0|
+-----------------+-----+
only showing top 3 rows
Next, we'll split the data into train and test parts.
(train, test) = va_df.randomSplit([0.8, 0.2])
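Note that randomSplit() is random by default, so the split changes between runs; passing a seed makes the result reproducible (the seed value below is arbitrary):

# Reproducible split (optional); 42 is an arbitrary seed value
(train, test) = va_df.randomSplit([0.8, 0.2], seed=42)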
Prediction and Accuracy Check
Next, we'll define the decision tree classifier model by using the DecisionTreeClassifier class and fit the model on the train data. We can then predict the test data by using the transform() method.
dtc = DecisionTreeClassifier(featuresCol="features", labelCol="label")
dtc = dtc.fit(train)
pred = dtc.transform(test)
pred.show(3)
+-----------------+-----+--------------+-------------+----------+
| features|label| rawPrediction| probability|prediction|
+-----------------+-----+--------------+-------------+----------+
|[4.4,2.9,1.4,0.2]| 0|[35.0,0.0,0.0]|[1.0,0.0,0.0]| 0.0|
|[4.6,3.1,1.5,0.2]| 0|[35.0,0.0,0.0]|[1.0,0.0,0.0]| 0.0|
|[5.0,3.6,1.4,0.2]| 0|[35.0,0.0,0.0]|[1.0,0.0,0.0]| 0.0|
+-----------------+-----+--------------+-------------+----------+
only showing top 3 rows
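The fitted model can also be inspected directly. For example, the toDebugString property prints the learned decision rules, and featureImportances reports each feature's contribution:

# Inspect the fitted DecisionTreeClassificationModel (optional)
print(dtc.toDebugString)       # text representation of the learned tree rules
print(dtc.featureImportances)  # sparse vector of per-feature importances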
After predicting the test data, we'll check the accuracy metrics. Here, we can use MulticlassClassificationEvaluator to compute the accuracy; note that its default metric is "f1", so we set metricName="accuracy" explicitly. A confusion matrix can be created with the confusion_matrix() function of the sklearn.metrics module.
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", metricName="accuracy")
acc = evaluator.evaluate(pred)
print("Prediction Accuracy: ", acc)
# collect() returns Row objects, so we flatten them into plain lists
y_pred = [row["prediction"] for row in pred.select("prediction").collect()]
y_orig = [row["label"] for row in pred.select("label").collect()]
cm = confusion_matrix(y_orig, y_pred)
print("Confusion Matrix:")
print(cm)
Prediction Accuracy: 1.0
Confusion Matrix:
[[13 0 0]
[ 0 5 0]
[ 0 0 10]]
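Besides accuracy, MulticlassClassificationEvaluator supports other metrics such as "f1", "weightedPrecision", and "weightedRecall"; switching metricName reuses the same predictions:

# F1 score on the same predictions (optional)
f1_eval = MulticlassClassificationEvaluator(predictionCol="prediction", metricName="f1")
print("F1 score: ", f1_eval.evaluate(pred))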
Finally, we'll stop the Spark context.
# Stop session
sc.stop()
In this tutorial, we've briefly learned how to fit and classify data by using the PySpark DecisionTreeClassifier class. The full source code is listed below.
Source code listing
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
from sklearn.metrics import confusion_matrix
from sklearn.datasets import load_iris
import pandas as pd
iris = load_iris()
df_iris = pd.DataFrame(iris.data, columns=iris.feature_names)
df_iris['label'] = pd.Series(iris.target)
print(df_iris.head())
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)
data = sqlContext.createDataFrame(df_iris)
data.printSchema()
features = iris.feature_names
va = VectorAssembler(inputCols = features, outputCol='features')
va_df = va.transform(data)
va_df = va_df.select(['features', 'label'])
va_df.show(3)
(train, test) = va_df.randomSplit([0.8, 0.2])
dtc = DecisionTreeClassifier(featuresCol="features", labelCol="label")
dtc = dtc.fit(train)
pred = dtc.transform(test)
pred.show(3)
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", metricName="accuracy")
acc = evaluator.evaluate(pred)
print("Prediction Accuracy: ", acc)
y_pred = [row["prediction"] for row in pred.select("prediction").collect()]
y_orig = [row["label"] for row in pred.select("label").collect()]
cm = confusion_matrix(y_orig, y_pred)
print("Confusion Matrix:")
print(cm)
sc.stop()