Spark RDD: установить разницу

val data: RDD [(String, Array[Int])] = sc.parallelize(Seq(
  ("100",Array(1, 2, 3, 4, 5)), ("1000",Array(10, 11, 12, 13, 14))
))

val codes = sc.parallelize(Seq(2, 3, 12, 13))

val result = data.map {case (id,values) => (id, values.diff(codes))}

Я хотел бы получить результат как:

val result: Array[(String, Array[Int])] = Array(
  ("100", Array(1, 4, 5)), ("1000", Array(10, 11, 14))
)

Однако, когда я делаю разницу в наборе, я получаю ошибку несоответствия типа.


person user3803714    schedule 20.01.2016    source источник
comment
В вашем коде чего-то не хватает... что такое dummy_data_sorted?   -  person Justin Pihony    schedule 21.01.2016
comment
Извините, обновил код.   -  person user3803714    schedule 21.01.2016


Ответы (1)


Если вы хотите применить операцию к локальной структуре данных, нет причин распараллеливать codes. Просто mapValues вот так:

val codes = Seq(2, 3, 12, 13)
val result = data.mapValues(_.diff(codes))

Если коды не помещаются в память, вам придется сделать что-то более сложное:

// Add dummy values to codes
val codes = sc.parallelize(Seq(2, 3, 12, 13)).map((_, null))

data
  .flatMapValues(x => x)  // Flatten values (k, vs) => (k, v)
  .map(_.swap) // Swap order => (v, k)
  .subtractByKey(codes) // Difference
  .map(_.swap) // Swap order => (k, v)
  .groupByKey  // Group => (k, vs)
person zero323    schedule 20.01.2016
comment
Я бы также сделал codes широковещательной переменной в вашем первом примере. Ваш второй пример, я думаю, не более эффективен, чем groupByKey, join и т. д., которые кажутся более интуитивно понятными при использовании. - person Radu Ionescu; 21.01.2016
comment
@RaduIonescu Если code относительно велико, то конечно. Если он маленький, как в примере, то ИМХО не стоит суеты. По поводу вашего второго замечания. Не могли бы вы привести пример? - person zero323; 21.01.2016
comment
data.flatMapValues(x => x) .map(_.swap).leftOuterJoin(codes).map(...) или data.flatMapValues(x => x) .map(_.swap).groupByKey().filter(...).map(). Может показаться, что groupByKey выводит больше кортежей, но из-за планировщика DAG операции фильтрации и сопоставления будут выполняться на основе кортежа, и вы получите тот же результат. - person Radu Ionescu; 21.01.2016