Estoy intentando convertir el DF de Pandas en uno de Spark. Cabeza de DF:
10000001,1,0,1,12:35,OK,10002,1,0,9,f,NA,24,24,0,3,9,0,0,1,1,0,0,4,543
10000001,2,0,1,12:36,OK,10002,1,0,9,f,NA,24,24,0,3,9,2,1,1,3,1,3,2,611
10000002,1,0,4,12:19,PA,10003,1,1,7,f,NA,74,74,0,2,15,2,0,2,3,1,2,2,691
Código:
dataset = pd.read_csv("data/AS/test_v2.csv")
sc = SparkContext(conf=conf)
sqlCtx = SQLContext(sc)
sdf = sqlCtx.createDataFrame(dataset)
Y tengo un error:
TypeError: Can not merge type <class 'pyspark.sql.types.StringType'> and <class 'pyspark.sql.types.DoubleType'>
Los errores relacionados con el tipo pueden evitarse imponiendo un esquema como el siguiente:
nota: se ha creado un archivo de texto (test.csv) con los datos originales (como los anteriores) y se han insertado nombres de columnas hipotéticos ("col1","col2",...,"col25").
import pyspark
from pyspark.sql import SparkSession
import pandas as pd
spark = SparkSession.builder.appName('pandasToSparkDF').getOrCreate()
pdDF = pd.read_csv("test.csv")
contenido del marco de datos de pandas:
pdDF
col1 col2 col3 col4 col5 col6 col7 col8 col9 col10 ... col16 col17 col18 col19 col20 col21 col22 col23 col24 col25
0 10000001 1 0 1 12:35 OK 10002 1 0 9 ... 3 9 0 0 1 1 0 0 4 543
1 10000001 2 0 1 12:36 OK 10002 1 0 9 ... 3 9 2 1 1 3 1 3 2 611
2 10000002 1 0 4 12:19 PA 10003 1 1 7 ... 2 15 2 0 2 3 1 2 2 691
A continuación, cree el esquema:
from pyspark.sql.types import *
mySchema = StructType([ StructField("Col1", LongType(), True)\
,StructField("Col2", IntegerType(), True)\
,StructField("Col3", IntegerType(), True)\
,StructField("Col4", IntegerType(), True)\
,StructField("Col5", StringType(), True)\
,StructField("Col6", StringType(), True)\
,StructField("Col7", IntegerType(), True)\
,StructField("Col8", IntegerType(), True)\
,StructField("Col9", IntegerType(), True)\
,StructField("Col10", IntegerType(), True)\
,StructField("Col11", StringType(), True)\
,StructField("Col12", StringType(), True)\
,StructField("Col13", IntegerType(), True)\
,StructField("Col14", IntegerType(), True)\
,StructField("Col15", IntegerType(), True)\
,StructField("Col16", IntegerType(), True)\
,StructField("Col17", IntegerType(), True)\
,StructField("Col18", IntegerType(), True)\
,StructField("Col19", IntegerType(), True)\
,StructField("Col20", IntegerType(), True)\
,StructField("Col21", IntegerType(), True)\
,StructField("Col22", IntegerType(), True)\
,StructField("Col23", IntegerType(), True)\
,StructField("Col24", IntegerType(), True)\
,StructField("Col25", IntegerType(), True)])
Nota: True
(implica que se puede anular)
crear el marco de datos pyspark:
df = spark.createDataFrame(pdDF,schema=mySchema)
confirmar que el marco de datos de pandas es ahora un marco de datos de pyspark:
type(df)
salida:
pyspark.sql.dataframe.DataFrame
Asunto:
Para abordar el comentario de Kate's abajo - para imponer un esquema general (Cadena) se puede hacer lo siguiente:
df=spark.createDataFrame(pdDF.astype(str))
Tienes que asegurarte de que las columnas de tu dataframe de pandas son apropiadas para el tipo que spark está deduciendo. Si tu dataframe de pandas lista algo como:
pd.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5062 entries, 0 to 5061
Data columns (total 51 columns):
SomeCol 5062 non-null object
Col2 5062 non-null object
Y obtienes ese error prueba:
df[['SomeCol', 'Col2']] = df[['SomeCol', 'Col2']].astype(str)
Ahora, asegúrate de que .astype(str)
es realmente el tipo que quieres que tengan esas columnas. Básicamente, cuando el código Java subyacente intenta inferir el tipo de un objeto en python utiliza algunas observaciones y hace una conjetura, si esa conjetura no se aplica a todos los datos de la(s) columna(s) que intenta convertir de pandas a spark fallará.