Проблема неизменяемости Apache Spark RDD[Vector]

Я знаю, что RDD неизменяемы, и поэтому их значение нельзя изменить, но я вижу следующее поведение:

Я написал реализацию для алгоритма FuzzyCMeans (https://github.com/salexln/FinalProject_FCM), и теперь я Я тестирую это, поэтому я запускаю следующий пример:

import org.apache.spark.mllib.clustering.FuzzyCMeans
import org.apache.spark.mllib.linalg.Vectors

val data = sc.textFile("/home/development/myPrjects/R/butterfly/butterfly.txt")
val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble))).cache()
> parsedData: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = MapPartitionsRDD[2] at map at <console>:31

val numClusters = 2
val numIterations = 20


parsedData.foreach{ point => println(point) }
> [0.0,-8.0]
[-3.0,-2.0]
[-3.0,0.0]
[-3.0,2.0]
[-2.0,-1.0]
[-2.0,0.0]
[-2.0,1.0]
[-1.0,0.0]
[0.0,0.0]
[1.0,0.0]
[2.0,-1.0]
[2.0,0.0]
[2.0,1.0]
[3.0,-2.0]
[3.0,0.0]
[3.0,2.0]
[0.0,8.0] 

val clusters = FuzzyCMeans.train(parsedData, numClusters, numIteration
parsedData.foreach{ point => println(point) }
> 
[0.0,-0.4803333185624595]
[-0.1811743096972924,-0.12078287313152826]
[-0.06638890786148487,0.0]
[-0.04005925925925929,0.02670617283950619]
[-0.12193263222069807,-0.060966316110349035]
[-0.0512,0.0]
[NaN,NaN]
[-0.049382716049382706,0.0]
[NaN,NaN]
[0.006830134553650707,0.0]
[0.05120000000000002,-0.02560000000000001]
[0.04755220304297078,0.0]
[0.06581619798335057,0.03290809899167529]
[0.12010867103812725,-0.0800724473587515]
[0.10946638900458144,0.0]
[0.14814814814814817,0.09876543209876545]
[0.0,0.49119985188436205] 

Но как может быть так, что мой метод изменяет Immutable RDD?

Кстати, сигнатура метода поезда следующая:

поезд (данные: RDD[Vector], кластеры: Int, maxIterations: Int)


person Alex L    schedule 29.12.2015    source источник
comment
Не могли бы вы принять один из ответов или объяснить, почему они не работают для вас, чтобы их можно было улучшить? Заранее спасибо.   -  person zero323    schedule 25.04.2016


Ответы (2)


То, что вы делаете, точно описано в документы:

Печатные элементы RDD

Другой распространенной идиомой является попытка распечатать элементы RDD с помощью rdd.foreach(println) или rdd.map(println). На одной машине это сгенерирует ожидаемый результат и напечатает все элементы RDD. Однако в кластерном режиме вывод на стандартный вывод, вызываемый исполнителями, теперь вместо этого записывается в стандартный вывод исполнителя, а не в драйвер, поэтому стандартный вывод драйвера их не покажет! Чтобы напечатать все элементы в драйвере, можно использовать метод collect(), чтобы сначала передать RDD в узел драйвера, таким образом: rdd.collect().foreach(println). Однако это может привести к тому, что драйверу не хватит памяти, потому что collect() извлекает весь RDD на одну машину; если вам нужно напечатать только несколько элементов RDD, более безопасным подходом является использование функции take(): rdd.take(100).foreach(println).

Таким образом, поскольку данные могут мигрировать между узлами, одинаковый вывод foreach не гарантируется. RDD неизменяем, но вы должны извлекать данные соответствующим образом, так как у вас нет всего RDD на вашем узле.


Другая возможная проблема (не в вашем случае, поскольку вы используете неизменяемый вектор) заключается в использовании изменяемых данных внутри Point iself, что совершенно неверно, поэтому вы потеряете все гарантии - однако сам RDD по-прежнему будет неизменным.

person dk14    schedule 29.12.2015

Чтобы RDD был полностью неизменным, его содержимое также должно быть неизменным:

scala> val m = Array.fill(2, 2)(0)
m: Array[Array[Int]] = Array(Array(0, 0), Array(0, 0))

scala> val rdd = sc.parallelize(m)
rdd: org.apache.spark.rdd.RDD[Array[Int]] = ParallelCollectionRDD[1]
at parallelize at <console>:23

scala> rdd.collect()
res6: Array[Array[Int]] = Array(Array(0, 0), Array(0, 0))

scala> m(0)(1) = 2

scala> rdd.collect()
res8: Array[Array[Int]] = Array(Array(0, 2), Array(0, 0)) 

поэтому, поскольку массив изменчив, я мог его изменить, и поэтому RDD был обновлен новыми данными.

person Alex L    schedule 29.12.2015