최근 Pyspark로 코딩을 하는 일이 많은데, 계속 문법이 헷갈려 오류가 나는 경우가 많았습니다. 이 하나의 포스트로 기본 Pyspark 함수들을 정리하는 것이 목표입니다.
Spark By Examples 에 pyspark 튜토리얼이 잘 나와있기에 이를 활용하였습니다. (이 페이지에 광고만 많지 않으면 정리할 필요도 없을텐데 )
Intro: Pyspark에 친해지기 어려운 이유
R 유저라 그런가? Pyspark는 항상 쓸 때마다 쉬운 함수도 오류가 나고 친해지기 어려웠습니다.
우선 R에서는 라이브러리를 불러오고 나서 다시 찾을 일이 없는데, Pyspark나 Python 모두 라이브러리를 참조해서 함수를 불러오는 경우가 종종 있었습니다.
예를 들어, 다음과 같이 두 방법으로 모듈을 불러왔을 때
1 2 from pyspark.sql.functions import *import pyspark.sql.functions as F
전자는 from으로 모든 걸 import 했기 때문에 그대로 sum(), avg(), max()와 같은 함수를 쓰면 되는 반면에,
후자는 F.sum(), F.avg(), F.max()와 같이 모듈명을 함수 앞에 붙여줘야 한다는 점이 어색했습니다. 그래서 이 포스트에서는 전자로 함수를 모두 불러와서 사용하였습니다.
또 하나, Pyspark를 사용하면서 가장 헷갈렸던 점은 컬럼을 부르는 방법입니다.
참 기본적인 것인데 헷갈렸던 이유를 생각해보면 pyspark는 똑같은 질의이지만 여러 표현으로 가능한 관용적인 언어 였기 때문입니다.
지금까지 찾아낸 "똑같은 질의를 하고 있지만 여러 표현으로 가능한 부분"은 다음과 같습니다.
컬럼을 참조할 때 다음의 네 가지 표현이 가능합니다.
마치 Python에서 컬럼을 불러올 때 .
이나 []
을 통해 불러올 수 있듯이 pyspark도 표현이 가능합니다. 또한 마지막의 경우 pyspark.sql.functions
모듈을 불러왔을 때 col()
함수를 사용하여 불러올 수 있습니다.
"col"
df.col
df["col"]
col("col")
orderBy
와 sort
는 같은 결과를 냅니다. 아마도 SQL 기반의 ORDER BY
와 Python 기반의 sort
을 모두 고려했기 때문이라 생각합니다.
filter
와 where
는 같은 결과를 냅니다. 이 또한 where
는 SQL 기반의 함수를 고려하였기 때문입니다.
union
과 unionAll
은 같은 결과를 냅니다. 이건 좀 의외인데 SQL에서는 UNION
연산은 distinct한 데이터만 가져와 합쳐주는 것인 반면, Pyspark에서는 union
, unionAll
모두 UNION ALL
을 의미한다 합니다.
이러한 점들을 새기고 나면 그래도 차후엔 유의해서 잘 사용할 수 있을 것이라 생각합니다.
Pyspark 함수 알아보기
자, 그럼 본격적으로 Pyspark 함수들을 알아봅시다!
먼저 기본 함수입니다.
기본 함수
함수명
설명
printSchema()
테이블의 스키마를 보여주는 함수
collect()
테이블에서 행을 가져오는 함수
show(truncate=False)
테이블 결과를 보여주는 함수, truncate = False
를 사용하면 테이블 내용이 잘리지 않도록 보여줍니다.
describe()
서머리 결과를 보여주는 함수
SQL 함수
Pyspark의 함수는 대부분 SQL 언어와 비슷하게 구성되어 있습니다.
SQL 언어와 비교하며 다음의 함수들에 대해 설명하겠습니다.
Pyspark 함수명
SQL clause
select
/ drop
SELECT
count
/ countDistinct
COUNT(*)
/ COUNT(DISTINCT *)
withColumn
/ withColumnRenamed
연산 as col
filter
(where
)
WHERE
groupBy
GROUP BY
orderBy
ORDER BY
join (how = {inner, full, left, right})
{INNER, FULL, LEFT, RIGHT} JOIN
union
/ unionByName
UNION ALL
pivot
PIVOT
row_number().over(Window.partitionBy().orderBy())
ROW_NUMBER() OVER(PARTITION BY ORDER BY)
먼저 위 함수를 설명할 테이블을 불러오겠습니다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 from pyspark.sql.types import StructType,StructField, StringType, IntegerType, ArrayTypefrom pyspark.sql.functions import *data = [ (("James" ,"" ,"Smith" ),["Java" ,"Scala" ,"C++" ],"OH" ,"M" ), (("Anna" ,"Rose" ,"" ),["Spark" ,"Java" ,"C++" ],"NY" ,"F" ), (("Julia" ,"" ,"Williams" ),["CSharp" ,"VB" ],"OH" ,"F" ), (("Maria" ,"Anne" ,"Jones" ),["CSharp" ,"VB" ],"NY" ,"M" ), (("Jen" ,"Mary" ,"Brown" ),["CSharp" ,"VB" ],"NY" ,"M" ), (("Mike" ,"Mary" ,"Williams" ),["Python" ,"VB" ],"OH" ,"M" ) ] schema = StructType([ StructField('name' , StructType([ StructField('firstname' , StringType(), True ), StructField('middlename' , StringType(), True ), StructField('lastname' , StringType(), True ) ])), StructField('languages' , ArrayType(StringType()), True ), StructField('state' , StringType(), True ), StructField('gender' , StringType(), True ) ]) df = spark.createDataFrame(data = data, schema = schema) df.show(truncate=False )
1 2 3 4 5 6 7 8 9 10 +----------------------+------------------+-----+------+ |name |languages |state|gender| +----------------------+------------------+-----+------+ |{James, , Smith} |[Java, Scala, C++]|OH |M | |{Anna, Rose, } |[Spark, Java, C++]|NY |F | |{Julia, , Williams} |[CSharp, VB] |OH |F | |{Maria, Anne, Jones} |[CSharp, VB] |NY |M | |{Jen, Mary, Brown} |[CSharp, VB] |NY |M | |{Mike, Mary, Williams}|[Python, VB] |OH |M | +----------------------+------------------+-----+------+
select와 drop
select
는 다음과 같은 SQL 질의를 표현합니다.
1 df.select("name" ).show()
drop
은 특정 컬럼만 제외하고 불러올 때 사용합니다.
count와 countDistinct
1 df.select(count("state" ), countDistinct("state" )).show()
withColumn과 withColumnRenamed
withColumn
은 컬럼의 정보를 바꾸고자 할 때 혹은 새로운 컬럼을 추가할 때 사용합니다.
withColumn으로 컬럼 타입 바꾸기
1 df.withColumn("state" , df.state.cast("String" )).show()
withColumn으로 컬럼 추가하기
1 2 df.withColumn("Country" , lit("U.S.A" )).show()
withColumn으로 CASE WHEN 구문 쓰기
when
과 otherwise
함수를 사용합니다.
1 2 3 4 5 6 df.withColumn("gender" , when(df.gender == "F" , "Female" ) .when(df.gender=="M" , "Male" ) .when(df.gender.isNull(), "" ) .otherwise(df.gender) ).show()```
이를 SQL로 쓰면 다음과 같습니다.
SQL
1 2 3 4 5 6 7 SELECT * , CASE WHEN gender = 'F' THEN 'Female' WHEN gender = 'M' THEN 'Male' WHEN gender IS NULL THEN '' ELSE gender END as gender2 FROM df
withColumnRenamed
withColumnRenamed
는 컬럼명을 바꾸고 싶을 때 사용합니다.
1 2 df.withColumnRenamed("name" , "Name" ).show()
filter
filter
는 where
함수와 같으나 이 포스트에서는 filter
로만 사용하겠습니다.
1 2 3 df.filter (df.gender == "M" ).show() df.filter ("gender = 'M'" ).show()
Pyspark가 Python스럽기도 하고 SQL스럽기도 해서 여러 표현을 허용하다보니 "="의 개수, 따옴표의 위치 등이 헷갈릴 수 있는데요.
Python처럼 쓰거나 SQL처럼 쓴다 생각하고 하나로 스타일을 통일해줌이 좋을 것 같습니다.
여러 조건 필터링
filter
함수 안에
(조건1) & (조건2)
나 (조건1) | (조건2)
를 사용하거나
조건1 and 조건2
의 형태로 필터링합니다.
1 2 df.filter ((df.state == "OH" ) & (df.gender == "M" )).show() df.filter ("state = 'OH' and gender = 'M'" ).show()
IN() 조건 필터링
1 df.filter (df.state.isin([('OH' ,'CA' ,"DE" )])).show()
NOT IN ()
은 isin() 함수 앞에 ~
(NOT 연산자)를 사용합니다.
1 df.filter (~ df.state.isin([('OH' ,'CA' ,"DE" )])).show()
LIKE 조건 필터링
아래와 같은 함수를 사용할 수 있습니다.
startswith
endswith
contains
like
1 2 df.filter (df.state.startswith("N" )).show() df.filter (df.state.like("N%" )).show()
1 2 df.filter (df.state.endswith("H" )).show() df.filter (df.state.like("%H" )).show()
1 2 df.filter (df.state.contains("H" )).show() df.filter (df.state.like("%H%" )).show()
StructType, ArrayType 필터링
df의 스키마를 살펴보면 name
컬럼은 struct
타입, language
는 array
타입입니다.
1 2 3 4 5 6 7 8 9 root |-- name: struct (nullable = true) | |-- firstname: string (nullable = true) | |-- middlename: string (nullable = true) | |-- lastname: string (nullable = true) |-- languages: array (nullable = true) | |-- element: string (containsNull = true) |-- state: string (nullable = true) |-- gender: string (nullable = true)
1 2 df.filter (df.name.lastname =="Williams" ).show()
1 2 df.filter (array_contains(df.languages,'Java' )).show()
groupBy
groupBy
를 위해 다음의 데이터프레임을 생성합니다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 simpleData = [("James" ,"Sales" ,"NY" ,90000 ,34 ,10000 ), ("Michael" ,"Sales" ,"NY" ,86000 ,56 ,20000 ), ("Robert" ,"Sales" ,"CA" ,81000 ,30 ,23000 ), ("Maria" ,"Finance" ,"CA" ,90000 ,24 ,23000 ), ("Raman" ,"Finance" ,"CA" ,99000 ,40 ,24000 ), ("Scott" ,"Finance" ,"NY" ,83000 ,36 ,19000 ), ("Jen" ,"Finance" ,"NY" ,79000 ,53 ,15000 ), ("Jeff" ,"Marketing" ,"CA" ,80000 ,25 ,18000 ), ("Kumar" ,"Marketing" ,"NY" ,91000 ,50 ,21000 ) ] schema = ["employee_name" ,"department" ,"state" ,"salary" ,"age" ,"bonus" ] df = spark.createDataFrame(data=simpleData, schema = schema) df.show(truncate=False )
1 2 3 4 5 6 7 8 9 10 11 12 13 +-------------+----------+-----+------+---+-----+ |employee_name|department|state|salary|age|bonus| +-------------+----------+-----+------+---+-----+ |James |Sales |NY |90000 |34 |10000| |Michael |Sales |NY |86000 |56 |20000| |Robert |Sales |CA |81000 |30 |23000| |Maria |Finance |CA |90000 |24 |23000| |Raman |Finance |CA |99000 |40 |24000| |Scott |Finance |NY |83000 |36 |19000| |Jen |Finance |NY |79000 |53 |15000| |Jeff |Marketing |CA |80000 |25 |18000| |Kumar |Marketing |NY |91000 |50 |21000| +-------------+----------+-----+------+---+-----+
아래처럼 groupBy
를 이용해 개수, 합계, 최소, 최대, 평균을 낼 수 있습니다.
1 2 3 4 5 df.groupBy("department" ).count().show() df.groupBy("department" ).sum ("salary" ).show() df.groupBy("department" ).min ("salary" ).show() df.groupBy("department" ).max ("salary" ).show() df.groupBy("department" ).avg("salary" ).show()
개수, 합계, 최소, 최대, 평균을 각 컬럼에 할당하여 저장하려면 agg
함수로 묶고 alias
로 컬럼명을 설정하여 사용합니다.
1 2 3 4 5 (df.groupBy("department" ) .agg(sum ("salary" ).alias("sum_salary" ) , avg("salary" ).alias("avg_salary" ) , sum ("bonus" ).alias("sum_bonus" ) , max ("bonus" ).alias("max_bonus" ))).show()
또한 여러 개의 컬럼을 이용하여 groupBy
를 하고 여러 컬럼을 집계할 경우 다음과 같이 씁니다.
1 df.groupBy("department" ,"state" ).sum ("salary" ,"bonus" ).show()
orderBy
1 df.orderBy(asc("state" ),desc("gender" )).show()
join
다음과 같이 empDF
와 deptDF
를 정의하였습니다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 emp = [(1 ,"Smith" ,-1 ,"2018" ,"10" ,"M" ,3000 ), \ (2 ,"Rose" ,1 ,"2010" ,"20" ,"M" ,4000 ), \ (3 ,"Williams" ,1 ,"2010" ,"10" ,"M" ,1000 ), \ (4 ,"Jones" ,2 ,"2005" ,"10" ,"F" ,2000 ), \ (5 ,"Brown" ,2 ,"2010" ,"40" ,"" ,-1 ), \ (6 ,"Brown" ,2 ,"2010" ,"50" ,"" ,-1 ) \ ] empColumns = ["emp_id" ,"name" ,"superior_emp_id" ,"year_joined" , \ "emp_dept_id" ,"gender" ,"salary" ] empDF = spark.createDataFrame(data=emp, schema = empColumns) empDF.show(truncate=False ) dept = [("Finance" ,10 ), \ ("Marketing" ,20 ), \ ("Sales" ,30 ), \ ("IT" ,40 ) \ ] deptColumns = ["dept_name" ,"dept_id" ] deptDF = spark.createDataFrame(data=dept, schema = deptColumns) deptDF.show(truncate=False )
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 +------+--------+---------------+-----------+-----------+------+------+ |emp_id|name |superior_emp_id|year_joined|emp_dept_id|gender|salary| +------+--------+---------------+-----------+-----------+------+------+ |1 |Smith |-1 |2018 |10 |M |3000 | |2 |Rose |1 |2010 |20 |M |4000 | |3 |Williams|1 |2010 |10 |M |1000 | |4 |Jones |2 |2005 |10 |F |2000 | |5 |Brown |2 |2010 |40 | |-1 | |6 |Brown |2 |2010 |50 | |-1 | +------+--------+---------------+-----------+-----------+------+------+ +---------+-------+ |dept_name|dept_id| +---------+-------+ |Finance |10 | |Marketing|20 | |Sales |30 | |IT |40 | +---------+-------+
empDF
의 emp_dept_id
deptDF
의 dept_id
가 조인 키가 되어 조인을 할 수 있는데요.
아래와 같이 inner, full, left join을 시행할 수 있습니다.
1 2 3 4 5 6 empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id).show() empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id, how = "full" ).show() empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id, how = "left" ).show()
만약 조인키 이름이 같다면 어떻게 할까요?
공통된 key인 dept_id
를 on
조건에 적어주면 됩니다.
1 2 3 4 (empDF .withColumnRenamed("emp_dept_id" , "dept_id" ) .join(deptDF, on = 'dept_id' ) ).show(truncate=False )
만약 조인 키가 여러 개라면 ?
아래처럼 key 이름이 같다면 리스트로 받아주거나
&
조건을 이용해서 on 절을 적어줍니다.
1 2 3 df1.join(df2, on = ['key1' ,'key2' ], how="inner" ) df1.join(df2, on = (df1.key11 == df2.key12) \ & (df1.key21 == df2.key22), how="inner" )
또한, SQL에 존재하지 않는 left_anti
조인도 존재합니다.
1 empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id, how = "left_anti" ).show()
1 2 3 4 5 +------+-----+---------------+-----------+-----------+------+------+ |emp_id|name |superior_emp_id|year_joined|emp_dept_id|gender|salary| +------+-----+---------------+-----------+-----------+------+------+ |6 |Brown|2 |2010 |50 | |-1 | +------+-----+---------------+-----------+-----------+------+------+
이를 SQL로 표현하면 다음과 같습니다.
SQL
1 2 3 4 5 SELECT * FROM empDF A LEFT JOIN deptDF B ON A.emp_dept_id = B.dept_id WHERE B.dept_id IS NULL
union / unionByName
다음은 union
입니다. union
은 SQL의 UNION
과 달리 distinct 한 행만 union을 해주는 것이 아니라,
중복된 행을 허용 하여 union을 해줍니다. (즉, UNION ALL
과 같습니다)
1 2 3 4 5 6 7 8 9 10 11 12 data = [("James","Sales",34), ("Michael","Sales",56), \ ("Robert","Sales",30), ("Maria","Finance",24) ] columns= ["name","dept","age"] df1 = spark.createDataFrame(data = data, schema = columns) df1.show() #Create DataFrame df1 with columns name,dep,state & salary data2=[("James","Sales","NY",9000),("Maria","Finance","CA",9000), \ ("Jen","Finance","NY",7900),("Jeff","Marketing","CA",8000)] columns2= ["name","dept","state","salary"] df2 = spark.createDataFrame(data = data2, schema = columns2) df2.show()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 +-------+-------+---+ | name| dept|age| +-------+-------+---+ | James| Sales| 34| |Michael| Sales| 56| | Robert| Sales| 30| | Maria|Finance| 24| +-------+-------+---+ +-----+---------+-----+------+ | name| dept|state|salary| +-----+---------+-----+------+ |James| Sales| NY| 9000| |Maria| Finance| CA| 9000| | Jen| Finance| NY| 7900| | Jeff|Marketing| CA| 8000| +-----+---------+-----+------+
union
은 기본적으로
컬럼의 개수가 같아야 하며,
컬럼 순서도 동일하게 두어야 제대로된 결과를 얻을 수 있습니다.
위 테이블에서는 df1
에서 state
, salary
컬럼이 빠져있고, df2
에서는 age
컬럼이 없어 union
을 할 수가 없는데요.
이를 위해 각 테이블에서 없는 컬럼을 생성해주는 함수를 짰습니다.
1 2 3 4 5 6 7 for column in [column for column in df2.columns if column not in df1.columns]: df1 = df1.withColumn(column, lit(None )) for column in [column for column in df1.columns if column not in df2.columns]: df2 = df2.withColumn(column, lit(None )) df1.show() df2.show()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 +-------+-------+---+-----+------+ | name| dept|age|state|salary| +-------+-------+---+-----+------+ | James| Sales| 34| null| null| |Michael| Sales| 56| null| null| | Robert| Sales| 30| null| null| | Maria|Finance| 24| null| null| +-------+-------+---+-----+------+ +-----+---------+-----+------+----+ | name| dept|state|salary| age| +-----+---------+-----+------+----+ |James| Sales| NY| 9000|null| |Maria| Finance| CA| 9000|null| | Jen| Finance| NY| 7900|null| | Jeff|Marketing| CA| 8000|null| +-----+---------+-----+------+----+
자, 여기서 UNION을 그대로 하면 왼쪽 테이블의 컬럼 순서대로 UNION이 되어서 잘못된 결과를 냅니다.
age컬럼에 state값이 들어가 있죠.
1 2 3 4 5 6 7 8 9 10 11 12 +-------+---------+---+-----+------+ | name| dept|age|state|salary| +-------+---------+---+-----+------+ | James| Sales| 34| null| null| |Michael| Sales| 56| null| null| | Robert| Sales| 30| null| null| | Maria| Finance| 24| null| null| | James| Sales| NY| 9000| null| | Maria| Finance| CA| 9000| null| | Jen| Finance| NY| 7900| null| | Jeff|Marketing| CA| 8000| null| +-------+---------+---+-----+------+
이를 제대로 UNION 해주려면 다음과 같이 컬럼 순서를 조정해주어야 합니다.
1 df1.select("name" ,"dept" ,"state" ,"salary" ,"age" ).union(df2).show()
그러나, unionByName
을 이용하면 컬럼 순서를 조정할 필요 없이 컬럼 이름에 맞게 UNION을 해주기 때문에,
편리하게 UNION을 할 수 있습니다.
1 df1.unionByName(df2).show()
1 2 3 4 5 6 7 8 9 10 11 12 +-------+---------+----+-----+------+ | name| dept| age|state|salary| +-------+---------+----+-----+------+ | James| Sales| 34| null| null| |Michael| Sales| 56| null| null| | Robert| Sales| 30| null| null| | Maria| Finance| 24| null| null| | James| Sales|null| NY| 9000| | Maria| Finance|null| CA| 9000| | Jen| Finance|null| NY| 7900| | Jeff|Marketing|null| CA| 8000| +-------+---------+----+-----+------+
pivot / unpivot
pivot
다음은 pivot
입니다.
pivot은 long 테이블을 wide 하게 바꿔주고, unpivot은 wide 한 테이블을 long 하게 바꿔주는 것을 의미하는데요.
예시로 보면 더 쉽습니다. 아래의 예시는 long 형태의 테이블입니다.
1 2 3 4 5 6 7 8 9 10 11 12 import pyspark from pyspark.sql import SparkSession from pyspark.sql.functions import expr #Create spark session data = [("Banana",1000,"USA"), ("Carrots",1500,"USA"), ("Beans",1600,"USA"), \ ("Orange",2000,"USA"),("Orange",2000,"USA"),("Banana",400,"China"), \ ("Carrots",1200,"China"),("Beans",1500,"China"),("Orange",4000,"China"), \ ("Banana",2000,"Canada"),("Carrots",2000,"Canada"),("Beans",2000,"Mexico")] columns= ["Product","Amount","Country"] df = spark.createDataFrame(data = data, schema = columns) df.show(truncate=False)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 +-------+------+-------+ |Product|Amount|Country| +-------+------+-------+ |Banana |1000 |USA | |Carrots|1500 |USA | |Beans |1600 |USA | |Orange |2000 |USA | |Orange |2000 |USA | |Banana |400 |China | |Carrots|1200 |China | |Beans |1500 |China | |Orange |4000 |China | |Banana |2000 |Canada | |Carrots|2000 |Canada | |Beans |2000 |Mexico | +-------+------+-------+
이를 wide 하게 바꿔주려면 Product
를 열, Country
를 행으로 해서 Amount
의 합계를 구해줄 수 있겠죠.
Summary용 테이블이라 보셔도 무방합니다.
이를 위해 Product
로 groupBy
를 한 후 Country
로 pivot
을 하여 Amount
의 합계를 구해줍니다.
1 2 pivotDF = df.groupBy("Product" ).pivot("Country" ).sum ("Amount" ) pivotDF.show(truncate=False )
1 2 3 4 5 6 7 8 +-------+------+-----+------+----+ |Product|Canada|China|Mexico|USA | +-------+------+-----+------+----+ |Orange |null |4000 |null |4000| |Beans |null |1500 |2000 |1600| |Banana |2000 |400 |null |1000| |Carrots|2000 |1200 |null |1500| +-------+------+-----+------+----+
혹은 이를 더 가벼운 연산으로 계산하려면 pivot()
의 주체에 해당하는 컬럼명을 명시해주면 좋다고 합니다.
1 2 3 countries = ["USA" ,"China" ,"Canada" ,"Mexico" ] pivotDF = df.groupBy("Product" ).pivot("Country" , countries).sum ("Amount" ) pivotDF.show(truncate=False )
unpivot
Pyspark에서는 unpivot 함수를 지원하고 있지 않기 때문에 expr
를 이용하여 다시 long 형태의 테이블로 변형할 수 있습니다.
1 2 3 4 5 6 7 unpivotExpr = "stack(4,'USA',USA,'Canada',Canada,'China',China,'Mexico',Mexico) as (Country, Total)" unPivotDF = (pivotDF .select("Product" , expr(unpivotExpr)) .filter ("total is not null" ) .orderBy("Product" ) ) unPivotDF.show()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 +-------+-------+-----+ |Product|Country|Total| +-------+-------+-----+ | Banana| China| 400| | Banana| Canada| 2000| | Banana| USA| 1000| | Beans| USA| 1600| | Beans| Mexico| 2000| | Beans| China| 1500| |Carrots| China| 1200| |Carrots| USA| 1500| |Carrots| Canada| 2000| | Orange| USA| 4000| | Orange| China| 4000| +-------+-------+-----+
row_number
사실 row_number
외에도 많은 Window 함수들이 있지만 오늘은 row_number
만 소개하고자 합니다.
추가로 보실 분은 이 곳
을 참조해주세요.
pivot을 배울 때 썼던 데이터프레임을 그대로 쓰겠습니다.
1 2 3 4 5 6 from pyspark.sql.window import Window ( df.withColumn('row_num' , row_number().over(Window.partitionBy('Product').orderBy(desc('Amount')))) .show() )
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 +-------+------+-------+-------+ |Product|Amount|Country|row_num| +-------+------+-------+-------+ | Banana| 2000| Canada| 1| | Banana| 1000| USA| 2| | Banana| 400| China| 3| | Beans| 2000| Mexico| 1| | Beans| 1600| USA| 2| | Beans| 1500| China| 3| |Carrots| 2000| Canada| 1| |Carrots| 1500| USA| 2| |Carrots| 1200| China| 3| | Orange| 4000| China| 1| | Orange| 2000| USA| 2| | Orange| 2000| USA| 3| +-------+------+-------+-------+
마치며
오늘은 최대한 기초적인 함수 위주로 다 소개하는 것을 목표로 하고 글을 썼는데요. 역시 깁니다.
이 글을 보면서 pyspark 를 능수능란하게 쓰는 그 날을 고대하며! 이 글을 마칩니다 ~!