digoal
2016-10-21
PostgreSQL , 实时推荐系统 , 用户画像 , 标签 , TAG , 比特位
用户画像在市场营销的应用重建中非常常见,已经不是什么新鲜的东西,比较流行的解决方案是给用户贴标签,根据标签的组合,圈出需要的用户。
通常画像系统会用到宽表,以及分布式的系统。
宽表的作用是存储标签,例如每列代表一个标签。
但实际上这种设计不一定是最优或唯一的设计,本文将以PostgreSQL数据库为基础,给大家讲解一下更加另类的设计思路,并且看看效率如何。
假设有一个2B的实时用户推荐系统,每个APPID代表一个B。
业务数据包括APPID,USERIDs,TAGs。(2B的用户ID,最终用户ID,标签)
业务没有跨APPID的数据交换操作需求,也就是说仅限于APPID内的用户推荐。
查询局限在某个选定的APPID,以及TAG组合,搜索符合条件的USERID,并将USERID推送给用户。
数据总量约10亿,单个APPID的用户数最大约1亿。
TAG总数设计容量为1万个。
查询需求: 包含,不包含,或,与。
并发几百,RT 毫秒级。
接下来我会列举4个方案,并分析每种方案的优缺点。
通常表的宽度是有限制的,以PostgreSQL为例,一条记录是无法跨PAGE的(变长字段存储到TOAST存储,以存储超过1页大小的列,页内只存储指针),这就使得表的宽度受到了限制。
例如8KB的数据块,可能能存下接近2000个列。
如果要为每个TAG设计一个列,则需要1万个列的宽表。
相信其它数据库也有类似的限制,1万个列的宽表,除非改造数据库内核,否则无法满足需求。
那么可以使用APPID+USERID作为PK,存储为多个表来实现无限个TAG的需求。 以单表1000个列为例,10个表就能满足1万个TAG的需求。
create table t_tags_1(appid int, userid int8, tag1 boolean, tag2 boolean, ...... tag1000 boolean);
.....
create table t_tags_10(appid int, userid int8, tag9001 boolean, tag9002 boolean, ...... tag10000 boolean);
为了提升效率,要为每个tag字段创建索引,也就是说需要1万个索引。
如果TAG的组合跨表了,还有JOIN操作。
1. 优点
没有用什么特殊的优化,几乎所有的数据库都支持。
2. 缺点
性能不一定好,特别是查询组合条件多的话,性能会下降比较明显,例如(tag1 and tag2 and (tag4 or tag5) or not tag6) 。
使用数组代替TAG列,要求数据库有数组类型,同时有数组的高效检索能力,这一点PostgreSQL可以很好的满足需求。
1. 数据结构
APPID, USERID, TAG[] 数组
单个数组最大长度1GB(约支持2.6亿个TAG)
2. 按APPID分区,随机分片
3. query语法
3.1 包含array2指定的所有TAG
数组1包含数组2的所有元素
array1 @> array2
支持索引检索
3.2 包含array2指定的TAG之一
数组1与数组2有重叠元素
array1 && array2
支持索引检索
3.3 不包含array2指定的所有tag
数组1与数组2没有重叠元素
not array1 && array2
不支持索引检索
4. 例子
create table t_arr(appid int, userid int8, tags int2[]) with(parallel_workers=128);
create index idx_t_array_tags on t_arr using gin (tags) with (fastupdate=on, gin_pending_list_limit= 1024000000);
create index idx_t_arr_uid on t_arr(userid);
819200KB约缓冲10000条80K的数组记录,可以自行调整.
1.每个USERID包含10000个TAG(极限)。
insert into t_arr select 1, 2000000000*random(),(select array_agg(10000*random()) from generate_series(1,10000));
nohup pgbench -M prepared -n -r -f ./test.sql -P 1 -c 50 -j 50 -t 2000000 > ./arr.log 2>&1 &
5. 优点
可以存储很多TAG,几亿个足够用啦(行业内有1万个TAG的已经是非常多的啦)。
支持数组的索引查询,但是not不支持索引。
6. 缺点
数据量还是有点大,一条记录1万个TAG,约80KB。
1亿记录约8TB,索引还需要约8TB。
不是所有的数据库都支持数组类型。
使用BIT存储TAG,0和1表示有或者没有这个TAG。
1. 数据结构
APPID, USERID, TAG 比特流
单个BIT字段最大支持1GB长度BIT流(支持85亿个TAG)
每个BIT代表一个TAG
2. 按APPID分区,随机分片
3. query语法
3.1 包含bit2指定的所有TAG(需要包含的TAG对应的BIT设置为1,其他为0)
bitand(bit1,bit2) = bit2
3.2 包含bit2指定的TAG之一(需要包含的TAG对应的BIT设置为1,其他为0)
bitand(bit1,bit2) > 0
3.3 不包含bit2指定的所有tag (需要包含的TAG对应的BIT设置为1,其他为0)
bitand(bit1,bit2) = zerobit(10000)
4. 例子
create table t_bit(appid int, userid int8, tags varbit) ;
create index idx_t_bit_uid on t_bit(userid);
每个USERID对应10000位随机的比特值
date;for ((i=1;i<=50;i++)); do psql -c "insert into t_bit select 1, 2000000000*random(), \
(select (string_agg(mod((2*random())::int,2)::text,''))::varbit from generate_series(1,10000)) tags \
from generate_series(1,2000000)" ; done; date
127GB, 每秒插入24.5万, 326MB/s
插入速度
(批量)每秒插入24.5万, 326MB/s
更新、删除 tag速度
create or replace function randbit(int) returns varbit as $$
select (string_agg(mod((2*random())::int,2)::text,''))::varbit from generate_series(1,$1);
$$ language sql strict volatile;
create or replace function zerobit(int) returns varbit as $$
select (string_agg('0',''))::varbit from generate_series(1,$1);
$$ language sql strict immutable;
update t_bit set tags=randbit(10000) where userid=:id;
每秒更新、删除1万 记录,响应时间约4毫秒
查询速度
do language plpgsql $$
declare
sql text;
bit1 varbit := randbit(10000);
bit2 varbit := randbit(10000);
bit3 varbit := randbit(10000);
zbit varbit := zerobit(10000);
begin
set max_parallel_workers_per_gather =27;
sql := 'select * from t_bit where bitand(tags,'''||bit1::text||''')='''||bit1::text||''' and bitand(tags,'''||bit2::text||''')>bit''0'' and bitand(tags,'''||bit3::text||''')='''||zbit::text||'''';
raise notice '%', sql;
-- execute sql;
end;
$$;
开27个并行, 17秒。
5. 优点
可以存储很多TAG,85亿个TAG足够用啦吧(行业内有1万个TAG的已经是非常多的啦)。
1万个TAG,占用1万个BIT,约1.25KB。
1亿记录约120GB,无索引。
6. 缺点
没有索引方法,查询是只能通过并行计算提升性能。
PostgreSQL 9.6 支持CPU并行计算,1亿用户时,可以满足20秒内返回,但是会消耗很多的CPU资源,因此查询的并行度不能做到很高。
有没有又高效,又节省资源的方法呢?
答案是有的。
因为查询通常是以TAG为组合条件,取出复合条件的USERID的查询。
所以反过来设计,查询效果就会很好,以TAG为维度,USERID为比特位的设计。
我们需要维护的是每个tag下有哪些用户,所以这块的数据更新量会很大,需要考虑增量合并与读时合并的设计。
数据流如下,数据可以快速的写入
data -> 明细表 -> 增量聚合 -> appid, tagid, userid_bits
读取时,使用两部分数据进行合并,一部分是tag的计算结果,另一部分是未合并的明细表的结果,两者MERGE。
当然,如果可以做到分钟内的合并延迟,业务也能够忍受分钟的延迟的话,那么查询是就没有MERGE的必要了,直接查结果,那会非常非常快。
1. query
1.1 包含这些tags的用户
userids (bitand) userids
结果为bit位为1的用户
1.2 不包含这些tags的用户
userids (bitor) userids
结果为bit位为0的用户
1.3 包含这些tags之一的用户
userids (bitor) userids
结果为bit位为1的用户
2. 优点
因为数据存储的维度发生了变化,采用以查询为目标的设计,数据的查询效率非常高。
3. 缺点
由于使用了比特位表示USERID,所以必须有位置与USERID的映射关系。
需要维护用户ID字典表,需要使用增量合并的手段减少数据的更新频率。
会有一定的延迟,通常可以控制在分钟内,如果业务允许这样的延迟,则非常棒。
通常业务的USERID会周期性的失效(例如僵尸USERID,随着时间可以逐渐失效),那么需要周期性的维护用户ID字典,同时也要更新USERID比特信息。
架构如图
本文会用到几个新增的FUNCTION,这几个function很有用,同时会加入阿里云的RDS PostgreSQL中。
get_bit (varbit, int, int) returns varbit
从指定位置开始获取N个BIT位,返回varbit
例如 get_bit('111110000011', 3, 5) 返回11000
set_bit_array (varbit, int, int, int[]) returns varbit
将指定位置的BIT设置为0|1,超出原始长度的部分填充0|1
例如 set_bit_array('111100001111', 0, 1, array[1,15]) 返回 1011000011111110
bit_count (varbit, int, int, int) returns int
从第n位开始,统计N个BIT位中有多少个0|1,如果N超出长度,则只计算已经存在的。
例如 bit_count('1111000011110000', 1, 5, 4) 返回 1 (0001)
bit_count (varbit, int) returns int
统计整个bit string中1|0的个数。
例如 bit_count('1111000011110000', 1) 返回 8
bit_fill (int, int) returns varbit
填充指定长度的0 或 1
例如 bit_fill(0,10) 返回 '0000000000'
bit_rand (int, int, float) returns varbit
填充指定长度的随机BIT,并指定1或0的随机比例
例如 bit_rand(10, 1, 0.3) 可能返回 '0101000001'
bit_posite (varbit, int, boolean) returns int[]
返回 1|0 的位置信息,下标从0开始计数, true时正向返回,false时反向返回
例如 bit_posite ('11110010011', 1, true) 返回 [0,1,2,3,6,9,10]
bit_posite ('11110010011', 1, false) 返回 [10,9,6,3,2,1,0]
bit_posite (varbit, int, int, boolean) returns int[]
返回 1|0 的位置信息,下标从0开始计数,true时正向返回,false时反向返回,返回N个为止
例如 bit_posite ('11110010011', 1, 3, true) 返回 [0,1,2]
bit_posite ('11110010011', 1, 3, false) 返回 [10,9,6]
get_bit_2 (varbit, int, int) returns int
返回指定位置的bit, 下标从0开始,如果超出BIT位置,返回指定的0或1.
例如 get_bit_2('111110000011', 100, 0) 返回 0 (100已经超出长度,返回用户指定的0)
数据库内置的BIT操作函数请参考源码
src/backend/utils/adt/varbit.c
使用bit存储用户
userid int8表示,可以超过40亿。
rowid int表示,也就是说单个APPID不能允许超过20亿的用户,从0开始自增,配合BIT下标的表示。
appid int表示,不会超过40亿个。
1. 字典表, rowid决定MAP顺序,使用窗口查询返回顺序。
drop table IF EXISTS t_userid_dic;
create table IF NOT EXISTS t_userid_dic(appid int not null, rowid int not null, userid int8 not null, unique (appid,userid), unique (appid,rowid));
插入用户字典表的函数,可以产生无缝的连续ROWID。
create or replace function f_uniq(i_appid int, i_userid int8) returns int as $$
declare
newid int;
i int := 0;
res int;
stack1 text;
stack2 text;
stack3 text;
stack4 text;
stack5 text;
stack6 text;
stack7 text;
stack8 text;
stack9 text;
stack10 text;
begin
loop
if i>0 then
perform pg_sleep(random());
else
i := i+1;
end if;
-- 获取已有的最大ID+1 (即将插入的ID)
select max(rowid)+1 into newid from t_userid_dic where appid=i_appid;
if newid is not null then
-- 获取AD LOCK (乘以appid, 为了避免不同ID的冲突, 算法可以再改进)
if pg_try_advisory_xact_lock(i_appid::int8 * newid) then
-- 插入
insert into t_userid_dic (appid, rowid, userid) values (i_appid, newid, i_userid);
-- 返回此次获取到的UID
return newid;
else
-- 没有获取到AD LOCK则继续循环
continue;
end if;
else
-- 表示这是第一条记录,获取AD=0 的LOCK
if pg_try_advisory_xact_lock(0 * i_appid::int8) then
insert into t_userid_dic (appid, rowid, userid) values (i_appid, 0, i_userid);
return 0;
else
continue;
end if;
end if;
end loop;
exception
-- 只忽略违反唯一约束的错误,其他错误继续报
when SQLSTATE '23505' then
/*
-- 参考 https://www.postgresql.org/docs/9.6/static/plpgsql-control-structures.html#PLPGSQL-ERROR-TRAPPING
GET STACKED DIAGNOSTICS stack1 = RETURNED_SQLSTATE,
stack2 = COLUMN_NAME,
stack3 = CONSTRAINT_NAME,
stack4 = PG_DATATYPE_NAME,
stack5 = MESSAGE_TEXT,
stack6 = TABLE_NAME,
stack7 = SCHEMA_NAME,
stack8 = PG_EXCEPTION_DETAIL,
stack9 = PG_EXCEPTION_HINT,
stack10 = PG_EXCEPTION_CONTEXT;
-- 如果因为瞬态导致PK冲突了,继续调用(注意如果appid+userid重复,会无限循环,所以不建议使用)
-- select f_uniq(i_appid, i_userid) into res;
raise notice 'RETURNED_SQLSTATE, %', stack1;
raise notice 'COLUMN_NAME, %', stack2;
raise notice 'CONSTRAINT_NAME, %', stack3;
raise notice 'PG_DATATYPE_NAME, %', stack4;
raise notice 'MESSAGE_TEXT, %', stack5;
raise notice 'TABLE_NAME, %', stack6;
raise notice 'SCHEMA_NAME, %', stack7;
raise notice 'PG_EXCEPTION_DETAIL, %', stack8;
raise notice 'PG_EXCEPTION_HINT, %', stack9;
raise notice 'PG_EXCEPTION_CONTEXT, %', stack10;
-- 没有插入成功,返回NULL
*/
return null;
end;
$$ language plpgsql strict;
如果以上调用返回NULL,说明插入失败,可能违反了唯一约束,应用端重试即可。
压测以上函数是否能无缝插入,压测时raise notice可以去掉。
$ vi test.sql
\set appid random(1,1000)
\set userid random(1,2000000000)
select f_uniq(:appid, :userid);
164个并发
$ pgbench -M prepared -n -r -P 1 -f ./test.sql -c 164 -j 164 -T 10
progress: 10.0 s, 85720.5 tps, lat 1.979 ms stddev 34.808
transaction type: ./test.sql
scaling factor: 1
query mode: prepared
number of clients: 164
number of threads: 164
duration: 10 s
number of transactions actually processed: 827654
latency average = 2.039 ms
latency stddev = 35.674 ms
tps = 75435.422933 (including connections establishing)
tps = 75483.813182 (excluding connections establishing)
script statistics:
- statement latencies in milliseconds:
0.001 \set appid random(1,1000)
0.000 \set userid random(1,2000000000)
2.034 select f_uniq(:appid, :userid);
验证
postgres=# select count(*) from t_userid_dic ;
count
--------
825815
(1 row)
postgres=# select appid,count(*),max(rowid) from t_userid_dic group by 1;
appid | count | max
-------+-------+-----
251 | 857 | 857
106 | 854 | 854
681 | 816 | 816
120 | 826 | 826
285 | 815 | 815
866 | 778 | 778
264 | 873 | 873
......
postgres=# select * from (select appid,count(*),max(rowid) from t_userid_dic group by 1) t where max<>count;
appid | count | max
-------+-------+-----
(0 rows)
插入速度,无缝需求,完全符合要求。
生成1亿测试用户,APPID=1, 用于后面的测试
truncate t_userid_dic;
insert into t_userid_dic select 1, generate_series(1,100000000), generate_series(1,100000000);
2. 实时变更表
为了提高写入性能,数据将实时的写入这张表,后台增量的将这个表的数据合并到TAG表。
drop table IF EXISTS t_user_tags;
create table IF NOT EXISTS t_user_tags(id serial8 primary key, appid int, userid int8, tag int, ins boolean, dic boolean default false);
create index idx_t_user_tags_id on t_user_tags(id) where dic is false;
-- ins = true表示新增tag, =false 表示删除tag。
-- dic = true表示该记录设计的USERID已经合并到用户字典表
-- 这张表的设计有优化余地,例如最好切换使用,以清理数据,比如每天一张,保存31天。
生成1.5千万测试数据(APPID=1 , USERID 总量20亿,随机产生, 新增tagid 范围1-10000, 删除tagid 范围1-1000)
insert into t_user_tags (appid,userid,tag,ins) select 1, 2000000000*random(),10000*random(),true from generate_series(1,10000000);
insert into t_user_tags (appid,userid,tag,ins) select 1, 2000000000*random(),5000*random(),false from generate_series(1,5000000);
3. tag + userids bitmap 表,这个是最关键的表,查询量很大,从t_user_tags增量合并进这个表。
drop table IF EXISTS t_tags;
create table IF NOT EXISTS t_tags(id serial primary key, appid int, tag int, userids varbit);
-- id可以用于advisory lock, 并行更新
create unique index idx_t_tags_uk on t_tags(tag,appid);
生成1万个TAG的测试数据,每个TAG包含1亿个用户的BIT。方便下面的测试
-- 生成随机bit
CREATE OR REPLACE FUNCTION public.randbit(integer)
RETURNS bit varying
LANGUAGE plpgsql
STRICT
AS $function$
declare
res varbit;
begin
select (string_agg(mod((2*random())::int,2)::text,''))::varbit into res from generate_series(1,$1);
if res is not null then
return res;
else
return ''::varbit;
end if;
end;
$function$
create sequence seq;
-- 并行插入10000条记录
$ vi test.sql
insert into t_tags(appid,tag,userids) select 1,nextval('seq'::regclass),randbit(100000000);
$ pgbench -M simple -n -r -f ./test.sql -c 50 -j 50 -t 200
这个指标显示了用户勾选一些TAG组合后,圈定并返回用户群体的性能。
测试方法很简单: 包含所有,不包含,包含任意。
1. 包含以下TAG的用户ID
userids (bitand) userids
结果为bit位为1的用户
测试SQL如下
-- 获取最大BIT的长度
with tmp as ( select max(bit_length(userids)) maxlen from t_tags where tag in (?,?,...) )
select appid,userid from t_userid_dic, tmp
where appid = ?
and rowid = any
(
(
-- 正向取出1万个bit=1的用户, 如果要反向取,使用false(表示取最近的用户数据)
-- rowid从0开始计数,刚好与bit下标起始位置匹配
select bit_posite(res, 1, 10000, true) from
(
select t1.userids & t2.userids & t3.userids & t4.userids AS res -- & ......
from
-- 根据最大长度, BIT补齐
( select set_bit_array(userids, 0, 0, array[tmp.maxlen-1]) userids from t_tags where tag = ? ) t1 ,
( select set_bit_array(userids, 0, 0, array[tmp.maxlen-1]) userids from t_tags where tag = ? ) t2 ,
( select set_bit_array(userids, 0, 0, array[tmp.maxlen-1]) userids from t_tags where tag = ? ) t3 ,
( select set_bit_array(userids, 0, 0, array[tmp.maxlen-1]) userids from t_tags where tag = ? ) t4
-- ......
) t
)::int[]
)
;
性能数据
with tmp as ( select max(bit_length(userids)) maxlen from t_tags where tag in (226833, 226830, 226836, 226834) )
select appid,userid from t_userid_dic, tmp
where appid = 1
and rowid = any
(
(
select bit_posite(res, 1, 10000, true) from
(
select t1.userids & t2.userids & t3.userids & t4.userids as res
from
( select set_bit_array(userids, 0, 0, array[tmp.maxlen-1]) userids from t_tags,tmp where tag = 226833 ) t1 ,
( select set_bit_array(userids, 0, 0, array[tmp.maxlen-1]) userids from t_tags,tmp where tag = 226830 ) t2 ,
( select set_bit_array(userids, 0, 0, array[tmp.maxlen-1]) userids from t_tags,tmp where tag = 226836 ) t3 ,
( select set_bit_array(userids, 0, 0, array[tmp.maxlen-1]) userids from t_tags,tmp where tag = 226834 ) t4
) t
)::int[]
)
;
2. 不包含以下TAG的用户
userids (bitor) userids
结果为bit位为0的用户
测试SQL如下
-- 获取最大BIT的长度
with tmp as ( select max(bit_length(userids)) maxlen from t_tags where tag in (?,?,...) )
select appid,userid from t_userid_dic, tmp
where appid = ?
and rowid = any
(
(
-- 正向取出1万个bit=0的用户, 如果要反向取,使用false(表示取最近的用户数据)
select bit_posite(res, 0, 10000, true) from
(
select t1.userids | t2.userids | t3.userids | t4.userids AS res -- | ......
from
-- 根据最大长度, BIT补齐
( select set_bit_array(userids, 0, 0, array[tmp.maxlen-1]) userids from t_tags where tag = ? ) t1 ,
( select set_bit_array(userids, 0, 0, array[tmp.maxlen-1]) userids from t_tags where tag = ? ) t2 ,
( select set_bit_array(userids, 0, 0, array[tmp.maxlen-1]) userids from t_tags where tag = ? ) t3 ,
( select set_bit_array(userids, 0, 0, array[tmp.maxlen-1]) userids from t_tags where tag = ? ) t4
-- ......
) t
)::int[]
)
;
性能数据
with tmp as ( select max(bit_length(userids)) maxlen from t_tags where tag in (226833, 226830, 226836, 226834) )
select appid,userid from t_userid_dic, tmp
where appid = 1
and rowid = any
(
(
select bit_posite(res, 0, 10000, true) from
(
select t1.userids | t2.userids | t3.userids | t4.userids as res
from
( select set_bit_array(userids, 0, 0, array[tmp.maxlen-1]) userids from t_tags,tmp where tag = 226833 ) t1 ,
( select set_bit_array(userids, 0, 0, array[tmp.maxlen-1]) userids from t_tags,tmp where tag = 226830 ) t2 ,
( select set_bit_array(userids, 0, 0, array[tmp.maxlen-1]) userids from t_tags,tmp where tag = 226836 ) t3 ,
( select set_bit_array(userids, 0, 0, array[tmp.maxlen-1]) userids from t_tags,tmp where tag = 226834 ) t4
) t
)::int[]
)
;
3. 包含以下任意TAG
userids (bitor) userids
结果为bit位为1的用户
测试SQL如下
-- 获取最大BIT的长度
with tmp as ( select max(bit_length(userids)) maxlen from t_tags where tag in (?,?,...) )
select appid,userid from t_userid_dic, tmp
where appid = ?
and rowid = any
(
(
-- 正向取出1万个bit=1的用户, 如果要反向取,使用false(表示取最近的用户数据)
select bit_posite(res, 1, 10000, true) from
(
select t1.userids | t2.userids | t3.userids | t4.userids AS res -- | ......
from
-- 根据最大长度, BIT补齐
( select set_bit_array(userids, 0, 0, array[tmp.maxlen-1]) userids from t_tags where tag = ? ) t1 ,
( select set_bit_array(userids, 0, 0, array[tmp.maxlen-1]) userids from t_tags where tag = ? ) t2 ,
( select set_bit_array(userids, 0, 0, array[tmp.maxlen-1]) userids from t_tags where tag = ? ) t3 ,
( select set_bit_array(userids, 0, 0, array[tmp.maxlen-1]) userids from t_tags where tag = ? ) t4
-- ......
) t
)::int[]
)
;
性能数据
with tmp as ( select max(bit_length(userids)) maxlen from t_tags where tag in (226833, 226830, 226836, 226834) )
select appid,userid from t_userid_dic, tmp
where appid = 1
and rowid = any
(
(
select bit_posite(res, 1, 10000, true) from
(
select t1.userids | t2.userids | t3.userids | t4.userids as res
from
( select set_bit_array(userids, 0, 0, array[tmp.maxlen-1]) userids from t_tags,tmp where tag = 226833 ) t1 ,
( select set_bit_array(userids, 0, 0, array[tmp.maxlen-1]) userids from t_tags,tmp where tag = 226830 ) t2 ,
( select set_bit_array(userids, 0, 0, array[tmp.maxlen-1]) userids from t_tags,tmp where tag = 226836 ) t3 ,
( select set_bit_array(userids, 0, 0, array[tmp.maxlen-1]) userids from t_tags,tmp where tag = 226834 ) t4
) t
)::int[]
)
;
1. 结合bit_posite,可以实现正向取若干用户,反向取若干用户(例如有100万个结果,本次推广只要1万个用户,而且要最近新增的1万个用户,则反向取1万个用户即可)。
2. 结合get_bit则可以实现截取某一段BIT,再取得结果,很好用哦。
新增数据即往t_user_tags表插入数据的性能。
postgres=# \d+ t_user_tags
Table "public.t_user_tags"
Column | Type | Modifiers | Storage | Stats target | Description
--------+---------+----------------------------------------------------------+---------+--------------+-------------
id | bigint | not null default nextval('t_user_tags_id_seq'::regclass) | plain | |
appid | integer | | plain | |
userid | bigint | | plain | |
tag | integer | | plain | |
ins | boolean | | plain | |
dic | boolean | default false | plain | |
Indexes:
"t_user_tags_pkey" PRIMARY KEY, btree (id)
"idx_t_user_tags_id" btree (id) WHERE dic IS FALSE
测试如下
$ vi test.sql
\set appid random(1,1000)
\set userid random(1,2000000000)
\set new_tag random(1,10000)
\set old_tag random(8001,10000)
insert into t_user_tags (appid,userid,tag,ins) values (:appid, :userid, :new_tag, true);
insert into t_user_tags (appid,userid,tag,ins) values (:appid, :userid, :old_tag, false);
$ pgbench -M prepared -n -r -P 1 -f ./test.sql -c 128 -j 128 -T 120
postgres=# select count(*) from t_user_tags;
count
----------
14721724
(1 row)
性能数据(单步操作的QPS约12.2万,包括新增,删除TAG)
更新的动作需要拆成两个部分,新增和删除,不要合并到一条记录中。
transaction type: ./test.sql
scaling factor: 1
query mode: prepared
number of clients: 128
number of threads: 128
duration: 120 s
number of transactions actually processed: 7360862
latency average = 2.085 ms
latency stddev = 1.678 ms
tps = 61326.338528 (including connections establishing)
tps = 61329.196790 (excluding connections establishing)
script statistics:
- statement latencies in milliseconds:
0.001 \set appid random(1,1000)
0.000 \set userid random(1,2000000000)
0.000 \set new_tag random(1,10000)
0.000 \set old_tag random(8001,10000)
1.042 insert into t_user_tags (appid,userid,tag,ins) values (:appid, :userid, :new_tag, true);
1.037 insert into t_user_tags (appid,userid,tag,ins) values (:appid, :userid, :old_tag, false);
数据的合并包括3个部分,
1. 更新用户字典表t_userid_dic,
2. 批量获取并删除t_user_tags的记录,
3. 合并标签数据到t_tags。
以上三个动作应该在一个事务中完成。
考虑到t_tags表userids字段1亿BIT约12.5MB,更新单条记录耗时会比较长,所以建议采用并行的模式,每个TAG都可以并行。
从t_user_tags表取出数据并更新数据字典,被取出的数据标记为允许合并。
此操作没有必要并行,串行即可,搞个后台进程无限循环。
create or replace function update_dict(v_rows int) returns void as $$
declare
min_id int; -- 边界id
f_uniq_res int8;
begin
if v_rows<1 then
raise notice 'v_rows must >=1';
return;
end if;
-- 查找APPID, tag对应最小边界, 没有记录直接返回
select min(id) into min_id from t_user_tags where dic=false;
if not found then
raise notice 'no data';
return;
end if;
-- 插入本次需要处理的临时数据到数组
-- f_uniq失败也不会报错,这里需要修改一下f_uniq仅对UK冲突不处理,其他错误还是需要处理的,否则t_user_tags改了,但是USER可能没有进入字典。
with tmp as (update t_user_tags t set dic=true where id>=min_id and id<=min_id+v_rows returning *)
select count(*) into f_uniq_res from (select f_uniq(appid,userid) from (select appid,userid from tmp group by 1,2) t) t;
end;
$$ language plpgsql;
由于批量操作,可能申请大量的ad lock, 所以需要增加max_locks_per_transaction, 数据库参数调整
max_locks_per_transaction=40960
验证
postgres=# select update_dict(200000);
update_dict
-------------
(1 row)
Time: 8986.175 ms
多执行几次
Time: 9395.991 ms
Time: 10798.631 ms
Time: 10726.547 ms
Time: 10620.055 ms
每秒约处理2万
验证字典更新是否准确
postgres=# select count(*) from t_userid_dic ;
count
---------
1399501
(1 row)
Time: 110.656 ms
postgres=# select count(*) from (select appid,userid from t_user_tags where dic=true group by 1,2) t;
count
---------
1399501
(1 row)
Time: 2721.264 ms
postgres=# select * from t_userid_dic order by appid,rowid limit 10;
appid | rowid | userid
-------+-------+------------
1 | 0 | 1802787010
1 | 1 | 1342147584
1 | 2 | 1560458710
1 | 3 | 1478701081
1 | 4 | 1826138023
1 | 5 | 182295180
1 | 6 | 1736227913
1 | 7 | 512247294
1 | 8 | 686842950
1 | 9 | 1940486738
(10 rows)
postgres=# select min(rowid),max(rowid),count(*),appid from t_userid_dic group by appid;
min | max | count | appid
-----+---------+---------+-------
1 | 1399501 | 1399501 | 1
(1 row)
Time: 369.562 ms
虽然没有必要并行,但是这个函数需要保护其并行的安全性,所以接下来验证并行安全性
$ vi test.sql
select update_dict(1000);
$ pgbench -M prepared -n -r -P 1 -f ./test.sql -c 64 -j 64 -T 100
验证并行结果的安全性,结果可靠
postgres=# select count(*) from t_userid_dic ;
count
---------
1533534
(1 row)
postgres=# select count(*) from (select appid,userid from t_user_tags where dic=true group by 1,2) t;
count
---------
1533534
(1 row)
postgres=# select * from t_userid_dic order by appid,rowid limit 10;
appid | rowid | userid
-------+-------+------------
1 | 0 | 1802787010
1 | 1 | 1342147584
1 | 2 | 1560458710
1 | 3 | 1478701081
1 | 4 | 1826138023
1 | 5 | 182295180
1 | 6 | 1736227913
1 | 7 | 512247294
1 | 8 | 686842950
1 | 9 | 1940486738
(10 rows)
postgres=# select min(rowid),max(rowid),count(*),appid from t_userid_dic group by appid;
min | max | count | appid
-----+---------+---------+-------
1 | 1533534 | 1533534 | 1
(1 row)
写在前面: 如果你要在一个函数中处理用户字典的更新和合并的动作,务必使用repeatable read隔离级别,保证处理字典的数据与合并的数据一致。
前面已经处理了字典的更新,接下来就可以将t_user_tags.dic=true的数据,合并到t_tags中。
考虑更新T_TAGS可能较慢,尽量提高并行度,不同TAG并行。
-- 不要对同一个APPID并行使用APPID与APPID+tag的模式.
create or replace function merge_tags(
v_appid int, -- 输入本次要处理的appid
v_tag int, -- 输入本次要处理的tags , 输入null 则处理该APPID的所有tag, 注意不要使用strict function, 否则遇到NULL参数会直接返回NULL
v_rows int -- 输入要本次处理多少条记录,即合并量
) returns void as $$
declare
min_id int; -- 边界id
ad_lockid int8; -- 可以有效避免不同的APPID在并行操作时冲突
-- 循环
i_tag int;
i_userid_del int8[];
i_userid_add int8[];
-- 将userid[]转换为ROWID[]
i_rowid_del int[];
i_rowid_add int[];
-- 临时存储, 用于debug
i_userids_del varbit := '0'::varbit;
i_userids_add varbit := '0'::varbit;
begin
-- 计算ad_lockid
if v_tag is not null then
ad_lockid := (v_appid+1)^2 + (v_tag+1)^2;
else
ad_lockid := (v_appid+1)^2;
end if;
-- 获取并行保护锁
if not pg_try_advisory_xact_lock(ad_lockid) then
raise notice 'cann''t parallel merge same tag with ad_lockid: %', ad_lockid;
return;
end if;
-- 处理行数
if v_rows<1 then
raise notice 'v_rows must >=1';
return;
end if;
-- 判断是否处理单个tag
if v_tag is not null then
-- raise notice 'v_tag: %', v_tag;
-- 处理单个tag
-- 查找APPID, tag对应最小边界, 没有记录直接返回
select min(id) into min_id from t_user_tags where dic=true and appid=v_appid and tag=v_tag;
if not found then
raise notice 'no data for appid:% , tag:% ', v_appid, v_tag;
return;
end if;
-- 获取实时数据,删除实时数据,聚合实时数据,合并到TAGS
-- 建议RDS PostgreSQL新增delete|update limit语法,抛弃with id>=min_id for update
-- 这里有个BUG, 合并没有考虑到TAG的新增或删除的先后顺序,例如某APPID的某用户新增了一个TAG,然后又把这个TAG删除了。最后合并的结果会变成有这个TAG。
-- 使用window函数可以解决这个问题, 将array_agg这个QUERY改掉, 在聚合前,先合并数据,使用ins, row_number() over (partition by appid,userid,tag order by id desc) rn .... where rn=1, 每一个appid,userid,tag取最后一个ins的值。
for i_tag, i_rowid_del, i_rowid_add in
with tmp as (select * from t_user_tags t where dic=true and appid=v_appid and tag=v_tag and id>=min_id order by id limit v_rows for update),
tmp0 as (select * from ( select *, row_number() over (partition by appid,userid,tag order by id desc) as rn from tmp ) as ss where ss.rn=1), -- 同一个appid,userid用窗口取最后一条状态
tmp1 as (delete from t_user_tags t where exists (select 1 from tmp where tmp.id=t.id))
select t1.tag, array_remove(array_agg(case when not t1.ins then t2.rowid else null end), null) code_del, array_remove(array_agg(case when t1.ins then t2.rowid else null end), null) code_add
from tmp3 t1 join t_userid_dic t2 on (t1.type=t2.type and t1.code=t2.code and t2.appid=v_appid) group by t1.tag_name
-- select tag, array_agg(case when not ins then userid else null end) userid_del, array_agg(case when ins then userid else null end) userid_add from tmp group by tag
loop
-- 判断TAG是否存在, 存在则更新,否则插入
perform 1 from t_tags where appid=v_appid and tag=i_tag;
if found then
update t_tags set userids = set_bit_array( set_bit_array(userids, 0, 0, i_rowid_del), 1, 0, i_rowid_add )::text::varbit where appid=v_appid and tag=i_tag;
else
insert into t_tags(appid, tag, userids) values (v_appid, i_tag, set_bit_array( set_bit_array('0'::varbit, 0, 0, i_rowid_del), 1, 0, i_rowid_add )::text::varbit);
end if;
end loop;
else
-- 处理所有tag
-- 查找APPID最小边界, 没有记录直接返回
select min(id) into min_id from t_user_tags where dic=true and appid=v_appid;
if not found then
raise notice 'no data for appid:%', v_appid;
return;
end if;
-- 获取实时数据,删除实时数据,聚合实时数据,合并到TAGS
-- 这里有个BUG, 合并没有考虑到TAG的新增或删除的先后顺序,例如某APPID的某用户新增了一个TAG,然后又把这个TAG删除了。最后合并的结果会变成有这个TAG。
-- 使用window函数可以解决这个问题, 将array_agg这个QUERY改掉, 在聚合前, 先合并数据, 使用ins, row_number() over (partition by appid,userid,tag order by id desc) rn .... where rn=1, 每一个appid,userid,tag取最后一个ins的值。
for i_tag, i_rowid_del, i_rowid_add in
with tmp as (select * from t_user_tags t where dic=true and appid=v_appid and id>=min_id order by id limit v_rows for update),
tmp0 as (select * from ( select *, row_number() over (partition by appid,userid,tag order by id desc) as rn from tmp ) as ss where ss.rn=1), -- 同一个appid,userid用窗口取最后一条状态
tmp1 as (delete from t_user_tags t where exists (select 1 from tmp where tmp.id=t.id))
select t1.tag, array_remove(array_agg(case when not t1.ins then t2.rowid else null end), null) code_del, array_remove(array_agg(case when t1.ins then t2.rowid else null end), null) code_add
from tmp3 t1 join t_userid_dic t2 on (t1.type=t2.type and t1.code=t2.code and t2.appid=v_appid) group by t1.tag_name
-- select tag, array_agg(case when not ins then userid else null end) userid_del, array_agg(case when ins then userid else null end) userid_add from tmp group by tag
loop
-- execute format('select coalesce(array_agg(rowid), array[]::int[]) from t_userid_dic where appid=%L and userid = any (%L)', v_appid, array_remove(i_userid_del, null) ) into i_rowid_del;
-- execute format('select coalesce(array_agg(rowid), array[]::int[]) from t_userid_dic where appid=%L and userid = any (%L)', v_appid, array_remove(i_userid_add, null) ) into i_rowid_add;
-- 判断TAG是否存在, 存在则更新,否则插入
perform 1 from t_tags where appid=v_appid and tag=i_tag;
if found then
update t_tags set userids = set_bit_array( set_bit_array(userids, 0, 0, i_rowid_del), 1, 0, i_rowid_add )::text::varbit where appid=v_appid and tag=i_tag;
else
insert into t_tags(appid, tag, userids) values (v_appid, i_tag, set_bit_array( set_bit_array('0'::varbit, 0, 0, i_rowid_del), 1, 0, i_rowid_add )::text::varbit);
end if;
end loop;
end if;
end;
$$ language plpgsql;
-- 不要使用strict
速度测试
$ vi test.sql
\set tag random(1,10000)
select merge_tags(1,:tag,10000);
$ pgbench -M prepared -n -r -P 1 -f ./test.sql -c 128 -j 128 -T 100
每秒约合并15万记录。
验证方法,merge的结果数据与被merge的数据一致即可。
符合要求
truncate t_tags;
select update_dict(100000);
create table check_merge1 as select tag,count(*) cnt from (select tag,userid from t_user_tags where dic=true order by id limit 10000) t group by tag;
select merge_tags(1,null,10000);
。。。。。。
select merge_tags(1,null,10000);
create table check_merge2 as select tag, count_bit(userids,1) cnt from t_tags;
postgres=# select t1.*,t2.* from check_merge1 t1 full outer join check_merge2 t2 on (t1.tag=t2.tag and t1.cnt=t2.cnt) where t1.* is null or t2.* is null;
tag | cnt | tag | cnt
-----+-----+-----+-----
(0 rows)
Time: 5.133 ms
如果有结果说明合并有问题。
找到userid对应的rowid, 根据userids rowid位置的bit,判断是否有该tag.
select tag from (select tag, get_bit(t1.userids, t2.rowid-1) bt from t_tags t1, (select rowid from t_userid_dic where userid=?) t2) t where bt=1;
例子,注意对齐(或者改造get_bit函数,支持没有BIT位的操作)
postgres=# \set FETCH_COUNT 1
postgres=# select tag from (select tag, get_bit(t1.userids, t2.rowid-1) bt from t_tags t1, (select rowid from t_userid_dic where userid=100000) t2) t where bt=1;
tag
--------
226813
226824
226818
226810
226782
226790
226792
226787
226803
226826
(10 rows)
Time: 152.636 ms
查询单个用户有哪些TAG是一个比较重的操作,如果碰到有很多TAG并且用户数非常庞大时,建议使用并行。
并行相关参数配置
postgres=# show parallel_tuple_cost;
parallel_tuple_cost
---------------------
0
postgres=# show parallel_setup_cost ;
parallel_setup_cost
---------------------
0
postgres=# show max_parallel_workers_per_gather ;
max_parallel_workers_per_gather
---------------------------------
27
postgres=# show max_worker_processes ;
max_worker_processes
----------------------
128
postgres=# show force_parallel_mode ;
force_parallel_mode
---------------------
on
postgres=# alter table t_tags set (parallel_workers=27);
ALTER TABLE
返回用户数组,并行化之后,每个tag处理约耗时0.76毫秒。
如果用游标返回,可以快速得到第一个用户。
postgres=# explain (analyze,verbose,costs,buffers,timing) select array_agg(tag) from t_tags where get_bit(userids,10000)=1;
QUERY PLAN
---------------------------------------------------------------------------------------------------------------------
Aggregate (cost=1047.68..1047.69 rows=1 width=32) (actual time=7176.745..7176.745 rows=1 loops=1)
Output: array_agg(tag)
Buffers: shared hit=15912565
-> Gather (cost=0.00..1047.55 rows=50 width=4) (actual time=8.940..7175.486 rows=4957 loops=1)
Output: tag
Workers Planned: 27
Workers Launched: 27
Buffers: shared hit=15912565
-> Parallel Seq Scan on public.t_tags (cost=0.00..1047.55 rows=2 width=4) (actual time=51.974..6023.333 rows=177 loops=28)
Output: tag
Filter: (get_bit((t_tags.userids)::"bit", 10000) = 1)
Rows Removed by Filter: 180
Buffers: shared hit=15909973
Worker 0: actual time=24.376..5791.799 rows=158 loops=1
Buffers: shared hit=528366
Worker 1: actual time=23.143..6749.264 rows=198 loops=1
Buffers: shared hit=632954
Worker 2: actual time=55.201..6400.872 rows=186 loops=1
Buffers: shared hit=604388
Worker 3: actual time=23.456..5351.070 rows=152 loops=1
Buffers: shared hit=482151
Worker 4: actual time=71.248..6179.161 rows=181 loops=1
Buffers: shared hit=580237
Worker 5: actual time=124.402..5395.424 rows=140 loops=1
Buffers: shared hit=493010
Worker 6: actual time=111.926..6013.077 rows=168 loops=1
Buffers: shared hit=553851
Worker 7: actual time=24.483..7170.148 rows=202 loops=1
Buffers: shared hit=677578
Worker 8: actual time=23.284..5448.081 rows=139 loops=1
Buffers: shared hit=487985
Worker 9: actual time=54.677..7057.927 rows=233 loops=1
Buffers: shared hit=666715
Worker 10: actual time=73.070..6615.151 rows=177 loops=1
Buffers: shared hit=622393
Worker 11: actual time=25.978..5977.110 rows=182 loops=1
Buffers: shared hit=552329
Worker 12: actual time=22.975..5366.569 rows=150 loops=1
Buffers: shared hit=480447
Worker 13: actual time=76.756..6940.743 rows=201 loops=1
Buffers: shared hit=655799
Worker 14: actual time=54.590..5362.862 rows=161 loops=1
Buffers: shared hit=482488
Worker 15: actual time=106.099..5454.446 rows=153 loops=1
Buffers: shared hit=494638
Worker 16: actual time=53.649..6048.233 rows=165 loops=1
Buffers: shared hit=553771
Worker 17: actual time=23.089..5810.984 rows=160 loops=1
Buffers: shared hit=532711
Worker 18: actual time=55.039..5981.338 rows=165 loops=1
Buffers: shared hit=542380
Worker 19: actual time=24.163..6187.498 rows=182 loops=1
Buffers: shared hit=571046
Worker 20: actual time=23.965..6119.395 rows=194 loops=1
Buffers: shared hit=566214
Worker 21: actual time=106.038..6238.629 rows=187 loops=1
Buffers: shared hit=582724
Worker 22: actual time=54.568..6488.311 rows=183 loops=1
Buffers: shared hit=613989
Worker 23: actual time=24.021..5368.295 rows=152 loops=1
Buffers: shared hit=488385
Worker 24: actual time=53.327..5658.396 rows=178 loops=1
Buffers: shared hit=515591
Worker 25: actual time=23.201..5358.615 rows=142 loops=1
Buffers: shared hit=483975
Worker 26: actual time=109.940..5560.662 rows=163 loops=1
Buffers: shared hit=505844
Planning time: 0.081 ms
Execution time: 7637.509 ms
(69 rows)
Time: 7638.100 ms
postgres=# select tag from t_tags where get_bit(userids,10000)=1;
tag
--------
226813
226824
Cancel request sent
ERROR: canceling statement due to user request
Time: 17.521 ms
profile,get_bit的瓶颈在memcpy,这个可以通过优化PG内核改进。
3647.00 34.2% memcpy /lib64/libc-2.12.so
大APPID,按USER号段切分
APPID+号段分片
如果一个APPID 1万个TAG,1亿用户,只占用120GB。
通常是出现倾斜时才需要重分布。
PostgreSQL使用postgres_fdw可以原生支持数据分片。
参考我之前写的文档
《PostgreSQL 9.6 单元化,sharding (based on postgres_fdw) - 内核层支持前传》
《PostgreSQL 9.6 sharding + 单元化 (based on postgres_fdw) 最佳实践 - 通用水平分库场景设计与实践》
同一个APPID的一批TAG必须在一个节点
机器学习,生成TAG,本文不涉及。
参考 MADlib、R ,聚类分析 PostgreSQL都可以非常好的支持。