У меня есть несколько классов Avro, которые я создал, и теперь я пытаюсь использовать их в Spark. Итак, я импортировал свой сгенерированный в avro класс java, «twitter_schema», и обращаюсь к нему при десериализации. Кажется, работает, но в конце возникает исключение Cast.
Моя схема:
$ подробнее twitter.avsc
{"type": "record", "name": "twitter_schema", "namespace": "com.miguno.avro", "fields": [{"name": "имя пользователя", "type": "строка" , "doc": "Имя учетной записи пользователя на Twitter.com"}, {"name": "tweet", "type": "string", "doc": "Содержание сообщения пользователя в Twitter"}, {"name": "timestamp", "type": "long", "doc": "Время эпохи Unix в секундах"}], "doc:": "Базовая схема для хранения сообщений Twitter"}
Мой код:
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.avro.mapred.AvroKey
import org.apache.hadoop.io.NullWritable
import org.apache.avro.mapred.AvroInputFormat
import org.apache.avro.mapred.AvroWrapper
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.SpecificDatumReader;
import com.miguno.avro.twitter_schema
val path = "/app/avro/data/twitter.avro"
val conf = new Configuration
var avroRDD = sc.newAPIHadoopFile(path,classOf[AvroKeyInputFormat[twitter_schema]],
classOf[AvroKey[ByteBuffer]], classOf[NullWritable], conf)
var avroRDD = sc.hadoopFile(path,classOf[AvroInputFormat[twitter_schema]],
classOf[AvroWrapper[twitter_schema]], classOf[NullWritable], 5)
avroRDD.map(l => {
//transformations here
new String(l._1.datum.username)
}
).first
И я получаю ошибку в последней строке:
scala> avroRDD.map(l => {
| new String(l._1.datum.username)}).first
<console>:30: error: overloaded method constructor String with alternatives:
(x$1: StringBuilder)String <and>
(x$1: StringBuffer)String <and>
(x$1: Array[Byte])String <and>
(x$1: Array[Char])String <and>
(x$1: String)String
cannot be applied to (CharSequence)
new String(l._1.datum.username)}).first
Что я делаю не так - не понимаю ошибки? Это правильный способ десериализации? Я читал о Kryo, но, похоже, это усложняет ситуацию, и читал о том, что контекст Spark SQL принимает Avro в 1.2, но это звучит как ограничение производительности / обходной путь .. Лучшие практики для этого?
спасибо, Мэтт