
[python] Getting the startingOffsets value and reading Kafka values into a Spark DataFrame

from pyspark.sql import SparkSession
import pyspark.sql.functions as func

# get the max offset per partition from the last saved snapshot
# (appName1, path, listPath and topicName are assumed to be defined earlier)
spark1 = SparkSession.builder \
        .appName(appName1) \
        .config("spark.executor.memory", "11g") \
        .enableHiveSupport() \
        .getOrCreate()
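The snippet uses `listPath` without showing how it is built. One plausible source, given the `pathSuffix` column it reads, is a WebHDFS `LISTSTATUS` call loaded into pandas; a minimal sketch under that assumption, with the HTTP call replaced by a canned response and `dt=YYYYMMDD` directory names as an illustrative layout:

```python
import pandas as pd

# Hypothetical WebHDFS LISTSTATUS response for the snapshot directory.
# In practice this would come from something like
# requests.get("http://<namenode>:9870/webhdfs/v1/<path>?op=LISTSTATUS").json();
# the response nests the entries under FileStatuses.FileStatus, and each
# entry's "pathSuffix" is the child name.
resp = {"FileStatuses": {"FileStatus": [
    {"pathSuffix": "dt=20211225", "type": "DIRECTORY"},
    {"pathSuffix": "dt=20211227", "type": "DIRECTORY"},
    {"pathSuffix": "dt=20211226", "type": "DIRECTORY"},
]}}

listPath = pd.DataFrame(resp["FileStatuses"]["FileStatus"])

# listPath['pathSuffix'].max() picks the lexicographically largest name,
# i.e. the newest snapshot when directories are date-named like dt=YYYYMMDD.
latest = listPath["pathSuffix"].max()
print(latest)  # dt=20211227
```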
strStartOffset = 'earliest'
if len(listPath) > 0:
    # read the latest snapshot and compute the next offset per partition
    dfOffset = spark1.read.parquet(path + "/" + listPath['pathSuffix'].max())
    dfOffset.createOrReplaceTempView("temp_offset")

    dfMaxOffset = spark1.sql("select partition, max(offset)+1 as maxoffset from temp_offset group by partition").toPandas()
    dfMaxOffset.index = dfMaxOffset["partition"]

    # to_json() on the partition-indexed column yields {"0": n0, "1": n1, ...}
    jsonMaxOffset = dfMaxOffset["maxoffset"].to_json()
    strStartOffset = '{"' + topicName + '":' + jsonMaxOffset + '}'
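The resulting string must match the per-partition JSON format Spark expects for `startingOffsets`, i.e. `{"topic":{"partition":offset, ...}}`. A small pandas sketch of just this step, with a made-up topic name and offsets:

```python
import json
import pandas as pd

# Toy stand-in for dfMaxOffset = spark1.sql(...).toPandas():
# next offset to read, per partition.
dfMaxOffset = pd.DataFrame({"partition": [0, 1], "maxoffset": [120, 95]})
dfMaxOffset.index = dfMaxOffset["partition"]

# Series.to_json() keys the values by the (partition) index.
jsonMaxOffset = dfMaxOffset["maxoffset"].to_json()   # '{"0":120,"1":95}'

# "my-topic" is illustrative; the post uses a topicName variable.
strStartOffset = '{"' + "my-topic" + '":' + jsonMaxOffset + '}'
print(strStartOffset)  # {"my-topic":{"0":120,"1":95}}
```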

# extract kafka to hdfs
# Note: "auto.offset.reset" is a plain Kafka consumer setting and is not
# honored by the Spark batch source; startingOffsets/endingOffsets control
# the read range instead, so the option is dropped here.
dfRaw = spark1.read.format("kafka") \
    .option("kafka.bootstrap.servers", "kafka01.host.com:9092,kafka02.host.com:9092,kafka03.host.com:9092") \
    .option("subscribe", topicName) \
    .option("startingOffsets", strStartOffset) \
    .option("endingOffsets", "latest") \
    .option("failOnDataLoss", "false") \
    .load().drop_duplicates()

# Kafka delivers value as binary: cast it to string, then pull the
# @timestamp and message fields out of the JSON payload.
dfRaw = dfRaw.withColumn("value", dfRaw["value"].cast("string"))
dfRaw = dfRaw.withColumn("tmp_timestamp", func.get_json_object("value", "$.@timestamp"))
dfRaw = dfRaw.withColumn("value", func.get_json_object("value", "$.message"))
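To see what those two `get_json_object` calls extract, here is the same lookup done with plain Python `json` on one sample record value; the Filebeat-style payload is illustrative, as the real shape depends on the producer:

```python
import json

# Illustrative Kafka record value after casting the binary value to string.
value = '{"@timestamp": "2021-12-27T01:23:45.000Z", "message": "GET /index.html 200"}'

record = json.loads(value)

# Equivalent of get_json_object("value", "$.@timestamp")
tmp_timestamp = record["@timestamp"]
# Equivalent of get_json_object("value", "$.message")
message = record["message"]

print(tmp_timestamp)  # 2021-12-27T01:23:45.000Z
print(message)        # GET /index.html 200
```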
