Spark DataFrame withColumn

The Spark withColumn() function is used to rename columns, change values, and convert the data type of an existing DataFrame column; it can also be used to create new columns. In this article, I will walk you through the most common DataFrame column operations with Scala examples.


First, let's create a DataFrame to work with.

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructType}

val data = Seq(
  Row(Row("James ", "", "Smith"), "36636", "M", "3000"),
  Row(Row("Michael ", "Rose", ""), "40288", "M", "4000"),
  Row(Row("Robert ", "", "Williams"), "42114", "M", "4000"),
  Row(Row("Maria ", "Anne", "Jones"), "39192", "F", "4000"),
  Row(Row("Jen", "Mary", "Brown"), "", "F", "-1")
)
val schema = new StructType()
  .add("name", new StructType()
    .add("firstname", StringType)
    .add("middlename", StringType)
    .add("lastname", StringType))
  .add("dob", StringType)
  .add("gender", StringType)
  .add("salary", StringType)
val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
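To verify what we built, we can display the contents; df.show(false) should list the five rows, with "name" as a nested struct:

df.show(false)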

Spark withColumn – Change the Column Data Type


By using withColumn() on a DataFrame together with cast() on a column, we can change the data type of a DataFrame column. The statement below changes the data type of the "salary" column from String to Integer.

df.withColumn("salary",col("salary").cast("Integer"))

Change the Value of an Existing Column


The withColumn() function can also be used to update the value of an existing column. To change a value, pass the existing column name as the first argument and the new value as the second. Note that the second argument must be of Column type.

df.withColumn("salary",col("salary")*100)


This snippet multiplies the value of "salary" by 100 and writes the result back into the "salary" column.
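One caveat: in the DataFrame created above, "salary" is still a String column, so Spark inserts an implicit cast before the multiplication. A sketch that makes the conversion explicit (same df as above):

df.withColumn("salary", col("salary").cast("Integer") * 100)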

Derive a New Column from an Existing Column


To create a new column, pass the name you want the new column to have as the first argument, and assign its value in the second argument by operating on an existing column.

df.withColumn("CopiedColumn",col("salary")* -1)


This snippet creates a new column "CopiedColumn" by multiplying the "salary" column by -1.
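Derived columns are not limited to arithmetic. As a further sketch against the same df (the "fullName" column name is my own, not from the original example), the nested name struct can be flattened with the built-in concat_ws():

df.withColumn("fullName", concat_ws(" ", col("name.firstname"), col("name.lastname"))) // hypothetical column name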


If the derived column requires a more complex computation, you can use a UDF to perform the transformation.

// With withColumn(), the logic for the new column is wrapped in a UDF
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.{col, udf}

val to_date_udf: (Long => String) = (timestamps: Long) => {
  val sdf: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
  sdf.format(new Date(timestamps * 1000L))
}
val timestamps_2_date: UserDefinedFunction = udf(to_date_udf)

val user_behavior_dataframe: DataFrame = spark.read.format("csv")
  .option("header", "false")     // whether the first line holds the column names
  .option("mode", "FAILFAST")    // fail fast on malformed records
  .option("inferSchema", "true") // whether to infer the schema automatically
  .schema(user_behavior_Schema)
  .load("/Users/cpeixin/IdeaProjects/code_warehouse/data/UserBehavior_5000.csv")
  .withColumn("date", timestamps_2_date(col("timestamps")))

Add a New Column


To create a new column, pass the desired column name as the first argument of the withColumn() transformation. Make sure this column does not already exist in the DataFrame; if it does, withColumn() updates that column's value instead of adding a new one. In the snippet below, the lit() function is used to add a constant value to a DataFrame column. We can also chain calls to add multiple columns.

df.withColumn("Country", lit("USA"))
//chaining to operate on multiple columns
df.withColumn("Country", lit("USA"))
.withColumn("anotherColumn",lit("anotherValue"))

Rename a DataFrame Column


To rename an existing column, use the withColumnRenamed() function on the DataFrame.

df.withColumnRenamed("gender","sex")
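Since withColumnRenamed() also returns a new DataFrame, calls can be chained to rename several columns in one pass (the second target name here is my own example):

df.withColumnRenamed("gender", "sex")
  .withColumnRenamed("dob", "DateOfBirth") // hypothetical target name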

Drop a Column from a Spark DataFrame


Use the drop() function to remove a specific column from the DataFrame.

df.drop("CopiedColumn")

Split One Column into Multiple Columns


Although this example does not use the withColumn() function, I still think it is worth showing how to split one DataFrame column into multiple columns using the map() transformation.

import spark.implicits._

val columns = Seq("name", "address")
val data = Seq(("Robert, Smith", "1 Main st, Newark, NJ, 92537"),
  ("Maria, Garcia", "3456 Walnut st, Newark, NJ, 94732"))
val dfFromData = spark.createDataFrame(data).toDF(columns: _*)
dfFromData.printSchema()

val newDF = dfFromData.map(f => {
  val nameSplit = f.getAs[String](0).split(",")
  val addSplit = f.getAs[String](1).split(",")
  (nameSplit(0), nameSplit(1), addSplit(0), addSplit(1), addSplit(2), addSplit(3))
})
val finalDF = newDF.toDF("First Name", "Last Name",
  "Address Line1", "City", "State", "zipCode")
finalDF.printSchema()
finalDF.show(false)


This snippet splits the "name" column into "First Name" and "Last Name", and the "address" column into "Address Line1", "City", "State", and "zipCode". It yields the output below:

root
|-- First Name: string (nullable = true)
|-- Last Name: string (nullable = true)
|-- Address Line1: string (nullable = true)
|-- City: string (nullable = true)
|-- State: string (nullable = true)
|-- zipCode: string (nullable = true)
+----------+---------+--------------+-------+-----+-------+
|First Name|Last Name|Address Line1 |City |State|zipCode|
+----------+---------+--------------+-------+-----+-------+
|Robert | Smith |1 Main st | Newark| NJ | 92537 |
|Maria | Garcia |3456 Walnut st| Newark| NJ | 94732 |
+----------+---------+--------------+-------+-----+-------+
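For completeness, the same result can be reached with withColumn() and the built-in split() function instead of map(); a sketch against dfFromData, shown for the name column only (trim() removes the leading spaces the comma split leaves behind):

dfFromData
  .withColumn("First Name", trim(split(col("name"), ",").getItem(0)))
  .withColumn("Last Name", trim(split(col("name"), ",").getItem(1)))
  .drop("name")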


Note: all of these functions return a new DataFrame after being applied; they do not update the existing DataFrame in place.

Spark withColumn Complete Example

package com.sparkbyexamples.spark.dataframe

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{ArrayType, IntegerType, MapType, StringType, StructType}
import org.apache.spark.sql.functions._

object WithColumn {

  def main(args: Array[String]): Unit = {

    val spark: SparkSession = SparkSession.builder()
      .master("local[1]")
      .appName("SparkByExamples.com")
      .getOrCreate()

    val arrayStructureData = Seq(
      Row(Row("James ", "", "Smith"), "1", "M", 3100, List("Cricket", "Movies"), Map("hair" -> "black", "eye" -> "brown")),
      Row(Row("Michael ", "Rose", ""), "2", "M", 3100, List("Tennis"), Map("hair" -> "brown", "eye" -> "black")),
      Row(Row("Robert ", "", "Williams"), "3", "M", 3100, List("Cooking", "Football"), Map("hair" -> "red", "eye" -> "gray")),
      Row(Row("Maria ", "Anne", "Jones"), "4", "M", 3100, null, Map("hair" -> "blond", "eye" -> "red")),
      Row(Row("Jen", "Mary", "Brown"), "5", "M", 3100, List("Blogging"), Map("white" -> "black", "eye" -> "black"))
    )

    val arrayStructureSchema = new StructType()
      .add("name", new StructType()
        .add("firstname", StringType)
        .add("middlename", StringType)
        .add("lastname", StringType))
      .add("id", StringType)
      .add("gender", StringType)
      .add("salary", IntegerType)
      .add("Hobbies", ArrayType(StringType))
      .add("properties", MapType(StringType, StringType))

    val df2 = spark.createDataFrame(
      spark.sparkContext.parallelize(arrayStructureData), arrayStructureSchema)

    // Change the column data type
    df2.withColumn("salary", df2("salary").cast("Integer"))

    // Derive a new column from an existing one
    val df4 = df2.withColumn("CopiedColumn", df2("salary") * -1)

    // Transform an existing column
    val df5 = df2.withColumn("salary", df2("salary") * 100)

    // You can also chain withColumn to change multiple columns

    // Rename a column
    val df3 = df2.withColumnRenamed("gender", "sex")
    df3.printSchema()

    // Drop a column
    val df6 = df4.drop("CopiedColumn")
    println(df6.columns.contains("CopiedColumn"))

    // Add a literal value
    df2.withColumn("Country", lit("USA")).printSchema()

    // Retrieve values
    df2.show(false)
    df2.select("name").show(false)
    df2.select("name.firstname").show(false)
    df2.select("name.*").show(false)
    val df8 = df2.select(col("*"), explode(col("Hobbies")))
    df8.show(false)

    // Split one column into multiple columns
    import spark.implicits._
    val columns = Seq("name", "address")
    val data = Seq(("Robert, Smith", "1 Main st, Newark, NJ, 92537"),
      ("Maria, Garcia", "3456 Walnut st, Newark, NJ, 94732"))
    val dfFromData = spark.createDataFrame(data).toDF(columns: _*)
    dfFromData.printSchema()

    val newDF = dfFromData.map(f => {
      val nameSplit = f.getAs[String](0).split(",")
      val addSplit = f.getAs[String](1).split(",")
      (nameSplit(0), nameSplit(1), addSplit(0), addSplit(1), addSplit(2), addSplit(3))
    })
    val finalDF = newDF.toDF("First Name", "Last Name",
      "Address Line1", "City", "State", "zipCode")
    finalDF.printSchema()
    finalDF.show(false)
  }
}