Не найден класс Java, соответствующий Product with Serializable with Base

Я написал two case class, который расширяет базу abstract class. У меня есть два списка каждого класса (listA и listB). Когда я хочу объединить эти два списка, я не могу преобразовать окончательный список в набор данных Apache Spark 1.6.1.

abstract class Base

case class A(name: String) extends Base
case class B(age: Int) extends Base

val listA: List[A] = A("foo")::A("bar")::Nil
val listB: List[B] = B(10)::B(20)::Nil
val list: List[Base with Product with Serializable] = listA ++ listB

val result: RDD[Base with Product with Serializable] = sc.parallelize(list).toDS()

Apache Spark вызовет это исключение:

A needed class was not found. This could be due to an error in your runpath. Missing class: no Java class corresponding to Base with Product with Serializable found
java.lang.NoClassDefFoundError: no Java class corresponding to Base with Product with Serializable found
    at scala.reflect.runtime.JavaMirrors$JavaMirror.typeToJavaClass(JavaMirrors.scala:1299)
    at scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:192)
    at scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:54)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:50)
    at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:41)

Когда я хочу создать RDD из list, Spark не выдает никаких исключений, но когда я конвертирую RDD в набор данных с помощью метода toDS(), это предыдущее исключение будет выдано.


person Milad Khajavi    schedule 29.05.2016    source источник
comment
Я не уверен, что миксины еще поддерживаются кодировщиками DataSet.   -  person Yuval Itzchakov    schedule 29.05.2016


Ответы (1)


Во-первых, вы можете получить более разумный тип для list, явно сделав его List[Base] или добавив Base extends Product with Serializable, если намерение состоит в том, чтобы оно расширялось только классами/объектами case. Но этого недостаточно, потому что

Spark 1.6 поставляется с поддержкой автоматического создания кодировщиков для широкого множество типов, включая примитивные типы (например, String, Integer, Long), классы case Scala и Java Beans.

Обратите внимание, что абстрактные классы, такие как Base, не поддерживаются. Пользовательские кодировщики также не поддерживаются. Хотя вы можете попробовать использовать кодировщик kryo (или javaSerialization, в крайнем случае), см. Как хранить пользовательские объекты в наборе данных?.

Вот полный рабочий пример:

abstract class Base extends Serializable with Product

case class A(name: String) extends Base

case class B(age: Int) extends Base

object BaseEncoder {
  implicit def baseEncoder: org.apache.spark.Encoder[Base] = org.apache.spark.Encoders.kryo[Base]
}


val listA: Seq[A] = Seq(A("a"), A("b"))
val listB: Seq[B] = Seq(B(1), B(2))
val list: Seq[Base] = listA ++ listB

val ds = sc.parallelize(list).toDS
person Alexey Romanov    schedule 29.05.2016
comment
Это решение не сработало, но кодировщик работает очень хорошо. - person Milad Khajavi; 30.05.2016
comment
Второй абзац фактически объясняет, почему первого недостаточно. Исправил текст, чтобы было понятнее. - person Alexey Romanov; 30.05.2016
comment
Так что Scala не выведет Base with Product, если вы не укажете тип для list (или где-то еще). Вы имеете в виду, что он не работает, если добавить with Product? - person Alexey Romanov; 30.05.2016