Развернуть список объектов JSON в DataFrame

У меня есть данные JSON в следующем формате:

{
     "date": 100
     "userId": 1
     "data": [
         {
             "timeStamp": 101,
             "reading": 1
         },
         {
             "timeStamp": 102,
             "reading": 2
         }
     ]
 }
 {
     "date": 200
     "userId": 1
     "data": [
         {
             "timeStamp": 201,
             "reading": 3
         },
         {
             "timeStamp": 202,
             "reading": 4
         }
     ]
 }

Я прочитал это в Spark SQL:

val df = SQLContext.read.json(...)
df.printSchema
// root
//  |-- date: double (nullable = true)
//  |-- userId: long (nullable = true)
//  |-- data: array (nullable = true)
//  |     |-- element: struct (containsNull = true)
//  |     |    |-- timeStamp: double (nullable = true)
//  |     |    |-- reading: double (nullable = true)

Я хотел бы преобразовать его, чтобы иметь одну строку для чтения. Насколько я понимаю, каждое преобразование должно создавать новый DataFrame, поэтому должно работать следующее:

import org.apache.spark.sql.functions.explode
val exploded = df
    .withColumn("reading", explode(df("data.reading")))
    .withColumn("timeStamp", explode(df("data.timeStamp")))
    .drop("data")
exploded.printSchema
// root
//  |-- date: double (nullable = true)
//  |-- userId: long (nullable = true)
//  |-- timeStamp: double (nullable = true)
//  |-- reading: double (nullable = true)

Результирующая схема верна, но я получаю каждое значение дважды:

exploded.show
// +-----------+-----------+-----------+-----------+
// |       date|     userId|  timeStamp|    reading|
// +-----------+-----------+-----------+-----------+
// |        100|          1|        101|          1|
// |        100|          1|        101|          1|
// |        100|          1|        102|          2|
// |        100|          1|        102|          2|
// |        200|          1|        201|          3|
// |        200|          1|        201|          3|
// |        200|          1|        202|          4|
// |        200|          1|        202|          4|
// +-----------+-----------+-----------+-----------+

Я чувствую, что есть что-то в ленивой оценке двух взрывов, чего я не понимаю.

Есть ли способ заставить приведенный выше код работать? Или я должен использовать другой подход все вместе?


person Sune Andreas Dybro Debel    schedule 28.01.2016    source источник


Ответы (1)


Результирующая схема верна, но я получаю каждое значение дважды

Хотя схема верна, предоставленный вами вывод не отражает фактический результат. На практике вы получите декартово произведение timeStamp и reading для каждой входной строки.

Я чувствую, что есть что-то в ленивом вычислении

Нет, это не имеет ничего общего с ленивой оценкой. То, как вы используете explode, просто неправильно. Чтобы понять, что происходит, давайте отследим выполнение для date, равного 100:

val df100 = df.where($"date" === 100)

шаг за шагом. Сначала explode сгенерирует две строки, одну для 1 и одну для 2:

val df100WithReading = df100.withColumn("reading", explode(df("data.reading")))

df100WithReading.show
// +------------------+----+------+-------+
// |              data|date|userId|reading|
// +------------------+----+------+-------+
// |[[1,101], [2,102]]| 100|     1|      1|
// |[[1,101], [2,102]]| 100|     1|      2|
// +------------------+----+------+-------+

Второе разнесение создает две строки (timeStamp равно 101 и 102) для каждой строки из предыдущего шага:

val df100WithReadingAndTs = df100WithReading
  .withColumn("timeStamp", explode(df("data.timeStamp")))

df100WithReadingAndTs.show
// +------------------+----+------+-------+---------+
// |              data|date|userId|reading|timeStamp|
// +------------------+----+------+-------+---------+
// |[[1,101], [2,102]]| 100|     1|      1|      101|
// |[[1,101], [2,102]]| 100|     1|      1|      102|
// |[[1,101], [2,102]]| 100|     1|      2|      101|
// |[[1,101], [2,102]]| 100|     1|      2|      102|
// +------------------+----+------+-------+---------+

Если вы хотите получить правильные результаты explode данных и select впоследствии:

val exploded = df.withColumn("data", explode($"data"))
  .select($"userId", $"date",
    $"data".getItem("reading"),  $"data".getItem("timestamp"))

exploded.show
// +------+----+-------------+---------------+
// |userId|date|data[reading]|data[timestamp]|
// +------+----+-------------+---------------+
// |     1| 100|            1|            101|
// |     1| 100|            2|            102|
// |     1| 200|            3|            201|
// |     1| 200|            4|            202|
// +------+----+-------------+---------------+
person zero323    schedule 29.01.2016
comment
сокращенная форма для выбора внутреннего поля также будет работать: df.withColumn("data",explode($"data")).select($"data.reading", $"data.timestamp", $"date",$"userId").foreach(println) - person Roberto Congiu; 30.01.2016