Spark: групповой запрос RDD Sql

У меня есть 3 RDD, к которым мне нужно присоединиться.

val event1001RDD: schemaRDD = [тип события, идентификатор, местоположение, дата1]

[1001,4929102,LOC01,2015-01-20 10:44:39]
[1001,4929103,LOC02,2015-01-20 10:44:39]
[1001,4929104,LOC03,2015-01-20 10:44:39]

val event2009RDD: schemaRDD = [eventtype,id,celltype,date1] (не сгруппировано по идентификатору, так как мне нужно 4 даты из этого в зависимости от типа ячейки)

[2009,4929101,R01,2015-01-20 20:44:39]
[2009,4929102,R02,2015-01-20 14:00:00] (RPM)
[2009,4929102,P01,2015-01-20 12:00:00] (PPM)
[2009,4929102,R03,2015-01-20 15:00:00] (RPM)
[2009,4929102,C01,2015-01-20 13:00:00] (RPM)
[2009,4929103,R01,2015-01-20 14:44:39]
[2009,4929105,R01,2015-01-20 12:44:39]
[2009,4929105,V01,2015-01-20 11:44:39]
[2009,4929106,R01,2015-01-20 13:44:39]

val cellLookupRDD: [тип ячейки, имя ячейки] (имя ячейки имеет 4 значения)

[R01,RPM]
[R02,RPM]
[R03,RPM]
[C01,RPM]
[P01,PPM]
[V01,PPM]

Ожидаемый результат: [id, местонахождение 1001, дата 1001, дата первого RPM 2009, дата последнего RPM 2009, дата первого PPM 2009, дата последнего PPM 2009]

4929101,NULL,NULL,2015-01-20 20:44:39,NULL,NULL,NULL
4929102,LOC01,2015-01-20 10:44:39,2015-01-20 13:00:00,2015-01-20 15:00:00,2015-01-20 12:00:00,NULL
4929103,LOC02,2015-01-20 10:44:39,2015-01-20 14:44:39,NULL,NULL,NULL
4929104,LOC03,2015-01-20 10:44:39,NULL,NULL,NULL,NULL
4929105,NULL,NULL,2015-01-20 12:44:39,NULL,2015-01-20 11:44:39,NULL
4929106,NULL,NULL,2015-01-20 13:44:39,NULL,NULL,NULL

Это мой текущий запрос (где я также указываю необязательный тип события в качестве первого столбца, но в моем предыдущем событии 2009RDD я выбираю минимальную и максимальную дату, что неверно, поскольку мне нужны четыре даты, определенные через cellLookupRDD - RPM и PPM) :

select if(event1001Table.eventtype is not null, event1001Table.eventtype,
          event2009Table.eventtype), 
       if(event1001Table.id is not null, event1001Table.id, 
          event2009Table.id), 
       event1001Table.date1, event2009Table.minDate, event2009Table.maxDate  
       from event1001Table full outer join event2009Table  
       on event1001Table.id=event2009Table.id")

EDITED, чтобы показать результат после применения ответа:

  " min(if(l.cn = 'RPM' or l.cn = 'RPM2', r.date1, 'NULL')) as rpmmn, " +
  " max(if(l.cn = 'RPM' or l.cn = 'RPM2', r.date1, 'NULL')) as rpmmx, " +
  " min(if(l.cn = 'PPM' or l.cn = 'PPM2', r.date1, 'NULL')) as ppmmn, " +
  " max(if(l.cn = 'PPM' or l.cn = 'PPM2', r.date1, 'NULL')) as ppmmx " +


[2009,4929102,R01,2015-01-20 13:00:00] min_rpm
[2009,4929102,C01,2015-01-20 14:00:00] max_rpm
---res: [2009,4929102,NULL,NULL,2015-01-20 13:00:00,2015-01-20 14:00:00] 
--- CORRECT

[2009,4929102,R01,2015-01-20 13:00:00] min_rpm
[2009,4929102,P01,2015-01-20 14:00:00] min_ppm
---res: [2009,4929102,2015-01-20 13:00:00,NULL,2015-01-20 14:00:00,NULL] 
--- INCORRECT (max should be equal to MIN although NULL is preferred if possible but I could just check in the code later on if min=max)

[2009,4929102,R01,2015-01-20 13:00:00] min_rpm
[2009,4929102,C01,2015-01-20 14:00:00] max_rpm
[2009,4929102,P01,2015-01-20 09:00:00] min_ppm
---res: [2009,4929102,2015-01-20 13:00:00,NULL,2015-01-20 09:00:00,NULL] 
--- INCORRECT (max is not working)

person sophie    schedule 27.05.2015    source источник


Ответы (1)


Давайте работать это шаг за шагом. Давайте сначала создадим часть 2009 года

event2009RDD.registerTempTable("base2009")
cellLookupRDD.registerTempTable("lookup")

trns2009 = ssc.sql("select eventtype, id, \
                          min(case when l.cn = 'RPM' then r.date1 else null end) rpmmn, \
max(case when l.cn = 'RPM' then r.date1 else null end) rpmmx, \
min(case when l.cn = 'PPM' then r.date1 else null end) ppmmn, \
max(case when l.cn = 'PPM' then r.date1 else null end) ppmmx, \
from base2009 r inner join lookup l on r.celltype=l.celltype \
group by eventtype,id "

trns2009 .registerTempTable("transformed2009")

Теперь вы можете выполнить полное внешнее соединение с набором данных 1001 и получить результат.

Примечание: у вас не должно быть

4929101,NULL,NULL,2015-01-20 20:44:39,NULL,NULL,NULL

вместо этого вы должны иметь

4929101,NULL,NULL,2015-01-20 20:44:39,2015-01-20 20:44:39,NULL,NULL

Потому что, если событие 2009 года произошло один раз, то оно должно иметь и первую, и последнюю дату. NULL должен означать, что событие никогда не происходило, например, для id=4929101, тип ячейки=PPM.

Пожалуйста, дайте мне знать, если это работает (или нет). У меня нет доступа к искре прямо сейчас, но я должен быть в состоянии отладить, если это необходимо, сегодня вечером.

person ayan guha    schedule 27.05.2015
comment
привет, аян, агрегат MAX не работает, если событие имеет оба имени ячейки (PPM и RPM). Отредактировал свой пост, чтобы показать результаты. спасибо - person sophie; 27.05.2015
comment
Пожалуйста, измените на null вместо «NULL», вы выполняете сравнение строк. Или измените значение по умолчанию на старую дату, например «1900-01-01 00:00:00». - person ayan guha; 27.05.2015
comment
привет, аян, да, теперь это работает. Я использовал оператор case when, поскольку IF не поддерживает значение null. Спасибо еще раз! :) - person sophie; 27.05.2015
comment
Рад помочь :) - person ayan guha; 27.05.2015