Saya telah mendefinisikan dua tabel seperti ini:
val tableName = "table1"
val tableName2 = "table2"
val format = new SimpleDateFormat("yyyy-MM-dd")
val data = List(
List("mike", 26, true),
List("susan", 26, false),
List("john", 33, true)
)
val data2 = List(
List("mike", "grade1", 45, "baseball", new java.sql.Date(format.parse("1957-12-10").getTime)),
List("john", "grade2", 33, "soccer", new java.sql.Date(format.parse("1978-06-07").getTime)),
List("john", "grade2", 32, "golf", new java.sql.Date(format.parse("1978-06-07").getTime)),
List("mike", "grade2", 26, "basketball", new java.sql.Date(format.parse("1978-06-07").getTime)),
List("lena", "grade2", 23, "baseball", new java.sql.Date(format.parse("1978-06-07").getTime))
)
val rdd = sparkContext.parallelize(data).map(Row.fromSeq(_))
val rdd2 = sparkContext.parallelize(data2).map(Row.fromSeq(_))
val schema = StructType(Array(
StructField("name", StringType, true),
StructField("age", IntegerType, true),
StructField("isBoy", BooleanType, false)
))
val schema2 = StructType(Array(
StructField("name", StringType, true),
StructField("grade", StringType, true),
StructField("howold", IntegerType, true),
StructField("hobby", StringType, true),
StructField("birthday", DateType, false)
))
val df = sqlContext.createDataFrame(rdd, schema)
val df2 = sqlContext.createDataFrame(rdd2, schema2)
df.createOrReplaceTempView(tableName)
df2.createOrReplaceTempView(tableName2)
Saya mencoba membuat query untuk mengembalikan baris dari tabel1 yang tidak memiliki baris yang cocok di tabel2. Saya sudah mencoba melakukannya dengan menggunakan query ini:
Select * from table1 LEFT JOIN table2 ON table1.name = table2.name AND table1.age = table2.howold AND table2.name IS NULL AND table2.howold IS NULL
tetapi ini hanya memberi saya semua baris dari tabel1:
List({&"name":"john","age":33,"isBoy":true}, {> {"nama":"susan","usia":26,"isBoy":false}, {> {"nama":"mike","usia":26,"isBoy":true})
Bagaimana cara membuat jenis gabungan ini di Spark secara efisien?
Saya mencari kueri SQL karena saya harus dapat menentukan kolom yang akan dibandingkan antara dua tabel, tidak hanya membandingkan baris demi baris seperti yang dilakukan dalam pertanyaan lain yang direkomendasikan. Seperti menggunakan subtract, except dll.
Anda dapat menggunakan tipe join "left anti" - baik dengan DataFrame API atau dengan SQL (DataFrame API mendukung segala sesuatu yang didukung SQL, termasuk kondisi join apa pun yang Anda butuhkan):
DataFrame API:
df.as("table1").join(
df2.as("table2"),
$"table1.name" === $"table2.name" && $"table1.age" === $"table2.howold",
"leftanti"
)
SQL:
sqlContext.sql(
"""SELECT table1.* FROM table1
| LEFT ANTI JOIN table2
| ON table1.name = table2.name AND table1.age = table2.howold
""".stripMargin)
CATATAN: perlu dicatat juga bahwa ada cara yang lebih singkat dan lebih ringkas untuk membuat data sampel tanpa menentukan skema secara terpisah, menggunakan tupel dan metode toDF
implisit, dan kemudian "memperbaiki &" skema yang secara otomatis di-inferensikan jika diperlukan:
import spark.implicits._
val df = List(
("mike", 26, true),
("susan", 26, false),
("john", 33, true)
).toDF("name", "age", "isBoy")
val df2 = List(
("mike", "grade1", 45, "baseball", new java.sql.Date(format.parse("1957-12-10").getTime)),
("john", "grade2", 33, "soccer", new java.sql.Date(format.parse("1978-06-07").getTime)),
("john", "grade2", 32, "golf", new java.sql.Date(format.parse("1978-06-07").getTime)),
("mike", "grade2", 26, "basketball", new java.sql.Date(format.parse("1978-06-07").getTime)),
("lena", "grade2", 23, "baseball", new java.sql.Date(format.parse("1978-06-07").getTime))
).toDF("name", "grade", "howold", "hobby", "birthday").withColumn("birthday", $"birthday".cast(DateType))
Anda dapat melakukannya dengan fungsi bawaan except
(Saya akan menggunakan kode yang Anda berikan, tetapi Anda tidak menyertakan impornya, jadi saya tidak bisa melakukannya :( )
val a = sc.parallelize(Seq((1,"a",123),(2,"b",456))).toDF("col1","col2","col3")
val b= sc.parallelize(Seq((4,"a",432),(2,"t",431),(2,"b",456))).toDF("col1","col2","col3")
scala> a.show()
+----+----+----+
|col1|col2|col3|
+----+----+----+
| 1| a| 123|
| 2| b| 456|
+----+----+----+
scala> b.show()
+----+----+----+
|col1|col2|col3|
+----+----+----+
| 4| a| 432|
| 2| t| 431|
| 2| b| 456|
+----+----+----+
scala> a.except(b).show()
+----+----+----+
|col1|col2|col3|
+----+----+----+
| 1| a| 123|
+----+----+----+
Dalam SQL, Anda bisa dengan mudah membuat kueri Anda menjadi seperti di bawah ini (tidak yakin apakah ini berfungsi di SPARK)
Select * from table1 LEFT JOIN table2 ON table1.name = table2.name AND table1.age = table2.howold where table2.name IS NULL
Ini akan mengembalikan semua baris tabel1 yang joinnya gagal