pg_probackup
1130 строк · 36.8 Кб
1/*-------------------------------------------------------------------------
2*
3* catchup.c: sync DB cluster
4*
5* Copyright (c) 2021-2022, Postgres Professional
6*
7*-------------------------------------------------------------------------
8*/
9
10#include "pg_probackup.h"11
12#if PG_VERSION_NUM < 11000013#include "catalog/catalog.h"14#endif15#include "catalog/pg_tablespace.h"16#include "access/timeline.h"17#include "pgtar.h"18#include "streamutil.h"19
20#include <sys/stat.h>21#include <time.h>22#include <unistd.h>23
24#include "utils/thread.h"25#include "utils/file.h"26
27/*
28* Catchup routines
29*/
30static PGconn *catchup_init_state(PGNodeInfo *source_node_info, const char *source_pgdata, const char *dest_pgdata);31static void catchup_preflight_checks(PGNodeInfo *source_node_info, PGconn *source_conn, const char *source_pgdata,32const char *dest_pgdata);33static void catchup_check_tablespaces_existance_in_tbsmapping(PGconn *conn);34static parray* catchup_get_tli_history(ConnectionOptions *conn_opt, TimeLineID tli);35
36//REVIEW I'd also suggest to wrap all these fields into some CatchupState, but it isn't urgent.
37//REVIEW_ANSWER what for?
38/*
39* Prepare for work: fill some globals, open connection to source database
40*/
41static PGconn *42catchup_init_state(PGNodeInfo *source_node_info, const char *source_pgdata, const char *dest_pgdata)43{
44PGconn *source_conn;45
46/* Initialize PGInfonode */47pgNodeInit(source_node_info);48
49/* Get WAL segments size and system ID of source PG instance */50instance_config.xlog_seg_size = get_xlog_seg_size(source_pgdata);51instance_config.system_identifier = get_system_identifier(source_pgdata, FIO_DB_HOST, false);52current.start_time = time(NULL);53
54strlcpy(current.program_version, PROGRAM_VERSION, sizeof(current.program_version));55
56/* Do some compatibility checks and fill basic info about PG instance */57source_conn = pgdata_basic_setup(instance_config.conn_opt, source_node_info);58
59#if PG_VERSION_NUM >= 11000060if (!RetrieveWalSegSize(source_conn))61elog(ERROR, "Failed to retrieve wal_segment_size");62#endif63
64get_ptrack_version(source_conn, source_node_info);65if (source_node_info->ptrack_version_num > 0)66source_node_info->is_ptrack_enabled = pg_is_ptrack_enabled(source_conn, source_node_info->ptrack_version_num);67
68/* Obtain current timeline */69#if PG_VERSION_NUM >= 9060070current.tli = get_current_timeline(source_conn);71#else72instance_config.pgdata = source_pgdata;73current.tli = get_current_timeline_from_control(source_pgdata, FIO_DB_HOST, false);74#endif75
76elog(INFO, "Catchup start, pg_probackup version: %s, "77"PostgreSQL version: %s, "78"remote: %s, source-pgdata: %s, destination-pgdata: %s",79PROGRAM_VERSION, source_node_info->server_version_str,80IsSshProtocol() ? "true" : "false",81source_pgdata, dest_pgdata);82
83if (current.from_replica)84elog(INFO, "Running catchup from standby");85
86return source_conn;87}
88
/*
 * Check that catchup can be performed on source and dest.
 * This function is for checks that can be performed without modifying data on disk.
 */
93static void94catchup_preflight_checks(PGNodeInfo *source_node_info, PGconn *source_conn,95const char *source_pgdata, const char *dest_pgdata)96{
97/* TODO98* gsmol - fallback to FULL mode if dest PGDATA is empty
99* kulaginm -- I think this is a harmful feature. If user requested an incremental catchup, then
100* he expects that this will be done quickly and efficiently. If, for example, he made a mistake
101* with dest_dir, then he will receive a second full copy instead of an error message, and I think
102* that in some cases he would prefer the error.
103* I propose in future versions to offer a backup_mode auto, in which we will look to the dest_dir
104* and decide which of the modes will be the most effective.
105* I.e.:
106* if(requested_backup_mode == BACKUP_MODE_DIFF_AUTO)
107* {
108* if(dest_pgdata_is_empty)
109* backup_mode = BACKUP_MODE_FULL;
110* else
111* if(ptrack supported and applicable)
112* backup_mode = BACKUP_MODE_DIFF_PTRACK;
113* else
114* backup_mode = BACKUP_MODE_DIFF_DELTA;
115* }
116*/
117
118if (dir_is_empty(dest_pgdata, FIO_LOCAL_HOST))119{120if (current.backup_mode == BACKUP_MODE_DIFF_PTRACK ||121current.backup_mode == BACKUP_MODE_DIFF_DELTA)122elog(ERROR, "\"%s\" is empty, but incremental catchup mode requested.",123dest_pgdata);124}125else /* dest dir not empty */126{127if (current.backup_mode == BACKUP_MODE_FULL)128elog(ERROR, "Can't perform full catchup into non-empty directory \"%s\".",129dest_pgdata);130}131
132/* check that postmaster is not running in destination */133if (current.backup_mode != BACKUP_MODE_FULL)134{135pid_t pid;136pid = fio_check_postmaster(dest_pgdata, FIO_LOCAL_HOST);137if (pid == 1) /* postmaster.pid is mangled */138{139char pid_filename[MAXPGPATH];140join_path_components(pid_filename, dest_pgdata, "postmaster.pid");141elog(ERROR, "Pid file \"%s\" is mangled, cannot determine whether postmaster is running or not",142pid_filename);143}144else if (pid > 1) /* postmaster is up */145{146elog(ERROR, "Postmaster with pid %u is running in destination directory \"%s\"",147pid, dest_pgdata);148}149}150
151/* check backup_label absence in dest */152if (current.backup_mode != BACKUP_MODE_FULL)153{154char backup_label_filename[MAXPGPATH];155
156join_path_components(backup_label_filename, dest_pgdata, PG_BACKUP_LABEL_FILE);157if (fio_access(backup_label_filename, F_OK, FIO_LOCAL_HOST) == 0)158elog(ERROR, "Destination directory contains \"" PG_BACKUP_LABEL_FILE "\" file");159}160
161/* Check that connected PG instance, source and destination PGDATA are the same */162{163uint64 source_conn_id, source_id, dest_id;164
165source_conn_id = get_remote_system_identifier(source_conn);166source_id = get_system_identifier(source_pgdata, FIO_DB_HOST, false); /* same as instance_config.system_identifier */167
168if (source_conn_id != source_id)169elog(ERROR, "Database identifiers mismatch: we connected to DB id %lu, but in \"%s\" we found id %lu",170source_conn_id, source_pgdata, source_id);171
172if (current.backup_mode != BACKUP_MODE_FULL)173{174dest_id = get_system_identifier(dest_pgdata, FIO_LOCAL_HOST, false);175if (source_conn_id != dest_id)176elog(ERROR, "Database identifiers mismatch: we connected to DB id %lu, but in \"%s\" we found id %lu",177source_conn_id, dest_pgdata, dest_id);178}179}180
181/* check PTRACK version */182if (current.backup_mode == BACKUP_MODE_DIFF_PTRACK)183{184if (source_node_info->ptrack_version_num == 0)185elog(ERROR, "This PostgreSQL instance does not support ptrack");186else if (source_node_info->ptrack_version_num < 200)187elog(ERROR, "Ptrack extension is too old.\n"188"Upgrade ptrack to version >= 2");189else if (!source_node_info->is_ptrack_enabled)190elog(ERROR, "Ptrack is disabled");191}192
193if (current.from_replica && exclusive_backup)194elog(ERROR, "Catchup from standby is only available for PostgreSQL >= 9.6");195
196/* check that we don't overwrite tablespace in source pgdata */197catchup_check_tablespaces_existance_in_tbsmapping(source_conn);198
199/* check timelines */200if (current.backup_mode != BACKUP_MODE_FULL)201{202RedoParams dest_redo = { 0, InvalidXLogRecPtr, 0 };203
204/* fill dest_redo.lsn and dest_redo.tli */205get_redo(dest_pgdata, FIO_LOCAL_HOST, &dest_redo);206elog(LOG, "source.tli = %X, dest_redo.lsn = %X/%X, dest_redo.tli = %X",207current.tli, (uint32) (dest_redo.lsn >> 32), (uint32) dest_redo.lsn, dest_redo.tli);208
209if (current.tli != 1)210{211parray *source_timelines; /* parray* of TimeLineHistoryEntry* */212source_timelines = catchup_get_tli_history(&instance_config.conn_opt, current.tli);213
214if (source_timelines == NULL)215elog(ERROR, "Cannot get source timeline history");216
217if (!satisfy_timeline(source_timelines, dest_redo.tli, dest_redo.lsn))218elog(ERROR, "Destination is not in source timeline history");219
220parray_walk(source_timelines, pfree);221parray_free(source_timelines);222}223else /* special case -- no history files in source */224{225if (dest_redo.tli != 1)226elog(ERROR, "Source is behind destination in timeline history");227}228}229}
230
/*
 * Check that all tablespaces exist in the tablespace mapping (--tablespace-mapping option).
 * Check that all local mapped directories are empty if it is a local FULL catchup.
 * Emit a fatal error if such a tablespace (absent from the map, or not empty) is found.
 */
236static void237catchup_check_tablespaces_existance_in_tbsmapping(PGconn *conn)238{
239PGresult *res;240int i;241char *tablespace_path = NULL;242const char *linked_path = NULL;243char *query = "SELECT pg_catalog.pg_tablespace_location(oid) "244"FROM pg_catalog.pg_tablespace "245"WHERE pg_catalog.pg_tablespace_location(oid) <> '';";246
247res = pgut_execute(conn, query, 0, NULL);248
249if (!res)250elog(ERROR, "Failed to get list of tablespaces");251
252for (i = 0; i < res->ntups; i++)253{254tablespace_path = PQgetvalue(res, i, 0);255Assert (strlen(tablespace_path) > 0);256
257canonicalize_path(tablespace_path);258linked_path = get_tablespace_mapping(tablespace_path);259
260if (strcmp(tablespace_path, linked_path) == 0)261/* same result -> not found in mapping */262{263if (!fio_is_remote(FIO_DB_HOST))264elog(ERROR, "Local catchup executed, but source database contains "265"tablespace (\"%s\"), that is not listed in the map", tablespace_path);266else267elog(WARNING, "Remote catchup executed and source database contains "268"tablespace (\"%s\"), that is not listed in the map", tablespace_path);269}270
271if (!is_absolute_path(linked_path))272elog(ERROR, "Tablespace directory path must be an absolute path: \"%s\"",273linked_path);274
275if (current.backup_mode == BACKUP_MODE_FULL276&& !dir_is_empty(linked_path, FIO_LOCAL_HOST))277elog(ERROR, "Target mapped tablespace directory (\"%s\") is not empty in FULL catchup",278linked_path);279}280PQclear(res);281}
282
283/*
284* Get timeline history via replication connection
285* returns parray* of TimeLineHistoryEntry*
286*/
287static parray*288catchup_get_tli_history(ConnectionOptions *conn_opt, TimeLineID tli)289{
290PGresult *res;291PGconn *conn;292char *history;293char query[128];294parray *result = NULL;295TimeLineHistoryEntry *entry = NULL;296
297snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", tli);298
299/*300* Connect in replication mode to the server.
301*/
302conn = pgut_connect_replication(conn_opt->pghost,303conn_opt->pgport,304conn_opt->pgdatabase,305conn_opt->pguser,306false);307
308if (!conn)309return NULL;310
311res = PQexec(conn, query);312PQfinish(conn);313
314if (PQresultStatus(res) != PGRES_TUPLES_OK)315{316elog(WARNING, "Could not send replication command \"%s\": %s",317query, PQresultErrorMessage(res));318PQclear(res);319return NULL;320}321
322/*323* The response to TIMELINE_HISTORY is a single row result set
324* with two fields: filename and content
325*/
326if (PQnfields(res) != 2 || PQntuples(res) != 1)327{328elog(ERROR, "Unexpected response to TIMELINE_HISTORY command: "329"got %d rows and %d fields, expected %d rows and %d fields",330PQntuples(res), PQnfields(res), 1, 2);331PQclear(res);332return NULL;333}334
335history = pgut_strdup(PQgetvalue(res, 0, 1));336result = parse_tli_history_buffer(history, tli);337
338/* some cleanup */339pg_free(history);340PQclear(res);341
342/* append last timeline entry (as read_timeline_history() do) */343entry = pgut_new(TimeLineHistoryEntry);344entry->tli = tli;345entry->end = InvalidXLogRecPtr;346parray_insert(result, 0, entry);347
348return result;349}
350
/*
 * Catchup multithreaded copy routine with its helper structure and function
 */
354
355/* parameters for catchup_thread_runner() passed from catchup_multithreaded_copy() */
356typedef struct357{
358PGNodeInfo *nodeInfo;359const char *from_root;360const char *to_root;361parray *source_filelist;362parray *dest_filelist;363XLogRecPtr sync_lsn;364BackupMode backup_mode;365int thread_num;366size_t transfered_bytes;367bool completed;368} catchup_thread_runner_arg;369
370/* Catchup file copier executed in separate thread */
371static void *372catchup_thread_runner(void *arg)373{
374int i;375char from_fullpath[MAXPGPATH];376char to_fullpath[MAXPGPATH];377
378catchup_thread_runner_arg *arguments = (catchup_thread_runner_arg *) arg;379int n_files = parray_num(arguments->source_filelist);380
381/* catchup a file */382for (i = 0; i < n_files; i++)383{384pgFile *file = (pgFile *) parray_get(arguments->source_filelist, i);385pgFile *dest_file = NULL;386
387/* We have already copied all directories */388if (S_ISDIR(file->mode))389continue;390
391if (file->excluded)392continue;393
394if (!pg_atomic_test_set_flag(&file->lock))395continue;396
397/* check for interrupt */398if (interrupted || thread_interrupted)399elog(ERROR, "Interrupted during catchup");400
401elog(progress ? INFO : LOG, "Progress: (%d/%d). Process file \"%s\"",402i + 1, n_files, file->rel_path);403
404/* construct destination filepath */405Assert(file->external_dir_num == 0);406join_path_components(from_fullpath, arguments->from_root, file->rel_path);407join_path_components(to_fullpath, arguments->to_root, file->rel_path);408
409/* Encountered some strange beast */410if (!S_ISREG(file->mode))411elog(WARNING, "Unexpected type %d of file \"%s\", skipping",412file->mode, from_fullpath);413
414/* Check that file exist in dest pgdata */415if (arguments->backup_mode != BACKUP_MODE_FULL)416{417pgFile **dest_file_tmp = NULL;418dest_file_tmp = (pgFile **) parray_bsearch(arguments->dest_filelist,419file, pgFileCompareRelPathWithExternal);420if (dest_file_tmp)421{422/* File exists in destination PGDATA */423file->exists_in_prev = true;424dest_file = *dest_file_tmp;425}426}427
428/* Do actual work */429if (file->is_datafile && !file->is_cfs)430{431catchup_data_file(file, from_fullpath, to_fullpath,432arguments->sync_lsn,433arguments->backup_mode,434arguments->nodeInfo->checksum_version,435dest_file != NULL ? dest_file->size : 0);436}437else438{439backup_non_data_file(file, dest_file, from_fullpath, to_fullpath,440arguments->backup_mode, current.parent_backup, true);441}442
443/* file went missing during catchup */444if (file->write_size == FILE_NOT_FOUND)445continue;446
447if (file->write_size == BYTES_INVALID)448{449elog(LOG, "Skipping the unchanged file: \"%s\", read %li bytes", from_fullpath, file->read_size);450continue;451}452
453arguments->transfered_bytes += file->write_size;454elog(LOG, "File \"%s\". Copied "INT64_FORMAT " bytes",455from_fullpath, file->write_size);456}457
458/* ssh connection to longer needed */459fio_disconnect();460
461/* Data files transferring is successful */462arguments->completed = true;463
464return NULL;465}
466
/*
 * Main multithreaded copier.
 * Returns the size of the transferred data files,
 * or -1 in case of error.
 */
472static ssize_t473catchup_multithreaded_copy(int num_threads,474PGNodeInfo *source_node_info,475const char *source_pgdata_path,476const char *dest_pgdata_path,477parray *source_filelist,478parray *dest_filelist,479XLogRecPtr sync_lsn,480BackupMode backup_mode)481{
482/* arrays with meta info for multi threaded catchup */483catchup_thread_runner_arg *threads_args;484pthread_t *threads;485
486bool all_threads_successful = true;487ssize_t transfered_bytes_result = 0;488int i;489
490/* init thread args */491threads_args = (catchup_thread_runner_arg *) palloc(sizeof(catchup_thread_runner_arg) * num_threads);492for (i = 0; i < num_threads; i++)493threads_args[i] = (catchup_thread_runner_arg){494.nodeInfo = source_node_info,495.from_root = source_pgdata_path,496.to_root = dest_pgdata_path,497.source_filelist = source_filelist,498.dest_filelist = dest_filelist,499.sync_lsn = sync_lsn,500.backup_mode = backup_mode,501.thread_num = i + 1,502.transfered_bytes = 0,503.completed = false,504};505
506/* Run threads */507thread_interrupted = false;508threads = (pthread_t *) palloc(sizeof(pthread_t) * num_threads);509if (!dry_run)510{511for (i = 0; i < num_threads; i++)512{513elog(VERBOSE, "Start thread num: %i", i);514pthread_create(&threads[i], NULL, &catchup_thread_runner, &(threads_args[i]));515}516}517
518/* Wait threads */519for (i = 0; i < num_threads; i++)520{521if (!dry_run)522pthread_join(threads[i], NULL);523all_threads_successful &= threads_args[i].completed;524transfered_bytes_result += threads_args[i].transfered_bytes;525}526
527free(threads);528free(threads_args);529return all_threads_successful ? transfered_bytes_result : -1;530}
531
532/*
533* Sync every file in destination directory to disk
534*/
535static void536catchup_sync_destination_files(const char* pgdata_path, fio_location location, parray *filelist, pgFile *pg_control_file)537{
538char fullpath[MAXPGPATH];539time_t start_time, end_time;540char pretty_time[20];541int i;542
543elog(INFO, "Syncing copied files to disk");544time(&start_time);545
546for (i = 0; i < parray_num(filelist); i++)547{548pgFile *file = (pgFile *) parray_get(filelist, i);549
550/* TODO: sync directory ?551* - at first glance we can rely on fs journaling,
552* which is enabled by default on most platforms
553* - but PG itself is not relying on fs, its durable_sync
554* includes directory sync
555*/
556if (S_ISDIR(file->mode) || file->excluded)557continue;558
559Assert(file->external_dir_num == 0);560join_path_components(fullpath, pgdata_path, file->rel_path);561if (fio_sync(fullpath, location) != 0)562elog(ERROR, "Cannot sync file \"%s\": %s", fullpath, strerror(errno));563}564
565/*566* sync pg_control file
567*/
568join_path_components(fullpath, pgdata_path, pg_control_file->rel_path);569if (fio_sync(fullpath, location) != 0)570elog(ERROR, "Cannot sync file \"%s\": %s", fullpath, strerror(errno));571
572time(&end_time);573pretty_time_interval(difftime(end_time, start_time),574pretty_time, lengthof(pretty_time));575elog(INFO, "Files are synced, time elapsed: %s", pretty_time);576}
577
578/*
579* Filter filelist helper function (used to process --exclude-path's)
580* filelist -- parray of pgFile *, can't be NULL
581* exclude_absolute_paths_list -- sorted parray of char * (absolute paths, starting with '/'), can be NULL
582* exclude_relative_paths_list -- sorted parray of char * (relative paths), can be NULL
583* logging_string -- helper parameter, used for generating verbose log messages ("Source" or "Destination")
584*/
585static void586filter_filelist(parray *filelist, const char *pgdata,587parray *exclude_absolute_paths_list, parray *exclude_relative_paths_list,588const char *logging_string)589{
590int i;591
592if (exclude_absolute_paths_list == NULL && exclude_relative_paths_list == NULL)593return;594
595for (i = 0; i < parray_num(filelist); ++i)596{597char full_path[MAXPGPATH];598pgFile *file = (pgFile *) parray_get(filelist, i);599join_path_components(full_path, pgdata, file->rel_path);600
601if (602(exclude_absolute_paths_list != NULL603&& parray_bsearch(exclude_absolute_paths_list, full_path, pgPrefixCompareString)!= NULL604) || (605exclude_relative_paths_list != NULL606&& parray_bsearch(exclude_relative_paths_list, file->rel_path, pgPrefixCompareString)!= NULL)607)608{609elog(INFO, "%s file \"%s\" excluded with --exclude-path option", logging_string, full_path);610file->excluded = true;611}612}613}
614
615/*
616* Entry point of pg_probackup CATCHUP subcommand.
617* exclude_*_paths_list are parray's of char *
618*/
619int
620do_catchup(const char *source_pgdata, const char *dest_pgdata, int num_threads, bool sync_dest_files,621parray *exclude_absolute_paths_list, parray *exclude_relative_paths_list)622{
623PGconn *source_conn = NULL;624PGNodeInfo source_node_info;625bool backup_logs = false;626parray *source_filelist = NULL;627pgFile *source_pg_control_file = NULL;628parray *dest_filelist = NULL;629char dest_xlog_path[MAXPGPATH];630
631RedoParams dest_redo = { 0, InvalidXLogRecPtr, 0 };632PGStopBackupResult stop_backup_result;633bool catchup_isok = true;634
635int i;636
637/* for fancy reporting */638time_t start_time, end_time;639ssize_t transfered_datafiles_bytes = 0;640ssize_t transfered_walfiles_bytes = 0;641char pretty_source_bytes[20];642
643source_conn = catchup_init_state(&source_node_info, source_pgdata, dest_pgdata);644catchup_preflight_checks(&source_node_info, source_conn, source_pgdata, dest_pgdata);645
646/* we need to sort --exclude_path's for future searching */647if (exclude_absolute_paths_list != NULL)648parray_qsort(exclude_absolute_paths_list, pgCompareString);649if (exclude_relative_paths_list != NULL)650parray_qsort(exclude_relative_paths_list, pgCompareString);651
652elog(INFO, "Database catchup start");653
654if (current.backup_mode != BACKUP_MODE_FULL)655{656dest_filelist = parray_new();657dir_list_file(dest_filelist, dest_pgdata,658true, true, false, backup_logs, true, 0, FIO_LOCAL_HOST);659filter_filelist(dest_filelist, dest_pgdata, exclude_absolute_paths_list, exclude_relative_paths_list, "Destination");660
661// fill dest_redo.lsn and dest_redo.tli662get_redo(dest_pgdata, FIO_LOCAL_HOST, &dest_redo);663elog(INFO, "syncLSN = %X/%X", (uint32) (dest_redo.lsn >> 32), (uint32) dest_redo.lsn);664
665/*666* Future improvement to catch partial catchup:
667* 1. rename dest pg_control into something like pg_control.pbk
668* (so user can't start partial catchup'ed instance from this point)
669* 2. try to read by get_redo() pg_control and pg_control.pbk (to detect partial catchup)
670* 3. at the end (after copy of correct pg_control), remove pg_control.pbk
671*/
672}673
674/*675* Make sure that sync point is withing ptrack tracking range
676* TODO: move to separate function to use in both backup.c and catchup.c
677*/
678if (current.backup_mode == BACKUP_MODE_DIFF_PTRACK)679{680XLogRecPtr ptrack_lsn = get_last_ptrack_lsn(source_conn, &source_node_info);681
682if (ptrack_lsn > dest_redo.lsn || ptrack_lsn == InvalidXLogRecPtr)683elog(ERROR, "LSN from ptrack_control in source %X/%X is greater than checkpoint LSN in destination %X/%X.\n"684"You can perform only FULL catchup.",685(uint32) (ptrack_lsn >> 32), (uint32) (ptrack_lsn),686(uint32) (dest_redo.lsn >> 32),687(uint32) (dest_redo.lsn));688}689
690{691char label[1024];692/* notify start of backup to PostgreSQL server */693time2iso(label, lengthof(label), current.start_time, false);694strncat(label, " with pg_probackup", lengthof(label) -695strlen(" with pg_probackup"));696
697/* Call pg_start_backup function in PostgreSQL connect */698pg_start_backup(label, smooth_checkpoint, ¤t, &source_node_info, source_conn);699elog(INFO, "pg_start_backup START LSN %X/%X", (uint32) (current.start_lsn >> 32), (uint32) (current.start_lsn));700}701
702/* Sanity: source cluster must be "in future" relatively to dest cluster */703if (current.backup_mode != BACKUP_MODE_FULL &&704dest_redo.lsn > current.start_lsn)705elog(ERROR, "Current START LSN %X/%X is lower than SYNC LSN %X/%X, "706"it may indicate that we are trying to catchup with PostgreSQL instance from the past",707(uint32) (current.start_lsn >> 32), (uint32) (current.start_lsn),708(uint32) (dest_redo.lsn >> 32), (uint32) (dest_redo.lsn));709
710/* Start stream replication */711join_path_components(dest_xlog_path, dest_pgdata, PG_XLOG_DIR);712if (!dry_run)713{714fio_mkdir(dest_xlog_path, DIR_PERMISSION, FIO_LOCAL_HOST);715start_WAL_streaming(source_conn, dest_xlog_path, &instance_config.conn_opt,716current.start_lsn, current.tli, false);717}718else719elog(INFO, "WAL streaming skipping with --dry-run option");720
721source_filelist = parray_new();722
723/* list files with the logical path. omit $PGDATA */724if (fio_is_remote(FIO_DB_HOST))725fio_list_dir(source_filelist, source_pgdata,726true, true, false, backup_logs, true, 0);727else728dir_list_file(source_filelist, source_pgdata,729true, true, false, backup_logs, true, 0, FIO_LOCAL_HOST);730
731//REVIEW FIXME. Let's fix that before release.732// TODO what if wal is not a dir (symlink to a dir)?733// - Currently backup/restore transform pg_wal symlink to directory734// so the problem is not only with catchup.735// if we want to make it right - we must provide the way736// for symlink remapping during restore and catchup.737// By default everything must be left as it is.738
739/* close ssh session in main thread */740fio_disconnect();741
742/*743* Sort pathname ascending. It is necessary to create intermediate
744* directories sequentially.
745*
746* For example:
747* 1 - create 'base'
748* 2 - create 'base/1'
749*
750* Sorted array is used at least in parse_filelist_filenames(),
751* extractPageMap(), make_pagemap_from_ptrack().
752*/
753parray_qsort(source_filelist, pgFileCompareRelPathWithExternal);754
755//REVIEW Do we want to do similar calculation for dest?756//REVIEW_ANSWER what for?757{758ssize_t source_bytes = 0;759char pretty_bytes[20];760
761source_bytes += calculate_datasize_of_filelist(source_filelist);762
763/* Extract information about files in source_filelist parsing their names:*/764parse_filelist_filenames(source_filelist, source_pgdata);765filter_filelist(source_filelist, source_pgdata, exclude_absolute_paths_list, exclude_relative_paths_list, "Source");766
767current.pgdata_bytes += calculate_datasize_of_filelist(source_filelist);768
769pretty_size(current.pgdata_bytes, pretty_source_bytes, lengthof(pretty_source_bytes));770pretty_size(source_bytes - current.pgdata_bytes, pretty_bytes, lengthof(pretty_bytes));771elog(INFO, "Source PGDATA size: %s (excluded %s)", pretty_source_bytes, pretty_bytes);772}773
774elog(INFO, "Start LSN (source): %X/%X, TLI: %X",775(uint32) (current.start_lsn >> 32), (uint32) (current.start_lsn),776current.tli);777if (current.backup_mode != BACKUP_MODE_FULL)778elog(INFO, "LSN in destination: %X/%X, TLI: %X",779(uint32) (dest_redo.lsn >> 32), (uint32) (dest_redo.lsn),780dest_redo.tli);781
782/* Build page mapping in PTRACK mode */783if (current.backup_mode == BACKUP_MODE_DIFF_PTRACK)784{785time(&start_time);786elog(INFO, "Extracting pagemap of changed blocks");787
788/* Build the page map from ptrack information */789make_pagemap_from_ptrack_2(source_filelist, source_conn,790source_node_info.ptrack_schema,791source_node_info.ptrack_version_num,792dest_redo.lsn);793time(&end_time);794elog(INFO, "Pagemap successfully extracted, time elapsed: %.0f sec",795difftime(end_time, start_time));796}797
798/*799* Make directories before catchup
800*/
801/*802* We iterate over source_filelist and for every directory with parent 'pg_tblspc'
803* we must lookup this directory name in tablespace map.
804* If we got a match, we treat this directory as tablespace.
805* It means that we create directory specified in tablespace map and
806* original directory created as symlink to it.
807*/
808for (i = 0; i < parray_num(source_filelist); i++)809{810pgFile *file = (pgFile *) parray_get(source_filelist, i);811char parent_dir[MAXPGPATH];812
813if (!S_ISDIR(file->mode) || file->excluded)814continue;815
816/*817* check if it is fake "directory" and is a tablespace link
818* this is because we passed the follow_symlink when building the list
819*/
820/* get parent dir of rel_path */821strncpy(parent_dir, file->rel_path, MAXPGPATH);822get_parent_directory(parent_dir);823
824/* check if directory is actually link to tablespace */825if (strcmp(parent_dir, PG_TBLSPC_DIR) != 0)826{827/* if the entry is a regular directory, create it in the destination */828char dirpath[MAXPGPATH];829
830join_path_components(dirpath, dest_pgdata, file->rel_path);831
832elog(LOG, "Create directory '%s'", dirpath);833if (!dry_run)834fio_mkdir(dirpath, DIR_PERMISSION, FIO_LOCAL_HOST);835}836else837{838/* this directory located in pg_tblspc */839const char *linked_path = NULL;840char to_path[MAXPGPATH];841
842// TODO perform additional check that this is actually symlink?843{ /* get full symlink path and map this path to new location */844char source_full_path[MAXPGPATH];845char symlink_content[MAXPGPATH];846join_path_components(source_full_path, source_pgdata, file->rel_path);847fio_readlink(source_full_path, symlink_content, sizeof(symlink_content), FIO_DB_HOST);848/* we checked that mapping exists in preflight_checks for local catchup */849linked_path = get_tablespace_mapping(symlink_content);850elog(INFO, "Map tablespace full_path: \"%s\" old_symlink_content: \"%s\" new_symlink_content: \"%s\"\n",851source_full_path,852symlink_content,853linked_path);854}855
856if (!is_absolute_path(linked_path))857elog(ERROR, "Tablespace directory path must be an absolute path: %s\n",858linked_path);859
860join_path_components(to_path, dest_pgdata, file->rel_path);861
862elog(INFO, "Create directory \"%s\" and symbolic link \"%s\"",863linked_path, to_path);864
865if (!dry_run)866{867/* create tablespace directory */868if (fio_mkdir(linked_path, file->mode, FIO_LOCAL_HOST) != 0)869elog(ERROR, "Could not create tablespace directory \"%s\": %s",870linked_path, strerror(errno));871
872/* create link to linked_path */873if (fio_symlink(linked_path, to_path, true, FIO_LOCAL_HOST) < 0)874elog(ERROR, "Could not create symbolic link \"%s\" -> \"%s\": %s",875linked_path, to_path, strerror(errno));876}877}878}879
880/*881* find pg_control file (in already sorted source_filelist)
882* and exclude it from list for future special processing
883*/
884{885int control_file_elem_index;886pgFile search_key;887MemSet(&search_key, 0, sizeof(pgFile));888/* pgFileCompareRelPathWithExternal uses only .rel_path and .external_dir_num for comparision */889search_key.rel_path = XLOG_CONTROL_FILE;890search_key.external_dir_num = 0;891control_file_elem_index = parray_bsearch_index(source_filelist, &search_key, pgFileCompareRelPathWithExternal);892if(control_file_elem_index < 0)893elog(ERROR, "\"%s\" not found in \"%s\"\n", XLOG_CONTROL_FILE, source_pgdata);894source_pg_control_file = parray_remove(source_filelist, control_file_elem_index);895}896
897/* TODO before public release: must be more careful with pg_control.898* when running catchup or incremental restore
899* cluster is actually in two states
900* simultaneously - old and new, so
901* it must contain both pg_control files
902* describing those states: global/pg_control_old, global/pg_control_new
903* 1. This approach will provide us with means of
904* robust detection of previos failures and thus correct operation retrying (or forbidding).
905* 2. We will have the ability of preventing instance from starting
906* in the middle of our operations.
907*/
908
909/*910* remove absent source files in dest (dropped tables, etc...)
911* note: global/pg_control will also be deleted here
912* mark dest files (that excluded with source --exclude-path) also for exclusion
913*/
914if (current.backup_mode != BACKUP_MODE_FULL)915{916elog(INFO, "Removing redundant files in destination directory");917parray_qsort(dest_filelist, pgFileCompareRelPathWithExternalDesc);918for (i = 0; i < parray_num(dest_filelist); i++)919{920bool redundant = true;921pgFile *file = (pgFile *) parray_get(dest_filelist, i);922pgFile **src_file = NULL;923
924//TODO optimize it and use some merge-like algorithm925//instead of bsearch for each file.926src_file = (pgFile **) parray_bsearch(source_filelist, file, pgFileCompareRelPathWithExternal);927
928if (src_file!= NULL && !(*src_file)->excluded && file->excluded)929(*src_file)->excluded = true;930
931if (src_file!= NULL || file->excluded)932redundant = false;933
934/* pg_filenode.map are always copied, because it's crc cannot be trusted */935Assert(file->external_dir_num == 0);936if (pg_strcasecmp(file->name, RELMAPPER_FILENAME) == 0)937redundant = true;938
939/* if file does not exists in destination list, then we can safely unlink it */940if (redundant)941{942char fullpath[MAXPGPATH];943
944join_path_components(fullpath, dest_pgdata, file->rel_path);945if (!dry_run)946{947fio_delete(file->mode, fullpath, FIO_LOCAL_HOST);948}949elog(LOG, "Deleted file \"%s\"", fullpath);950
951/* shrink dest pgdata list */952pgFileFree(file);953parray_remove(dest_filelist, i);954i--;955}956}957}958
959/* clear file locks */960pfilearray_clear_locks(source_filelist);961
962/* Sort by size for load balancing */963parray_qsort(source_filelist, pgFileCompareSizeDesc);964
965/* Sort the array for binary search */966if (dest_filelist)967parray_qsort(dest_filelist, pgFileCompareRelPathWithExternal);968
969/* run copy threads */970elog(INFO, "Start transferring data files");971time(&start_time);972transfered_datafiles_bytes = catchup_multithreaded_copy(num_threads, &source_node_info,973source_pgdata, dest_pgdata,974source_filelist, dest_filelist,975dest_redo.lsn, current.backup_mode);976catchup_isok = transfered_datafiles_bytes != -1;977
978/* at last copy control file */979if (catchup_isok && !dry_run)980{981char from_fullpath[MAXPGPATH];982char to_fullpath[MAXPGPATH];983join_path_components(from_fullpath, source_pgdata, source_pg_control_file->rel_path);984join_path_components(to_fullpath, dest_pgdata, source_pg_control_file->rel_path);985copy_pgcontrol_file(from_fullpath, FIO_DB_HOST,986to_fullpath, FIO_LOCAL_HOST, source_pg_control_file);987transfered_datafiles_bytes += source_pg_control_file->size;988}989
990if (!catchup_isok && !dry_run)991{992char pretty_time[20];993char pretty_transfered_data_bytes[20];994
995time(&end_time);996pretty_time_interval(difftime(end_time, start_time),997pretty_time, lengthof(pretty_time));998pretty_size(transfered_datafiles_bytes, pretty_transfered_data_bytes, lengthof(pretty_transfered_data_bytes));999
1000elog(ERROR, "Catchup failed. Transfered: %s, time elapsed: %s",1001pretty_transfered_data_bytes, pretty_time);1002}1003
1004/* Notify end of backup */1005{1006//REVIEW Is it relevant to catchup? I suppose it isn't, since catchup is a new code.1007//If we do need it, please write a comment explaining that.1008/* kludge against some old bug in archive_timeout. TODO: remove in 3.0.0 */1009int timeout = (instance_config.archive_timeout > 0) ?1010instance_config.archive_timeout : ARCHIVE_TIMEOUT_DEFAULT;1011char *stop_backup_query_text = NULL;1012
1013pg_silent_client_messages(source_conn);1014
1015/* Execute pg_stop_backup using PostgreSQL connection */1016pg_stop_backup_send(source_conn, source_node_info.server_version, current.from_replica, exclusive_backup, &stop_backup_query_text);1017
1018/*1019* Wait for the result of pg_stop_backup(), but no longer than
1020* archive_timeout seconds
1021*/
1022pg_stop_backup_consume(source_conn, source_node_info.server_version, exclusive_backup, timeout, stop_backup_query_text, &stop_backup_result);1023
1024/* Cleanup */1025pg_free(stop_backup_query_text);1026}1027
1028if (!dry_run)1029wait_wal_and_calculate_stop_lsn(dest_xlog_path, stop_backup_result.lsn, ¤t);1030
1031#if PG_VERSION_NUM >= 906001032/* Write backup_label */1033Assert(stop_backup_result.backup_label_content != NULL);1034if (!dry_run)1035{1036pg_stop_backup_write_file_helper(dest_pgdata, PG_BACKUP_LABEL_FILE, "backup label",1037stop_backup_result.backup_label_content, stop_backup_result.backup_label_content_len,1038NULL);1039}1040free(stop_backup_result.backup_label_content);1041stop_backup_result.backup_label_content = NULL;1042stop_backup_result.backup_label_content_len = 0;1043
1044/* tablespace_map */1045if (stop_backup_result.tablespace_map_content != NULL)1046{1047// TODO what if tablespace is created during catchup?1048/* Because we have already created symlinks in pg_tblspc earlier,1049* we do not need to write the tablespace_map file.
1050* So this call is unnecessary:
1051* pg_stop_backup_write_file_helper(dest_pgdata, PG_TABLESPACE_MAP_FILE, "tablespace map",
1052* stop_backup_result.tablespace_map_content, stop_backup_result.tablespace_map_content_len,
1053* NULL);
1054*/
1055free(stop_backup_result.tablespace_map_content);1056stop_backup_result.tablespace_map_content = NULL;1057stop_backup_result.tablespace_map_content_len = 0;1058}1059#endif1060
1061/* wait for end of wal streaming and calculate wal size transfered */1062if (!dry_run)1063{1064parray *wal_files_list = NULL;1065wal_files_list = parray_new();1066
1067if (wait_WAL_streaming_end(wal_files_list))1068elog(ERROR, "WAL streaming failed");1069
1070for (i = 0; i < parray_num(wal_files_list); i++)1071{1072pgFile *file = (pgFile *) parray_get(wal_files_list, i);1073transfered_walfiles_bytes += file->size;1074}1075
1076parray_walk(wal_files_list, pgFileFree);1077parray_free(wal_files_list);1078wal_files_list = NULL;1079}1080
1081/*1082* In case of backup from replica >= 9.6 we must fix minRecPoint
1083*/
1084if (current.from_replica && !exclusive_backup)1085{1086set_min_recovery_point(source_pg_control_file, dest_pgdata, current.stop_lsn);1087}1088
1089/* close ssh session in main thread */1090fio_disconnect();1091
1092/* fancy reporting */1093{1094char pretty_transfered_data_bytes[20];1095char pretty_transfered_wal_bytes[20];1096char pretty_time[20];1097
1098time(&end_time);1099pretty_time_interval(difftime(end_time, start_time),1100pretty_time, lengthof(pretty_time));1101pretty_size(transfered_datafiles_bytes, pretty_transfered_data_bytes, lengthof(pretty_transfered_data_bytes));1102pretty_size(transfered_walfiles_bytes, pretty_transfered_wal_bytes, lengthof(pretty_transfered_wal_bytes));1103
1104elog(INFO, "Databases synchronized. Transfered datafiles size: %s, transfered wal size: %s, time elapsed: %s",1105pretty_transfered_data_bytes, pretty_transfered_wal_bytes, pretty_time);1106
1107if (current.backup_mode != BACKUP_MODE_FULL)1108elog(INFO, "Catchup incremental ratio (less is better): %.f%% (%s/%s)",1109((float) transfered_datafiles_bytes / current.pgdata_bytes) * 100,1110pretty_transfered_data_bytes, pretty_source_bytes);1111}1112
1113/* Sync all copied files unless '--no-sync' flag is used */1114if (sync_dest_files && !dry_run)1115catchup_sync_destination_files(dest_pgdata, FIO_LOCAL_HOST, source_filelist, source_pg_control_file);1116else1117elog(WARNING, "Files are not synced to disk");1118
1119/* Cleanup */1120if (dest_filelist && !dry_run)1121{1122parray_walk(dest_filelist, pgFileFree);1123}1124parray_free(dest_filelist);1125parray_walk(source_filelist, pgFileFree);1126parray_free(source_filelist);1127pgFileFree(source_pg_control_file);1128
1129return 0;1130}
1131