Как я могу получить позицию элемента в RDD Spark?

Я новичок в Apache Spark и знаю, что основной структурой данных является RDD. Сейчас я пишу несколько приложений, которым требуется информация о положении элементов. Например, после преобразования ArrayList в (Java)RDD для каждого целого числа в RDD мне нужно знать его (глобальный) индекс массива. Возможно ли это сделать?

Насколько мне известно, для RDD существует функция take(int), поэтому я считаю, что позиционная информация все еще сохраняется в RDD.


person SciPioneer    schedule 25.09.2014    source источник


Ответы (2)


По сути, метод RDD zipWithIndex() делает это, но он не сохраняет исходный порядок данных, из которых был создан RDD. По крайней мере, вы получите стабильный заказ.

val orig: RDD[String] = ...
val indexed: RDD[(String, Long)] = orig.zipWithIndex()

Причина, по которой вы вряд ли найдете что-то, сохраняющее порядок в исходных данных, скрыта в документе API для zipWithIndex():

«Заархивирует этот RDD с его индексами элементов. Порядок сначала основан на индексе раздела, а затем порядок элементов в каждом разделе. Таким образом, первый элемент в первом разделе получает индекс 0, а последний элемент в последнем разделе получает самый большой индекс. Это похоже на zipWithIndex в Scala, но в качестве типа индекса он использует Long вместо Int. Этот метод должен запускать искровое задание, когда этот RDD содержит более одного раздела».

Таким образом, похоже, что первоначальный заказ отброшен. Если для вас важно сохранить исходный порядок, похоже, вам нужно добавить индекс перед созданием RDD.

person Spiro Michaylov    schedule 25.09.2014
comment
Да, добавление индекса массива в качестве дополнительного атрибута перед созданием RDD может решить эту проблему. Однако есть два серьезных ограничения: 1) Очевидно, что этот дополнительный атрибут индекса как минимум удвоит стоимость хранения, а такая стоимость может быть и больше, например, в целочисленном/плавающем массиве для индекса добавляется поле long int. 2) Поскольку добавление дополнительных значений индекса не может быть загружено в Spark, такое преобразование данных также не может быть распараллелено Spark. Таким образом, мне приходится задействовать другие параллельные методы для добавления index. - person SciPioneer; 26.09.2014

Я считаю, что в большинстве случаев zipWithIndex() поможет и сохранит порядок. Прочтите комментарии еще раз. Насколько я понимаю, это как раз и означает поддерживать порядок в RDD.

scala> val r1 = sc.parallelize(List("a", "b", "c", "d", "e", "f", "g"), 3)
scala> val r2 = r1.zipWithIndex
scala> r2.foreach(println)
(c,2)
(d,3)
(e,4)
(f,5)
(g,6)
(a,0)
(b,1)

Пример выше подтверждает это. Красный имеет 3 раздела: a с индексом 0, b с индексом 1 и т. д.

person zhang zhan    schedule 28.09.2014
comment
Спасибо за Ваш ответ! В большинстве случаев этот метод неплох, так как элемент входного массива/списка может быть относительно большим объектом. Однако это может быть проблемой для массивов примитивных типов, например целочисленного массива, потому что это, казалось бы, единственное решение весьма неэффективно с точки зрения затрат как на вычисления, так и на хранение. В любом случае, я очень доволен вашим ответом. Я надеюсь, что когда-нибудь естественное сохранение индекса без (zipWithIndex) станет правдой для RDD Spark. - person SciPioneer; 28.09.2014
comment
Основываясь на дизайне Spark, я не могу представить хороший способ поддерживать индекс элемента без ущерба для хранилища. - person zhang zhan; 29.09.2014