[spark dataframe] extract date value using pyspark udf lambda

from pyspark.sql import SparkSession
import pyspark.sql.functions as func
import datetime

def extractBaseDate(value1, dateCol1, dateCol2, timestampCol):
    # Try the candidate columns in priority order:
    # 1) dateCol1 as "yyyyMMddHHmmss" plus fractional seconds,
    # 2) dateCol2 as "yyyyMMdd HH:mm:ss",
    # 3) the date embedded in the raw value,
    # 4) the ingestion timestamp; otherwise flag the row as unusable.
    if (dateCol1 is not None) and len(dateCol1) > 13:
        baseDate = datetime.datetime.strptime(dateCol1, "%Y%m%d%H%M%S%f").strftime("%Y-%m-%d")
    elif (dateCol2 is not None) and len(dateCol2) > 13:
        baseDate = datetime.datetime.strptime(dateCol2, "%Y%m%d %H:%M:%S").strftime("%Y-%m-%d")
    else:
        try:
            # The ISO date is expected at positions 1..10 of the raw value.
            baseDate = datetime.datetime.strptime(value1[1:11], "%Y-%m-%d").strftime("%Y-%m-%d")
        except (TypeError, ValueError):  # TypeError covers value1 being None
            try:
                baseDate = datetime.datetime.strptime(timestampCol[:10], "%Y-%m-%d").strftime("%Y-%m-%d")
            except (TypeError, ValueError):
                baseDate = "unusable"
    return baseDate

# Null-safe backslash cleanup; the date UDF simply wraps extractBaseDate.
fnDataReplace = func.udf(lambda s: s.replace('\\', '') if s is not None else None)
fnGetBaseDate = func.udf(lambda value1, s1, s2, s3: extractBaseDate(value1, s1, s2, s3))
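
Because extractBaseDate is plain Python, the fallback order can be sanity-checked locally before wrapping it in a UDF. The sample values below are invented for illustration:

print(extractBaseDate(None, "20200710123456789", None, None))      # 2020-07-10 (from dateCol1)
print(extractBaseDate(None, None, "20200710 12:34:56", None))      # 2020-07-10 (from dateCol2)
print(extractBaseDate('"2020-07-10T12:34:56"', None, None, None))  # 2020-07-10 (from the raw value)
print(extractBaseDate("junk", None, None, "2020-06-16 09:00:00"))  # 2020-06-16 (from the timestamp)
print(extractBaseDate("junk", None, None, None))                   # unusable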


dfRaw = dfRaw.withColumn("value", fnDataReplace("value"))
dfRaw = dfRaw.withColumn("base_dt", fnGetBaseDate("value","c0","c1","tmp_timestamp"))
dfRaw = dfRaw.drop("c0","c1","tmp_timestamp")
dfRaw = dfRaw.filter(dfRaw["base_dt"] != "unusable")
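
For reference, here is a minimal end-to-end sketch of the snippet on a toy DataFrame; the app name, schema, and sample rows are invented and only mirror the column names used above:

from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.appName("base-date-demo").getOrCreate()
schema = StructType([StructField(c, StringType(), True)
                     for c in ["value", "c0", "c1", "tmp_timestamp"]])
dfDemo = spark.createDataFrame(
    [('"2020-07-10T12:34:56"', "20200710123456789", None, None),  # c0 wins
     ('\\junk\\', None, None, "2020-06-16 09:00:00"),             # timestamp fallback
     ("junk", None, None, None)],                                 # unusable, filtered out
    schema)
dfDemo = dfDemo.withColumn("value", fnDataReplace("value"))
dfDemo = dfDemo.withColumn("base_dt", fnGetBaseDate("value", "c0", "c1", "tmp_timestamp"))
dfDemo = dfDemo.drop("c0", "c1", "tmp_timestamp")
dfDemo.filter(func.col("base_dt") != "unusable").show(truncate=False)
# base_dt is 2020-07-10 and 2020-06-16 for the two surviving rows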
