pg_probackup
779 строк · 20.8 Кб
/*-------------------------------------------------------------------------
 *
 * src/checkdb.c
 * pg_probackup checkdb subcommand
 *
 * Validates all data files located in PGDATA by matching block
 * checksums and performing page header sanity checks.
 * Optionally, all indexes in all databases of the PostgreSQL
 * instance can be logically verified using the amcheck or
 * amcheck_next extension.
 *
 * Portions Copyright (c) 2019-2019, Postgres Professional
 *
 *-------------------------------------------------------------------------
 */
16
17#include "pg_probackup.h"18
19#include <sys/stat.h>20#include <time.h>21#include <unistd.h>22
23#include "utils/thread.h"24#include "utils/file.h"25
26
27typedef struct28{
29/* list of files to validate */30parray *files_list;31/* if page checksums are enabled in this postgres instance? */32uint32 checksum_version;33/*34* conn and cancel_conn
35* to use in check_data_file
36* to connect to postgres if we've failed to validate page
37* and want to read it via buffer cache to ensure
38*/
39ConnectionArgs conn_arg;40/* number of thread for debugging */41int thread_num;42/* pgdata path */43const char *from_root;44/*45* Return value from the thread:
46* 0 everything is ok
47* 1 thread errored during execution, e.g. interruption (default value)
48* 2 corruption is definitely(!) found
49*/
50int ret;51} check_files_arg;52
53
54typedef struct55{
56/* list of indexes to amcheck */57parray *index_list;58/*59* credentials to connect to postgres instance
60* used for compatibility checks of blocksize,
61* server version and so on
62*/
63ConnectionOptions conn_opt;64/*65* conn and cancel_conn
66* to use in threads to connect to databases
67*/
68ConnectionArgs conn_arg;69/* number of thread for debugging */70int thread_num;71/*72* Return value from the thread:
73* 0 everything is ok
74* 1 thread errored during execution, e.g. interruption (default value)
75* 2 corruption is definitely(!) found
76*/
77int ret;78} check_indexes_arg;79
80typedef struct pg_indexEntry81{
82Oid indexrelid;83char *name;84char *namespace;85bool heapallindexed_is_supported;86bool checkunique_is_supported;87/* schema where amcheck extension is located */88char *amcheck_nspname;89/* lock for synchronization of parallel threads */90volatile pg_atomic_flag lock;91} pg_indexEntry;92
93static void94pg_indexEntry_free(void *index)95{
96pg_indexEntry *index_ptr;97
98if (index == NULL)99return;100
101index_ptr = (pg_indexEntry *) index;102
103if (index_ptr->name)104free(index_ptr->name);105if (index_ptr->name)106free(index_ptr->namespace);107if (index_ptr->amcheck_nspname)108free(index_ptr->amcheck_nspname);109
110free(index_ptr);111}
112
113
114static void *check_files(void *arg);115static void do_block_validation(char *pgdata, uint32 checksum_version);116
117static void *check_indexes(void *arg);118static parray* get_index_list(const char *dbname, bool first_db_with_amcheck,119PGconn *db_conn);120static bool amcheck_one_index(check_indexes_arg *arguments,121pg_indexEntry *ind);122static void do_amcheck(ConnectionOptions conn_opt, PGconn *conn);123
124/*
125* Check files in PGDATA.
126* Read all files listed in files_list.
127* If the file is 'datafile' (regular relation's main fork), read it page by page,
128* verify checksum and copy.
129*/
130static void *131check_files(void *arg)132{
133int i;134check_files_arg *arguments = (check_files_arg *) arg;135int n_files_list = 0;136char from_fullpath[MAXPGPATH];137
138if (arguments->files_list)139n_files_list = parray_num(arguments->files_list);140
141/* check a file */142for (i = 0; i < n_files_list; i++)143{144pgFile *file = (pgFile *) parray_get(arguments->files_list, i);145
146/* check for interrupt */147if (interrupted || thread_interrupted)148elog(ERROR, "Interrupted during checkdb");149
150/* No need to check directories */151if (S_ISDIR(file->mode))152continue;153
154if (!pg_atomic_test_set_flag(&file->lock))155continue;156
157join_path_components(from_fullpath, arguments->from_root, file->rel_path);158
159elog(VERBOSE, "Checking file: \"%s\" ", from_fullpath);160
161if (progress)162elog(INFO, "Progress: (%d/%d). Process file \"%s\"",163i + 1, n_files_list, from_fullpath);164
165if (S_ISREG(file->mode))166{167/* check only uncompressed by cfs datafiles */168if (file->is_datafile && !file->is_cfs)169{170/*171* TODO deep inside check_data_file
172* uses global variables to set connections.
173* Need refactoring.
174*/
175if (!check_data_file(&(arguments->conn_arg),176file, from_fullpath,177arguments->checksum_version))178arguments->ret = 2; /* corruption found */179}180}181else182elog(WARNING, "unexpected file type %d", file->mode);183}184
185/* Ret values:186* 0 everything is ok
187* 1 thread errored during execution, e.g. interruption (default value)
188* 2 corruption is definitely(!) found
189*/
190if (arguments->ret == 1)191arguments->ret = 0;192
193return NULL;194}
195
196/* collect list of files and run threads to check files in the instance */
197static void198do_block_validation(char *pgdata, uint32 checksum_version)199{
200int i;201/* arrays with meta info for multi threaded check */202pthread_t *threads;203check_files_arg *threads_args;204bool check_isok = true;205parray *files_list = NULL;206
207/* initialize file list */208files_list = parray_new();209
210/* list files with the logical path. omit $PGDATA */211dir_list_file(files_list, pgdata, true, true,212false, false, true, 0, FIO_DB_HOST);213
214/*215* Sort pathname ascending.
216*
217* For example:
218* 1 - create 'base'
219* 2 - create 'base/1'
220*/
221parray_qsort(files_list, pgFileCompareRelPathWithExternal);222/* Extract information about files in pgdata parsing their names:*/223parse_filelist_filenames(files_list, pgdata);224
225/* setup threads */226for (i = 0; i < parray_num(files_list); i++)227{228pgFile *file = (pgFile *) parray_get(files_list, i);229pg_atomic_init_flag(&file->lock);230}231
232/* Sort by size for load balancing */233parray_qsort(files_list, pgFileCompareSize);234
235/* init thread args with own file lists */236threads = (pthread_t *) palloc(sizeof(pthread_t) * num_threads);237threads_args = (check_files_arg *) palloc(sizeof(check_files_arg)*num_threads);238
239for (i = 0; i < num_threads; i++)240{241check_files_arg *arg = &(threads_args[i]);242
243arg->files_list = files_list;244arg->checksum_version = checksum_version;245arg->from_root = pgdata;246
247arg->conn_arg.conn = NULL;248arg->conn_arg.cancel_conn = NULL;249
250arg->thread_num = i + 1;251/* By default there is some error */252arg->ret = 1;253}254
255elog(INFO, "Start checking data files");256
257/* Run threads */258for (i = 0; i < num_threads; i++)259{260check_files_arg *arg = &(threads_args[i]);261
262elog(VERBOSE, "Start thread num: %i", i);263
264pthread_create(&threads[i], NULL, check_files, arg);265}266
267/* Wait threads */268for (i = 0; i < num_threads; i++)269{270pthread_join(threads[i], NULL);271if (threads_args[i].ret > 0)272check_isok = false;273}274
275/* cleanup */276if (files_list)277{278parray_walk(files_list, pgFileFree);279parray_free(files_list);280files_list = NULL;281}282
283if (check_isok)284elog(INFO, "Data files are valid");285else286elog(ERROR, "Checkdb failed");287}
288
289/* Check indexes with amcheck */
290static void *291check_indexes(void *arg)292{
293int i;294check_indexes_arg *arguments = (check_indexes_arg *) arg;295int n_indexes = 0;296my_thread_num = arguments->thread_num;297
298if (arguments->index_list)299n_indexes = parray_num(arguments->index_list);300
301for (i = 0; i < n_indexes; i++)302{303pg_indexEntry *ind = (pg_indexEntry *) parray_get(arguments->index_list, i);304
305if (!pg_atomic_test_set_flag(&ind->lock))306continue;307
308/* check for interrupt */309if (interrupted || thread_interrupted)310elog(ERROR, "Thread [%d]: interrupted during checkdb --amcheck",311arguments->thread_num);312
313if (progress)314elog(INFO, "Thread [%d]. Progress: (%d/%d). Amchecking index '%s.%s'",315arguments->thread_num, i + 1, n_indexes,316ind->namespace, ind->name);317
318if (arguments->conn_arg.conn == NULL)319{320
321arguments->conn_arg.conn = pgut_connect(arguments->conn_opt.pghost,322arguments->conn_opt.pgport,323arguments->conn_opt.pgdatabase,324arguments->conn_opt.pguser);325arguments->conn_arg.cancel_conn = PQgetCancel(arguments->conn_arg.conn);326}327
328/* remember that we have a failed check */329if (!amcheck_one_index(arguments, ind))330arguments->ret = 2; /* corruption found */331}332
333/* Close connection. */334if (arguments->conn_arg.conn)335pgut_disconnect(arguments->conn_arg.conn);336
337/* Ret values:338* 0 everything is ok
339* 1 thread errored during execution, e.g. interruption (default value)
340* 2 corruption is definitely(!) found
341*/
342if (arguments->ret == 1)343arguments->ret = 0;344
345return NULL;346}
347
348/* Get index list for given database */
349static parray*350get_index_list(const char *dbname, bool first_db_with_amcheck,351PGconn *db_conn)352{
353PGresult *res;354char *amcheck_nspname = NULL;355char *amcheck_extname = NULL;356char *amcheck_extversion = NULL;357int i;358bool heapallindexed_is_supported = false;359bool checkunique_is_supported = false;360parray *index_list = NULL;361
362/* Check amcheck extension version */363res = pgut_execute(db_conn, "SELECT "364"extname, nspname, extversion "365"FROM pg_catalog.pg_namespace n "366"JOIN pg_catalog.pg_extension e "367"ON n.oid=e.extnamespace "368"WHERE e.extname IN ('amcheck'::name, 'amcheck_next'::name) "369"ORDER BY extversion DESC "370"LIMIT 1",3710, NULL);372
373if (PQresultStatus(res) != PGRES_TUPLES_OK)374{375PQclear(res);376elog(ERROR, "Cannot check if amcheck is installed in database %s: %s",377dbname, PQerrorMessage(db_conn));378}379
380if (PQntuples(res) < 1)381{382elog(WARNING, "Extension 'amcheck' or 'amcheck_next' are "383"not installed in database %s", dbname);384return NULL;385}386
387amcheck_extname = pgut_malloc(strlen(PQgetvalue(res, 0, 0)) + 1);388strcpy(amcheck_extname, PQgetvalue(res, 0, 0));389amcheck_nspname = pgut_malloc(strlen(PQgetvalue(res, 0, 1)) + 1);390strcpy(amcheck_nspname, PQgetvalue(res, 0, 1));391amcheck_extversion = pgut_malloc(strlen(PQgetvalue(res, 0, 2)) + 1);392strcpy(amcheck_extversion, PQgetvalue(res, 0, 2));393PQclear(res);394
395/* heapallindexed_is_supported is database specific */396/* TODO this is wrong check, heapallindexed supported also in 1.1.1, 1.2 and 1.2.1... */397if (strcmp(amcheck_extversion, "1.0") != 0 &&398strcmp(amcheck_extversion, "1") != 0)399heapallindexed_is_supported = true;400
401elog(INFO, "Amchecking database '%s' using extension '%s' "402"version %s from schema '%s'",403dbname, amcheck_extname,404amcheck_extversion, amcheck_nspname);405
406if (!heapallindexed_is_supported && heapallindexed)407elog(WARNING, "Extension '%s' version %s in schema '%s'"408"do not support 'heapallindexed' option",409amcheck_extname, amcheck_extversion,410amcheck_nspname);411
412#ifndef PGPRO_EE413/*414* Will support when the vanilla patch will commited https://commitfest.postgresql.org/32/2976/
415*/
416checkunique_is_supported = false;417#else418/*419* Check bt_index_check function signature to determine support of checkunique parameter
420* This can't be exactly checked by checking extension version,
421* For example, 1.1.1 and 1.2.1 supports this parameter, but 1.2 doesn't (PGPROEE-12.4.1)
422*/
423res = pgut_execute(db_conn, "SELECT "424" oid "425"FROM pg_catalog.pg_proc "426"WHERE "427" pronamespace = $1::regnamespace "428"AND proname = 'bt_index_check' "429"AND 'checkunique' = ANY(proargnames) "430"AND (pg_catalog.string_to_array(proargtypes::text, ' ')::regtype[])[pg_catalog.array_position(proargnames, 'checkunique')] = 'bool'::regtype",4311, (const char **) &amcheck_nspname);432
433if (PQresultStatus(res) != PGRES_TUPLES_OK)434{435PQclear(res);436elog(ERROR, "Cannot check 'checkunique' option is supported in bt_index_check function %s: %s",437dbname, PQerrorMessage(db_conn));438}439
440checkunique_is_supported = PQntuples(res) >= 1;441PQclear(res);442#endif443
444if (!checkunique_is_supported && checkunique)445elog(WARNING, "Extension '%s' version %s in schema '%s' "446"do not support 'checkunique' parameter",447amcheck_extname, amcheck_extversion,448amcheck_nspname);449
450/*451* In order to avoid duplicates, select global indexes
452* (tablespace pg_global with oid 1664) only once.
453*
454* select only persistent btree indexes.
455*/
456if (first_db_with_amcheck)457{458
459res = pgut_execute(db_conn, "SELECT cls.oid, cls.relname, nmspc.nspname "460"FROM pg_catalog.pg_index idx "461"LEFT JOIN pg_catalog.pg_class cls ON idx.indexrelid=cls.oid "462"LEFT JOIN pg_catalog.pg_namespace nmspc ON cls.relnamespace=nmspc.oid "463"LEFT JOIN pg_catalog.pg_am am ON cls.relam=am.oid "464"WHERE am.amname='btree' "465"AND cls.relpersistence != 't' "466"AND cls.relkind != 'I' "467"ORDER BY nmspc.nspname DESC",4680, NULL);469}470else471{472
473res = pgut_execute(db_conn, "SELECT cls.oid, cls.relname, nmspc.nspname "474"FROM pg_catalog.pg_index idx "475"LEFT JOIN pg_catalog.pg_class cls ON idx.indexrelid=cls.oid "476"LEFT JOIN pg_catalog.pg_namespace nmspc ON cls.relnamespace=nmspc.oid "477"LEFT JOIN pg_catalog.pg_am am ON cls.relam=am.oid "478"WHERE am.amname='btree' "479"AND cls.relpersistence != 't' "480"AND cls.relkind != 'I' "481"AND (cls.reltablespace IN "482"(SELECT oid from pg_catalog.pg_tablespace where spcname <> 'pg_global') "483"OR cls.reltablespace = 0) "484"ORDER BY nmspc.nspname DESC",4850, NULL);486}487
488/* add info needed to check indexes into index_list */489for (i = 0; i < PQntuples(res); i++)490{491pg_indexEntry *ind = (pg_indexEntry *) pgut_malloc(sizeof(pg_indexEntry));492char *name = NULL;493char *namespace = NULL;494
495/* index oid */496ind->indexrelid = atoll(PQgetvalue(res, i, 0));497
498/* index relname */499name = PQgetvalue(res, i, 1);500ind->name = pgut_malloc(strlen(name) + 1);501strcpy(ind->name, name); /* enough buffer size guaranteed */502
503/* index namespace */504namespace = PQgetvalue(res, i, 2);505ind->namespace = pgut_malloc(strlen(namespace) + 1);506strcpy(ind->namespace, namespace); /* enough buffer size guaranteed */507
508ind->heapallindexed_is_supported = heapallindexed_is_supported;509ind->checkunique_is_supported = checkunique_is_supported;510ind->amcheck_nspname = pgut_malloc(strlen(amcheck_nspname) + 1);511strcpy(ind->amcheck_nspname, amcheck_nspname);512pg_atomic_clear_flag(&ind->lock);513
514if (index_list == NULL)515index_list = parray_new();516
517parray_append(index_list, ind);518}519
520PQclear(res);521free(amcheck_extversion);522free(amcheck_nspname);523free(amcheck_extname);524
525return index_list;526}
527
528/* check one index. Return true if everything is ok, false otherwise. */
529static bool530amcheck_one_index(check_indexes_arg *arguments,531pg_indexEntry *ind)532{
533PGresult *res;534char *params[3];535static const char *queries[] = {536"SELECT %s.bt_index_check(index => $1)",537"SELECT %s.bt_index_check(index => $1, heapallindexed => $2)",538"SELECT %s.bt_index_check(index => $1, heapallindexed => $2, checkunique => $3)",539};540int params_count;541char *query = NULL;542
543if (interrupted)544elog(ERROR, "Interrupted");545
546#define INDEXRELID 0547#define HEAPALLINDEXED 1548#define CHECKUNIQUE 2549/* first argument is index oid */550params[INDEXRELID] = palloc(64);551sprintf(params[INDEXRELID], "%u", ind->indexrelid);552/* second argument is heapallindexed */553params[HEAPALLINDEXED] = heapallindexed ? "true" : "false";554/* third optional argument is checkunique */555params[CHECKUNIQUE] = checkunique ? "true" : "false";556#undef CHECKUNIQUE557#undef HEAPALLINDEXED558
559params_count = ind->checkunique_is_supported ?5603 :561( ind->heapallindexed_is_supported ? 2 : 1 );562
563/*564* Prepare query text with schema name
565* +1 for \0 and -2 for %s
566*/
567query = palloc(strlen(ind->amcheck_nspname) + strlen(queries[params_count - 1]) + 1 - 2);568sprintf(query, queries[params_count - 1], ind->amcheck_nspname);569
570res = pgut_execute_parallel(arguments->conn_arg.conn,571arguments->conn_arg.cancel_conn,572query, params_count, (const char **)params, true, true, true);573
574if (PQresultStatus(res) != PGRES_TUPLES_OK)575{576elog(WARNING, "Thread [%d]. Amcheck failed in database '%s' for index: '%s.%s': %s",577arguments->thread_num, arguments->conn_opt.pgdatabase,578ind->namespace, ind->name, PQresultErrorMessage(res));579
580pfree(params[INDEXRELID]);581pfree(query);582PQclear(res);583return false;584}585else586elog(LOG, "Thread [%d]. Amcheck succeeded in database '%s' for index: '%s.%s'",587arguments->thread_num,588arguments->conn_opt.pgdatabase, ind->namespace, ind->name);589
590pfree(params[INDEXRELID]);591#undef INDEXRELID592pfree(query);593PQclear(res);594return true;595}
596
597/*
598* Entry point of checkdb --amcheck.
599*
600* Connect to all databases in the cluster
601* and get list of persistent indexes,
602* then run parallel threads to perform bt_index_check()
603* for all indexes from the list.
604*
605* If amcheck extension is not installed in the database,
606* skip this database and report it via warning message.
607*/
608static void609do_amcheck(ConnectionOptions conn_opt, PGconn *conn)610{
611int i;612/* arrays with meta info for multi threaded amcheck */613pthread_t *threads;614check_indexes_arg *threads_args;615bool check_isok = true;616PGresult *res_db;617int n_databases = 0;618bool first_db_with_amcheck = true;619bool db_skipped = false;620
621elog(INFO, "Start amchecking PostgreSQL instance");622
623res_db = pgut_execute(conn,624"SELECT datname, oid, dattablespace "625"FROM pg_catalog.pg_database "626"WHERE datname NOT IN ('template0'::name, 'template1'::name)",6270, NULL);628
629/* we don't need this connection anymore */630if (conn)631pgut_disconnect(conn);632
633n_databases = PQntuples(res_db);634
635/* For each database check indexes. In parallel. */636for(i = 0; i < n_databases; i++)637{638int j;639const char *dbname;640PGconn *db_conn = NULL;641parray *index_list = NULL;642
643dbname = PQgetvalue(res_db, i, 0);644db_conn = pgut_connect(conn_opt.pghost, conn_opt.pgport,645dbname, conn_opt.pguser);646
647index_list = get_index_list(dbname, first_db_with_amcheck,648db_conn);649
650/* we don't need this connection anymore */651if (db_conn)652pgut_disconnect(db_conn);653
654if (index_list == NULL)655{656db_skipped = true;657continue;658}659
660first_db_with_amcheck = false;661
662/* init thread args with own index lists */663threads = (pthread_t *) palloc(sizeof(pthread_t) * num_threads);664threads_args = (check_indexes_arg *) palloc(sizeof(check_indexes_arg)*num_threads);665
666for (j = 0; j < num_threads; j++)667{668check_indexes_arg *arg = &(threads_args[j]);669
670arg->index_list = index_list;671arg->conn_arg.conn = NULL;672arg->conn_arg.cancel_conn = NULL;673
674arg->conn_opt.pghost = conn_opt.pghost;675arg->conn_opt.pgport = conn_opt.pgport;676arg->conn_opt.pgdatabase = dbname;677arg->conn_opt.pguser = conn_opt.pguser;678
679arg->thread_num = j + 1;680/* By default there are some error */681arg->ret = 1;682}683
684/* Run threads */685for (j = 0; j < num_threads; j++)686{687check_indexes_arg *arg = &(threads_args[j]);688elog(VERBOSE, "Start thread num: %i", j);689pthread_create(&threads[j], NULL, check_indexes, arg);690}691
692/* Wait threads */693for (j = 0; j < num_threads; j++)694{695pthread_join(threads[j], NULL);696if (threads_args[j].ret > 0)697check_isok = false;698}699
700if (check_isok)701elog(INFO, "Amcheck succeeded for database '%s'", dbname);702else703elog(WARNING, "Amcheck failed for database '%s'", dbname);704
705parray_walk(index_list, pg_indexEntry_free);706parray_free(index_list);707
708if (interrupted)709break;710}711
712/* cleanup */713PQclear(res_db);714
715/* Inform user about amcheck results */716if (interrupted)717elog(ERROR, "checkdb --amcheck is interrupted.");718
719if (check_isok)720{721elog(INFO, "checkdb --amcheck finished successfully. "722"All checked indexes are valid.");723
724if (db_skipped)725elog(ERROR, "Some databases were not amchecked.");726else727elog(INFO, "All databases were amchecked.");728}729else730elog(ERROR, "checkdb --amcheck finished with failure. "731"Not all checked indexes are valid. %s",732db_skipped?"Some databases were not amchecked.":733"All databases were amchecked.");734}
735
736/* Entry point of pg_probackup CHECKDB subcommand */
737void
738do_checkdb(bool need_amcheck,739ConnectionOptions conn_opt, char *pgdata)740{
741PGNodeInfo nodeInfo;742PGconn *cur_conn;743
744/* Initialize PGInfonode */745pgNodeInit(&nodeInfo);746
747if (skip_block_validation && !need_amcheck)748elog(ERROR, "Option '--skip-block-validation' must be used with '--amcheck' option");749
750if (!skip_block_validation)751{752if (!pgdata)753elog(ERROR, "Required parameter not specified: PGDATA "754"(-D, --pgdata)");755
756/* get node info */757cur_conn = pgdata_basic_setup(conn_opt, &nodeInfo);758
759/* ensure that conn credentials and pgdata are consistent */760check_system_identifiers(cur_conn, pgdata);761
762/*763* we don't need this connection anymore.
764* block validation can last long time,
765* so we don't hold the connection open,
766* rather open new connection for amcheck
767*/
768if (cur_conn)769pgut_disconnect(cur_conn);770
771do_block_validation(pgdata, nodeInfo.checksum_version);772}773
774if (need_amcheck)775{776cur_conn = pgdata_basic_setup(conn_opt, &nodeInfo);777do_amcheck(conn_opt, cur_conn);778}779}
780