Как выполнять udfs для нескольких столбцов динамически

У меня есть 30 столбцов, таких как DPF_1,DPF_2,DPF_3......DPF_30, к которым мне нужно применить кадры данных. Все 30 столбцов имеют тип данных String. Мое требование состоит в том, чтобы преобразовать все значения «Na», присутствующие в этих 30 столбцах, в «null».

Я пробовал код ниже, но он не динамический.

def udf_A(x:StringType()):
    if x == "Na": return "null"
    else:return x
udf_B = udf(udf_A, StringType())

df.withColumn("DPF_1" udf_B("DPF_1"))
df.withColumn("DPF_2" udf_B("DPF_2"))
.
.
repeated till DPF_30 

Теперь я хочу, чтобы этот процесс выполнялся динамически в pyspark/scala, потому что более поздние столбцы могут увеличиваться с разными именами столбцов.


person Community    schedule 16.01.2018    source источник
comment
Вы всегда можете использовать regexp_replace для преобразования Na в null   -  person philantrovert    schedule 16.01.2018
comment
@philantrovert, у меня много других требований. что можно легко сделать с помощью UDFS   -  person    schedule 16.01.2018
comment
Итак, вы пытаетесь преобразовать Na в Null и сделать кучу других вещей с помощью одного UDF?   -  person philantrovert    schedule 16.01.2018
comment
@ филантроверт: да   -  person    schedule 16.01.2018


Ответы (4)


Попробуйте этот код ниже, надеюсь, это поможет.

def udf_A(x:StringType()):
    if x == "Na": return "null"
    else:return x
udf_B = udf(udf_A, StringType())

import pyspark.sql.functions as psf

for c in df.dtypes:
    if "string" in c[1]:
        df=df.withColumn(c[0],udf_B(psf.col(c[0])))
df.show()

Здесь df.dtypes дает вам массив кортежей с именем столбца и типом данных.

[('DPF_1', 'string'), ('DPF_2', 'string'), ('DPF_3', 'string')... ]

c[0] обозначает имя столбца, а c[1] обозначает тип данных, который в вашем случае равен string.

person LUZO    schedule 16.01.2018
comment
Это прекрасно работает. спасибо, не могли бы вы объяснить, что dtypes будет делать в описанном выше процессе - person ; 16.01.2018

Вот решение на Scala:

// columns which you want to keep 
val colsToSelect : Seq[Column] = ???
// columns which are applied to UDF
val selectUDFs : Seq[Column] = (1 to 30).map(i => udf_B(col(s"DPF_$i")).as(s"DPF_$i"))

df.select((colsToSelect++selectUDFs):_*)
person Raphael Roth    schedule 16.01.2018
comment
Я думаю, что столбцы уже называются DPF_1. Вы можете просто сделать df.columns.map(c => udf_B(c).as(c) ) - person philantrovert; 16.01.2018
comment
он не упоминает, есть ли у его фрейма данных какие-либо другие столбцы или нет, поэтому я не хочу ничего предполагать - person Raphael Roth; 16.01.2018

Вы можете просто преобразовать кадр данных 30 столбцов в na dataframe и применить метод replace как

df.na.replace(df.columns, Map("Na" -> "null"))

Вы замените все строки Na на строку null.

person Ramesh Maharjan    schedule 16.01.2018
comment
у меня много других требований. что можно легко сделать с помощью UDFS. - person ; 16.01.2018
comment
хорошо, поэтому для этих требований вы можете написать udfs. Требования одинаковы для всех столбцов? какие еще требования? - person Ramesh Maharjan; 16.01.2018
comment
да требования одинаковы для всех остальных столбцов. требования зависят от данных, эти требования могут быть обработаны в основном с помощью операторов If, else в UDFS. - person ; 16.01.2018
comment
Каждый раз, когда я вижу это downvoters please do comment before you downvote, у меня возникает соблазн сдаться ;) И без шуток - мы не ожидаем, что пользователи объяснят свои голоса, и определенно не требуем этого. Голоса сами по себе не требуют пояснений - стрелка вниз имеет title, что означает Этот ответ бесполезен (для ответов) или Этот вопрос не показывает каких-либо исследований; это непонятно или бесполезно. В некоторых случаях (ответ, который совершенно неверен, но звучит обманчиво правильно) имеет смысл предоставить объяснение, но в целом я бы не ожидал. - person Alper t. Turker; 16.01.2018
comment
Я понимаю, что вы говорите, @user8371915. Но если в ответе нет недостатка, зачем кому-то просто минусовать? - person Ramesh Maharjan; 16.01.2018
comment
Полезность очень субъективна, как и голоса (то же самое). Ответ может быть бесполезным, потому что он просто уродлив, не имеет объяснения или представляет собой простой дамп кода, не следует передовым методам, опаздывает и повторяет то же самое, что и 10 других ответов на тот же вопрос, показывает не настоящие усилия, не t объясняют важные предостережения, являются простым плагиатом и так далее. Иногда контекст объясняется тем, что вы следите за близкими голосами или просматриваете отзывы в очереди. Если вы получаете гораздо больше отрицательных голосов, чем вы ожидаете, это намек на то, что чего-то не хватает в соответствии с неписаными и письменными правилами сообщества. - person Alper t. Turker; 16.01.2018
comment
Если только это не потому, что вы кому-то не нравитесь, но серийные голоса в любом направлении автоматически обнаруживаются и отменяются, поэтому не нужно об этом беспокоиться. - person Alper t. Turker; 16.01.2018
comment
О, также возможно, что вопрос (явно не по теме) является скорее проблемой, чем ответом. В этом случае это может быть просто попытка очистки, чтобы roomba могла выполнять свою работу, без доверенных пользователей, голосующих за удаление. - person Alper t. Turker; 16.01.2018
comment
Спасибо @user8371915 за объяснение. - person Ramesh Maharjan; 16.01.2018
comment
И последующее чтение: meta.stackoverflow.com/q/285081/8371915, meta.stackoverflow.com/q/252677/8371915, meta.stackoverflow.com/q/276572/8371915 - person Alper t. Turker; 16.01.2018

Один из подходов в Scala состоит в том, чтобы собрать список столбцов с использованием фильтра и пройти по списку, чтобы преобразовать DataFrame с помощью вашей UDF:

val cols = df.columns.filter(_.startsWith("DPF_"))

val df2 = cols.foldLeft( df )( (acc, c) => acc.withColumn(c, udf_B(df(c))) )
person Leo C    schedule 16.01.2018