Использование Python reduce() для объединения нескольких PySpark DataFrames

Кто-нибудь знает, почему использование Python3 functools.reduce() приведет к снижению производительности при объединении нескольких PySpark DataFrames, чем просто итеративное объединение одних и тех же DataFrames с использованием цикла for? В частности, это приводит к значительному замедлению работы, за которым следует ошибка нехватки памяти:

def join_dataframes(list_of_join_columns, left_df, right_df):
    return left_df.join(right_df, on=list_of_join_columns)

joined_df = functools.reduce(
    functools.partial(join_dataframes, list_of_join_columns), list_of_dataframes,
)

тогда как этот нет:

joined_df = list_of_dataframes[0]
joined_df.cache()
for right_df in list_of_dataframes[1:]:
    joined_df = joined_df.join(right_df, on=list_of_join_columns)

Любые идеи очень приветствуются. Спасибо!


person Eric Smith    schedule 07.07.2017    source источник


Ответы (2)


Одна из причин заключается в том, что редукция или свертка обычно функционально чисты: результат каждой операции накопления записывается не в одну и ту же часть памяти, а в новый блок памяти.

В принципе, сборщик мусора может освобождать предыдущий блок после каждого накопления, но если этого не произойдет, вы будете выделять память для каждой обновленной версии накопителя.

person Alex    schedule 07.07.2017

Пока вы используете CPython (разные реализации могут, но на самом деле не должны демонстрировать существенно различное поведение в этом конкретном случае). Если вы посмотрите на reduce реализацию вы увидите, что это просто цикл for с минимальной обработкой исключений.

Ядро точно эквивалентно циклу, который вы используете

for element in it:
    value = function(value, element)

и нет никаких доказательств, подтверждающих заявления о каком-либо особом поведении.

Кроме того, простые тесты с количеством кадров, практические ограничения соединений Spark (соединения являются одними из самых затратных операций в Spark)

dfs = [
    spark.range(10000).selectExpr(
        "rand({}) AS id".format(i), "id AS value",  "{} AS loop ".format(i)
    )
    for i in range(200)
]

Не показывать существенной разницы во времени между прямым циклом for

def f(dfs):
    df1 = dfs[0]
    for df2 in dfs[1:]:
        df1 = df1.join(df2, ["id"])
    return df1

%timeit -n3 f(dfs)                 
## 6.25 s ± 257 ms per loop (mean ± std. dev. of 7 runs, 3 loops each)

и reduce вызов

from functools import reduce

def g(dfs):
    return reduce(lambda x, y: x.join(y, ["id"]), dfs) 

%timeit -n3 g(dfs)
### 6.47 s ± 455 ms per loop (mean ± std. dev. of 7 runs, 3 loops each)

Точно так же общие шаблоны поведения JVM сопоставимы между циклами for

Для циклического использования ЦП и памяти — VisualVM

и reduce

уменьшить использование ЦП и памяти — VisualVM

Наконец, оба генерируют идентичные планы выполнения

g(dfs)._jdf.queryExecution().optimizedPlan().equals( 
    f(dfs)._jdf.queryExecution().optimizedPlan()
)
## True

что указывает на отсутствие разницы при оценке планов и вероятность возникновения OOM.

Другими словами, ваша корреляция не подразумевает причинно-следственную связь, и наблюдаемые проблемы с производительностью вряд ли связаны с методом, который вы используете для объединения DataFrames.

person user11024414    schedule 06.02.2019
comment
Можно ли использовать левые соединения или внешние соединения с этим методом, например: return reduce(lambda x, y: x.join(y, ["id"]), how="left", dfs) ? - person thentangler; 12.05.2021