Creating a table
CREATE TABLE test (id bigint, name string) USING iceberg
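Partitioning can also be declared at create time, including transform expressions; a minimal sketch (the test_logs table and ts column are illustrative, syntax per Iceberg's Spark DDL):
CREATE TABLE test_logs (id bigint, name string, ts timestamp) USING iceberg
PARTITIONED BY (days(ts))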
Writing data
INSERT
INSERT INTO test VALUES (1, 'hsm');
INSERT INTO test SELECT id, name FROM test1 WHERE id = 1;
Spark with Iceberg supports row-level updates through MERGE INTO and DELETE FROM.
-- if a source row matches, add the counts; otherwise insert the source row
MERGE INTO test t USING (SELECT * FROM updates) u ON t.id = u.id
WHEN MATCHED THEN UPDATE SET t.count = t.count + u.count
WHEN NOT MATCHED THEN INSERT *
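The statement above assumes both test and the updates source carry a count column; a minimal sketch of the source table (names and values are illustrative):
CREATE TABLE updates (id bigint, count bigint) USING iceberg;
INSERT INTO updates VALUES (1, 5), (2, 3);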
ALTER TABLE ... ADD COLUMN
-- create a struct column
ALTER TABLE prod.db.sample
ADD COLUMN point struct<x: double, y: double>
-- add a field to the struct
ALTER TABLE prod.db.sample
ADD COLUMN point.z double
Since Spark 2.4, the optional FIRST and AFTER clauses control where the new column is placed:
ALTER TABLE prod.db.sample
ADD COLUMN new_column bigint AFTER other_column
ALTER TABLE prod.db.sample
ADD COLUMN nested.new_column bigint FIRST
ALTER TABLE ... RENAME COLUMN
ALTER TABLE prod.db.sample RENAME COLUMN data TO payload
ALTER TABLE prod.db.sample RENAME COLUMN location.lat TO latitude
ALTER TABLE ... ALTER COLUMN
Change a column's type:
ALTER TABLE prod.db.sample ALTER COLUMN measurement TYPE double
Change a column's type and comment:
ALTER TABLE prod.db.sample ALTER COLUMN measurement TYPE double COMMENT 'unit is bytes per second'
ALTER TABLE prod.db.sample ALTER COLUMN measurement COMMENT 'unit is kilobytes per second'
Reorder columns with FIRST and AFTER:
ALTER TABLE prod.db.sample ALTER COLUMN col FIRST
ALTER TABLE prod.db.sample ALTER COLUMN nested.col AFTER other_col
Drop a NOT NULL constraint to make a column optional:
ALTER TABLE prod.db.sample ALTER COLUMN id DROP NOT NULL
ALTER TABLE ... DROP COLUMN
ALTER TABLE prod.db.sample DROP COLUMN id
ALTER TABLE prod.db.sample DROP COLUMN point.z
ALTER TABLE ... ADD PARTITION FIELD
ALTER TABLE prod.db.sample ADD PARTITION FIELD catalog -- identity transform
Partition transforms are also supported:
ALTER TABLE prod.db.sample ADD PARTITION FIELD bucket(16, id)
ALTER TABLE prod.db.sample ADD PARTITION FIELD truncate(data, 4)
ALTER TABLE prod.db.sample ADD PARTITION FIELD years(ts)
-- use the optional AS keyword to specify a custom name for the partition field
ALTER TABLE prod.db.sample ADD PARTITION FIELD bucket(16, id) AS shard
ALTER TABLE ... DROP PARTITION FIELD
ALTER TABLE prod.db.sample DROP PARTITION FIELD catalog
ALTER TABLE prod.db.sample DROP PARTITION FIELD bucket(16, id)
ALTER TABLE prod.db.sample DROP PARTITION FIELD truncate(data, 4)
ALTER TABLE prod.db.sample DROP PARTITION FIELD years(ts)
ALTER TABLE prod.db.sample DROP PARTITION FIELD shard
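Newer Iceberg releases can also swap a partition field in a single statement instead of a drop-then-add pair; a sketch, assuming an existing ts_day identity field (syntax per Iceberg's Spark DDL docs):
ALTER TABLE prod.db.sample REPLACE PARTITION FIELD ts_day WITH days(ts) AS day_of_ts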
ALTER TABLE ... WRITE ORDERED BY
ALTER TABLE prod.db.sample WRITE ORDERED BY category, id
-- use the optional ASC/DESC keywords to specify the sort order of each field (default ASC)
ALTER TABLE prod.db.sample WRITE ORDERED BY category ASC, id DESC
-- use the optional NULLS FIRST/NULLS LAST keywords to specify the null order of each field (default FIRST)
ALTER TABLE prod.db.sample WRITE ORDERED BY category ASC NULLS LAST, id DESC NULLS FIRST
// time travel to October 26, 1986 at 01:21:00
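// note: the as-of-timestamp option takes milliseconds since the Unix epoch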
spark.read
.option("as-of-timestamp", "499162860000")
.format("iceberg")
.load("path/to/table")
// time travel to the snapshot with ID 8745438249199332230
spark.read
.option("snapshot-id", 8745438249199332230L)
.format("iceberg")
.load("iceberg_db.test")
.show(20)
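To find snapshot IDs and their commit times for time travel, you can query the table's metadata tables; a sketch, assuming the table is registered in an Iceberg catalog as iceberg_db.test:
SELECT snapshot_id, committed_at FROM iceberg_db.test.snapshots
SELECT * FROM iceberg_db.test.history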
MERGE INTO prod.db.target t -- a target table
USING (SELECT ...) s -- the source updates
ON t.id = s.id -- condition to find updates for target rows
WHEN ... -- updates
WHEN MATCHED AND s.op = 'delete' THEN DELETE
WHEN MATCHED AND t.count IS NULL AND s.op = 'increment' THEN UPDATE SET t.count = 0
WHEN MATCHED AND s.op = 'increment' THEN UPDATE SET t.count = t.count + 1
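As in the first MERGE example, the statement can end with an insert clause for source rows that match no target row:
WHEN NOT MATCHED THEN INSERT *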
INSERT OVERWRITE
Overwrite with dynamic partitions:
INSERT OVERWRITE prod.my_app.logs
SELECT uuid, first(level), first(ts), first(message)
FROM prod.my_app.logs
WHERE cast(ts AS date) = '2020-07-01'
GROUP BY uuid
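Spark's default overwrite mode is static; for the dynamic behavior above, the session configuration must be set first (a standard Spark SQL setting, not Iceberg-specific):
SET spark.sql.sources.partitionOverwriteMode = dynamic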
Overwrite a static partition:
INSERT OVERWRITE prod.my_app.logs
PARTITION (level = 'INFO')
SELECT uuid, first(ts), first(message) -- level is fixed by the PARTITION clause, so it is omitted from the SELECT
FROM prod.my_app.logs
WHERE level = 'INFO'
GROUP BY uuid
DELETE FROM
DELETE FROM prod.db.table
WHERE ts >= '2020-05-01 00:00:00' AND ts < '2020-06-01 00:00:00'
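Newer Iceberg releases also support row-level UPDATE from Spark alongside MERGE INTO and DELETE FROM; a minimal sketch (the category column is illustrative):
UPDATE prod.db.table SET category = 'archived' WHERE ts < '2020-05-01 00:00:00'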