сколько существует способов добавить новый столбец в RDD фрейма данных в Spark API?

Я могу думать только об одном, используя withColumn():

val df = sc.dataFrame.withColumn('newcolname',{ lambda row: row + 1 } ) 

но как бы я обобщил это на текстовые данные? Например, у моего DataFrame было

Строковые значения говорят: «Это пример строки», и я хотел извлечь

первое и последнее слово, как в val arraystring: Array[String] = Array(first,last)


person MrL    schedule 17.05.2016    source источник


Ответы (2)


Это то, что вы ищете?

val sc: SparkContext = ...
val sqlContext = new SQLContext(sc)

import sqlContext.implicits._

val extractFirstWord = udf((sentence: String) => sentence.split(" ").head)
val extractLastWord = udf((sentence: String) => sentence.split(" ").reverse.head)

val sentences = sc.parallelize(Seq("This is an example", "And this is another one", "One_word", "")).toDF("sentence")
val splits = sentences
             .withColumn("first_word", extractFirstWord(col("sentence")))
             .withColumn("last_word", extractLastWord(col("sentence")))

splits.show()

Тогда вывод:

+--------------------+----------+---------+
|            sentence|first_word|last_word|
+--------------------+----------+---------+
|  This is an example|      This|  example|
|And this is anoth...|       And|      one|
|            One_word|  One_word| One_word|
|                    |          |         |
+--------------------+----------+---------+
person Paweł Jurczenko    schedule 17.05.2016

# Create a simple DataFrame, stored into a partition directory
df1 = sqlContext.createDataFrame(sc.parallelize(range(1, 6))\
                                   .map(lambda i: Row(single=i, double=i * 2)))
df1.save("data/test_table/key=1", "parquet")

# Create another DataFrame in a new partition directory,
# adding a new column and dropping an existing column
df2 = sqlContext.createDataFrame(sc.parallelize(range(6, 11))
                                   .map(lambda i: Row(single=i, triple=i * 3)))
df2.save("data/test_table/key=2", "parquet")

# Read the partitioned table
df3 = sqlContext.parquetFile("data/test_table")
df3.printSchema()

https://spark.apache.org/docs/1.3.1/sql-programming-guide.html

person RoyaumeIX    schedule 17.05.2016