
В этом руководстве будут подробно описаны полные этапы конвейерной обработки от настройки производства данных до создания и оценки модели. Здесь для иллюстрации используется набор данных Movielens с Kafka в качестве производителя.
С помощью Pyspark создается модель машинного обучения с использованием альтернативного метода наименьших квадратов, и ее производительность сравнивается с моделями глубокого обучения, созданными с использованием платформы TensorFlow в Databricks.
Набор данных можно найти в Kaggle здесь.
Введение в совместную фильтрацию
Совместная фильтрация (CF) – это популярный алгоритм рекомендаций, который основывает свои прогнозы и рекомендации на оценках или поведении других пользователей в системе. Проще говоря, если пользователю «Эндрю» нравится продукт А, а также продукт Б, какова вероятность того, что пользователю «Британия» понравится продукт А, учитывая, что ей уже нравится продукт Б. Здесь реализация системы рекомендаций на основе совместного моделирования показан, так как он не такой сложный, как фильтрация на основе содержимого, которая в основном фокусируется на поиске элементов, похожих на элементы, понравившиеся пользователю, с использованием текстового сходства в метаданных.
Метод чередующихся наименьших квадратов (ALS) — это модель, которую мы будем использовать для сопоставления наших данных и поиска сходств, чтобы давать рекомендации.
что такое метод наименьших квадратов (ALS)
ALS лучше всего работает с разреженными наборами данных, такими как тот, что у нас есть. По сути, он пытается использовать сходство между пользователями в разреженной матрице и заполнять недостающие оценки с такой вероятностью, как если бы пользователи сами дали оценку. так что если вероятность данной оценки выше, то этот фильм рекомендуется пользователю.
Например, предположим, что вам 18 лет, и вы поставили лайки и оценки боевикам с участием Арнольда Шварценеггера и Сильвестра Сталлоне, скажем, какой-то другой пользователь вашего возраста, пола и из вашей страны поставил лайк и оценил боевики с участием Джейсона Стэтхэма, то фильмы с участием Джейсона Стэтхэма, скорее всего, вам рекомендуют.
Поскольку это руководство слишком подробно описывает практическую реализацию, мы не будем углубляться в математику, лежащую в основе ALS. А вот математические подробности можно найти здесь
Настройка Kafka в качестве производителя и потребителя потоковой передачи:
вам нужно установить пакет Kafka и импортировать KafkaConsumer и KafkaProducer. После чего мы импортируем наши данные, которые изначально были в формате CSV, и конвертируем их в формат JSON.
pip install kafka-python
from kafka import KafkaConsumer, KafkaProducer
import jsonmovie_csv = pd.read_csv("movie_lens_integrated.csv", delimiter="|", engine='python')
movie_json_convert = movie_csv.to_json("movies.json")
Чтобы облегчить потоковую передачу наших данных в виде тем Kafka, мы устанавливаем необходимые настройки, а затем начинаем отправлять каждый фильм в формате JSON через темы Kafka.
from kafka import KafkaConsumer, KafkaProducer
import time
import pandas as pd
KAFKA_TOPIC_NAME = "movielence"
KAFKA_BOOTSTRAP_SERVER_CONN = "192.168.99.100:9092"
kafka_producer_object = KafkaProducer(bootstrap_servers=KAFKA_BOOTSTRAP_SERVER_CONN,
value_serializer=lambda x: json.dumps(x).encode('utf-8'))
movies_json = pd.read_json("movies.json")
movie_list= movies_json.to_dict(orient="records")
for movie in movie_list:
print("Message to be send : ", movie)
kafka_producer_object.send(KAFKA_TOPIC_NAME,movie)
time.sleep(1)
Настройка Spark Streaming для потребителей
Теперь, когда производитель Kafka настроен, мы приступаем к настройке потоковой передачи искры для работы в качестве потребителя.
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
KAFKA_TOPIC_NAME_CONS = "movielence"
KAFKA_OUTPUT_TOPIC_NAME_CONS = "outputtopic"
KAFKA_BOOTSTRAP_SERVERS_CONS = '192.168.99.100:9092'
if __name__ == "__main__":
print("PySpark Structured Streaming with Kafka Application Started ...")
spark = SparkSession \
.builder \
.appName("PySpark Structured Streaming with Kafka Demo") \
.config("spark.jars", "spark-sql-kafka-0-10_2.12-3.0.0-preview.jar,kafka-clients-2.4.1.jar") \
.config("spark.executor.extraClassPath", "spark-sql-kafka-0-10_2.12-3.0.0-preview.jar,kafka-clients-2.4.1.jar") \
.config("spark.executor.extraLibrary", "spark-sql-kafka-0-10_2.12-3.0.0-preview.jar,kafka-clients-2.4.1.jar") \
.config("spark.driver.extraClassPath", "spark-sql-kafka-0-10_2.12-3.0.0-preview.jar,kafka-clients-2.4.1.jar") \
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
print(" kafka Started ...")
# Construct a streaming DataFrame that reads from testtopic
transaction_detail_df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS_CONS) \
.option("subscribe", KAFKA_TOPIC_NAME_CONS) \
.option("startingOffsets", "latest") \
.load()
print("Printing Schema of transaction_detail_df: ")
transaction_detail_df.printSchema()
# Write final result into console for debugging purpose
trans_detail_write_stream = transaction_detail_df \
.writeStream \
.trigger(processingTime='1 seconds') \
.outputMode("update") \
.option("truncate", "false")\
.format("console") \
.start()
trans_detail_write_stream.awaitTermination()
spark.stop()
Машинное обучение:
Использование ALS и Regression Evaluator для обучения и тестирования показателей производительности. Здесь мы обучаем нашу модель на основе ALS и оцениваем тестовые данные, используя среднеквадратичную ошибку в качестве показателей производительности.
from pyspark.ml.recommendation import ALS
als = (ALS()
.setUserCol("userId")
.setItemCol("movieId")
.setRatingCol("rating")
.setPredictionCol("predictions")
.setMaxIter(2)
.setSeed(seed)
.setRegParam(0.1)
.setColdStartStrategy("drop")
.setRank(12))
alsModel = als.fit(trainingDF)from pyspark.ml.evaluation import RegressionEvaluator
regEval = RegressionEvaluator(predictionCol="predictions", labelCol="rating", metricName="mse")
predictedTestDF = alsModel.transform(testDF)
testMse = regEval.evaluate(predictedTestDF)
Установлено, что эта модель, основанная на ALS, дает среднеквадратичную ошибку 0,802690. Эта ошибка довольно высока по сравнению с моделью глубокого обучения, обученной и протестированной с помощью HorovodEstimator. Модель глубокого обучения имеет меньшую ошибку 0,706399, что доказывает, что модели глубокого обучения лучше изучают сложные нелинейные отношения, существующие в данных.
Построение модели глубокого обучения
Поскольку это руководство предназначено для демонстрации конвейера, по которому данные проходят от начала до конца, здесь не показаны реализации моделей глубокого обучения, но их можно найти здесь
import tensorflow as tf
import horovod.tensorflow as hvd
tf.set_random_seed(seed=40)
def model_fn(features, labels, mode, params):
print("HVD Size: ", hvd.size())
features_with_shape = tf.reshape(features["features"], [-1, 24]) # Explicitly specify dimensions
hidden_layer1 = tf.layers.dense(inputs=features_with_shape, units=params["hidden_layer1"], activation=tf.nn.relu)
hidden_layer2 = tf.layers.dense(inputs=hidden_layer1, units=params["hidden_layer2"], activation=tf.nn.relu)
predictions = tf.squeeze(tf.layers.dense(inputs=hidden_layer2, units=1, activation=None), axis=-1)
# If the estimator is running in PREDICT mode, we can stop building our model graph here and simply return
# our model's inference outputs
serving_key = tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY
export_outputs = {serving_key: tf.estimator.export.PredictOutput({"predictions": predictions})}
if mode == tf.estimator.ModeKeys.PREDICT:
return tf.estimator.EstimatorSpec(mode=mode, predictions=predictions, export_outputs=export_outputs)
# Calculate Loss (for both TRAIN and EVAL modes)
loss = tf.losses.mean_squared_error(labels, predictions)
if mode == tf.estimator.ModeKeys.TRAIN:
optimizer = tf.train.AdamOptimizer(learning_rate=params["learning_rate"])
optimizer = hvd.DistributedOptimizer(optimizer)
train_op = optimizer.minimize(loss=loss, global_step=tf.train.get_global_step())
return tf.estimator.EstimatorSpec(mode=mode, loss=loss, train_op=train_op,
export_outputs=export_outputs)
# If running in EVAL mode, add model evaluation metrics (accuracy) to our EstimatorSpec so that
# they're logged when model evaluation runs
eval_metric_ops = {"rmse": tf.metrics.root_mean_squared_error(labels=labels, predictions=predictions)}
return tf.estimator.EstimatorSpec(
mode=mode, loss=loss, eval_metric_ops=eval_metric_ops, export_outputs=export_outputs)
Для тестирования производительности построенной модели глубокого обучения используется оценщик Horovod.
from sparkdl.estimators.horovod_estimator.estimator import HorovodEstimator
est = HorovodEstimator(modelFn=model_fn,
featureMapping={"features":"features"},
modelDir=model_dir,
labelCol="rating",
batchSize=128,
maxSteps=20000,
isValidationCol="isVal",
modelFnParams={"hidden_layer1": 30, "hidden_layer2": 20, "learning_rate": 0.0001},
saveCheckpointsSecs=30)
transformer = est.fit(trainValDF)
Заключение
Мы видим, что производительность модели глубокого обучения довольно высока, по сравнению с моделью машинного обучения. Для тестирования кодов полные коды доступны в репозитории GitHub, показанном ниже.