Сводная нечисловая таблица в Spark Scala

Можно ли составить сводную таблицу с нечисловыми значениями в Spark Scala? Я рассмотрел следующие два вопроса о стеке.

Как повернуть DataFrame?

Список в инструкции Case-When в Spark SQL

Следуя шагам в вопросе «Список в случае - когда», я могу преобразовать свои данные так, чтобы каждый тип данных был столбцом, но для каждой комбинации типа данных сущности существовала строка.

id    tag    value
1     US     foo
1     UK     bar
1     CA     baz
2     US     hoo
2     UK     hah
2     CA     wah

id    US    UK    CA
1     foo
1           bar
1                 baz
2     hoo
2           hah
3                 wah

Есть ли функция «first non-null», которая может свернуть несколько строк для каждой сущности в одну?

id    US    UK    CA
1     foo   bar   baz
2     hoo   hah   wah

person John Todd    schedule 21.09.2015    source источник


Ответы (2)


Вы можете рассмотреть метод aggregate (или aggregateByKey). Вам просто нужно написать соответствующие функции, чтобы получить ненулевой элемент в каждой позиции.

person mgaido    schedule 21.09.2015
comment
Я надеялся найти для этого метод DataFrame, но, в конце концов, я вернулся к RDD Tuples и aggregateByKey. Спасибо за предложение. - person John Todd; 05.10.2015
comment
любой фрагмент кода доступен, чтобы понять, как это работает? Благодарность - person user299791; 29.02.2016
comment
Извините, @ user299791, я перешел в другой проект и видел ваш вопрос только пару недель назад. Затем мне потребовалось время, чтобы отследить исходный код, который я использовал для решения проблемы, и очистить его настолько, чтобы поделиться. Я отправлю еще один ответ с полным примером класса. - person John Todd; 05.12.2016

Вот полный класс Scala, который создает образец фрейма данных, а затем разворачивает его. Он специфичен для этой проблемы, поэтому я не знаю, насколько он будет полезен в целом. Также не был тщательно протестирован, поэтому покупатель остерегается.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Column, DataFrame, Row, SQLContext}
import org.apache.spark.sql.functions.{lit, when}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object DemoPivot {

  def main(args: Array[String]) = {

    def pivotColumn(df: DataFrame)(t: String): Column = {
      val col = when(df("tag") <=> lit(t), df("value"))
      col.alias(t)
    }

    def pivotFrame(sqlContext: SQLContext, df: DataFrame): DataFrame = {
      val tags = df.select("tag").distinct.map(r => r.getString(0)).collect.toList
      df.select(df("id") :: tags.map(pivotColumn(df)): _*)
    }

    def aggregateRows(value: Seq[Option[Any]], agg: Seq[Option[Any]]): Seq[Option[Any]] = {
      for (i <- 0 until Math.max(value.size, agg.size)) yield i match {
        case x if x > value.size => agg(x)
        case y if y > agg.size => value(y)
        case z if value(z).isEmpty => agg(z)
        case a => value(a)
      }
    }

    def collapseRows(sqlContext: SQLContext, df: DataFrame): DataFrame = {
      // RDDs cannot have null elements, so pack into Options and unpack before returning
      val rdd = df.map(row => (Some(row(0)), row.toSeq.tail.map(element => if (element == null) None else Some(element))))
      val agg = rdd.reduceByKey(aggregateRows)
      val aggRdd = agg.map{ case (key, list) => Row.fromSeq((key.get) :: (list.map(element => if (element.isDefined) element.get else null)).toList) }
      sqlContext.createDataFrame(aggRdd, df.schema)
    }

    val conf = new SparkConf().setAppName("Simple Pivot Demo")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    val data = List((1, "US", "foo"), (1, "UK", "bar"), (1, "CA", "baz"),
                    (2, "US", "hoo"), (2, "UK", "hah"), (2, "CA", "wah"))
    val rows = data.map(d => Row.fromSeq(d.productIterator.toList))
    val fields = Array(StructField("id", IntegerType, nullable = false),
                       StructField("tag", StringType, nullable = false),
                       StructField("value", StringType, nullable = false))
    val df = sqlContext.createDataFrame(sc.parallelize(rows), StructType(fields))
    df.show()

    val pivoted = pivotFrame(sqlContext, df)
    pivoted.show()

    val collapsed = collapseRows(sqlContext, pivoted)
    collapsed.show()
  }
}
person John Todd    schedule 05.12.2016