Apache Spark is an analytics engine for processing large-scale datasets with tools such as Spark SQL and MLlib. PySpark is the Python API for writing and running Spark applications.
In this tutorial, we'll briefly learn how to fit and predict regression data by using PySpark and the MLlib LinearRegression model. The tutorial covers:
- Preparing the data
- Fitting and accuracy check
- Visualizing the results
- Source code listing
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
import matplotlib.pyplot as plt
# Note: load_boston was removed in scikit-learn 1.2, so this import needs an older version
from sklearn.datasets import load_boston
import pandas as pd
Preparing the data
We use the Boston Housing Price dataset from scikit-learn. We'll load the dataset, convert it into a pandas data frame, and then combine the feature columns into a single vector column with VectorAssembler, which is the input format that the LinearRegression class of the PySpark ML library expects.
boston = load_boston()
df_boston = pd.DataFrame(boston.data, columns=boston.feature_names)
df_boston['target'] = pd.Series(boston.target)
print(df_boston.head())
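Next, we'll define an SQLContext and create a Spark data frame from the df_boston data.
# getOrCreate() is a class method; it reuses a running context instead of failing
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)
data = sqlContext.createDataFrame(df_boston)
# printSchema() prints the schema itself and returns None, so no print() is needed
data.printSchema()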
root
|-- CRIM: double (nullable = true)
|-- ZN: double (nullable = true)
|-- INDUS: double (nullable = true)
|-- CHAS: double (nullable = true)
|-- NOX: double (nullable = true)
|-- RM: double (nullable = true)
|-- AGE: double (nullable = true)
|-- DIS: double (nullable = true)
|-- RAD: double (nullable = true)
|-- TAX: double (nullable = true)
|-- PTRATIO: double (nullable = true)
|-- B: double (nullable = true)
|-- LSTAT: double (nullable = true)
|-- target: double (nullable = true)
To combine all feature columns into a single vector column and keep the 'target' label column separate, we use VectorAssembler.
features = boston.feature_names.tolist()
va = VectorAssembler(inputCols=features, outputCol='features')
va_df = va.transform(data)
va_df = va_df.select(['features', 'target'])
va_df.show(3)
+--------------------+------+
| features|target|
+--------------------+------+
|[0.00632,18.0,2.3...| 24.0|
|[0.02731,0.0,7.07...| 21.6|
|[0.02729,0.0,7.07...| 34.7|
+--------------------+------+
only showing top 3 rows
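Conceptually, VectorAssembler packs the listed input columns of each row into one vector. The plain-Python sketch below mimics that step, followed by the select, on a list of dictionaries. It is only an illustration and not how Spark implements it. The helper name assemble_features is hypothetical, and the two sample rows use only the CRIM and ZN values visible in the output above.

```python
def assemble_features(rows, input_cols, output_col="features"):
    """Pack the input columns of each row into one list, keeping the other columns."""
    assembled = []
    for row in rows:
        # Build the feature vector in the order given by input_cols.
        vector = [float(row[col]) for col in input_cols]
        # Drop the original input columns, as the select(['features', 'target']) step does.
        rest = {k: v for k, v in row.items() if k not in input_cols}
        assembled.append({output_col: vector, **rest})
    return assembled


rows = [
    {"CRIM": 0.00632, "ZN": 18.0, "target": 24.0},
    {"CRIM": 0.02731, "ZN": 0.0, "target": 21.6},
]
result = assemble_features(rows, ["CRIM", "ZN"])
for r in result:
    print(r)
```

Each resulting row holds one 'features' list and the untouched 'target' value, which is the shape LinearRegression reads through featuresCol and labelCol.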
Fitting and accuracy check
Next, we'll define the regressor model by using the LinearRegression class. You can change the parameters to suit your data. After fitting the model, we can check the coefficients and intercept values.
lr = LinearRegression(featuresCol='features', labelCol='target',
                      regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(va_df)
print("Coefficients: ", lr_model.coefficients)
print("Intercept: ", lr_model.intercept)
Coefficients: [-0.034024229130007695,0.009359015936752714,
0.0,2.247564189644528,-7.230786173732827,4.348712110587842,
0.0,-0.603564999831066,0.0,0.0,-0.8220712024477692,
0.00808024431913416,-0.5034480504252381]
Intercept: 20.025217329865892
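The regParam and elasticNetParam arguments control elastic-net regularization. As described in the Spark ML documentation, the penalty added to the loss is regParam * (elasticNetParam * L1 norm + (1 - elasticNetParam) / 2 * squared L2 norm) of the coefficients. With elasticNetParam=0.8 the penalty is mostly L1, which tends to push some coefficients to exactly 0.0, as seen in the output above. The sketch below computes that penalty in plain Python. The function name elastic_net_penalty and the sample weights are hypothetical, and the sketch ignores the feature standardization that Spark applies by default.

```python
def elastic_net_penalty(weights, reg_param, elastic_net_param):
    """Elastic-net penalty: a mix of L1 (lasso) and L2 (ridge) terms."""
    l1 = sum(abs(w) for w in weights)
    l2_squared = sum(w * w for w in weights)
    return reg_param * (
        elastic_net_param * l1
        + (1.0 - elastic_net_param) / 2.0 * l2_squared
    )


# Hypothetical weights, with the same settings as the tutorial's model.
weights = [1.0, -2.0, 0.0]
penalty = elastic_net_penalty(weights, reg_param=0.3, elastic_net_param=0.8)
lasso_only = elastic_net_penalty(weights, reg_param=0.3, elastic_net_param=1.0)
ridge_only = elastic_net_penalty(weights, reg_param=0.3, elastic_net_param=0.0)
print(penalty, lasso_only, ridge_only)
```

Setting elasticNetParam to 1.0 gives a pure L1 (lasso) penalty and 0.0 gives a pure L2 (ridge) penalty. Values in between blend the two.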
Now, we check the accuracy of the fitted model. The model's summary attribute provides the common regression metrics, computed on the data used for fitting.
print("MSE: ", lr_model.summary.meanSquaredError)
print("MAE: ", lr_model.summary.meanAbsoluteError)
print("R-squared: ", lr_model.summary.r2)
MSE: 23.832602753248327
MAE: 3.3409807187310054
R-squared: 0.7176886039395777
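To make the three metrics concrete, here is a minimal plain-Python sketch of how MSE, MAE, and R-squared are defined. It is not Spark's implementation. The function name regression_metrics and the small sample lists are hypothetical.

```python
def regression_metrics(y_true, y_pred):
    """Return (MSE, MAE, R-squared) for two equal-length lists of numbers."""
    n = len(y_true)
    errors = [t - p for t, p in zip(y_true, y_pred)]
    mse = sum(e * e for e in errors) / n   # mean squared error
    mae = sum(abs(e) for e in errors) / n  # mean absolute error
    mean_true = sum(y_true) / n
    ss_res = sum(e * e for e in errors)                 # residual sum of squares
    ss_tot = sum((t - mean_true) ** 2 for t in y_true)  # total sum of squares
    r2 = 1.0 - ss_res / ss_tot
    return mse, mae, r2


# Hypothetical values, chosen only to keep the arithmetic easy to follow.
y_true = [3.0, 5.0, 7.0]
y_pred = [2.0, 5.0, 9.0]
mse, mae, r2 = regression_metrics(y_true, y_pred)
print("MSE:", mse, "MAE:", mae, "R-squared:", r2)
```

Lower MSE and MAE mean smaller errors, while an R-squared closer to 1 means the model explains more of the variance in the target.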
Visualizing the results
We can use the matplotlib library to visualize the original and predicted 'target' values. We'll obtain the predictions by applying lr_model to the data with transform().
mdata = lr_model.transform(va_df)
mdata.show(3)
+--------------------+------+------------------+
| features|target| prediction|
+--------------------+------+------------------+
|[0.00632,18.0,2.3...| 24.0|30.554831691938382|
|[0.02731,0.0,7.07...| 21.6| 25.47215641847489|
|[0.02729,0.0,7.07...| 34.7| 31.3186615896002|
+--------------------+------+------------------+
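For a linear model, each value in the 'prediction' column is the intercept plus the dot product of the coefficients and the row's feature vector. The sketch below shows that calculation in plain Python. The function name predict and the three-element sample values are hypothetical and much shorter than the 13 features used in this tutorial.

```python
def predict(coefficients, intercept, features):
    """Linear prediction: intercept + sum of coefficient * feature."""
    return intercept + sum(c * x for c, x in zip(coefficients, features))


# Hypothetical values; a zero coefficient means that feature is ignored.
coefficients = [2.0, -1.0, 0.0]
intercept = 0.5
features = [3.0, 4.0, 10.0]
prediction = predict(coefficients, intercept, features)
print(prediction)
```

Features whose coefficient is 0.0, as several are in the fitted model above, do not affect the prediction at all.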
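x_ax = range(0, mdata.count())
# collect() returns Row objects, so unpack each row into a plain float
y_pred = [row["prediction"] for row in mdata.select("prediction").collect()]
y_orig = [row["target"] for row in mdata.select("target").collect()]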
Finally, we'll visualize the original and predicted data in a plot.
plt.plot(x_ax, y_orig, label="original")
plt.plot(x_ax, y_pred, label="predicted")
plt.title("Boston test and predicted data")
plt.xlabel('X-axis')
plt.ylabel('Y-axis')
plt.legend(loc='best', fancybox=True, shadow=True)
plt.grid(True)
plt.show()
If you plan to run the code again, do not forget to stop the Spark context when you are done.
# Stop session
sc.stop()
In this tutorial, we've briefly learned how to fit and predict regression data by using PySpark and the MLlib LinearRegression model. The full source code is listed below.
Source code listing
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
import matplotlib.pyplot as plt
# Note: load_boston was removed in scikit-learn 1.2, so this import needs an older version
from sklearn.datasets import load_boston
import pandas as pd
boston = load_boston()
df_boston = pd.DataFrame(boston.data,columns=boston.feature_names)
df_boston['target'] = pd.Series(boston.target)
print(df_boston.head())
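# getOrCreate() is a class method; it reuses a running context instead of failing
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)
data = sqlContext.createDataFrame(df_boston)
# printSchema() prints the schema itself and returns None, so no print() is needed
data.printSchema()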
features = boston.feature_names.tolist()
va = VectorAssembler(inputCols = features, outputCol='features')
va_df = va.transform(data)
va_df = va_df.select(['features', 'target'])
va_df.show(3)
lr = LinearRegression(featuresCol='features', labelCol='target',
regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(va_df)
print("Coefficients: ", lr_model.coefficients)
print("Intercept: ", lr_model.intercept)
print("MSE: ", lr_model.summary.meanSquaredError)
print("MAE: ", lr_model.summary.meanAbsoluteError)
print("R-squared: ", lr_model.summary.r2)
mdata = lr_model.transform(va_df)
mdata.show(3)
x_ax = range(0, mdata.count())
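# collect() returns Row objects, so unpack each row into a plain float
y_pred = [row["prediction"] for row in mdata.select("prediction").collect()]
y_orig = [row["target"] for row in mdata.select("target").collect()]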
plt.plot(x_ax, y_orig, label="original")
plt.plot(x_ax, y_pred, label="predicted")
plt.title("Boston test and predicted data")
plt.xlabel('X-axis')
plt.ylabel('Y-axis')
plt.legend(loc='best',fancybox=True, shadow=True)
plt.grid(True)
plt.show()
# Stop session
sc.stop()