pg_probackup

Форк
0
/
catchup.c 
1130 строк · 36.8 Кб
1
/*-------------------------------------------------------------------------
2
 *
3
 * catchup.c: sync DB cluster
4
 *
5
 * Copyright (c) 2021-2022, Postgres Professional
6
 *
7
 *-------------------------------------------------------------------------
8
 */
9

10
#include "pg_probackup.h"
11

12
#if PG_VERSION_NUM < 110000
13
#include "catalog/catalog.h"
14
#endif
15
#include "catalog/pg_tablespace.h"
16
#include "access/timeline.h"
17
#include "pgtar.h"
18
#include "streamutil.h"
19

20
#include <sys/stat.h>
21
#include <time.h>
22
#include <unistd.h>
23

24
#include "utils/thread.h"
25
#include "utils/file.h"
26

27
/*
28
 * Catchup routines
29
 */
30
static PGconn *catchup_init_state(PGNodeInfo *source_node_info, const char *source_pgdata, const char *dest_pgdata);
31
static void catchup_preflight_checks(PGNodeInfo *source_node_info, PGconn *source_conn, const char *source_pgdata, 
32
					const char *dest_pgdata);
33
static void catchup_check_tablespaces_existance_in_tbsmapping(PGconn *conn);
34
static parray* catchup_get_tli_history(ConnectionOptions *conn_opt, TimeLineID tli);
35

36
//REVIEW I'd also suggest to wrap all these fields into some CatchupState, but it isn't urgent.
37
//REVIEW_ANSWER what for?
38
/*
39
 * Prepare for work: fill some globals, open connection to source database
40
 */
41
static PGconn *
42
catchup_init_state(PGNodeInfo	*source_node_info, const char *source_pgdata, const char *dest_pgdata)
43
{
44
	PGconn		*source_conn;
45

46
	/* Initialize PGInfonode */
47
	pgNodeInit(source_node_info);
48

49
	/* Get WAL segments size and system ID of source PG instance */
50
	instance_config.xlog_seg_size = get_xlog_seg_size(source_pgdata);
51
	instance_config.system_identifier = get_system_identifier(source_pgdata, FIO_DB_HOST, false);
52
	current.start_time = time(NULL);
53

54
	strlcpy(current.program_version, PROGRAM_VERSION, sizeof(current.program_version));
55

56
	/* Do some compatibility checks and fill basic info about PG instance */
57
	source_conn = pgdata_basic_setup(instance_config.conn_opt, source_node_info);
58

59
#if PG_VERSION_NUM >= 110000
60
	if (!RetrieveWalSegSize(source_conn))
61
		elog(ERROR, "Failed to retrieve wal_segment_size");
62
#endif
63

64
	get_ptrack_version(source_conn, source_node_info);
65
	if (source_node_info->ptrack_version_num > 0)
66
		source_node_info->is_ptrack_enabled = pg_is_ptrack_enabled(source_conn, source_node_info->ptrack_version_num);
67

68
	/* Obtain current timeline */
69
#if PG_VERSION_NUM >= 90600
70
	current.tli = get_current_timeline(source_conn);
71
#else
72
	instance_config.pgdata = source_pgdata;
73
	current.tli = get_current_timeline_from_control(source_pgdata, FIO_DB_HOST, false);
74
#endif
75

76
	elog(INFO, "Catchup start, pg_probackup version: %s, "
77
			"PostgreSQL version: %s, "
78
			"remote: %s, source-pgdata: %s, destination-pgdata: %s",
79
			PROGRAM_VERSION, source_node_info->server_version_str,
80
			IsSshProtocol()  ? "true" : "false",
81
			source_pgdata, dest_pgdata);
82

83
	if (current.from_replica)
84
		elog(INFO, "Running catchup from standby");
85

86
	return source_conn;
87
}
88

89
/*
90
 * Check that catchup can be performed on source and dest
91
 * this function is for checks, that can be performed without modification of data on disk
92
 */
93
static void
94
catchup_preflight_checks(PGNodeInfo *source_node_info, PGconn *source_conn,
95
		const char *source_pgdata, const char *dest_pgdata)
96
{
97
	/*  TODO
98
	 *  gsmol - fallback to FULL mode if dest PGDATA is empty
99
	 *  kulaginm -- I think this is a harmful feature. If user requested an incremental catchup, then
100
	 * he expects that this will be done quickly and efficiently. If, for example, he made a mistake
101
	 * with dest_dir, then he will receive a second full copy instead of an error message, and I think
102
	 * that in some cases he would prefer the error.
103
	 * I propose in future versions to offer a backup_mode auto, in which we will look to the dest_dir
104
	 * and decide which of the modes will be the most effective.
105
	 * I.e.:
106
	 *   if(requested_backup_mode == BACKUP_MODE_DIFF_AUTO)
107
	 *   {
108
	 *     if(dest_pgdata_is_empty)
109
	 *       backup_mode = BACKUP_MODE_FULL;
110
	 *     else
111
	 *       if(ptrack supported and applicable)
112
	 *         backup_mode = BACKUP_MODE_DIFF_PTRACK;
113
	 *       else
114
	 *         backup_mode = BACKUP_MODE_DIFF_DELTA;
115
	 *   }
116
	 */
117

118
	if (dir_is_empty(dest_pgdata, FIO_LOCAL_HOST))
119
	{
120
		if (current.backup_mode == BACKUP_MODE_DIFF_PTRACK ||
121
			 current.backup_mode == BACKUP_MODE_DIFF_DELTA)
122
			elog(ERROR, "\"%s\" is empty, but incremental catchup mode requested.",
123
				dest_pgdata);
124
	}
125
	else /* dest dir not empty */
126
	{
127
		if (current.backup_mode == BACKUP_MODE_FULL)
128
			elog(ERROR, "Can't perform full catchup into non-empty directory \"%s\".",
129
				dest_pgdata);
130
	}
131

132
	/* check that postmaster is not running in destination */
133
	if (current.backup_mode != BACKUP_MODE_FULL)
134
	{
135
		pid_t   pid;
136
		pid = fio_check_postmaster(dest_pgdata, FIO_LOCAL_HOST);
137
		if (pid == 1) /* postmaster.pid is mangled */
138
		{
139
			char	pid_filename[MAXPGPATH];
140
			join_path_components(pid_filename, dest_pgdata, "postmaster.pid");
141
			elog(ERROR, "Pid file \"%s\" is mangled, cannot determine whether postmaster is running or not",
142
				pid_filename);
143
		}
144
		else if (pid > 1) /* postmaster is up */
145
		{
146
			elog(ERROR, "Postmaster with pid %u is running in destination directory \"%s\"",
147
				pid, dest_pgdata);
148
		}
149
	}
150

151
	/* check backup_label absence in dest */
152
	if (current.backup_mode != BACKUP_MODE_FULL)
153
	{
154
		char	backup_label_filename[MAXPGPATH];
155

156
		join_path_components(backup_label_filename, dest_pgdata, PG_BACKUP_LABEL_FILE);
157
		if (fio_access(backup_label_filename, F_OK, FIO_LOCAL_HOST) == 0)
158
			elog(ERROR, "Destination directory contains \"" PG_BACKUP_LABEL_FILE "\" file");
159
	}
160

161
	/* Check that connected PG instance, source and destination PGDATA are the same */
162
	{
163
		uint64	source_conn_id, source_id, dest_id;
164

165
		source_conn_id = get_remote_system_identifier(source_conn);
166
		source_id = get_system_identifier(source_pgdata, FIO_DB_HOST, false); /* same as instance_config.system_identifier */
167

168
		if (source_conn_id != source_id)
169
			elog(ERROR, "Database identifiers mismatch: we connected to DB id %lu, but in \"%s\" we found id %lu",
170
				source_conn_id, source_pgdata, source_id);
171

172
		if (current.backup_mode != BACKUP_MODE_FULL)
173
		{
174
			dest_id = get_system_identifier(dest_pgdata, FIO_LOCAL_HOST, false);
175
			if (source_conn_id != dest_id)
176
			elog(ERROR, "Database identifiers mismatch: we connected to DB id %lu, but in \"%s\" we found id %lu",
177
				source_conn_id, dest_pgdata, dest_id);
178
		}
179
	}
180

181
	/* check PTRACK version */
182
	if (current.backup_mode == BACKUP_MODE_DIFF_PTRACK)
183
	{
184
		if (source_node_info->ptrack_version_num == 0)
185
			elog(ERROR, "This PostgreSQL instance does not support ptrack");
186
		else if (source_node_info->ptrack_version_num < 200)
187
			elog(ERROR, "Ptrack extension is too old.\n"
188
					"Upgrade ptrack to version >= 2");
189
		else if (!source_node_info->is_ptrack_enabled)
190
			elog(ERROR, "Ptrack is disabled");
191
	}
192

193
	if (current.from_replica && exclusive_backup)
194
		elog(ERROR, "Catchup from standby is only available for PostgreSQL >= 9.6");
195

196
	/* check that we don't overwrite tablespace in source pgdata */
197
	catchup_check_tablespaces_existance_in_tbsmapping(source_conn);
198

199
	/* check timelines */
200
	if (current.backup_mode != BACKUP_MODE_FULL)
201
	{
202
		RedoParams	dest_redo = { 0, InvalidXLogRecPtr, 0 };
203

204
		/* fill dest_redo.lsn and dest_redo.tli */
205
		get_redo(dest_pgdata, FIO_LOCAL_HOST, &dest_redo);
206
		elog(LOG, "source.tli = %X, dest_redo.lsn = %X/%X, dest_redo.tli = %X",
207
			current.tli, (uint32) (dest_redo.lsn >> 32), (uint32) dest_redo.lsn, dest_redo.tli);
208

209
		if (current.tli != 1)
210
		{
211
			parray	*source_timelines; /* parray* of TimeLineHistoryEntry* */
212
			source_timelines = catchup_get_tli_history(&instance_config.conn_opt, current.tli);
213

214
			if (source_timelines == NULL)
215
				elog(ERROR, "Cannot get source timeline history");
216

217
			if (!satisfy_timeline(source_timelines, dest_redo.tli, dest_redo.lsn))
218
				elog(ERROR, "Destination is not in source timeline history");
219

220
			parray_walk(source_timelines, pfree);
221
			parray_free(source_timelines);
222
		}
223
		else /* special case -- no history files in source */
224
		{
225
			if (dest_redo.tli != 1)
226
				elog(ERROR, "Source is behind destination in timeline history");
227
		}
228
	}
229
}
230

231
/*
232
 * Check that all tablespaces exists in tablespace mapping (--tablespace-mapping option)
233
 * Check that all local mapped directories is empty if it is local FULL catchup
234
 * Emit fatal error if that (not existent in map or not empty) tablespace found
235
 */
236
static void
237
catchup_check_tablespaces_existance_in_tbsmapping(PGconn *conn)
238
{
239
	PGresult	*res;
240
	int		i;
241
	char		*tablespace_path = NULL;
242
	const char	*linked_path = NULL;
243
	char		*query = "SELECT pg_catalog.pg_tablespace_location(oid) "
244
						"FROM pg_catalog.pg_tablespace "
245
						"WHERE pg_catalog.pg_tablespace_location(oid) <> '';";
246

247
	res = pgut_execute(conn, query, 0, NULL);
248

249
	if (!res)
250
		elog(ERROR, "Failed to get list of tablespaces");
251

252
	for (i = 0; i < res->ntups; i++)
253
	{
254
		tablespace_path = PQgetvalue(res, i, 0);
255
		Assert (strlen(tablespace_path) > 0);
256

257
		canonicalize_path(tablespace_path);
258
		linked_path = get_tablespace_mapping(tablespace_path);
259

260
		if (strcmp(tablespace_path, linked_path) == 0)
261
		/* same result -> not found in mapping */
262
		{
263
			if (!fio_is_remote(FIO_DB_HOST))
264
				elog(ERROR, "Local catchup executed, but source database contains "
265
					"tablespace (\"%s\"), that is not listed in the map", tablespace_path);
266
			else
267
				elog(WARNING, "Remote catchup executed and source database contains "
268
					"tablespace (\"%s\"), that is not listed in the map", tablespace_path);
269
		}
270

271
		if (!is_absolute_path(linked_path))
272
			elog(ERROR, "Tablespace directory path must be an absolute path: \"%s\"",
273
				linked_path);
274

275
		if (current.backup_mode == BACKUP_MODE_FULL
276
				&& !dir_is_empty(linked_path, FIO_LOCAL_HOST))
277
			elog(ERROR, "Target mapped tablespace directory (\"%s\") is not empty in FULL catchup",
278
				linked_path);
279
	}
280
	PQclear(res);
281
}
282

283
/*
284
 * Get timeline history via replication connection
285
 * returns parray* of TimeLineHistoryEntry*
286
 */
287
static parray*
288
catchup_get_tli_history(ConnectionOptions *conn_opt, TimeLineID tli)
289
{
290
	PGresult             *res;
291
	PGconn	             *conn;
292
	char                 *history;
293
	char                  query[128];
294
	parray	             *result = NULL;
295
	TimeLineHistoryEntry *entry = NULL;
296

297
	snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", tli);
298

299
	/*
300
	 * Connect in replication mode to the server.
301
	 */
302
	conn = pgut_connect_replication(conn_opt->pghost,
303
									conn_opt->pgport,
304
									conn_opt->pgdatabase,
305
									conn_opt->pguser,
306
									false);
307

308
	if (!conn)
309
		return NULL;
310

311
	res = PQexec(conn, query);
312
	PQfinish(conn);
313

314
	if (PQresultStatus(res) != PGRES_TUPLES_OK)
315
	{
316
		elog(WARNING, "Could not send replication command \"%s\": %s",
317
					query, PQresultErrorMessage(res));
318
		PQclear(res);
319
		return NULL;
320
	}
321

322
	/*
323
	 * The response to TIMELINE_HISTORY is a single row result set
324
	 * with two fields: filename and content
325
	 */
326
	if (PQnfields(res) != 2 || PQntuples(res) != 1)
327
	{
328
		elog(ERROR, "Unexpected response to TIMELINE_HISTORY command: "
329
				"got %d rows and %d fields, expected %d rows and %d fields",
330
				PQntuples(res), PQnfields(res), 1, 2);
331
		PQclear(res);
332
		return NULL;
333
	}
334

335
	history = pgut_strdup(PQgetvalue(res, 0, 1));
336
	result = parse_tli_history_buffer(history, tli);
337

338
	/* some cleanup */
339
	pg_free(history);
340
	PQclear(res);
341

342
	/* append last timeline entry (as read_timeline_history() do) */
343
	entry = pgut_new(TimeLineHistoryEntry);
344
	entry->tli = tli;
345
	entry->end = InvalidXLogRecPtr;
346
	parray_insert(result, 0, entry);
347

348
	return result;
349
}
350

351
/*
352
 * catchup multithreaded copy routine and helper structure and function
353
 */
354

355
/* parameters for catchup_thread_runner() passed from catchup_multithreaded_copy() */
356
typedef struct
357
{
358
	PGNodeInfo *nodeInfo;
359
	const char *from_root;
360
	const char *to_root;
361
	parray	   *source_filelist;
362
	parray	   *dest_filelist;
363
	XLogRecPtr	sync_lsn;
364
	BackupMode	backup_mode;
365
	int	thread_num;
366
	size_t	transfered_bytes;
367
	bool	completed;
368
} catchup_thread_runner_arg;
369

370
/* Catchup file copier executed in separate thread */
371
static void *
372
catchup_thread_runner(void *arg)
373
{
374
	int			i;
375
	char		from_fullpath[MAXPGPATH];
376
	char		to_fullpath[MAXPGPATH];
377

378
	catchup_thread_runner_arg *arguments = (catchup_thread_runner_arg *) arg;
379
	int 		n_files = parray_num(arguments->source_filelist);
380

381
	/* catchup a file */
382
	for (i = 0; i < n_files; i++)
383
	{
384
		pgFile	*file = (pgFile *) parray_get(arguments->source_filelist, i);
385
		pgFile	*dest_file = NULL;
386

387
		/* We have already copied all directories */
388
		if (S_ISDIR(file->mode))
389
			continue;
390

391
		if (file->excluded)
392
			continue;
393

394
		if (!pg_atomic_test_set_flag(&file->lock))
395
			continue;
396

397
		/* check for interrupt */
398
		if (interrupted || thread_interrupted)
399
			elog(ERROR, "Interrupted during catchup");
400

401
		elog(progress ? INFO : LOG, "Progress: (%d/%d). Process file \"%s\"",
402
			 i + 1, n_files, file->rel_path);
403

404
		/* construct destination filepath */
405
		Assert(file->external_dir_num == 0);
406
		join_path_components(from_fullpath, arguments->from_root, file->rel_path);
407
		join_path_components(to_fullpath, arguments->to_root, file->rel_path);
408

409
		/* Encountered some strange beast */
410
		if (!S_ISREG(file->mode))
411
			elog(WARNING, "Unexpected type %d of file \"%s\", skipping",
412
							file->mode, from_fullpath);
413

414
		/* Check that file exist in dest pgdata */
415
		if (arguments->backup_mode != BACKUP_MODE_FULL)
416
		{
417
			pgFile	**dest_file_tmp = NULL;
418
			dest_file_tmp = (pgFile **) parray_bsearch(arguments->dest_filelist,
419
											file, pgFileCompareRelPathWithExternal);
420
			if (dest_file_tmp)
421
			{
422
				/* File exists in destination PGDATA */
423
				file->exists_in_prev = true;
424
				dest_file = *dest_file_tmp;
425
			}
426
		}
427

428
		/* Do actual work */
429
		if (file->is_datafile && !file->is_cfs)
430
		{
431
			catchup_data_file(file, from_fullpath, to_fullpath,
432
								 arguments->sync_lsn,
433
								 arguments->backup_mode,
434
								 arguments->nodeInfo->checksum_version,
435
								 dest_file != NULL ? dest_file->size : 0);
436
		}
437
		else
438
		{
439
			backup_non_data_file(file, dest_file, from_fullpath, to_fullpath,
440
								 arguments->backup_mode, current.parent_backup, true);
441
		}
442

443
		/* file went missing during catchup */
444
		if (file->write_size == FILE_NOT_FOUND)
445
			continue;
446

447
		if (file->write_size == BYTES_INVALID)
448
		{
449
			elog(LOG, "Skipping the unchanged file: \"%s\", read %li bytes", from_fullpath, file->read_size);
450
			continue;
451
		}
452

453
		arguments->transfered_bytes += file->write_size;
454
		elog(LOG, "File \"%s\". Copied "INT64_FORMAT " bytes",
455
						from_fullpath, file->write_size);
456
	}
457

458
	/* ssh connection to longer needed */
459
	fio_disconnect();
460

461
	/* Data files transferring is successful */
462
	arguments->completed = true;
463

464
	return NULL;
465
}
466

467
/*
468
 * main multithreaded copier
469
 * returns size of transfered data file
470
 * or -1 in case of error
471
 */
472
static ssize_t
473
catchup_multithreaded_copy(int num_threads,
474
	PGNodeInfo *source_node_info,
475
	const char *source_pgdata_path,
476
	const char *dest_pgdata_path,
477
	parray	   *source_filelist,
478
	parray	   *dest_filelist,
479
	XLogRecPtr	sync_lsn,
480
	BackupMode	backup_mode)
481
{
482
	/* arrays with meta info for multi threaded catchup */
483
	catchup_thread_runner_arg *threads_args;
484
	pthread_t	*threads;
485

486
	bool all_threads_successful = true;
487
	ssize_t transfered_bytes_result = 0;
488
	int	i;
489

490
	/* init thread args */
491
	threads_args = (catchup_thread_runner_arg *) palloc(sizeof(catchup_thread_runner_arg) * num_threads);
492
	for (i = 0; i < num_threads; i++)
493
		threads_args[i] = (catchup_thread_runner_arg){
494
			.nodeInfo = source_node_info,
495
			.from_root = source_pgdata_path,
496
			.to_root = dest_pgdata_path,
497
			.source_filelist = source_filelist,
498
			.dest_filelist = dest_filelist,
499
			.sync_lsn = sync_lsn,
500
			.backup_mode = backup_mode,
501
			.thread_num = i + 1,
502
			.transfered_bytes = 0,
503
			.completed = false,
504
		};
505

506
	/* Run threads */
507
	thread_interrupted = false;
508
	threads = (pthread_t *) palloc(sizeof(pthread_t) * num_threads);
509
	if (!dry_run)
510
	{
511
		for (i = 0; i < num_threads; i++)
512
		{
513
			elog(VERBOSE, "Start thread num: %i", i);
514
			pthread_create(&threads[i], NULL, &catchup_thread_runner, &(threads_args[i]));
515
		}
516
	}
517

518
	/* Wait threads */
519
	for (i = 0; i < num_threads; i++)
520
	{
521
		if (!dry_run)
522
			pthread_join(threads[i], NULL);
523
		all_threads_successful &= threads_args[i].completed;
524
		transfered_bytes_result += threads_args[i].transfered_bytes;
525
	}
526

527
	free(threads);
528
	free(threads_args);
529
	return all_threads_successful ? transfered_bytes_result : -1;
530
}
531

532
/*
533
 * Sync every file in destination directory to disk
534
 */
535
static void
536
catchup_sync_destination_files(const char* pgdata_path, fio_location location, parray *filelist, pgFile *pg_control_file)
537
{
538
	char    fullpath[MAXPGPATH];
539
	time_t	start_time, end_time;
540
	char	pretty_time[20];
541
	int	i;
542

543
	elog(INFO, "Syncing copied files to disk");
544
	time(&start_time);
545

546
	for (i = 0; i < parray_num(filelist); i++)
547
	{
548
		pgFile *file = (pgFile *) parray_get(filelist, i);
549

550
		/* TODO: sync directory ?
551
		 * - at first glance we can rely on fs journaling,
552
		 *   which is enabled by default on most platforms
553
		 * - but PG itself is not relying on fs, its durable_sync
554
		 *   includes directory sync
555
		 */
556
		if (S_ISDIR(file->mode) || file->excluded)
557
			continue;
558

559
		Assert(file->external_dir_num == 0);
560
		join_path_components(fullpath, pgdata_path, file->rel_path);
561
		if (fio_sync(fullpath, location) != 0)
562
			elog(ERROR, "Cannot sync file \"%s\": %s", fullpath, strerror(errno));
563
	}
564

565
	/*
566
	 * sync pg_control file
567
	 */
568
	join_path_components(fullpath, pgdata_path, pg_control_file->rel_path);
569
	if (fio_sync(fullpath, location) != 0)
570
		elog(ERROR, "Cannot sync file \"%s\": %s", fullpath, strerror(errno));
571

572
	time(&end_time);
573
	pretty_time_interval(difftime(end_time, start_time),
574
						 pretty_time, lengthof(pretty_time));
575
	elog(INFO, "Files are synced, time elapsed: %s", pretty_time);
576
}
577

578
/*
579
 * Filter filelist helper function (used to process --exclude-path's)
580
 * filelist -- parray of pgFile *, can't be NULL
581
 * exclude_absolute_paths_list -- sorted parray of char * (absolute paths, starting with '/'), can be NULL
582
 * exclude_relative_paths_list -- sorted parray of char * (relative paths), can be NULL
583
 * logging_string -- helper parameter, used for generating verbose log messages ("Source" or "Destination")
584
 */
585
static void
586
filter_filelist(parray *filelist, const char *pgdata,
587
	parray *exclude_absolute_paths_list, parray *exclude_relative_paths_list,
588
	const char *logging_string)
589
{
590
	int i;
591

592
	if (exclude_absolute_paths_list == NULL && exclude_relative_paths_list == NULL)
593
		return;
594

595
	for (i = 0; i < parray_num(filelist); ++i)
596
	{
597
		char	full_path[MAXPGPATH];
598
		pgFile *file = (pgFile *) parray_get(filelist, i);
599
		join_path_components(full_path, pgdata, file->rel_path);
600

601
		if (
602
			(exclude_absolute_paths_list != NULL
603
			&& parray_bsearch(exclude_absolute_paths_list, full_path, pgPrefixCompareString)!= NULL
604
			) || (
605
			exclude_relative_paths_list != NULL
606
			&& parray_bsearch(exclude_relative_paths_list, file->rel_path, pgPrefixCompareString)!= NULL)
607
			)
608
		{
609
			elog(INFO, "%s file \"%s\" excluded with --exclude-path option", logging_string, full_path);
610
			file->excluded = true;
611
		}
612
	}
613
}
614

615
/*
616
 * Entry point of pg_probackup CATCHUP subcommand.
617
 * exclude_*_paths_list are parray's of char *
618
 */
619
int
620
do_catchup(const char *source_pgdata, const char *dest_pgdata, int num_threads, bool sync_dest_files,
621
	parray *exclude_absolute_paths_list, parray *exclude_relative_paths_list)
622
{
623
	PGconn		*source_conn = NULL;
624
	PGNodeInfo	source_node_info;
625
	bool		backup_logs = false;
626
	parray	*source_filelist = NULL;
627
	pgFile	*source_pg_control_file = NULL;
628
	parray	*dest_filelist = NULL;
629
	char	dest_xlog_path[MAXPGPATH];
630

631
	RedoParams	dest_redo = { 0, InvalidXLogRecPtr, 0 };
632
	PGStopBackupResult	stop_backup_result;
633
	bool		catchup_isok = true;
634

635
	int			i;
636

637
	/* for fancy reporting */
638
	time_t		start_time, end_time;
639
	ssize_t		transfered_datafiles_bytes = 0;
640
	ssize_t		transfered_walfiles_bytes = 0;
641
	char		pretty_source_bytes[20];
642

643
	source_conn = catchup_init_state(&source_node_info, source_pgdata, dest_pgdata);
644
	catchup_preflight_checks(&source_node_info, source_conn, source_pgdata, dest_pgdata);
645

646
	/* we need to sort --exclude_path's for future searching */
647
	if (exclude_absolute_paths_list != NULL)
648
		parray_qsort(exclude_absolute_paths_list, pgCompareString);
649
	if (exclude_relative_paths_list != NULL)
650
		parray_qsort(exclude_relative_paths_list, pgCompareString);
651

652
	elog(INFO, "Database catchup start");
653

654
	if (current.backup_mode != BACKUP_MODE_FULL)
655
	{
656
		dest_filelist = parray_new();
657
		dir_list_file(dest_filelist, dest_pgdata,
658
			true, true, false, backup_logs, true, 0, FIO_LOCAL_HOST);
659
		filter_filelist(dest_filelist, dest_pgdata, exclude_absolute_paths_list, exclude_relative_paths_list, "Destination");
660

661
		// fill dest_redo.lsn and dest_redo.tli
662
		get_redo(dest_pgdata, FIO_LOCAL_HOST, &dest_redo);
663
		elog(INFO, "syncLSN = %X/%X", (uint32) (dest_redo.lsn >> 32), (uint32) dest_redo.lsn);
664

665
		/*
666
		 * Future improvement to catch partial catchup:
667
		 *  1. rename dest pg_control into something like pg_control.pbk
668
		 *   (so user can't start partial catchup'ed instance from this point)
669
		 *  2. try to read by get_redo() pg_control and pg_control.pbk (to detect partial catchup)
670
		 *  3. at the end (after copy of correct pg_control), remove pg_control.pbk
671
		 */
672
	}
673

674
	/*
675
	 * Make sure that sync point is within ptrack tracking range
676
	 * TODO: move to separate function to use in both backup.c and catchup.c
677
	 */
678
	if (current.backup_mode == BACKUP_MODE_DIFF_PTRACK)
679
	{
680
		XLogRecPtr	ptrack_lsn = get_last_ptrack_lsn(source_conn, &source_node_info);
681

682
		if (ptrack_lsn > dest_redo.lsn || ptrack_lsn == InvalidXLogRecPtr)
683
			elog(ERROR, "LSN from ptrack_control in source %X/%X is greater than checkpoint LSN in destination %X/%X.\n"
684
						"You can perform only FULL catchup.",
685
						(uint32) (ptrack_lsn >> 32), (uint32) (ptrack_lsn),
686
						(uint32) (dest_redo.lsn >> 32),
687
						(uint32) (dest_redo.lsn));
688
	}
689

690
	{
691
		char		label[1024];
692
		/* notify start of backup to PostgreSQL server */
693
		time2iso(label, lengthof(label), current.start_time, false);
694
		strncat(label, " with pg_probackup", lengthof(label) -
695
				strlen(" with pg_probackup"));
696

697
		/* Call pg_start_backup function in PostgreSQL connect */
698
		pg_start_backup(label, smooth_checkpoint, &current, &source_node_info, source_conn);
699
		elog(INFO, "pg_start_backup START LSN %X/%X", (uint32) (current.start_lsn >> 32), (uint32) (current.start_lsn));
700
	}
701

702
	/* Sanity: source cluster must be "in future" relatively to dest cluster */
703
	if (current.backup_mode != BACKUP_MODE_FULL &&
704
		dest_redo.lsn > current.start_lsn)
705
			elog(ERROR, "Current START LSN %X/%X is lower than SYNC LSN %X/%X, "
706
				"it may indicate that we are trying to catchup with PostgreSQL instance from the past",
707
				(uint32) (current.start_lsn >> 32), (uint32) (current.start_lsn),
708
				(uint32) (dest_redo.lsn >> 32), (uint32) (dest_redo.lsn));
709

710
	/* Start stream replication */
711
	join_path_components(dest_xlog_path, dest_pgdata, PG_XLOG_DIR);
712
	if (!dry_run)
713
	{
714
		fio_mkdir(dest_xlog_path, DIR_PERMISSION, FIO_LOCAL_HOST);
715
		start_WAL_streaming(source_conn, dest_xlog_path, &instance_config.conn_opt,
716
										current.start_lsn, current.tli, false);
717
	}
718
	else
719
		elog(INFO, "WAL streaming skipping with --dry-run option");
720

721
	source_filelist = parray_new();
722

723
	/* list files with the logical path. omit $PGDATA */
724
	if (fio_is_remote(FIO_DB_HOST))
725
		fio_list_dir(source_filelist, source_pgdata,
726
					 true, true, false, backup_logs, true, 0);
727
	else
728
		dir_list_file(source_filelist, source_pgdata,
729
					  true, true, false, backup_logs, true, 0, FIO_LOCAL_HOST);
730

731
	//REVIEW FIXME. Let's fix that before release.
732
	// TODO what if wal is not a dir (symlink to a dir)?
733
	// - Currently backup/restore transform pg_wal symlink to directory
734
	//   so the problem is not only with catchup.
735
	//   if we want to make it right - we must provide the way
736
	//   for symlink remapping during restore and catchup.
737
	//   By default everything must be left as it is.
738

739
	/* close ssh session in main thread */
740
	fio_disconnect();
741

742
	/*
743
	 * Sort pathname ascending. It is necessary to create intermediate
744
	 * directories sequentially.
745
	 *
746
	 * For example:
747
	 * 1 - create 'base'
748
	 * 2 - create 'base/1'
749
	 *
750
	 * Sorted array is used at least in parse_filelist_filenames(),
751
	 * extractPageMap(), make_pagemap_from_ptrack().
752
	 */
753
	parray_qsort(source_filelist, pgFileCompareRelPathWithExternal);
754

755
	//REVIEW Do we want to do similar calculation for dest?
756
	//REVIEW_ANSWER what for?
757
	{
758
		ssize_t	source_bytes = 0;
759
		char	pretty_bytes[20];
760

761
		source_bytes += calculate_datasize_of_filelist(source_filelist);
762

763
		/* Extract information about files in source_filelist parsing their names:*/
764
		parse_filelist_filenames(source_filelist, source_pgdata);
765
		filter_filelist(source_filelist, source_pgdata, exclude_absolute_paths_list, exclude_relative_paths_list, "Source");
766

767
		current.pgdata_bytes += calculate_datasize_of_filelist(source_filelist);
768

769
		pretty_size(current.pgdata_bytes, pretty_source_bytes, lengthof(pretty_source_bytes));
770
		pretty_size(source_bytes - current.pgdata_bytes, pretty_bytes, lengthof(pretty_bytes));
771
		elog(INFO, "Source PGDATA size: %s (excluded %s)", pretty_source_bytes, pretty_bytes);
772
	}
773

774
	elog(INFO, "Start LSN (source): %X/%X, TLI: %X",
775
			(uint32) (current.start_lsn >> 32), (uint32) (current.start_lsn),
776
			current.tli);
777
	if (current.backup_mode != BACKUP_MODE_FULL)
778
		elog(INFO, "LSN in destination: %X/%X, TLI: %X",
779
			 (uint32) (dest_redo.lsn >> 32), (uint32) (dest_redo.lsn),
780
			 dest_redo.tli);
781

782
	/* Build page mapping in PTRACK mode */
783
	if (current.backup_mode == BACKUP_MODE_DIFF_PTRACK)
784
	{
785
		time(&start_time);
786
		elog(INFO, "Extracting pagemap of changed blocks");
787

788
		/* Build the page map from ptrack information */
789
		make_pagemap_from_ptrack_2(source_filelist, source_conn,
790
									source_node_info.ptrack_schema,
791
									source_node_info.ptrack_version_num,
792
									dest_redo.lsn);
793
		time(&end_time);
794
		elog(INFO, "Pagemap successfully extracted, time elapsed: %.0f sec",
795
			 difftime(end_time, start_time));
796
	}
797

798
	/*
799
	 * Make directories before catchup
800
	 */
801
	/*
802
	 * We iterate over source_filelist and for every directory with parent 'pg_tblspc'
803
	 * we must lookup this directory name in tablespace map.
804
	 * If we got a match, we treat this directory as tablespace.
805
	 * It means that we create directory specified in tablespace map and
806
	 * original directory created as symlink to it.
807
	 */
808
	for (i = 0; i < parray_num(source_filelist); i++)
809
	{
810
		pgFile	   *file = (pgFile *) parray_get(source_filelist, i);
811
		char parent_dir[MAXPGPATH];
812

813
		if (!S_ISDIR(file->mode) || file->excluded)
814
			continue;
815

816
		/*
817
		 * check if it is fake "directory" and is a tablespace link
818
		 * this is because we passed the follow_symlink when building the list
819
		 */
820
		/* get parent dir of rel_path */
821
		strncpy(parent_dir, file->rel_path, MAXPGPATH);
822
		get_parent_directory(parent_dir);
823

824
		/* check if directory is actually link to tablespace */
825
		if (strcmp(parent_dir, PG_TBLSPC_DIR) != 0)
826
		{
827
			/* if the entry is a regular directory, create it in the destination */
828
			char		dirpath[MAXPGPATH];
829

830
			join_path_components(dirpath, dest_pgdata, file->rel_path);
831

832
			elog(LOG, "Create directory '%s'", dirpath);
833
			if (!dry_run)
834
				fio_mkdir(dirpath, DIR_PERMISSION, FIO_LOCAL_HOST);
835
		}
836
		else
837
		{
838
			/* this directory located in pg_tblspc */
839
			const char *linked_path = NULL;
840
			char	to_path[MAXPGPATH];
841

842
			// TODO perform additional check that this is actually symlink?
843
			{ /* get full symlink path and map this path to new location */
844
				char	source_full_path[MAXPGPATH];
845
				char	symlink_content[MAXPGPATH];
846
				join_path_components(source_full_path, source_pgdata, file->rel_path);
847
				fio_readlink(source_full_path, symlink_content, sizeof(symlink_content), FIO_DB_HOST);
848
				/* we checked that mapping exists in preflight_checks for local catchup */
849
				linked_path = get_tablespace_mapping(symlink_content);
850
				elog(INFO, "Map tablespace full_path: \"%s\" old_symlink_content: \"%s\" new_symlink_content: \"%s\"\n",
851
					source_full_path,
852
					symlink_content,
853
					linked_path);
854
			}
855

856
			if (!is_absolute_path(linked_path))
857
				elog(ERROR, "Tablespace directory path must be an absolute path: %s\n",
858
						 linked_path);
859

860
			join_path_components(to_path, dest_pgdata, file->rel_path);
861

862
			elog(INFO, "Create directory \"%s\" and symbolic link \"%s\"",
863
					 linked_path, to_path);
864

865
			if (!dry_run)
866
			{
867
				/* create tablespace directory */
868
				if (fio_mkdir(linked_path, file->mode, FIO_LOCAL_HOST) != 0)
869
					elog(ERROR, "Could not create tablespace directory \"%s\": %s",
870
						 linked_path, strerror(errno));
871

872
				/* create link to linked_path */
873
				if (fio_symlink(linked_path, to_path, true, FIO_LOCAL_HOST) < 0)
874
					elog(ERROR, "Could not create symbolic link \"%s\" -> \"%s\": %s",
875
						 linked_path, to_path, strerror(errno));
876
			}
877
		}
878
	}
879

880
	/*
881
	 * find pg_control file (in already sorted source_filelist)
882
	 * and exclude it from list for future special processing
883
	 */
884
	{
885
		int control_file_elem_index;
886
		pgFile search_key;
887
		MemSet(&search_key, 0, sizeof(pgFile));
888
		/* pgFileCompareRelPathWithExternal uses only .rel_path and .external_dir_num for comparison */
889
		search_key.rel_path = XLOG_CONTROL_FILE;
890
		search_key.external_dir_num = 0;
891
		control_file_elem_index = parray_bsearch_index(source_filelist, &search_key, pgFileCompareRelPathWithExternal);
892
		if(control_file_elem_index < 0)
893
			elog(ERROR, "\"%s\" not found in \"%s\"\n", XLOG_CONTROL_FILE, source_pgdata);
894
		source_pg_control_file = parray_remove(source_filelist, control_file_elem_index);
895
	}
896

897
	/* TODO before public release: must be more careful with pg_control.
898
	 *       when running catchup or incremental restore
899
	 *       cluster is actually in two states
900
	 *       simultaneously - old and new, so
901
	 *       it must contain both pg_control files
902
	 *       describing those states: global/pg_control_old, global/pg_control_new
903
	 *       1. This approach will provide us with means of
904
	 *          robust detection of previous failures and thus correct operation retrying (or forbidding).
905
	 *       2. We will have the ability of preventing instance from starting
906
	 *          in the middle of our operations.
907
	 */
908

909
	/*
910
	 * remove absent source files in dest (dropped tables, etc...)
911
	 * note: global/pg_control will also be deleted here
912
	 * mark dest files (that excluded with source --exclude-path) also for exclusion
913
	 */
914
	if (current.backup_mode != BACKUP_MODE_FULL)
915
	{
916
		elog(INFO, "Removing redundant files in destination directory");
917
		parray_qsort(dest_filelist, pgFileCompareRelPathWithExternalDesc);
918
		for (i = 0; i < parray_num(dest_filelist); i++)
919
		{
920
			bool     redundant = true;
921
			pgFile	*file = (pgFile *) parray_get(dest_filelist, i);
922
			pgFile	**src_file = NULL;
923

924
			//TODO optimize it and use some merge-like algorithm
925
			//instead of bsearch for each file.
926
			src_file = (pgFile **) parray_bsearch(source_filelist, file, pgFileCompareRelPathWithExternal);
927

928
			if (src_file!= NULL && !(*src_file)->excluded && file->excluded)
929
				(*src_file)->excluded = true;
930

931
			if (src_file!= NULL || file->excluded)
932
				redundant = false;
933

934
			/* pg_filenode.map files are always copied, because their CRC cannot be trusted */
935
			Assert(file->external_dir_num == 0);
936
			if (pg_strcasecmp(file->name, RELMAPPER_FILENAME) == 0)
937
				redundant = true;
938

939
			/* if the file is redundant (absent from the source list, or a relmapper file that is always re-copied), we can safely unlink it */
940
			if (redundant)
941
			{
942
				char		fullpath[MAXPGPATH];
943

944
				join_path_components(fullpath, dest_pgdata, file->rel_path);
945
				if (!dry_run)
946
				{
947
					fio_delete(file->mode, fullpath, FIO_LOCAL_HOST);
948
				}
949
				elog(LOG, "Deleted file \"%s\"", fullpath);
950

951
				/* shrink dest pgdata list */
952
				pgFileFree(file);
953
				parray_remove(dest_filelist, i);
954
				i--;
955
			}
956
		}
957
	}
958

959
	/* clear file locks */
960
	pfilearray_clear_locks(source_filelist);
961

962
	/* Sort by size for load balancing */
963
	parray_qsort(source_filelist, pgFileCompareSizeDesc);
964

965
	/* Sort the array for binary search */
966
	if (dest_filelist)
967
		parray_qsort(dest_filelist, pgFileCompareRelPathWithExternal);
968

969
	/* run copy threads */
970
	elog(INFO, "Start transferring data files");
971
	time(&start_time);
972
	transfered_datafiles_bytes = catchup_multithreaded_copy(num_threads, &source_node_info,
973
		source_pgdata, dest_pgdata,
974
		source_filelist, dest_filelist,
975
		dest_redo.lsn, current.backup_mode);
976
	catchup_isok = transfered_datafiles_bytes != -1;
977

978
	/* at last copy control file */
979
	if (catchup_isok && !dry_run)
980
	{
981
		char	from_fullpath[MAXPGPATH];
982
		char	to_fullpath[MAXPGPATH];
983
		join_path_components(from_fullpath, source_pgdata, source_pg_control_file->rel_path);
984
		join_path_components(to_fullpath, dest_pgdata, source_pg_control_file->rel_path);
985
		copy_pgcontrol_file(from_fullpath, FIO_DB_HOST,
986
				to_fullpath, FIO_LOCAL_HOST, source_pg_control_file);
987
		transfered_datafiles_bytes += source_pg_control_file->size;
988
	}
989

990
	if (!catchup_isok && !dry_run)
991
	{
992
		char	pretty_time[20];
993
		char	pretty_transfered_data_bytes[20];
994

995
		time(&end_time);
996
		pretty_time_interval(difftime(end_time, start_time),
997
						 pretty_time, lengthof(pretty_time));
998
		pretty_size(transfered_datafiles_bytes, pretty_transfered_data_bytes, lengthof(pretty_transfered_data_bytes));
999

1000
		elog(ERROR, "Catchup failed. Transfered: %s, time elapsed: %s",
1001
			pretty_transfered_data_bytes, pretty_time);
1002
	}
1003

1004
	/* Notify end of backup */
1005
	{
1006
		//REVIEW Is it relevant to catchup? I suppose it isn't, since catchup is a new code.
1007
		//If we do need it, please write a comment explaining that.
1008
		/* kludge against some old bug in archive_timeout. TODO: remove in 3.0.0 */
1009
		int	     timeout = (instance_config.archive_timeout > 0) ?
1010
					instance_config.archive_timeout : ARCHIVE_TIMEOUT_DEFAULT;
1011
		char    *stop_backup_query_text = NULL;
1012

1013
		pg_silent_client_messages(source_conn);
1014

1015
		/* Execute pg_stop_backup using PostgreSQL connection */
1016
		pg_stop_backup_send(source_conn, source_node_info.server_version, current.from_replica, exclusive_backup, &stop_backup_query_text);
1017

1018
		/*
1019
		 * Wait for the result of pg_stop_backup(), but no longer than
1020
		 * archive_timeout seconds
1021
		 */
1022
		pg_stop_backup_consume(source_conn, source_node_info.server_version, exclusive_backup, timeout, stop_backup_query_text, &stop_backup_result);
1023

1024
		/* Cleanup */
1025
		pg_free(stop_backup_query_text);
1026
	}
1027

1028
	if (!dry_run)
1029
		wait_wal_and_calculate_stop_lsn(dest_xlog_path, stop_backup_result.lsn, &current);
1030

1031
#if PG_VERSION_NUM >= 90600
1032
	/* Write backup_label */
1033
	Assert(stop_backup_result.backup_label_content != NULL);
1034
	if (!dry_run)
1035
	{
1036
		pg_stop_backup_write_file_helper(dest_pgdata, PG_BACKUP_LABEL_FILE, "backup label",
1037
			stop_backup_result.backup_label_content, stop_backup_result.backup_label_content_len,
1038
			NULL);
1039
	}
1040
	free(stop_backup_result.backup_label_content);
1041
	stop_backup_result.backup_label_content = NULL;
1042
	stop_backup_result.backup_label_content_len = 0;
1043

1044
	/* tablespace_map */
1045
	if (stop_backup_result.tablespace_map_content != NULL)
1046
	{
1047
		// TODO what if tablespace is created during catchup?
1048
		/* Because we have already created symlinks in pg_tblspc earlier,
1049
		 * we do not need to write the tablespace_map file.
1050
		 * So this call is unnecessary:
1051
		 * pg_stop_backup_write_file_helper(dest_pgdata, PG_TABLESPACE_MAP_FILE, "tablespace map",
1052
		 *	stop_backup_result.tablespace_map_content, stop_backup_result.tablespace_map_content_len,
1053
		 *	NULL);
1054
		 */
1055
		free(stop_backup_result.tablespace_map_content);
1056
		stop_backup_result.tablespace_map_content = NULL;
1057
		stop_backup_result.tablespace_map_content_len = 0;
1058
	}
1059
#endif
1060

1061
	/* wait for end of WAL streaming and calculate the size of transferred WAL */
1062
	if (!dry_run)
1063
	{
1064
		parray *wal_files_list = NULL;
1065
		wal_files_list = parray_new();
1066

1067
		if (wait_WAL_streaming_end(wal_files_list))
1068
			elog(ERROR, "WAL streaming failed");
1069

1070
		for (i = 0; i < parray_num(wal_files_list); i++)
1071
		{
1072
			pgFile *file = (pgFile *) parray_get(wal_files_list, i);
1073
			transfered_walfiles_bytes += file->size;
1074
		}
1075

1076
		parray_walk(wal_files_list, pgFileFree);
1077
		parray_free(wal_files_list);
1078
		wal_files_list = NULL;
1079
	}
1080

1081
	/*
1082
	 * In case of backup from replica >= 9.6 we must fix minRecPoint
1083
	 */
1084
	if (current.from_replica && !exclusive_backup)
1085
	{
1086
		set_min_recovery_point(source_pg_control_file, dest_pgdata, current.stop_lsn);
1087
	}
1088

1089
	/* close ssh session in main thread */
1090
	fio_disconnect();
1091

1092
	/* fancy reporting */
1093
	{
1094
		char	pretty_transfered_data_bytes[20];
1095
		char	pretty_transfered_wal_bytes[20];
1096
		char	pretty_time[20];
1097

1098
		time(&end_time);
1099
		pretty_time_interval(difftime(end_time, start_time),
1100
							 pretty_time, lengthof(pretty_time));
1101
		pretty_size(transfered_datafiles_bytes, pretty_transfered_data_bytes, lengthof(pretty_transfered_data_bytes));
1102
		pretty_size(transfered_walfiles_bytes, pretty_transfered_wal_bytes, lengthof(pretty_transfered_wal_bytes));
1103

1104
		elog(INFO, "Databases synchronized. Transfered datafiles size: %s, transfered wal size: %s, time elapsed: %s",
1105
			pretty_transfered_data_bytes, pretty_transfered_wal_bytes, pretty_time);
1106

1107
		if (current.backup_mode != BACKUP_MODE_FULL)
1108
			elog(INFO, "Catchup incremental ratio (less is better): %.f%% (%s/%s)",
1109
				((float) transfered_datafiles_bytes / current.pgdata_bytes) * 100,
1110
				pretty_transfered_data_bytes, pretty_source_bytes);
1111
	}
1112

1113
	/* Sync all copied files unless '--no-sync' flag is used */
1114
	if (sync_dest_files && !dry_run)
1115
		catchup_sync_destination_files(dest_pgdata, FIO_LOCAL_HOST, source_filelist, source_pg_control_file);
1116
	else
1117
		elog(WARNING, "Files are not synced to disk");
1118

1119
	/* Cleanup */
1120
	if (dest_filelist && !dry_run)
1121
	{
1122
		parray_walk(dest_filelist, pgFileFree);
1123
	}
1124
	parray_free(dest_filelist);
1125
	parray_walk(source_filelist, pgFileFree);
1126
	parray_free(source_filelist);
1127
	pgFileFree(source_pg_control_file);
1128

1129
	return 0;
1130
}
1131

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.