
[spark dataframe] extract date value using pyspark udf lambda

2jelly 2021. 12. 27. 16:00
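
The snippet below derives a base date (base_dt) for each row from whichever of several candidate columns is usable: two raw date strings (c0, c1), a date embedded in the record value itself, and a timestamp column (tmp_timestamp) as a last resort. Rows where nothing parses are dropped at the end.
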
from pyspark.sql import SparkSession
import pyspark.sql.functions as func
import datetime

def extractBaseDate(value1, dateCol1, dateCol2, timestampCol):
    # Try each candidate column in order until one yields a parseable date.
    if (dateCol1 is not None) and len(dateCol1) > 13:
        # e.g. "20211227160000..." -> parse the leading yyyyMMddHHmmss portion
        baseDate = datetime.datetime.strptime(dateCol1[:14], "%Y%m%d%H%M%S").strftime("%Y-%m-%d")
    elif (dateCol2 is not None) and len(dateCol2) > 13:
        # e.g. "20211227 16:00:00"
        baseDate = datetime.datetime.strptime(dateCol2, "%Y%m%d %H:%M:%S").strftime("%Y-%m-%d")
    else:
        try:
            # Fall back to the date embedded in the value itself, e.g. '"2021-12-27..."'
            baseDate = datetime.datetime.strptime(value1[1:11], "%Y-%m-%d").strftime("%Y-%m-%d")
        except (ValueError, TypeError):
            try:
                # Last resort: the leading date portion of the timestamp column
                baseDate = datetime.datetime.strptime(timestampCol[:10], "%Y-%m-%d").strftime("%Y-%m-%d")
            except (ValueError, TypeError):
                baseDate = "unusable"
    return baseDate


# Strip backslashes from the raw value; guard against null rows
fnDataReplace = func.udf(lambda s: s.replace('\\', '') if s is not None else None)
fnGetBaseDate = func.udf(lambda value1, s1, s2, s3: extractBaseDate(value1, s1, s2, s3))
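
Because extractBaseDate is a plain Python function, it can be sanity-checked locally before it ever runs inside a UDF. The sample strings below are made-up illustrations of the format each branch expects:

print(extractBaseDate(None, "20211227160000123", None, None))      # 2021-12-27 (c0 branch)
print(extractBaseDate(None, None, "20211227 16:00:00", None))      # 2021-12-27 (c1 branch)
print(extractBaseDate('"2021-12-27T16:00:00"', None, None, None))  # 2021-12-27 (value branch)
print(extractBaseDate(None, None, None, "2021-12-27 16:00:00"))    # 2021-12-27 (timestamp branch)
print(extractBaseDate(None, None, None, None))                     # unusable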


dfRaw = dfRaw.withColumn("value", fnDataReplace("value"))
dfRaw = dfRaw.withColumn("base_dt", fnGetBaseDate("value","c0","c1","tmp_timestamp"))
dfRaw = dfRaw.drop("c0","c1","tmp_timestamp")
dfRaw = dfRaw.filter(dfRaw["base_dt"] != "unusable")
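
For large tables, the same extraction can usually be done with Spark's built-in column functions instead of a Python UDF, which avoids serializing every row through Python. Below is a minimal sketch, starting again from the raw dfRaw (before the UDF steps above) and assuming the same column names; it approximates the UDF's fallback order with coalesce (the first pattern that parses wins), skips the length guards for brevity, and yields base_dt as a DateType rather than a string:

dfNative = (
    dfRaw
    .withColumn("value", func.regexp_replace("value", r"\\", ""))
    .withColumn(
        "base_dt",
        func.coalesce(
            func.to_date(func.substring("c0", 1, 8), "yyyyMMdd"),                # "20211227160000..."
            func.to_date(func.substring("c1", 1, 8), "yyyyMMdd"),                # "20211227 16:00:00"
            func.to_date(func.substring("value", 2, 10), "yyyy-MM-dd"),          # '"2021-12-27..."'
            func.to_date(func.substring("tmp_timestamp", 1, 10), "yyyy-MM-dd"),
        ),
    )
    .filter(func.col("base_dt").isNotNull())  # same effect as dropping "unusable" rows
    .drop("c0", "c1", "tmp_timestamp")
)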