Spark сортирует RDD и вступает в их ряды

У меня есть RDD[(VertexId, Double)], и я хочу отсортировать его по _._2 и соединить индекс (ранг) с этим RDD. Поэтому я могу получить элемент и его ранг на filter.

В настоящее время я сортирую СДР по sortBy, но не знаю, как соединить СДР с его рангом. Поэтому я собираю его как последовательность и архивирую с индексом. Но это неэффективно. Мне интересно, есть ли более элегантный способ сделать это.

Код, который я использую прямо сейчас:

val tmpRes = graph.vertices.sortBy(_._2, ascending = false) // Sort all nodes by its PR score in descending order
      .collect() // collect to master, this may be very expensive

    tmpRes.zip(tmpRes.indices) // zip with index

person bxshi    schedule 03.03.2015    source источник


Ответы (1)


если по какой-то причине вы хотите вернуть драйверу только n первых кортежей, то, возможно, вы могли бы использовать takeOrdered(n, [ordering]), где < strong>n — это количество результатов, которые необходимо вернуть, и порядок компаратора, который вы хотели бы использовать.

В противном случае вы можете использовать преобразование zipWithIndex, которое превратит вас RDD[(VertexId, Double)] в RDD[((VertexId, Double), Long)] с правильным индексом (конечно, вы должны сделать это после сортировки).

Например :

scala> val data = sc.parallelize(List(("A", 1), ("B", 2)))
scala> val sorted = data.sortBy(_._2)
scala> sorted.zipWithIndex.collect()
res1: Array[((String, Int), Long)] = Array(((A,1),0), ((B,2),1))

С уважением,

person Olivier Girardot    schedule 03.03.2015
comment
Спасибо ссабум. Я думал, что zipWithIndex заархивирует index_in_partition + partition_offset. Но после сортировки индекс становится рейтинговым вместо старого индекса. Кстати, я использую top для получения первых n упорядоченных элементов в RDD. Кажется, это работает аналогично takeOrdered. - person bxshi; 03.03.2015