Skip to content

Latest commit

 

History

History
270 lines (200 loc) · 8.89 KB

20170105_02.md

File metadata and controls

270 lines (200 loc) · 8.89 KB

PostgreSQL 流式数据处理(聚合、过滤、转换...)系列 - 2

作者

digoal

日期

2017-01-05

标签

PostgreSQL , 流式 , 函数 , 流式处理 , 异步统计 , count , group , agg , 触发器 , xid , 事务隔离 , 异步气泡 , gap , function , 串行处理


背景

2013年帮朋友做的方案。写了一些列文档来解决当时某个大数据BI平台的异步流式数据处理的功能。

逐步优化,化繁为简。

在业务层面,统计,数据的过滤,数据的清洗,数据的事件触发等。是比较常见的需求。

比如以COUNT就是一个很典型的例子。

在9.2以前全表的count只能通过扫描全表来得到, 即使有pk也必须扫描全表.

9.2版本增加了index only scan的功能, count(*)可以通过仅仅扫描pk就可以得到.

但是如果是一个比较大的表, pk也是很大的, 扫描pk也是个不小的开销.

到了9.6,开始支持并行查询,通过并行,一张1亿的表,COUNT可能只需要几百毫秒。这是一个质的飞跃。(但是还有很多时候用并行并不是最好的)

另外社区也除了一个流式处理的数据库,pipelineDB,但是它的社区版本限制了一个DATABASE只能使用1024个流视图,在编码的地方使用了1BYTE存储CV。

那么回到postgresql数据库本身,有没有办法来优化count全表的操作呢, 如果你的场景真的有必要频繁的count全表, 那么可以尝试一下使用以下方法来优化你的场景.

正文

接上一篇blog

http://blog.163.com/digoal@126/blog/static/163877040201331252945440/

上一篇blog的后面一部分讲解了如何利用advisory lock来规避死锁的问题.

但是在事务中如果a的dml sql只有1条, 即不可能发生死锁的情况下, 这种方法性能远不如不做死锁规避的函数.

因此还有其他更优秀的方法来规避死锁吗?

当然有的, 这里就要介绍另一种方法, 结合Linux process id来使用.

当一个客户端连接PostgreSQL的时候, Postgres进行会fork一个进程出来, 称为PostgreSQL backend process. 这个进程负责与客户端进行交互.

接下来就要用PostgreSQL backend process的process id来标记cnt_a中的id字段.

因为同一时间点每个backend process 的pid不一样, 完全规避了死锁的问题.

具体方法如下 :

postgres=# create table a(id serial4 primary key, info text, crt_time timestamp(0) default now());  
NOTICE:  CREATE TABLE will create implicit sequence "a_id_seq" for serial column "a.id"  
NOTICE:  CREATE TABLE / PRIMARY KEY will create implicit index "a_pkey" for table "a"  
CREATE TABLE  
  
postgres=# create table cnt_a(id int primary key, cnt int);  
NOTICE:  CREATE TABLE / PRIMARY KEY will create implicit index "cnt_a_pkey" for table "cnt_a"  
CREATE TABLE  

cnt_a不需要初始值了

1. 创建插入触发器函数

CREATE OR REPLACE FUNCTION public.tg_insert_a()  
 RETURNS trigger  
 LANGUAGE plpgsql  
AS $function$  
declare  
begin  
  update cnt_a set cnt=cnt+1 where id=pg_backend_pid();  
  if not found then  
    insert into cnt_a(id, cnt) values (pg_backend_pid(), 1);  
  end if;  
  return null;  
end;  
$function$;  

2. 创建删除触发器函数

CREATE OR REPLACE FUNCTION public.tg_delete_a()  
 RETURNS trigger  
 LANGUAGE plpgsql  
AS $function$  
declare  
begin  
  update cnt_a set cnt=cnt-1 where id=pg_backend_pid();  
  if not found then  
    insert into cnt_a(id, cnt) values (pg_backend_pid(), -1);  
  end if;  
  return null;  
end;  
$function$;  

3. 创建truncate触发器函数

CREATE OR REPLACE FUNCTION public.tg_truncate_a()  
 RETURNS trigger  
 LANGUAGE plpgsql  
AS $function$  
declare  
begin  
  update cnt_a set cnt=0 where not cnt=0;  
  return null;  
end;  
$function$;  

4. 创建触发器

create trigger tg1 after insert on a for each row execute procedure tg_insert_a();  
create trigger tg2 after delete on a for each row execute procedure tg_delete_a();  
create trigger tg3 after truncate on a for each statement execute procedure tg_truncate_a();  

5. 测试单dmlsql场景pgbench, 有显著提升.

pg92@digoal-PowerEdge-R610-> pgbench -M prepared -r -n -f ./insert.sql -h $PGDATA -p 1919 -U postgres -T 60 -c 16 -j 4 postgres  
transaction type: Custom query  
scaling factor: 1  
query mode: prepared  
number of clients: 16  
number of threads: 4  
duration: 60 s  
number of transactions actually processed: 2137104  
tps = 35606.972536 (including connections establishing)  
tps = 35615.523750 (excluding connections establishing)  
statement latencies in milliseconds:  
        0.447084        insert into a (info) values ('test');  

验证

postgres=# select count(*) from a;  
  count    
---------  
 2137104  
(1 row)  
  
postgres=# select sum(cnt) from cnt_a ;  
   sum     
---------  
 2137104  
(1 row)  

6. 测试多dmlsql场景pgbench, 有显著提升.

pg92@digoal-PowerEdge-R610-> pgbench -M prepared -r -n -f ./id.sql -h $PGDATA -p 1919 -U postgres -T 60 -c 16 -j 4 postgres  
transaction type: Custom query  
scaling factor: 1  
query mode: prepared  
number of clients: 16  
number of threads: 4  
duration: 60 s  
number of transactions actually processed: 275704  
tps = 4593.421227 (including connections establishing)  
tps = 4594.495475 (excluding connections establishing)  
statement latencies in milliseconds:  
        0.003119        \setrandom id 1 20000000  
        0.179727        begin;  
        0.215766        delete from a where id=:id;  
        0.215785        delete from a where id=:id;  
        0.335462        insert into a (info) values ('test');  
        0.222234        delete from a where id=:id;  
        0.328336        insert into a (info) values ('test');  
        0.222417        delete from a where id=:id;  
        0.325938        insert into a (info) values ('test');  
        0.221347        delete from a where id=:id;  
        0.325768        insert into a (info) values ('test');  
        0.329100        insert into a (info) values ('test');  
        0.328841        insert into a (info) values ('test');  
        0.212162        end;  

验证

postgres=# select count(*) from a;  
  count    
---------  
 3791328  
(1 row)  
  
postgres=# select sum(cnt) from cnt_a ;  
   sum     
---------  
 3791328  
(1 row)  

小结

1.

需要注意, 因为linux进程id范围可能会比较大, 最后cnt_a表也许会变得比较大.

如果有方法能够限定启动postgresql数据库用户的pid分配范围, 那么这将不会成为一个问题.

当然你也可以对cnt_a表进行整理, 合并掉一些记录. 给cnt_a表瘦身.

如下 :

postgres=# begin;  
BEGIN  
postgres=# select * from cnt_a limit 1;  
  id   |  cnt     
-------+--------  
 11305 | 122315  
(1 row)  
postgres=# select sum(cnt) from cnt_a ;  
   sum     
---------  
 3791328  
(1 row)  
postgres=# update cnt_a set cnt=3791328 where id=11305;  
UPDATE 1  
postgres=# delete from cnt_a where id<>11305;  
DELETE 31  
postgres=# end;  
COMMIT  

2.

应用本例的触发器函数, 最终的dml qps如下 :

单事务单dml sql场景 : 35615 qps

单事务多dml sql场景 : 50534 qps

为方便大家查询, 汇总PostgreSQL实时和非实时数据统计的案例分析文章系列 - 如下 :

1. http://blog.163.com/digoal@126/blog/static/163877040201331252945440/

2. http://blog.163.com/digoal@126/blog/static/16387704020133151402415/

3. http://blog.163.com/digoal@126/blog/static/16387704020133155179877/

4. http://blog.163.com/digoal@126/blog/static/16387704020133156636579/

5. http://blog.163.com/digoal@126/blog/static/16387704020133218305242/

6. http://blog.163.com/digoal@126/blog/static/16387704020133224161563/

7. http://blog.163.com/digoal@126/blog/static/16387704020133271134563/

8. http://blog.163.com/digoal@126/blog/static/16387704020134311144755/

Count