Skip to content

Latest commit

 

History

History
837 lines (725 loc) · 37.6 KB

20150510_01.md

File metadata and controls

837 lines (725 loc) · 37.6 KB

PostgreSQL find out session's current query blocked by which transaction use pg_locks & pg_stat_activity

作者

digoal

日期

2015-05-10

标签

PostgreSQL , 查询 , 锁等待 , 诊断 , 等谁


背景

PostgreSQL和大多数传统RDBMS一样,都设计了大量的锁来保证并发操作的数据一致性。

同时PG在设计锁等待时,以队列方式存储等待锁。

参考

ProcSleep()@src/backend/storage/lmgr/proc.c

http://blog.163.com/digoal@126/blog/static/163877040201352010122653/

因此,会出现一种问题。

例如一个长事务A持有一个表的某条记录的更新锁。

接下来的一个事务B要TRUNCATE这个表,会把这个锁放到等待队列去。

在接下来的事务C......如果请求的锁和TRUNCATE表的锁发生冲突,也会放到等待队列去。

这其实是有利有弊的,

利是什么呢?

B在A释放后,可以立即获得锁进行TRUNCATE。

弊是什么?

排在B后面的事务会被堵塞,虽然它们可能和A没有锁冲突,也需要等待。

弊端往往在某些凑巧的情况下起到放大效果。

例如一个表的操作非常频繁,如果刚好有一个长事务在里面,然后又刚好有事务需要获得一个排他锁,那么接下来的频繁DML请求都会被堵塞。

一般可以用锁超时来降低弊端带来的这种影响,配置参数lock_timeout,如果锁等待超过这个时间,会强行中断,从等待队列去除。

另外,如果我们没有设置lock_timeout或者不方便设置lock_timeout的话,一旦发现数据库出现了大量的等待,

应该如何找到罪魁祸首呢?即处于等待状态的查询,它们到底在等待谁?

找到之后可以人为的杀掉这些罪魁祸首。

正文

使用以下SQL查询等待信息

with t_wait as   
(select a.locktype,a.database,a.relation,a.page,a.tuple,a.classid,a.objid,a.objsubid,a.pid,a.virtualtransaction,a.virtualxid,a,transactionid,b.query,b.xact_start,b.query_start,b.usename,b.datname from pg_locks a,pg_stat_activity b where a.pid=b.pid and not a.granted),  
t_run as   
(select a.mode,a.locktype,a.database,a.relation,a.page,a.tuple,a.classid,a.objid,a.objsubid,a.pid,a.virtualtransaction,a.virtualxid,a,transactionid,b.query,b.xact_start,b.query_start,b.usename,b.datname from pg_locks a,pg_stat_activity b where a.pid=b.pid and a.granted)   
select r.locktype,r.mode,r.usename r_user,r.datname r_db,r.relation::regclass,r.pid r_pid,r.xact_start r_xact_start,r.query_start r_query_start,r.query r_query,  
w.usename w_user,w.datname w_db,w.pid w_pid,w.xact_start w_xact_start,w.query_start w_query_start,w.query w_query    
from t_wait w,t_run r where  
  r.locktype is not distinct from w.locktype and  
  r.database is not distinct from w.database and  
  r.relation is not distinct from w.relation and  
  r.page is not distinct from w.page and  
  r.tuple is not distinct from w.tuple and  
  r.classid is not distinct from w.classid and  
  r.objid is not distinct from w.objid and  
  r.objsubid is not distinct from w.objsubid  
  order by r.xact_start;  

例如:

-[ RECORD 1 ]-+---------------------------------------------------------------------  
locktype      | relation  
mode          | ShareUpdateExclusiveLock  
r_user        | postgres  
r_db          | postgres  
relation      | tbl  
r_pid         | 24579  
r_xact_start  | 2015-05-10 09:43:53.956252+08  
r_query_start | 2015-05-10 09:43:53.956252+08  
r_query       | autovacuum: VACUUM ANALYZE public.tbl (to prevent wraparound)  
w_user        | postgres  
w_db          | postgres  
w_pid         | 24737  
w_xact_start  | 2015-05-10 09:47:15.294562+08  
w_query_start | 2015-05-10 09:47:15.294562+08  
w_query       | insert into tbl(crt_time) select now() from generate_series(1,1000);  
.....  
(1001 rows)  

干掉它:

postgres=# select pg_terminate_backend(24579);  
-[ RECORD 1 ]--------+--  
pg_terminate_backend | t  

再次查询,等待消失。

postgres=# with t_wait as                       
(select a.locktype,a.database,a.relation,a.page,a.tuple,a.classid,a.objid,a.objsubid,a.pid,a.virtualtransaction,a.virtualxid,a,transactionid,b.query,b.xact_start,b.query_start,b.usename,b.datname from pg_locks a,pg_stat_activity b where a.pid=b.pid and not a.granted),  
t_run as   
(select a.mode,a.locktype,a.database,a.relation,a.page,a.tuple,a.classid,a.objid,a.objsubid,a.pid,a.virtualtransaction,a.virtualxid,a,transactionid,b.query,b.xact_start,b.query_start,b.usename,b.datname from pg_locks a,pg_stat_activity b where a.pid=b.pid and a.granted)   
select r.locktype,r.mode,r.usename r_user,r.datname r_db,r.relation::regclass,r.pid r_pid,r.xact_start r_xact_start,r.query_start r_query_start,r.query r_query,  
w.usename w_user,w.datname w_db,w.pid w_pid,w.xact_start w_xact_start,w.query_start w_query_start,w.query w_query    
from t_wait w,t_run r where  
  r.locktype is not distinct from w.locktype and  
  r.database is not distinct from w.database and  
  r.relation is not distinct from w.relation and  
  r.page is not distinct from w.page and  
  r.tuple is not distinct from w.tuple and  
  r.classid is not distinct from w.classid and  
  r.objid is not distinct from w.objid and  
  r.objsubid is not distinct from w.objsubid  
  order by r.xact_start;  
(No rows)  

实际上,这个查询还不是完美,因为锁等待实际上是一个TRUNCATE TABLE的操作造成的,而因为AUTO VACUUM FREEZE和TRUNCATE操作冲突了,所以这两个会话的锁都会对后面的DML造成影响,实际上,我们要找到的应该是级别最高的锁(TRUNCATE),干掉这个才是最重要的,普通的vacuum并不会和DML冲突。

所以我们需要改进一下这个查询语句。

从pg_locks视图的函数的源码,可以知道mode是怎么来的。

src/backend/utils/adt/lockfuncs.c

/*  
 * pg_lock_status - produce a view with one row per held or awaited lock mode  
 */  
Datum  
pg_lock_status(PG_FUNCTION_ARGS)  
{  
......  
                Datum           values[NUM_LOCK_STATUS_COLUMNS];  
......  
                values[12] = CStringGetTextDatum(GetLockmodeName(instance->locktag.locktag_lockmethodid, mode));  
......  
}  
src/backend/storage/lmgr/lock.c  
/*  
 * Fetch the lock method table associated with a given lock  
 */  
LockMethod  
GetLocksMethodTable(const LOCK *lock)  
{  
        LOCKMETHODID lockmethodid = LOCK_LOCKMETHOD(*lock);  
  
        Assert(0 < lockmethodid && lockmethodid < lengthof(LockMethods));  
        return LockMethods[lockmethodid];  
}  
  
/* Names of lock modes, for debug printouts */  
static const char *const lock_mode_names[] =  
{  
        "INVALID",  
        "AccessShareLock",  
        "RowShareLock",  
        "RowExclusiveLock",  
        "ShareUpdateExclusiveLock",  
        "ShareLock",  
        "ShareRowExclusiveLock",  
        "ExclusiveLock",  
        "AccessExclusiveLock"  
};  
  
/*  
 * Data structures defining the semantics of the standard lock methods.  
 *  
 * The conflict table defines the semantics of the various lock modes.  
 */  
static const LOCKMASK LockConflicts[] = {  
        0,  
        /* AccessShareLock */  
        (1 << AccessExclusiveLock),  
  
        /* RowShareLock */  
        (1 << ExclusiveLock) | (1 << AccessExclusiveLock),  
  
        /* RowExclusiveLock */  
        (1 << ShareLock) | (1 << ShareRowExclusiveLock) |  
        (1 << ExclusiveLock) | (1 << AccessExclusiveLock),  
  
        /* ShareUpdateExclusiveLock */  
        (1 << ShareUpdateExclusiveLock) |  
        (1 << ShareLock) | (1 << ShareRowExclusiveLock) |  
        (1 << ExclusiveLock) | (1 << AccessExclusiveLock),  
  
        /* ShareLock */  
        (1 << RowExclusiveLock) | (1 << ShareUpdateExclusiveLock) |  
        (1 << ShareRowExclusiveLock) |  
        (1 << ExclusiveLock) | (1 << AccessExclusiveLock),  
  
        /* ShareRowExclusiveLock */  
        (1 << RowExclusiveLock) | (1 << ShareUpdateExclusiveLock) |  
        (1 << ShareLock) | (1 << ShareRowExclusiveLock) |  
        (1 << ExclusiveLock) | (1 << AccessExclusiveLock),  
  
        /* ExclusiveLock */  
        (1 << RowShareLock) |  
        (1 << RowExclusiveLock) | (1 << ShareUpdateExclusiveLock) |  
        (1 << ShareLock) | (1 << ShareRowExclusiveLock) |  
        (1 << ExclusiveLock) | (1 << AccessExclusiveLock),  
  
        /* AccessExclusiveLock */  
        (1 << AccessShareLock) | (1 << RowShareLock) |  
        (1 << RowExclusiveLock) | (1 << ShareUpdateExclusiveLock) |  
        (1 << ShareLock) | (1 << ShareRowExclusiveLock) |  
        (1 << ExclusiveLock) | (1 << AccessExclusiveLock)  
  
};  
  
static const LockMethodData default_lockmethod = {  
        AccessExclusiveLock,            /* highest valid lock mode number */  
        LockConflicts,  
        lock_mode_names,  
#ifdef LOCK_DEBUG  
        &Trace_locks  
#else  
        &Dummy_trace  
#endif  
};  
  
static const LockMethodData user_lockmethod = {  
        AccessExclusiveLock,            /* highest valid lock mode number */  
        LockConflicts,  
        lock_mode_names,  
#ifdef LOCK_DEBUG  
        &Trace_userlocks  
#else  
        &Dummy_trace  
#endif  
};  
src/include/storage/lock.h  
/*  
 * These are the valid values of type LOCKMODE for all the standard lock  
 * methods (both DEFAULT and USER).  
 */  
  
/* NoLock is not a lock mode, but a flag value meaning "don't get a lock" */  
#define NoLock                                  0  
  
#define AccessShareLock                 1               /* SELECT */  
#define RowShareLock                    2               /* SELECT FOR UPDATE/FOR SHARE */  
#define RowExclusiveLock                3               /* INSERT, UPDATE, DELETE */  
#define ShareUpdateExclusiveLock 4              /* VACUUM (non-FULL),ANALYZE, CREATE  
                                       * INDEX CONCURRENTLY */  
#define ShareLock                               5               /* CREATE INDEX (WITHOUT CONCURRENTLY) */  
#define ShareRowExclusiveLock   6               /* like EXCLUSIVE MODE, but allows ROW  
                                       * SHARE */  
#define ExclusiveLock                   7               /* blocks ROW SHARE/SELECT...FOR  
                                                 * UPDATE */  
#define AccessExclusiveLock             8               /* ALTER TABLE, DROP TABLE, VACUUM  
                                                * FULL, and unqualified LOCK TABLE */  

改进后的查询如下:

用一个函数来将锁转换为数字,当然,你也可以直接使用case when子句来实现这个转换

postgres=# create or replace function f_lock_level(i_mode text) returns int as $$  
declare  
begin  
  case i_mode  
    when 'INVALID' then return 0;  
    when 'AccessShareLock' then return 1;  
    when 'RowShareLock' then return 2;  
    when 'RowExclusiveLock' then return 3;  
    when 'ShareUpdateExclusiveLock' then return 4;  
    when 'ShareLock' then return 5;  
    when 'ShareRowExclusiveLock' then return 6;  
    when 'ExclusiveLock' then return 7;  
    when 'AccessExclusiveLock' then return 8;  
    else return 0;  
  end case;  
end;   
$$ language plpgsql strict;  

修改查询语句,按锁级别排序:

with t_wait as                       
(select a.mode,a.locktype,a.database,a.relation,a.page,a.tuple,a.classid,a.objid,a.objsubid,  
a.pid,a.virtualtransaction,a.virtualxid,a,transactionid,b.query,b.xact_start,b.query_start,  
b.usename,b.datname from pg_locks a,pg_stat_activity b where a.pid=b.pid and not a.granted),  
t_run as   
(select a.mode,a.locktype,a.database,a.relation,a.page,a.tuple,a.classid,a.objid,a.objsubid,  
a.pid,a.virtualtransaction,a.virtualxid,a,transactionid,b.query,b.xact_start,b.query_start,  
b.usename,b.datname from pg_locks a,pg_stat_activity b where a.pid=b.pid and a.granted)   
select r.locktype,r.mode r_mode,r.usename r_user,r.datname r_db,r.relation::regclass,r.pid r_pid,  
r.page r_page,r.tuple r_tuple,r.xact_start r_xact_start,r.query_start r_query_start,  
now()-r.query_start r_locktime,r.query r_query,w.mode w_mode,w.pid w_pid,w.page w_page,  
w.tuple w_tuple,w.xact_start w_xact_start,w.query_start w_query_start,  
now()-w.query_start w_locktime,w.query w_query    
from t_wait w,t_run r where  
  r.locktype is not distinct from w.locktype and  
  r.database is not distinct from w.database and  
  r.relation is not distinct from w.relation and  
  r.page is not distinct from w.page and  
  r.tuple is not distinct from w.tuple and  
  r.classid is not distinct from w.classid and  
  r.objid is not distinct from w.objid and  
  r.objsubid is not distinct from w.objsubid and  
  r.transactionid is not distinct from w.transactionid and  
  r.pid <> w.pid  
  order by f_lock_level(w.mode)+f_lock_level(r.mode) desc,r.xact_start;  

现在可以排在前面的就是锁级别高的等待,优先干掉这个。

-[ RECORD 1 ]-+---------------------------------------------------------------------  
locktype      | relation  -- 冲突类型  
r_mode        | ShareUpdateExclusiveLock  -- 持锁模式  
r_user        | postgres  -- 持锁用户  
r_db          | postgres  -- 持锁数据库  
relation      | tbl  -- 持锁对象  
r_pid         | 25656  -- 持锁进程  
r_xact_start  | 2015-05-10 14:11:16.08318+08  -- 持锁事务开始时间  
r_query_start | 2015-05-10 14:11:16.08318+08  -- 持锁SQL开始时间  
r_locktime    | 00:01:49.460779  -- 持锁时长  
r_query       | vacuum freeze tbl;  --  持锁SQL,注意不一定是这个SQL带来的锁,也有可能是这个事务在之前执行的SQL加的锁  
w_mode        | AccessExclusiveLock  -- 等待锁模式  
w_pid         | 26731  -- 等待锁进程  
w_xact_start  | 2015-05-10 14:11:17.987362+08  --  等待锁事务开始时间  
w_query_start | 2015-05-10 14:11:17.987362+08  --  等待锁SQL开始时间  
w_locktime    | 00:01:47.556597  --  等待锁时长  
w_query       | truncate tbl;  -- 等待锁SQL  
-[ RECORD 2 ]-+---------------------------------------------------------------------  
locktype      | relation  
r_mode        | ShareUpdateExclusiveLock  
r_user        | postgres  
r_db          | postgres  
relation      | tbl  
r_pid         | 25656  
r_xact_start  | 2015-05-10 14:11:16.08318+08  
r_query_start | 2015-05-10 14:11:16.08318+08  
r_locktime    | 00:01:49.460779  
r_query       | vacuum freeze tbl;  
w_mode        | RowExclusiveLock  
w_pid         | 25582  
w_xact_start  | 2015-05-10 14:11:22.845+08  
w_query_start | 2015-05-10 14:11:22.845+08  
w_locktime    | 00:01:42.698959  
w_query       | insert into tbl(crt_time) select now() from generate_series(1,1000);  -- 这个SQL其实等待的是truncate tbl的锁;  
......  

锁冲突判断函数:

LockCheckConflicts()@src/backend/storage/lmgr/lock.c

我们可以创建一个函数用来杀掉对同一个对象某些锁等待时间和等待进程超出阈值的进程。

例如扩展数据块的锁超出一定数量,我们想办法杀掉,避免大量等待。

CREATE OR REPLACE FUNCTION public.f_kill_extend(i_interval interval, i_waiting bigint)  
 RETURNS void  
 LANGUAGE plpgsql  
 STRICT  
AS $function$  
declare  
  v_database oid;  
  v_relation oid;  
  v_pid int;  
  v_record record;  
begin  
if (pg_is_in_recovery()) then  
  return;  
end if;  
  
for v_record in with t_wait as                       
(select a.mode,a.locktype,a.database,a.relation,a.page,a.tuple,a.classid,a.objid,a.objsubid,  
a.pid,a.virtualtransaction,a.virtualxid,a,transactionid,b.query,b.xact_start,b.query_start,  
b.usename,b.datname from pg_locks a,pg_stat_activity b where a.pid=b.pid and not a.granted),  
t_run as   
(select a.mode,a.locktype,a.database,a.relation,a.page,a.tuple,a.classid,a.objid,a.objsubid,  
a.pid,a.virtualtransaction,a.virtualxid,a,transactionid,b.query,b.xact_start,b.query_start,  
b.usename,b.datname from pg_locks a,pg_stat_activity b where a.pid=b.pid and a.granted)   
select r.locktype,r.mode r_mode,r.usename r_user,r.datname r_db,r.relation::regclass,r.pid r_pid,  
r.page r_page,r.tuple r_tuple,r.xact_start r_xact_start,r.query_start r_query_start,  
now()-r.query_start r_locktime,r.query r_query,w.mode w_mode,w.pid w_pid,w.page w_page,  
w.tuple w_tuple,w.xact_start w_xact_start,w.query_start w_query_start,  
now()-w.query_start w_locktime,w.query w_query    
from t_wait w,t_run r where  
  r.locktype is not distinct from w.locktype and  
  r.database is not distinct from w.database and  
  r.relation is not distinct from w.relation and  
  r.page is not distinct from w.page and  
  r.tuple is not distinct from w.tuple and  
  r.classid is not distinct from w.classid and  
  r.objid is not distinct from w.objid and  
  r.objsubid is not distinct from w.objsubid and  
  r.transactionid is not distinct from w.transactionid and  
  r.pid <> w.pid  
  order by f_lock_level(w.mode)+f_lock_level(r.mode) desc,r.xact_start   
LOOP  
  raise notice '%', v_record;  
END LOOP;  
  
for v_database,v_relation in select database,relation from pg_locks where   
  locktype='extend' and mode='ExclusiveLock' and not granted and  
  pid in (select pid from pg_stat_activity where now()-xact_start > i_interval)   
  group by 1,2 having count(*) > i_waiting  
loop  
  perform pg_terminate_backend(pid) from pg_locks   
    where database=v_database and relation=v_relation;  
end loop;  
  
return;  
end;  
$function$;  

例如:

psql -c "select f_kill_extend(interval '1 sec', 10);"  

参考

1. http://blog.163.com/digoal@126/blog/static/163877040201352010122653/

2. src/backend/storage/lmgr/proc.c

/*  
 * ProcSleep -- put a process to sleep on the specified lock  
 *  
 * Caller must have set MyProc->heldLocks to reflect locks already held  
 * on the lockable object by this process (under all XIDs).  
 *  
 * The lock table's partition lock must be held at entry, and will be held  
 * at exit.  
 *  
 * Result: STATUS_OK if we acquired the lock, STATUS_ERROR if not (deadlock).  
 *  
 * ASSUME: that no one will fiddle with the queue until after  
 *              we release the partition lock.  
 *  
 * NOTES: The process queue is now a priority queue for locking.  
 *  
 * P() on the semaphore should put us to sleep.  The process  
 * semaphore is normally zero, so when we try to acquire it, we sleep.  
 */  
int  
ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable)  
{  
        LOCKMODE        lockmode = locallock->tag.mode;  
        LOCK       *lock = locallock->lock;  
        PROCLOCK   *proclock = locallock->proclock;  
        uint32          hashcode = locallock->hashcode;  
        LWLockId        partitionLock = LockHashPartitionLock(hashcode);  
        PROC_QUEUE *waitQueue = &(lock->waitProcs);  
        LOCKMASK        myHeldLocks = MyProc->heldLocks;  
        bool            early_deadlock = false;  
        bool            allow_autovacuum_cancel = true;  
        int                     myWaitStatus;  
        PGPROC     *proc;  
        int                     i;  
  
        /*  
         * Determine where to add myself in the wait queue.  
         *  
         * Normally I should go at the end of the queue.  However, if I already  
         * hold locks that conflict with the request of any previous waiter, put  
         * myself in the queue just in front of the first such waiter. This is not  
         * a necessary step, since deadlock detection would move me to before that  
         * waiter anyway; but it's relatively cheap to detect such a conflict  
         * immediately, and avoid delaying till deadlock timeout.  
         *  
         * Special case: if I find I should go in front of some waiter, check to  
         * see if I conflict with already-held locks or the requests before that  
         * waiter.  If not, then just grant myself the requested lock immediately.  
         * This is the same as the test for immediate grant in LockAcquire, except  
         * we are only considering the part of the wait queue before my insertion  
         * point.  
         */  
        if (myHeldLocks != 0)  
        {  
                LOCKMASK        aheadRequests = 0;  
  
                proc = (PGPROC *) waitQueue->links.next;  
                for (i = 0; i < waitQueue->size; i++)  
                {  
                        /* Must he wait for me? */  
                        if (lockMethodTable->conflictTab[proc->waitLockMode] & myHeldLocks)  
                        {  
                                /* Must I wait for him ? */  
                                if (lockMethodTable->conflictTab[lockmode] & proc->heldLocks)  
                                {  
                                        /*  
                                         * Yes, so we have a deadlock.  Easiest way to clean up  
                                         * correctly is to call RemoveFromWaitQueue(), but we  
                                         * can't do that until we are *on* the wait queue. So, set  
                                         * a flag to check below, and break out of loop.  Also,  
                                         * record deadlock info for later message.  
                                         */  
                                        RememberSimpleDeadLock(MyProc, lockmode, lock, proc);  
                                        early_deadlock = true;  
                                        break;  
                                }  
                                /* I must go before this waiter.  Check special case. */  
                                if ((lockMethodTable->conflictTab[lockmode] & aheadRequests) == 0 &&  
                                        LockCheckConflicts(lockMethodTable,  
                                                                           lockmode,  
                                                                           lock,  
                                                                           proclock,  
                                                                           MyProc) == STATUS_OK)  
                                {  
                                        /* Skip the wait and just grant myself the lock. */  
                                        GrantLock(lock, proclock, lockmode);  
                                        GrantAwaitedLock();  
                                        return STATUS_OK;  
                                }  
                                /* Break out of loop to put myself before him */  
                                break;  
                        }  
                        /* Nope, so advance to next waiter */  
                        aheadRequests |= LOCKBIT_ON(proc->waitLockMode);  
                        proc = (PGPROC *) proc->links.next;  
                }  
  
                /*  
                 * If we fall out of loop normally, proc points to waitQueue head, so  
                 * we will insert at tail of queue as desired.  
                 */  
        }  
        else  
        {  
                /* I hold no locks, so I can't push in front of anyone. */  
                proc = (PGPROC *) &(waitQueue->links);  
        }  
  
        /*  
         * Insert self into queue, ahead of the given proc (or at tail of queue).  
         */  
        SHMQueueInsertBefore(&(proc->links), &(MyProc->links));  
        waitQueue->size++;  
        lock->waitMask |= LOCKBIT_ON(lockmode);  
  
        /* Set up wait information in PGPROC object, too */  
        MyProc->waitLock = lock;  
        MyProc->waitProcLock = proclock;  
        MyProc->waitLockMode = lockmode;  
  
        MyProc->waitStatus = STATUS_WAITING;  
  
        /*  
         * If we detected deadlock, give up without waiting.  This must agree with  
         * CheckDeadLock's recovery code, except that we shouldn't release the  
         * semaphore since we haven't tried to lock it yet.  
         */  
        if (early_deadlock)  
        {  
                RemoveFromWaitQueue(MyProc, hashcode);  
                return STATUS_ERROR;  
        }  
  
        /* mark that we are waiting for a lock */  
        lockAwaited = locallock;  
  
        /*  
         * Release the lock table's partition lock.  
         *  
         * NOTE: this may also cause us to exit critical-section state, possibly  
         * allowing a cancel/die interrupt to be accepted. This is OK because we  
         * have recorded the fact that we are waiting for a lock, and so  
         * LockErrorCleanup will clean up if cancel/die happens.  
         */  
        LWLockRelease(partitionLock);  
  
        /*  
         * Also, now that we will successfully clean up after an ereport, it's  
         * safe to check to see if there's a buffer pin deadlock against the  
         * Startup process.  Of course, that's only necessary if we're doing Hot  
         * Standby and are not the Startup process ourselves.  
         */  
        if (RecoveryInProgress() && !InRecovery)  
                CheckRecoveryConflictDeadlock();  
  
        /* Reset deadlock_state before enabling the timeout handler */  
        deadlock_state = DS_NOT_YET_CHECKED;  
  
        /*  
         * Set timer so we can wake up after awhile and check for a deadlock. If a  
         * deadlock is detected, the handler releases the process's semaphore and  
         * sets MyProc->waitStatus = STATUS_ERROR, allowing us to know that we  
         * must report failure rather than success.  
         *  
         * By delaying the check until we've waited for a bit, we can avoid  
         * running the rather expensive deadlock-check code in most cases.  
         *  
         * If LockTimeout is set, also enable the timeout for that.  We can save a  
         * few cycles by enabling both timeout sources in one call.  
         */  
        if (LockTimeout > 0)  
        {  
                EnableTimeoutParams timeouts[2];  
  
                timeouts[0].id = DEADLOCK_TIMEOUT;  
                timeouts[0].type = TMPARAM_AFTER;  
                timeouts[0].delay_ms = DeadlockTimeout;  
                timeouts[1].id = LOCK_TIMEOUT;  
                timeouts[1].type = TMPARAM_AFTER;  
                timeouts[1].delay_ms = LockTimeout;  
                enable_timeouts(timeouts, 2);  
        }  
        else  
                enable_timeout_after(DEADLOCK_TIMEOUT, DeadlockTimeout);  
  
        /*  
         * If someone wakes us between LWLockRelease and PGSemaphoreLock,  
         * PGSemaphoreLock will not block.  The wakeup is "saved" by the semaphore  
         * implementation.  While this is normally good, there are cases where a  
         * saved wakeup might be leftover from a previous operation (for example,  
         * we aborted ProcWaitForSignal just before someone did ProcSendSignal).  
         * So, loop to wait again if the waitStatus shows we haven't been granted  
         * nor denied the lock yet.  
         *  
         * We pass interruptOK = true, which eliminates a window in which  
         * cancel/die interrupts would be held off undesirably.  This is a promise  
         * that we don't mind losing control to a cancel/die interrupt here.  We  
         * don't, because we have no shared-state-change work to do after being  
         * granted the lock (the grantor did it all).  We do have to worry about  
         * canceling the deadlock timeout and updating the locallock table, but if  
         * we lose control to an error, LockErrorCleanup will fix that up.  
         */  
        do  
        {  
                PGSemaphoreLock(&MyProc->sem, true);  
  
                /*  
                 * waitStatus could change from STATUS_WAITING to something else  
                 * asynchronously.  Read it just once per loop to prevent surprising  
                 * behavior (such as missing log messages).  
                 */  
                myWaitStatus = MyProc->waitStatus;  
  
                /*  
                 * If we are not deadlocked, but are waiting on an autovacuum-induced  
                 * task, send a signal to interrupt it.  
                 */  
                if (deadlock_state == DS_BLOCKED_BY_AUTOVACUUM && allow_autovacuum_cancel)  
                {  
                        PGPROC     *autovac = GetBlockingAutoVacuumPgproc();  
                        PGXACT     *autovac_pgxact = &ProcGlobal->allPgXact[autovac->pgprocno];  
  
                        LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);  
  
                        /*  
                         * Only do it if the worker is not working to protect against Xid  
                         * wraparound.  
                         */  
                        if ((autovac != NULL) &&  
                                (autovac_pgxact->vacuumFlags & PROC_IS_AUTOVACUUM) &&  
                                !(autovac_pgxact->vacuumFlags & PROC_VACUUM_FOR_WRAPAROUND))  
                        {  
                                int                     pid = autovac->pid;  
                                StringInfoData locktagbuf;  
                                StringInfoData logbuf;  /* errdetail for server log */  
  
                                initStringInfo(&locktagbuf);  
                                initStringInfo(&logbuf);  
                                DescribeLockTag(&locktagbuf, &lock->tag);  
                                appendStringInfo(&logbuf,  
                                                                 _("Process %d waits for %s on %s."),  
                                                                 MyProcPid,  
                                                          GetLockmodeName(lock->tag.locktag_lockmethodid,  
                                                                                          lockmode),  
                                                                 locktagbuf.data);  
  
                                /* release lock as quickly as possible */  
                                LWLockRelease(ProcArrayLock);  
  
                                ereport(LOG,  
                                          (errmsg("sending cancel to blocking autovacuum PID %d",  
                                                          pid),  
                                           errdetail_log("%s", logbuf.data)));  
  
                                pfree(logbuf.data);  
                                pfree(locktagbuf.data);  
  
                                /* send the autovacuum worker Back to Old Kent Road */  
                                if (kill(pid, SIGINT) < 0)  
                                {  
                                        /* Just a warning to allow multiple callers */  
                                        ereport(WARNING,  
                                                        (errmsg("could not send signal to process %d: %m",  
                                                                        pid)));  
                                }  
                        }  
                        else  
                                LWLockRelease(ProcArrayLock);  
  
                        /* prevent signal from being resent more than once */  
                        allow_autovacuum_cancel = false;  
                }  
  
                /*  
                 * If awoken after the deadlock check interrupt has run, and  
                 * log_lock_waits is on, then report about the wait.  
                 */  
                if (log_lock_waits && deadlock_state != DS_NOT_YET_CHECKED)  
                {  
                        StringInfoData buf;  
                        const char *modename;  
                        long            secs;  
                        int                     usecs;  
                        long            msecs;  
  
                        initStringInfo(&buf);  
                        DescribeLockTag(&buf, &locallock->tag.lock);  
                        modename = GetLockmodeName(locallock->tag.lock.locktag_lockmethodid,  
                                                                           lockmode);  
                        TimestampDifference(get_timeout_start_time(DEADLOCK_TIMEOUT),  
                                                                GetCurrentTimestamp(),  
                                                                &secs, &usecs);  
                        msecs = secs * 1000 + usecs / 1000;  
                        usecs = usecs % 1000;  
  
                        if (deadlock_state == DS_SOFT_DEADLOCK)  
                                ereport(LOG,  
                                                (errmsg("process %d avoided deadlock for %s on %s by rearranging queue order after %ld.%03d ms",  
                                                          MyProcPid, modename, buf.data, msecs, usecs)));  
                        else if (deadlock_state == DS_HARD_DEADLOCK)  
                        {  
                                /*  
                                 * This message is a bit redundant with the error that will be  
                                 * reported subsequently, but in some cases the error report  
                                 * might not make it to the log (eg, if it's caught by an  
                                 * exception handler), and we want to ensure all long-wait  
                                 * events get logged.  
                                 */  
                                ereport(LOG,  
                                                (errmsg("process %d detected deadlock while waiting for %s on %s after %ld.%03d ms",  
                                                          MyProcPid, modename, buf.data, msecs, usecs)));  
                        }  
  
                        if (myWaitStatus == STATUS_WAITING)  
                                ereport(LOG,  
                                                (errmsg("process %d still waiting for %s on %s after %ld.%03d ms",  
                                                          MyProcPid, modename, buf.data, msecs, usecs)));  
                        else if (myWaitStatus == STATUS_OK)  
                                ereport(LOG,  
                                        (errmsg("process %d acquired %s on %s after %ld.%03d ms",  
                                                        MyProcPid, modename, buf.data, msecs, usecs)));  
                        else  
                        {  
                                Assert(myWaitStatus == STATUS_ERROR);  
  
                                /*  
                                 * Currently, the deadlock checker always kicks its own  
                                 * process, which means that we'll only see STATUS_ERROR when  
                                 * deadlock_state == DS_HARD_DEADLOCK, and there's no need to  
                                 * print redundant messages.  But for completeness and  
                                 * future-proofing, print a message if it looks like someone  
                                 * else kicked us off the lock.  
                                 */  
                                if (deadlock_state != DS_HARD_DEADLOCK)  
                                        ereport(LOG,  
                                                        (errmsg("process %d failed to acquire %s on %s after %ld.%03d ms",  
                                                          MyProcPid, modename, buf.data, msecs, usecs)));  
                        }  
  
                        /*  
                         * At this point we might still need to wait for the lock. Reset  
                         * state so we don't print the above messages again.  
                         */  
                        deadlock_state = DS_NO_DEADLOCK;  
  
                        pfree(buf.data);  
                }  
        } while (myWaitStatus == STATUS_WAITING);  
  
        /*  
         * Disable the timers, if they are still running.  As in LockErrorCleanup,  
         * we must preserve the LOCK_TIMEOUT indicator flag: if a lock timeout has  
         * already caused QueryCancelPending to become set, we want the cancel to  
         * be reported as a lock timeout, not a user cancel.  
         */  
        if (LockTimeout > 0)  
        {  
                DisableTimeoutParams timeouts[2];  
  
                timeouts[0].id = DEADLOCK_TIMEOUT;  
                timeouts[0].keep_indicator = false;  
                timeouts[1].id = LOCK_TIMEOUT;  
                timeouts[1].keep_indicator = true;  
                disable_timeouts(timeouts, 2);  
        }  
        else  
                disable_timeout(DEADLOCK_TIMEOUT, false);  
  
        /*  
         * Re-acquire the lock table's partition lock.  We have to do this to hold  
         * off cancel/die interrupts before we can mess with lockAwaited (else we  
         * might have a missed or duplicated locallock update).  
         */  
        LWLockAcquire(partitionLock, LW_EXCLUSIVE);  
  
        /*  
         * We no longer want LockErrorCleanup to do anything.  
         */  
        lockAwaited = NULL;  
  
        /*  
         * If we got the lock, be sure to remember it in the locallock table.  
         */  
        if (MyProc->waitStatus == STATUS_OK)  
                GrantAwaitedLock();  
  
        /*  
         * We don't have to do anything else, because the awaker did all the  
         * necessary update of the lock table and MyProc.  
         */  
        return MyProc->waitStatus;  
}  

Count