pg_probackup

Форк
0
/
checkdb.c 
779 строк · 20.8 Кб
1
/*-------------------------------------------------------------------------
2
 *
3
 * src/checkdb.c
4
 * pg_probackup checkdb subcommand
5
 *
6
 * It allows to validate all data files located in PGDATA
7
 * via block checksums matching and page header sanity checks.
8
 * Optionally all indexes in all databases in PostgreSQL
9
 * instance can be logically verified using extensions
10
 * amcheck or amcheck_next.
11
 *
12
 * Portions Copyright (c) 2019-2019, Postgres Professional
13
 *
14
 *-------------------------------------------------------------------------
15
 */
16

17
#include "pg_probackup.h"
18

19
#include <sys/stat.h>
20
#include <time.h>
21
#include <unistd.h>
22

23
#include "utils/thread.h"
24
#include "utils/file.h"
25

26

27
typedef struct
28
{
29
	/* list of files to validate */
30
	parray	   *files_list;
31
	/* if page checksums are enabled in this postgres instance? */
32
	uint32 checksum_version;
33
	/*
34
	 * conn and cancel_conn
35
	 * to use in check_data_file
36
	 * to connect to postgres if we've failed to validate page
37
	 * and want to read it via buffer cache to ensure
38
	 */
39
	ConnectionArgs conn_arg;
40
	/* number of thread for debugging */
41
	int			thread_num;
42
	/* pgdata path */
43
	const char	*from_root;
44
	/*
45
	 * Return value from the thread:
46
	 * 0 everything is ok
47
	 * 1 thread errored during execution, e.g. interruption (default value)
48
	 * 2 corruption is definitely(!) found
49
	 */
50
	int			ret;
51
} check_files_arg;
52

53

54
typedef struct
55
{
56
	/* list of indexes to amcheck */
57
	parray	   *index_list;
58
	/*
59
	 * credentials to connect to postgres instance
60
	 * used for compatibility checks of blocksize,
61
	 * server version and so on
62
	 */
63
	ConnectionOptions conn_opt;
64
	/*
65
	 * conn and cancel_conn
66
	 * to use in threads to connect to databases
67
	 */
68
	ConnectionArgs conn_arg;
69
	/* number of thread for debugging */
70
	int			thread_num;
71
	/*
72
	 * Return value from the thread:
73
	 * 0 everything is ok
74
	 * 1 thread errored during execution, e.g. interruption (default value)
75
	 * 2 corruption is definitely(!) found
76
	 */
77
	int			ret;
78
} check_indexes_arg;
79

80
typedef struct pg_indexEntry
81
{
82
	Oid indexrelid;
83
	char *name;
84
	char *namespace;
85
	bool heapallindexed_is_supported;
86
	bool checkunique_is_supported;
87
	/* schema where amcheck extension is located */
88
	char *amcheck_nspname;
89
	/* lock for synchronization of parallel threads  */
90
	volatile pg_atomic_flag lock;
91
} pg_indexEntry;
92

93
static void
94
pg_indexEntry_free(void *index)
95
{
96
	pg_indexEntry *index_ptr;
97

98
	if (index == NULL)
99
		return;
100

101
	index_ptr = (pg_indexEntry *) index;
102

103
	if (index_ptr->name)
104
		free(index_ptr->name);
105
	if (index_ptr->name)
106
		free(index_ptr->namespace);
107
	if (index_ptr->amcheck_nspname)
108
		free(index_ptr->amcheck_nspname);
109

110
	free(index_ptr);
111
}
112

113

114
static void *check_files(void *arg);
115
static void do_block_validation(char *pgdata, uint32 checksum_version);
116

117
static void *check_indexes(void *arg);
118
static parray* get_index_list(const char *dbname, bool first_db_with_amcheck,
119
							  PGconn *db_conn);
120
static bool amcheck_one_index(check_indexes_arg *arguments,
121
				 pg_indexEntry *ind);
122
static void do_amcheck(ConnectionOptions conn_opt, PGconn *conn);
123

124
/*
125
 * Check files in PGDATA.
126
 * Read all files listed in files_list.
127
 * If the file is 'datafile' (regular relation's main fork), read it page by page,
128
 * verify checksum and copy.
129
 */
130
static void *
131
check_files(void *arg)
132
{
133
	int			i;
134
	check_files_arg *arguments = (check_files_arg *) arg;
135
	int			n_files_list = 0;
136
	char		from_fullpath[MAXPGPATH];
137

138
	if (arguments->files_list)
139
		n_files_list = parray_num(arguments->files_list);
140

141
	/* check a file */
142
	for (i = 0; i < n_files_list; i++)
143
	{
144
		pgFile	   *file = (pgFile *) parray_get(arguments->files_list, i);
145

146
		/* check for interrupt */
147
		if (interrupted || thread_interrupted)
148
			elog(ERROR, "Interrupted during checkdb");
149

150
		/* No need to check directories */
151
		if (S_ISDIR(file->mode))
152
			continue;
153

154
		if (!pg_atomic_test_set_flag(&file->lock))
155
			continue;
156

157
		join_path_components(from_fullpath, arguments->from_root, file->rel_path);
158

159
		elog(VERBOSE, "Checking file:  \"%s\" ", from_fullpath);
160

161
		if (progress)
162
			elog(INFO, "Progress: (%d/%d). Process file \"%s\"",
163
				 i + 1, n_files_list, from_fullpath);
164

165
		if (S_ISREG(file->mode))
166
		{
167
			/* check only uncompressed by cfs datafiles */
168
			if (file->is_datafile && !file->is_cfs)
169
			{
170
				/*
171
				 * TODO deep inside check_data_file
172
				 * uses global variables to set connections.
173
				 * Need refactoring.
174
				 */
175
				if (!check_data_file(&(arguments->conn_arg),
176
									 file, from_fullpath,
177
									 arguments->checksum_version))
178
					arguments->ret = 2; /* corruption found */
179
			}
180
		}
181
		else
182
			elog(WARNING, "unexpected file type %d", file->mode);
183
	}
184

185
	/* Ret values:
186
	 * 0 everything is ok
187
	 * 1 thread errored during execution, e.g. interruption (default value)
188
	 * 2 corruption is definitely(!) found
189
	 */
190
	if (arguments->ret == 1)
191
		arguments->ret = 0;
192

193
	return NULL;
194
}
195

196
/* collect list of files and run threads to check files in the instance */
197
static void
198
do_block_validation(char *pgdata, uint32 checksum_version)
199
{
200
	int			i;
201
	/* arrays with meta info for multi threaded check */
202
	pthread_t	*threads;
203
	check_files_arg *threads_args;
204
	bool		check_isok = true;
205
	parray *files_list = NULL;
206

207
	/* initialize file list */
208
	files_list = parray_new();
209

210
	/* list files with the logical path. omit $PGDATA */
211
	dir_list_file(files_list, pgdata, true, true,
212
				  false, false, true, 0, FIO_DB_HOST);
213

214
	/*
215
	 * Sort pathname ascending.
216
	 *
217
	 * For example:
218
	 * 1 - create 'base'
219
	 * 2 - create 'base/1'
220
	 */
221
	parray_qsort(files_list, pgFileCompareRelPathWithExternal);
222
	/* Extract information about files in pgdata parsing their names:*/
223
	parse_filelist_filenames(files_list, pgdata);
224

225
	/* setup threads */
226
	for (i = 0; i < parray_num(files_list); i++)
227
	{
228
		pgFile	   *file = (pgFile *) parray_get(files_list, i);
229
		pg_atomic_init_flag(&file->lock);
230
	}
231

232
	/* Sort by size for load balancing */
233
	parray_qsort(files_list, pgFileCompareSize);
234

235
	/* init thread args with own file lists */
236
	threads = (pthread_t *) palloc(sizeof(pthread_t) * num_threads);
237
	threads_args = (check_files_arg *) palloc(sizeof(check_files_arg)*num_threads);
238

239
	for (i = 0; i < num_threads; i++)
240
	{
241
		check_files_arg *arg = &(threads_args[i]);
242

243
		arg->files_list = files_list;
244
		arg->checksum_version = checksum_version;
245
		arg->from_root = pgdata;
246

247
		arg->conn_arg.conn = NULL;
248
		arg->conn_arg.cancel_conn = NULL;
249

250
		arg->thread_num = i + 1;
251
		/* By default there is some error */
252
		arg->ret = 1;
253
	}
254

255
	elog(INFO, "Start checking data files");
256

257
	/* Run threads */
258
	for (i = 0; i < num_threads; i++)
259
	{
260
		check_files_arg *arg = &(threads_args[i]);
261

262
		elog(VERBOSE, "Start thread num: %i", i);
263

264
		pthread_create(&threads[i], NULL, check_files, arg);
265
	}
266

267
	/* Wait threads */
268
	for (i = 0; i < num_threads; i++)
269
	{
270
		pthread_join(threads[i], NULL);
271
		if (threads_args[i].ret > 0)
272
			check_isok = false;
273
	}
274

275
	/* cleanup */
276
	if (files_list)
277
	{
278
		parray_walk(files_list, pgFileFree);
279
		parray_free(files_list);
280
		files_list = NULL;
281
	}
282

283
	if (check_isok)
284
		elog(INFO, "Data files are valid");
285
	else
286
		elog(ERROR, "Checkdb failed");
287
}
288

289
/* Check indexes with amcheck */
290
static void *
291
check_indexes(void *arg)
292
{
293
	int			i;
294
	check_indexes_arg *arguments = (check_indexes_arg *) arg;
295
	int			n_indexes = 0;
296
	my_thread_num = arguments->thread_num;
297

298
	if (arguments->index_list)
299
		n_indexes = parray_num(arguments->index_list);
300

301
	for (i = 0; i < n_indexes; i++)
302
	{
303
		pg_indexEntry *ind = (pg_indexEntry *) parray_get(arguments->index_list, i);
304

305
		if (!pg_atomic_test_set_flag(&ind->lock))
306
			continue;
307

308
		/* check for interrupt */
309
		if (interrupted || thread_interrupted)
310
			elog(ERROR, "Thread [%d]: interrupted during checkdb --amcheck",
311
				arguments->thread_num);
312

313
		if (progress)
314
			elog(INFO, "Thread [%d]. Progress: (%d/%d). Amchecking index '%s.%s'",
315
				 arguments->thread_num, i + 1, n_indexes,
316
				 ind->namespace, ind->name);
317

318
		if (arguments->conn_arg.conn == NULL)
319
		{
320

321
			arguments->conn_arg.conn = pgut_connect(arguments->conn_opt.pghost,
322
												arguments->conn_opt.pgport,
323
												arguments->conn_opt.pgdatabase,
324
												arguments->conn_opt.pguser);
325
			arguments->conn_arg.cancel_conn = PQgetCancel(arguments->conn_arg.conn);
326
		}
327

328
		/* remember that we have a failed check */
329
		if (!amcheck_one_index(arguments, ind))
330
			arguments->ret = 2; /* corruption found */
331
	}
332

333
	/* Close connection. */
334
	if (arguments->conn_arg.conn)
335
		pgut_disconnect(arguments->conn_arg.conn);
336

337
	/* Ret values:
338
	 * 0 everything is ok
339
	 * 1 thread errored during execution, e.g. interruption (default value)
340
	 * 2 corruption is definitely(!) found
341
	 */
342
	if (arguments->ret == 1)
343
		arguments->ret = 0;
344

345
	return NULL;
346
}
347

348
/* Get index list for given database */
349
static parray*
350
get_index_list(const char *dbname, bool first_db_with_amcheck,
351
			   PGconn *db_conn)
352
{
353
	PGresult   *res;
354
	char *amcheck_nspname = NULL;
355
	char *amcheck_extname = NULL;
356
	char *amcheck_extversion = NULL;
357
	int i;
358
	bool heapallindexed_is_supported = false;
359
	bool checkunique_is_supported = false;
360
	parray *index_list = NULL;
361

362
	/* Check amcheck extension version */
363
	res = pgut_execute(db_conn, "SELECT "
364
								"extname, nspname, extversion "
365
								"FROM pg_catalog.pg_namespace n "
366
								"JOIN pg_catalog.pg_extension e "
367
								"ON n.oid=e.extnamespace "
368
								"WHERE e.extname IN ('amcheck'::name, 'amcheck_next'::name) "
369
								"ORDER BY extversion DESC "
370
								"LIMIT 1",
371
								0, NULL);
372

373
	if (PQresultStatus(res) != PGRES_TUPLES_OK)
374
	{
375
		PQclear(res);
376
		elog(ERROR, "Cannot check if amcheck is installed in database %s: %s",
377
			dbname, PQerrorMessage(db_conn));
378
	}
379

380
	if (PQntuples(res) < 1)
381
	{
382
		elog(WARNING, "Extension 'amcheck' or 'amcheck_next' are "
383
					  "not installed in database %s", dbname);
384
		return NULL;
385
	}
386

387
	amcheck_extname = pgut_malloc(strlen(PQgetvalue(res, 0, 0)) + 1);
388
	strcpy(amcheck_extname, PQgetvalue(res, 0, 0));
389
	amcheck_nspname = pgut_malloc(strlen(PQgetvalue(res, 0, 1)) + 1);
390
	strcpy(amcheck_nspname, PQgetvalue(res, 0, 1));
391
	amcheck_extversion = pgut_malloc(strlen(PQgetvalue(res, 0, 2)) + 1);
392
	strcpy(amcheck_extversion, PQgetvalue(res, 0, 2));
393
	PQclear(res);
394

395
	/* heapallindexed_is_supported is database specific */
396
	/* TODO this is wrong check, heapallindexed supported also in 1.1.1, 1.2 and 1.2.1... */
397
	if (strcmp(amcheck_extversion, "1.0") != 0 &&
398
		strcmp(amcheck_extversion, "1") != 0)
399
			heapallindexed_is_supported = true;
400

401
	elog(INFO, "Amchecking database '%s' using extension '%s' "
402
			   "version %s from schema '%s'",
403
				dbname, amcheck_extname,
404
				amcheck_extversion, amcheck_nspname);
405

406
	if (!heapallindexed_is_supported && heapallindexed)
407
		elog(WARNING, "Extension '%s' version %s in schema '%s'"
408
					  "do not support 'heapallindexed' option",
409
					   amcheck_extname, amcheck_extversion,
410
					   amcheck_nspname);
411

412
#ifndef PGPRO_EE
413
	/*
414
	 * Will support when the vanilla patch will commited https://commitfest.postgresql.org/32/2976/
415
	 */
416
	checkunique_is_supported = false;
417
#else
418
	/*
419
	 * Check bt_index_check function signature to determine support of checkunique parameter
420
	 * This can't be exactly checked by checking extension version,
421
	 * For example, 1.1.1 and 1.2.1 supports this parameter, but 1.2 doesn't (PGPROEE-12.4.1)
422
	 */
423
	res = pgut_execute(db_conn, "SELECT "
424
								"    oid "
425
								"FROM pg_catalog.pg_proc "
426
								"WHERE "
427
								"    pronamespace = $1::regnamespace "
428
								"AND proname = 'bt_index_check' "
429
								"AND 'checkunique' = ANY(proargnames) "
430
								"AND (pg_catalog.string_to_array(proargtypes::text, ' ')::regtype[])[pg_catalog.array_position(proargnames, 'checkunique')] = 'bool'::regtype",
431
								1, (const char **) &amcheck_nspname);
432

433
	if (PQresultStatus(res) != PGRES_TUPLES_OK)
434
	{
435
		PQclear(res);
436
		elog(ERROR, "Cannot check 'checkunique' option is supported in bt_index_check function %s: %s",
437
			dbname, PQerrorMessage(db_conn));
438
	}
439

440
	checkunique_is_supported = PQntuples(res) >= 1;
441
	PQclear(res);
442
#endif
443

444
	if (!checkunique_is_supported && checkunique)
445
		elog(WARNING, "Extension '%s' version %s in schema '%s' "
446
					  "do not support 'checkunique' parameter",
447
					   amcheck_extname, amcheck_extversion,
448
					   amcheck_nspname);
449

450
	/*
451
	 * In order to avoid duplicates, select global indexes
452
	 * (tablespace pg_global with oid 1664) only once.
453
	 *
454
	 * select only persistent btree indexes.
455
	 */
456
	if (first_db_with_amcheck)
457
	{
458

459
		res = pgut_execute(db_conn, "SELECT cls.oid, cls.relname, nmspc.nspname "
460
									"FROM pg_catalog.pg_index idx "
461
									"LEFT JOIN pg_catalog.pg_class cls ON idx.indexrelid=cls.oid "
462
									"LEFT JOIN pg_catalog.pg_namespace nmspc ON cls.relnamespace=nmspc.oid "
463
									"LEFT JOIN pg_catalog.pg_am am ON cls.relam=am.oid "
464
									"WHERE am.amname='btree' "
465
									"AND cls.relpersistence != 't' "
466
									"AND cls.relkind != 'I' "
467
									"ORDER BY nmspc.nspname DESC",
468
									0, NULL);
469
	}
470
	else
471
	{
472

473
		res = pgut_execute(db_conn, "SELECT cls.oid, cls.relname, nmspc.nspname "
474
									"FROM pg_catalog.pg_index idx "
475
									"LEFT JOIN pg_catalog.pg_class cls ON idx.indexrelid=cls.oid "
476
									"LEFT JOIN pg_catalog.pg_namespace nmspc ON cls.relnamespace=nmspc.oid "
477
									"LEFT JOIN pg_catalog.pg_am am ON cls.relam=am.oid "
478
									"WHERE am.amname='btree' "
479
									"AND cls.relpersistence != 't' "
480
									"AND cls.relkind != 'I' "
481
									"AND (cls.reltablespace IN "
482
									"(SELECT oid from pg_catalog.pg_tablespace where spcname <> 'pg_global') "
483
									"OR cls.reltablespace = 0) "
484
									"ORDER BY nmspc.nspname DESC",
485
									0, NULL);
486
	}
487

488
	/* add info needed to check indexes into index_list */
489
	for (i = 0; i < PQntuples(res); i++)
490
	{
491
		pg_indexEntry *ind = (pg_indexEntry *) pgut_malloc(sizeof(pg_indexEntry));
492
		char *name = NULL;
493
		char *namespace = NULL;
494

495
		/* index oid */
496
		ind->indexrelid = atoll(PQgetvalue(res, i, 0));
497

498
		/* index relname */
499
		name = PQgetvalue(res, i, 1);
500
		ind->name = pgut_malloc(strlen(name) + 1);
501
		strcpy(ind->name, name);	/* enough buffer size guaranteed */
502

503
		/* index namespace */
504
		namespace = PQgetvalue(res, i, 2);
505
		ind->namespace = pgut_malloc(strlen(namespace) + 1);
506
		strcpy(ind->namespace, namespace);	/* enough buffer size guaranteed */
507

508
		ind->heapallindexed_is_supported = heapallindexed_is_supported;
509
		ind->checkunique_is_supported = checkunique_is_supported;
510
		ind->amcheck_nspname = pgut_malloc(strlen(amcheck_nspname) + 1);
511
		strcpy(ind->amcheck_nspname, amcheck_nspname);
512
		pg_atomic_clear_flag(&ind->lock);
513

514
		if (index_list == NULL)
515
			index_list = parray_new();
516

517
		parray_append(index_list, ind);
518
	}
519

520
	PQclear(res);
521
	free(amcheck_extversion);
522
	free(amcheck_nspname);
523
	free(amcheck_extname);
524

525
	return index_list;
526
}
527

528
/* check one index. Return true if everything is ok, false otherwise. */
529
static bool
530
amcheck_one_index(check_indexes_arg *arguments,
531
				 pg_indexEntry *ind)
532
{
533
	PGresult	*res;
534
	char		*params[3];
535
	static const char	*queries[] = {
536
			"SELECT %s.bt_index_check(index => $1)",
537
			"SELECT %s.bt_index_check(index => $1, heapallindexed => $2)",
538
			"SELECT %s.bt_index_check(index => $1, heapallindexed => $2, checkunique => $3)",
539
	};
540
	int			params_count;
541
	char		*query = NULL;
542

543
	if (interrupted)
544
		elog(ERROR, "Interrupted");
545

546
#define INDEXRELID 0
547
#define HEAPALLINDEXED 1
548
#define CHECKUNIQUE 2
549
	/* first argument is index oid */
550
	params[INDEXRELID] = palloc(64);
551
	sprintf(params[INDEXRELID], "%u", ind->indexrelid);
552
	/* second argument is heapallindexed */
553
	params[HEAPALLINDEXED] = heapallindexed ? "true" : "false";
554
	/* third optional argument is checkunique */
555
	params[CHECKUNIQUE] = checkunique ? "true" : "false";
556
#undef CHECKUNIQUE
557
#undef HEAPALLINDEXED
558

559
	params_count = ind->checkunique_is_supported ?
560
			3 :
561
			( ind->heapallindexed_is_supported ? 2 : 1 );
562

563
	/*
564
	 * Prepare query text with schema name
565
	 * +1 for \0 and -2 for %s
566
	 */
567
	query = palloc(strlen(ind->amcheck_nspname) + strlen(queries[params_count - 1]) + 1 - 2);
568
	sprintf(query, queries[params_count - 1], ind->amcheck_nspname);
569

570
	res = pgut_execute_parallel(arguments->conn_arg.conn,
571
								arguments->conn_arg.cancel_conn,
572
								query, params_count, (const char **)params, true, true, true);
573

574
	if (PQresultStatus(res) != PGRES_TUPLES_OK)
575
	{
576
		elog(WARNING, "Thread [%d]. Amcheck failed in database '%s' for index: '%s.%s': %s",
577
					   arguments->thread_num, arguments->conn_opt.pgdatabase,
578
					   ind->namespace, ind->name, PQresultErrorMessage(res));
579

580
		pfree(params[INDEXRELID]);
581
		pfree(query);
582
		PQclear(res);
583
		return false;
584
	}
585
	else
586
		elog(LOG, "Thread [%d]. Amcheck succeeded in database '%s' for index: '%s.%s'",
587
				arguments->thread_num,
588
				arguments->conn_opt.pgdatabase, ind->namespace, ind->name);
589

590
	pfree(params[INDEXRELID]);
591
#undef INDEXRELID
592
	pfree(query);
593
	PQclear(res);
594
	return true;
595
}
596

597
/*
598
 * Entry point of checkdb --amcheck.
599
 *
600
 * Connect to all databases in the cluster
601
 * and get list of persistent indexes,
602
 * then run parallel threads to perform bt_index_check()
603
 * for all indexes from the list.
604
 *
605
 * If amcheck extension is not installed in the database,
606
 * skip this database and report it via warning message.
607
 */
608
static void
609
do_amcheck(ConnectionOptions conn_opt, PGconn *conn)
610
{
611
	int			i;
612
	/* arrays with meta info for multi threaded amcheck */
613
	pthread_t	*threads;
614
	check_indexes_arg *threads_args;
615
	bool		check_isok = true;
616
	PGresult   *res_db;
617
	int n_databases = 0;
618
	bool first_db_with_amcheck = true;
619
	bool db_skipped = false;
620

621
	elog(INFO, "Start amchecking PostgreSQL instance");
622

623
	res_db = pgut_execute(conn,
624
						"SELECT datname, oid, dattablespace "
625
						"FROM pg_catalog.pg_database "
626
						"WHERE datname NOT IN ('template0'::name, 'template1'::name)",
627
						  0, NULL);
628

629
	/* we don't need this connection anymore */
630
	if (conn)
631
		pgut_disconnect(conn);
632

633
	n_databases =  PQntuples(res_db);
634

635
	/* For each database check indexes. In parallel. */
636
	for(i = 0; i < n_databases; i++)
637
	{
638
		int j;
639
		const char 	*dbname;
640
		PGconn 		*db_conn = NULL;
641
		parray 		*index_list = NULL;
642

643
		dbname = PQgetvalue(res_db, i, 0);
644
		db_conn = pgut_connect(conn_opt.pghost, conn_opt.pgport,
645
								dbname, conn_opt.pguser);
646

647
		index_list = get_index_list(dbname, first_db_with_amcheck,
648
									db_conn);
649

650
		/* we don't need this connection anymore */
651
		if (db_conn)
652
			pgut_disconnect(db_conn);
653

654
		if (index_list == NULL)
655
		{
656
			db_skipped = true;
657
			continue;
658
		}
659

660
		first_db_with_amcheck = false;
661

662
		/* init thread args with own index lists */
663
		threads = (pthread_t *) palloc(sizeof(pthread_t) * num_threads);
664
		threads_args = (check_indexes_arg *) palloc(sizeof(check_indexes_arg)*num_threads);
665

666
		for (j = 0; j < num_threads; j++)
667
		{
668
			check_indexes_arg *arg = &(threads_args[j]);
669

670
			arg->index_list = index_list;
671
			arg->conn_arg.conn = NULL;
672
			arg->conn_arg.cancel_conn = NULL;
673

674
			arg->conn_opt.pghost = conn_opt.pghost;
675
			arg->conn_opt.pgport = conn_opt.pgport;
676
			arg->conn_opt.pgdatabase = dbname;
677
			arg->conn_opt.pguser = conn_opt.pguser;
678

679
			arg->thread_num = j + 1;
680
			/* By default there are some error */
681
			arg->ret = 1;
682
		}
683

684
		/* Run threads */
685
		for (j = 0; j < num_threads; j++)
686
		{
687
			check_indexes_arg *arg = &(threads_args[j]);
688
			elog(VERBOSE, "Start thread num: %i", j);
689
			pthread_create(&threads[j], NULL, check_indexes, arg);
690
		}
691

692
		/* Wait threads */
693
		for (j = 0; j < num_threads; j++)
694
		{
695
			pthread_join(threads[j], NULL);
696
			if (threads_args[j].ret > 0)
697
				check_isok = false;
698
		}
699

700
		if (check_isok)
701
			elog(INFO, "Amcheck succeeded for database '%s'", dbname);
702
		else
703
			elog(WARNING, "Amcheck failed for database '%s'", dbname);
704

705
		parray_walk(index_list, pg_indexEntry_free);
706
		parray_free(index_list);
707

708
		if (interrupted)
709
			break;
710
	}
711

712
	/* cleanup */
713
	PQclear(res_db);
714

715
	/* Inform user about amcheck results */
716
	if (interrupted)
717
		elog(ERROR, "checkdb --amcheck is interrupted.");
718

719
	if (check_isok)
720
	{
721
		elog(INFO, "checkdb --amcheck finished successfully. "
722
					   "All checked indexes are valid.");
723

724
		if (db_skipped)
725
			elog(ERROR, "Some databases were not amchecked.");
726
		else
727
			elog(INFO, "All databases were amchecked.");
728
	}
729
	else
730
		elog(ERROR, "checkdb --amcheck finished with failure. "
731
					"Not all checked indexes are valid. %s",
732
					db_skipped?"Some databases were not amchecked.":
733
							   "All databases were amchecked.");
734
}
735

736
/* Entry point of pg_probackup CHECKDB subcommand */
737
void
738
do_checkdb(bool need_amcheck,
739
		   ConnectionOptions conn_opt, char *pgdata)
740
{
741
	PGNodeInfo nodeInfo;
742
	PGconn *cur_conn;
743

744
	/* Initialize PGInfonode */
745
	pgNodeInit(&nodeInfo);
746

747
	if (skip_block_validation && !need_amcheck)
748
		elog(ERROR, "Option '--skip-block-validation' must be used with '--amcheck' option");
749

750
	if (!skip_block_validation)
751
	{
752
		if (!pgdata)
753
			elog(ERROR, "Required parameter not specified: PGDATA "
754
						 "(-D, --pgdata)");
755

756
		/* get node info */
757
		cur_conn = pgdata_basic_setup(conn_opt, &nodeInfo);
758

759
		/* ensure that conn credentials and pgdata are consistent */
760
		check_system_identifiers(cur_conn, pgdata);
761

762
		/*
763
		 * we don't need this connection anymore.
764
		 * block validation can last long time,
765
		 * so we don't hold the connection open,
766
		 * rather open new connection for amcheck
767
		 */
768
		if (cur_conn)
769
			pgut_disconnect(cur_conn);
770

771
		do_block_validation(pgdata, nodeInfo.checksum_version);
772
	}
773

774
	if (need_amcheck)
775
	{
776
		cur_conn = pgdata_basic_setup(conn_opt, &nodeInfo);
777
		do_amcheck(conn_opt, cur_conn);
778
	}
779
}
780

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.