Я читаю файл csv из hdfs с помощью Spark. Он входит в объект FSDataInputStream. Я не могу использовать метод textfile(), потому что он разбивает CSV-файл по переводу строки, и я читаю CSV-файл с переводом строки внутри текстовых полей. Opencsv от sourcefourge обрабатывает переводы строк внутри ячеек, это хороший проект, но он принимает Reader в качестве входных данных. Мне нужно преобразовать его в строку, чтобы я мог передать его в opencsv как StringReader. Итак, файл HDFS -> FSdataINputStream -> String -> StringReader -> список строк opencsv. Ниже приведен код...
import java.io._
import org.apache.spark.sql.SQLContext
import org.apache.hadoop.fs._
import org.apache.hadoop.conf._
import com.opencsv._
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import java.lang.StringBuilder
val conf = new Configuration()
val hdfsCoreSitePath = new Path("core-site.xml")
val hdfsHDFSSitePath = new Path("hdfs-site.xml")
conf.addResource(hdfsCoreSitePath)
conf.addResource(hdfsHDFSSitePath)
val fileSystem = FileSystem.get(conf)
val csvPath = new Path("/raw_data/project_name/csv/file_name.csv")
val csvFile = fileSystem.open(csvPath)
val fileLen = fileSystem.getFileStatus(csvPath).getLen().toInt
var b = Array.fill[Byte](2048)(0)
var j = 1
val stringBuilder = new StringBuilder()
var bufferString = ""
csvFile.seek(0)
csvFile.read(b)
var bufferString = new String(b,"UTF-8")
stringBuilder.append(bufferString)
while(j != -1) {b = Array.fill[Byte](2048)(0);j=csvFile.read(b);bufferString = new String(b,"UTF-8");stringBuilder.append(bufferString)}
val stringBuilderClean = new StringBuilder()
stringBuilderClean = stringBuilder.substring(0,fileLen)
val reader: Reader = new StringReader(stringBuilderClean.toString()).asInstanceOf[Reader]
val csv = new CSVReader(reader)
val javaContext = new JavaSparkContext(sc)
val sqlContext = new SQLContext(sc)
val javaRDD = javaContext.parallelize(csv.readAll())
//do a bunch of transformations on the RDD
Это работает, но я сомневаюсь, что это масштабируемо. Это заставляет меня задаться вопросом, насколько большим ограничением является наличие программы-драйвера, которая передает все данные через один jvm. Мои вопросы к тем, кто хорошо знаком с искрой:
Что происходит, когда вы выполняете такие манипуляции с данными во всем наборе данных, прежде чем они попадут во входной RDD? Он просто обрабатывается как любая другая программа и будет меняться как сумасшедший, я думаю?
Как бы вы тогда сделали любую программу spark масштабируемой? Всегда ли вам НУЖНО извлекать данные непосредственно во входной RDD?