from pyspark.sql import SparkSession
import pyspark.sql.functions as func
import datetime
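# The snippet assumes a DataFrame named dfRaw already exists. Below is a
# hypothetical stand-in so it runs end to end; the sample row is illustrative,
# and only the column names come from the code that follows.
spark = SparkSession.builder.appName("base-date-extraction").getOrCreate()
dfRaw = spark.createDataFrame(
    [('"2020-06-16 raw\\payload', "20200616123045000", "20200616 12:30:45", "2020-06-16 12:30:45")],
    schema="value string, c0 string, c1 string, tmp_timestamp string",
)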
# Strip backslashes from the raw value column (null-safe).
fnDataReplace = func.udf(lambda s: s.replace('\\', '') if s is not None else None)

def extractBaseDate(value1, dateCol1, dateCol2, timestampCol):
    # Resolution order: compact timestamp (YYYYMMDDHHMMSS plus fraction),
    # then "YYYYMMDD HH:MM:SS", then a date embedded in the raw value,
    # and finally the temporary timestamp column.
    if (dateCol1 is not None) and len(dateCol1) > 13:
        baseDate = datetime.datetime.strptime(dateCol1, "%Y%m%d%H%M%S%f").strftime("%Y-%m-%d")
    elif (dateCol2 is not None) and len(dateCol2) > 13:
        baseDate = datetime.datetime.strptime(dateCol2, "%Y%m%d %H:%M:%S").strftime("%Y-%m-%d")
    else:
        try:
            # Skip the leading character of the raw value before slicing out the date.
            baseDate = datetime.datetime.strptime(value1[1:11], "%Y-%m-%d").strftime("%Y-%m-%d")
        except (ValueError, TypeError):
            try:
                baseDate = datetime.datetime.strptime(timestampCol[:10], "%Y-%m-%d").strftime("%Y-%m-%d")
            except (ValueError, TypeError):
                baseDate = "unusable"
    return baseDate

# Register the UDF after the function is defined; passing the function
# directly is equivalent to the original lambda wrapper.
fnGetBaseDate = func.udf(extractBaseDate)
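# Driver-side sanity check of the fallback chain (the sample strings are
# illustrative only, not from the original data):
assert extractBaseDate('"2020-06-16 rest of record', None, None, None) == "2020-06-16"
assert extractBaseDate(None, "20200616123045000", None, None) == "2020-06-16"
assert extractBaseDate(None, None, None, "no usable date") == "unusable"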
dfRaw = dfRaw.withColumn("value", fnDataReplace("value"))
dfRaw = dfRaw.withColumn("base_dt", fnGetBaseDate("value","c0","c1","tmp_timestamp"))
dfRaw = dfRaw.drop("c0","c1","tmp_timestamp")
dfRaw = dfRaw.filter(dfRaw["base_dt"] != "unusable")
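With the hypothetical sample row above, the result can be checked directly; base_dt should come out as 2020-06-16 with the helper columns gone:

dfRaw.show(truncate=False)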