Laut Learning Spark
Beachten Sie, dass die Neupartitionierung Ihrer Daten ein ziemlich teurer Vorgang ist. Spark verfügt auch über eine optimierte Version von repartition() namens coalesce(), mit der Datenbewegungen vermieden werden können, aber nur, wenn Sie die Anzahl der RDD-Partitionen verringern.
Ein Unterschied besteht darin, dass mit repartition() die Anzahl der Partitionen erhöht/verringert werden kann, während mit coalesce() die Anzahl der Partitionen nur verringert werden kann.
Wenn die Partitionen auf mehrere Rechner verteilt sind und coalesce() ausgeführt wird, wie können dann Datenbewegungen vermieden werden?
Dadurch wird eine vollständige Umschichtung vermieden. Wenn bekannt ist, dass die Anzahl der Partitionen abnimmt, kann der Executor die Daten sicher auf der minimalen Anzahl von Partitionen halten und nur die Daten von den zusätzlichen Knoten auf die Knoten verschieben, die wir behalten haben.
Es würde also in etwa so ablaufen:
Node 1 = 1,2,3
Node 2 = 4,5,6
Node 3 = 7,8,9
Node 4 = 10,11,12
Dann koaleszieren
auf 2 Partitionen:
Node 1 = 1,2,3 + (10,11,12)
Node 3 = 7,8,9 + (4,5,6)
Beachten Sie, dass Knoten 1 und Knoten 3 ihre ursprünglichen Daten nicht verschieben mussten.
Justin's Antwort ist großartig und diese Antwort geht mehr in die Tiefe.
Der "Repartition"-Algorithmus führt einen vollständigen Shuffle durch und erstellt neue Partitionen mit gleichmäßig verteilten Daten. Lassen Sie uns einen DataFrame mit den Zahlen von 1 bis 12 erstellen.
val x = (1 to 12).toList
val numbersDf = x.toDF("number")
numbersDf
enthält auf meinem Rechner 4 Partitionen.
numbersDf.rdd.partitions.size // => 4
Hier sehen Sie, wie die Daten auf die Partitionen aufgeteilt sind:
Partition 00000: 1, 2, 3
Partition 00001: 4, 5, 6
Partition 00002: 7, 8, 9
Partition 00003: 10, 11, 12
Führen wir einen vollständigen Shuffle mit der Methode repartition
durch und erhalten diese Daten auf zwei Knoten.
val numbersDfR = numbersDf.repartition(2)
Hier sehen Sie, wie die numbersDfR
-Daten auf meinem Rechner partitioniert sind:
Partition A: 1, 3, 4, 6, 7, 9, 10, 12
Partition B: 2, 5, 8, 11
Die Methode "repartition" erstellt neue Partitionen und verteilt die Daten gleichmäßig in den neuen Partitionen (bei größeren Datensätzen ist die Datenverteilung gleichmäßiger).
Unterschied zwischen "coalesce" und "repartition "
Coalesce" verwendet vorhandene Partitionen, um die Menge der zu mischenden Daten zu minimieren. repartition
erstellt neue Partitionen und führt eine vollständige Durchmischung durch. Coalesce" führt zu Partitionen mit unterschiedlichen Datenmengen (manchmal Partitionen mit sehr unterschiedlichen Größen) und "Repartition" führt zu ungefähr gleich großen Partitionen.
**Ist coalesce
oder repartition
schneller?
Coalesce" kann schneller laufen als "Repartition", aber ungleich große Partitionen sind im Allgemeinen langsamer zu bearbeiten als gleich große Partitionen. Normalerweise müssen Sie Datensätze nach dem Filtern eines großen Datensatzes neu partitionieren. Ich habe festgestellt, dass "repartition" insgesamt schneller ist, da Spark für die Arbeit mit gleichgroßen Partitionen ausgelegt ist.
[Lesen Sie diesen Blog-Beitrag (https://medium.com/@mrpowers/managing-spark-partitions-with-coalesce-and-repartition-4050c57ad5c4#.36o8a7b5j), wenn Sie noch mehr Details erfahren möchten.
Ein zusätzlicher Punkt, der hier zu beachten ist, ist, dass das Grundprinzip von Spark RDD die Unveränderlichkeit ist. Durch die Neupartitionierung oder das Koaleszieren wird ein neues RDD erstellt. Das Basis-RDD wird mit seiner ursprünglichen Anzahl von Partitionen weiter existieren. Wenn der Anwendungsfall erfordert, dass das RDD im Cache gespeichert wird, muss das gleiche für das neu erstellte RDD getan werden.
scala> pairMrkt.repartition(10)
res16: org.apache.spark.rdd.RDD[(String, Array[String])] =MapPartitionsRDD[11] at repartition at <console>:26
scala> res16.partitions.length
res17: Int = 10
scala> pairMrkt.partitions.length
res20: Int = 2