Группа PysparkBy Pivot Transformation

Мне трудно сформулировать следующую манипуляцию с данными Pyspark.

По сути, я пытаюсь сгруппировать по категориям, а затем развернуть/разбить подкатегории и добавить новые столбцы.

введите здесь описание изображения

Я пробовал несколько способов, но они очень медленные и не используют параллелизм Spark.

Вот мой существующий (медленный, подробный) код:

from pyspark.sql.functions import lit

df = sqlContext.table('Table')

#loop over category
listids = [x.asDict().values()[0] for x in df.select("category").distinct().collect()]
dfArray = [df.where(df.category == x) for x in listids]
for d in dfArray:
  #loop over subcategory
  listids_sub = [x.asDict().values()[0] for x in d.select("sub_category").distinct().collect()]
  dfArraySub = [d.where(d.sub_category == x) for x in listids_sub]
  num = 1

  for b in dfArraySub:
    #renames all columns to append a number
    for c in b.columns:
      if c not in ['category','sub_category','date']:
        column_name = str(c)+'_'+str(num)
        b = b.withColumnRenamed(str(c), str(c)+'_'+str(num))
        b = b.drop('sub_category')
    num += 1
    #if no df exists, create one and continually join new columns
    try:
      all_subs = all_subs.drop('sub_category').join(b.drop('sub_category'), on=['cateogry','date'], how='left')
    except:
      all_subs = b

  #Fixes missing columns on union
  try:
    try:
      diff_columns = list(set(all_cats.columns) - set(all_subs.columns))
      for d in diff_columns:
        all_subs = all_subs.withColumn(d, lit(None))
      all_cats = all_cats.union(all_subs)
    except:
      diff_columns = list(set(all_subs.columns) - set(all_cats.columns))
      for d in diff_columns:
        all_cats = all_cats.withColumn(d, lit(None))
      all_cats = all_cats.union(all_subs)

  except Exception as e:
    print e
    all_cats = all_subs

Но это очень медленно. Любое руководство будет с благодарностью!


person Eric Valente    schedule 22.02.2018    source источник
comment
В чем логика? как метрики кровати должны быть в продажах_1, а не в продажах_2?   -  person Steven    schedule 22.02.2018
comment
Можете ли вы объяснить логику вашего кода?   -  person Steven    schedule 22.02.2018
comment
Я группирую по категориям, а затем собираю в стрелку — по сути, разбивая большой df на кучу маленьких df только с одной категорией. Затем я делаю то же самое для этих меньших df, но по подкатегориям. Это очень дорого с .collect() - это гораздо больше кода в стиле панд. Я рад отказаться от всего этого и пойти с гораздо более ориентированным на искру подходом, как вы написали ниже.   -  person Eric Valente    schedule 22.02.2018


Ответы (1)


Ваш вывод не совсем логичен, но мы можем добиться этого результата, используя функцию поворота. Вам нужно уточнить свои правила, иначе я вижу много случаев, когда это может потерпеть неудачу.

from pyspark.sql import functions as F
from pyspark.sql.window import Window

df.show()

+----------+---------+------------+------------+------------+
|      date| category|sub_category|metric_sales|metric_trans|
+----------+---------+------------+------------+------------+
|2018-01-01|furniture|         bed|         100|          75|
|2018-01-01|furniture|       chair|         110|          85|
|2018-01-01|furniture|       shelf|          35|          30|
|2018-02-01|furniture|         bed|          55|          50|
|2018-02-01|furniture|       chair|          45|          40|
|2018-02-01|furniture|       shelf|          10|          15|
|2018-01-01|      rug|      circle|           2|           5|
|2018-01-01|      rug|      square|           3|           6|
|2018-02-01|      rug|      circle|           3|           3|
|2018-02-01|      rug|      square|           4|           5|
+----------+---------+------------+------------+------------+



df.withColumn("fg", F.row_number().over(Window().partitionBy('date', 'category').orderBy("sub_category"))).groupBy('date', 'category', ).pivot('fg').sum('metric_sales', 'metric_trans').show()

+----------+---------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+
|      date| category|1_sum(CAST(`metric_sales` AS BIGINT))|1_sum(CAST(`metric_trans` AS BIGINT))|2_sum(CAST(`metric_sales` AS BIGINT))|2_sum(CAST(`metric_trans` AS BIGINT))|3_sum(CAST(`metric_sales` AS BIGINT))|3_sum(CAST(`metric_trans` AS BIGINT))|
+----------+---------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+
|2018-02-01|      rug|                                    3|                                    3|                                    4|                                    5|                                 null|                                 null|
|2018-02-01|furniture|                                   55|                                   50|                                   45|                                   40|                                   10|                                   15|
|2018-01-01|furniture|                                  100|                                   75|                                  110|                                   85|                                   35|                                   30|
|2018-01-01|      rug|                                    2|                                    5|                                    3|                                    6|                                 null|                                 null|
+----------+---------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+
person Steven    schedule 22.02.2018
comment
Большое спасибо за быстрый ответ. Я согласен - есть несколько сценариев отказа. Еще больше усложняет ситуацию то, что некоторые категории имеют разное количество подкатегорий. Я собираюсь попробовать это и сообщу. - person Eric Valente; 22.02.2018
comment
Так что это не удается, когда у меня есть 2 категории. Таким образом, у меня может быть категория ковров с квадратными и круглыми подкатами. Мысли? Я знаю, это кажется неортодоксальным преобразованием, но это то, что мне нужно. - person Eric Valente; 22.02.2018
comment
Я обновил свой исходный набор данных, чтобы он был более репрезентативным для данных, с которыми я работаю, спасибо. - person Eric Valente; 22.02.2018
comment
Попробуйте изменить partitionBy('sub_category') на partitionBy('date', 'category', 'sub_category') - person Steven; 22.02.2018
comment
Таким образом, проблема в том, что поворот изменяет столбцы на: bed_sum (CAST (metric_sales AS BIGINT)) vs. metric_sales_1. Таким образом, таблица становится очень широкой (столбец для каждой сводной подкатегории), когда я хочу, чтобы они выглядели как мои выходные данные Excel вверху - сложены по метрике 1, метрике 2 и т. д. - person Eric Valente; 22.02.2018
comment
БЛЕСТЯЩИЙ - спасибо. Работает отлично. На самом деле не могу отблагодарить вас достаточно. 45 минут до 30 секунд. - person Eric Valente; 22.02.2018