diff --git a/pg_query_state.c b/pg_query_state.c
index 739a44e..df01903 100644
--- a/pg_query_state.c
+++ b/pg_query_state.c
@@ -70,15 +70,6 @@ static ProcSignalReason WorkerPollReason = INVALID_PROCSIGNAL;
 static bool module_initialized = false;
 static int reqid = 0;
-typedef struct
-{
-    slock_t     mutex;      /* protect concurrent access to `userid` */
-    Oid         userid;
-    Latch      *caller;
-    pg_atomic_uint32 n_peers;
-} RemoteUserIdResult;
-
-static void SendCurrentUserId(void);
 static void SendBgWorkerPids(void);
 static Oid GetRemoteBackendUserId(PGPROC *proc);
 static List *GetRemoteBackendWorkers(PGPROC *proc);
@@ -90,12 +81,17 @@ static List *GetRemoteBackendQueryStates(PGPROC *leader,
                                          bool buffers,
                                          bool triggers,
                                          ExplainFormat format);
+static shm_mq_result shm_mq_receive_with_timeout(shm_mq_handle *mqh,
+                                                 Size *nbytesp,
+                                                 void **datap,
+                                                 int64 timeout);
 
 /* Shared memory variables */
 static shm_toc *toc = NULL;
-static RemoteUserIdResult *counterpart_userid = NULL;
 pg_qs_params *params = NULL;
 shm_mq *mq = NULL;
+uint32 *mq_req_id = NULL;
+static LWLock *shmem_locks;
 
 /*
  * Estimate amount of shared memory needed.
@@ -111,9 +107,9 @@ pg_qs_shmem_size()
     nkeys = 3;
 
-    shm_toc_estimate_chunk(&e, sizeof(RemoteUserIdResult));
     shm_toc_estimate_chunk(&e, sizeof(pg_qs_params));
     shm_toc_estimate_chunk(&e, (Size) QUEUE_SIZE);
+    shm_toc_estimate_chunk(&e, sizeof(uint32));
     shm_toc_estimate_keys(&e, nkeys);
 
     size = shm_toc_estimate(&e);
@@ -132,36 +128,39 @@ pg_qs_shmem_startup(void)
     void       *shmem;
     int         num_toc = 0;
 
+    LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
     shmem = ShmemInitStruct("pg_query_state", shmem_size, &found);
     if (!found)
     {
         toc = shm_toc_create(PG_QS_MODULE_KEY, shmem, shmem_size);
 
-        counterpart_userid = shm_toc_allocate(toc, sizeof(RemoteUserIdResult));
-        shm_toc_insert(toc, num_toc++, counterpart_userid);
-        SpinLockInit(&counterpart_userid->mutex);
-        pg_atomic_init_u32(&counterpart_userid->n_peers, 0);
-
         params = shm_toc_allocate(toc, sizeof(pg_qs_params));
         shm_toc_insert(toc, num_toc++, params);
+        pg_atomic_init_u32(&params->cur_reqid, 0);
 
         mq = shm_toc_allocate(toc, QUEUE_SIZE);
         shm_toc_insert(toc, num_toc++, mq);
+
+        mq_req_id = shm_toc_allocate(toc, sizeof(uint32));
+        shm_toc_insert(toc, num_toc++, mq_req_id);
+        *mq_req_id = 0;
     }
     else
    {
         toc = shm_toc_attach(PG_QS_MODULE_KEY, shmem);
 #if PG_VERSION_NUM < 100000
-        counterpart_userid = shm_toc_lookup(toc, num_toc++);
         params = shm_toc_lookup(toc, num_toc++);
         mq = shm_toc_lookup(toc, num_toc++);
+        mq_req_id = shm_toc_lookup(toc, num_toc++);
 #else
-        counterpart_userid = shm_toc_lookup(toc, num_toc++, false);
         params = shm_toc_lookup(toc, num_toc++, false);
         mq = shm_toc_lookup(toc, num_toc++, false);
+        mq_req_id = shm_toc_lookup(toc, num_toc++, false);
 #endif
     }
+    shmem_locks = &(GetNamedLWLockTranche("pg_query_state"))->lock;
+    LWLockRelease(AddinShmemInitLock);
 
     if (prev_shmem_startup_hook)
         prev_shmem_startup_hook();
@@ -188,6 +187,7 @@ _PG_init(void)
     shmem_request_hook = pg_qs_shmem_request;
 #else
     RequestAddinShmemSpace(pg_qs_shmem_size());
+    RequestNamedLWLockTranche("pg_query_state", 2);
 #endif
 
     /* Register interrupt on custom signal of polling query state */
@@ -255,6 +255,7 @@ pg_qs_shmem_request(void)
         prev_shmem_request_hook();
 
     RequestAddinShmemSpace(pg_qs_shmem_size());
+    RequestNamedLWLockTranche("pg_query_state", 2);
 }
 #endif
 
@@ -410,24 +411,25 @@ search_be_status(int pid)
 
 void
-UnlockShmem(LOCKTAG *tag)
+UnlockShmem(uint32 key)
 {
-    LockRelease(tag, ExclusiveLock, false);
+    LWLock *lock;
+
+    Assert(key <= PG_QS_SND_KEY);
+    lock = (LWLock *) ((LWLockPadded *) shmem_locks + key);
+    Assert(LWLockHeldByMe(lock));
+    LWLockRelease(lock);
 }
 
 void
-LockShmem(LOCKTAG *tag, uint32 key)
+LockShmem(uint32 key)
 {
-    LockAcquireResult result;
-    tag->locktag_field1 = PG_QS_MODULE_KEY;
-    tag->locktag_field2 = key;
-    tag->locktag_field3 = 0;
-    tag->locktag_field4 = 0;
-    tag->locktag_type = LOCKTAG_USERLOCK;
-    tag->locktag_lockmethodid = USER_LOCKMETHOD;
-    result = LockAcquire(tag, ExclusiveLock, false, false);
-    Assert(result == LOCKACQUIRE_OK);
-    elog(DEBUG1, "LockAcquireResult is not OK %d", result);
+    LWLock *lock;
+
+    Assert(key <= PG_QS_SND_KEY);
+    lock = (LWLock *) ((LWLockPadded *) shmem_locks + key);
+    Assert(!LWLockHeldByMe(lock));
+    LWLockAcquire(lock, LW_EXCLUSIVE);
 }
 
@@ -510,7 +512,6 @@ pg_query_state(PG_FUNCTION_ARGS)
 
     if (SRF_IS_FIRSTCALL())
     {
-        LOCKTAG     tag;
         bool        verbose = PG_GETARG_BOOL(1),
                     costs = PG_GETARG_BOOL(2),
                     timing = PG_GETARG_BOOL(3),
@@ -523,8 +524,6 @@ pg_query_state(PG_FUNCTION_ARGS)
         shm_mq_msg *msg;
         List       *bg_worker_procs = NIL;
         List       *msgs;
-        instr_time  start_time;
-        instr_time  cur_time;
 
         if (!module_initialized)
             ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
@@ -561,37 +560,18 @@ pg_query_state(PG_FUNCTION_ARGS)
         * init and acquire lock so that any other concurrent calls of this fuction
         * can not occupy shared queue for transfering query state
         */
-        LockShmem(&tag, PG_QS_RCV_KEY);
-
-        INSTR_TIME_SET_CURRENT(start_time);
-
-        while (pg_atomic_read_u32(&counterpart_userid->n_peers) != 0)
-        {
-            pg_usleep(1000000); /* wait one second */
-            CHECK_FOR_INTERRUPTS();
+        LockShmem(PG_QS_RCV_KEY);
 
-            INSTR_TIME_SET_CURRENT(cur_time);
-            INSTR_TIME_SUBTRACT(cur_time, start_time);
-
-            if (INSTR_TIME_GET_MILLISEC(cur_time) > MAX_RCV_TIMEOUT)
-            {
-                elog(WARNING, "pg_query_state: last request was interrupted");
-                break;
-            }
-        }
+        reqid = pg_atomic_add_fetch_u32(&params->cur_reqid, 1);
 
         counterpart_user_id = GetRemoteBackendUserId(proc);
         if (!(superuser() || GetUserId() == counterpart_user_id))
         {
-            UnlockShmem(&tag);
+            UnlockShmem(PG_QS_RCV_KEY);
             ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
                             errmsg("permission denied")));
         }
 
-        pg_atomic_write_u32(&counterpart_userid->n_peers, 1);
-        params->reqid = ++reqid;
-        pg_write_barrier();
-
         bg_worker_procs = GetRemoteBackendWorkers(proc);
 
         msgs = GetRemoteBackendQueryStates(proc,
@@ -607,7 +587,7 @@ pg_query_state(PG_FUNCTION_ARGS)
         if (list_length(msgs) == 0)
         {
             elog(WARNING, "backend does not reply");
-            UnlockShmem(&tag);
+            UnlockShmem(PG_QS_RCV_KEY);
             SRF_RETURN_DONE(funcctx);
         }
 
@@ -624,12 +604,12 @@ pg_query_state(PG_FUNCTION_ARGS)
                 else
                     elog(INFO, "backend is not running query");
-                UnlockShmem(&tag);
+                UnlockShmem(PG_QS_RCV_KEY);
                 SRF_RETURN_DONE(funcctx);
             }
             case STAT_DISABLED:
                 elog(INFO, "query execution statistics disabled");
-                UnlockShmem(&tag);
+                UnlockShmem(PG_QS_RCV_KEY);
                 SRF_RETURN_DONE(funcctx);
             case QS_RETURNED:
             {
@@ -691,7 +671,7 @@ pg_query_state(PG_FUNCTION_ARGS)
                 TupleDescInitEntry(tupdesc, (AttrNumber) 5, "leader_pid", INT4OID, -1, 0);
                 funcctx->tuple_desc = BlessTupleDesc(tupdesc);
 
-                UnlockShmem(&tag);
+                UnlockShmem(PG_QS_RCV_KEY);
                 MemoryContextSwitchTo(oldcontext);
             }
             break;
@@ -744,16 +724,6 @@ pg_query_state(PG_FUNCTION_ARGS)
     SRF_RETURN_DONE(funcctx);
 }
 
-static void
-SendCurrentUserId(void)
-{
-    SpinLockAcquire(&counterpart_userid->mutex);
-    counterpart_userid->userid = GetUserId();
-    SpinLockRelease(&counterpart_userid->mutex);
-
-    SetLatch(counterpart_userid->caller);
-}
-
 /*
  * Extract effective user id from backend on which `proc` points.
 *
@@ -765,7 +735,11 @@ SendCurrentUserId(void)
 static Oid
 GetRemoteBackendUserId(PGPROC *proc)
 {
-    Oid                 result;
+    int                 sig_result;
+    shm_mq_handle      *mqh;
+    shm_mq_result       mq_receive_result;
+    shm_mq_userid_msg  *msg;
+    Size                msg_len;
 
 #if PG_VERSION_NUM >= 170000
     Assert(proc && proc->vxid.procNumber != INVALID_PROC_NUMBER);
@@ -774,40 +748,51 @@ GetRemoteBackendUserId(PGPROC *proc)
 #endif
 
     Assert(UserIdPollReason != INVALID_PROCSIGNAL);
-    Assert(counterpart_userid);
+    Assert(mq);
 
-    counterpart_userid->userid = InvalidOid;
-    counterpart_userid->caller = MyLatch;
-    pg_write_barrier();
+    LockShmem(PG_QS_SND_KEY);
+    mq = shm_mq_create(mq, QUEUE_SIZE);
+    shm_mq_set_sender(mq, proc);
+    shm_mq_set_receiver(mq, MyProc);
+    *mq_req_id = pg_atomic_read_u32(&params->cur_reqid);
+    UnlockShmem(PG_QS_SND_KEY);
 
 #if PG_VERSION_NUM >= 170000
-    SendProcSignal(proc->pid, UserIdPollReason, proc->vxid.procNumber);
+    sig_result = SendProcSignal(proc->pid, UserIdPollReason, proc->vxid.procNumber);
 #else
-    SendProcSignal(proc->pid, UserIdPollReason, proc->backendId);
+    sig_result = SendProcSignal(proc->pid, UserIdPollReason, proc->backendId);
 #endif
 
-    for (;;)
+    if (sig_result == -1)
     {
-        SpinLockAcquire(&counterpart_userid->mutex);
-        result = counterpart_userid->userid;
-        SpinLockRelease(&counterpart_userid->mutex);
-
-        if (result != InvalidOid)
-            break;
+        ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
+                        errmsg("can't send signal to get remote backend userid")));
+        return InvalidOid;
+    }
+    mqh = shm_mq_attach(mq, NULL, NULL);
+    mq_receive_result = shm_mq_receive_with_timeout(mqh,
+                                                    &msg_len,
+                                                    (void **) &msg,
+                                                    MAX_RCV_TIMEOUT);
+    if (mq_receive_result != SHM_MQ_SUCCESS || msg == NULL || msg->reqid != reqid || msg_len != sizeof(shm_mq_userid_msg))
+    {
 #if PG_VERSION_NUM < 100000
-        WaitLatch(MyLatch, WL_LATCH_SET, 0);
-#elif PG_VERSION_NUM < 120000
-        WaitLatch(MyLatch, WL_LATCH_SET, 0, PG_WAIT_EXTENSION);
+        shm_mq_detach(mq);
 #else
-        WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
-                  PG_WAIT_EXTENSION);
+        shm_mq_detach(mqh);
 #endif
-        CHECK_FOR_INTERRUPTS();
-        ResetLatch(MyLatch);
+        ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
+                        errmsg("error in message queue data transmitting")));
+        return InvalidOid;
     }
 
-    return result;
+#if PG_VERSION_NUM < 100000
+    shm_mq_detach(mq);
+#else
+    shm_mq_detach(mqh);
+#endif
+    return msg->userid;
 }
 
 /*
@@ -915,13 +900,23 @@ SendBgWorkerPids(void)
     int         msg_len;
     int         i;
     shm_mq_handle  *mqh;
-    LOCKTAG     tag;
-    shm_mq_result   result;
-
-    LockShmem(&tag, PG_QS_SND_KEY);
+    msg_by_parts_result result;
 
+    LockShmem(PG_QS_SND_KEY);
     mqh = shm_mq_attach(mq, NULL, NULL);
+
+    if (*mq_req_id != pg_atomic_read_u32(&params->cur_reqid) || shm_mq_get_sender(mq) != MyProc)
+    {
+        elog(WARNING, "could not send a message to the shared-memory queue: the receiver has been interrupted and a new request is being processed now");
+#if PG_VERSION_NUM < 100000
+        shm_mq_detach(mq);
+#else
+        shm_mq_detach(mqh);
+#endif
+        UnlockShmem(PG_QS_SND_KEY);
+        return;
+    }
+
     foreach(iter, QueryDescStack)
     {
         QueryDesc *curQueryDesc = (QueryDesc *) lfirst(iter);
@@ -934,7 +929,7 @@ SendBgWorkerPids(void)
 
     msg_len = offsetof(BgWorkerPids, pids) + sizeof(pid_t) * list_length(all_workers);
     msg = palloc(msg_len);
-    msg->reqid = params->reqid;
+    msg->reqid = *mq_req_id;
     msg->number = list_length(all_workers);
     i = 0;
     foreach(iter, all_workers)
@@ -945,17 +940,19 @@ SendBgWorkerPids(void)
         msg->pids[i++] = current_pid;
     }
 
-#if PG_VERSION_NUM < 150000
-    result = shm_mq_send(mqh, msg_len, msg, false);
-#else
-    result = shm_mq_send(mqh, msg_len, msg, false, true);
-#endif
+    result = send_msg_by_parts(mqh, msg_len, msg);
 
     /* Check for failure. */
-    if(result == SHM_MQ_DETACHED)
-        elog(WARNING, "could not send message queue to shared-memory queue: receiver has been detached");
-
-    UnlockShmem(&tag);
+    if(result != MSG_BY_PARTS_SUCCEEDED)
+    {
+        elog(WARNING, "pg_query_state: peer seems to have detached");
+#if PG_VERSION_NUM < 100000
+        shm_mq_detach(mq);
+#else
+        shm_mq_detach(mqh);
+#endif
+    }
+    UnlockShmem(PG_QS_SND_KEY);
 }
 
 /*
@@ -971,7 +968,7 @@ GetRemoteBackendWorkers(PGPROC *proc)
     Size            msg_len;
     int             i;
     List           *result = NIL;
-    LOCKTAG         tag;
+    bool            mqh_attached = false;
 
 #if PG_VERSION_NUM >= 170000
     Assert(proc && proc->vxid.procNumber != INVALID_PROC_NUMBER);
@@ -982,11 +979,12 @@ GetRemoteBackendWorkers(PGPROC *proc)
     Assert(WorkerPollReason != INVALID_PROCSIGNAL);
     Assert(mq);
 
-    LockShmem(&tag, PG_QS_SND_KEY);
+    LockShmem(PG_QS_SND_KEY);
     mq = shm_mq_create(mq, QUEUE_SIZE);
     shm_mq_set_sender(mq, proc);
     shm_mq_set_receiver(mq, MyProc);
-    UnlockShmem(&tag);
+    *mq_req_id = pg_atomic_read_u32(&params->cur_reqid);
+    UnlockShmem(PG_QS_SND_KEY);
 
 #if PG_VERSION_NUM >= 170000
     sig_result = SendProcSignal(proc->pid, WorkerPollReason, proc->vxid.procNumber);
@@ -998,7 +996,11 @@ GetRemoteBackendWorkers(PGPROC *proc)
         goto signal_error;
 
     mqh = shm_mq_attach(mq, NULL, NULL);
-    mq_receive_result = shm_mq_receive(mqh, &msg_len, (void **) &msg, false);
+    mqh_attached = true;
+    mq_receive_result = shm_mq_receive_with_timeout(mqh,
+                                                    &msg_len,
+                                                    (void **) &msg,
+                                                    MAX_RCV_TIMEOUT);
     if (mq_receive_result != SHM_MQ_SUCCESS || msg == NULL || msg->reqid != reqid || msg_len != offsetof(BgWorkerPids, pids) + msg->number*sizeof(pid_t))
         goto mq_error;
 
@@ -1010,19 +1012,37 @@ GetRemoteBackendWorkers(PGPROC *proc)
             continue;
         result = lcons(current_proc, result);
     }
-
+    if (mqh_attached)
+    {
 #if PG_VERSION_NUM < 100000
-    shm_mq_detach(mq);
+        shm_mq_detach(mq);
 #else
-    shm_mq_detach(mqh);
+        shm_mq_detach(mqh);
 #endif
+    }
 
     return result;
 
 signal_error:
+    if (mqh_attached)
+    {
+#if PG_VERSION_NUM < 100000
+        shm_mq_detach(mq);
+#else
+        shm_mq_detach(mqh);
+#endif
+    }
     ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
                     errmsg("invalid send signal")));
 mq_error:
+    if (mqh_attached)
+    {
+#if PG_VERSION_NUM < 100000
+        shm_mq_detach(mq);
+#else
+        shm_mq_detach(mqh);
+#endif
+    }
     ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
                     errmsg("error in message queue data transmitting")));
 
@@ -1113,14 +1133,12 @@ GetRemoteBackendQueryStates(PGPROC *leader,
                             ExplainFormat format)
 {
     List           *result = NIL;
-    List           *alive_procs = NIL;
     ListCell       *iter;
     int             sig_result;
     shm_mq_handle  *mqh;
     shm_mq_result   mq_receive_result;
     shm_mq_msg     *msg;
     Size            len;
-    LOCKTAG         tag;
 
     Assert(QueryStatePollReason != INVALID_PROCSIGNAL);
     Assert(mq);
@@ -1135,11 +1153,12 @@ GetRemoteBackendQueryStates(PGPROC *leader,
     pg_write_barrier();
 
     /* initialize message queue that will transfer query states */
-    LockShmem(&tag, PG_QS_SND_KEY);
+    LockShmem(PG_QS_SND_KEY);
     mq = shm_mq_create(mq, QUEUE_SIZE);
     shm_mq_set_sender(mq, leader);
     shm_mq_set_receiver(mq, MyProc);
-    UnlockShmem(&tag);
+    *mq_req_id = pg_atomic_read_u32(&params->cur_reqid);
+    UnlockShmem(PG_QS_SND_KEY);
 
     /*
      * send signal `QueryStatePollReason` to all processes and define all alive
@@ -1157,39 +1176,14 @@ GetRemoteBackendQueryStates(PGPROC *leader,
     if (sig_result == -1)
         goto signal_error;
 
-    foreach(iter, pworkers)
-    {
-        PGPROC *proc = (PGPROC *) lfirst(iter);
-        if (!proc || !proc->pid)
-            continue;
-
-        pg_atomic_add_fetch_u32(&counterpart_userid->n_peers, 1);
-
-#if PG_VERSION_NUM >= 170000
-        sig_result = SendProcSignal(proc->pid,
-                                    QueryStatePollReason,
-                                    proc->vxid.procNumber);
-#else
-        sig_result = SendProcSignal(proc->pid,
-                                    QueryStatePollReason,
-                                    proc->backendId);
-#endif
-
-        if (sig_result == -1)
-        {
-            if (errno != ESRCH)
-                goto signal_error;
-            continue;
-        }
-
-        alive_procs = lappend(alive_procs, proc);
-    }
 
     /* extract query state from leader process */
     mqh = shm_mq_attach(mq, NULL, NULL);
     elog(DEBUG1, "Wait response from leader %d", leader->pid);
-    mq_receive_result = receive_msg_by_parts(mqh, &len, (void **) &msg,
-                                             0, NULL, false);
+    mq_receive_result = shm_mq_receive_with_timeout(mqh,
+                                                    &len,
+                                                    (void **) &msg,
+                                                    MAX_RCV_TIMEOUT);
     if (mq_receive_result != SHM_MQ_SUCCESS)
         goto mq_error;
     if (msg->reqid != reqid)
@@ -1202,23 +1196,42 @@ GetRemoteBackendQueryStates(PGPROC *leader,
 #else
     shm_mq_detach(mqh);
 #endif
-
     /*
      * collect results from all alived parallel workers
      */
-    foreach(iter, alive_procs)
+    foreach(iter, pworkers)
     {
         PGPROC *proc = (PGPROC *) lfirst(iter);
+        if (!proc || !proc->pid)
+            continue;
 
         /* prepare message queue to transfer data */
         elog(DEBUG1, "Wait response from worker %d", proc->pid);
-        LockShmem(&tag, PG_QS_SND_KEY);
+        LockShmem(PG_QS_SND_KEY);
         mq = shm_mq_create(mq, QUEUE_SIZE);
         shm_mq_set_sender(mq, proc);
         shm_mq_set_receiver(mq, MyProc);    /* this function notifies the
                                                counterpart to come into data
                                                transfer */
-        UnlockShmem(&tag);
+        *mq_req_id = pg_atomic_read_u32(&params->cur_reqid);
+        UnlockShmem(PG_QS_SND_KEY);
+
+#if PG_VERSION_NUM >= 170000
+        sig_result = SendProcSignal(proc->pid,
+                                    QueryStatePollReason,
+                                    proc->vxid.procNumber);
+#else
+        sig_result = SendProcSignal(proc->pid,
+                                    QueryStatePollReason,
+                                    proc->backendId);
+#endif
+
+        if (sig_result == -1)
+        {
+            if (errno != ESRCH)
+                goto signal_error;
+            continue;
+        }
 
         /* retrieve result data from message queue */
         mqh = shm_mq_attach(mq, NULL, NULL);
@@ -1261,15 +1274,6 @@ GetRemoteBackendQueryStates(PGPROC *leader,
     return NIL;
 }
 
-void
-DetachPeer(void)
-{
-    int n_peers = pg_atomic_fetch_sub_u32(&counterpart_userid->n_peers, 1);
-    if (n_peers <= 0)
-        ereport(LOG, (errcode(ERRCODE_INTERNAL_ERROR),
-                      errmsg("pg_query_state peer is not responding")));
-}
-
 /*
  * Extract the number of actual rows and planned rows from
 * the plan for one node in text format. Returns their ratio,
@@ -1467,6 +1471,13 @@ pg_progress_bar(PG_FUNCTION_ARGS)
         ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                         errmsg("backend with pid=%d not found", pid)));
 
+    LockShmem(PG_QS_RCV_KEY);
+
+    if (SRF_IS_FIRSTCALL())
+    {
+        reqid = pg_atomic_add_fetch_u32(&params->cur_reqid, 1);
+    }
+
     counterpart_user_id = GetRemoteBackendUserId(proc);
     if (!(superuser() || GetUserId() == counterpart_user_id))
         ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
@@ -1474,11 +1485,6 @@ pg_progress_bar(PG_FUNCTION_ARGS)
     old_progress = 0;
     progress = 0;
 
-    if (SRF_IS_FIRSTCALL())
-    {
-        pg_atomic_write_u32(&counterpart_userid->n_peers, 1);
-        params->reqid = ++reqid;
-    }
     bg_worker_procs = GetRemoteBackendWorkers(proc);
 
     msgs = GetRemoteBackendQueryStates(proc,
@@ -1493,16 +1499,19 @@ pg_progress_bar(PG_FUNCTION_ARGS)
     {
         case QUERY_NOT_RUNNING:
             elog(INFO, "query not runing");
+            UnlockShmem(PG_QS_RCV_KEY);
             PG_RETURN_FLOAT8((float8) -1);
             break;
         case STAT_DISABLED:
             elog(INFO, "query execution statistics disabled");
+            UnlockShmem(PG_QS_RCV_KEY);
             PG_RETURN_FLOAT8((float8) -1);
         default:
             break;
     }
     if (msg->result_code == QS_RETURNED && delay == 0)
     {
+        UnlockShmem(PG_QS_RCV_KEY);
         progress = GetCurrentNumericState(msg);
         if (progress < 0)
         {
@@ -1545,7 +1554,9 @@ pg_progress_bar(PG_FUNCTION_ARGS)
         }
         if (progress > -1)
             elog(INFO, "\rProgress = 1.000000");
+        UnlockShmem(PG_QS_RCV_KEY);
         PG_RETURN_FLOAT8((float8) 1);
     }
+    UnlockShmem(PG_QS_RCV_KEY);
     PG_RETURN_FLOAT8((float8) -1);
 }
diff --git a/pg_query_state.h b/pg_query_state.h
index 8272069..0daf8b4 100644
--- a/pg_query_state.h
+++ b/pg_query_state.h
@@ -53,6 +53,15 @@ typedef enum
     QS_RETURNED             /* Backend successfully returned its query state */
 } PG_QS_RequestResult;
 
+/*
+ * A self-explanatory enum describing the send_msg_by_parts results
+ */
+typedef enum
+{
+    MSG_BY_PARTS_SUCCEEDED,
+    MSG_BY_PARTS_FAILED
+} msg_by_parts_result;
+
 /*
  * Format of transmited data through message queue
  */
@@ -68,12 +77,21 @@ typedef struct
                                text records */
 } shm_mq_msg;
 
+/*
+ * User id transmit format.
+ */
+typedef struct
+{
+    Oid     userid;
+    uint32  reqid;
+} shm_mq_userid_msg;
+
 #define BASE_SIZEOF_SHM_MQ_MSG (offsetof(shm_mq_msg, stack_depth))
 
 /* pg_query_state arguments */
 typedef struct
 {
-    int     reqid;
+    pg_atomic_uint32 cur_reqid;
     bool    verbose;
     bool    costs;
     bool    timing;
@@ -83,17 +101,19 @@ typedef struct
 } pg_qs_params;
 
 /* pg_query_state */
-extern bool pg_qs_enable;
-extern bool pg_qs_timing;
-extern bool pg_qs_buffers;
-extern List *QueryDescStack;
-extern pg_qs_params *params;
-extern shm_mq *mq;
+extern bool     pg_qs_enable;
+extern bool     pg_qs_timing;
+extern bool     pg_qs_buffers;
+extern List    *QueryDescStack;
+extern pg_qs_params *params;
+extern shm_mq  *mq;
+extern uint32  *mq_req_id;
 
 /* signal_handler.c */
 extern void SendQueryState(void);
-extern void DetachPeer(void);
-extern void UnlockShmem(LOCKTAG *tag);
-extern void LockShmem(LOCKTAG *tag, uint32 key);
+extern void SendCurrentUserId(void);
+extern void UnlockShmem(uint32 key);
+extern void LockShmem(uint32 key);
+extern msg_by_parts_result send_msg_by_parts(shm_mq_handle *mqh, Size nbytes, const void *data);
 
 #endif
diff --git a/signal_handler.c b/signal_handler.c
index 09b1cf2..5878b11 100644
--- a/signal_handler.c
+++ b/signal_handler.c
@@ -31,17 +31,6 @@ typedef struct
     char    *plan;
 } stack_frame;
 
-/*
- * An self-explanarory enum describing the send_msg_by_parts results
- */
-typedef enum
-{
-    MSG_BY_PARTS_SUCCEEDED,
-    MSG_BY_PARTS_FAILED
-} msg_by_parts_result;
-
-static msg_by_parts_result send_msg_by_parts(shm_mq_handle *mqh, Size nbytes, const void *data);
-
 /*
  * Get List of stack_frames as a stack of function calls starting from outermost call.
  * Each entry contains query text and query state in form of EXPLAIN ANALYZE output.
@@ -197,7 +186,7 @@ shm_mq_send_nonblocking(shm_mq_handle *mqh, Size nbytes, const void *data, Size
  * send_msg_by_parts sends data through the queue as a bunch of messages
  * of smaller size
  */
-static msg_by_parts_result
+msg_by_parts_result
 send_msg_by_parts(shm_mq_handle *mqh, Size nbytes, const void *data)
 {
     int     bytes_left;
@@ -229,55 +218,22 @@ void
 SendQueryState(void)
 {
     shm_mq_handle  *mqh;
-    instr_time      start_time;
-    instr_time      cur_time;
-    int64           delay = MAX_SND_TIMEOUT;
-    int             reqid = params->reqid;
-    LOCKTAG         tag;
-
-    INSTR_TIME_SET_CURRENT(start_time);
-
-    /* wait until caller sets this process as sender to message queue */
-    for (;;)
-    {
-        if (shm_mq_get_sender(mq) == MyProc)
-            break;
-#if PG_VERSION_NUM < 100000
-        WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT, delay);
-#elif PG_VERSION_NUM < 120000
-        WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT, delay, PG_WAIT_IPC);
-#else
-        WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | WL_TIMEOUT, delay, PG_WAIT_IPC);
-#endif
-        INSTR_TIME_SET_CURRENT(cur_time);
-        INSTR_TIME_SUBTRACT(cur_time, start_time);
-
-        delay = MAX_SND_TIMEOUT - (int64) INSTR_TIME_GET_MILLISEC(cur_time);
-        if (delay <= 0)
-        {
-            elog(WARNING, "pg_query_state: failed to receive request from leader");
-            DetachPeer();
-            return;
-        }
-        CHECK_FOR_INTERRUPTS();
-        ResetLatch(MyLatch);
-    }
-
-    LockShmem(&tag, PG_QS_SND_KEY);
+    LockShmem(PG_QS_SND_KEY);
 
     elog(DEBUG1, "Worker %d receives pg_query_state request from %d", shm_mq_get_sender(mq)->pid, shm_mq_get_receiver(mq)->pid);
 
     mqh = shm_mq_attach(mq, NULL, NULL);
-    if (reqid != params->reqid || shm_mq_get_sender(mq) != MyProc)
+    if (*mq_req_id != pg_atomic_read_u32(&params->cur_reqid) || shm_mq_get_sender(mq) != MyProc)
     {
-        UnlockShmem(&tag);
+        elog(WARNING, "could not send a message to the shared-memory queue: the receiver has been interrupted and a new request is being processed now");
+        UnlockShmem(PG_QS_SND_KEY);
         return;
     }
 
     /* check if module is enabled */
     if (!pg_qs_enable)
     {
-        shm_mq_msg  msg = { reqid, BASE_SIZEOF_SHM_MQ_MSG, MyProc, STAT_DISABLED };
+        shm_mq_msg  msg = { *mq_req_id, BASE_SIZEOF_SHM_MQ_MSG, MyProc, STAT_DISABLED };
 
         if(send_msg_by_parts(mqh, msg.length, &msg) != MSG_BY_PARTS_SUCCEEDED)
             goto connection_cleanup;
@@ -286,7 +242,7 @@ SendQueryState(void)
     /* check if backend doesn't execute any query */
     else if (list_length(QueryDescStack) == 0)
     {
-        shm_mq_msg  msg = { reqid, BASE_SIZEOF_SHM_MQ_MSG, MyProc, QUERY_NOT_RUNNING };
+        shm_mq_msg  msg = { *mq_req_id, BASE_SIZEOF_SHM_MQ_MSG, MyProc, QUERY_NOT_RUNNING };
 
         if(send_msg_by_parts(mqh, msg.length, &msg) != MSG_BY_PARTS_SUCCEEDED)
             goto connection_cleanup;
@@ -299,7 +255,7 @@ SendQueryState(void)
         int         msglen = sizeof(shm_mq_msg) + serialized_stack_length(qs_stack);
         shm_mq_msg *msg = palloc(msglen);
 
-        msg->reqid = reqid;
+        msg->reqid = *mq_req_id;
         msg->length = msglen;
         msg->proc = MyProc;
         msg->result_code = QS_RETURNED;
@@ -320,8 +276,7 @@ SendQueryState(void)
         }
     }
     elog(DEBUG1, "Worker %d sends response for pg_query_state to %d", shm_mq_get_sender(mq)->pid, shm_mq_get_receiver(mq)->pid);
-    DetachPeer();
-    UnlockShmem(&tag);
+    UnlockShmem(PG_QS_SND_KEY);
 
     return;
 
@@ -331,6 +286,29 @@ SendQueryState(void)
 #else
     shm_mq_detach(mqh);
 #endif
-    DetachPeer();
-    UnlockShmem(&tag);
+    UnlockShmem(PG_QS_SND_KEY);
+}
+
+void
+SendCurrentUserId(void)
+{
+    shm_mq_handle      *mqh;
+    shm_mq_userid_msg   msg;
+
+    msg.userid = GetUserId();
+    LockShmem(PG_QS_SND_KEY);
+
+    mqh = shm_mq_attach(mq, NULL, NULL);
+    msg.reqid = *mq_req_id;
+    if (*mq_req_id != pg_atomic_read_u32(&params->cur_reqid) || shm_mq_get_sender(mq) != MyProc)
+        elog(WARNING, "could not send a message to the shared-memory queue: the receiver has been interrupted and a new request is being processed now");
+    else if (send_msg_by_parts(mqh, sizeof(msg), &msg) != MSG_BY_PARTS_SUCCEEDED)
+        elog(WARNING, "could not send a message to the shared-memory queue");
+
+#if PG_VERSION_NUM < 100000
+    shm_mq_detach(mq);
+#else
+    shm_mq_detach(mqh);
+#endif
+    UnlockShmem(PG_QS_SND_KEY);
 }
diff --git a/tests/test_cases.py b/tests/test_cases.py
index 46d0cef..2ac5605 100644
--- a/tests/test_cases.py
+++ b/tests/test_cases.py
@@ -20,8 +20,9 @@ def test_deadlock(config):
     acon1, acon2 = common.n_async_connect(config, 2)
     acurs1 = acon1.cursor()
     acurs2 = acon2.cursor()
+    checkFurther = True
 
-    while True:
+    while checkFurther:
         acurs1.callproc('pg_query_state', (acon2.get_backend_pid(),))
         acurs2.callproc('pg_query_state', (acon1.get_backend_pid(),))
 
         r, w, x = select.select([acon1.fileno(), acon2.fileno()], [], [], 10)
         assert (r or w or x), "Deadlock is happened under cross reading of query states"
 
-        common.wait(acon1)
-        common.wait(acon2)
-
-        # exit from loop if one backend could read state of execution 'pg_query_state'
-        # from other backend
-        if acurs1.fetchone() or acurs2.fetchone():
-            break
+        for acon in [acon1, acon2]:
+            try:
+                common.wait(acon)
+            except psycopg2.errors.InternalError as e:
+                assert(checkFurther), "Failure should occur once"
+                assert(e.pgcode == "XX000")
+                checkFurther = False
 
     common.n_close((acon1, acon2))
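
The patch declares shm_mq_receive_with_timeout() but its definition lies outside the hunks shown above. For orientation only, here is a minimal sketch of how such a helper could look, assuming PostgreSQL 12+ latch and shm_mq APIs, a millisecond timeout, and the headers already included by pg_query_state.c; the function in the actual patch may differ, in particular in how it reports an expired timeout.

/*
 * Hypothetical sketch, not taken from the patch: receive from a shared-memory
 * queue, giving up after `timeout` milliseconds.
 */
static shm_mq_result
shm_mq_receive_with_timeout(shm_mq_handle *mqh, Size *nbytesp, void **datap,
                            int64 timeout)
{
    instr_time  start_time;
    instr_time  cur_time;
    int64       delay = timeout;

    INSTR_TIME_SET_CURRENT(start_time);

    for (;;)
    {
        shm_mq_result result;

        /* Try a non-blocking receive first. */
        result = shm_mq_receive(mqh, nbytesp, datap, true);
        if (result != SHM_MQ_WOULD_BLOCK)
            return result;          /* SHM_MQ_SUCCESS or SHM_MQ_DETACHED */

        /* Sleep until the sender sets our latch or the remaining delay runs out. */
        WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                  delay, PG_WAIT_EXTENSION);
        ResetLatch(MyLatch);
        CHECK_FOR_INTERRUPTS();

        INSTR_TIME_SET_CURRENT(cur_time);
        INSTR_TIME_SUBTRACT(cur_time, start_time);
        delay = timeout - (int64) INSTR_TIME_GET_MILLISEC(cur_time);
        if (delay <= 0)
            return SHM_MQ_DETACHED; /* report an expired timeout as a failure */
    }
}

Mapping an expired timeout onto a non-SHM_MQ_SUCCESS code fits the callers in the diff, which treat anything other than SHM_MQ_SUCCESS as an error (goto mq_error or ereport).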