kafka (3) 썸네일형 리스트형 [python] Getting startingOffsets value and reading kafka value into spark dataframe from pyspark.sql import SparkSession import pyspark.sql.functions as func # get maxoffset spark1 = SparkSession.builder \ .appName(appName1) \ .config("spark.executor.memory","11g") \ .enableHiveSupport() \ .getOrCreate() strStartOffset = 'earliest' if len(listPath) > 0: dfOffset = spark1.read.parquet(path+"/"+listPath['pathSuffix'].max()) dfOffset.createOrReplaceTempView("temp_offset") dfMaxOff.. [command] kafka topic CLI # topic list check kafka-topics --bootstrap-server kafka01.host.com:9092,kafka02.host.com:9092,kafka03.host.com:9092 --list # topic details kafka-topics --bootstrap-server kafka01.host.com:9092,kafka02.host.com:9092,kafka03.host.com:9092 --topic topic1 --describe # change topic partition kafka-topics --bootstrap-server kafka01.host.com:9092,kafka02.host.com:9092,kafka03.host.com:9092 --alter --t.. Filebeat to Kafka procedure # ssh ssh id@log_generating_server # cd cd /dir/filebeat_to_kafka # unzip tar -xvf filebeat_to_kafka.tar # Check running processes ps -ef | grep filebeat # stop work kill -9 `ps -ef | grep -w filebeat-big.yml | grep -v grep | awk '{print $2}'` # update yml file vi filebeat-big.yml #-----------------------filebeat prospectors--------------------------------- - input_type: log paths: - /dir/AAA???.. 이전 1 다음