/* pg_probackup — backup.c (source listing: 2619 lines, 78.2 KB) */
1/*-------------------------------------------------------------------------
2*
3* backup.c: backup DB cluster, archived WAL
4*
5* Portions Copyright (c) 2009-2013, NIPPON TELEGRAPH AND TELEPHONE CORPORATION
6* Portions Copyright (c) 2015-2022, Postgres Professional
7*
8*-------------------------------------------------------------------------
9*/
10
11#include "pg_probackup.h"
12
13#if PG_VERSION_NUM < 110000
14#include "catalog/catalog.h"
15#endif
16#if PG_VERSION_NUM < 120000
17#include "access/transam.h"
18#endif
19#include "catalog/pg_tablespace.h"
20#include "pgtar.h"
21#include "streamutil.h"
22
23#include <sys/stat.h>
24#include <time.h>
25#include <unistd.h>
26
27#include "utils/thread.h"
28#include "utils/file.h"
29
30//const char *progname = "pg_probackup";
31
32/* list of files contained in backup */
33parray *backup_files_list = NULL;
34
35/* We need critical section for datapagemap_add() in case of using threads */
36static pthread_mutex_t backup_pagemap_mutex = PTHREAD_MUTEX_INITIALIZER;
37
38// TODO: move to PGnodeInfo
39bool exclusive_backup = false;
40
41/* Is pg_start_backup() was executed */
42bool backup_in_progress = false;
43
44/*
45* Backup routines
46*/
47static void backup_cleanup(bool fatal, void *userdata);
48
49static void *backup_files(void *arg);
50
51static void do_backup_pg(InstanceState *instanceState, PGconn *backup_conn,
52PGNodeInfo *nodeInfo, bool no_sync, bool backup_logs);
53
54static void pg_switch_wal(PGconn *conn);
55
56static void pg_stop_backup(InstanceState *instanceState, pgBackup *backup, PGconn *pg_startbackup_conn, PGNodeInfo *nodeInfo);
57
58static void check_external_for_tablespaces(parray *external_list,
59PGconn *backup_conn);
60static parray *get_database_map(PGconn *pg_startbackup_conn);
61
62/* pgpro specific functions */
63static bool pgpro_support(PGconn *conn);
64
65/* Check functions */
66static bool pg_is_checksum_enabled(PGconn *conn);
67static bool pg_is_in_recovery(PGconn *conn);
68static bool pg_is_superuser(PGconn *conn);
69static void check_server_version(PGconn *conn, PGNodeInfo *nodeInfo);
70static void confirm_block_size(PGconn *conn, const char *name, int blcksz);
71static void rewind_and_mark_cfs_datafiles(parray *files, const char *root, char *relative, size_t i);
72static bool remove_excluded_files_criterion(void *value, void *exclude_args);
73static void backup_cfs_segment(int i, pgFile *file, backup_files_arg *arguments);
74static void process_file(int i, pgFile *file, backup_files_arg *arguments);
75
76static StopBackupCallbackParams stop_callback_params;
77
78static void
79backup_stopbackup_callback(bool fatal, void *userdata)
80{
81StopBackupCallbackParams *st = (StopBackupCallbackParams *) userdata;
82/*
83* If backup is in progress, notify stop of backup to PostgreSQL
84*/
85if (backup_in_progress)
86{
87elog(WARNING, "A backup is in progress, stopping it.");
88/* don't care about stop_lsn in case of error */
89pg_stop_backup_send(st->conn, st->server_version, current.from_replica, exclusive_backup, NULL);
90}
91}
92
93/*
94* Take a backup of a single postgresql instance.
95* Move files from 'pgdata' to a subdirectory in backup catalog.
96*/
97static void
98do_backup_pg(InstanceState *instanceState, PGconn *backup_conn,
99PGNodeInfo *nodeInfo, bool no_sync, bool backup_logs)
100{
101int i;
102char external_prefix[MAXPGPATH]; /* Temp value. Used as template */
103char label[1024];
104XLogRecPtr prev_backup_start_lsn = InvalidXLogRecPtr;
105
106/* arrays with meta info for multi threaded backup */
107pthread_t *threads;
108backup_files_arg *threads_args;
109bool backup_isok = true;
110
111pgBackup *prev_backup = NULL;
112parray *prev_backup_filelist = NULL;
113parray *backup_list = NULL;
114parray *external_dirs = NULL;
115parray *database_map = NULL;
116
117/* used for multitimeline incremental backup */
118parray *tli_list = NULL;
119
120/* for fancy reporting */
121time_t start_time, end_time;
122char pretty_time[20];
123char pretty_bytes[20];
124
125elog(INFO, "Database backup start");
126if(current.external_dir_str)
127{
128external_dirs = make_external_directory_list(current.external_dir_str,
129false);
130check_external_for_tablespaces(external_dirs, backup_conn);
131}
132
133/* notify start of backup to PostgreSQL server */
134time2iso(label, lengthof(label), current.start_time, false);
135strncat(label, " with pg_probackup", lengthof(label) -
136strlen(" with pg_probackup"));
137
138/* Call pg_start_backup function in PostgreSQL connect */
139pg_start_backup(label, smooth_checkpoint, ¤t, nodeInfo, backup_conn);
140
141/* Obtain current timeline */
142#if PG_VERSION_NUM >= 90600
143current.tli = get_current_timeline(backup_conn);
144#else
145current.tli = get_current_timeline_from_control(instance_config.pgdata, FIO_DB_HOST, false);
146#endif
147
148/*
149* In incremental backup mode ensure that already-validated
150* backup on current timeline exists and get its filelist.
151*/
152if (current.backup_mode == BACKUP_MODE_DIFF_PAGE ||
153current.backup_mode == BACKUP_MODE_DIFF_PTRACK ||
154current.backup_mode == BACKUP_MODE_DIFF_DELTA)
155{
156/* get list of backups already taken */
157backup_list = catalog_get_backup_list(instanceState, INVALID_BACKUP_ID);
158
159prev_backup = catalog_get_last_data_backup(backup_list, current.tli, current.start_time);
160if (prev_backup == NULL)
161{
162/* try to setup multi-timeline backup chain */
163elog(WARNING, "Valid full backup on current timeline %u is not found, "
164"trying to look up on previous timelines",
165current.tli);
166
167tli_list = get_history_streaming(&instance_config.conn_opt, current.tli, backup_list);
168if (!tli_list)
169{
170elog(WARNING, "Failed to obtain current timeline history file via replication protocol");
171/* fallback to using archive */
172tli_list = catalog_get_timelines(instanceState, &instance_config);
173}
174
175if (parray_num(tli_list) == 0)
176elog(WARNING, "Cannot find valid backup on previous timelines, "
177"WAL archive is not available");
178else
179{
180prev_backup = get_multi_timeline_parent(backup_list, tli_list, current.tli,
181current.start_time, &instance_config);
182
183if (prev_backup == NULL)
184elog(WARNING, "Cannot find valid backup on previous timelines");
185}
186
187/* failed to find suitable parent, error out */
188if (!prev_backup)
189elog(ERROR, "Create new full backup before an incremental one");
190}
191}
192
193if (prev_backup)
194{
195if (parse_program_version(prev_backup->program_version) > parse_program_version(PROGRAM_VERSION))
196elog(ERROR, "pg_probackup binary version is %s, but backup %s version is %s. "
197"pg_probackup do not guarantee to be forward compatible. "
198"Please upgrade pg_probackup binary.",
199PROGRAM_VERSION, backup_id_of(prev_backup), prev_backup->program_version);
200
201elog(INFO, "Parent backup: %s", backup_id_of(prev_backup));
202
203/* Files of previous backup needed by DELTA backup */
204prev_backup_filelist = get_backup_filelist(prev_backup, true);
205
206/* If lsn is not NULL, only pages with higher lsn will be copied. */
207prev_backup_start_lsn = prev_backup->start_lsn;
208current.parent_backup = prev_backup->start_time;
209
210write_backup(¤t, true);
211}
212
213/*
214* It`s illegal to take PTRACK backup if LSN from ptrack_control() is not
215* equal to start_lsn of previous backup.
216*/
217if (current.backup_mode == BACKUP_MODE_DIFF_PTRACK)
218{
219XLogRecPtr ptrack_lsn = get_last_ptrack_lsn(backup_conn, nodeInfo);
220
221// new ptrack (>=2.0) is more robust and checks Start LSN
222if (ptrack_lsn > prev_backup->start_lsn || ptrack_lsn == InvalidXLogRecPtr)
223{
224elog(ERROR, "LSN from ptrack_control %X/%X is greater than Start LSN of previous backup %X/%X.\n"
225"Create new full backup before an incremental one.",
226(uint32) (ptrack_lsn >> 32), (uint32) (ptrack_lsn),
227(uint32) (prev_backup->start_lsn >> 32),
228(uint32) (prev_backup->start_lsn));
229}
230}
231
232/* For incremental backup check that start_lsn is not from the past
233* Though it will not save us if PostgreSQL instance is actually
234* restored STREAM backup.
235*/
236if (current.backup_mode != BACKUP_MODE_FULL &&
237prev_backup->start_lsn > current.start_lsn)
238elog(ERROR, "Current START LSN %X/%X is lower than START LSN %X/%X of previous backup %s. "
239"It may indicate that we are trying to backup PostgreSQL instance from the past.",
240(uint32) (current.start_lsn >> 32), (uint32) (current.start_lsn),
241(uint32) (prev_backup->start_lsn >> 32), (uint32) (prev_backup->start_lsn),
242backup_id_of(prev_backup));
243
244/* Update running backup meta with START LSN */
245write_backup(¤t, true);
246
247/* In PAGE mode or in ARCHIVE wal-mode wait for current segment */
248if (current.backup_mode == BACKUP_MODE_DIFF_PAGE || !current.stream)
249{
250/* Check that archive_dir can be reached */
251if (fio_access(instanceState->instance_wal_subdir_path, F_OK, FIO_BACKUP_HOST) != 0)
252elog(ERROR, "WAL archive directory is not accessible \"%s\": %s",
253instanceState->instance_wal_subdir_path, strerror(errno));
254
255/*
256* Do not wait start_lsn for stream backup.
257* Because WAL streaming will start after pg_start_backup() in stream
258* mode.
259*/
260wait_wal_lsn(instanceState->instance_wal_subdir_path, current.start_lsn, true, current.tli, false, true, ERROR, false);
261}
262
263/* start stream replication */
264if (current.stream)
265{
266char stream_xlog_path[MAXPGPATH];
267
268join_path_components(stream_xlog_path, current.database_dir, PG_XLOG_DIR);
269fio_mkdir(stream_xlog_path, DIR_PERMISSION, FIO_BACKUP_HOST);
270
271start_WAL_streaming(backup_conn, stream_xlog_path, &instance_config.conn_opt,
272current.start_lsn, current.tli, true);
273
274/* Make sure that WAL streaming is working
275* PAGE backup in stream mode is waited twice, first for
276* segment in WAL archive and then for streamed segment
277*/
278wait_wal_lsn(stream_xlog_path, current.start_lsn, true, current.tli, false, true, ERROR, true);
279}
280
281/* initialize backup's file list */
282backup_files_list = parray_new();
283join_path_components(external_prefix, current.root_dir, EXTERNAL_DIR);
284
285/* list files with the logical path. omit $PGDATA */
286fio_list_dir(backup_files_list, instance_config.pgdata,
287true, true, false, backup_logs, true, 0);
288
289/*
290* Get database_map (name to oid) for use in partial restore feature.
291* It's possible that we fail and database_map will be NULL.
292*/
293database_map = get_database_map(backup_conn);
294
295/*
296* Append to backup list all files and directories
297* from external directory option
298*/
299if (external_dirs)
300{
301for (i = 0; i < parray_num(external_dirs); i++)
302{
303/* External dirs numeration starts with 1.
304* 0 value is not external dir */
305if (fio_is_remote(FIO_DB_HOST))
306fio_list_dir(backup_files_list, parray_get(external_dirs, i),
307false, true, false, false, true, i+1);
308else
309dir_list_file(backup_files_list, parray_get(external_dirs, i),
310false, true, false, false, true, i+1, FIO_LOCAL_HOST);
311}
312}
313
314/* close ssh session in main thread */
315fio_disconnect();
316
317/* Sanity check for backup_files_list, thank you, Windows:
318* https://github.com/postgrespro/pg_probackup/issues/48
319*/
320
321if (parray_num(backup_files_list) < 100)
322elog(ERROR, "PGDATA is almost empty. Either it was concurrently deleted or "
323"pg_probackup do not possess sufficient permissions to list PGDATA content");
324
325current.pgdata_bytes += calculate_datasize_of_filelist(backup_files_list);
326pretty_size(current.pgdata_bytes, pretty_bytes, lengthof(pretty_bytes));
327elog(INFO, "PGDATA size: %s", pretty_bytes);
328
329/*
330* Sort pathname ascending. It is necessary to create intermediate
331* directories sequentially.
332*
333* For example:
334* 1 - create 'base'
335* 2 - create 'base/1'
336*
337* Sorted array is used at least in parse_filelist_filenames(),
338* extractPageMap(), make_pagemap_from_ptrack().
339*/
340parray_qsort(backup_files_list, pgFileCompareRelPathWithExternal);
341
342/* Extract information about files in backup_list parsing their names:*/
343parse_filelist_filenames(backup_files_list, instance_config.pgdata);
344
345elog(INFO, "Current Start LSN: %X/%X, TLI: %X",
346(uint32) (current.start_lsn >> 32), (uint32) (current.start_lsn),
347current.tli);
348if (current.backup_mode != BACKUP_MODE_FULL)
349elog(INFO, "Parent Start LSN: %X/%X, TLI: %X",
350(uint32) (prev_backup->start_lsn >> 32), (uint32) (prev_backup->start_lsn),
351prev_backup->tli);
352
353/*
354* Build page mapping in incremental mode.
355*/
356
357if (current.backup_mode == BACKUP_MODE_DIFF_PAGE ||
358current.backup_mode == BACKUP_MODE_DIFF_PTRACK)
359{
360bool pagemap_isok = true;
361
362time(&start_time);
363elog(INFO, "Extracting pagemap of changed blocks");
364
365if (current.backup_mode == BACKUP_MODE_DIFF_PAGE)
366{
367/*
368* Build the page map. Obtain information about changed pages
369* reading WAL segments present in archives up to the point
370* where this backup has started.
371*/
372pagemap_isok = extractPageMap(instanceState->instance_wal_subdir_path,
373instance_config.xlog_seg_size,
374prev_backup->start_lsn, prev_backup->tli,
375current.start_lsn, current.tli, tli_list);
376}
377else if (current.backup_mode == BACKUP_MODE_DIFF_PTRACK)
378{
379/*
380* Build the page map from ptrack information.
381*/
382make_pagemap_from_ptrack_2(backup_files_list, backup_conn,
383nodeInfo->ptrack_schema,
384nodeInfo->ptrack_version_num,
385prev_backup_start_lsn);
386}
387
388time(&end_time);
389
390/* TODO: add ms precision */
391if (pagemap_isok)
392elog(INFO, "Pagemap successfully extracted, time elapsed: %.0f sec",
393difftime(end_time, start_time));
394else
395elog(ERROR, "Pagemap extraction failed, time elasped: %.0f sec",
396difftime(end_time, start_time));
397}
398
399/*
400* Make directories before backup
401*/
402for (i = 0; i < parray_num(backup_files_list); i++)
403{
404pgFile *file = (pgFile *) parray_get(backup_files_list, i);
405
406/* if the entry was a directory, create it in the backup */
407if (S_ISDIR(file->mode))
408{
409char dirpath[MAXPGPATH];
410
411if (file->external_dir_num)
412{
413char temp[MAXPGPATH];
414snprintf(temp, MAXPGPATH, "%s%d", external_prefix,
415file->external_dir_num);
416join_path_components(dirpath, temp, file->rel_path);
417}
418else
419join_path_components(dirpath, current.database_dir, file->rel_path);
420
421elog(LOG, "Create directory '%s'", dirpath);
422fio_mkdir(dirpath, DIR_PERMISSION, FIO_BACKUP_HOST);
423}
424
425}
426
427/* setup thread locks */
428pfilearray_clear_locks(backup_files_list);
429
430/* Sort by size for load balancing */
431parray_qsort(backup_files_list, pgFileCompareSize);
432/* Sort the array for binary search */
433if (prev_backup_filelist)
434parray_qsort(prev_backup_filelist, pgFileCompareRelPathWithExternal);
435
436/* write initial backup_content.control file and update backup.control */
437write_backup_filelist(¤t, backup_files_list,
438instance_config.pgdata, external_dirs, true);
439write_backup(¤t, true);
440
441/* Init backup page header map */
442init_header_map(¤t);
443
444/* init thread args with own file lists */
445threads = (pthread_t *) palloc(sizeof(pthread_t) * num_threads);
446threads_args = (backup_files_arg *) palloc(sizeof(backup_files_arg)*num_threads);
447
448for (i = 0; i < num_threads; i++)
449{
450backup_files_arg *arg = &(threads_args[i]);
451
452arg->nodeInfo = nodeInfo;
453arg->from_root = instance_config.pgdata;
454arg->to_root = current.database_dir;
455arg->external_prefix = external_prefix;
456arg->external_dirs = external_dirs;
457arg->files_list = backup_files_list;
458arg->prev_filelist = prev_backup_filelist;
459arg->prev_start_lsn = prev_backup_start_lsn;
460arg->hdr_map = &(current.hdr_map);
461arg->thread_num = i+1;
462/* By default there are some error */
463arg->ret = 1;
464}
465
466/* Run threads */
467thread_interrupted = false;
468elog(INFO, "Start transferring data files");
469time(&start_time);
470for (i = 0; i < num_threads; i++)
471{
472backup_files_arg *arg = &(threads_args[i]);
473
474elog(VERBOSE, "Start thread num: %i", i);
475pthread_create(&threads[i], NULL, backup_files, arg);
476}
477
478/* Wait threads */
479for (i = 0; i < num_threads; i++)
480{
481pthread_join(threads[i], NULL);
482if (threads_args[i].ret == 1)
483backup_isok = false;
484}
485
486time(&end_time);
487pretty_time_interval(difftime(end_time, start_time),
488pretty_time, lengthof(pretty_time));
489if (backup_isok)
490elog(INFO, "Data files are transferred, time elapsed: %s",
491pretty_time);
492else
493elog(ERROR, "Data files transferring failed, time elapsed: %s",
494pretty_time);
495
496/* clean previous backup file list */
497if (prev_backup_filelist)
498{
499parray_walk(prev_backup_filelist, pgFileFree);
500parray_free(prev_backup_filelist);
501}
502
503/* Notify end of backup */
504pg_stop_backup(instanceState, ¤t, backup_conn, nodeInfo);
505
506/* In case of backup from replica >= 9.6 we must fix minRecPoint,
507* First we must find pg_control in backup_files_list.
508*/
509if (current.from_replica && !exclusive_backup)
510{
511pgFile *pg_control = NULL;
512
513for (i = 0; i < parray_num(backup_files_list); i++)
514{
515pgFile *tmp_file = (pgFile *) parray_get(backup_files_list, i);
516
517if (tmp_file->external_dir_num == 0 &&
518(strcmp(tmp_file->rel_path, XLOG_CONTROL_FILE) == 0))
519{
520pg_control = tmp_file;
521break;
522}
523}
524
525if (!pg_control)
526elog(ERROR, "Failed to find file \"%s\" in backup filelist.",
527XLOG_CONTROL_FILE);
528
529set_min_recovery_point(pg_control, current.database_dir, current.stop_lsn);
530}
531
532/* close and sync page header map */
533if (current.hdr_map.fp)
534{
535cleanup_header_map(&(current.hdr_map));
536
537if (fio_sync(current.hdr_map.path, FIO_BACKUP_HOST) != 0)
538elog(ERROR, "Cannot sync file \"%s\": %s", current.hdr_map.path, strerror(errno));
539}
540
541/* close ssh session in main thread */
542fio_disconnect();
543
544/*
545* Add archived xlog files into the list of files of this backup
546* NOTHING TO DO HERE
547*/
548
549/* write database map to file and add it to control file */
550if (database_map)
551{
552write_database_map(¤t, database_map, backup_files_list);
553/* cleanup */
554parray_walk(database_map, db_map_entry_free);
555parray_free(database_map);
556}
557
558/* Print the list of files to backup catalog */
559write_backup_filelist(¤t, backup_files_list, instance_config.pgdata,
560external_dirs, true);
561/* update backup control file to update size info */
562write_backup(¤t, true);
563
564/* Sync all copied files unless '--no-sync' flag is used */
565if (no_sync)
566elog(WARNING, "Backup files are not synced to disk");
567else
568{
569elog(INFO, "Syncing backup files to disk");
570time(&start_time);
571
572for (i = 0; i < parray_num(backup_files_list); i++)
573{
574char to_fullpath[MAXPGPATH];
575pgFile *file = (pgFile *) parray_get(backup_files_list, i);
576
577/* TODO: sync directory ? */
578if (S_ISDIR(file->mode))
579continue;
580
581if (file->write_size <= 0)
582continue;
583
584/* construct fullpath */
585if (file->external_dir_num == 0)
586join_path_components(to_fullpath, current.database_dir, file->rel_path);
587else
588{
589char external_dst[MAXPGPATH];
590
591makeExternalDirPathByNum(external_dst, external_prefix,
592file->external_dir_num);
593join_path_components(to_fullpath, external_dst, file->rel_path);
594}
595
596if (fio_sync(to_fullpath, FIO_BACKUP_HOST) != 0)
597elog(ERROR, "Cannot sync file \"%s\": %s", to_fullpath, strerror(errno));
598}
599
600time(&end_time);
601pretty_time_interval(difftime(end_time, start_time),
602pretty_time, lengthof(pretty_time));
603elog(INFO, "Backup files are synced, time elapsed: %s", pretty_time);
604}
605
606/* be paranoid about instance been from the past */
607if (current.backup_mode != BACKUP_MODE_FULL &&
608current.stop_lsn < prev_backup->stop_lsn)
609elog(ERROR, "Current backup STOP LSN %X/%X is lower than STOP LSN %X/%X of previous backup %s. "
610"It may indicate that we are trying to backup PostgreSQL instance from the past.",
611(uint32) (current.stop_lsn >> 32), (uint32) (current.stop_lsn),
612(uint32) (prev_backup->stop_lsn >> 32), (uint32) (prev_backup->stop_lsn),
613backup_id_of(prev_backup));
614
615/* clean external directories list */
616if (external_dirs)
617free_dir_list(external_dirs);
618
619/* Cleanup */
620if (backup_list)
621{
622parray_walk(backup_list, pgBackupFree);
623parray_free(backup_list);
624}
625
626if (tli_list)
627{
628parray_walk(tli_list, timelineInfoFree);
629parray_free(tli_list);
630}
631
632parray_walk(backup_files_list, pgFileFree);
633parray_free(backup_files_list);
634backup_files_list = NULL;
635}
636
637/*
638* Common code for CHECKDB and BACKUP commands.
639* Ensure that we're able to connect to the instance
640* check compatibility and fill basic info.
641* For checkdb launched in amcheck mode with pgdata validation
642* do not check system ID, it gives user an opportunity to
643* check remote PostgreSQL instance.
644* Also checking system ID in this case serves no purpose, because
645* all work is done by server.
646*
647* Returns established connection
648*/
649PGconn *
650pgdata_basic_setup(ConnectionOptions conn_opt, PGNodeInfo *nodeInfo)
651{
652PGconn *cur_conn;
653
654/* Create connection for PostgreSQL */
655cur_conn = pgut_connect(conn_opt.pghost, conn_opt.pgport,
656conn_opt.pgdatabase,
657conn_opt.pguser);
658
659current.primary_conninfo = pgut_get_conninfo_string(cur_conn);
660
661/* Confirm data block size and xlog block size are compatible */
662confirm_block_size(cur_conn, "block_size", BLCKSZ);
663confirm_block_size(cur_conn, "wal_block_size", XLOG_BLCKSZ);
664nodeInfo->block_size = BLCKSZ;
665nodeInfo->wal_block_size = XLOG_BLCKSZ;
666nodeInfo->is_superuser = pg_is_superuser(cur_conn);
667nodeInfo->pgpro_support = pgpro_support(cur_conn);
668
669current.from_replica = pg_is_in_recovery(cur_conn);
670
671/* Confirm that this server version is supported */
672check_server_version(cur_conn, nodeInfo);
673
674if (pg_is_checksum_enabled(cur_conn))
675current.checksum_version = 1;
676else
677current.checksum_version = 0;
678
679nodeInfo->checksum_version = current.checksum_version;
680
681if (current.checksum_version)
682elog(INFO, "This PostgreSQL instance was initialized with data block checksums. "
683"Data block corruption will be detected");
684else
685elog(WARNING, "This PostgreSQL instance was initialized without data block checksums. "
686"pg_probackup have no way to detect data block corruption without them. "
687"Reinitialize PGDATA with option '--data-checksums'.");
688
689if (nodeInfo->is_superuser)
690elog(WARNING, "Current PostgreSQL role is superuser. "
691"It is not recommended to run pg_probackup under superuser.");
692
693strlcpy(current.server_version, nodeInfo->server_version_str,
694sizeof(current.server_version));
695
696return cur_conn;
697}
698
699/*
700* Entry point of pg_probackup BACKUP subcommand.
701*
702* if start_time == INVALID_BACKUP_ID then we can generate backup_id
703*/
704int
705do_backup(InstanceState *instanceState, pgSetBackupParams *set_backup_params,
706bool no_validate, bool no_sync, bool backup_logs, time_t start_time)
707{
708PGconn *backup_conn = NULL;
709PGNodeInfo nodeInfo;
710time_t latest_backup_id = INVALID_BACKUP_ID;
711char pretty_bytes[20];
712
713if (!instance_config.pgdata)
714elog(ERROR, "No postgres data directory specified.\n"
715"Please specify it either using environment variable PGDATA or\n"
716"command line option --pgdata (-D)");
717
718/* Initialize PGInfonode */
719pgNodeInit(&nodeInfo);
720
721/* Save list of external directories */
722if (instance_config.external_dir_str &&
723(pg_strcasecmp(instance_config.external_dir_str, "none") != 0))
724current.external_dir_str = instance_config.external_dir_str;
725
726/* Find latest backup_id */
727{
728parray *backup_list = catalog_get_backup_list(instanceState, INVALID_BACKUP_ID);
729
730if (parray_num(backup_list) > 0)
731latest_backup_id = ((pgBackup *)parray_get(backup_list, 0))->backup_id;
732
733parray_walk(backup_list, pgBackupFree);
734parray_free(backup_list);
735}
736
737/* Try to pick backup_id and create backup directory with BACKUP_CONTROL_FILE */
738if (start_time != INVALID_BACKUP_ID)
739{
740/* If user already choosed backup_id for us, then try to use it. */
741if (start_time <= latest_backup_id)
742/* don't care about freeing base36enc_dup memory, we exit anyway */
743elog(ERROR, "Can't assign backup_id from requested start_time (%s), "
744"this time must be later that backup %s",
745base36enc(start_time), base36enc(latest_backup_id));
746
747current.backup_id = start_time;
748pgBackupInitDir(¤t, instanceState->instance_backup_subdir_path);
749}
750else
751{
752/* We can generate our own unique backup_id
753* Sometimes (when we try to backup twice in one second)
754* backup_id will be duplicated -> try more times.
755*/
756int attempts = 10;
757
758if (time(NULL) < latest_backup_id)
759elog(ERROR, "Can't assign backup_id, there is already a backup in future (%s)",
760base36enc(latest_backup_id));
761
762do
763{
764current.backup_id = time(NULL);
765pgBackupInitDir(¤t, instanceState->instance_backup_subdir_path);
766if (current.backup_id == INVALID_BACKUP_ID)
767sleep(1);
768}
769while (current.backup_id == INVALID_BACKUP_ID && attempts-- > 0);
770}
771
772/* If creation of backup dir was unsuccessful, there will be WARNINGS in logs already */
773if (current.backup_id == INVALID_BACKUP_ID)
774elog(ERROR, "Can't create backup directory");
775
776/* Update backup status and other metainfo. */
777current.status = BACKUP_STATUS_RUNNING;
778/* XXX BACKUP_ID change it when backup_id wouldn't match start_time */
779current.start_time = current.backup_id;
780
781strlcpy(current.program_version, PROGRAM_VERSION,
782sizeof(current.program_version));
783
784current.compress_alg = instance_config.compress_alg;
785current.compress_level = instance_config.compress_level;
786
787elog(INFO, "Backup start, pg_probackup version: %s, instance: %s, backup ID: %s, backup mode: %s, "
788"wal mode: %s, remote: %s, compress-algorithm: %s, compress-level: %i",
789PROGRAM_VERSION, instanceState->instance_name, backup_id_of(¤t), pgBackupGetBackupMode(¤t, false),
790current.stream ? "STREAM" : "ARCHIVE", IsSshProtocol() ? "true" : "false",
791deparse_compress_alg(current.compress_alg), current.compress_level);
792
793if (!lock_backup(¤t, true, true))
794elog(ERROR, "Cannot lock backup %s directory",
795backup_id_of(¤t));
796write_backup(¤t, true);
797
798/* set the error processing function for the backup process */
799pgut_atexit_push(backup_cleanup, NULL);
800
801elog(LOG, "Backup destination is initialized");
802
803/*
804* setup backup_conn, do some compatibility checks and
805* fill basic info about instance
806*/
807backup_conn = pgdata_basic_setup(instance_config.conn_opt, &nodeInfo);
808
809if (current.from_replica)
810elog(INFO, "Backup %s is going to be taken from standby", backup_id_of(¤t));
811
812/* TODO, print PostgreSQL full version */
813//elog(INFO, "PostgreSQL version: %s", nodeInfo.server_version_str);
814
815/*
816* Ensure that backup directory was initialized for the same PostgreSQL
817* instance we opened connection to. And that target backup database PGDATA
818* belogns to the same instance.
819*/
820check_system_identifiers(backup_conn, instance_config.pgdata);
821
822/* below perform checks specific for backup command */
823#if PG_VERSION_NUM >= 110000
824if (!RetrieveWalSegSize(backup_conn))
825elog(ERROR, "Failed to retrieve wal_segment_size");
826#endif
827
828get_ptrack_version(backup_conn, &nodeInfo);
829// elog(WARNING, "ptrack_version_num %d", ptrack_version_num);
830
831if (nodeInfo.ptrack_version_num > 0)
832nodeInfo.is_ptrack_enabled = pg_is_ptrack_enabled(backup_conn, nodeInfo.ptrack_version_num);
833
834if (current.backup_mode == BACKUP_MODE_DIFF_PTRACK)
835{
836/* ptrack_version_num < 2.0 was already checked in get_ptrack_version() */
837if (nodeInfo.ptrack_version_num == 0)
838elog(ERROR, "This PostgreSQL instance does not support ptrack");
839else
840{
841if (!nodeInfo.is_ptrack_enabled)
842elog(ERROR, "Ptrack is disabled");
843}
844}
845
846if (current.from_replica && exclusive_backup)
847/* Check master connection options */
848if (instance_config.master_conn_opt.pghost == NULL)
849elog(ERROR, "Options for connection to master must be provided to perform backup from replica");
850
851/* add note to backup if requested */
852if (set_backup_params && set_backup_params->note)
853add_note(¤t, set_backup_params->note);
854
855/* backup data */
856do_backup_pg(instanceState, backup_conn, &nodeInfo, no_sync, backup_logs);
857pgut_atexit_pop(backup_cleanup, NULL);
858
859/* compute size of wal files of this backup stored in the archive */
860if (!current.stream)
861{
862XLogSegNo start_segno;
863XLogSegNo stop_segno;
864
865GetXLogSegNo(current.start_lsn, start_segno, instance_config.xlog_seg_size);
866GetXLogSegNo(current.stop_lsn, stop_segno, instance_config.xlog_seg_size);
867current.wal_bytes = (stop_segno - start_segno) * instance_config.xlog_seg_size;
868
869/*
870* If start_lsn and stop_lsn are located in the same segment, then
871* set wal_bytes to the size of 1 segment.
872*/
873if (current.wal_bytes <= 0)
874current.wal_bytes = instance_config.xlog_seg_size;
875}
876
877/* Backup is done. Update backup status */
878current.end_time = time(NULL);
879current.status = BACKUP_STATUS_DONE;
880write_backup(¤t, true);
881
882/* Pin backup if requested */
883if (set_backup_params &&
884(set_backup_params->ttl > 0 ||
885set_backup_params->expire_time > 0))
886{
887pin_backup(¤t, set_backup_params);
888}
889
890if (!no_validate)
891pgBackupValidate(¤t, NULL);
892
893/* Notify user about backup size */
894if (current.stream)
895pretty_size(current.data_bytes + current.wal_bytes, pretty_bytes, lengthof(pretty_bytes));
896else
897pretty_size(current.data_bytes, pretty_bytes, lengthof(pretty_bytes));
898elog(INFO, "Backup %s resident size: %s", backup_id_of(¤t), pretty_bytes);
899
900if (current.status == BACKUP_STATUS_OK ||
901current.status == BACKUP_STATUS_DONE)
902elog(INFO, "Backup %s completed", backup_id_of(¤t));
903else
904elog(ERROR, "Backup %s failed", backup_id_of(¤t));
905
906/*
907* After successful backup completion remove backups
908* which are expired according to retention policies
909*/
910if (delete_expired || merge_expired || delete_wal)
911do_retention(instanceState, no_validate, no_sync);
912
913return 0;
914}
915
/*
 * Confirm that this server version is supported.
 *
 * Fills nodeInfo->server_version and nodeInfo->server_version_str,
 * errors out (via elog(ERROR)) on unsupported versions or on a
 * PostgreSQL / Postgres Pro build-vs-server mismatch, and sets the
 * global 'exclusive_backup' flag as a side effect.
 */
static void
check_server_version(PGconn *conn, PGNodeInfo *nodeInfo)
{
	PGresult   *res = NULL;

	/* confirm server version */
	nodeInfo->server_version = PQserverVersion(conn);

	/* PQserverVersion() returns 0 when the connection is bad */
	if (nodeInfo->server_version == 0)
		elog(ERROR, "Unknown server version %d", nodeInfo->server_version);

	/* Pre-10 servers use two-part version strings ("9.6"), 10+ use one part */
	if (nodeInfo->server_version < 100000)
		sprintf(nodeInfo->server_version_str, "%d.%d",
				nodeInfo->server_version / 10000,
				(nodeInfo->server_version / 100) % 100);
	else
		sprintf(nodeInfo->server_version_str, "%d",
				nodeInfo->server_version / 10000);

	if (nodeInfo->server_version < 90500)
		elog(ERROR,
			 "Server version is %s, must be %s or higher",
			 nodeInfo->server_version_str, "9.5");

	/* Backup from replica requires 9.6+ */
	if (current.from_replica && nodeInfo->server_version < 90600)
		elog(ERROR,
			 "Server version is %s, must be %s or higher for backup from replica",
			 nodeInfo->server_version_str, "9.6");

	/* On a Postgres Pro server, fetch its edition string for the checks below */
	if (nodeInfo->pgpro_support)
		res = pgut_execute(conn, "SELECT pg_catalog.pgpro_edition()", 0, NULL);

	/*
	 * Check major version of connected PostgreSQL and major version of
	 * compiled PostgreSQL.
	 */
#ifdef PGPRO_VERSION
	if (!res)
	{
		/* It seems we connected to PostgreSQL (not Postgres Pro) */
		if(strcmp(PGPRO_EDITION, "1C") != 0)
		{
			elog(ERROR, "%s was built with Postgres Pro %s %s, "
						"but connection is made with PostgreSQL %s",
				 PROGRAM_NAME, PG_MAJORVERSION, PGPRO_EDITION, nodeInfo->server_version_str);
		}
		/* We have PostgresPro for 1C and connect to PostgreSQL or PostgresPro for 1C
		 * Check the major version
		 */
		if (strcmp(nodeInfo->server_version_str, PG_MAJORVERSION) != 0)
			elog(ERROR, "%s was built with PostgrePro %s %s, but connection is made with %s",
				 PROGRAM_NAME, PG_MAJORVERSION, PGPRO_EDITION, nodeInfo->server_version_str);
	}
	else
	{
		/* Both sides are Postgres Pro: major version and edition must agree */
		if (strcmp(nodeInfo->server_version_str, PG_MAJORVERSION) != 0 &&
			strcmp(PQgetvalue(res, 0, 0), PGPRO_EDITION) != 0)
			elog(ERROR, "%s was built with Postgres Pro %s %s, "
						"but connection is made with Postgres Pro %s %s",
				 PROGRAM_NAME, PG_MAJORVERSION, PGPRO_EDITION,
				 nodeInfo->server_version_str, PQgetvalue(res, 0, 0));
	}
#else
	if (res)
		/* It seems we connected to Postgres Pro (not PostgreSQL) */
		elog(ERROR, "%s was built with PostgreSQL %s, "
					"but connection is made with Postgres Pro %s %s",
			 PROGRAM_NAME, PG_MAJORVERSION,
			 nodeInfo->server_version_str, PQgetvalue(res, 0, 0));
	else
	{
		/* Plain PostgreSQL on both sides: major versions must match */
		if (strcmp(nodeInfo->server_version_str, PG_MAJORVERSION) != 0)
			elog(ERROR, "%s was built with PostgreSQL %s, but connection is made with %s",
				 PROGRAM_NAME, PG_MAJORVERSION, nodeInfo->server_version_str);
	}
#endif

	if (res)
		PQclear(res);

	/* Do exclusive backup only for PostgreSQL 9.5 */
	exclusive_backup = nodeInfo->server_version < 90600;
}
1002
1003/*
1004* Ensure that backup directory was initialized for the same PostgreSQL
1005* instance we opened connection to. And that target backup database PGDATA
1006* belogns to the same instance.
1007* All system identifiers must be equal.
1008*/
1009void
1010check_system_identifiers(PGconn *conn, const char *pgdata)
1011{
1012uint64 system_id_conn;
1013uint64 system_id_pgdata;
1014
1015system_id_pgdata = get_system_identifier(pgdata, FIO_DB_HOST, false);
1016system_id_conn = get_remote_system_identifier(conn);
1017
1018/* for checkdb check only system_id_pgdata and system_id_conn */
1019if (current.backup_mode == BACKUP_MODE_INVALID)
1020{
1021if (system_id_conn != system_id_pgdata)
1022{
1023elog(ERROR, "Data directory initialized with system id " UINT64_FORMAT ", "
1024"but connected instance system id is " UINT64_FORMAT,
1025system_id_pgdata, system_id_conn);
1026}
1027return;
1028}
1029
1030if (system_id_conn != instance_config.system_identifier)
1031elog(ERROR, "Backup data directory was initialized for system id " UINT64_FORMAT ", "
1032"but connected instance system id is " UINT64_FORMAT,
1033instance_config.system_identifier, system_id_conn);
1034
1035if (system_id_pgdata != instance_config.system_identifier)
1036elog(ERROR, "Backup data directory was initialized for system id " UINT64_FORMAT ", "
1037"but target backup directory system id is " UINT64_FORMAT,
1038instance_config.system_identifier, system_id_pgdata);
1039}
1040
1041/*
1042* Ensure that target backup database is initialized with
1043* compatible settings. Currently check BLCKSZ and XLOG_BLCKSZ.
1044*/
1045static void
1046confirm_block_size(PGconn *conn, const char *name, int blcksz)
1047{
1048PGresult *res;
1049char *endp;
1050int block_size;
1051
1052res = pgut_execute(conn, "SELECT pg_catalog.current_setting($1)", 1, &name);
1053if (PQntuples(res) != 1 || PQnfields(res) != 1)
1054elog(ERROR, "Cannot get %s: %s", name, PQerrorMessage(conn));
1055
1056block_size = strtol(PQgetvalue(res, 0, 0), &endp, 10);
1057if ((endp && *endp) || block_size != blcksz)
1058elog(ERROR,
1059"%s(%d) is not compatible(%d expected)",
1060name, block_size, blcksz);
1061
1062PQclear(res);
1063}
1064
/*
 * Notify start of backup to PostgreSQL server.
 *
 * Runs pg_backup_start()/pg_start_backup() on 'conn', records the returned
 * start LSN into backup->start_lsn, registers an atexit callback that will
 * stop the backup on abnormal exit, and (for archive-dependent backups on a
 * master) forces a WAL segment switch so the segment with start_lsn gets
 * archived.
 */
void
pg_start_backup(const char *label, bool smooth, pgBackup *backup,
				PGNodeInfo *nodeInfo, PGconn *conn)
{
	PGresult   *res;
	const char *params[2];
	uint32		lsn_hi;
	uint32		lsn_lo;
	params[0] = label;

#if PG_VERSION_NUM >= 150000
	elog(INFO, "wait for pg_backup_start()");
#else
	elog(INFO, "wait for pg_start_backup()");
#endif

	/* 2nd argument is 'fast': a smooth backup skips the immediate checkpoint */
	params[1] = smooth ? "false" : "true";
	res = pgut_execute(conn,
#if PG_VERSION_NUM >= 150000
					   "SELECT pg_catalog.pg_backup_start($1, $2)",
#else
					   "SELECT pg_catalog.pg_start_backup($1, $2, false)",
#endif
					   2,
					   params);

	/*
	 * Set flag that pg_start_backup() was called. If an error will happen it
	 * is necessary to call pg_stop_backup() in backup_cleanup().
	 */
	backup_in_progress = true;
	stop_callback_params.conn = conn;
	stop_callback_params.server_version = nodeInfo->server_version;
	pgut_atexit_push(backup_stopbackup_callback, &stop_callback_params);

	/* Extract timeline and LSN from results of pg_start_backup() */
	XLogDataFromLSN(PQgetvalue(res, 0, 0), &lsn_hi, &lsn_lo);
	/* Calculate LSN: combine the two 32-bit halves into a 64-bit XLogRecPtr */
	backup->start_lsn = ((uint64) lsn_hi )<< 32 | lsn_lo;

	PQclear(res);

	if ((!backup->stream || backup->backup_mode == BACKUP_MODE_DIFF_PAGE) &&
		!backup->from_replica &&
		!(nodeInfo->server_version < 90600 &&
		  !nodeInfo->is_superuser))
		/*
		 * Switch to a new WAL segment. It is necessary to get archived WAL
		 * segment, which includes start LSN of current backup.
		 * Don`t do this for replica backups and for PG 9.5 if pguser is not superuser
		 * (because in 9.5 only superuser can switch WAL)
		 */
		pg_switch_wal(conn);
}
1123
1124/*
1125* Switch to a new WAL segment. It should be called only for master.
1126* For PG 9.5 it should be called only if pguser is superuser.
1127*/
1128void
1129pg_switch_wal(PGconn *conn)
1130{
1131PGresult *res;
1132
1133pg_silent_client_messages(conn);
1134
1135#if PG_VERSION_NUM >= 100000
1136res = pgut_execute(conn, "SELECT pg_catalog.pg_switch_wal()", 0, NULL);
1137#else
1138res = pgut_execute(conn, "SELECT pg_catalog.pg_switch_xlog()", 0, NULL);
1139#endif
1140
1141PQclear(res);
1142}
1143
1144/*
1145* Check if the instance is PostgresPro fork.
1146*/
1147static bool
1148pgpro_support(PGconn *conn)
1149{
1150PGresult *res;
1151
1152res = pgut_execute(conn,
1153"SELECT proname FROM pg_catalog.pg_proc WHERE proname='pgpro_edition'::name AND pronamespace='pg_catalog'::regnamespace::oid",
11540, NULL);
1155
1156if (PQresultStatus(res) == PGRES_TUPLES_OK &&
1157(PQntuples(res) == 1) &&
1158(strcmp(PQgetvalue(res, 0, 0), "pgpro_edition") == 0))
1159{
1160PQclear(res);
1161return true;
1162}
1163
1164PQclear(res);
1165return false;
1166}
1167
1168/*
1169* Fill 'datname to Oid' map
1170*
1171* This function can fail to get the map for legal reasons, e.g. missing
1172* permissions on pg_database during `backup`.
1173* As long as user do not use partial restore feature it`s fine.
1174*
1175* To avoid breaking a backward compatibility don't throw an ERROR,
1176* throw a warning instead of an error and return NULL.
1177* Caller is responsible for checking the result.
1178*/
1179parray *
1180get_database_map(PGconn *conn)
1181{
1182PGresult *res;
1183parray *database_map = NULL;
1184int i;
1185
1186/*
1187* Do not include template0 and template1 to the map
1188* as default databases that must always be restored.
1189*/
1190res = pgut_execute_extended(conn,
1191"SELECT oid, datname FROM pg_catalog.pg_database "
1192"WHERE datname NOT IN ('template1'::name, 'template0'::name)",
11930, NULL, true, true);
1194
1195/* Don't error out, simply return NULL. See comment above. */
1196if (PQresultStatus(res) != PGRES_TUPLES_OK)
1197{
1198PQclear(res);
1199elog(WARNING, "Failed to get database map: %s",
1200PQerrorMessage(conn));
1201
1202return NULL;
1203}
1204
1205/* Construct database map */
1206for (i = 0; i < PQntuples(res); i++)
1207{
1208char *datname = NULL;
1209db_map_entry *db_entry = (db_map_entry *) pgut_malloc(sizeof(db_map_entry));
1210
1211/* get Oid */
1212db_entry->dbOid = atoll(PQgetvalue(res, i, 0));
1213
1214/* get datname */
1215datname = PQgetvalue(res, i, 1);
1216db_entry->datname = pgut_malloc(strlen(datname) + 1);
1217strcpy(db_entry->datname, datname);
1218
1219if (database_map == NULL)
1220database_map = parray_new();
1221
1222parray_append(database_map, db_entry);
1223}
1224
1225return database_map;
1226}
1227
/* Check if data checksums are enabled in target instance
 * (reads the read-only GUC 'data_checksums'; the old comment wrongly
 * mentioned ptrack) */
static bool
pg_is_checksum_enabled(PGconn *conn)
{
	PGresult   *res_db;

	res_db = pgut_execute(conn, "SHOW data_checksums", 0, NULL);

	if (strcmp(PQgetvalue(res_db, 0, 0), "on") == 0)
	{
		PQclear(res_db);
		return true;
	}
	PQclear(res_db);
	return false;
}
1244
1245/* Check if target instance is replica */
1246static bool
1247pg_is_in_recovery(PGconn *conn)
1248{
1249PGresult *res_db;
1250
1251res_db = pgut_execute(conn, "SELECT pg_catalog.pg_is_in_recovery()", 0, NULL);
1252
1253if (PQgetvalue(res_db, 0, 0)[0] == 't')
1254{
1255PQclear(res_db);
1256return true;
1257}
1258PQclear(res_db);
1259return false;
1260}
1261
1262
1263/* Check if current PostgreSQL role is superuser */
1264static bool
1265pg_is_superuser(PGconn *conn)
1266{
1267PGresult *res;
1268
1269res = pgut_execute(conn, "SELECT pg_catalog.current_setting('is_superuser')", 0, NULL);
1270
1271if (strcmp(PQgetvalue(res, 0, 0), "on") == 0)
1272{
1273PQclear(res);
1274return true;
1275}
1276PQclear(res);
1277return false;
1278}
1279
/*
 * Wait for target LSN or WAL segment, containing target LSN.
 *
 * Depending on value of flag in_stream_dir wait for target LSN to archived or
 * streamed in 'archive_dir' or 'pg_wal' directory.
 *
 * If flag 'is_start_lsn' is set then issue warning for first-time users.
 * If flag 'in_prev_segment' is set, look for LSN in previous segment,
 * with EndRecPtr >= Target LSN. It should be used only for solving
 * invalid XRecOff problem.
 * If flag 'segment_only' is set, then, instead of waiting for LSN, wait for segment,
 * containing that LSN.
 * If flags 'in_prev_segment' and 'segment_only' are both set, then wait for
 * previous segment.
 *
 * Flag 'in_stream_dir' determine whether we looking for WAL in 'pg_wal' directory or
 * in archive. Do note, that we cannot rely sorely on global variable 'stream_wal' (current.stream) because,
 * for example, PAGE backup must(!) look for start_lsn in archive regardless of wal_mode.
 *
 * 'timeout_elevel' determine the elevel for timeout elog message. If elevel lighter than
 * ERROR is used, then return InvalidXLogRecPtr. TODO: return something more concrete, for example 1.
 *
 * Returns target LSN if such is found, failing that returns LSN of record prior to target LSN.
 * Returns InvalidXLogRecPtr if 'segment_only' flag is used.
 */
XLogRecPtr
wait_wal_lsn(const char *wal_segment_dir, XLogRecPtr target_lsn, bool is_start_lsn, TimeLineID tli,
			 bool in_prev_segment, bool segment_only,
			 int timeout_elevel, bool in_stream_dir)
{
	XLogSegNo	targetSegNo;
	char		wal_segment_path[MAXPGPATH],
				wal_segment[MAXFNAMELEN];
	bool		file_exists = false;
	uint32		try_count = 0,
				timeout;
	char	   *wal_delivery_str = in_stream_dir ? "streamed":"archived";

#ifdef HAVE_LIBZ
	char		gz_wal_segment_path[MAXPGPATH];
#endif

	/* Compute the name of the WAL file containing requested LSN */
	GetXLogSegNo(target_lsn, targetSegNo, instance_config.xlog_seg_size);
	if (in_prev_segment)
		targetSegNo--;
	GetXLogFileName(wal_segment, tli, targetSegNo,
					instance_config.xlog_seg_size);

	join_path_components(wal_segment_path, wal_segment_dir, wal_segment);
	/*
	 * In pg_start_backup we wait for 'target_lsn' in 'pg_wal' directory if it is
	 * stream and non-page backup. Page backup needs archived WAL files, so we
	 * wait for 'target_lsn' in archive 'wal' directory for page backups.
	 *
	 * In pg_stop_backup it depends only on stream_wal.
	 */

	/* TODO: remove this in 3.0 (it is a cludge against some old bug with archive_timeout) */
	if (instance_config.archive_timeout > 0)
		timeout = instance_config.archive_timeout;
	else
		timeout = ARCHIVE_TIMEOUT_DEFAULT;

	if (segment_only)
		elog(LOG, "Looking for segment: %s", wal_segment);
	else
		elog(LOG, "Looking for LSN %X/%X in segment: %s",
			 (uint32) (target_lsn >> 32), (uint32) target_lsn, wal_segment);

	/* The archiver may deliver the segment gzip-compressed; look for both names */
#ifdef HAVE_LIBZ
	snprintf(gz_wal_segment_path, sizeof(gz_wal_segment_path), "%s.gz",
			 wal_segment_path);
#endif

	/* Wait until target LSN is archived or streamed; poll once per second */
	while (true)
	{
		/* Once found, the file-existence check is not repeated on later iterations */
		if (!file_exists)
		{
			file_exists = fileExists(wal_segment_path, FIO_BACKUP_HOST);

			/* Try to find compressed WAL file */
			if (!file_exists)
			{
#ifdef HAVE_LIBZ
				file_exists = fileExists(gz_wal_segment_path, FIO_BACKUP_HOST);
				if (file_exists)
					elog(LOG, "Found compressed WAL segment: %s", wal_segment_path);
#endif
			}
			else
				elog(LOG, "Found WAL segment: %s", wal_segment_path);
		}

		if (file_exists)
		{
			/* Do not check for target LSN */
			if (segment_only)
				return InvalidXLogRecPtr;

			/*
			 * A WAL segment found. Look for target LSN in it.
			 */
			if (!XRecOffIsNull(target_lsn) &&
				wal_contains_lsn(wal_segment_dir, target_lsn, tli,
								 instance_config.xlog_seg_size))
			/* Target LSN was found */
			{
				elog(LOG, "Found LSN: %X/%X", (uint32) (target_lsn >> 32), (uint32) target_lsn);
				return target_lsn;
			}

			/*
			 * If we failed to get target LSN in a reasonable time, try
			 * to get LSN of last valid record prior to the target LSN. But only
			 * in case of a backup from a replica.
			 * Note, that with NullXRecOff target_lsn we do not wait
			 * for 'timeout / 2' seconds before going for previous record,
			 * because such LSN cannot be delivered at all.
			 *
			 * There are two cases for this:
			 * 1. Replica returned readpoint LSN which just do not exists. We want to look
			 *  for previous record in the same(!) WAL segment which endpoint points to this LSN.
			 * 2. Replica returened endpoint LSN with NullXRecOff. We want to look
			 *  for previous record which endpoint points greater or equal LSN in previous WAL segment.
			 */
			if (current.from_replica &&
				(XRecOffIsNull(target_lsn) || try_count > timeout / 2))
			{
				XLogRecPtr	res;

				res = get_prior_record_lsn(wal_segment_dir, current.start_lsn, target_lsn, tli,
										   in_prev_segment, instance_config.xlog_seg_size);

				if (!XLogRecPtrIsInvalid(res))
				{
					/* LSN of the prior record was found */
					elog(LOG, "Found prior LSN: %X/%X",
						 (uint32) (res >> 32), (uint32) res);
					return res;
				}
			}
		}

		sleep(1);
		if (interrupted || thread_interrupted)
			elog(ERROR, "Interrupted during waiting for WAL %s", in_stream_dir ? "streaming" : "archiving");
		try_count++;

		/* Inform user if WAL segment is absent in first attempt */
		if (try_count == 1)
		{
			if (segment_only)
				elog(INFO, "Wait for WAL segment %s to be %s",
					 wal_segment_path, wal_delivery_str);
			else
				elog(INFO, "Wait for LSN %X/%X in %s WAL segment %s",
					 (uint32) (target_lsn >> 32), (uint32) target_lsn,
					 wal_delivery_str, wal_segment_path);
		}

		/* After 30 seconds remind the user that ARCHIVE mode needs working archiving */
		if (!current.stream && is_start_lsn && try_count == 30)
			elog(WARNING, "By default pg_probackup assumes that WAL delivery method to be ARCHIVE. "
				 "If continuous archiving is not set up, use '--stream' option to make autonomous backup. "
				 "Otherwise check that continuous archiving works correctly.");

		if (timeout > 0 && try_count > timeout)
		{
			if (file_exists)
				elog(timeout_elevel, "WAL segment %s was %s, "
					 "but target LSN %X/%X could not be %s in %d seconds",
					 wal_segment, wal_delivery_str,
					 (uint32) (target_lsn >> 32), (uint32) target_lsn,
					 wal_delivery_str, timeout);
			/* If WAL segment doesn't exist or we wait for previous segment */
			else
				elog(timeout_elevel,
					 "WAL segment %s could not be %s in %d seconds",
					 wal_segment, wal_delivery_str, timeout);

			/* Only reached when timeout_elevel is lighter than ERROR */
			return InvalidXLogRecPtr;
		}
	}
}
1465
/*
 * Check stop_lsn (returned from pg_stop_backup()) and update backup->stop_lsn.
 *
 * If the server returned an LSN with an invalid record offset, try to repair
 * it by locating a real WAL record near it (see the strategy comments below);
 * otherwise wait until the segment containing stop_lsn is delivered. The
 * result is stored both in the global stop_backup_lsn (used as the streaming
 * stop point) and in backup->stop_lsn.
 */
void
wait_wal_and_calculate_stop_lsn(const char *xlog_path, XLogRecPtr stop_lsn, pgBackup *backup)
{
	bool		stop_lsn_exists = false;

	/* It is ok for replica to return invalid STOP LSN
	 * UPD: Apparently it is ok even for a master.
	 */
	if (!XRecOffIsValid(stop_lsn))
	{
		XLogSegNo	segno = 0;
		XLogRecPtr	lsn_tmp = InvalidXLogRecPtr;

		/*
		 * Even though the value is invalid, it's expected postgres behaviour
		 * and we're trying to fix it below.
		 */
		elog(LOG, "Invalid offset in stop_lsn value %X/%X, trying to fix",
			 (uint32) (stop_lsn >> 32), (uint32) (stop_lsn));

		/*
		 * Note: even with gdb it is very hard to produce automated tests for
		 * contrecord + invalid LSN, so emulate it for manual testing.
		 */
		//lsn = lsn - XLOG_SEG_SIZE;
		//elog(WARNING, "New Invalid stop_backup_lsn value %X/%X",
		//	 (uint32) (stop_lsn >> 32), (uint32) (stop_lsn));

		GetXLogSegNo(stop_lsn, segno, instance_config.xlog_seg_size);

		/*
		 * Note, that there is no guarantee that corresponding WAL file even exists.
		 * Replica may return LSN from future and keep staying in present.
		 * Or it can return invalid LSN.
		 *
		 * That's bad, since we want to get real LSN to save it in backup label file
		 * and to use it in WAL validation.
		 *
		 * So we try to do the following:
		 * 1. Wait 'archive_timeout' seconds for segment containing stop_lsn and
		 *	  look for the first valid record in it.
		 * 	  It solves the problem of occasional invalid LSN on write-busy system.
		 * 2. Failing that, look for record in previous segment with endpoint
		 *	  equal or greater than stop_lsn. It may(!) solve the problem of invalid LSN
		 *	  on write-idle system. If that fails too, error out.
		 */

		/* stop_lsn is pointing to a 0 byte of xlog segment */
		if (stop_lsn % instance_config.xlog_seg_size == 0)
		{
			/* Wait for segment with current stop_lsn, it is ok for it to never arrive */
			wait_wal_lsn(xlog_path, stop_lsn, false, backup->tli,
						 false, true, WARNING, backup->stream);

			/* Get the first record in segment with current stop_lsn */
			lsn_tmp = get_first_record_lsn(xlog_path, segno, backup->tli,
										   instance_config.xlog_seg_size,
										   instance_config.archive_timeout);

			/* Check that returned LSN is valid and greater than stop_lsn */
			if (XLogRecPtrIsInvalid(lsn_tmp) ||
				!XRecOffIsValid(lsn_tmp) ||
				lsn_tmp < stop_lsn)
			{
				/* Backup from master should error out here */
				if (!backup->from_replica)
					elog(ERROR, "Failed to get next WAL record after %X/%X",
						 (uint32) (stop_lsn >> 32),
						 (uint32) (stop_lsn));

				/* No luck, falling back to looking up for previous record */
				elog(WARNING, "Failed to get next WAL record after %X/%X, "
					 "looking for previous WAL record",
					 (uint32) (stop_lsn >> 32),
					 (uint32) (stop_lsn));

				/* Despite looking for previous record there is not guarantee of success
				 * because previous record can be the contrecord.
				 */
				lsn_tmp = wait_wal_lsn(xlog_path, stop_lsn, false, backup->tli,
									   true, false, ERROR, backup->stream);

				/* sanity */
				if (!XRecOffIsValid(lsn_tmp) || XLogRecPtrIsInvalid(lsn_tmp))
					elog(ERROR, "Failed to get WAL record prior to %X/%X",
						 (uint32) (stop_lsn >> 32),
						 (uint32) (stop_lsn));
			}
		}
		/* stop lsn is aligned to xlog block size, just find next lsn */
		else if (stop_lsn % XLOG_BLCKSZ == 0)
		{
			/* Wait for segment with current stop_lsn */
			wait_wal_lsn(xlog_path, stop_lsn, false, backup->tli,
						 false, true, ERROR, backup->stream);

			/* Get the next closest record in segment with current stop_lsn */
			lsn_tmp = get_next_record_lsn(xlog_path, segno, backup->tli,
										  instance_config.xlog_seg_size,
										  instance_config.archive_timeout,
										  stop_lsn);

			/* sanity */
			if (!XRecOffIsValid(lsn_tmp) || XLogRecPtrIsInvalid(lsn_tmp))
				elog(ERROR, "Failed to get WAL record next to %X/%X",
					 (uint32) (stop_lsn >> 32),
					 (uint32) (stop_lsn));
		}
		/* PostgreSQL returned something very illegal as STOP_LSN, error out */
		else
			elog(ERROR, "Invalid stop_backup_lsn value %X/%X",
				 (uint32) (stop_lsn >> 32), (uint32) (stop_lsn));

		/* Setting stop_backup_lsn will set stop point for streaming */
		stop_backup_lsn = lsn_tmp;
		stop_lsn_exists = true;
	}

	elog(INFO, "stop_lsn: %X/%X",
		 (uint32) (stop_lsn >> 32), (uint32) (stop_lsn));

	/*
	 * Wait for stop_lsn to be archived or streamed.
	 * If replica returned valid STOP_LSN of not actually existing record,
	 * look for previous record with endpoint >= STOP_LSN.
	 */
	if (!stop_lsn_exists)
		stop_backup_lsn = wait_wal_lsn(xlog_path, stop_lsn, false, backup->tli,
									   false, false, ERROR, backup->stream);

	backup->stop_lsn = stop_backup_lsn;
}
1601
1602/* Remove annoying NOTICE messages generated by backend */
1603void
1604pg_silent_client_messages(PGconn *conn)
1605{
1606PGresult *res;
1607res = pgut_execute(conn, "SET client_min_messages = warning;",
16080, NULL);
1609PQclear(res);
1610}
1611
1612void
1613pg_create_restore_point(PGconn *conn, time_t backup_start_time)
1614{
1615PGresult *res;
1616const char *params[1];
1617char name[1024];
1618
1619snprintf(name, lengthof(name), "pg_probackup, backup_id %s",
1620base36enc(backup_start_time));
1621params[0] = name;
1622
1623res = pgut_execute(conn, "SELECT pg_catalog.pg_create_restore_point($1)",
16241, params);
1625PQclear(res);
1626}
1627
/*
 * Send the appropriate "stop backup" query asynchronously.
 *
 * Picks the query variant matching the server version (pre-10, 10..14, 15+),
 * the backup origin (master vs replica: on a replica the replayed LSN is used
 * as STOP LSN) and the backup kind (exclusive vs non-exclusive). The query is
 * sent with pgut_send() and NOT waited for here (see pg_stop_backup_consume);
 * waiting synchronously could hang forever when archive_command is broken.
 * If query_text is not NULL, *query_text receives a malloc'd copy of the
 * query (caller frees).
 */
void
pg_stop_backup_send(PGconn *conn, int server_version, bool is_started_on_replica, bool is_exclusive, char **query_text)
{
	static const char
		stop_exlusive_backup_query[] =
			/*
			 * Stop the non-exclusive backup. Besides stop_lsn it returns from
			 * pg_stop_backup(false) copy of the backup label and tablespace map
			 * so they can be written to disk by the caller.
			 * TODO, question: add NULLs as backup_label and tablespace_map?
			 */
			"SELECT"
			" pg_catalog.txid_snapshot_xmax(pg_catalog.txid_current_snapshot()),"
			" current_timestamp(0)::timestamptz,"
			" pg_catalog.pg_stop_backup() as lsn",
		stop_backup_on_master_query[] =
			"SELECT"
			" pg_catalog.txid_snapshot_xmax(pg_catalog.txid_current_snapshot()),"
			" current_timestamp(0)::timestamptz,"
			" lsn,"
			" labelfile,"
			" spcmapfile"
			" FROM pg_catalog.pg_stop_backup(false, false)",
		stop_backup_on_master_before10_query[] =
			"SELECT"
			" pg_catalog.txid_snapshot_xmax(pg_catalog.txid_current_snapshot()),"
			" current_timestamp(0)::timestamptz,"
			" lsn,"
			" labelfile,"
			" spcmapfile"
			" FROM pg_catalog.pg_stop_backup(false)",
		stop_backup_on_master_after15_query[] =
			"SELECT"
			" pg_catalog.txid_snapshot_xmax(pg_catalog.txid_current_snapshot()),"
			" current_timestamp(0)::timestamptz,"
			" lsn,"
			" labelfile,"
			" spcmapfile"
			" FROM pg_catalog.pg_backup_stop(false)",
		/*
		 * In case of backup from replica >= 9.6 we do not trust minRecPoint
		 * and stop_backup LSN, so we use latest replayed LSN as STOP LSN.
		 */
		stop_backup_on_replica_query[] =
			"SELECT"
			" pg_catalog.txid_snapshot_xmax(pg_catalog.txid_current_snapshot()),"
			" current_timestamp(0)::timestamptz,"
			" pg_catalog.pg_last_wal_replay_lsn(),"
			" labelfile,"
			" spcmapfile"
			" FROM pg_catalog.pg_stop_backup(false, false)",
		stop_backup_on_replica_before10_query[] =
			"SELECT"
			" pg_catalog.txid_snapshot_xmax(pg_catalog.txid_current_snapshot()),"
			" current_timestamp(0)::timestamptz,"
			" pg_catalog.pg_last_xlog_replay_location(),"
			" labelfile,"
			" spcmapfile"
			" FROM pg_catalog.pg_stop_backup(false)",
		stop_backup_on_replica_after15_query[] =
			"SELECT"
			" pg_catalog.txid_snapshot_xmax(pg_catalog.txid_current_snapshot()),"
			" current_timestamp(0)::timestamptz,"
			" pg_catalog.pg_last_wal_replay_lsn(),"
			" labelfile,"
			" spcmapfile"
			" FROM pg_catalog.pg_backup_stop(false)";

	/* Select the variant: exclusive first, then by server version and origin */
	const char * const stop_backup_query =
		is_exclusive ?
			stop_exlusive_backup_query :
			server_version >= 150000 ?
				(is_started_on_replica ?
					stop_backup_on_replica_after15_query :
					stop_backup_on_master_after15_query
				) :
				(server_version >= 100000 ?
					(is_started_on_replica ?
						stop_backup_on_replica_query :
						stop_backup_on_master_query
					) :
					(is_started_on_replica ?
						stop_backup_on_replica_before10_query :
						stop_backup_on_master_before10_query
					)
				);
	bool		sent = false;

	/* Make proper timestamp format for parse_time(recovery_time) */
	pgut_execute(conn, "SET datestyle = 'ISO, DMY';", 0, NULL);
	// TODO: check result

	/*
	 * send pg_stop_backup asynchronously because we could came
	 * here from backup_cleanup() after some error caused by
	 * postgres archive_command problem and in this case we will
	 * wait for pg_stop_backup() forever.
	 */
	sent = pgut_send(conn, stop_backup_query, 0, NULL, WARNING);
	if (!sent)
#if PG_VERSION_NUM >= 150000
		elog(ERROR, "Failed to send pg_backup_stop query");
#else
		elog(ERROR, "Failed to send pg_stop_backup query");
#endif

	/* After we have sent pg_stop_backup, we don't need this callback anymore */
	pgut_atexit_pop(backup_stopbackup_callback, &stop_callback_params);

	if (query_text)
		*query_text = pgut_strdup(stop_backup_query);
}
1740
/*
 * pg_stop_backup_consume -- get 'pg_stop_backup' query results
 *
 * Polls the connection (previously primed by pg_stop_backup_send) once per
 * second until the result arrives, honouring 'timeout' seconds and user
 * interrupts, then parses the result row into *result.
 *
 * side effects:
 *  - allocates memory for tablespace_map and backup_label contents, so it must freed by caller (if its not null)
 *  - clears the global backup_in_progress flag on success
 * parameters:
 *  - timeout: seconds to wait before cancelling the query
 *  - query_text: the query string, used only for error reporting
 */
void
pg_stop_backup_consume(PGconn *conn, int server_version,
					   bool is_exclusive, uint32 timeout, const char *query_text,
					   PGStopBackupResult *result)
{
	PGresult   *query_result;
	uint32		pg_stop_backup_timeout = 0;
	/* Column positions in the result row of every stop-backup query variant */
	enum stop_backup_query_result_column_numbers {
		recovery_xid_colno = 0,
		recovery_time_colno,
		lsn_colno,
		backup_label_colno,
		tablespace_map_colno
	};

	/* and now wait */
	while (1)
	{
		if (!PQconsumeInput(conn))
			elog(ERROR, "pg_stop backup() failed: %s",
				 PQerrorMessage(conn));

		if (PQisBusy(conn))
		{
			pg_stop_backup_timeout++;
			sleep(1);

			if (interrupted)
			{
				pgut_cancel(conn);
#if PG_VERSION_NUM >= 150000
				elog(ERROR, "Interrupted during waiting for pg_backup_stop");
#else
				elog(ERROR, "Interrupted during waiting for pg_stop_backup");
#endif
			}

			if (pg_stop_backup_timeout == 1)
				elog(INFO, "wait for pg_stop_backup()");

			/*
			 * If postgres haven't answered in archive_timeout seconds,
			 * send an interrupt.
			 */
			if (pg_stop_backup_timeout > timeout)
			{
				pgut_cancel(conn);
#if PG_VERSION_NUM >= 150000
				elog(ERROR, "pg_backup_stop doesn't answer in %d seconds, cancel it", timeout);
#else
				elog(ERROR, "pg_stop_backup doesn't answer in %d seconds, cancel it", timeout);
#endif
			}
		}
		else
		{
			query_result = PQgetResult(conn);
			break;
		}
	}

	/* Check successful execution of pg_stop_backup() */
	if (!query_result)
#if PG_VERSION_NUM >= 150000
		elog(ERROR, "pg_backup_stop() failed");
#else
		elog(ERROR, "pg_stop_backup() failed");
#endif
	else
	{
		switch (PQresultStatus(query_result))
		{
			/*
			 * We should expect only PGRES_TUPLES_OK since pg_stop_backup
			 * returns tuples.
			 */
			case PGRES_TUPLES_OK:
				break;
			default:
				elog(ERROR, "Query failed: %s query was: %s",
					 PQerrorMessage(conn), query_text);
		}
		backup_in_progress = false;
		elog(INFO, "pg_stop backup() successfully executed");
	}

	/* get results and fill result structure */
	/* get&check recovery_xid */
	if (sscanf(PQgetvalue(query_result, 0, recovery_xid_colno), XID_FMT, &result->snapshot_xid) != 1)
		elog(ERROR,
			 "Result of txid_snapshot_xmax() is invalid: %s",
			 PQgetvalue(query_result, 0, recovery_xid_colno));

	/* get&check recovery_time */
	if (!parse_time(PQgetvalue(query_result, 0, recovery_time_colno), &result->invocation_time, true))
		elog(ERROR,
			 "Result of current_timestamp is invalid: %s",
			 PQgetvalue(query_result, 0, recovery_time_colno));

	/* get stop_backup_lsn */
	{
		uint32		lsn_hi;
		uint32		lsn_lo;

//		char *target_lsn = "2/F578A000";
//		XLogDataFromLSN(target_lsn, &lsn_hi, &lsn_lo);

		/* Extract timeline and LSN from results of pg_stop_backup() */
		XLogDataFromLSN(PQgetvalue(query_result, 0, lsn_colno), &lsn_hi, &lsn_lo);
		/* Calculate LSN: combine the two 32-bit halves into a 64-bit value */
		result->lsn = ((uint64) lsn_hi) << 32 | lsn_lo;
	}

	/* get backup_label_content (only present for non-exclusive backups) */
	result->backup_label_content = NULL;
	// if (!PQgetisnull(query_result, 0, backup_label_colno))
	if (!is_exclusive)
	{
		result->backup_label_content_len = PQgetlength(query_result, 0, backup_label_colno);
		if (result->backup_label_content_len > 0)
			result->backup_label_content = pgut_strndup(PQgetvalue(query_result, 0, backup_label_colno),
														result->backup_label_content_len);
	} else {
		result->backup_label_content_len = 0;
	}

	/* get tablespace_map_content (only present for non-exclusive backups) */
	result->tablespace_map_content = NULL;
	// if (!PQgetisnull(query_result, 0, tablespace_map_colno))
	if (!is_exclusive)
	{
		result->tablespace_map_content_len = PQgetlength(query_result, 0, tablespace_map_colno);
		if (result->tablespace_map_content_len > 0)
			result->tablespace_map_content = pgut_strndup(PQgetvalue(query_result, 0, tablespace_map_colno),
														  result->tablespace_map_content_len);
	} else {
		result->tablespace_map_content_len = 0;
	}
}
1887
/*
 * helper routine used to write backup_label and tablespace_map in pg_stop_backup()
 *
 * Writes 'len' bytes of 'data' to path/filename on the backup host, errors
 * out on any I/O failure (error_msg_filename names the file in the message),
 * and, when file_list is given, appends a pgFile entry with CRC and sizes so
 * the file participates in backup validation.
 */
void
pg_stop_backup_write_file_helper(const char *path, const char *filename, const char *error_msg_filename,
								 const void *data, size_t len, parray *file_list)
{
	FILE	   *fp;
	pgFile	   *file;
	char		full_filename[MAXPGPATH];

	join_path_components(full_filename, path, filename);
	fp = fio_fopen(full_filename, PG_BINARY_W, FIO_BACKUP_HOST);
	if (fp == NULL)
		elog(ERROR, "Can't open %s file \"%s\": %s",
			 error_msg_filename, full_filename, strerror(errno));

	/* Short-circuit order matters: don't flush/close a failed write first */
	if (fio_fwrite(fp, data, len) != len ||
		fio_fflush(fp) != 0 ||
		fio_fclose(fp))
		elog(ERROR, "Can't write %s file \"%s\": %s",
			 error_msg_filename, full_filename, strerror(errno));

	/*
	 * It's vital to check if files_list is initialized,
	 * because we could get here because the backup was interrupted
	 */
	if (file_list)
	{
		file = pgFileNew(full_filename, filename, true, 0,
						 FIO_BACKUP_HOST);

		if (S_ISREG(file->mode))
		{
			file->crc = pgFileGetCRC(full_filename, true, false);

			file->write_size = file->size;
			file->uncompressed_size = file->size;
		}
		parray_append(file_list, file);
	}
}
1930
/*
 * Notify end of backup to PostgreSQL server.
 *
 * Sends pg_stop_backup() over 'pg_startbackup_conn', waits for its result
 * (bounded by archive_timeout), waits for the required WAL to appear,
 * computes stop LSN, writes backup_label/tablespace_map for non-exclusive
 * backups, finalizes WAL streaming for stream backups, and fills
 * backup->recovery_xid / backup->recovery_time.
 */
static void
pg_stop_backup(InstanceState *instanceState, pgBackup *backup, PGconn *pg_startbackup_conn,
			   PGNodeInfo *nodeInfo)
{
	PGStopBackupResult	stop_backup_result;
	char	   *xlog_path, stream_xlog_path[MAXPGPATH];
	/* kludge against some old bug in archive_timeout. TODO: remove in 3.0.0 */
	int			timeout = (instance_config.archive_timeout > 0) ?
						instance_config.archive_timeout : ARCHIVE_TIMEOUT_DEFAULT;
	char	   *query_text = NULL;

	/* Remove it ? */
	if (!backup_in_progress)
		elog(ERROR, "Backup is not in progress");

	/* suppress NOTICE chatter from the server for the following commands */
	pg_silent_client_messages(pg_startbackup_conn);

	/* Create restore point
	 * Only if backup is from master.
	 * For PG 9.5 create restore point only if pguser is superuser.
	 */
	if (!backup->from_replica &&
		!(nodeInfo->server_version < 90600 &&
		  !nodeInfo->is_superuser)) //TODO: check correctness
		pg_create_restore_point(pg_startbackup_conn, backup->start_time);

	/* Execute pg_stop_backup using PostgreSQL connection */
	pg_stop_backup_send(pg_startbackup_conn, nodeInfo->server_version, backup->from_replica, exclusive_backup, &query_text);

	/*
	 * Wait for the result of pg_stop_backup(), but no longer than
	 * archive_timeout seconds
	 */
	pg_stop_backup_consume(pg_startbackup_conn, nodeInfo->server_version, exclusive_backup, timeout, query_text, &stop_backup_result);

	/*
	 * Pick where to look for WAL: stream backups keep it under the backup's
	 * own database dir, archive backups use the instance WAL archive.
	 */
	if (backup->stream)
	{
		join_path_components(stream_xlog_path, backup->database_dir, PG_XLOG_DIR);
		xlog_path = stream_xlog_path;
	}
	else
		xlog_path = instanceState->instance_wal_subdir_path;

	wait_wal_and_calculate_stop_lsn(xlog_path, stop_backup_result.lsn, backup);

	/* Write backup_label and tablespace_map */
	if (!exclusive_backup)
	{
		Assert(stop_backup_result.backup_label_content != NULL);

		/* Write backup_label */
		pg_stop_backup_write_file_helper(backup->database_dir, PG_BACKUP_LABEL_FILE, "backup label",
										 stop_backup_result.backup_label_content, stop_backup_result.backup_label_content_len,
										 backup_files_list);
		free(stop_backup_result.backup_label_content);
		stop_backup_result.backup_label_content = NULL;
		stop_backup_result.backup_label_content_len = 0;

		/* Write tablespace_map (present only when tablespaces exist) */
		if (stop_backup_result.tablespace_map_content != NULL)
		{
			pg_stop_backup_write_file_helper(backup->database_dir, PG_TABLESPACE_MAP_FILE, "tablespace map",
											 stop_backup_result.tablespace_map_content, stop_backup_result.tablespace_map_content_len,
											 backup_files_list);
			free(stop_backup_result.tablespace_map_content);
			stop_backup_result.tablespace_map_content = NULL;
			stop_backup_result.tablespace_map_content_len = 0;
		}
	}

	if (backup->stream)
	{
		/* This function will also add list of xlog files
		 * to the passed filelist */
		if(wait_WAL_streaming_end(backup_files_list))
			elog(ERROR, "WAL streaming failed");
	}

	backup->recovery_xid = stop_backup_result.snapshot_xid;

	elog(INFO, "Getting the Recovery Time from WAL");

	/* iterate over WAL from stop_backup lsn to start_backup lsn */
	if (!read_recovery_info(xlog_path, backup->tli,
							instance_config.xlog_seg_size,
							backup->start_lsn, backup->stop_lsn,
							&backup->recovery_time))
	{
		/* fall back to the stop-backup invocation timestamp */
		elog(INFO, "Failed to find Recovery Time in WAL, forced to trust current_timestamp");
		backup->recovery_time = stop_backup_result.invocation_time;
	}

	/* Cleanup */
	pg_free(query_text);
}
2029
2030/*
2031* Notify end of backup to server when "backup_label" is in the root directory
2032* of the DB cluster.
2033* Also update backup status to ERROR when the backup is not finished.
2034*/
2035static void
2036backup_cleanup(bool fatal, void *userdata)
2037{
2038/*
2039* Update status of backup in BACKUP_CONTROL_FILE to ERROR.
2040* end_time != 0 means backup finished
2041*/
2042if (current.status == BACKUP_STATUS_RUNNING && current.end_time == 0)
2043{
2044elog(WARNING, "Backup %s is running, setting its status to ERROR",
2045backup_id_of(¤t));
2046current.end_time = time(NULL);
2047current.status = BACKUP_STATUS_ERROR;
2048write_backup(¤t, true);
2049}
2050}
2051
2052/*
2053* Take a backup of the PGDATA at a file level.
2054* Copy all directories and files listed in backup_files_list.
2055* If the file is 'datafile' (regular relation's main fork), read it page by page,
2056* verify checksum and copy.
2057* In incremental backup mode, copy only files or datafiles' pages changed after
2058* previous backup.
2059*/
2060static void *
2061backup_files(void *arg)
2062{
2063int i;
2064static time_t prev_time;
2065
2066backup_files_arg *arguments = (backup_files_arg *) arg;
2067int n_backup_files_list = parray_num(arguments->files_list);
2068
2069prev_time = current.start_time;
2070
2071/* backup a file */
2072for (i = 0; i < n_backup_files_list; i++)
2073{
2074pgFile *file = (pgFile *) parray_get(arguments->files_list, i);
2075
2076/* We have already copied all directories */
2077if (S_ISDIR(file->mode))
2078continue;
2079
2080if (arguments->thread_num == 1)
2081{
2082/* update backup_content.control every 60 seconds */
2083if ((difftime(time(NULL), prev_time)) > 60)
2084{
2085write_backup_filelist(¤t, arguments->files_list, arguments->from_root,
2086arguments->external_dirs, false);
2087/* update backup control file to update size info */
2088write_backup(¤t, true);
2089
2090prev_time = time(NULL);
2091}
2092}
2093
2094if (file->skip_cfs_nested)
2095continue;
2096
2097if (!pg_atomic_test_set_flag(&file->lock))
2098continue;
2099
2100/* check for interrupt */
2101if (interrupted || thread_interrupted)
2102elog(ERROR, "Interrupted during backup");
2103
2104elog(progress ? INFO : LOG, "Progress: (%d/%d). Process file \"%s\"",
2105i + 1, n_backup_files_list, file->rel_path);
2106
2107if (file->is_cfs)
2108{
2109backup_cfs_segment(i, file, arguments);
2110}
2111else
2112{
2113process_file(i, file, arguments);
2114}
2115}
2116
2117/* ssh connection to longer needed */
2118fio_disconnect();
2119
2120/* Data files transferring is successful */
2121arguments->ret = 0;
2122
2123return NULL;
2124}
2125
/*
 * Back up a single regular file (data or non-data).
 *
 * i         - index of 'file' in arguments->files_list (for progress output)
 * file      - file to copy; its write_size/exists_in_prev fields are updated
 * arguments - thread context: roots, previous filelist, node info, etc.
 *
 * Zero-sized files are recorded with write_size = 0 and not copied.
 * Non-regular files are skipped with a WARNING.
 */
static void
process_file(int i, pgFile *file, backup_files_arg *arguments)
{
	char		from_fullpath[MAXPGPATH];
	char		to_fullpath[MAXPGPATH];
	pgFile	   *prev_file = NULL;

	elog(progress ? INFO : LOG, "Progress: (%d/%zu). Process file \"%s\"",
		 i + 1, parray_num(arguments->files_list), file->rel_path);

	/* Handle zero sized files */
	if (file->size == 0)
	{
		file->write_size = 0;
		return;
	}

	/*
	 * Construct from_fullpath & to_fullpath. Files from external directories
	 * (external_dir_num > 0) live under a numbered prefix in the backup.
	 */
	if (file->external_dir_num == 0)
	{
		join_path_components(from_fullpath, arguments->from_root, file->rel_path);
		join_path_components(to_fullpath, arguments->to_root, file->rel_path);
	}
	else
	{
		char		external_dst[MAXPGPATH];
		char	   *external_path = parray_get(arguments->external_dirs,
											   file->external_dir_num - 1);

		makeExternalDirPathByNum(external_dst,
								 arguments->external_prefix,
								 file->external_dir_num);

		join_path_components(to_fullpath, external_dst, file->rel_path);
		join_path_components(from_fullpath, external_path, file->rel_path);
	}

	/* Encountered some strange beast (socket, fifo, ...) — skip it */
	if (!S_ISREG(file->mode))
	{
		elog(WARNING, "Unexpected type %d of file \"%s\", skipping",
			 file->mode, from_fullpath);
		return;
	}

	/*
	 * Check that the file exists in the previous backup; needed for
	 * incremental modes to decide what can be skipped or delta-copied.
	 */
	if (current.backup_mode != BACKUP_MODE_FULL)
	{
		pgFile	  **prevFileTmp = NULL;
		prevFileTmp = (pgFile **) parray_bsearch(arguments->prev_filelist,
												 file, pgFileCompareRelPathWithExternal);
		if (prevFileTmp)
		{
			/* File exists in previous backup */
			file->exists_in_prev = true;
			prev_file = *prevFileTmp;
		}
	}

	/* backup file: page-level copy for plain datafiles, whole-file otherwise */
	if (file->is_datafile && !file->is_cfs)
	{
		backup_data_file(file, from_fullpath, to_fullpath,
						 arguments->prev_start_lsn,
						 current.backup_mode,
						 instance_config.compress_alg,
						 instance_config.compress_level,
						 arguments->nodeInfo->checksum_version,
						 arguments->hdr_map, false);
	}
	else
	{
		backup_non_data_file(file, prev_file, from_fullpath, to_fullpath,
							 current.backup_mode, current.parent_backup, true);
	}

	/* the file vanished between listing and copying — nothing to report */
	if (file->write_size == FILE_NOT_FOUND)
		return;

	/* BYTES_INVALID means the file was unchanged since the parent backup */
	if (file->write_size == BYTES_INVALID)
	{
		elog(LOG, "Skipping the unchanged file: \"%s\"", from_fullpath);
		return;
	}

	elog(LOG, "File \"%s\". Copied "INT64_FORMAT " bytes",
		 from_fullpath, file->write_size);

}
2215
2216static void
2217backup_cfs_segment(int i, pgFile *file, backup_files_arg *arguments) {
2218pgFile *data_file = file;
2219pgFile *cfm_file = NULL;
2220pgFile *data_bck_file = NULL;
2221pgFile *cfm_bck_file = NULL;
2222
2223while (data_file->cfs_chain)
2224{
2225data_file = data_file->cfs_chain;
2226if (data_file->forkName == cfm)
2227cfm_file = data_file;
2228if (data_file->forkName == cfs_bck)
2229data_bck_file = data_file;
2230if (data_file->forkName == cfm_bck)
2231cfm_bck_file = data_file;
2232}
2233data_file = file;
2234if (data_file->relOid >= FirstNormalObjectId && cfm_file == NULL)
2235{
2236elog(ERROR, "'CFS' file '%s' have to have '%s.cfm' companion file",
2237data_file->rel_path, data_file->name);
2238}
2239
2240elog(LOG, "backup CFS segment %s, data_file=%s, cfm_file=%s, data_bck_file=%s, cfm_bck_file=%s",
2241data_file->name, data_file->name, cfm_file->name, data_bck_file == NULL? "NULL": data_bck_file->name, cfm_bck_file == NULL? "NULL": cfm_bck_file->name);
2242
2243/* storing cfs segment. processing corner case [PBCKP-287] stage 1.
2244* - when we do have data_bck_file we should skip both data_bck_file and cfm_bck_file if exists.
2245* they are removed by cfs_recover() during postgres start.
2246*/
2247if (data_bck_file)
2248{
2249if (cfm_bck_file)
2250cfm_bck_file->write_size = FILE_NOT_FOUND;
2251data_bck_file->write_size = FILE_NOT_FOUND;
2252}
2253/* else we store cfm_bck_file. processing corner case [PBCKP-287] stage 2.
2254* - when we do have cfm_bck_file only we should store it.
2255* it will replace cfm_file after postgres start.
2256*/
2257else if (cfm_bck_file)
2258process_file(i, cfm_bck_file, arguments);
2259
2260/* storing cfs segment in order cfm_file -> datafile to guarantee their consistency */
2261/* cfm_file could be NULL for system tables. But we don't clear is_cfs flag
2262* for compatibility with older pg_probackup. */
2263if (cfm_file)
2264process_file(i, cfm_file, arguments);
2265process_file(i, data_file, arguments);
2266elog(LOG, "Backup CFS segment %s done", data_file->name);
2267}
2268
2269/*
2270* Extract information about files in backup_list parsing their names:
2271* - remove temp tables from the list
2272* - remove unlogged tables from the list (leave the _init fork)
2273* - set flags for database directories
2274* - set flags for datafiles
2275*/
2276void
2277parse_filelist_filenames(parray *files, const char *root)
2278{
2279size_t i = 0;
2280Oid unlogged_file_reloid = 0;
2281
2282while (i < parray_num(files))
2283{
2284pgFile *file = (pgFile *) parray_get(files, i);
2285int sscanf_result;
2286
2287if (S_ISREG(file->mode) &&
2288path_is_prefix_of_path(PG_TBLSPC_DIR, file->rel_path))
2289{
2290/*
2291* Found file in pg_tblspc/tblsOid/TABLESPACE_VERSION_DIRECTORY
2292* Legal only in case of 'pg_compression'
2293*/
2294if (strcmp(file->name, "pg_compression") == 0)
2295{
2296/* processing potential cfs tablespace */
2297Oid tblspcOid;
2298Oid dbOid;
2299char tmp_rel_path[MAXPGPATH];
2300/*
2301* Check that pg_compression is located under
2302* TABLESPACE_VERSION_DIRECTORY
2303*/
2304sscanf_result = sscanf(file->rel_path, PG_TBLSPC_DIR "/%u/%s/%u",
2305&tblspcOid, tmp_rel_path, &dbOid);
2306
2307/* Yes, it is */
2308if (sscanf_result == 2 &&
2309strncmp(tmp_rel_path, TABLESPACE_VERSION_DIRECTORY,
2310strlen(TABLESPACE_VERSION_DIRECTORY)) == 0) {
2311/* rewind index to the beginning of cfs tablespace */
2312rewind_and_mark_cfs_datafiles(files, root, file->rel_path, i);
2313}
2314}
2315}
2316
2317if (S_ISREG(file->mode) && file->tblspcOid != 0 &&
2318file->name && file->name[0])
2319{
2320if (file->forkName == init)
2321{
2322/*
2323* Do not backup files of unlogged relations.
2324* scan filelist backward and exclude these files.
2325*/
2326int unlogged_file_num = i - 1;
2327pgFile *unlogged_file = (pgFile *) parray_get(files,
2328unlogged_file_num);
2329
2330unlogged_file_reloid = file->relOid;
2331
2332while (unlogged_file_num >= 0 &&
2333(unlogged_file_reloid != 0) &&
2334(unlogged_file->relOid == unlogged_file_reloid))
2335{
2336/* flagged to remove from list on stage 2 */
2337unlogged_file->remove_from_list = true;
2338
2339unlogged_file_num--;
2340
2341unlogged_file = (pgFile *) parray_get(files,
2342unlogged_file_num);
2343}
2344}
2345}
2346
2347i++;
2348}
2349
2350/* stage 2. clean up from temporary tables */
2351parray_remove_if(files, remove_excluded_files_criterion, NULL, pgFileFree);
2352}
2353
2354static bool
2355remove_excluded_files_criterion(void *value, void *exclude_args) {
2356pgFile *file = (pgFile*)value;
2357return file->remove_from_list;
2358}
2359
2360static uint32
2361hash_rel_seg(pgFile* file)
2362{
2363uint32 hash = hash_mix32_2(file->relOid, file->segno);
2364return hash_mix32_2(hash, 0xcf5);
2365}
2366
/* If file is equal to pg_compression, then we consider this tablespace as
 * cfs-compressed and should mark every file in this tablespace as cfs-file
 * Setting is_cfs is done via going back through 'files' set every file
 * that contain cfs_tablespace in his path as 'is_cfs'
 * Goings back through array 'files' is valid option possible because of current
 * sort rules:
 * tblspcOid/TABLESPACE_VERSION_DIRECTORY
 * tblspcOid/TABLESPACE_VERSION_DIRECTORY/dboid
 * tblspcOid/TABLESPACE_VERSION_DIRECTORY/dboid/1
 * tblspcOid/TABLESPACE_VERSION_DIRECTORY/dboid/1.cfm
 * tblspcOid/TABLESPACE_VERSION_DIRECTORY/pg_compression
 *
 * Returns nothing; files are marked (is_cfs, skip_cfs_nested) and companion
 * files are linked onto their data file's cfs_chain in place.
 */
static void
rewind_and_mark_cfs_datafiles(parray *files, const char *root, char *relative, size_t i)
{
	int			len;
	int			p;
	int			j;
	pgFile	   *prev_file;
	pgFile	   *tmp_file;
	char	   *cfs_tblspc_path;
	uint32		h;

	/* hash table for cfm files: buckets of companions awaiting their data file */
#define HASHN 128
	parray	   *hashtab[HASHN] = {NULL};
	parray	   *bucket;
	for (p = 0; p < HASHN; p++)
		hashtab[p] = parray_new();


	/* tablespace dir = 'relative' with the trailing "/pg_compression" cut off */
	cfs_tblspc_path = strdup(relative);
	if(!cfs_tblspc_path)
		elog(ERROR, "Out of memory");
	len = strlen("/pg_compression");
	cfs_tblspc_path[strlen(cfs_tblspc_path) - len] = 0;
	elog(LOG, "CFS DIRECTORY %s, pg_compression path: %s", cfs_tblspc_path, relative);

	/* walk backward from pg_compression to the start of this tablespace */
	for (p = (int) i; p >= 0; p--)
	{
		prev_file = (pgFile *) parray_get(files, (size_t) p);

		elog(LOG, "Checking file in cfs tablespace %s", prev_file->rel_path);

		if (strstr(prev_file->rel_path, cfs_tblspc_path) == NULL)
		{
			elog(LOG, "Breaking on %s", prev_file->rel_path);
			break;
		}

		if (!S_ISREG(prev_file->mode))
			continue;

		h = hash_rel_seg(prev_file);
		bucket = hashtab[h % HASHN];

		/*
		 * Companion files (.cfm, .cfm.bck, .bck) come after their data file
		 * in sort order, i.e. BEFORE it in this backward walk: park them in
		 * a bucket until the matching data file is reached.
		 */
		if (prev_file->forkName == cfm || prev_file->forkName == cfm_bck ||
			prev_file->forkName == cfs_bck)
		{
			prev_file->skip_cfs_nested = true;
			parray_append(bucket, prev_file);
		}
		else if (prev_file->is_datafile && prev_file->forkName == none)
		{
			elog(LOG, "Processing 'cfs' file %s", prev_file->rel_path);
			/* have to mark as is_cfs even for system-tables for compatibility
			 * with older pg_probackup */
			prev_file->is_cfs = true;
			prev_file->cfs_chain = NULL;
			/* pull this segment's companions out of the bucket into cfs_chain */
			for (j = 0; j < parray_num(bucket); j++)
			{
				tmp_file = parray_get(bucket, j);
				elog(LOG, "Linking 'cfs' file '%s' to '%s'",
					 tmp_file->rel_path, prev_file->rel_path);
				if (tmp_file->relOid == prev_file->relOid &&
					tmp_file->segno == prev_file->segno)
				{
					tmp_file->cfs_chain = prev_file->cfs_chain;
					prev_file->cfs_chain = tmp_file;
					parray_remove(bucket, j);
					j--;
				}
			}
		}
	}

	/* anything left in a bucket has no matching data file — warn and drop */
	for (p = 0; p < HASHN; p++)
	{
		bucket = hashtab[p];
		for (j = 0; j < parray_num(bucket); j++)
		{
			tmp_file = parray_get(bucket, j);
			elog(WARNING, "Orphaned cfs related file '%s'", tmp_file->rel_path);
		}
		parray_free(bucket);
		hashtab[p] = NULL;
	}
#undef HASHN
	free(cfs_tblspc_path);
}
2469
/*
 * Find pgfile by given rnode in the backup_files_list
 * and add given blkno to its pagemap.
 *
 * Used while scanning WAL for PAGE backups: 'blkno' is relative to the whole
 * relation fork, so it is first split into (segment number, block-in-segment).
 */
void
process_block_change(ForkNumber forknum, RelFileNode rnode, BlockNumber blkno)
{
	char	   *rel_path;
	BlockNumber	blkno_inseg;
	int			segno;
	pgFile	  **file_item;
	pgFile		f;

	segno = blkno / RELSEG_SIZE;
	blkno_inseg = blkno % RELSEG_SIZE;

	/* segments past the first carry a ".N" suffix in their path */
	rel_path = relpathperm(rnode, forknum);
	if (segno > 0)
		f.rel_path = psprintf("%s.%u", rel_path, segno);
	else
		f.rel_path = rel_path;

	f.external_dir_num = 0;

	/* backup_files_list should be sorted before */
	file_item = (pgFile **) parray_bsearch(backup_files_list, &f,
										   pgFileCompareRelPathWithExternal);

	/*
	 * If we don't have any record of this file in the file map, it means
	 * that it's a relation that did not have much activity since the last
	 * backup. We can safely ignore it. If it is a new relation file, the
	 * backup would simply copy it as-is.
	 */
	if (file_item)
	{
		/* We need critical section only if we use more than one thread */
		if (num_threads > 1)
			pthread_lock(&backup_pagemap_mutex);

		datapagemap_add(&(*file_item)->pagemap, blkno_inseg);

		if (num_threads > 1)
			pthread_mutex_unlock(&backup_pagemap_mutex);
	}

	/* psprintf above allocated a separate string only for segno > 0 */
	if (segno > 0)
		pg_free(f.rel_path);
	pg_free(rel_path);

}
2522
2523void
2524check_external_for_tablespaces(parray *external_list, PGconn *backup_conn)
2525{
2526PGresult *res;
2527int i = 0;
2528int j = 0;
2529char *tablespace_path = NULL;
2530char *query = "SELECT pg_catalog.pg_tablespace_location(oid) "
2531"FROM pg_catalog.pg_tablespace "
2532"WHERE pg_catalog.pg_tablespace_location(oid) <> '';";
2533
2534res = pgut_execute(backup_conn, query, 0, NULL);
2535
2536/* Check successfull execution of query */
2537if (!res)
2538elog(ERROR, "Failed to get list of tablespaces");
2539
2540for (i = 0; i < res->ntups; i++)
2541{
2542tablespace_path = PQgetvalue(res, i, 0);
2543Assert (strlen(tablespace_path) > 0);
2544
2545canonicalize_path(tablespace_path);
2546
2547for (j = 0; j < parray_num(external_list); j++)
2548{
2549char *external_path = parray_get(external_list, j);
2550
2551if (path_is_prefix_of_path(external_path, tablespace_path))
2552elog(ERROR, "External directory path (-E option) \"%s\" "
2553"contains tablespace \"%s\"",
2554external_path, tablespace_path);
2555if (path_is_prefix_of_path(tablespace_path, external_path))
2556elog(WARNING, "External directory path (-E option) \"%s\" "
2557"is in tablespace directory \"%s\"",
2558tablespace_path, external_path);
2559}
2560}
2561PQclear(res);
2562
2563/* Check that external directories do not overlap */
2564if (parray_num(external_list) < 2)
2565return;
2566
2567for (i = 0; i < parray_num(external_list); i++)
2568{
2569char *external_path = parray_get(external_list, i);
2570
2571for (j = 0; j < parray_num(external_list); j++)
2572{
2573char *tmp_external_path = parray_get(external_list, j);
2574
2575/* skip yourself */
2576if (j == i)
2577continue;
2578
2579if (path_is_prefix_of_path(external_path, tmp_external_path))
2580elog(ERROR, "External directory path (-E option) \"%s\" "
2581"contain another external directory \"%s\"",
2582external_path, tmp_external_path);
2583
2584}
2585}
2586}
2587
2588/*
2589* Calculate pgdata_bytes
2590* accepts (parray *) of (pgFile *)
2591*/
2592int64
2593calculate_datasize_of_filelist(parray *filelist)
2594{
2595int64 bytes = 0;
2596int i;
2597
2598/* parray_num don't check for NULL */
2599if (filelist == NULL)
2600return 0;
2601
2602for (i = 0; i < parray_num(filelist); i++)
2603{
2604pgFile *file = (pgFile *) parray_get(filelist, i);
2605
2606if (file->external_dir_num != 0 || file->excluded)
2607continue;
2608
2609if (S_ISDIR(file->mode))
2610{
2611// TODO is a dir always 4K?
2612bytes += 4096;
2613continue;
2614}
2615
2616bytes += file->size;
2617}
2618return bytes;
2619}
2620