PolarDB-for-PostgreSQL
295 строк · 8.7 Кб
1/*
2* contrib/pgrowlocks/pgrowlocks.c
3*
4* Copyright (c) 2005-2006 Tatsuo Ishii
5*
6* Permission to use, copy, modify, and distribute this software and
7* its documentation for any purpose, without fee, and without a
8* written agreement is hereby granted, provided that the above
9* copyright notice and this paragraph and the following two
10* paragraphs appear in all copies.
11*
12* IN NO EVENT SHALL THE AUTHOR BE LIABLE TO ANY PARTY FOR DIRECT,
13* INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
14* LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
15* DOCUMENTATION, EVEN IF THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED
16* OF THE POSSIBILITY OF SUCH DAMAGE.
17*
18* THE AUTHOR SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT
19* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
20* A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS ON AN "AS
21* IS" BASIS, AND THE AUTHOR HAS NO OBLIGATIONS TO PROVIDE MAINTENANCE,
22* SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
23*/
24
25#include "postgres.h"
26
27#include "access/multixact.h"
28#include "access/relscan.h"
29#include "access/xact.h"
30#include "catalog/namespace.h"
31#include "catalog/pg_authid.h"
32#include "funcapi.h"
33#include "miscadmin.h"
34#include "pgstat.h"
35#include "storage/bufmgr.h"
36#include "storage/procarray.h"
37#include "utils/acl.h"
38#include "utils/builtins.h"
39#include "utils/rel.h"
40#include "utils/snapmgr.h"
41#include "utils/tqual.h"
42#include "utils/varlena.h"
43
44PG_MODULE_MAGIC;
45
46PG_FUNCTION_INFO_V1(pgrowlocks);
47
48/* ----------
49* pgrowlocks:
50* returns tids of rows being locked
51* ----------
52*/
53
54#define NCHARS 32
55
56#define Atnum_tid 0
57#define Atnum_xmax 1
58#define Atnum_ismulti 2
59#define Atnum_xids 3
60#define Atnum_modes 4
61#define Atnum_pids 5
62
63Datum
64pgrowlocks(PG_FUNCTION_ARGS)
65{
66text *relname = PG_GETARG_TEXT_PP(0);
67ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
68bool randomAccess;
69TupleDesc tupdesc;
70Tuplestorestate *tupstore;
71AttInMetadata *attinmeta;
72Relation rel;
73RangeVar *relrv;
74HeapScanDesc scan;
75HeapTuple tuple;
76MemoryContext oldcontext;
77AclResult aclresult;
78char **values;
79
80/* check to see if caller supports us returning a tuplestore */
81if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
82ereport(ERROR,
83(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
84errmsg("set-valued function called in context that cannot accept a set")));
85if (!(rsinfo->allowedModes & SFRM_Materialize))
86ereport(ERROR,
87(errcode(ERRCODE_SYNTAX_ERROR),
88errmsg("materialize mode required, but it is not allowed in this context")));
89
90/* The tupdesc and tuplestore must be created in ecxt_per_query_memory */
91oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
92
93if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
94elog(ERROR, "return type must be a row type");
95
96randomAccess = (rsinfo->allowedModes & SFRM_Materialize_Random) != 0;
97tupstore = tuplestore_begin_heap(randomAccess, false, work_mem);
98rsinfo->returnMode = SFRM_Materialize;
99rsinfo->setResult = tupstore;
100rsinfo->setDesc = tupdesc;
101
102MemoryContextSwitchTo(oldcontext);
103
104/* Access the table */
105relrv = makeRangeVarFromNameList(textToQualifiedNameList(relname));
106rel = relation_openrv(relrv, AccessShareLock);
107
108if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
109ereport(ERROR,
110(errcode(ERRCODE_WRONG_OBJECT_TYPE),
111errmsg("\"%s\" is a partitioned table",
112RelationGetRelationName(rel)),
113errdetail("Partitioned tables do not contain rows.")));
114else if (rel->rd_rel->relkind != RELKIND_RELATION)
115ereport(ERROR,
116(errcode(ERRCODE_WRONG_OBJECT_TYPE),
117errmsg("\"%s\" is not a table",
118RelationGetRelationName(rel))));
119
120/*
121* check permissions: must have SELECT on table or be in
122* pg_stat_scan_tables
123*/
124aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
125ACL_SELECT);
126if (aclresult != ACLCHECK_OK)
127aclresult = is_member_of_role(GetUserId(), DEFAULT_ROLE_STAT_SCAN_TABLES) ? ACLCHECK_OK : ACLCHECK_NO_PRIV;
128
129if (aclresult != ACLCHECK_OK)
130aclcheck_error(aclresult, get_relkind_objtype(rel->rd_rel->relkind),
131RelationGetRelationName(rel));
132
133/* Scan the relation */
134scan = heap_beginscan(rel, GetActiveSnapshot(), 0, NULL);
135
136attinmeta = TupleDescGetAttInMetadata(tupdesc);
137
138values = (char **) palloc(tupdesc->natts * sizeof(char *));
139
140while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
141{
142HTSU_Result htsu;
143TransactionId xmax;
144uint16 infomask;
145
146/* must hold a buffer lock to call HeapTupleSatisfiesUpdate */
147LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
148
149htsu = HeapTupleSatisfiesUpdate(tuple,
150GetCurrentCommandId(false),
151scan->rs_cbuf);
152xmax = HeapTupleHeaderGetRawXmax(tuple->t_data);
153infomask = tuple->t_data->t_infomask;
154
155/*
156* A tuple is locked if HTSU returns BeingUpdated.
157*/
158if (htsu == HeapTupleBeingUpdated)
159{
160values[Atnum_tid] = (char *) DirectFunctionCall1(tidout,
161PointerGetDatum(&tuple->t_self));
162
163values[Atnum_xmax] = palloc(NCHARS * sizeof(char));
164snprintf(values[Atnum_xmax], NCHARS, "%d", xmax);
165if (infomask & HEAP_XMAX_IS_MULTI)
166{
167MultiXactMember *members;
168int nmembers;
169bool first = true;
170bool allow_old;
171
172values[Atnum_ismulti] = pstrdup("true");
173
174allow_old = HEAP_LOCKED_UPGRADED(infomask);
175nmembers = GetMultiXactIdMembers(xmax, &members, allow_old,
176false);
177if (nmembers == -1)
178{
179values[Atnum_xids] = "{0}";
180values[Atnum_modes] = "{transient upgrade status}";
181values[Atnum_pids] = "{0}";
182}
183else
184{
185int j;
186
187values[Atnum_xids] = palloc(NCHARS * nmembers);
188values[Atnum_modes] = palloc(NCHARS * nmembers);
189values[Atnum_pids] = palloc(NCHARS * nmembers);
190
191strcpy(values[Atnum_xids], "{");
192strcpy(values[Atnum_modes], "{");
193strcpy(values[Atnum_pids], "{");
194
195for (j = 0; j < nmembers; j++)
196{
197char buf[NCHARS];
198
199if (!first)
200{
201strcat(values[Atnum_xids], ",");
202strcat(values[Atnum_modes], ",");
203strcat(values[Atnum_pids], ",");
204}
205snprintf(buf, NCHARS, "%d", members[j].xid);
206strcat(values[Atnum_xids], buf);
207switch (members[j].status)
208{
209case MultiXactStatusUpdate:
210snprintf(buf, NCHARS, "Update");
211break;
212case MultiXactStatusNoKeyUpdate:
213snprintf(buf, NCHARS, "No Key Update");
214break;
215case MultiXactStatusForUpdate:
216snprintf(buf, NCHARS, "For Update");
217break;
218case MultiXactStatusForNoKeyUpdate:
219snprintf(buf, NCHARS, "For No Key Update");
220break;
221case MultiXactStatusForShare:
222snprintf(buf, NCHARS, "Share");
223break;
224case MultiXactStatusForKeyShare:
225snprintf(buf, NCHARS, "Key Share");
226break;
227}
228strcat(values[Atnum_modes], buf);
229snprintf(buf, NCHARS, "%d",
230polar_pgstat_get_virtual_pid(BackendXidGetPid(members[j].xid), false));
231strcat(values[Atnum_pids], buf);
232
233first = false;
234}
235
236strcat(values[Atnum_xids], "}");
237strcat(values[Atnum_modes], "}");
238strcat(values[Atnum_pids], "}");
239}
240}
241else
242{
243values[Atnum_ismulti] = pstrdup("false");
244
245values[Atnum_xids] = palloc(NCHARS * sizeof(char));
246snprintf(values[Atnum_xids], NCHARS, "{%d}", xmax);
247
248values[Atnum_modes] = palloc(NCHARS);
249if (infomask & HEAP_XMAX_LOCK_ONLY)
250{
251if (HEAP_XMAX_IS_SHR_LOCKED(infomask))
252snprintf(values[Atnum_modes], NCHARS, "{For Share}");
253else if (HEAP_XMAX_IS_KEYSHR_LOCKED(infomask))
254snprintf(values[Atnum_modes], NCHARS, "{For Key Share}");
255else if (HEAP_XMAX_IS_EXCL_LOCKED(infomask))
256{
257if (tuple->t_data->t_infomask2 & HEAP_KEYS_UPDATED)
258snprintf(values[Atnum_modes], NCHARS, "{For Update}");
259else
260snprintf(values[Atnum_modes], NCHARS, "{For No Key Update}");
261}
262else
263/* neither keyshare nor exclusive bit it set */
264snprintf(values[Atnum_modes], NCHARS,
265"{transient upgrade status}");
266}
267else
268{
269if (tuple->t_data->t_infomask2 & HEAP_KEYS_UPDATED)
270snprintf(values[Atnum_modes], NCHARS, "{Update}");
271else
272snprintf(values[Atnum_modes], NCHARS, "{No Key Update}");
273}
274
275values[Atnum_pids] = palloc(NCHARS * sizeof(char));
276snprintf(values[Atnum_pids], NCHARS, "{%d}",
277polar_pgstat_get_virtual_pid(BackendXidGetPid(xmax), false));
278}
279
280LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK);
281
282/* build a tuple */
283tuple = BuildTupleFromCStrings(attinmeta, values);
284tuplestore_puttuple(tupstore, tuple);
285}
286else
287{
288LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK);
289}
290}
291
292heap_endscan(scan);
293heap_close(rel, AccessShareLock);
294return (Datum) 0;
295}
296