Apache Spark — это унифицированная аналитическая система для крупномасштабной обработки данных. Он имеет процветающее сообщество открытого исходного кода и является самым активным проектом Apache в области больших данных. Spark предоставляет интерфейс для программирования целых кластеров с неявным параллелизмом данных и отказоустойчивостью.

Spark — это быстрый и универсальный механизм для крупномасштабной обработки данных. Некоторые из основных особенностей Apache Spark:

Скорость: Spark до 100 раз быстрее, чем Hadoop MapReduce, для крупномасштабной обработки данных. Он использует кэширование в памяти и оптимизированное выполнение для обеспечения высокой производительности.

Простота использования. Вы можете быстро писать приложения на Java, Scala, Python, R. Scala является языком по умолчанию, но вы также можете использовать интерактивные оболочки на Scala, Python, R.

Общее: в Spark есть библиотеки для SQL (Spark SQL), потоковой передачи (Spark Streaming), машинного обучения (MLlib) и обработки графов (GraphX) — все они используют один и тот же основной механизм с одинаковыми API, что упрощает объединение этих инструментов.

Запуск где угодно. Вы можете запускать Spark в режиме автономного кластера в Hadoop YARN, Mesos, Kubernetes или в облаке EC2.

Spark состоит из нескольких библиотек, которые можно использовать вместе в одном приложении:

  • Spark Core: базовые функции Apache Spark, включая планирование задач, управление памятью и взаимодействие с системами хранения. Spark Core содержит API-интерфейсы, используемые библиотеками более высокого уровня.
  • Spark SQL: используется для выполнения SQL-запросов к вашим данным, как статическим, так и потоковым. Он поддерживает несколько источников данных (csv, json, паркет) и типов данных (DataFrames, Datasets).
  • Spark Streaming: используется для потоковой передачи данных в реальном времени в аналитические приложения. Он позволяет создавать масштабируемые отказоустойчивые потоковые приложения.
  • MLlib: библиотека машинного обучения Spark с алгоритмами и утилитами для распределенного машинного обучения.
  • GraphX: платформа вычислений графов, включающая API для создания графиков и управления ими.
  • SparkR: обеспечивает интеграцию с языком программирования R. Вы можете анализировать свои данные с помощью R и масштабировать их с помощью Spark.

В следующих разделах мы рассмотрим каждую из этих библиотек более подробно с примерами и вариантами использования.

Ключевые особенности Apache Spark

Apache Spark — это быстрый и универсальный механизм для крупномасштабной обработки данных. Некоторые из ключевых особенностей Apache Spark:

Скорость

Spark работает до 100 раз быстрее, чем Hadoop MapReduce, при крупномасштабной обработке данных. Такая скорость достигается за счет кэширования в памяти и оптимизированного выполнения. Основной API Spark основан на идее распределенных коллекций, которые можно кэшировать в памяти и сохранять на диске.

Простота использования

Spark предоставляет простые в использовании API на Python, Java, Scala и R. Вы можете быстро начать программировать Spark, используя любой из этих языков. Вот простой пример программы подсчета слов на Python:

from pyspark import SparkContext

sc = SparkContext.getOrCreate()

input = sc.textFile("file:///path/to/input")
words = input.flatMap(lambda line: line.split(" "))
wordCounts = words.countByValue()

for word, count in wordCounts.items(): 
    print("%s: %i" % (word, count))

общность

Spark работает на Hadoop, Mesos, автономно или в облаке. Он может получить доступ к различным источникам данных. Вы можете использовать его для:

  • Пакетная обработка
  • Интерактивные запросы
  • Потоковое вещание
  • Машинное обучение

Все с использованием единого фреймворка. Это означает, что вы можете объединить несколько рабочих нагрузок в одном приложении.

Беги куда угодно

Spark работает на Hadoop, Mesos, Kubernetes, автономно или в облаке. Он может получить доступ к различным источникам данных, включая HDFS, HBase, Cassandra, S3 и Kafka. Это позволяет легко запускать Spark в существующей инфраструктуре или масштабировать его на облачных ресурсах.

III. Искровое ядро

Spark Core — это основа стека Spark. Он обеспечивает распределенную диспетчеризацию задач, планирование и базовые функции ввода-вывода.

RDD (устойчивые распределенные наборы данных)

RDD — это фундаментальная структура данных в Spark. Это неизменяемая распределенная коллекция объектов, разделенных по узлам кластера. RDD можно создавать из исходных данных в системах хранения (HDFS, HBase, S3) или путем преобразования существующих RDD.

Некоторые примеры создания RDD:

# From a list
sc.parallelize([1, 2, 3])

# From a file 
sc.textFile("data.txt")

# By transforming an existing RDD
rdd.map(lambda x: x * 2)

RDD поддерживают два типа операций:

  • Преобразования: возврат нового RDD (например, map(), filter(), FlatMap(), Union(), Crossing()).
  • Действия: вернуть результат программе драйвера (например, уменьшить(), count(), first(), saveAsTextFile()).

Преобразования оцениваются лениво, то есть они не вычисляются до тех пор, пока не произойдет действие. Это позволяет Spark оптимизировать DAG преобразований.

СДР можно сохранять в памяти, чтобы ускорить будущие действия. Существует несколько уровней хранения:

  • ТОЛЬКО ПАМЯТЬ
  • MEMORY_AND_DISK
  • MEMORY_ONLY_SER
  • MEMORY_AND_DISK_SER

Мы можем отменить сохранение RDD, чтобы освободить место.

Перегородки

RDD разбиваются на разделы, которые представляют собой блоки данных, которые можно вычислять на разных узлах кластера. Разделитель RDD определяет, как его данные будут разделены по разделам.

Некоторые примеры использования преобразований repartition() и Coalesce() для перераспределения RDD:

# Decrease number of partitions 
rdd = rdd.coalesce(2)  

# Increase number of partitions 
rdd = rdd.repartition(10)

Объединение пытается уменьшить количество разделов без перемешивания, тогда как перераспределение может привести к перемешиванию.

В этом разделе представлен обзор основных API-интерфейсов Spark для работы с RDD, разделами и сохраняемостью. В следующих разделах будут рассмотрены компоненты более высокого уровня Spark, такие как Spark SQL, MLlib и GraphX, основанные на Spark Core.

Искровой SQL

Spark SQL — это модуль Spark для работы со структурированными данными с использованием SQL или API DataFrame. Он позволяет запрашивать структурированные данные внутри программ Spark, используя SQL или API DataFrame.

Кадры данных

DataFrame — это распределенная коллекция данных, организованная в именованные столбцы. Концептуально он эквивалентен таблице в реляционной базе данных или фрейму данных в R/Python. DataFrames могут быть созданы из широкого спектра источников, таких как:

  • Файлы структурированных данных (JSON, CSV, XML и т. д.)
  • Столы для ульев
  • Внешние базы данных
  • Существующие СДР

Например, вот как можно создать DataFrame из CSV-файла:

df = spark.read.csv("data.csv")

Это даст вам DataFrame со схемой, полученной из данных CSV.

Вы можете просмотреть схему DataFrame, используя метод .printSchema():

df.printSchema()
# root
#  |-- name: string (nullable = true)
#  |-- age: integer (nullable = true)

SQL-запросы к DataFrames

Вы можете выполнять SQL-запросы к DataFrames, используя метод .sql(). Например:

df.createOrReplaceTempView("table1")

query = spark.sql("SELECT * FROM table1 WHERE age > 20")

Это запустит SQL-запрос и вернет результат в виде DataFrame.

Spark SQL поддерживает почти все основные агрегатные функции SQL, такие как COUNT(), SUM(), AVG() и т. д. Таким образом, вы можете легко выполнять сложные агрегаты и анализ данных, используя только SQL.

Написание фреймов данных

Вы можете записывать DataFrame во многие системы хранения, чтобы они служили более постоянными таблицами данных, используя DataFrameWriter. Например:

df.write.mode("overwrite").parquet("data.parquet")

Это запишет DataFrame df в файл Parquet с именем data.parquet. Доступные форматы:

  • JSON
  • CSV-файл
  • Паркет
  • ОРЦ
  • JDBC
  • Улей и т. д.

Таким образом, Spark SQL дает вам унифицированный способ подключения ко многим источникам данных, запроса и агрегирования данных, а также записи полученных данных в различные системы хранения. Это позволяет создавать мощные приложения и конвейеры обработки данных.

Искра Стриминг

Spark Streaming — это легкая платформа, построенная на основе Spark Core, которая обеспечивает масштабируемую, высокопроизводительную и отказоустойчивую потоковую обработку потоков данных в реальном времени. Данные могут быть получены из многих источников, таких как Kafka, Flume, Kinesis, Twitter и т. д.

# Initialize StreamingContext 
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)

Основная абстракция, предоставляемая Spark Streaming, — это Дискретизированный поток или DStream, который представляет непрерывный поток данных. DStreams можно создавать из различных источников ввода, таких как:

  • Источники на основе файлов: файлы в каталоге, доступ к которым осуществляется через любую файловую систему, совместимую с Hadoop.
  • Источники на основе сокетов: необработанные TCP-сокеты.
  • Источники на основе актеров Akka: Актеры Akka.
  • Источники Kafka, Flume и Twitter.

DStreams поддерживают два типа операций:

  • Преобразования: операции с DStreams для получения новых DStreams. Некоторые примеры: map, filter, reduceByKey, join и т. д. Они оцениваются лениво.
  • Операции вывода: такие операции, как saveAsTextFiles, saveAsHadoopFiles, print и т. д. Они фактически запускают выполнение всех преобразований в потоках DStreams.

Вот простой пример подсчета слов в Spark Streaming:

# Create a DStream that will connect to localhost:9999 
lines = ssc.socketTextStream("localhost", 9999)

# Split each line into words
words = lines.flatMap(lambda line: line.split(" "))

# Count each word in each batch
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)

# Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.pprint(10)

# Start the computation
ssc.start()        

# Wait for the computation to terminate 
ssc.awaitTermination()

Эта программа подключается к потоку данных сокета на localhost:9999, вычисляет количество слов в каждом пакете данных и печатает первые 10 слов из каждого пакета.

Подводя итог, можно сказать, что Spark Streaming позволяет масштабировать обработку данных в реальном времени, используя масштабируемые функции Spark Core и библиотек расширений. Надеюсь, это поможет дать обзор API Spark Streaming и его возможностей! Пожалуйста, дайте мне знать, если у вас есть еще вопросы.

MLlib — библиотека машинного обучения Spark

MLlib — это библиотека машинного обучения Spark. MLlib предоставляет множество алгоритмов и утилит машинного обучения поверх Spark. Некоторые из алгоритмов, поддерживаемых MLlib:

  • Регрессия — линейная регрессия, логистическая регрессия, деревья решений, случайные леса, деревья с градиентным усилением и т. д.
  • Классификация — логистическая регрессия, наивный Байес, деревья решений, случайные леса, SVM, деревья с градиентным усилением и т. д.
  • Кластеризация — кластеризация K-средних, скрытое распределение Дирихле (LDA), модели гауссовой смеси и т. д.
  • Совместная фильтрация — факторизация матрицы попеременных наименьших квадратов (ALS).
  • Уменьшение размерности — разложение по сингулярным значениям (SVD), анализ главных компонент (PCA) и т. д.

MLlib позволяет создавать конвейеры для объединения нескольких алгоритмов и преобразователей. MLlib также поддерживает сохранение модели с помощью API на основе DataFrame.

Вот простой пример линейной регрессии в MLlib:

from pyspark.ml.regression import LinearRegression

# Load training data
training = spark.read.format("libsvm").load("data/mllib/sample_linear_regression_data.txt")

# Fit the model
lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
model = lr.fit(training)

# Print the coefficients and intercept for linear regression
print("Coefficients: %s" % str(model.coefficients))
print("Intercept: %s" % str(model.intercept))

# Summarize the model over the training set and print out some metrics
trainingSummary = model.summary
print("numIterations: %i" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
trainingSummary.residuals.show()
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

В этой статье рассматриваются основы MLlib и некоторые примеры использования. Дайте мне знать, если вы хотите, чтобы я объяснил что-то более подробно! Я стремился сделать этот раздел описательным с длинным содержанием и оптимизированными для SEO заголовками, сохраняя при этом соотношение чтения пользователем с примерами кода и простой плавной структурой. Пожалуйста, похлопайте по этой статье, если она оказалась для вас полезной!

GraphX ​​— платформа обработки графов Spark

GraphX ​​— это API Spark для графов и параллельных вычислений на графах. GraphX ​​расширяет абстракцию Spark RDD, включая графовые операции и алгоритмы. Он предоставляет единый набор инструментов для ETL, исследовательского анализа и итеративных вычислений на больших графах.

На высоком уровне GraphX ​​представляет график как объект Graph, который содержит:

  • vertices: СДР свойств вершин.
  • edges: СДР свойств ребер.

VertexRDD и EdgeRDD обеспечивают удобный просмотр свойств вершин и ребер.

GraphX ​​предоставляет набор фундаментальных операторов (например, subgraph, joinVertices, aggregateMessages), а также оптимизированный вариант Pregel API. Эти операторы упрощают ETL графов, исследовательский анализ и алгоритмы итеративных графов.

GraphX ​​также включает набор встроенных алгоритмов, таких как PageRank, связанные компоненты, подсчет треугольников и сильно связанные компоненты. Они реализованы поверх операторов GraphX ​​с использованием стандартных API Spark.

Вот простой пример создания графика и запуска PageRank:

// Create an RDD for the vertices
val vertices = sc.parallelize(Seq(
  (1L, "A"), 
  (2L, "B"),
  (3L, "C")
))

// Create an RDD for edges 
val edges = sc.parallelize(Seq(
  (1L, 2L),
  (2L, 3L) 
))

// Create a Graph 
val graph = Graph(vertices, edges)

// Run PageRank
val ranks = graph.pageRank(0.15).vertices

// Join ranks back to vertices
val ranksJoined = vertices.join(ranks)

Это создает граф с 3 вершинами и 2 ребрами, запускает PageRank и объединяет вычисленные ранги обратно со свойствами вершин.

Подводя итог, GraphX ​​— это мощная среда обработки графов с унифицированным API для вычислений на графах и встроенными алгоритмами. Он позволяет легко создавать масштабируемые графовые приложения поверх Spark.

SparkR — использование R со Spark

SparkR — это пакет R, который предоставляет легкий интерфейс для использования Apache Spark из R. Он позволяет манипулировать кадрами данных Spark из R и вызывать алгоритмы MLlib. SparkR позволяет легко объединить возможности R и Spark.

Подключение к Spark из R

Чтобы начать работу с SparkR, сначала необходимо подключиться к работающему кластеру Spark из вашей программы R. Вы можете сделать это с помощью функции sparkR.connect(), передав главный URL-адрес Spark:

library(SparkR)

sc <- sparkR.connect("spark://hostname:7077")

Это позволит подключиться к автономному кластеру Spark. Вы также можете подключиться к кластерам YARN и Mesos.

Загрузка и сохранение данных

После подключения к Spark вы можете загружать данные в Spark DataFrames и сохранять DataFrames в системах хранения. Некоторые примеры:

# Load JSON file into DataFrame 
df <- read.json(sc, "data.json") 

# Save DataFrame to Parquet 
write.parquet(df, "data.parquet")  

# Read from Hive table
hiveDF <- read.df(sc, "my_hive_table")  

# Write to Postgres
write.jdbc(df, "jdbc:postgresql://host/database", "table_name")

SparkR поддерживает чтение и запись во многие источники данных, включая JSON, CSV, Text, Parquet, Hive, MySQL, Postgres и т. д.

Вызов Spark SQL через SparkR

Вы можете запускать SQL-запросы Spark к своим DataFrames из R, используя метод sql(). Например:

# Register DataFrame as a table
createOrReplaceTempView(df, "table_name")

# Run SQL query 
results <- sql(sc, "SELECT * FROM table_name")

Это вернет новый DataFrame с результатами запроса SQL.

Модели MLlib через SparkR

Вы также можете вызывать алгоритмы машинного обучения MLlib из R с помощью пакета SparkR ml. Например:

# Logistic regression 
lr <- ml_logistic_regression(sc, df)

# Naive Bayes 
nb <- ml_naive_bayes(sc, df)

# Clustering 
km <- ml_kmeans(sc, df, k = 2)

Пакет ml позволяет создавать конвейеры машинного обучения, настраивать гиперпараметры и оценивать модели машинного обучения — и все это с помощью R!

Подводя итог, можно сказать, что SparkR предоставляет полезный интерфейс для специалистов по данным и аналитиков, знакомых с R, чтобы использовать возможности Spark для крупномасштабной обработки данных, SQL и машинного обучения.

Запуск приложений Spark

Теперь, когда мы рассмотрели основные компоненты экосистемы Spark, давайте обсудим, как на самом деле запускать приложения Spark. Существует несколько основных способов запуска Spark — оболочка Spark, автономные приложения и распределенные развертывания.

Искровая оболочка

Самый простой способ начать использовать Spark — через оболочку Spark, spark-shell. Оболочка Spark предоставляет простой интерфейс командной строки для взаимодействия со Spark с помощью объектов SparkContext (sc) и SQLContext (sqlContext). Вы можете быстро создать прототип в оболочке, прежде чем переходить к полноценному приложению.

Чтобы запустить оболочку Spark:

spark-shell

Это запустит оболочку Spark с вашей конфигурацией Spark по умолчанию. Вы можете передать дополнительные параметры для изменения главного URL-адреса, добавления пакетов, установки параметров конфигурации Spark и т. д.

Автономные приложения

Для более крупных и сложных приложений вам потребуется создавать автономные приложения Spark. Вы можете писать приложения Spark на Java, Scala, Python, R и SQL. Примеры автономных приложений см. на странице Примеры Spark.

Чтобы отправить автономное приложение под названием MySparkApp.py на Python:

spark-submit MySparkApp.py

Опять же, вы можете передать различные параметры для настройки вашего приложения и способа его запуска.

Распределенные развертывания

Для серьезного использования Spark вам понадобится запустить его в распределенном кластере. Существует несколько основных вариантов распределенного развертывания:

  • Автономный — собственный простой автономный менеджер кластеров Spark. Отлично подходит для целей тестирования.
  • YARN — запускайте Spark поверх Hadoop NextGen (YARN), который может запускать распределенные рабочие нагрузки поверх кластеров Hadoop.
  • Mesos — общий менеджер кластера, который также может запускать приложения Hadoop MapReduce и Spark.
  • Kubernetes — система с открытым исходным кодом для автоматизации развертывания, масштабирования и управления контейнерными приложениями, такими как Spark.

Вы можете настроить менеджер кластера, передав аргумент --master в spark-shell или spark-submit. Например, чтобы запустить оболочку Spark в режиме автономного кластера:

spark-shell --master spark://hostname:7077

Это позволит подключиться к автономному кластеру Spark на hostname. Вы также можете запускать приложения в YARN, Mesos, Kubernetes и т. д., соответствующим образом изменив главный URL-адрес.

Ключевой вывод заключается в том, что существует множество вариантов запуска приложений Spark в зависимости от вашего варианта использования и инфраструктуры. Дайте мне знать, если у вас есть еще вопросы!

Надеюсь, эта статья была для вас полезна! Если статья оказалась для вас полезной, поддержите меня: 1) нажмите несколько хлопков и 2) поделитесь этой историей в своей сети. Дайте мне знать, если у вас есть какие-либо вопросы по обсуждаемому содержанию.

Не стесняйтесь обращаться ко мне по адресу coderhack.com(at)xiv.in.