
Объектно-ориентированное программирование (ООП) — это основополагающий принцип современной разработки программного обеспечения, обеспечивающий четкую структуру кода и шаблон проектирования. Но что происходит, когда ООП встречается с миром распределенных вычислительных сред, в частности с PySpark?
В этом пошаговом руководстве рассматривается сложная проблема, возникающая при интеграции конструкций ООП в PySpark, раскрываются проблемы сериализации и предлагаются прагматичные решения.
Постановка задачи
При работе с приложением PySpark возникла ошибка при использовании вложенных генераторов с методом mapPartitions():
Could not serialize object: RuntimeError: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that is run on workers.
Сообщение об ошибке неоднозначно, но указывает на фундаментальную проблему с сериализацией в PySpark.
Исходный код
Исходный код, вызвавший проблему
def do_something(self,
rows: Iterable[Row],
) -> Iterable[Tuple[List[int]]]:
pack, needed = [], self.chunk_size
for row in rows:
content = do_thing(row[self.content_column_name])
while len(content) >= needed:
used, content = content[:needed], content[needed:]
pack.extend(used)
yield pack,
pack, needed = [], self.chunk_size
pack.extend(content)
needed -= len(content)
pack.extend([self.separator_token]*needed)
yield pack,
applied = spark.createDataFrame(dataframe.rdd.mapPartitions(self.do_something), self.dataframe_schem
Диагностика и шаги решения
Путь к решению включал несколько этапов устранения неполадок, каждый из которых основывался на выводах предыдущего:
1. Первоначальный диагноз
Метод do_something был частью объекта, что приводило к ошибке сериализации. Это осознание привело к дальнейшему расследованию.
2. Попытка сериализации
Тестирование сериализации с помощью pickle module перед вызовом mapPartitions или преобразованием метода do_something в статический метод выглядело как логические последующие шаги.
3. Новые ошибки
Однако возникла новая ошибка:
Could not serialize object: TypeError: cannot pickle 'generator' object
4. Решение с использованием статического метода
Еще одна попытка решить проблему с помощью статического метода внутри класса:
class YourClass:
@staticmethod
def do_something(rows: Iterable[Row], chunk_size: int, content_column_name: str, separator_token: int) -> Iterable[Tuple[List[int]]]:
# Same code inside method
applied = spark.createDataFrame(dataframe.rdd.mapPartitions(
lambda rows: YourClass.do_something(
rows,
your_class_instance.chunk_size,
your_class_instance.content_column_name,
your_class_instance.separator_token
)
), your_class_instance.dataframe_schema)
К сожалению, проблема осталась и привела к той же ошибке.
5. Окончательное решение с использованием функции верхнего уровня
Наконец, перемещение метода за пределы класса привело к успешному выполнению!
def do_something(rows: Iterable[Row], chunk_size: int, content_column_name: str, separator_token: int) -> Iterable[Tuple[List[int]]]:
# Same code inside method
applied = spark.createDataFrame(dataframe.rdd.mapPartitions(
lambda rows: do_something(
rows,
your_class_instance.chunk_size,
your_class_instance.content_column_name,
your_class_instance.separator_token
)
), your_class_instance.dataframe_schema)
Ключевые вынос
Этот сценарий иллюстрирует проблемы обработки сериализации в PySpark с помощью конструкций объектно-ориентированного программирования. Окончательное решение об отказе от ООП отражает некоторые технические ограничения PySpark.
Углубленный контекст и альтернативные подходы
1. Проблемы сериализации PySpark. Сбалансировать шаблоны проектирования с распределенным характером PySpark непросто.
2. Альтернативные подходы. Возможными решениями являются рефакторинг кода, использование широковещательных переменных или даже пользовательская сериализация.
3. Что делать с генераторами.Рассмотрите возможность реорганизации кода, чтобы избежать генераторов или управлять их областью действия.
4. Ограничения PySpark. Разработка кода в соответствии с механизмом сериализации PySpark, склоняясь к более функциональному стилю, может предотвратить проблемы.
Заключение
Возникшая здесь проблема является напоминанием об ограничениях и сложностях использования ООП в PySpark. Существуют обходные пути, и признание присущих PySpark ограничений может помочь смягчить такие проблемы. Однако при попытке сбалансировать принципы ООП и архитектуру PySpark, а также взаимодействие JVM с кодом Python возникают внутренние сложности.
Навигация по масштабируемости PySpark и ремонтопригодности объектно-ориентированного программирования требует глубокого понимания обеих парадигм. Надеюсь, это послужит руководством для инженеров, пытающихся объединить эти две мощные концепции программирования.