我想把Pandas DF转换成Spark的。 DF头:
10000001,1,0,1,12:35,OK,10002,1,0,9,f,NA,24,24,0,3,9,0,0,1,1,0,0,4,543
10000001,2,0,1,12:36,OK,10002,1,0,9,f,NA,24,24,0,3,9,2,1,1,3,1,3,2,611
10000002,1,0,4,12:19,PA,10003,1,1,7,f,NA,74,74,0,2,15,2,0,2,3,1,2,2,691
代码:
dataset = pd.read_csv("data/AS/test_v2.csv")
sc = SparkContext(conf=conf)
sqlCtx = SQLContext(sc)
sdf = sqlCtx.createDataFrame(dataset)
而我得到了一个错误:
TypeError: Can not merge type <class 'pyspark.sql.types.StringType'> and <class 'pyspark.sql.types.DoubleType'>
类型相关的错误可以通过**强加一个模式来避免,如下所示:
注:用原始数据(如上)创建一个文本文件(test.csv),并插入假设的列名("col1","col2",...,"col25")。
import pyspark
from pyspark.sql import SparkSession
import pandas as pd
spark = SparkSession.builder.appName('pandasToSparkDF').getOrCreate()
pdDF = pd.read_csv("test.csv")
pandas数据框的内容:
pdDF
col1 col2 col3 col4 col5 col6 col7 col8 col9 col10 ... col16 col17 col18 col19 col20 col21 col22 col23 col24 col25
0 10000001 1 0 1 12:35 OK 10002 1 0 9 ... 3 9 0 0 1 1 0 0 4 543
1 10000001 2 0 1 12:36 OK 10002 1 0 9 ... 3 9 2 1 1 3 1 3 2 611
2 10000002 1 0 4 12:19 PA 10003 1 1 7 ... 2 15 2 0 2 3 1 2 2 691
接下来,创建模式:
from pyspark.sql.types import *
mySchema = StructType([ StructField("Col1", LongType(), True)\
,StructField("Col2", IntegerType(), True)\
,StructField("Col3", IntegerType(), True)\
,StructField("Col4", IntegerType(), True)\
,StructField("Col5", StringType(), True)\
,StructField("Col6", StringType(), True)\
,StructField("Col7", IntegerType(), True)\
,StructField("Col8", IntegerType(), True)\
,StructField("Col9", IntegerType(), True)\
,StructField("Col10", IntegerType(), True)\
,StructField("Col11", StringType(), True)\
,StructField("Col12", StringType(), True)\
,StructField("Col13", IntegerType(), True)\
,StructField("Col14", IntegerType(), True)\
,StructField("Col15", IntegerType(), True)\
,StructField("Col16", IntegerType(), True)\
,StructField("Col17", IntegerType(), True)\
,StructField("Col18", IntegerType(), True)\
,StructField("Col19", IntegerType(), True)\
,StructField("Col20", IntegerType(), True)\
,StructField("Col21", IntegerType(), True)\
,StructField("Col22", IntegerType(), True)\
,StructField("Col23", IntegerType(), True)\
,StructField("Col24", IntegerType(), True)\
,StructField("Col25", IntegerType(), True)])
注意:True
(意味着允许nullable)。
创建 pyspark 数据框架:
df = spark.createDataFrame(pdDF,schema=mySchema)
确认pandas数据框现在是一个pyspark数据框:
type(df)
输出:
pyspark.sql.dataframe.DataFrame
副作用:
针对Kate'的评论,要强加一个一般的(String)模式,你可以这样做:
df=spark.createDataFrame(pdDF.astype(str))
你需要确保你的pandas数据框架的列是适合于spark推断的类型。 如果你的pandas数据框架列出了类似的东西:
pd.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5062 entries, 0 to 5061
Data columns (total 51 columns):
SomeCol 5062 non-null object
Col2 5062 non-null object
而你得到了这样的错误,请尝试:
df[['SomeCol', 'Col2']] = df[['SomeCol', 'Col2']].astype(str)
现在,确保.astype(str)
实际上是你想让这些列成为的类型。 基本上,当底层Java代码试图从python中的对象推断类型时,它会使用一些观察结果并进行猜测,如果这个猜测不适用于它试图从pandas转换到spark的列中的所有数据,就会失败。