Попытка десериализовать Avro в Spark с определенным типом

У меня есть несколько классов Avro, которые я создал, и теперь я пытаюсь использовать их в Spark. Итак, я импортировал свой сгенерированный в avro класс java, «twitter_schema», и обращаюсь к нему при десериализации. Кажется, работает, но в конце возникает исключение Cast.

Моя схема:

$ подробнее twitter.avsc

{"type": "record", "name": "twitter_schema", "namespace": "com.miguno.avro", "fields": [{"name": "имя пользователя", "type": "строка" , "doc": "Имя учетной записи пользователя на Twitter.com"}, {"name": "tweet", "type": "string", "doc": "Содержание сообщения пользователя в Twitter"}, {"name": "timestamp", "type": "long", "doc": "Время эпохи Unix в секундах"}], "doc:": "Базовая схема для хранения сообщений Twitter"}

Мой код:

import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.avro.mapred.AvroKey
import org.apache.hadoop.io.NullWritable
import org.apache.avro.mapred.AvroInputFormat
import org.apache.avro.mapred.AvroWrapper
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.SpecificDatumReader;
import com.miguno.avro.twitter_schema

val path = "/app/avro/data/twitter.avro"
val conf = new Configuration
var avroRDD = sc.newAPIHadoopFile(path,classOf[AvroKeyInputFormat[twitter_schema]], 
classOf[AvroKey[ByteBuffer]], classOf[NullWritable], conf)
var avroRDD = sc.hadoopFile(path,classOf[AvroInputFormat[twitter_schema]], 
classOf[AvroWrapper[twitter_schema]], classOf[NullWritable], 5)

avroRDD.map(l => { 
      //transformations here
      new String(l._1.datum.username)
}
).first  

И я получаю ошибку в последней строке:

scala> avroRDD.map(l => { 
     |       new String(l._1.datum.username)}).first
<console>:30: error: overloaded method constructor String with alternatives:
  (x$1: StringBuilder)String <and>
  (x$1: StringBuffer)String <and>
  (x$1: Array[Byte])String <and>
  (x$1: Array[Char])String <and>
  (x$1: String)String
 cannot be applied to (CharSequence)
                    new String(l._1.datum.username)}).first

Что я делаю не так - не понимаю ошибки? Это правильный способ десериализации? Я читал о Kryo, но, похоже, это усложняет ситуацию, и читал о том, что контекст Spark SQL принимает Avro в 1.2, но это звучит как ограничение производительности / обходной путь .. Лучшие практики для этого?

спасибо, Мэтт


person matthieu lieber    schedule 07.01.2015    source источник


Ответы (2)


Я думаю, ваша проблема в том, что avro десериализовал строку в CharSequence, но искал ожидаемую строку java. У Avro есть 3 способа десериализации строки в java: в CharSequence, в String и в UTF8 (класс avro для хранения строк, вроде текста Hadoop).

Вы контролируете это, добавляя свойство avro.java.string в вашу схему avro. Возможные значения (с учетом регистра): «String», «CharSequence», «Utf8». Возможно, есть способ управлять этим динамически через формат ввода, но я точно не знаю.

person miljanm    schedule 09.01.2015
comment
спасибо за Ваш ответ; я изменил свою схему на то, что вы сказали; теперь я получаю: ‹console›: 34: error: перегруженный конструктор метода Строка с альтернативами: (x $ 1: StringBuilder) String ‹and› (x $ 1: StringBuffer) String ‹and› (x $ 1: Array [Byte]) Строка ‹And› (x $ 1: Array [Char]) String ‹and› (x $ 1: String) Строка не может быть применена к (Object) new String (l._1.datum.get (имя пользователя)) ^ - person matthieu lieber; 13.01.2015
comment
Честно говоря, я не использовал Spark, поэтому не знаю, что означают эти выходы. Но похоже, что он пытается построить String с неправильным конструктором ... Вы пробовали все возможные значения для этого свойства? - person miljanm; 13.01.2015

Хорошо, поскольку CharSequence - это интерфейс для String, я могу сохранить свою схему Avro такой, какой она была, и просто сделать мою строку Avro String через toString (), то есть:

scala> avroRDD.map(l => {
     | new String(l._1.datum.get("username").toString())
     | } ).first
res2: String = miguno
person matthieu lieber    schedule 12.01.2015