Объектно-ориентированное программирование (ООП) — это основополагающий принцип современной разработки программного обеспечения, обеспечивающий четкую структуру кода и шаблон проектирования. Но что происходит, когда ООП встречается с миром распределенных вычислительных сред, в частности с PySpark?

В этом пошаговом руководстве рассматривается сложная проблема, возникающая при интеграции конструкций ООП в PySpark, раскрываются проблемы сериализации и предлагаются прагматичные решения.

Постановка задачи

При работе с приложением PySpark возникла ошибка при использовании вложенных генераторов с методом mapPartitions():

Could not serialize object: RuntimeError: It appears that you are attempting 
to reference SparkContext from a broadcast variable, action, or transformation. 
SparkContext can only be used on the driver, not in code that is run on workers.

Сообщение об ошибке неоднозначно, но указывает на фундаментальную проблему с сериализацией в PySpark.

Исходный код

Исходный код, вызвавший проблему

def do_something(self,
    rows: Iterable[Row],
) -> Iterable[Tuple[List[int]]]:
    pack, needed = [], self.chunk_size
    for row in rows:
        content = do_thing(row[self.content_column_name])
        while len(content) >= needed:
            used, content = content[:needed], content[needed:] 
            pack.extend(used)
            yield pack,
            pack, needed = [], self.chunk_size
        pack.extend(content)
        needed -= len(content)
    pack.extend([self.separator_token]*needed)
    yield pack,
applied = spark.createDataFrame(dataframe.rdd.mapPartitions(self.do_something), self.dataframe_schem

Диагностика и шаги решения

Путь к решению включал несколько этапов устранения неполадок, каждый из которых основывался на выводах предыдущего:

1. Первоначальный диагноз

Метод do_something был частью объекта, что приводило к ошибке сериализации. Это осознание привело к дальнейшему расследованию.

2. Попытка сериализации

Тестирование сериализации с помощью pickle module перед вызовом mapPartitions или преобразованием метода do_something в статический метод выглядело как логические последующие шаги.

3. Новые ошибки

Однако возникла новая ошибка:

 Could not serialize object: TypeError: cannot pickle 'generator' object

4. Решение с использованием статического метода

Еще одна попытка решить проблему с помощью статического метода внутри класса:

class YourClass:
    @staticmethod
    def do_something(rows: Iterable[Row], chunk_size: int, content_column_name: str, separator_token: int) -> Iterable[Tuple[List[int]]]:
        # Same code inside method
applied = spark.createDataFrame(dataframe.rdd.mapPartitions(
    lambda rows: YourClass.do_something(
        rows, 
        your_class_instance.chunk_size, 
        your_class_instance.content_column_name, 
        your_class_instance.separator_token
    )
), your_class_instance.dataframe_schema)

К сожалению, проблема осталась и привела к той же ошибке.

5. Окончательное решение с использованием функции верхнего уровня

Наконец, перемещение метода за пределы класса привело к успешному выполнению!

def do_something(rows: Iterable[Row], chunk_size: int, content_column_name: str, separator_token: int) -> Iterable[Tuple[List[int]]]:
    # Same code inside method
applied = spark.createDataFrame(dataframe.rdd.mapPartitions(
    lambda rows: do_something(
        rows, 
        your_class_instance.chunk_size, 
        your_class_instance.content_column_name, 
        your_class_instance.separator_token
    )
), your_class_instance.dataframe_schema)

Ключевые вынос

Этот сценарий иллюстрирует проблемы обработки сериализации в PySpark с помощью конструкций объектно-ориентированного программирования. Окончательное решение об отказе от ООП отражает некоторые технические ограничения PySpark.

Углубленный контекст и альтернативные подходы

1. Проблемы сериализации PySpark. Сбалансировать шаблоны проектирования с распределенным характером PySpark непросто.

2. Альтернативные подходы. Возможными решениями являются рефакторинг кода, использование широковещательных переменных или даже пользовательская сериализация.

3. Что делать с генераторами.Рассмотрите возможность реорганизации кода, чтобы избежать генераторов или управлять их областью действия.

4. Ограничения PySpark. Разработка кода в соответствии с механизмом сериализации PySpark, склоняясь к более функциональному стилю, может предотвратить проблемы.

Заключение

Возникшая здесь проблема является напоминанием об ограничениях и сложностях использования ООП в PySpark. Существуют обходные пути, и признание присущих PySpark ограничений может помочь смягчить такие проблемы. Однако при попытке сбалансировать принципы ООП и архитектуру PySpark, а также взаимодействие JVM с кодом Python возникают внутренние сложности.

Навигация по масштабируемости PySpark и ремонтопригодности объектно-ориентированного программирования требует глубокого понимания обеих парадигм. Надеюсь, это послужит руководством для инженеров, пытающихся объединить эти две мощные концепции программирования.