Как преобразовать искровой DataFrame в RDD mllib LabeledPoints?

Я попытался применить PCA к своим данным, а затем применить RandomForest к преобразованным данным. Тем не менее, PCA.transform(data) дал мне DataFrame, но мне нужна библиотека LabeledPoints для библиотеки RandomForest. Как я могу это сделать? Мой код:

    import org.apache.spark.mllib.util.MLUtils
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.mllib.tree.RandomForest
    import org.apache.spark.mllib.tree.model.RandomForestModel
    import org.apache.spark.ml.feature.PCA
    import org.apache.spark.mllib.regression.LabeledPoint
    import org.apache.spark.mllib.linalg.Vectors


    val dataset = MLUtils.loadLibSVMFile(sc, "data/mnist/mnist.bz2")

    val splits = dataset.randomSplit(Array(0.7, 0.3))

    val (trainingData, testData) = (splits(0), splits(1))

    val trainingDf = trainingData.toDF()

    val pca = new PCA()
    .setInputCol("features")
    .setOutputCol("pcaFeatures")
    .setK(100)
    .fit(trainingDf)

    val pcaTrainingData = pca.transform(trainingDf)

    val numClasses = 10
    val categoricalFeaturesInfo = Map[Int, Int]()
    val numTrees = 10 // Use more in practice.
    val featureSubsetStrategy = "auto" // Let the algorithm choose.
    val impurity = "gini"
    val maxDepth = 20
    val maxBins = 32

    val model = RandomForest.trainClassifier(pcaTrainingData, numClasses, categoricalFeaturesInfo,
        numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins)


     error: type mismatch;
     found   : org.apache.spark.sql.DataFrame
     required: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint]

Я попробовал следующие два возможных решения, но они не сработали:

 scala> val pcaTrainingData = trainingData.map(p => p.copy(features = pca.transform(p.features)))
 <console>:39: error: overloaded method value transform with alternatives:
   (dataset: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame <and>
   (dataset: org.apache.spark.sql.DataFrame,paramMap: org.apache.spark.ml.param.ParamMap)org.apache.spark.sql.DataFrame <and>
   (dataset: org.apache.spark.sql.DataFrame,firstParamPair: org.apache.spark.ml.param.ParamPair[_],otherParamPairs: org.apache.spark.ml.param.ParamPair[_]*)org.apache.spark.sql.DataFrame
  cannot be applied to (org.apache.spark.mllib.linalg.Vector)

А также:

     val labeled = pca
    .transform(trainingDf)
    .map(row => LabeledPoint(row.getDouble(0), row(4).asInstanceOf[Vector[Int]]))

     error: type mismatch;
     found   : scala.collection.immutable.Vector[Int]
     required: org.apache.spark.mllib.linalg.Vector

(В приведенном выше случае я импортировал org.apache.spark.mllib.linalg.Vectors)

Любая помощь?


person Tianyi Wang    schedule 13.03.2016    source источник
comment
ваш код отлично работает для меня (как есть, без двух попыток решения). Я предполагаю, может быть, вы ошиблись в одном из импортов? Я использую import org.apache.spark.ml.feature.PCA, import org.apache.spark.mllib.util.MLUtils. Я запустил его с этим файлом: csie.ntu. edu.tw/~cjlin/libsvmtools/datasets/multiclass/   -  person Tzach Zohar    schedule 13.03.2016
comment
@TzachZohar О, у меня тот же импорт, что и у вас, и я отредактировал свой вопрос, добавив их. Я также использовал тот же файл данных. Это было из-за того, что я запускал в оболочке, а не в искровом режиме, поэтому это не сработало?   -  person Tianyi Wang    schedule 13.03.2016
comment
Почему все минусы? Кажется резонным вопросом.   -  person WestCoastProjects    schedule 29.05.2016


Ответы (1)


Правильный подход здесь - второй, который вы пробовали, - сопоставление каждого Row с LabeledPoint для получения RDD[LabeledPoint]. Однако у него есть две ошибки:

  1. Правильный класс Vector (org.apache.spark.mllib.linalg.Vector) НЕ принимает аргументы типа (например, Vector[Int]) - поэтому, даже если у вас был правильный импорт, компилятор пришел к выводу, что вы имели в виду scala.collection.immutable.Vector, который ДЕЛАЕТ.
  2. DataFrame, возвращенный из PCA.fit(), имеет 3 столбца, и вы попытались извлечь столбец номер 4. Например, показаны первые 4 строки:

    +-----+--------------------+--------------------+
    |label|            features|         pcaFeatures|
    +-----+--------------------+--------------------+
    |  5.0|(780,[152,153,154...|[880.071111851977...|
    |  1.0|(780,[158,159,160...|[-41.473039034112...|
    |  2.0|(780,[155,156,157...|[931.444898405036...|
    |  1.0|(780,[124,125,126...|[25.5114585648411...|
    +-----+--------------------+--------------------+
    

    Чтобы упростить задачу, я предпочитаю использовать имена столбцов вместо их индексов.

Итак, вот преобразование, которое вам нужно:

val labeled = pca.transform(trainingDf).rdd.map(row => LabeledPoint(
   row.getAs[Double]("label"),   
   row.getAs[org.apache.spark.mllib.linalg.Vector]("pcaFeatures")
))
person Tzach Zohar    schedule 14.03.2016