PySpark Learning Notes

Notes on using PySpark, covering: 1. Jupyter Docker environment setup; 2. DataFrame-to-JDBC and JDBC-to-DataFrame transfer; 3. model training and saving; 4. model loading and use; and more.

1 Environment Setup

Run the commands below, then open ip:18804 in a browser; the password is my-password.
Image usage reference: https://jupyter-docker-stacks.readthedocs.io/en/latest

mkdir -p /data/pyspark && chmod 777 -R /data/pyspark

docker run -d \
--name my-spark-notebook \
--restart=always \
-e GRANT_SUDO=yes \
-v /data/pyspark:/home/jovyan \
--network host \
jupyter/all-spark-notebook \
start.sh jupyter lab \
--NotebookApp.password='sha1:7cca89c48283:e3c1f9fbc06d1d2aa59555dfd5beed925e30dd2c'
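The long `--NotebookApp.password` value is a salted hash in Jupyter's `<algorithm>:<salt>:<digest>` format, normally produced with `notebook.auth.passwd()`. As a minimal pure-Python sketch (assuming the classic SHA-1 scheme; use Jupyter's own helper in practice), such a hash can be built and checked like this:

```python
import hashlib
import uuid

def jupyter_passwd(passphrase, algorithm='sha1', salt_len=12):
    """Create a Jupyter-style hashed password: '<algo>:<salt>:<digest>'.
    Mirrors the classic notebook password format (an assumption; prefer
    notebook.auth.passwd() itself in real deployments)."""
    salt = uuid.uuid4().hex[:salt_len]
    h = hashlib.new(algorithm)
    h.update(passphrase.encode('utf-8') + salt.encode('ascii'))
    return ':'.join((algorithm, salt, h.hexdigest()))

def jupyter_passwd_check(hashed, passphrase):
    """Check a passphrase against a stored hash of the same format."""
    algorithm, salt, digest = hashed.split(':')
    h = hashlib.new(algorithm)
    h.update(passphrase.encode('utf-8') + salt.encode('ascii'))
    return h.hexdigest() == digest

# Round trip: hash a password, then verify it
hashed = jupyter_passwd('my-password')
print(hashed)
print(jupyter_passwd_check(hashed, 'my-password'))
```

Because the salt is random, each run produces a different hash for the same password, yet any of them verifies correctly.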

2 Exporting a DataFrame to a Database

References:

  1. https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/from_to_dbms.html
  2. https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrameWriter.jdbc.html?highlight=dataframewriter

In a terminal opened from Jupyter, run the following command to download the MySQL JDBC driver:

curl -O https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/mysql-connector-java-8.0.28.jar

The full script is as follows:

from pyspark.sql import SparkSession
import os
import pandas as pd
# Note: load_boston was removed in scikit-learn 1.2; this requires scikit-learn < 1.2
from sklearn.datasets import load_boston

# Point both the driver and executor classpath at the downloaded MySQL driver
spark = SparkSession.builder.master('local') \
    .appName("pyspark-lian") \
    .config("spark.jars", "{}/mysql-connector-java-8.0.28.jar".format(os.getcwd())) \
    .config("spark.driver.extraClassPath", "{}/mysql-connector-java-8.0.28.jar".format(os.getcwd())) \
    .getOrCreate()

# Build a pandas DataFrame from the Boston housing dataset
boston = load_boston()
df_boston = pd.DataFrame(boston.data, columns=boston.feature_names)
df_boston['target'] = pd.Series(boston.target)
print(df_boston.head())

# Convert to a Spark DataFrame (SQLContext is deprecated; use the SparkSession directly)
data = spark.createDataFrame(df_boston)
print(type(data))
data.printSchema()

# Write the DataFrame to MySQL over JDBC
data.write.jdbc(table='df_boston', mode='overwrite',
                url='jdbc:mysql://localhost:3306/test?createDatabaseIfNotExist=true&useSSL=false',
                properties={'user': 'root', 'password': '123456'})
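Note how the JDBC URL packs connection options (`createDatabaseIfNotExist`, `useSSL`) into its query string. A small hypothetical helper (not part of the PySpark API) can assemble such URLs and keep the escaping correct:

```python
from urllib.parse import urlencode

def build_mysql_jdbc_url(host, port, database, **params):
    """Build a MySQL JDBC URL with options as query-string parameters.
    A hypothetical convenience helper, not part of PySpark or the driver."""
    base = 'jdbc:mysql://{}:{}/{}'.format(host, port, database)
    if params:
        base += '?' + urlencode(params)
    return base

url = build_mysql_jdbc_url('localhost', 3306, 'test',
                           createDatabaseIfNotExist='true', useSSL='false')
print(url)
# jdbc:mysql://localhost:3306/test?createDatabaseIfNotExist=true&useSSL=false
```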

3 Reading MySQL Data, Training a Linear Regression, and Saving the Model

Reference: https://www.datatechnotes.com/2021/05/mllib-linear-regression-example-with.html

from pyspark.sql import SparkSession
import pyspark.pandas as ps
import os
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

spark = SparkSession.builder.master('local') \
    .appName("pyspark-lian") \
    .config("spark.jars", "{}/mysql-connector-java-8.0.28.jar".format(os.getcwd())) \
    .config("spark.driver.extraClassPath", "{}/mysql-connector-java-8.0.28.jar".format(os.getcwd())) \
    .getOrCreate()

# Read the table back through the pandas-on-Spark API, then convert to a Spark DataFrame
df_boston = ps.read_sql('df_boston', 'jdbc:mysql://localhost:3306/test?createDatabaseIfNotExist=true&useSSL=false&user=root&password=123456')
print(type(df_boston))
print(df_boston.head())
data = df_boston.to_spark()
print(type(data))
data.printSchema()

# Every column except the label is a feature
columns = data.columns
columns.remove('target')
print(columns)

# Assemble the feature columns into a single vector column
va = VectorAssembler(inputCols=columns, outputCol='features')
print(data.count())
va_df = va.transform(data.limit(200))
va_df = va_df.select(['features', 'target'])
va_df.show(3)

# Train an elastic-net regularized linear regression
lr = LinearRegression(featuresCol='features', labelCol='target',
                      regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(va_df)
print(type(lr_model))
print("Coefficients: ", lr_model.coefficients)
print("Intercept: ", lr_model.intercept)

print("MSE: ", lr_model.summary.meanSquaredError)
print("MAE: ", lr_model.summary.meanAbsoluteError)
print("R-squared: ", lr_model.summary.r2)

# Save the fitted model to disk
lr_model.save('LinearRegressionModel_model')

# Run the model over the training data to inspect predictions
mdata = lr_model.transform(va_df)
mdata.show(3)

# Collect predictions and originals (e.g. for plotting)
y_pred = mdata.select("prediction").collect()
y_orig = mdata.select("target").collect()

# Stop the session
spark.stop()
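The summary metrics printed above follow the standard definitions, so they can be reproduced from any list of labels and predictions. A minimal pure-Python sketch:

```python
def regression_metrics(y_true, y_pred):
    """Compute MSE, MAE and R-squared, matching the definitions behind
    LinearRegressionModel.summary (meanSquaredError, meanAbsoluteError, r2)."""
    n = len(y_true)
    errors = [t - p for t, p in zip(y_true, y_pred)]
    mse = sum(e * e for e in errors) / n          # mean squared error
    mae = sum(abs(e) for e in errors) / n         # mean absolute error
    mean_t = sum(y_true) / n
    ss_tot = sum((t - mean_t) ** 2 for t in y_true)  # total sum of squares
    ss_res = sum(e * e for e in errors)              # residual sum of squares
    r2 = 1 - ss_res / ss_tot
    return mse, mae, r2

mse, mae, r2 = regression_metrics([3.0, 5.0, 7.0], [2.5, 5.0, 7.5])
print(mse, mae, r2)   # ~0.1667, ~0.3333, 0.9375
```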

4 Loading the Model and Making Predictions

Note the relationship between LinearRegressionModel and LinearRegression.
In the previous step, fitting a LinearRegression produced a LinearRegressionModel, which was saved to a file.
Here, LinearRegressionModel's load method reads it back from that file so it can be used for transform.

In pipeline terminology:
LinearRegression is an Estimator,
while LinearRegressionModel is a Transformer.
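The Estimator/Transformer split can be illustrated with a toy pure-Python analogue (hypothetical classes, not the real MLlib API): `fit()` learns something from data and returns a separate object whose `transform()` applies what was learned.

```python
class MeanImputer:
    """Toy Estimator: fit() learns a statistic and returns a Transformer."""
    def fit(self, values):
        present = [v for v in values if v is not None]
        mean = sum(present) / len(present)
        return MeanImputerModel(mean)

class MeanImputerModel:
    """Toy Transformer: transform() applies the learned statistic."""
    def __init__(self, mean):
        self.mean = mean

    def transform(self, values):
        return [self.mean if v is None else v for v in values]

model = MeanImputer().fit([1.0, None, 3.0])   # Estimator -> Transformer
print(model.transform([None, 10.0]))          # [2.0, 10.0]
```

This mirrors why only the LinearRegressionModel needs to be saved and loaded for serving: the learned state lives in the Transformer, not the Estimator.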

from pyspark.sql import SparkSession
import pyspark.pandas as ps
import os
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegressionModel

spark = SparkSession.builder.master('local') \
    .appName("pyspark-lian") \
    .config("spark.jars", "{}/mysql-connector-java-8.0.28.jar".format(os.getcwd())) \
    .config("spark.driver.extraClassPath", "{}/mysql-connector-java-8.0.28.jar".format(os.getcwd())) \
    .getOrCreate()

# Read the source data from MySQL
result_mysqlSource = ps.read_sql('df_boston', 'jdbc:mysql://localhost:3306/test?createDatabaseIfNotExist=true&useSSL=false&user=root&password=123456').to_spark()

# Reassemble the features exactly as at training time
va = VectorAssembler(
    inputCols=['CRIM','ZN','INDUS','CHAS','NOX','RM','AGE','DIS','RAD','TAX','PTRATIO','B','LSTAT'],
    outputCol='features')
result_va = va.transform(result_mysqlSource)

# Load the saved model (a Transformer) and apply it
lr_model = LinearRegressionModel.load('file:///home/jovyan/LinearRegressionModel_model')
result_lr_model = lr_model.transform(result_va)

# Write the predictions back to MySQL
result_lr_model.select(['CRIM','ZN','INDUS','CHAS','NOX','RM','AGE','DIS','RAD','TAX','PTRATIO','B','LSTAT','target','prediction']) \
    .write.jdbc(table='df_boston_result', mode='overwrite',
                url='jdbc:mysql://localhost:3306/test?createDatabaseIfNotExist=true&useSSL=false',
                properties={'user': 'root', 'password': '123456'})

spark.stop()

5 Refactoring with a Pipeline

Reference: https://spark.apache.org/docs/latest/ml-pipeline.html
A partial Chinese translation: https://zhuanlan.zhihu.com/p/33619687

from pyspark.sql import SparkSession
import pyspark.pandas as ps
import os
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegressionModel
from pyspark.ml import PipelineModel

spark = SparkSession.builder.master('local') \
    .appName("pyspark-lin") \
    .config("spark.jars", "{}/mysql-connector-java-8.0.28.jar".format(os.getcwd())) \
    .config("spark.driver.extraClassPath", "{}/mysql-connector-java-8.0.28.jar".format(os.getcwd())) \
    .getOrCreate()

result_mysqlSource = ps.read_sql('df_boston', 'jdbc:mysql://localhost:3306/test?createDatabaseIfNotExist=true&useSSL=false&user=root&password=123456').to_spark()

# Both stages are already-fitted Transformers, so they can be wrapped
# directly in a PipelineModel with no fit step
va = VectorAssembler(
    inputCols=['CRIM','ZN','INDUS','CHAS','NOX','RM','AGE','DIS','RAD','TAX','PTRATIO','B','LSTAT'],
    outputCol='features')
lr_model = LinearRegressionModel.load('file:///home/jovyan/LinearRegressionModel_model')
pipelineModel = PipelineModel(stages=[va, lr_model])

# One transform call runs every stage in order
result = pipelineModel.transform(result_mysqlSource)

result.select(['CRIM','ZN','INDUS','CHAS','NOX','RM','AGE','DIS','RAD','TAX','PTRATIO','B','LSTAT','target','prediction']) \
    .write.jdbc(table='df_boston_result2', mode='overwrite',
                url='jdbc:mysql://localhost:3306/test?createDatabaseIfNotExist=true&useSSL=false',
                properties={'user': 'root', 'password': '123456'})

spark.stop()
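Conceptually, a PipelineModel just calls `transform` on each stage in order, feeding each stage's output to the next. A toy pure-Python analogue (hypothetical classes, not the MLlib API) makes the chaining explicit:

```python
class ToyPipelineModel:
    """Toy analogue of PipelineModel: chain already-fitted transformers.
    Each stage must expose transform(); one stage's output feeds the next."""
    def __init__(self, stages):
        self.stages = stages

    def transform(self, data):
        for stage in self.stages:
            data = stage.transform(data)
        return data

class Scale:
    """Toy stage: multiply every value by a constant."""
    def __init__(self, k): self.k = k
    def transform(self, xs): return [x * self.k for x in xs]

class Shift:
    """Toy stage: add a constant to every value."""
    def __init__(self, b): self.b = b
    def transform(self, xs): return [x + self.b for x in xs]

pipe = ToyPipelineModel(stages=[Scale(2), Shift(1)])
print(pipe.transform([1, 2, 3]))   # [3, 5, 7]
```

This is why the VectorAssembler and the loaded LinearRegressionModel above compose into a single reusable object: each is just a transform step.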

https://linshenkx.github.io/pyspark-notes/
Author: John Doe
Published: March 16, 2022
License