Объектно-ориентированное программирование (ООП) — это основополагающий принцип современной разработки программного обеспечения, обеспечивающий четкую структуру кода и шаблон проектирования. Но что происходит, когда ООП встречается с миром распределенных вычислительных сред, в частности с PySpark?
В этом пошаговом руководстве рассматривается сложная проблема, возникающая при интеграции конструкций ООП в PySpark, раскрываются проблемы сериализации и предлагаются прагматичные решения.
Постановка задачи
При работе с приложением PySpark возникла ошибка при использовании вложенных генераторов с методом mapPartitions():
Could not serialize object: RuntimeError: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that is run on workers.
Сообщение об ошибке неоднозначно, но указывает на фундаментальную проблему с сериализацией в PySpark.
Исходный код
Исходный код, вызвавший проблему
def do_something(self, rows: Iterable[Row], ) -> Iterable[Tuple[List[int]]]: pack, needed = [], self.chunk_size for row in rows: content = do_thing(row[self.content_column_name]) while len(content) >= needed: used, content = content[:needed], content[needed:] pack.extend(used) yield pack, pack, needed = [], self.chunk_size pack.extend(content) needed -= len(content) pack.extend([self.separator_token]*needed) yield pack, applied = spark.createDataFrame(dataframe.rdd.mapPartitions(self.do_something), self.dataframe_schem
Диагностика и шаги решения
Путь к решению включал несколько этапов устранения неполадок, каждый из которых основывался на выводах предыдущего:
1. Первоначальный диагноз
Метод do_something был частью объекта, что приводило к ошибке сериализации. Это осознание привело к дальнейшему расследованию.
2. Попытка сериализации
Тестирование сериализации с помощью pickle module перед вызовом mapPartitions или преобразованием метода do_something в статический метод выглядело как логические последующие шаги.
3. Новые ошибки
Однако возникла новая ошибка:
Could not serialize object: TypeError: cannot pickle 'generator' object
4. Решение с использованием статического метода
Еще одна попытка решить проблему с помощью статического метода внутри класса:
class YourClass: @staticmethod def do_something(rows: Iterable[Row], chunk_size: int, content_column_name: str, separator_token: int) -> Iterable[Tuple[List[int]]]: # Same code inside method applied = spark.createDataFrame(dataframe.rdd.mapPartitions( lambda rows: YourClass.do_something( rows, your_class_instance.chunk_size, your_class_instance.content_column_name, your_class_instance.separator_token ) ), your_class_instance.dataframe_schema)
К сожалению, проблема осталась и привела к той же ошибке.
5. Окончательное решение с использованием функции верхнего уровня
Наконец, перемещение метода за пределы класса привело к успешному выполнению!
def do_something(rows: Iterable[Row], chunk_size: int, content_column_name: str, separator_token: int) -> Iterable[Tuple[List[int]]]: # Same code inside method applied = spark.createDataFrame(dataframe.rdd.mapPartitions( lambda rows: do_something( rows, your_class_instance.chunk_size, your_class_instance.content_column_name, your_class_instance.separator_token ) ), your_class_instance.dataframe_schema)
Ключевые вынос
Этот сценарий иллюстрирует проблемы обработки сериализации в PySpark с помощью конструкций объектно-ориентированного программирования. Окончательное решение об отказе от ООП отражает некоторые технические ограничения PySpark.
Углубленный контекст и альтернативные подходы
1. Проблемы сериализации PySpark. Сбалансировать шаблоны проектирования с распределенным характером PySpark непросто.
2. Альтернативные подходы. Возможными решениями являются рефакторинг кода, использование широковещательных переменных или даже пользовательская сериализация.
3. Что делать с генераторами.Рассмотрите возможность реорганизации кода, чтобы избежать генераторов или управлять их областью действия.
4. Ограничения PySpark. Разработка кода в соответствии с механизмом сериализации PySpark, склоняясь к более функциональному стилю, может предотвратить проблемы.
Заключение
Возникшая здесь проблема является напоминанием об ограничениях и сложностях использования ООП в PySpark. Существуют обходные пути, и признание присущих PySpark ограничений может помочь смягчить такие проблемы. Однако при попытке сбалансировать принципы ООП и архитектуру PySpark, а также взаимодействие JVM с кодом Python возникают внутренние сложности.
Навигация по масштабируемости PySpark и ремонтопригодности объектно-ориентированного программирования требует глубокого понимания обеих парадигм. Надеюсь, это послужит руководством для инженеров, пытающихся объединить эти две мощные концепции программирования.