본문 바로가기

SQL/kudu

[kudu] query(insert/upsert/update/delete), coding(python, pyspark)

[query]

-- Insert three new rows (columns: id, name).
INSERT INTO my_first_table VALUES (1, "john"), (2, "jane"), (3, "jim");

-- Insert the row, or overwrite the existing row if primary key 99 already exists.
UPSERT INTO my_first_table VALUES (99, "zoe");

-- my_first_table only has (id, name) columns, as the VALUES lists above show,
-- so filter on id rather than a nonexistent "age" column.
UPDATE my_first_table SET name="bob" WHERE id = 3;

-- Delete every row whose id is below 3 (here: ids 1 and 2).
DELETE FROM my_first_table WHERE id < 3;

 

[python]

<<case1>>

# Case 1: upsert a single row with the kudu-python client.
import kudu

# Placeholder address: replace "myhost.com:port" with the real Kudu master
# "host:port" (a list of master addresses is also accepted by kudu.Client).
client = kudu.Client("myhost.com:port")

# Tables created through Impala are named "impala::<database>.<table>" in Kudu.
tableName = "impala::tmp.table_name"
kuduTable = client.table(tableName)
session = client.new_session()

# Upsert: insert the row, or update it if its primary key already exists.
# All values are placeholder string literals; replace them with real values
# whose types match the table's column types.
op = kuduTable.new_upsert(
    {'col1': 'value1', 'col2': 'value2', 'col3': 'value3'}
)
session.apply(op)

# Applied operations are sent on flush(). If any of them fail, print the
# per-operation errors before re-raising so the cause is not lost.
try:
    session.flush()
except kudu.KuduBadStatus:
    print(session.get_pending_errors())
    raise

 

<<case2 : using spark>>

# Case 2: read from and write to a Kudu table with PySpark (kudu-spark).
import os

from pyspark.sql import SparkSession

# Placeholders: replace with the real Kudu master "host:port" and table name.
KUDU_MASTER = "myhost.com:port"
KUDU_TABLE = "impala::tmp.table_name"

# Expand "~" in Python: these paths reach Spark as plain config strings, not
# through a shell, so nothing else would expand them.
KUDU_JARS = [
    os.path.expanduser("~/kudu-spark2_2.11.jar"),
    os.path.expanduser("~/kudu-spark2-tools_2.11.jar"),
]

# NOTE: driver-side settings (cores, memory, extraClassPath) only take effect
# when this call starts the Spark JVM. They do not change an already-running
# SparkContext, e.g. inside the pyspark shell.
spark = (
    SparkSession.builder
    .appName("test")
    .config("spark.driver.cores", "4")
    .config("spark.driver.memory", "4g")
    .config("spark.kryoserializer.buffer.max", "512m")
    .config("spark.scheduler.mode", "FAIR")
    .config("spark.dynamicAllocation.maxExecutors", "11")
    # Comma-separated list of jars added to the driver and executor classpaths.
    .config("spark.jars", ",".join(KUDU_JARS))
    # "spark.extraClassPath" is not a Spark property. The driver key is
    # spark.driver.extraClassPath, and it takes a classpath string joined
    # with the OS path separator rather than commas.
    .config("spark.driver.extraClassPath", os.pathsep.join(KUDU_JARS))
    # Hadoop properties are passed through with the "spark.hadoop." prefix.
    # "false" stops the output committer from writing _SUCCESS marker files.
    .config(
        "spark.hadoop.mapreduce.fileoutputcommitter.marksuccessfuljobs",
        "false",
    )
    .enableHiveSupport()
    .getOrCreate()
)

# Read the whole Kudu table into a DataFrame.
readdf = (
    spark.read.format("org.apache.kudu.spark.kudu")
    .option("kudu.master", KUDU_MASTER)
    .option("kudu.table", KUDU_TABLE)
    .load()
)

# Placeholder: replace with the DataFrame you want to write. Its columns must
# match the Kudu table's schema. readdf is used here only so the script runs.
writedf = readdf

# NOTE(review): in kudu-spark, "append" is expected to perform an upsert by
# default (see the "kudu.operation" option). Confirm for your kudu-spark version.
(
    writedf.write.format("org.apache.kudu.spark.kudu")
    .option("kudu.master", KUDU_MASTER)
    .option("kudu.table", KUDU_TABLE)
    .mode("append")
    .save()
)

'SQL > kudu' 카테고리의 다른 글

[kudu] column add/change  (0) 2020.05.21
[kudu] create table  (0) 2020.05.14