присоединиться к двум кадрам данных pyspark, используя предложение between, чтобы найти данные IP из диапазона IP

Я пытаюсь написать код pyspark для следующего SQL-запроса:

Create table table1 as
Select a.ip_address,a.ip_number,b.ip_start_int,b.ip_end_int,b.post_code_id,b.city,b.region_name,b.two_letter_country
from nk_ip_address_check a 
join 
ip_additional_pulse b
on a.ip_number between b.ip_start_int and b.ip_end_int

Приведенный выше запрос объединяет две таблицы и использует предложение «между» с предложением «on». Я написал UDF, который делает то же самое, но кажется, что он очень медленный. Есть ли способ написать вышеуказанный запрос в коде pyspark, который даст мне лучшую производительность.

Ниже приведен код, который я использую

def ip_mapping(ip_int):
    ip_qry = "select country_code,region_code,city_code,postal_code from de_pulse_ip_pqt where ip_start_int < {} and ip_end_int > {}".format(ip_int,ip_int)
    result = spark.sql(ip_qry)
    country_code = result.rdd.map(lambda x: x['country_code']).first()
    return country_code

ip_mapped = udf(ip_mapping, IntegerType())  
df_final = df.withColumn("country_code", ip_mapped("ip_int"))

это очень неэффективно. более того, если у меня есть region_code , я должен вызвать, изменив возвращаемое значение функции ip_mapping.

df_final = df.withColumn("region_code", ip_mapped("ip_int"))

person braj    schedule 10.01.2017    source источник
comment
пожалуйста, поделитесь кодом, который вы написали   -  person Yaron    schedule 10.01.2017
comment
Также было бы проще, если бы вы могли поделиться некоторыми примерными данными   -  person Dat Tran    schedule 10.01.2017


Ответы (2)


Итак, для каждого IP в вашем DF вы выполняете поиск в другом DF обогащения IP-адресов->GeoIP?

Простое решение -> рассмотрите возможность использования MaxMind DB — https://github.com/maxmind/GeoIP2-python https://www.maxmind.com/en/home

В любом случае, вы должны выполнить операцию один раз для каждого IP-адреса и вернуть все данные GeoIP для определенного IP-адреса.

Ваша функция ip_mapping должна возвращать список элементов (например: (country_code, city_code, region_code))

Ваш UDF должен использовать схему array , и результатом UDF будет несколько выходных столбцов (см. https://stackoverflow.com/a/35323680/5088142 для получения дополнительной информации)

person Yaron    schedule 10.01.2017
comment
спасибо за ответ. Я не могу использовать первый, так как у нас уже есть проверенный платный источник данных. Я изменил свой UDF, но не сильно улучшил производительность. - person braj; 16.01.2017

Вы можете определить условие соединения, используя between, и использовать его в соединении. Пример ниже должен работать для вас.

join_condition = [nk_ip_address_check.ip_number.between(ip_additional_pulse.ip_start_int,ip_additional_pulse.ip_end_int)]

nk_ip_address_check.alias('a')\
    .join(ip_additional_pulse.alias('b'),cond)\
    .selectExpr("a.ip_address",
                "a.ip_number",
                "b.ip_start_int",
                "b.ip_end_int",
                "b.post_code_id",
                "b.city",
                "b.region_name",
                "b.two_letter_country")
person Shan    schedule 09.05.2019