У меня есть класс случая:
case class clickStream(userid:String, adId :String, timestamp:String)
экземпляр которого я хочу отправить с KafkaProducer как:
val record = new ProducerRecord[String,clickStream](
"clicktream",
"data",
clickStream(Random.shuffle(userIdList).head, Random.shuffle(adList).head, new Date().toString).toString
)
producer.send(record)
который отправляет запись в виде строки, как и ожидалось, в очереди TOPIC:
clickStream(user5,ad2,Sat Jul 18 20:48:53 IST 2020)
Однако проблема на стороне потребителя:
val clickStreamDF = spark.readStream
.format("kafka")
.options(kafkaMap)
.option("subscribe","clicktream")
.load()
clickStreamDF
.select($"value".as("string"))
.as[clickStream] //trying to leverage DataSet APIs conversion
.writeStream
.outputMode(OutputMode.Append())
.format("console")
.option("truncate","false")
.start()
.awaitTermination()
По-видимому, использование API .as[clickStream] не работает, так как исключение:
Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve '`userid`' given input columns: [value];
Вот что содержит столбец [value]:
Batch: 2
-------------------------------------------
+----------------------------------------------------+
|value |
+----------------------------------------------------+
|clickStream(user3,ad11,Sat Jul 18 20:59:35 IST 2020)|
+----------------------------------------------------+
Я пытался использовать Пользовательский сериализатор как value.serializer и value.deserializer.
Но столкнулся с другой проблемой ClassNotFoundException в моей структуре каталогов.
У меня есть 3 вопроса:
Как Kafka использует здесь класс Custom Deserializer для синтаксического анализа объекта?
Я не совсем понимаю концепцию кодировщиков и как их можно использовать в данном случае?
Как лучше всего отправлять/получать объекты Custom Case Class с помощью Kafka?