Что происходит, когда вы выполняете манипуляции с данными Java в Spark за пределами RDD

Я читаю файл csv из hdfs с помощью Spark. Он входит в объект FSDataInputStream. Я не могу использовать метод textfile(), потому что он разбивает CSV-файл по переводу строки, и я читаю CSV-файл с переводом строки внутри текстовых полей. Opencsv от sourcefourge обрабатывает переводы строк внутри ячеек, это хороший проект, но он принимает Reader в качестве входных данных. Мне нужно преобразовать его в строку, чтобы я мог передать его в opencsv как StringReader. Итак, файл HDFS -> FSdataINputStream -> String -> StringReader -> список строк opencsv. Ниже приведен код...

import java.io._
import org.apache.spark.sql.SQLContext
import org.apache.hadoop.fs._
import org.apache.hadoop.conf._
import com.opencsv._
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import java.lang.StringBuilder

val conf = new Configuration()
val hdfsCoreSitePath = new Path("core-site.xml")
val hdfsHDFSSitePath = new Path("hdfs-site.xml")
conf.addResource(hdfsCoreSitePath)
conf.addResource(hdfsHDFSSitePath)
val fileSystem = FileSystem.get(conf)
val csvPath = new Path("/raw_data/project_name/csv/file_name.csv")
val csvFile = fileSystem.open(csvPath)
val fileLen = fileSystem.getFileStatus(csvPath).getLen().toInt

var b = Array.fill[Byte](2048)(0)
var j = 1

val stringBuilder = new StringBuilder()
var bufferString = ""

csvFile.seek(0)
csvFile.read(b)
var bufferString = new String(b,"UTF-8")
stringBuilder.append(bufferString)

while(j != -1) {b = Array.fill[Byte](2048)(0);j=csvFile.read(b);bufferString = new String(b,"UTF-8");stringBuilder.append(bufferString)}

val stringBuilderClean = new StringBuilder()
stringBuilderClean = stringBuilder.substring(0,fileLen)

val reader: Reader = new StringReader(stringBuilderClean.toString()).asInstanceOf[Reader]
val csv = new CSVReader(reader)
val javaContext = new JavaSparkContext(sc)
val sqlContext = new SQLContext(sc)
val javaRDD = javaContext.parallelize(csv.readAll())
//do a bunch of transformations on the RDD

Это работает, но я сомневаюсь, что это масштабируемо. Это заставляет меня задаться вопросом, насколько большим ограничением является наличие программы-драйвера, которая передает все данные через один jvm. Мои вопросы к тем, кто хорошо знаком с искрой:

  1. Что происходит, когда вы выполняете такие манипуляции с данными во всем наборе данных, прежде чем они попадут во входной RDD? Он просто обрабатывается как любая другая программа и будет меняться как сумасшедший, я думаю?

  2. Как бы вы тогда сделали любую программу spark масштабируемой? Всегда ли вам НУЖНО извлекать данные непосредственно во входной RDD?


person uh_big_mike_boi    schedule 25.02.2016    source источник
comment
Это где-то между слишком широким и неясным, что вы спрашиваете. Можете ли вы уточнить, в чем ваша настоящая проблема?   -  person The Archetypal Paul    schedule 25.02.2016


Ответы (2)


Ваш код загружает данные в память, а затем драйвер Spark разделяет и отправляет каждую часть данных исполнителю, потому что он не масштабируется.
Есть два способа решить ваш вопрос.

написать собственный InputFormat для поддержки формата файла CSV

import java.io.{InputStreamReader, IOException}

import com.google.common.base.Charsets
import com.opencsv.{CSVParser, CSVReader}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Seekable, Path, FileSystem}
import org.apache.hadoop.io.compress._
import org.apache.hadoop.io.{ArrayWritable, Text, LongWritable}
import org.apache.hadoop.mapred._

class CSVInputFormat extends FileInputFormat[LongWritable, ArrayWritable] with JobConfigurable {
  private var compressionCodecs: CompressionCodecFactory = _

  def configure(conf: JobConf) {
    compressionCodecs = new CompressionCodecFactory(conf)
  }

  protected override def isSplitable(fs: FileSystem, file: Path): Boolean = {
    val codec: CompressionCodec = compressionCodecs.getCodec(file)
    if (null == codec) {
      return true
    }
    codec.isInstanceOf[SplittableCompressionCodec]
  }

  @throws(classOf[IOException])
  def getRecordReader(genericSplit: InputSplit, job: JobConf, reporter: Reporter): RecordReader[LongWritable, ArrayWritable] = {
    reporter.setStatus(genericSplit.toString)
    val delimiter: String = job.get("textinputformat.record.delimiter")
    var recordDelimiterBytes: Array[Byte] = null
    if (null != delimiter) {
      recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8)
    }
    new CsvLineRecordReader(job, genericSplit.asInstanceOf[FileSplit], recordDelimiterBytes)
  }
}

class CsvLineRecordReader(job: Configuration, split: FileSplit, recordDelimiter: Array[Byte])
  extends RecordReader[LongWritable, ArrayWritable] {
  private val compressionCodecs = new CompressionCodecFactory(job)
  private val maxLineLength = job.getInt(org.apache.hadoop.mapreduce.lib.input.
    LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE)
  private var filePosition: Seekable = _
  private val file = split.getPath
  private val codec = compressionCodecs.getCodec(file)
  private val isCompressedInput = codec != null
  private val fs = file.getFileSystem(job)
  private val fileIn = fs.open(file)

  private var start = split.getStart
  private var pos: Long = 0L
  private var end = start + split.getLength
  private var reader: CSVReader = _
  private var decompressor: Decompressor = _

  private lazy val CSVSeparator =
    if (recordDelimiter == null)
      CSVParser.DEFAULT_SEPARATOR
    else
      recordDelimiter(0).asInstanceOf[Char]

  if (isCompressedInput) {
    decompressor = CodecPool.getDecompressor(codec)
    if (codec.isInstanceOf[SplittableCompressionCodec]) {
      val cIn = (codec.asInstanceOf[SplittableCompressionCodec])
        .createInputStream(fileIn, decompressor, start, end, SplittableCompressionCodec.READ_MODE.BYBLOCK)
      reader = new CSVReader(new InputStreamReader(cIn), CSVSeparator)
      start = cIn.getAdjustedStart
      end = cIn.getAdjustedEnd
      filePosition = cIn
    }else {
      reader = new CSVReader(new InputStreamReader(codec.createInputStream(fileIn, decompressor)), CSVSeparator)
      filePosition = fileIn
    }
  } else {
    fileIn.seek(start)
    reader = new CSVReader(new InputStreamReader(fileIn), CSVSeparator)
    filePosition = fileIn
  }

  @throws(classOf[IOException])
  private def getFilePosition: Long = {
    if (isCompressedInput && null != filePosition) {
      filePosition.getPos
    }else
      pos
  }

  private def nextLine: Option[Array[String]] = {
    if (getFilePosition < end){
      //readNext automatical split the line to elements
      reader.readNext() match {
        case null => None
        case elems => Some(elems)
      }
    } else
      None
  }

  override def next(key: LongWritable, value: ArrayWritable): Boolean =
    nextLine
      .exists { elems =>
        key.set(pos)
        val lineLength = elems.foldRight(0)((a, b) => a.length + 1 + b)
        pos += lineLength
        value.set(elems.map(new Text(_)))
        if (lineLength < maxLineLength) true else false
      }

  @throws(classOf[IOException])
  def getProgress: Float =
    if (start == end)
      0.0f
    else
      Math.min(1.0f, (getFilePosition - start) / (end - start).toFloat)

  override def getPos: Long = pos

  override def createKey(): LongWritable = new LongWritable

  override def close(): Unit = {
    try {
      if (reader != null) {
        reader.close
      }
    } finally {
      if (decompressor != null) {
        CodecPool.returnDecompressor(decompressor)
      }
    }
  }

  override def createValue(): ArrayWritable = new ArrayWritable(classOf[Text])
}

Простой тестовый пример:

val arrayRdd = sc.hadoopFile("source path", classOf[CSVInputFormat], classOf[LongWritable], classOf[ArrayWritable],
sc.defaultMinPartitions).map(_._2.get().map(_.toString))
arrayRdd.collect().foreach(e => println(e.mkString(",")))

Другой способ, который я предпочитаю, использует spark-csv, написанный блоками данных, который хорошо поддерживается для файла CSV. формате, вы можете попрактиковаться на странице github.

Обновлено для spark-csv с использованием univocity as parserLib, которая может обрабатывать многострочные ячейки

val df = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true") // Use first line of all files as header
.option("parserLib", "univocity")
.option("inferSchema", "true") // Automatically infer data types
.load("source path")
person taigetco    schedule 29.02.2016
comment
Я уже пробовал spark-csv, и он не обрабатывает перевод строки. Он делает новые записи, когда видит перевод строки, а не ожидает LF/CR. Он не учитывает CSV-файлы с переводом строки в одной записи. - person uh_big_mike_boi; 29.02.2016
comment
см. мои обновления выше, я пишу простой пример, используя spark-csv для обработки csv. - person taigetco; 01.03.2016

Что происходит, когда вы выполняете такие манипуляции с данными во всем наборе данных, прежде чем они попадут во входной RDD? Он просто обрабатывается как любая другая программа и будет меняться как сумасшедший, я думаю?

Вы загружаете весь набор данных в локальную память. Так что если у вас есть память, она работает.

Как бы вы тогда сделали любую программу spark масштабируемой?

Вы выбрали формат данных, который может загружать spark, или вы изменили свое приложение, чтобы оно могло напрямую загружать формат данных в spark или и то, и другое.

В этом случае вы можете создать собственный InputFormat, который разбивается не на новую строку, а на что-то другое. Я думаю, вы хотели бы также посмотреть, как вы записываете свои данные, чтобы они были разделены в HDFS на границах записи, а не на новых строках.

Однако я подозреваю, что самый простой ответ - кодировать данные по-другому. Строки JSON или кодировать новые строки в файле CSV во время записи или Avro или... Все, что лучше подходит для Spark и HDFS.

person Michael Lloyd Lee mlk    schedule 26.02.2016