From a6b31ea68a894cd2d8efc8740297350fceded586 Mon Sep 17 00:00:00 2001 From: Maksim Melnikov Date: Fri, 20 Feb 2026 12:13:48 +0300 Subject: [PATCH 1/3] Fix races between pg_query_state and pg_progress_bar. pg_progress_bar should be locked with PG_QS_RCV_KEY, because it is using the same shmem as pg_query_state. --- pg_query_state.c | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/pg_query_state.c b/pg_query_state.c index 739a44e..cbade05 100644 --- a/pg_query_state.c +++ b/pg_query_state.c @@ -132,6 +132,7 @@ pg_qs_shmem_startup(void) void *shmem; int num_toc = 0; + LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE); shmem = ShmemInitStruct("pg_query_state", shmem_size, &found); if (!found) { @@ -162,6 +163,7 @@ pg_qs_shmem_startup(void) mq = shm_toc_lookup(toc, num_toc++, false); #endif } + LWLockRelease(AddinShmemInitLock); if (prev_shmem_startup_hook) prev_shmem_startup_hook(); @@ -1434,6 +1436,7 @@ pg_progress_bar(PG_FUNCTION_ARGS) List *msgs; double progress; double old_progress; + LOCKTAG tag; if (PG_NARGS() == 2) { @@ -1467,6 +1470,7 @@ pg_progress_bar(PG_FUNCTION_ARGS) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("backend with pid=%d not found", pid))); + LockShmem(&tag, PG_QS_RCV_KEY); counterpart_user_id = GetRemoteBackendUserId(proc); if (!(superuser() || GetUserId() == counterpart_user_id)) ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), @@ -1493,16 +1497,19 @@ pg_progress_bar(PG_FUNCTION_ARGS) { case QUERY_NOT_RUNNING: elog(INFO, "query not runing"); + UnlockShmem(&tag); PG_RETURN_FLOAT8((float8) -1); break; case STAT_DISABLED: elog(INFO, "query execution statistics disabled"); + UnlockShmem(&tag); PG_RETURN_FLOAT8((float8) -1); default: break; } if (msg->result_code == QS_RETURNED && delay == 0) { + UnlockShmem(&tag); progress = GetCurrentNumericState(msg); if (progress < 0) { @@ -1545,7 +1552,9 @@ pg_progress_bar(PG_FUNCTION_ARGS) } if (progress > -1) elog(INFO, "\rProgress = 1.000000"); + UnlockShmem(&tag); 
PG_RETURN_FLOAT8((float8) 1); } + UnlockShmem(&tag); PG_RETURN_FLOAT8((float8) -1); } From ad0e67494834bf797afaa115379a46017c74961d Mon Sep 17 00:00:00 2001 From: Maksim Melnikov Date: Thu, 26 Feb 2026 14:43:40 +0300 Subject: [PATCH 2/3] Fix inter-process communication issues. 1. Fix reqid logic; it now controls backend interruptions. It is used to verify that the request currently being processed has the same reqid as the one polled from the shmem queue. 2. All inter-process communication is now based on shmem queue reads/sends with appropriate timeouts. --- pg_query_state.c | 269 ++++++++++++++++++++++++----------------------- pg_query_state.h | 36 +++++-- signal_handler.c | 82 ++++++--------- 3 files changed, 195 insertions(+), 192 deletions(-) diff --git a/pg_query_state.c b/pg_query_state.c index cbade05..0b8b59f 100644 --- a/pg_query_state.c +++ b/pg_query_state.c @@ -70,15 +70,6 @@ static ProcSignalReason WorkerPollReason = INVALID_PROCSIGNAL; static bool module_initialized = false; static int reqid = 0; -typedef struct -{ - slock_t mutex; /* protect concurrent access to `userid` */ - Oid userid; - Latch *caller; - pg_atomic_uint32 n_peers; -} RemoteUserIdResult; - -static void SendCurrentUserId(void); static void SendBgWorkerPids(void); static Oid GetRemoteBackendUserId(PGPROC *proc); static List *GetRemoteBackendWorkers(PGPROC *proc); @@ -90,12 +81,16 @@ static List *GetRemoteBackendQueryStates(PGPROC *leader, bool buffers, bool triggers, ExplainFormat format); +static shm_mq_result shm_mq_receive_with_timeout(shm_mq_handle *mqh, + Size *nbytesp, + void **datap, + int64 timeout); /* Shared memory variables */ static shm_toc *toc = NULL; -static RemoteUserIdResult *counterpart_userid = NULL; pg_qs_params *params = NULL; shm_mq *mq = NULL; +uint32 *mq_req_id = NULL; /* * Estimate amount of shared memory needed. 
@@ -111,9 +106,9 @@ pg_qs_shmem_size() nkeys = 3; - shm_toc_estimate_chunk(&e, sizeof(RemoteUserIdResult)); shm_toc_estimate_chunk(&e, sizeof(pg_qs_params)); shm_toc_estimate_chunk(&e, (Size) QUEUE_SIZE); + shm_toc_estimate_chunk(&e, sizeof(uint32)); shm_toc_estimate_keys(&e, nkeys); size = shm_toc_estimate(&e); @@ -138,29 +133,29 @@ pg_qs_shmem_startup(void) { toc = shm_toc_create(PG_QS_MODULE_KEY, shmem, shmem_size); - counterpart_userid = shm_toc_allocate(toc, sizeof(RemoteUserIdResult)); - shm_toc_insert(toc, num_toc++, counterpart_userid); - SpinLockInit(&counterpart_userid->mutex); - pg_atomic_init_u32(&counterpart_userid->n_peers, 0); - params = shm_toc_allocate(toc, sizeof(pg_qs_params)); shm_toc_insert(toc, num_toc++, params); + pg_atomic_init_u32(&params->cur_reqid, 0); mq = shm_toc_allocate(toc, QUEUE_SIZE); shm_toc_insert(toc, num_toc++, mq); + + mq_req_id = shm_toc_allocate(toc, sizeof(uint32)); + shm_toc_insert(toc, num_toc++, mq_req_id); + *mq_req_id = 0; } else { toc = shm_toc_attach(PG_QS_MODULE_KEY, shmem); #if PG_VERSION_NUM < 100000 - counterpart_userid = shm_toc_lookup(toc, num_toc++); params = shm_toc_lookup(toc, num_toc++); mq = shm_toc_lookup(toc, num_toc++); + mq_req_id = shm_toc_lookup(toc, num_toc++); #else - counterpart_userid = shm_toc_lookup(toc, num_toc++, false); params = shm_toc_lookup(toc, num_toc++, false); mq = shm_toc_lookup(toc, num_toc++, false); + mq_req_id = shm_toc_lookup(toc, num_toc++, false); #endif } LWLockRelease(AddinShmemInitLock); @@ -525,8 +520,6 @@ pg_query_state(PG_FUNCTION_ARGS) shm_mq_msg *msg; List *bg_worker_procs = NIL; List *msgs; - instr_time start_time; - instr_time cur_time; if (!module_initialized) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), @@ -565,22 +558,7 @@ pg_query_state(PG_FUNCTION_ARGS) */ LockShmem(&tag, PG_QS_RCV_KEY); - INSTR_TIME_SET_CURRENT(start_time); - - while (pg_atomic_read_u32(&counterpart_userid->n_peers) != 0) - { - pg_usleep(1000000); /* wait one second */ - 
CHECK_FOR_INTERRUPTS(); - - INSTR_TIME_SET_CURRENT(cur_time); - INSTR_TIME_SUBTRACT(cur_time, start_time); - - if (INSTR_TIME_GET_MILLISEC(cur_time) > MAX_RCV_TIMEOUT) - { - elog(WARNING, "pg_query_state: last request was interrupted"); - break; - } - } + reqid = pg_atomic_add_fetch_u32(&params->cur_reqid, 1); counterpart_user_id = GetRemoteBackendUserId(proc); if (!(superuser() || GetUserId() == counterpart_user_id)) @@ -590,10 +568,6 @@ pg_query_state(PG_FUNCTION_ARGS) errmsg("permission denied"))); } - pg_atomic_write_u32(&counterpart_userid->n_peers, 1); - params->reqid = ++reqid; - pg_write_barrier(); - bg_worker_procs = GetRemoteBackendWorkers(proc); msgs = GetRemoteBackendQueryStates(proc, @@ -746,16 +720,6 @@ pg_query_state(PG_FUNCTION_ARGS) SRF_RETURN_DONE(funcctx); } -static void -SendCurrentUserId(void) -{ - SpinLockAcquire(&counterpart_userid->mutex); - counterpart_userid->userid = GetUserId(); - SpinLockRelease(&counterpart_userid->mutex); - - SetLatch(counterpart_userid->caller); -} - /* * Extract effective user id from backend on which `proc` points. 
* @@ -767,7 +731,12 @@ SendCurrentUserId(void) static Oid GetRemoteBackendUserId(PGPROC *proc) { - Oid result; + int sig_result; + shm_mq_handle *mqh; + shm_mq_result mq_receive_result; + shm_mq_userid_msg *msg; + Size msg_len; + LOCKTAG tag; #if PG_VERSION_NUM >= 170000 Assert(proc && proc->vxid.procNumber != INVALID_PROC_NUMBER); @@ -776,40 +745,51 @@ GetRemoteBackendUserId(PGPROC *proc) #endif Assert(UserIdPollReason != INVALID_PROCSIGNAL); - Assert(counterpart_userid); + Assert(mq); - counterpart_userid->userid = InvalidOid; - counterpart_userid->caller = MyLatch; - pg_write_barrier(); + LockShmem(&tag, PG_QS_SND_KEY); + mq = shm_mq_create(mq, QUEUE_SIZE); + shm_mq_set_sender(mq, proc); + shm_mq_set_receiver(mq, MyProc); + *mq_req_id = pg_atomic_read_u32(&params->cur_reqid); + UnlockShmem(&tag); #if PG_VERSION_NUM >= 170000 - SendProcSignal(proc->pid, UserIdPollReason, proc->vxid.procNumber); + sig_result = SendProcSignal(proc->pid, UserIdPollReason, proc->vxid.procNumber); #else - SendProcSignal(proc->pid, UserIdPollReason, proc->backendId); + sig_result = SendProcSignal(proc->pid, UserIdPollReason, proc->backendId); #endif - for (;;) + if (sig_result == -1) { - SpinLockAcquire(&counterpart_userid->mutex); - result = counterpart_userid->userid; - SpinLockRelease(&counterpart_userid->mutex); - - if (result != InvalidOid) - break; + ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("can't send signal to get remote backend userid"))); + return InvalidOid; + } + mqh = shm_mq_attach(mq, NULL, NULL); + mq_receive_result = shm_mq_receive_with_timeout(mqh, + &msg_len, + (void **) &msg, + MAX_RCV_TIMEOUT); + if (mq_receive_result != SHM_MQ_SUCCESS || msg == NULL || msg->reqid != reqid || msg_len != sizeof(shm_mq_userid_msg)) + { #if PG_VERSION_NUM < 100000 - WaitLatch(MyLatch, WL_LATCH_SET, 0); -#elif PG_VERSION_NUM < 120000 - WaitLatch(MyLatch, WL_LATCH_SET, 0, PG_WAIT_EXTENSION); + shm_mq_detach(mq); #else - WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 
0, - PG_WAIT_EXTENSION); + shm_mq_detach(mqh); #endif - CHECK_FOR_INTERRUPTS(); - ResetLatch(MyLatch); + ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("error in message queue data transmitting"))); + return InvalidOid; } - return result; +#if PG_VERSION_NUM < 100000 + shm_mq_detach(mq); +#else + shm_mq_detach(mqh); +#endif + return msg->userid; } /* @@ -918,12 +898,23 @@ SendBgWorkerPids(void) int i; shm_mq_handle *mqh; LOCKTAG tag; - shm_mq_result result; + msg_by_parts_result result; LockShmem(&tag, PG_QS_SND_KEY); - mqh = shm_mq_attach(mq, NULL, NULL); + if (*mq_req_id != pg_atomic_read_u32(&params->cur_reqid) || shm_mq_get_sender(mq) != MyProc) + { + elog(WARNING, "could not send message queue to shared-memory queue: receiver has been interrupted and new request is being processed now."); +#if PG_VERSION_NUM < 100000 + shm_mq_detach(mq); +#else + shm_mq_detach(mqh); +#endif + UnlockShmem(&tag); + return; + } + foreach(iter, QueryDescStack) { QueryDesc *curQueryDesc = (QueryDesc *) lfirst(iter); @@ -936,7 +927,7 @@ SendBgWorkerPids(void) msg_len = offsetof(BgWorkerPids, pids) + sizeof(pid_t) * list_length(all_workers); msg = palloc(msg_len); - msg->reqid = params->reqid; + msg->reqid = *mq_req_id; msg->number = list_length(all_workers); i = 0; foreach(iter, all_workers) @@ -947,16 +938,18 @@ SendBgWorkerPids(void) msg->pids[i++] = current_pid; } -#if PG_VERSION_NUM < 150000 - result = shm_mq_send(mqh, msg_len, msg, false); -#else - result = shm_mq_send(mqh, msg_len, msg, false, true); -#endif + result = send_msg_by_parts(mqh, msg_len, msg); /* Check for failure. 
*/ - if(result == SHM_MQ_DETACHED) - elog(WARNING, "could not send message queue to shared-memory queue: receiver has been detached"); - + if(result != MSG_BY_PARTS_SUCCEEDED) + { + elog(WARNING, "pg_query_state: peer seems to have detached"); +#if PG_VERSION_NUM < 100000 + shm_mq_detach(mq); +#else + shm_mq_detach(mqh); +#endif + } UnlockShmem(&tag); } @@ -974,6 +967,7 @@ GetRemoteBackendWorkers(PGPROC *proc) int i; List *result = NIL; LOCKTAG tag; + bool mqh_attached = false; #if PG_VERSION_NUM >= 170000 Assert(proc && proc->vxid.procNumber != INVALID_PROC_NUMBER); @@ -988,6 +982,7 @@ GetRemoteBackendWorkers(PGPROC *proc) mq = shm_mq_create(mq, QUEUE_SIZE); shm_mq_set_sender(mq, proc); shm_mq_set_receiver(mq, MyProc); + *mq_req_id = pg_atomic_read_u32(&params->cur_reqid); UnlockShmem(&tag); #if PG_VERSION_NUM >= 170000 @@ -1000,7 +995,11 @@ GetRemoteBackendWorkers(PGPROC *proc) goto signal_error; mqh = shm_mq_attach(mq, NULL, NULL); - mq_receive_result = shm_mq_receive(mqh, &msg_len, (void **) &msg, false); + mqh_attached = true; + mq_receive_result = shm_mq_receive_with_timeout(mqh, + &msg_len, + (void **) &msg, + MAX_RCV_TIMEOUT); if (mq_receive_result != SHM_MQ_SUCCESS || msg == NULL || msg->reqid != reqid || msg_len != offsetof(BgWorkerPids, pids) + msg->number*sizeof(pid_t)) goto mq_error; @@ -1012,19 +1011,37 @@ GetRemoteBackendWorkers(PGPROC *proc) continue; result = lcons(current_proc, result); } - + if (mqh_attached) + { #if PG_VERSION_NUM < 100000 - shm_mq_detach(mq); + shm_mq_detach(mq); #else - shm_mq_detach(mqh); + shm_mq_detach(mqh); #endif + } return result; signal_error: + if (mqh_attached) + { +#if PG_VERSION_NUM < 100000 + shm_mq_detach(mq); +#else + shm_mq_detach(mqh); +#endif + } ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("invalid send signal"))); mq_error: + if (mqh_attached) + { +#if PG_VERSION_NUM < 100000 + shm_mq_detach(mq); +#else + shm_mq_detach(mqh); +#endif + } ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("error in 
message queue data transmitting"))); @@ -1115,7 +1132,6 @@ GetRemoteBackendQueryStates(PGPROC *leader, ExplainFormat format) { List *result = NIL; - List *alive_procs = NIL; ListCell *iter; int sig_result; shm_mq_handle *mqh; @@ -1141,6 +1157,7 @@ GetRemoteBackendQueryStates(PGPROC *leader, mq = shm_mq_create(mq, QUEUE_SIZE); shm_mq_set_sender(mq, leader); shm_mq_set_receiver(mq, MyProc); + *mq_req_id = pg_atomic_read_u32(&params->cur_reqid); UnlockShmem(&tag); /* @@ -1159,39 +1176,14 @@ GetRemoteBackendQueryStates(PGPROC *leader, if (sig_result == -1) goto signal_error; - foreach(iter, pworkers) - { - PGPROC *proc = (PGPROC *) lfirst(iter); - if (!proc || !proc->pid) - continue; - - pg_atomic_add_fetch_u32(&counterpart_userid->n_peers, 1); - -#if PG_VERSION_NUM >= 170000 - sig_result = SendProcSignal(proc->pid, - QueryStatePollReason, - proc->vxid.procNumber); -#else - sig_result = SendProcSignal(proc->pid, - QueryStatePollReason, - proc->backendId); -#endif - - if (sig_result == -1) - { - if (errno != ESRCH) - goto signal_error; - continue; - } - - alive_procs = lappend(alive_procs, proc); - } /* extract query state from leader process */ mqh = shm_mq_attach(mq, NULL, NULL); elog(DEBUG1, "Wait response from leader %d", leader->pid); - mq_receive_result = receive_msg_by_parts(mqh, &len, (void **) &msg, - 0, NULL, false); + mq_receive_result = shm_mq_receive_with_timeout(mqh, + &len, + (void **) &msg, + MAX_RCV_TIMEOUT); if (mq_receive_result != SHM_MQ_SUCCESS) goto mq_error; if (msg->reqid != reqid) @@ -1204,13 +1196,14 @@ GetRemoteBackendQueryStates(PGPROC *leader, #else shm_mq_detach(mqh); #endif - /* * collect results from all alived parallel workers */ - foreach(iter, alive_procs) + foreach(iter, pworkers) { PGPROC *proc = (PGPROC *) lfirst(iter); + if (!proc || !proc->pid) + continue; /* prepare message queue to transfer data */ elog(DEBUG1, "Wait response from worker %d", proc->pid); @@ -1220,8 +1213,26 @@ GetRemoteBackendQueryStates(PGPROC *leader, 
shm_mq_set_receiver(mq, MyProc); /* this function notifies the counterpart to come into data transfer */ + *mq_req_id = pg_atomic_read_u32(&params->cur_reqid); UnlockShmem(&tag); +#if PG_VERSION_NUM >= 170000 + sig_result = SendProcSignal(proc->pid, + QueryStatePollReason, + proc->vxid.procNumber); +#else + sig_result = SendProcSignal(proc->pid, + QueryStatePollReason, + proc->backendId); +#endif + + if (sig_result == -1) + { + if (errno != ESRCH) + goto signal_error; + continue; + } + /* retrieve result data from message queue */ mqh = shm_mq_attach(mq, NULL, NULL); mq_receive_result = shm_mq_receive_with_timeout(mqh, @@ -1263,15 +1274,6 @@ GetRemoteBackendQueryStates(PGPROC *leader, return NIL; } -void -DetachPeer(void) -{ - int n_peers = pg_atomic_fetch_sub_u32(&counterpart_userid->n_peers, 1); - if (n_peers <= 0) - ereport(LOG, (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("pg_query_state peer is not responding"))); -} - /* * Extract the number of actual rows and planned rows from * the plan for one node in text format. 
Returns their ratio, @@ -1471,6 +1473,12 @@ pg_progress_bar(PG_FUNCTION_ARGS) errmsg("backend with pid=%d not found", pid))); LockShmem(&tag, PG_QS_RCV_KEY); + + if (SRF_IS_FIRSTCALL()) + { + reqid = pg_atomic_add_fetch_u32(&params->cur_reqid, 1); + } + counterpart_user_id = GetRemoteBackendUserId(proc); if (!(superuser() || GetUserId() == counterpart_user_id)) ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), @@ -1478,11 +1486,6 @@ pg_progress_bar(PG_FUNCTION_ARGS) old_progress = 0; progress = 0; - if (SRF_IS_FIRSTCALL()) - { - pg_atomic_write_u32(&counterpart_userid->n_peers, 1); - params->reqid = ++reqid; - } bg_worker_procs = GetRemoteBackendWorkers(proc); msgs = GetRemoteBackendQueryStates(proc, diff --git a/pg_query_state.h b/pg_query_state.h index 8272069..b6c0c07 100644 --- a/pg_query_state.h +++ b/pg_query_state.h @@ -53,6 +53,15 @@ typedef enum QS_RETURNED /* Backend succx[esfully returned its query state */ } PG_QS_RequestResult; +/* + * A self-explanatory enum describing the send_msg_by_parts results + */ +typedef enum +{ + MSG_BY_PARTS_SUCCEEDED, + MSG_BY_PARTS_FAILED +} msg_by_parts_result; + /* * Format of transmited data through message queue */ @@ -68,12 +77,21 @@ typedef struct text records */ } shm_mq_msg; +/* + * User id transmit format. 
+ */ +typedef struct +{ + Oid userid; + uint32 reqid; +} shm_mq_userid_msg; + #define BASE_SIZEOF_SHM_MQ_MSG (offsetof(shm_mq_msg, stack_depth)) /* pg_query_state arguments */ typedef struct { - int reqid; + pg_atomic_uint32 cur_reqid; bool verbose; bool costs; bool timing; @@ -83,17 +101,19 @@ typedef struct } pg_qs_params; /* pg_query_state */ -extern bool pg_qs_enable; -extern bool pg_qs_timing; -extern bool pg_qs_buffers; -extern List *QueryDescStack; -extern pg_qs_params *params; -extern shm_mq *mq; +extern bool pg_qs_enable; +extern bool pg_qs_timing; +extern bool pg_qs_buffers; +extern List *QueryDescStack; +extern pg_qs_params * params; +extern shm_mq *mq; +extern uint32 *mq_req_id; /* signal_handler.c */ extern void SendQueryState(void); -extern void DetachPeer(void); +extern void SendCurrentUserId(void); extern void UnlockShmem(LOCKTAG *tag); extern void LockShmem(LOCKTAG *tag, uint32 key); +extern msg_by_parts_result send_msg_by_parts(shm_mq_handle *mqh, Size nbytes, const void *data); #endif diff --git a/signal_handler.c b/signal_handler.c index 09b1cf2..443966f 100644 --- a/signal_handler.c +++ b/signal_handler.c @@ -31,17 +31,6 @@ typedef struct char *plan; } stack_frame; -/* - * An self-explanarory enum describing the send_msg_by_parts results - */ -typedef enum -{ - MSG_BY_PARTS_SUCCEEDED, - MSG_BY_PARTS_FAILED -} msg_by_parts_result; - -static msg_by_parts_result send_msg_by_parts(shm_mq_handle *mqh, Size nbytes, const void *data); - /* * Get List of stack_frames as a stack of function calls starting from outermost call. * Each entry contains query text and query state in form of EXPLAIN ANALYZE output. 
@@ -197,7 +186,7 @@ shm_mq_send_nonblocking(shm_mq_handle *mqh, Size nbytes, const void *data, Size * send_msg_by_parts sends data through the queue as a bunch of messages * of smaller size */ -static msg_by_parts_result +msg_by_parts_result send_msg_by_parts(shm_mq_handle *mqh, Size nbytes, const void *data) { int bytes_left; @@ -229,55 +218,23 @@ void SendQueryState(void) { shm_mq_handle *mqh; - instr_time start_time; - instr_time cur_time; - int64 delay = MAX_SND_TIMEOUT; - int reqid = params->reqid; LOCKTAG tag; - INSTR_TIME_SET_CURRENT(start_time); - - /* wait until caller sets this process as sender to message queue */ - for (;;) - { - if (shm_mq_get_sender(mq) == MyProc) - break; - -#if PG_VERSION_NUM < 100000 - WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT, delay); -#elif PG_VERSION_NUM < 120000 - WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT, delay, PG_WAIT_IPC); -#else - WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | WL_TIMEOUT, delay, PG_WAIT_IPC); -#endif - INSTR_TIME_SET_CURRENT(cur_time); - INSTR_TIME_SUBTRACT(cur_time, start_time); - - delay = MAX_SND_TIMEOUT - (int64) INSTR_TIME_GET_MILLISEC(cur_time); - if (delay <= 0) - { - elog(WARNING, "pg_query_state: failed to receive request from leader"); - DetachPeer(); - return; - } - CHECK_FOR_INTERRUPTS(); - ResetLatch(MyLatch); - } - LockShmem(&tag, PG_QS_SND_KEY); elog(DEBUG1, "Worker %d receives pg_query_state request from %d", shm_mq_get_sender(mq)->pid, shm_mq_get_receiver(mq)->pid); mqh = shm_mq_attach(mq, NULL, NULL); - if (reqid != params->reqid || shm_mq_get_sender(mq) != MyProc) + if (*mq_req_id != pg_atomic_read_u32(&params->cur_reqid) || shm_mq_get_sender(mq) != MyProc) { + elog(WARNING, "could not send message queue to shared-memory queue: receiver has been interrupted and new request is being processed now."); UnlockShmem(&tag); return; } /* check if module is enabled */ if (!pg_qs_enable) { - shm_mq_msg msg = { reqid, BASE_SIZEOF_SHM_MQ_MSG, MyProc, STAT_DISABLED }; + shm_mq_msg msg = { 
*mq_req_id, BASE_SIZEOF_SHM_MQ_MSG, MyProc, STAT_DISABLED }; if(send_msg_by_parts(mqh, msg.length, &msg) != MSG_BY_PARTS_SUCCEEDED) goto connection_cleanup; @@ -286,7 +243,7 @@ SendQueryState(void) /* check if backend doesn't execute any query */ else if (list_length(QueryDescStack) == 0) { - shm_mq_msg msg = { reqid, BASE_SIZEOF_SHM_MQ_MSG, MyProc, QUERY_NOT_RUNNING }; + shm_mq_msg msg = { *mq_req_id, BASE_SIZEOF_SHM_MQ_MSG, MyProc, QUERY_NOT_RUNNING }; if(send_msg_by_parts(mqh, msg.length, &msg) != MSG_BY_PARTS_SUCCEEDED) goto connection_cleanup; @@ -299,7 +256,7 @@ SendQueryState(void) int msglen = sizeof(shm_mq_msg) + serialized_stack_length(qs_stack); shm_mq_msg *msg = palloc(msglen); - msg->reqid = reqid; + msg->reqid = *mq_req_id; msg->length = msglen; msg->proc = MyProc; msg->result_code = QS_RETURNED; @@ -320,7 +277,6 @@ SendQueryState(void) } } elog(DEBUG1, "Worker %d sends response for pg_query_state to %d", shm_mq_get_sender(mq)->pid, shm_mq_get_receiver(mq)->pid); - DetachPeer(); UnlockShmem(&tag); return; @@ -331,6 +287,30 @@ SendQueryState(void) #else shm_mq_detach(mqh); #endif - DetachPeer(); + UnlockShmem(&tag); +} + +void +SendCurrentUserId(void) +{ + shm_mq_handle *mqh; + shm_mq_userid_msg msg; + LOCKTAG tag; + + msg.userid = GetUserId(); + LockShmem(&tag, PG_QS_SND_KEY); + + mqh = shm_mq_attach(mq, NULL, NULL); + msg.reqid = *mq_req_id; + if (*mq_req_id != pg_atomic_read_u32(&params->cur_reqid) || shm_mq_get_sender(mq) != MyProc) + elog(WARNING, "could not send message queue to shared-memory queue: receiver has been interrupted and new request is being processed now."); + else if (send_msg_by_parts(mqh, sizeof(msg), &msg) != MSG_BY_PARTS_SUCCEEDED) + elog(WARNING, "could not send message queue to shared-memory queue."); + +#if PG_VERSION_NUM < 100000 + shm_mq_detach(mq); +#else + shm_mq_detach(mqh); +#endif UnlockShmem(&tag); } From 767818ba4f9ea04db7bb4701147b682ef8e6c963 Mon Sep 17 00:00:00 2001 From: Maksim Melnikov Date: Thu, 26 Feb 2026 
16:00:17 +0300 Subject: [PATCH 3/3] LWLocks are used instead of locks. Change deadlock test logic; error responses are normal in such cases. --- pg_query_state.c | 83 ++++++++++++++++++++++----------------------- pg_query_state.h | 4 +-- signal_handler.c | 14 ++++---- tests/test_cases.py | 17 +++++----- 4 files changed, 58 insertions(+), 60 deletions(-) diff --git a/pg_query_state.c b/pg_query_state.c index 0b8b59f..df01903 100644 --- a/pg_query_state.c +++ b/pg_query_state.c @@ -91,6 +91,7 @@ static shm_toc *toc = NULL; pg_qs_params *params = NULL; shm_mq *mq = NULL; uint32 *mq_req_id = NULL; +static LWLock *shmem_locks; /* * Estimate amount of shared memory needed. @@ -158,6 +159,7 @@ pg_qs_shmem_startup(void) mq_req_id = shm_toc_lookup(toc, num_toc++, false); #endif } + shmem_locks = &(GetNamedLWLockTranche("pg_query_state"))->lock; LWLockRelease(AddinShmemInitLock); if (prev_shmem_startup_hook) @@ -185,6 +187,7 @@ _PG_init(void) shmem_request_hook = pg_qs_shmem_request; #else RequestAddinShmemSpace(pg_qs_shmem_size()); + RequestNamedLWLockTranche("pg_query_state", 2); #endif /* Register interrupt on custom signal of polling query state */ @@ -252,6 +255,7 @@ pg_qs_shmem_request(void) prev_shmem_request_hook(); RequestAddinShmemSpace(pg_qs_shmem_size()); + RequestNamedLWLockTranche("pg_query_state", 2); } #endif @@ -407,24 +411,25 @@ search_be_status(int pid) void -UnlockShmem(LOCKTAG *tag) +UnlockShmem(uint32 key) { - LockRelease(tag, ExclusiveLock, false); + LWLock *lock; + + Assert(key <= PG_QS_SND_KEY); + lock = (LWLock *) ((LWLockPadded *) shmem_locks + key); + Assert(LWLockHeldByMe(lock)); + LWLockRelease(lock); } void -LockShmem(LOCKTAG *tag, uint32 key) +LockShmem(uint32 key) { - LockAcquireResult result; - tag->locktag_field1 = PG_QS_MODULE_KEY; - tag->locktag_field2 = key; - tag->locktag_field3 = 0; - tag->locktag_field4 = 0; - tag->locktag_type = LOCKTAG_USERLOCK; - tag->locktag_lockmethodid = USER_LOCKMETHOD; - result = LockAcquire(tag, ExclusiveLock, 
false, false); - Assert(result == LOCKACQUIRE_OK); - elog(DEBUG1, "LockAcquireResult is not OK %d", result); + LWLock *lock; + + Assert(key <= PG_QS_SND_KEY); + lock = (LWLock *) ((LWLockPadded *) shmem_locks + key); + Assert(!LWLockHeldByMe(lock)); + LWLockAcquire(lock, LW_EXCLUSIVE); } @@ -507,7 +512,6 @@ pg_query_state(PG_FUNCTION_ARGS) if (SRF_IS_FIRSTCALL()) { - LOCKTAG tag; bool verbose = PG_GETARG_BOOL(1), costs = PG_GETARG_BOOL(2), timing = PG_GETARG_BOOL(3), @@ -556,14 +560,14 @@ pg_query_state(PG_FUNCTION_ARGS) * init and acquire lock so that any other concurrent calls of this fuction * can not occupy shared queue for transfering query state */ - LockShmem(&tag, PG_QS_RCV_KEY); + LockShmem(PG_QS_RCV_KEY); reqid = pg_atomic_add_fetch_u32(&params->cur_reqid, 1); counterpart_user_id = GetRemoteBackendUserId(proc); if (!(superuser() || GetUserId() == counterpart_user_id)) { - UnlockShmem(&tag); + UnlockShmem(PG_QS_RCV_KEY); ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), errmsg("permission denied"))); } @@ -583,7 +587,7 @@ pg_query_state(PG_FUNCTION_ARGS) if (list_length(msgs) == 0) { elog(WARNING, "backend does not reply"); - UnlockShmem(&tag); + UnlockShmem(PG_QS_RCV_KEY); SRF_RETURN_DONE(funcctx); } @@ -600,12 +604,12 @@ pg_query_state(PG_FUNCTION_ARGS) else elog(INFO, "backend is not running query"); - UnlockShmem(&tag); + UnlockShmem(PG_QS_RCV_KEY); SRF_RETURN_DONE(funcctx); } case STAT_DISABLED: elog(INFO, "query execution statistics disabled"); - UnlockShmem(&tag); + UnlockShmem(PG_QS_RCV_KEY); SRF_RETURN_DONE(funcctx); case QS_RETURNED: { @@ -667,7 +671,7 @@ pg_query_state(PG_FUNCTION_ARGS) TupleDescInitEntry(tupdesc, (AttrNumber) 5, "leader_pid", INT4OID, -1, 0); funcctx->tuple_desc = BlessTupleDesc(tupdesc); - UnlockShmem(&tag); + UnlockShmem(PG_QS_RCV_KEY); MemoryContextSwitchTo(oldcontext); } break; @@ -736,7 +740,6 @@ GetRemoteBackendUserId(PGPROC *proc) shm_mq_result mq_receive_result; shm_mq_userid_msg *msg; Size msg_len; - LOCKTAG tag; 
#if PG_VERSION_NUM >= 170000 Assert(proc && proc->vxid.procNumber != INVALID_PROC_NUMBER); @@ -747,12 +750,12 @@ GetRemoteBackendUserId(PGPROC *proc) Assert(UserIdPollReason != INVALID_PROCSIGNAL); Assert(mq); - LockShmem(&tag, PG_QS_SND_KEY); + LockShmem(PG_QS_SND_KEY); mq = shm_mq_create(mq, QUEUE_SIZE); shm_mq_set_sender(mq, proc); shm_mq_set_receiver(mq, MyProc); *mq_req_id = pg_atomic_read_u32(&params->cur_reqid); - UnlockShmem(&tag); + UnlockShmem(PG_QS_SND_KEY); #if PG_VERSION_NUM >= 170000 sig_result = SendProcSignal(proc->pid, UserIdPollReason, proc->vxid.procNumber); @@ -897,10 +900,9 @@ SendBgWorkerPids(void) int msg_len; int i; shm_mq_handle *mqh; - LOCKTAG tag; msg_by_parts_result result; - LockShmem(&tag, PG_QS_SND_KEY); + LockShmem(PG_QS_SND_KEY); mqh = shm_mq_attach(mq, NULL, NULL); if (*mq_req_id != pg_atomic_read_u32(&params->cur_reqid) || shm_mq_get_sender(mq) != MyProc) @@ -911,7 +913,7 @@ SendBgWorkerPids(void) #else shm_mq_detach(mqh); #endif - UnlockShmem(&tag); + UnlockShmem(PG_QS_SND_KEY); return; } @@ -950,7 +952,7 @@ SendBgWorkerPids(void) shm_mq_detach(mqh); #endif } - UnlockShmem(&tag); + UnlockShmem(PG_QS_SND_KEY); } /* @@ -966,7 +968,6 @@ GetRemoteBackendWorkers(PGPROC *proc) Size msg_len; int i; List *result = NIL; - LOCKTAG tag; bool mqh_attached = false; #if PG_VERSION_NUM >= 170000 @@ -978,12 +979,12 @@ GetRemoteBackendWorkers(PGPROC *proc) Assert(WorkerPollReason != INVALID_PROCSIGNAL); Assert(mq); - LockShmem(&tag, PG_QS_SND_KEY); + LockShmem(PG_QS_SND_KEY); mq = shm_mq_create(mq, QUEUE_SIZE); shm_mq_set_sender(mq, proc); shm_mq_set_receiver(mq, MyProc); *mq_req_id = pg_atomic_read_u32(&params->cur_reqid); - UnlockShmem(&tag); + UnlockShmem(PG_QS_SND_KEY); #if PG_VERSION_NUM >= 170000 sig_result = SendProcSignal(proc->pid, WorkerPollReason, proc->vxid.procNumber); @@ -1138,7 +1139,6 @@ GetRemoteBackendQueryStates(PGPROC *leader, shm_mq_result mq_receive_result; shm_mq_msg *msg; Size len; - LOCKTAG tag; Assert(QueryStatePollReason != 
INVALID_PROCSIGNAL); Assert(mq); @@ -1153,12 +1153,12 @@ GetRemoteBackendQueryStates(PGPROC *leader, pg_write_barrier(); /* initialize message queue that will transfer query states */ - LockShmem(&tag, PG_QS_SND_KEY); + LockShmem(PG_QS_SND_KEY); mq = shm_mq_create(mq, QUEUE_SIZE); shm_mq_set_sender(mq, leader); shm_mq_set_receiver(mq, MyProc); *mq_req_id = pg_atomic_read_u32(&params->cur_reqid); - UnlockShmem(&tag); + UnlockShmem(PG_QS_SND_KEY); /* * send signal `QueryStatePollReason` to all processes and define all alive @@ -1207,14 +1207,14 @@ GetRemoteBackendQueryStates(PGPROC *leader, /* prepare message queue to transfer data */ elog(DEBUG1, "Wait response from worker %d", proc->pid); - LockShmem(&tag, PG_QS_SND_KEY); + LockShmem(PG_QS_SND_KEY); mq = shm_mq_create(mq, QUEUE_SIZE); shm_mq_set_sender(mq, proc); shm_mq_set_receiver(mq, MyProc); /* this function notifies the counterpart to come into data transfer */ *mq_req_id = pg_atomic_read_u32(&params->cur_reqid); - UnlockShmem(&tag); + UnlockShmem(PG_QS_SND_KEY); #if PG_VERSION_NUM >= 170000 sig_result = SendProcSignal(proc->pid, @@ -1438,7 +1438,6 @@ pg_progress_bar(PG_FUNCTION_ARGS) List *msgs; double progress; double old_progress; - LOCKTAG tag; if (PG_NARGS() == 2) { @@ -1472,7 +1471,7 @@ pg_progress_bar(PG_FUNCTION_ARGS) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("backend with pid=%d not found", pid))); - LockShmem(&tag, PG_QS_RCV_KEY); + LockShmem(PG_QS_RCV_KEY); if (SRF_IS_FIRSTCALL()) { @@ -1500,19 +1499,19 @@ pg_progress_bar(PG_FUNCTION_ARGS) { case QUERY_NOT_RUNNING: elog(INFO, "query not runing"); - UnlockShmem(&tag); + UnlockShmem(PG_QS_RCV_KEY); PG_RETURN_FLOAT8((float8) -1); break; case STAT_DISABLED: elog(INFO, "query execution statistics disabled"); - UnlockShmem(&tag); + UnlockShmem(PG_QS_RCV_KEY); PG_RETURN_FLOAT8((float8) -1); default: break; } if (msg->result_code == QS_RETURNED && delay == 0) { - UnlockShmem(&tag); + UnlockShmem(PG_QS_RCV_KEY); progress = 
GetCurrentNumericState(msg); if (progress < 0) { @@ -1555,9 +1554,9 @@ pg_progress_bar(PG_FUNCTION_ARGS) } if (progress > -1) elog(INFO, "\rProgress = 1.000000"); - UnlockShmem(&tag); + UnlockShmem(PG_QS_RCV_KEY); PG_RETURN_FLOAT8((float8) 1); } - UnlockShmem(&tag); + UnlockShmem(PG_QS_RCV_KEY); PG_RETURN_FLOAT8((float8) -1); } diff --git a/pg_query_state.h b/pg_query_state.h index b6c0c07..0daf8b4 100644 --- a/pg_query_state.h +++ b/pg_query_state.h @@ -112,8 +112,8 @@ extern uint32 *mq_req_id; /* signal_handler.c */ extern void SendQueryState(void); extern void SendCurrentUserId(void); -extern void UnlockShmem(LOCKTAG *tag); -extern void LockShmem(LOCKTAG *tag, uint32 key); +extern void UnlockShmem(uint32 key); +extern void LockShmem(uint32 key); extern msg_by_parts_result send_msg_by_parts(shm_mq_handle *mqh, Size nbytes, const void *data); #endif diff --git a/signal_handler.c b/signal_handler.c index 443966f..5878b11 100644 --- a/signal_handler.c +++ b/signal_handler.c @@ -218,9 +218,8 @@ void SendQueryState(void) { shm_mq_handle *mqh; - LOCKTAG tag; - LockShmem(&tag, PG_QS_SND_KEY); + LockShmem(PG_QS_SND_KEY); elog(DEBUG1, "Worker %d receives pg_query_state request from %d", shm_mq_get_sender(mq)->pid, shm_mq_get_receiver(mq)->pid); mqh = shm_mq_attach(mq, NULL, NULL); @@ -228,7 +227,7 @@ SendQueryState(void) if (*mq_req_id != pg_atomic_read_u32(&params->cur_reqid) || shm_mq_get_sender(mq) != MyProc) { elog(WARNING, "could not send message queue to shared-memory queue: receiver has been interrupted and new request is being processed now."); - UnlockShmem(&tag); + UnlockShmem(PG_QS_SND_KEY); return; } /* check if module is enabled */ @@ -277,7 +276,7 @@ SendQueryState(void) } } elog(DEBUG1, "Worker %d sends response for pg_query_state to %d", shm_mq_get_sender(mq)->pid, shm_mq_get_receiver(mq)->pid); - UnlockShmem(&tag); + UnlockShmem(PG_QS_SND_KEY); return; @@ -287,7 +286,7 @@ SendQueryState(void) #else shm_mq_detach(mqh); #endif - UnlockShmem(&tag); + 
UnlockShmem(PG_QS_SND_KEY); } void @@ -295,10 +294,9 @@ SendCurrentUserId(void) { shm_mq_handle *mqh; shm_mq_userid_msg msg; - LOCKTAG tag; msg.userid = GetUserId(); - LockShmem(&tag, PG_QS_SND_KEY); + LockShmem(PG_QS_SND_KEY); mqh = shm_mq_attach(mq, NULL, NULL); msg.reqid = *mq_req_id; @@ -312,5 +310,5 @@ SendCurrentUserId(void) #else shm_mq_detach(mqh); #endif - UnlockShmem(&tag); + UnlockShmem(PG_QS_SND_KEY); } diff --git a/tests/test_cases.py b/tests/test_cases.py index 46d0cef..2ac5605 100644 --- a/tests/test_cases.py +++ b/tests/test_cases.py @@ -20,8 +20,9 @@ def test_deadlock(config): acon1, acon2 = common.n_async_connect(config, 2) acurs1 = acon1.cursor() acurs2 = acon2.cursor() + checkFurther = True - while True: + while checkFurther: acurs1.callproc('pg_query_state', (acon2.get_backend_pid(),)) acurs2.callproc('pg_query_state', (acon1.get_backend_pid(),)) @@ -29,13 +30,13 @@ def test_deadlock(config): r, w, x = select.select([acon1.fileno(), acon2.fileno()], [], [], 10) assert (r or w or x), "Deadlock is happened under cross reading of query states" - common.wait(acon1) - common.wait(acon2) - - # exit from loop if one backend could read state of execution 'pg_query_state' - # from other backend - if acurs1.fetchone() or acurs2.fetchone(): - break + for acon in [acon1, acon2]: + try: + common.wait(acon) + except psycopg2.errors.InternalError as e: + assert(checkFurther), "Failure should occur once" + assert(e.pgcode == "XX000") + checkFurther = False common.n_close((acon1, acon2))