from pyspark.sql import SparkSession
import pyspark.sql.functions as func
# get max offset: resume from the latest Parquet dump if one exists
# appName1, path, listPath, and topicName are assumed to be defined earlier
# in the post (listPath looks like a pandas DataFrame of a WebHDFS directory listing)
spark1 = SparkSession.builder \
    .appName(appName1) \
    .config("spark.executor.memory", "11g") \
    .enableHiveSupport() \
    .getOrCreate()
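# Note: the Kafka source requires Spark's Kafka connector on the classpath,
# e.g. via spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2
# (the exact version is an assumption here; it must match your Spark/Scala build)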
strStartOffset = 'earliest'
if len(listPath) > 0:
    # read the most recent dump and restart from max(offset)+1 per partition
    dfOffset = spark1.read.parquet(path + "/" + listPath['pathSuffix'].max())
    dfOffset.createOrReplaceTempView("tmp_offset")  # view name must match the query below
    dfMaxOffset = spark1.sql("select partition, max(offset)+1 as maxoffset from tmp_offset group by partition").toPandas()
    # index by partition so to_json() emits {"<partition>": <offset>, ...}
    dfMaxOffset.index = dfMaxOffset["partition"]
    jsonMaxOffset = dfMaxOffset["maxoffset"].to_json()
    strStartOffset = '{"' + topicName + '":' + jsonMaxOffset + '}'
# extract kafka to hdfs: batch-read everything between the resolved offsets
# (the original auto.offset.reset option is dropped here: it is a Kafka consumer
# property the Spark Kafka source does not use; startingOffsets replaces it)
dfRaw = spark1.read.format("kafka") \
    .option("kafka.bootstrap.servers", "kafka01.host.com:9092,kafka02.host.com:9092,kafka03.host.com:9092") \
    .option("subscribe", topicName) \
    .option("startingOffsets", strStartOffset) \
    .option("endingOffsets", "latest") \
    .option("failOnDataLoss", "false") \
    .load() \
    .drop_duplicates()
# load() yields the columns: key, value, topic, partition, offset, timestamp, timestampType
dfRaw = dfRaw.withColumn("value", dfRaw["value"].cast("string"))
dfRaw = dfRaw.withColumn("tmp_timestamp", func.get_json_object("value","$.@timestamp"))
dfRaw = dfRaw.withColumn("value", func.get_json_object("value", "$.message"))