[python] Getting the startingOffsets value and reading Kafka data into a Spark DataFrame
from pyspark.sql import SparkSession
import pyspark.sql.functions as func

# appName1, path, listPath, and topicName are assumed to be defined earlier;
# listPath is expected to be the directory listing of previous extraction runs
# (e.g. a pandas DataFrame with a 'pathSuffix' column, as used below).

# Get the max offset per partition from the most recent extracted parquet
spark1 = SparkSession.builder \
    .appName(appName1) \
    .config("spark.executor.memory", "11g") \
    .enableHiveSupport() \
    .getOrCreate()
strStartOffset = 'earliest'
if len(listPath) > 0:
    dfOffset = spark1.read.parquet(path + "/" + listPath['pathSuffix'].max())
    dfOffset.createOrReplaceTempView("temp_offset")
    # note: the view is named temp_offset, so query temp_offset (the original
    # queried tmp_offset, which would fail with a table-not-found error)
    dfMaxOffset = spark1.sql(
        "select partition, max(offset)+1 as maxoffset from temp_offset group by partition"
    ).toPandas()
    dfMaxOffset.index = dfMaxOffset["partition"]
    jsonMaxOffset = dfMaxOffset["maxoffset"].to_json()
    strStartOffset = '{"' + topicName + '":' + jsonMaxOffset + '}'
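# With two partitions, strStartOffset ends up looking like (offsets illustrative):
#   {"mytopic":{"0":42,"1":17}}
# i.e. the per-topic, per-partition JSON format Spark accepts for startingOffsets.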
# Extract from Kafka as a batch read between strStartOffset and the latest offset.
# auto.offset.reset is dropped: Spark's Kafka source does not honor it, and
# startingOffsets serves the same purpose here.
dfRaw = spark1.read.format("kafka") \
    .option("kafka.bootstrap.servers", "kafka01.host.com:9092,kafka02.host.com:9092,kafka03.host.com:9092") \
    .option("subscribe", topicName) \
    .option("startingOffsets", strStartOffset) \
    .option("endingOffsets", "latest") \
    .option("failOnDataLoss", "false") \
    .load() \
    .drop_duplicates()
# Kafka delivers key/value as binary, so cast value to string before parsing JSON
dfRaw = dfRaw.withColumn("value", dfRaw["value"].cast("string"))
dfRaw = dfRaw.withColumn("tmp_timestamp", func.get_json_object("value", "$.@timestamp"))
dfRaw = dfRaw.withColumn("value", func.get_json_object("value", "$.message"))
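The extraction is only complete once the DataFrame is written back to HDFS, where the next run's offset lookup can find it. A minimal sketch of that final write, assuming each run lands in its own timestamped subdirectory under the same path the offset lookup scans (the directory naming and column selection are assumptions, not from the original):

from datetime import datetime

# hypothetical layout: one subdirectory per run, so the next run can pick the
# newest one via listPath['pathSuffix'].max(); keep partition and offset so the
# max(offset)+1 query above keeps working
outPath = path + "/" + datetime.now().strftime("%Y%m%d%H%M%S")
dfRaw.select("partition", "offset", "tmp_timestamp", "value") \
    .write.mode("overwrite").parquet(outPath)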