Как выбрать объект класса Case в качестве DataFrame в Kafka-Spark Structured Streaming

У меня есть класс случая:

case class clickStream(userid:String, adId :String, timestamp:String)

экземпляр которого я хочу отправить с KafkaProducer как:

val record = new ProducerRecord[String,clickStream](
  "clicktream",
  "data",
  clickStream(Random.shuffle(userIdList).head, Random.shuffle(adList).head, new Date().toString).toString
)
producer.send(record)

который отправляет запись в виде строки, как и ожидалось, в очереди TOPIC:

clickStream(user5,ad2,Sat Jul 18 20:48:53 IST 2020)

Однако проблема на стороне потребителя:

val clickStreamDF = spark.readStream
.format("kafka")
.options(kafkaMap)
.option("subscribe","clicktream")
.load()
 

clickStreamDF 
.select($"value".as("string"))
.as[clickStream]       //trying to leverage DataSet APIs conversion
.writeStream
.outputMode(OutputMode.Append())
.format("console")
.option("truncate","false")
.start()
.awaitTermination()

По-видимому, использование API .as[clickStream] не работает, так как исключение:

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve '`userid`' given input columns: [value];

Вот что содержит столбец [value]:

    Batch: 2
-------------------------------------------
+----------------------------------------------------+
|value                                               |
+----------------------------------------------------+
|clickStream(user3,ad11,Sat Jul 18 20:59:35 IST 2020)|
+----------------------------------------------------+

Я пытался использовать Пользовательский сериализатор как value.serializer и value.deserializer.

Но столкнулся с другой проблемой ClassNotFoundException в моей структуре каталогов.

У меня есть 3 вопроса:

Как Kafka использует здесь класс Custom Deserializer для синтаксического анализа объекта?

Я не совсем понимаю концепцию кодировщиков и как их можно использовать в данном случае?

Как лучше всего отправлять/получать объекты Custom Case Class с помощью Kafka?




Ответы (1)


Поскольку вы передаете данные объекта clickStream как string в kafka, и spark будет читать одну и ту же строку, в spark вам нужно проанализировать и извлечь необходимые поля из clickStream(user3,ad11,Sat Jul 18 20:59:35 IST 2020)

Проверьте код ниже.

clickStreamDF 
.select(split(regexp_extract($"value","\\(([^)]+)\\)",1),"\\,").as("value"))
.select($"value"(0).as("userid"),$"value"(1).as("adId"),$"value"(2).as("timestamp"))
.as[clickStream] # Extract all fields from the value string & then use .as[clickStream] option. I think this line is not required as data already parsed to required format. 
.writeStream
.outputMode(OutputMode.Append())
.format("console")
.option("truncate","false")
.start()
.awaitTermination()

Образец Как разобрать clickStream строковых данных.

scala> df.show(false)
+---------------------------------------------------+
|value                                              |
+---------------------------------------------------+
|clickStream(user5,ad2,Sat Jul 18 20:48:53 IST 2020)|
+---------------------------------------------------+
scala> df
.select(split(regexp_extract($"value","\\(([^)]+)\\)",1),"\\,").as("value"))
.select($"value"(0).as("userid"),$"value"(1).as("adId"),$"value"(2).as("timestamp"))
.as[clickStream]
.show(false)

+------+----+----------------------------+
|userid|adId|timestamp                   |
+------+----+----------------------------+
|user5 |ad2 |Sat Jul 18 20:48:53 IST 2020|
+------+----+----------------------------+

Каким будет наилучший подход для отправки/получения объектов класса Custom Case с помощью Kafka?

Попробуйте преобразовать класс case в json или avro или csv, затем отправьте сообщение в kafka и прочитайте то же сообщение с помощью spark.

person Srinivas    schedule 18.07.2020