Saya mencoba mengubah DF Panda menjadi Spark. Kepala DF:
10000001,1,0,1,12:35,OK,10002,1,0,9,f,NA,24,24,0,3,9,0,0,1,1,0,0,4,543
10000001,2,0,1,12:36,OK,10002,1,0,9,f,NA,24,24,0,3,9,2,1,1,3,1,3,2,611
10000002,1,0,4,12:19,PA,10003,1,1,7,f,NA,74,74,0,2,15,2,0,2,3,1,2,2,691
Kode:
dataset = pd.read_csv("data/AS/test_v2.csv")
sc = SparkContext(conf=conf)
sqlCtx = SQLContext(sc)
sdf = sqlCtx.createDataFrame(dataset)
Dan saya mendapat kesalahan:
TypeError: Can not merge type <class 'pyspark.sql.types.StringType'> and <class 'pyspark.sql.types.DoubleType'>
Kesalahan terkait tipe dapat dihindari dengan memaksakan skema sebagai berikut:
catatan: sebuah file teks dibuat (test.csv) dengan data asli (seperti di atas) dan nama-nama kolom hipotetis disisipkan ("col1","col2",...,"col25").
import pyspark
from pyspark.sql import SparkSession
import pandas as pd
spark = SparkSession.builder.appName('pandasToSparkDF').getOrCreate()
pdDF = pd.read_csv("test.csv")
isi dari bingkai data pandas:
pdDF
col1 col2 col3 col4 col5 col6 col7 col8 col9 col10 ... col16 col17 col18 col19 col20 col21 col22 col23 col24 col25
0 10000001 1 0 1 12:35 OK 10002 1 0 9 ... 3 9 0 0 1 1 0 0 4 543
1 10000001 2 0 1 12:36 OK 10002 1 0 9 ... 3 9 2 1 1 3 1 3 2 611
2 10000002 1 0 4 12:19 PA 10003 1 1 7 ... 2 15 2 0 2 3 1 2 2 691
Selanjutnya, buat skema:
from pyspark.sql.types import *
mySchema = StructType([ StructField("Col1", LongType(), True)\
,StructField("Col2", IntegerType(), True)\
,StructField("Col3", IntegerType(), True)\
,StructField("Col4", IntegerType(), True)\
,StructField("Col5", StringType(), True)\
,StructField("Col6", StringType(), True)\
,StructField("Col7", IntegerType(), True)\
,StructField("Col8", IntegerType(), True)\
,StructField("Col9", IntegerType(), True)\
,StructField("Col10", IntegerType(), True)\
,StructField("Col11", StringType(), True)\
,StructField("Col12", StringType(), True)\
,StructField("Col13", IntegerType(), True)\
,StructField("Col14", IntegerType(), True)\
,StructField("Col15", IntegerType(), True)\
,StructField("Col16", IntegerType(), True)\
,StructField("Col17", IntegerType(), True)\
,StructField("Col18", IntegerType(), True)\
,StructField("Col19", IntegerType(), True)\
,StructField("Col20", IntegerType(), True)\
,StructField("Col21", IntegerType(), True)\
,StructField("Col22", IntegerType(), True)\
,StructField("Col23", IntegerType(), True)\
,StructField("Col24", IntegerType(), True)\
,StructField("Col25", IntegerType(), True)])
Catatan: True
(menyiratkan nullable diperbolehkan)
membuat dataframe pyspark:
df = spark.createDataFrame(pdDF,schema=mySchema)
mengkonfirmasi data frame pandas sekarang menjadi data frame pyspark:
type(df)
keluaran:
pyspark.sql.dataframe.DataFrame
Selain itu:
Untuk menjawab komentar Kate di bawah ini - untuk memaksakan skema umum (String), Anda dapat melakukan hal berikut:
df=spark.createDataFrame(pdDF.astype(str))
Anda perlu memastikan kolom dataframe pandas anda sesuai dengan tipe yang disimpulkan spark. Jika dataframe pandas anda mencantumkan sesuatu seperti:
pd.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5062 entries, 0 to 5061
Data columns (total 51 columns):
SomeCol 5062 non-null object
Col2 5062 non-null object
Dan anda mendapatkan error tersebut, cobalah:
df[['SomeCol', 'Col2']] = df[['SomeCol', 'Col2']].astype(str)
Sekarang, pastikan .astype(str)
sebenarnya adalah tipe yang Anda inginkan dari kolom-kolom tersebut. Pada dasarnya, ketika kode Java yang mendasari mencoba menyimpulkan tipe dari suatu objek dalam python, ia menggunakan beberapa pengamatan dan membuat tebakan, jika tebakan itu tidak berlaku untuk semua data di kolom yang coba dikonversi dari pandas ke spark, itu akan gagal.
Saya telah mencoba ini dengan data Anda dan berhasil:
%pyspark
import pandas as pd
from pyspark.sql import SQLContext
print sc
df = pd.read_csv("test.csv")
print type(df)
print df
sqlCtx = SQLContext(sc)
sqlCtx.createDataFrame(df).show()