pyspark dataframe отбрасывает повторяющиеся значения с более старой меткой времени

У меня есть фреймворк pyspark, в котором есть столбцы starttime и stoptime с дополнительными столбцами, значения которых обновляются

|startime  |stoptime  |hour  |minute  |sec  |sip          |dip            |sport|dport|proto|pkt |byt |
|1504766585|1504801216|16    |20      |16   |192.168.0.11 |23.204.108.58  |51249|80   |6    |0   |0   |
|1504766585|1504801216|16    |20      |16   |192.168.0.11 |23.204.108.58  |51249|80   |6    |0   |0   |
|1504781751|1504801216|16    |20      |16   |192.168.0.11 |23.72.38.96    |51252|80   |6    |0   |0   |
|1504781751|1504801216|16    |20      |16   |192.168.0.11 |23.72.38.96    |51252|80   |6    |0   |0   |
|1504766585|1504801336|16    |22      |16   |192.168.0.11 |23.204.108.58  |51249|80   |6    |0   |0   |
|1504766585|1504801336|16    |22      |16   |192.168.0.11 |23.204.108.58  |51249|80   |6    |0   |0   |
|1504781751|1504801336|16    |22      |16   |192.168.0.11 |23.72.38.96    |51252|80   |6    |0   |0   |
|1504781751|1504801336|16    |22      |16   |192.168.0.11 |23.72.38.96    |51252|80   |6    |0   |0   |

В этом примере я хочу выбрать все строки с последним временем остановки, все остальные значения столбцов дублируются.


person user2825083    schedule 07.09.2017    source источник


Ответы (2)


Полагаю, вы хотите вести последние записи на каждый sport. Вы должны использовать оконную функцию для определения последней записи для каждого раздела:

import pyspark.sql.functions as psf
from pyspark.sql import Window
w = Window.partitionBy("sport").orderBy(psf.desc("stoptime"))

df.withColumn("rn", psf.row_number().over(w)).filter("rn = 1").drop("rn")

    +----------+----------+----+---+---+------------+-------------+-----+-----+-----+---+---+
    |  startime|  stoptime|hour|min|sec|         sip|          dip|sport|dport|proto|pkt|byt|
    +----------+----------+----+---+---+------------+-------------+-----+-----+-----+---+---+
    |1504781751|1504801336|  16| 22| 16|192.168.0.11|  23.72.38.96|51252|   80|    6|  0|  0|
    |1504766585|1504801336|  16| 22| 16|192.168.0.11|23.204.108.58|51249|   80|    6|  0|  0|
    +----------+----------+----+---+---+------------+-------------+-----+-----+-----+---+---+

У вас будет столько записей, сколько различных разделов для sport.

Если вам нужен последний stoptime для всей таблицы без разделения, вы можете удалить partitionBy и использовать вместо него dense_rank (одинаковые значения будут иметь одинаковый ранг):

w = Window.orderBy(psf.desc("stoptime"))

df.withColumn("rn", psf.dense_rank().over(w)).filter("rn = 1").drop("rn").show()

    +----------+----------+----+---+---+------------+-------------+-----+-----+-----+---+---+
    |  startime|  stoptime|hour|min|sec|         sip|          dip|sport|dport|proto|pkt|byt|
    +----------+----------+----+---+---+------------+-------------+-----+-----+-----+---+---+
    |1504766585|1504801336|  16| 22| 16|192.168.0.11|23.204.108.58|51249|   80|    6|  0|  0|
    |1504766585|1504801336|  16| 22| 16|192.168.0.11|23.204.108.58|51249|   80|    6|  0|  0|
    |1504781751|1504801336|  16| 22| 16|192.168.0.11|  23.72.38.96|51252|   80|    6|  0|  0|
    |1504781751|1504801336|  16| 22| 16|192.168.0.11|  23.72.38.96|51252|   80|    6|  0|  0|
    +----------+----------+----+---+---+------------+-------------+-----+-----+-----+---+---+
person MaFF    schedule 09.09.2017
comment
Мне нужны строки с одинаковым временем начала, часами, минутами, секундами, sip, dip, sport, dport, proto, но с последним временем остановки. Это, в свою очередь, даст мне max byt и pkt - person user2825083; 11.09.2017
comment
Если вам нужно максимальное значение stoptime для каждого отдельного startime, hour, min, sec, sip, dip, sport, dport, proto. вам просто нужно, чтобы все эти столбцы в partitionBy - person MaFF; 11.09.2017
comment
Выглядит правильно? import pyspark.sql.functions as psf from pyspark.sql import Window w = Window.partitionBy("startime","hour","sip","dip","sport","dport","proto").orderBy(psf.desc("stoptime")) df = dataframe.withColumn("rn", psf.row_number().over(w)).filter("rn = 1").drop("rn") df.show() - person user2825083; 11.09.2017
comment
Это выглядит правильно, проверьте результат, чтобы убедиться, что он соответствует вашим ожиданиям. - person MaFF; 11.09.2017

Надеюсь это поможет!

from pyspark.sql.functions import col

df = sc.parallelize([(1504766585,1504801216,16,20,16,'192.168.0.11','23.204.108.58',51249,80,6,0,0),
                     (1504766585,1504801216,16,20,16,'192.168.0.11','23.204.108.58',51249,80,6,0,0),
                     (1504781751,1504801216,16,20,16,'192.168.0.11','23.72.38.96',  51252,80,6,0,0),
                     (1504781751,1504801216,16,20,16,'192.168.0.11','23.72.38.96',  51252,80,6,0,0),
                     (1504766585,1504801336,16,22,16,'192.168.0.11','23.204.108.58',51249,80,6,0,0),
                     (1504766585,1504801336,16,22,16,'192.168.0.11','23.204.108.58',51249,80,6,0,0),
                     (1504781751,1504801336,16,22,16,'192.168.0.11','23.72.38.96',  51252,80,6,0,0),
                     (1504781751,1504801336,16,22,16,'192.168.0.11','23.72.38.96',  51252,80,6,0,0)]).\
    toDF(["startime","stoptime","hour","min","sec","sip","dip","sport","dport","proto","pkt","byt"])

df1 = df.where(col("stoptime") == df.select("stoptime").rdd.max()[0]).distinct()
df1.show()

Выход

+----------+----------+----+---+---+------------+-------------+-----+-----+-----+---+---+
|  startime|  stoptime|hour|min|sec|         sip|          dip|sport|dport|proto|pkt|byt|
+----------+----------+----+---+---+------------+-------------+-----+-----+-----+---+---+
|1504766585|1504801336|  16| 22| 16|192.168.0.11|23.204.108.58|51249|   80|    6|  0|  0|
|1504781751|1504801336|  16| 22| 16|192.168.0.11|  23.72.38.96|51252|   80|    6|  0|  0|
+----------+----------+----+---+---+------------+-------------+-----+-----+-----+---+---+
person 1.618    schedule 07.09.2017
comment
Только что добавлено выделение кода (Python) - ищите ‹! - language-all: lang-python -› в начале кода ... - person desertnaut; 07.09.2017
comment
@ user2825083, вы должны отметить его как правильный ответ, если он ответил на ваш запрос, поскольку это поможет другим в случае, если они столкнутся с аналогичной проблемой в будущем. Спасибо! - person 1.618; 08.09.2017
comment
сбор значений pyspark в объекты python может привести к неожиданным результатам для float, поскольку они не аппроксимируются одинаково. Я бы не рекомендовал это делать. - person MaFF; 09.09.2017
comment
@ Мэри может объяснить поподробнее, если ты не против? Я не могу найти здесь неискровых предметов? - person 1.618; 09.09.2017
comment
df.select("stoptime").rdd.max()[0] - это питон int - person MaFF; 09.09.2017
comment
Вы также конвертируете его в rdd для агрегации, которая менее эффективна, чем агрегация фреймов данных. - person MaFF; 09.09.2017
comment
@Marie Я думаю, что мы всегда должны говорить разумно и быть готовыми защищать любые теоретические предположения, прежде чем что-то публиковать (например, сначала вы говорите о float, затем int, затем rdd vs df). Разве мы не должны сосредоточиться на помощи членам сообщества, а не на такого рода диалогах? (Кстати, это эталонное тестирование может заинтересовать ты). Пожалуйста, не пугайте ОП таким количеством комментариев. Будь спортом !!! - person 1.618; 10.09.2017
comment
о нет, я думаю, сообщения могут вводить в заблуждение ... Я не собирался показаться унизительным, я просто пытался помочь. Я потратил время, пытаясь найти воспроизводимый пример для float, это проблема, которую я должен был решить для моего коллеги, и я не мог ее воспроизвести. Я также столкнулся с этой веткой stackoverflow.com/questions/46122846/ пару дней назад, поэтому я знал, что другие люди столкнулись с этой проблемой преобразования. Я просто советовал не переключаться между питоном и искрой, если этого можно избежать. Прости еще раз - person MaFF; 10.09.2017
comment
Спасибо за предложения. Это тоже сработает? from pyspark.sql.functions import col, max as max_ dataframe.groupBy(dataframe.trhour, dataframe.trminute, dataframe.trsec, dataframe.starttime, dataframe.sip, dataframe.dip, dataframe.sport, dataframe.dport, dataframe.proto, dataframe.ipkt, dataframe.ibyt).agg(max_('stoptime')) - person user2825083; 11.09.2017
comment
@ user2825083 Это должно сработать, но разве это не даст строк с максимальным значением в каждой группе? - person 1.618; 11.09.2017
comment
@Prem Это решение предоставит только строки с максимальным временем остановки. - person user2825083; 11.09.2017
comment
Но в своем исходном посте вы прозвучали иначе. Похоже, случай неправильного сбора требований :) - Я думал, что вы хотите, чтобы строки имели последнее время остановки (независимо от группы !!!). Я бы посоветовал правильно обновить вопрос, указав ожидаемый результат, чтобы другие пользователи не запутались при поиске ответа на свою проблему. Спасибо! - person 1.618; 11.09.2017