PySpark 함수 정리

  • 최근 Pyspark로 코딩을 하는 일이 많은데, 계속 문법이 헷갈려 오류가 나는 경우가 많았습니다. 이 하나의 포스트로 기본 Pyspark 함수들을 정리하는 것이 목표입니다.
  • Spark By Examples에 pyspark 튜토리얼이 잘 나와있기에 이를 활용하였습니다. (이 페이지에 광고만 많지 않으면 정리할 필요도 없을텐데)

Intro: Pyspark에 친해지기 어려운 이유

R 유저라 그런가? Pyspark는 항상 쓸 때마다 쉬운 함수도 오류가 나고 친해지기 어려웠습니다.

우선 R에서는 라이브러리를 불러오고 나서 다시 찾을 일이 없는데, Pyspark나 Python 모두 라이브러리를 참조해서 함수를 불러오는 경우가 종종 있었습니다.

예를 들어, 다음과 같이 두 방법으로 모듈을 불러왔을 때

1
2
from pyspark.sql.functions import *
import pyspark.sql.functions as F

전자는 from으로 모든 걸 import 했기 때문에 그대로 sum(), avg(), max()와 같은 함수를 쓰면 되는 반면에,
후자는 F.sum(), F.avg(), F.max()와 같이 모듈명을 함수 앞에 붙여줘야 한다는 점이 어색했습니다. 그래서 이 포스트에서는 전자로 함수를 모두 불러와서 사용하였습니다.

또 하나, Pyspark를 사용하면서 가장 헷갈렸던 점은 컬럼을 부르는 방법입니다.
참 기본적인 것인데 헷갈렸던 이유를 생각해보면 pyspark는 똑같은 질의이지만 여러 표현으로 가능한 관용적인 언어였기 때문입니다.

지금까지 찾아낸 "똑같은 질의를 하고 있지만 여러 표현으로 가능한 부분"은 다음과 같습니다.

  1. 컬럼을 참조할 때 다음의 네 가지 표현이 가능합니다.
    마치 Python에서 컬럼을 불러올 때 .이나 []을 통해 불러올 수 있듯이 pyspark도 표현이 가능합니다. 또한 마지막의 경우 pyspark.sql.functions 모듈을 불러왔을 때 col()함수를 사용하여 불러올 수 있습니다.
    • "col"
    • df.col
    • df["col"]
    • col("col")
  2. orderBysort는 같은 결과를 냅니다. 아마도 SQL 기반의 ORDER BY와 Python 기반의 sort을 모두 고려했기 때문이라 생각합니다.
  3. filterwhere는 같은 결과를 냅니다. 이 또한 where는 SQL 기반의 함수를 고려하였기 때문입니다.
  4. unionunionAll은 같은 결과를 냅니다. 이건 좀 의외인데 SQL에서는 UNION 연산은 distinct한 데이터만 가져와 합쳐주는 것인 반면, Pyspark에서는 union, unionAll 모두 UNION ALL을 의미한다 합니다.

이러한 점들을 새기고 나면 그래도 차후엔 유의해서 잘 사용할 수 있을 것이라 생각합니다.


Pyspark 함수 알아보기

자, 그럼 본격적으로 Pyspark 함수들을 알아봅시다!
먼저 기본 함수입니다.

기본 함수

함수명 설명
printSchema() 테이블의 스키마를 보여주는 함수
collect() 테이블에서 행을 가져오는 함수
show(truncate=False) 테이블 결과를 보여주는 함수, truncate = False를 사용하면 테이블 내용이 잘리지 않도록 보여줍니다.
describe() 서머리 결과를 보여주는 함수

SQL 함수

Pyspark의 함수는 대부분 SQL 언어와 비슷하게 구성되어 있습니다.
SQL 언어와 비교하며 다음의 함수들에 대해 설명하겠습니다.

Pyspark 함수명 SQL clause
select / drop SELECT
count / countDistinct COUNT(*) / COUNT(DISTINCT *)
withColumn / withColumnRenamed 연산 as col
filter (where) WHERE
groupBy GROUP BY
orderBy ORDER BY
join (how = {inner, full, left, right}) {INNER, FULL, LEFT, RIGHT} JOIN
union / unionByName UNION ALL
pivot PIVOT
row_number().over(Window.partitionBy().orderBy()) ROW_NUMBER() OVER(PARTITION BY ORDER BY)

먼저 위 함수를 설명할 테이블을 불러오겠습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, ArrayType
from pyspark.sql.functions import *
data = [
(("James","","Smith"),["Java","Scala","C++"],"OH","M"),
(("Anna","Rose",""),["Spark","Java","C++"],"NY","F"),
(("Julia","","Williams"),["CSharp","VB"],"OH","F"),
(("Maria","Anne","Jones"),["CSharp","VB"],"NY","M"),
(("Jen","Mary","Brown"),["CSharp","VB"],"NY","M"),
(("Mike","Mary","Williams"),["Python","VB"],"OH","M")
]

schema = StructType([
StructField('name', StructType([
StructField('firstname', StringType(), True),
StructField('middlename', StringType(), True),
StructField('lastname', StringType(), True)
])),
StructField('languages', ArrayType(StringType()), True),
StructField('state', StringType(), True),
StructField('gender', StringType(), True)
])

df = spark.createDataFrame(data = data, schema = schema)
df.show(truncate=False)
1
2
3
4
5
6
7
8
9
10
+----------------------+------------------+-----+------+
|name |languages |state|gender|
+----------------------+------------------+-----+------+
|{James, , Smith} |[Java, Scala, C++]|OH |M |
|{Anna, Rose, } |[Spark, Java, C++]|NY |F |
|{Julia, , Williams} |[CSharp, VB] |OH |F |
|{Maria, Anne, Jones} |[CSharp, VB] |NY |M |
|{Jen, Mary, Brown} |[CSharp, VB] |NY |M |
|{Mike, Mary, Williams}|[Python, VB] |OH |M |
+----------------------+------------------+-----+------+

select와 drop

select는 다음과 같은 SQL 질의를 표현합니다.

1
df.select("name").show()

drop은 특정 컬럼만 제외하고 불러올 때 사용합니다.

1
df.drop("name").show() ## name을 제외한 컬럼을 불러오기

count와 countDistinct

1
df.select(count("state"), countDistinct("state")).show()

withColumn과 withColumnRenamed

withColumn은 컬럼의 정보를 바꾸고자 할 때 혹은 새로운 컬럼을 추가할 때 사용합니다.

withColumn으로 컬럼 타입 바꾸기

1
df.withColumn("state", df.state.cast("String")).show()

withColumn으로 컬럼 추가하기

1
2
df.withColumn("Country", lit("U.S.A")).show() 
# lit: 문자열(literal value)로 새로운 컬럼을 만들 때 사용하는 함수

withColumn으로 CASE WHEN 구문 쓰기

whenotherwise 함수를 사용합니다.

1
2
3
4
5
6
df.withColumn("gender"
, when(df.gender == "F", "Female")
.when(df.gender=="M", "Male")
.when(df.gender.isNull(), "")
.otherwise(df.gender)
).show()```

이를 SQL로 쓰면 다음과 같습니다.

SQL

1
2
3
4
5
6
7
SELECT *
, CASE WHEN gender = 'F' THEN 'Female'
WHEN gender = 'M' THEN 'Male'
WHEN gender IS NULL THEN ''
ELSE gender
END as gender2
FROM df

withColumnRenamed

withColumnRenamed는 컬럼명을 바꾸고 싶을 때 사용합니다.

1
2
# withColumnRenamed("변경 전", "변경 후")
df.withColumnRenamed("name", "Name").show()

filter

filterwhere함수와 같으나 이 포스트에서는 filter로만 사용하겠습니다.

1
2
3
df.filter(df.gender == "M").show() # Python처럼 쓰기
df.filter("gender = 'M'").show() # SQL처럼 쓰기
#df.where(df.gender == "M").show() # where도 똑같은 결과를 낸다!

Pyspark가 Python스럽기도 하고 SQL스럽기도 해서 여러 표현을 허용하다보니 "="의 개수, 따옴표의 위치 등이 헷갈릴 수 있는데요.
Python처럼 쓰거나 SQL처럼 쓴다 생각하고 하나로 스타일을 통일해줌이 좋을 것 같습니다.

여러 조건 필터링

filter 함수 안에

  • (조건1) & (조건2)(조건1) | (조건2)를 사용하거나
  • 조건1 and 조건2 의 형태로 필터링합니다.
1
2
df.filter((df.state == "OH") & (df.gender == "M")).show() # Python-like
df.filter("state = 'OH' and gender = 'M'").show() # SQL-like

IN() 조건 필터링

1
df.filter(df.state.isin([('OH','CA',"DE")])).show()

NOT IN ()은 isin() 함수 앞에 ~ (NOT 연산자)를 사용합니다.

1
df.filter(~ df.state.isin([('OH','CA',"DE")])).show()

LIKE 조건 필터링

아래와 같은 함수를 사용할 수 있습니다.

  • startswith
  • endswith
  • contains
  • like
1
2
df.filter(df.state.startswith("N")).show()
df.filter(df.state.like("N%")).show()
1
2
df.filter(df.state.endswith("H")).show()
df.filter(df.state.like("%H")).show()
1
2
df.filter(df.state.contains("H")).show()
df.filter(df.state.like("%H%")).show()

StructType, ArrayType 필터링

1
df.printSchema()

df의 스키마를 살펴보면 name 컬럼은 struct 타입, languagearray 타입입니다.

1
2
3
4
5
6
7
8
9
root
|-- name: struct (nullable = true)
| |-- firstname: string (nullable = true)
| |-- middlename: string (nullable = true)
| |-- lastname: string (nullable = true)
|-- languages: array (nullable = true)
| |-- element: string (containsNull = true)
|-- state: string (nullable = true)
|-- gender: string (nullable = true)
1
2
# struct 타입을 필터링 할 때는 컬럼.컬럼으로 
df.filter(df.name.lastname =="Williams").show()
1
2
# array 타입을 필터링할 때는 array_contains() 함수 사용
df.filter(array_contains(df.languages,'Java')).show()

groupBy

groupBy를 위해 다음의 데이터프레임을 생성합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
simpleData = [("James","Sales","NY",90000,34,10000),
("Michael","Sales","NY",86000,56,20000),
("Robert","Sales","CA",81000,30,23000),
("Maria","Finance","CA",90000,24,23000),
("Raman","Finance","CA",99000,40,24000),
("Scott","Finance","NY",83000,36,19000),
("Jen","Finance","NY",79000,53,15000),
("Jeff","Marketing","CA",80000,25,18000),
("Kumar","Marketing","NY",91000,50,21000)
]

schema = ["employee_name","department","state","salary","age","bonus"]
df = spark.createDataFrame(data=simpleData, schema = schema)
df.show(truncate=False)
1
2
3
4
5
6
7
8
9
10
11
12
13
+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James |Sales |NY |90000 |34 |10000|
|Michael |Sales |NY |86000 |56 |20000|
|Robert |Sales |CA |81000 |30 |23000|
|Maria |Finance |CA |90000 |24 |23000|
|Raman |Finance |CA |99000 |40 |24000|
|Scott |Finance |NY |83000 |36 |19000|
|Jen |Finance |NY |79000 |53 |15000|
|Jeff |Marketing |CA |80000 |25 |18000|
|Kumar |Marketing |NY |91000 |50 |21000|
+-------------+----------+-----+------+---+-----+

아래처럼 groupBy를 이용해 개수, 합계, 최소, 최대, 평균을 낼 수 있습니다.

1
2
3
4
5
df.groupBy("department").count().show()
df.groupBy("department").sum("salary").show()
df.groupBy("department").min("salary").show()
df.groupBy("department").max("salary").show()
df.groupBy("department").avg("salary").show()

개수, 합계, 최소, 최대, 평균을 각 컬럼에 할당하여 저장하려면 agg 함수로 묶고 alias로 컬럼명을 설정하여 사용합니다.

1
2
3
4
5
(df.groupBy("department")
.agg(sum("salary").alias("sum_salary")
, avg("salary").alias("avg_salary")
, sum("bonus").alias("sum_bonus")
, max("bonus").alias("max_bonus"))).show()

또한 여러 개의 컬럼을 이용하여 groupBy를 하고 여러 컬럼을 집계할 경우 다음과 같이 씁니다.

1
df.groupBy("department","state").sum("salary","bonus").show()

orderBy

1
df.orderBy(asc("state"),desc("gender")).show() #asc: 오름차순, desc: 내림차순

join

다음과 같이 empDFdeptDF를 정의하였습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
emp = [(1,"Smith",-1,"2018","10","M",3000), \
(2,"Rose",1,"2010","20","M",4000), \
(3,"Williams",1,"2010","10","M",1000), \
(4,"Jones",2,"2005","10","F",2000), \
(5,"Brown",2,"2010","40","",-1), \
(6,"Brown",2,"2010","50","",-1) \
]
empColumns = ["emp_id","name","superior_emp_id","year_joined", \
"emp_dept_id","gender","salary"]

empDF = spark.createDataFrame(data=emp, schema = empColumns)
empDF.show(truncate=False)

dept = [("Finance",10), \
("Marketing",20), \
("Sales",30), \
("IT",40) \
]
deptColumns = ["dept_name","dept_id"]
deptDF = spark.createDataFrame(data=dept, schema = deptColumns)
deptDF.show(truncate=False)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
+------+--------+---------------+-----------+-----------+------+------+
|emp_id|name |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|1 |Smith |-1 |2018 |10 |M |3000 |
|2 |Rose |1 |2010 |20 |M |4000 |
|3 |Williams|1 |2010 |10 |M |1000 |
|4 |Jones |2 |2005 |10 |F |2000 |
|5 |Brown |2 |2010 |40 | |-1 |
|6 |Brown |2 |2010 |50 | |-1 |
+------+--------+---------------+-----------+-----------+------+------+
+---------+-------+
|dept_name|dept_id|
+---------+-------+
|Finance |10 |
|Marketing|20 |
|Sales |30 |
|IT |40 |
+---------+-------+

empDFemp_dept_id deptDFdept_id가 조인 키가 되어 조인을 할 수 있는데요.
아래와 같이 inner, full, left join을 시행할 수 있습니다.

1
2
3
4
5
6
# INNER JOIN
empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id).show()
# FULL JOIN
empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id, how = "full").show()
# LEFT JOIN
empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id, how = "left").show()

만약 조인키 이름이 같다면 어떻게 할까요?
공통된 key인 dept_idon 조건에 적어주면 됩니다.

1
2
3
4
(empDF
.withColumnRenamed("emp_dept_id", "dept_id")
.join(deptDF, on = 'dept_id')
).show(truncate=False)

만약 조인 키가 여러 개라면 ?
아래처럼 key 이름이 같다면 리스트로 받아주거나
& 조건을 이용해서 on 절을 적어줍니다.

1
2
3
df1.join(df2, on = ['key1','key2'], how="inner")
df1.join(df2, on = (df1.key11 == df2.key12) \
& (df1.key21 == df2.key22), how="inner")

또한, SQL에 존재하지 않는 left_anti 조인도 존재합니다.

1
empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id, how = "left_anti").show()
1
2
3
4
5
+------+-----+---------------+-----------+-----------+------+------+
|emp_id|name |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+-----+---------------+-----------+-----------+------+------+
|6 |Brown|2 |2010 |50 | |-1 |
+------+-----+---------------+-----------+-----------+------+------+

이를 SQL로 표현하면 다음과 같습니다.

SQL

1
2
3
4
5
SELECT *
FROM empDF A
LEFT JOIN deptDF B
ON A.emp_dept_id = B.dept_id
WHERE B.dept_id IS NULL

union / unionByName

다음은 union 입니다. union은 SQL의 UNION과 달리 distinct 한 행만 union을 해주는 것이 아니라,
중복된 행을 허용하여 union을 해줍니다. (즉, UNION ALL과 같습니다)

1
2
3
4
5
6
7
8
9
10
11
12
data = [("James","Sales",34), ("Michael","Sales",56), \
("Robert","Sales",30), ("Maria","Finance",24) ]
columns= ["name","dept","age"]
df1 = spark.createDataFrame(data = data, schema = columns)
df1.show()

#Create DataFrame df1 with columns name,dep,state & salary
data2=[("James","Sales","NY",9000),("Maria","Finance","CA",9000), \
("Jen","Finance","NY",7900),("Jeff","Marketing","CA",8000)]
columns2= ["name","dept","state","salary"]
df2 = spark.createDataFrame(data = data2, schema = columns2)
df2.show()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
+-------+-------+---+
| name| dept|age|
+-------+-------+---+
| James| Sales| 34|
|Michael| Sales| 56|
| Robert| Sales| 30|
| Maria|Finance| 24|
+-------+-------+---+

+-----+---------+-----+------+
| name| dept|state|salary|
+-----+---------+-----+------+
|James| Sales| NY| 9000|
|Maria| Finance| CA| 9000|
| Jen| Finance| NY| 7900|
| Jeff|Marketing| CA| 8000|
+-----+---------+-----+------+

union은 기본적으로

  • 컬럼의 개수가 같아야 하며,
  • 컬럼 순서도 동일하게 두어야 제대로된 결과를 얻을 수 있습니다.

위 테이블에서는 df1에서 state, salary 컬럼이 빠져있고, df2에서는 age 컬럼이 없어 union을 할 수가 없는데요.
이를 위해 각 테이블에서 없는 컬럼을 생성해주는 함수를 짰습니다.

1
2
3
4
5
6
7
for column in [column for column in df2.columns if column not in df1.columns]:
df1 = df1.withColumn(column, lit(None))

for column in [column for column in df1.columns if column not in df2.columns]:
df2 = df2.withColumn(column, lit(None))
df1.show()
df2.show()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
+-------+-------+---+-----+------+
| name| dept|age|state|salary|
+-------+-------+---+-----+------+
| James| Sales| 34| null| null|
|Michael| Sales| 56| null| null|
| Robert| Sales| 30| null| null|
| Maria|Finance| 24| null| null|
+-------+-------+---+-----+------+

+-----+---------+-----+------+----+
| name| dept|state|salary| age|
+-----+---------+-----+------+----+
|James| Sales| NY| 9000|null|
|Maria| Finance| CA| 9000|null|
| Jen| Finance| NY| 7900|null|
| Jeff|Marketing| CA| 8000|null|
+-----+---------+-----+------+----+

자, 여기서 UNION을 그대로 하면 왼쪽 테이블의 컬럼 순서대로 UNION이 되어서 잘못된 결과를 냅니다.
age컬럼에 state값이 들어가 있죠.

1
df1.union(df2).show()
1
2
3
4
5
6
7
8
9
10
11
12
+-------+---------+---+-----+------+
| name| dept|age|state|salary|
+-------+---------+---+-----+------+
| James| Sales| 34| null| null|
|Michael| Sales| 56| null| null|
| Robert| Sales| 30| null| null|
| Maria| Finance| 24| null| null|
| James| Sales| NY| 9000| null|
| Maria| Finance| CA| 9000| null|
| Jen| Finance| NY| 7900| null|
| Jeff|Marketing| CA| 8000| null|
+-------+---------+---+-----+------+

이를 제대로 UNION 해주려면 다음과 같이 컬럼 순서를 조정해주어야 합니다.

1
df1.select("name","dept","state","salary","age").union(df2).show()

그러나, unionByName을 이용하면 컬럼 순서를 조정할 필요 없이 컬럼 이름에 맞게 UNION을 해주기 때문에,
편리하게 UNION을 할 수 있습니다.

1
df1.unionByName(df2).show()
1
2
3
4
5
6
7
8
9
10
11
12
+-------+---------+----+-----+------+
| name| dept| age|state|salary|
+-------+---------+----+-----+------+
| James| Sales| 34| null| null|
|Michael| Sales| 56| null| null|
| Robert| Sales| 30| null| null|
| Maria| Finance| 24| null| null|
| James| Sales|null| NY| 9000|
| Maria| Finance|null| CA| 9000|
| Jen| Finance|null| NY| 7900|
| Jeff|Marketing|null| CA| 8000|
+-------+---------+----+-----+------+

pivot / unpivot

pivot

다음은 pivot입니다.
pivot은 long 테이블을 wide 하게 바꿔주고, unpivot은 wide 한 테이블을 long 하게 바꿔주는 것을 의미하는데요.

예시로 보면 더 쉽습니다. 아래의 예시는 long 형태의 테이블입니다.

1
2
3
4
5
6
7
8
9
10
11
12
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr
#Create spark session
data = [("Banana",1000,"USA"), ("Carrots",1500,"USA"), ("Beans",1600,"USA"), \
("Orange",2000,"USA"),("Orange",2000,"USA"),("Banana",400,"China"), \
("Carrots",1200,"China"),("Beans",1500,"China"),("Orange",4000,"China"), \
("Banana",2000,"Canada"),("Carrots",2000,"Canada"),("Beans",2000,"Mexico")]

columns= ["Product","Amount","Country"]
df = spark.createDataFrame(data = data, schema = columns)
df.show(truncate=False)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
+-------+------+-------+
|Product|Amount|Country|
+-------+------+-------+
|Banana |1000 |USA |
|Carrots|1500 |USA |
|Beans |1600 |USA |
|Orange |2000 |USA |
|Orange |2000 |USA |
|Banana |400 |China |
|Carrots|1200 |China |
|Beans |1500 |China |
|Orange |4000 |China |
|Banana |2000 |Canada |
|Carrots|2000 |Canada |
|Beans |2000 |Mexico |
+-------+------+-------+

이를 wide 하게 바꿔주려면 Product를 열, Country를 행으로 해서 Amount의 합계를 구해줄 수 있겠죠.
Summary용 테이블이라 보셔도 무방합니다.

이를 위해 ProductgroupBy를 한 후 Countrypivot을 하여 Amount의 합계를 구해줍니다.

1
2
pivotDF = df.groupBy("Product").pivot("Country").sum("Amount")
pivotDF.show(truncate=False)
1
2
3
4
5
6
7
8
+-------+------+-----+------+----+
|Product|Canada|China|Mexico|USA |
+-------+------+-----+------+----+
|Orange |null |4000 |null |4000|
|Beans |null |1500 |2000 |1600|
|Banana |2000 |400 |null |1000|
|Carrots|2000 |1200 |null |1500|
+-------+------+-----+------+----+

혹은 이를 더 가벼운 연산으로 계산하려면 pivot()의 주체에 해당하는 컬럼명을 명시해주면 좋다고 합니다.

1
2
3
countries = ["USA","China","Canada","Mexico"]
pivotDF = df.groupBy("Product").pivot("Country", countries).sum("Amount")
pivotDF.show(truncate=False)

unpivot

Pyspark에서는 unpivot 함수를 지원하고 있지 않기 때문에 expr를 이용하여 다시 long 형태의 테이블로 변형할 수 있습니다.

1
2
3
4
5
6
7
unpivotExpr = "stack(4,'USA',USA,'Canada',Canada,'China',China,'Mexico',Mexico) as (Country, Total)"
unPivotDF = (pivotDF
.select("Product", expr(unpivotExpr))
.filter("total is not null")
.orderBy("Product")
)
unPivotDF.show()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
+-------+-------+-----+
|Product|Country|Total|
+-------+-------+-----+
| Banana| China| 400|
| Banana| Canada| 2000|
| Banana| USA| 1000|
| Beans| USA| 1600|
| Beans| Mexico| 2000|
| Beans| China| 1500|
|Carrots| China| 1200|
|Carrots| USA| 1500|
|Carrots| Canada| 2000|
| Orange| USA| 4000|
| Orange| China| 4000|
+-------+-------+-----+

row_number

사실 row_number 외에도 많은 Window 함수들이 있지만 오늘은 row_number만 소개하고자 합니다.
추가로 보실 분은 이 곳을 참조해주세요.

pivot을 배울 때 썼던 데이터프레임을 그대로 쓰겠습니다.

1
2
3
4
5
6
from pyspark.sql.window import Window 
(
df.withColumn('row_num'
, row_number().over(Window.partitionBy('Product').orderBy(desc('Amount'))))
.show()
)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
+-------+------+-------+-------+
|Product|Amount|Country|row_num|
+-------+------+-------+-------+
| Banana| 2000| Canada| 1|
| Banana| 1000| USA| 2|
| Banana| 400| China| 3|
| Beans| 2000| Mexico| 1|
| Beans| 1600| USA| 2|
| Beans| 1500| China| 3|
|Carrots| 2000| Canada| 1|
|Carrots| 1500| USA| 2|
|Carrots| 1200| China| 3|
| Orange| 4000| China| 1|
| Orange| 2000| USA| 2|
| Orange| 2000| USA| 3|
+-------+------+-------+-------+

마치며

오늘은 최대한 기초적인 함수 위주로 다 소개하는 것을 목표로 하고 글을 썼는데요. 역시 깁니다.
이 글을 보면서 pyspark 를 능수능란하게 쓰는 그 날을 고대하며! 이 글을 마칩니다 ~!