/* pg_probackup -- 1972 lines, 54.7 KB (banner left over from the web code viewer) */
1/*-------------------------------------------------------------------------
2*
3* parsexlog.c
4* Functions for reading Write-Ahead-Log
5*
6* Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
7* Portions Copyright (c) 1994, Regents of the University of California
8* Portions Copyright (c) 2015-2019, Postgres Professional
9*
10*-------------------------------------------------------------------------
11*/
12
13#include "pg_probackup.h"
14
15#include "access/transam.h"
16#include "catalog/pg_control.h"
17#include "commands/dbcommands_xlog.h"
18#include "catalog/storage_xlog.h"
19
20#ifdef HAVE_LIBZ
21#include <zlib.h>
22#endif
23
24#include "utils/thread.h"
25#include <unistd.h>
26#include <time.h>
27
/*
 * RmgrNames is an array of resource manager names, to make error messages
 * a bit nicer.
 *
 * The table is filled in by the preprocessor: PG_RMGR is defined so that it
 * expands to its "name" argument followed by a comma, and access/rmgrlist.h
 * (expected to consist of PG_RMGR() invocations) is included inside the
 * array initializer. The PG_RMGR parameter list differs between PostgreSQL
 * major versions, hence the PG_VERSION_NUM switch below.
 */
#if PG_VERSION_NUM >= 150000
#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
	name,
#elif PG_VERSION_NUM >= 100000
#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \
	name,
#else
#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup) \
	name,
#endif

/* One slot per resource manager id, 0 .. RM_MAX_ID inclusive. */
static const char *RmgrNames[RM_MAX_ID + 1] = {
#include "access/rmgrlist.h"
};
46
/*
 * The definitions below are local copies of a subset of access/xact.h.
 *
 * XLOG allows storing some information in the high 4 bits of the log record
 * xl_info field. 3 bits are used for the opcode, and one for an optional
 * flag variable.
 */
#define XLOG_XACT_COMMIT			0x00
#define XLOG_XACT_PREPARE			0x10
#define XLOG_XACT_ABORT				0x20
#define XLOG_XACT_COMMIT_PREPARED	0x30
#define XLOG_XACT_ABORT_PREPARED	0x40
#define XLOG_XACT_ASSIGNMENT		0x50
/* free opcode 0x60 */
/* free opcode 0x70 */

/* mask for filtering opcodes out of xl_info */
#define XLOG_XACT_OPMASK			0x70
63
/*
 * Fixed-size head of a transaction commit record. Only the timestamp is
 * declared here; the optional parts listed in the comments below follow it
 * in the record when the corresponding flag is set.
 */
typedef struct xl_xact_commit
{
	TimestampTz xact_time;		/* time of commit */

	/* xl_xact_xinfo follows if XLOG_XACT_HAS_INFO */
	/* xl_xact_dbinfo follows if XINFO_HAS_DBINFO */
	/* xl_xact_subxacts follows if XINFO_HAS_SUBXACT */
	/* xl_xact_relfilenodes follows if XINFO_HAS_RELFILENODES */
	/* xl_xact_invals follows if XINFO_HAS_INVALS */
	/* xl_xact_twophase follows if XINFO_HAS_TWOPHASE */
	/* xl_xact_origin follows if XINFO_HAS_ORIGIN, stored unaligned! */
} xl_xact_commit;
76
/*
 * Fixed-size head of a transaction abort record. Only the timestamp is
 * declared here; the optional parts listed in the comments below follow it
 * in the record when the corresponding flag is set.
 */
typedef struct xl_xact_abort
{
	TimestampTz xact_time;		/* time of abort */

	/* xl_xact_xinfo follows if XLOG_XACT_HAS_INFO */
	/* No db_info required */
	/* xl_xact_subxacts follows if HAS_SUBXACT */
	/* xl_xact_relfilenodes follows if HAS_RELFILENODES */
	/* No invalidation messages needed. */
	/* xl_xact_twophase follows if XINFO_HAS_TWOPHASE */
} xl_xact_abort;
88
/*
 * XLogRecTarget keeps track of the last reached recovery targets: a time, a
 * transaction id and an LSN. Currently used only within validate_wal().
 */
typedef struct XLogRecTarget
{
	TimestampTz rec_time;		/* timestamp; validate_wal() treats 0 as "not set" */
	TransactionId rec_xid;		/* transaction id */
	XLogRecPtr	rec_lsn;		/* WAL position */
} XLogRecTarget;
99
/*
 * Per-reader state. SimpleXLogPageRead() obtains it from the private_data
 * pointer of the XLogReaderState it is called with.
 */
typedef struct XLogReaderData
{
	int			thread_num;		/* printed as "Thread [%d]" in log messages */
	TimeLineID	tli;			/* timeline used to build segment file names */

	XLogRecTarget cur_rec;		/* NOTE(review): presumably the targets reached
								 * so far by this reader; its uses are outside
								 * this chunk */
	XLogSegNo	xlogsegno;		/* number of the segment holding the page
								 * requested last */
	bool		xlogexists;		/* set once a file for xlogsegno was found */

	char		page_buf[XLOG_BLCKSZ];	/* copy of the page read last */
	uint32		prev_page_off;	/* offset of that page within its segment;
								 * 0 disables the cache */

	bool		need_switch;	/* set by the page-read callback when the
								 * segment has to be switched manually */

	int			xlogfile;		/* descriptor of the open plain segment */
	char		xlogpath[MAXPGPATH];	/* path of the plain (or .partial)
										 * segment */

#ifdef HAVE_LIBZ
	gzFile		gz_xlogfile;	/* handle of the open compressed segment */
	char		gz_xlogpath[MAXPGPATH]; /* path of the ".gz" segment */
#endif
} XLogReaderData;
122
/*
 * Function to process a WAL record. It receives the reader state positioned
 * on the record, the reader's private data, and a flag pointer; judging by
 * its name the callback sets *stop_reading to end the scan early.
 */
typedef void (*xlog_record_function) (XLogReaderState *record,
									  XLogReaderData *reader_data,
									  bool *stop_reading);
127
/* An argument for a thread function */
typedef struct
{
	/* Private reader state of this thread */
	XLogReaderData reader_data;

	/* Per-record callback; some callers of RunXLogThreads() pass NULL */
	xlog_record_function process_record;

	/* Range of WAL to read */
	XLogRecPtr	startpoint;
	XLogRecPtr	endpoint;
	XLogSegNo	endSegNo;

	/*
	 * The thread got the recovery target.
	 */
	bool		got_target;

	/* Should we read record, located at endpoint position */
	bool		inclusive_endpoint;

	/*
	 * Return value from the thread.
	 * 0 means there is no error, 1 - there is an error.
	 */
	int			ret;
} xlog_thread_arg;
153
/*
 * Prototypes of the file-local helpers.
 */

/* Thin wrappers around the xlogreader API */
static XLogRecord* WalReadRecord(XLogReaderState *xlogreader, XLogRecPtr startpoint, char **errormsg);
static XLogReaderState* WalReaderAllocate(uint32 wal_seg_size, XLogReaderData *reader_data);

/*
 * Page-read callback. The trailing pageTLI argument exists only for
 * PostgreSQL versions before 13.
 */
static int	SimpleXLogPageRead(XLogReaderState *xlogreader,
							   XLogRecPtr targetPagePtr,
							   int reqLen, XLogRecPtr targetRecPtr, char *readBuf
#if PG_VERSION_NUM < 130000
							   ,TimeLineID *pageTLI
#endif
							   );
static XLogReaderState *InitXLogPageRead(XLogReaderData *reader_data,
										 const char *archivedir,
										 TimeLineID tli, uint32 segment_size,
										 bool manual_switch,
										 bool consistent_read,
										 bool allocate_reader);
static bool RunXLogThreads(const char *archivedir,
						   time_t target_time, TransactionId target_xid,
						   XLogRecPtr target_lsn,
						   TimeLineID tli, uint32 segment_size,
						   XLogRecPtr startpoint, XLogRecPtr endpoint,
						   bool consistent_read,
						   xlog_record_function process_record,
						   XLogRecTarget *last_rec,
						   bool inclusive_endpoint);
static bool SwitchThreadToNextWal(XLogReaderState *xlogreader,
								  xlog_thread_arg *arg);
static bool XLogWaitForConsistency(XLogReaderState *xlogreader);
static void *XLogThreadWorker(void *arg);
static void CleanupXLogPageRead(XLogReaderState *xlogreader);
static void PrintXLogCorruptionMsg(XLogReaderData *reader_data, int elevel);

/* Per-record callbacks matching xlog_record_function */
static void extractPageInfo(XLogReaderState *record,
							XLogReaderData *reader_data, bool *stop_reading);
static void validateXLogRecord(XLogReaderState *record,
							   XLogReaderData *reader_data, bool *stop_reading);
static bool getRecordTimestamp(XLogReaderState *record, TimestampTz *recordXtime);
192
/*
 * Segment bookkeeping shared by the WAL reader threads.
 * NOTE(review): the code that updates these variables is outside this chunk;
 * wal_segment_mutex presumably guards them -- confirm at the lock sites.
 */

/* NOTE(review): presumably the first segment of the range being read */
static XLogSegNo segno_start = 0;
/* Segment number where target record is located */
static XLogSegNo segno_target = 0;
/* Next segment number to read by a thread */
static XLogSegNo segno_next = 0;
/* Number of segments already read by threads */
static uint32 segnum_read = 0;
/* Number of detected corrupted or absent segments */
static uint32 segnum_corrupted = 0;
static pthread_mutex_t wal_segment_mutex = PTHREAD_MUTEX_INITIALIZER;
203
204/* copied from timestamp.c */
205static pg_time_t
206timestamptz_to_time_t(TimestampTz t)
207{
208pg_time_t result;
209
210#ifdef HAVE_INT64_TIMESTAMP
211result = (pg_time_t) (t / USECS_PER_SEC +
212((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY));
213#else
214result = (pg_time_t) (t +
215((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY));
216#endif
217return result;
218}
219
/*
 * Settings consulted by the page-read callback SimpleXLogPageRead().
 *
 * NOTE: several functions in this file take a parameter that is also called
 * wal_seg_size; inside those functions the parameter shadows the variable
 * below.
 */

/* Directory the WAL segment files are opened from */
static const char *wal_archivedir = NULL;
/* Size of one WAL segment, in bytes */
static uint32 wal_seg_size = 0;
/*
 * If true a wal reader thread switches to the next segment using
 * segno_next.
 */
static bool wal_manual_switch = false;
/*
 * If true a wal reader thread waits for other threads if the thread met absent
 * wal segment.
 */
static bool wal_consistent_read = false;

/*
 * Variables used within validate_wal() and validateXLogRecord() to stop workers
 */
static time_t wal_target_time = 0;
static TransactionId wal_target_xid = InvalidTransactionId;
static XLogRecPtr wal_target_lsn = InvalidXLogRecPtr;
239
240/*
241* Read WAL from the archive directory, from 'startpoint' to 'endpoint' on the
242* given timeline. Collect data blocks touched by the WAL records into a page map.
243*
244* Pagemap extracting is processed using threads. Each thread reads single WAL
245* file.
246*/
247bool
248extractPageMap(const char *archivedir, uint32 wal_seg_size,
249XLogRecPtr startpoint, TimeLineID start_tli,
250XLogRecPtr endpoint, TimeLineID end_tli,
251parray *tli_list)
252{
253bool extract_isok = false;
254
255if (start_tli == end_tli)
256/* easy case */
257extract_isok = RunXLogThreads(archivedir, 0, InvalidTransactionId,
258InvalidXLogRecPtr, end_tli, wal_seg_size,
259startpoint, endpoint, false, extractPageInfo,
260NULL, true);
261else
262{
263/* We have to process WAL located on several different xlog intervals,
264* located on different timelines.
265*
266* Consider this example:
267* t3 C-----X <!- We are here
268* /
269* t2 B---*-->
270* /
271* t1 -A----*------->
272*
273* A - prev backup START_LSN
274* B - switchpoint for t2, available as t2->switchpoint
275* C - switch for t3, available as t3->switchpoint
276* X - current backup START_LSN
277*
278* Intervals to be parsed:
279* - [A,B) on t1
280* - [B,C) on t2
281* - [C,X] on t3
282*/
283int i;
284parray *interval_list = parray_new();
285timelineInfo *end_tlinfo = NULL;
286timelineInfo *tmp_tlinfo = NULL;
287XLogRecPtr prev_switchpoint = InvalidXLogRecPtr;
288
289/* We must find TLI information about final timeline (t3 in example) */
290for (i = 0; i < parray_num(tli_list); i++)
291{
292tmp_tlinfo = parray_get(tli_list, i);
293
294if (tmp_tlinfo->tli == end_tli)
295{
296end_tlinfo = tmp_tlinfo;
297break;
298}
299}
300
301/* Iterate over timelines backward,
302* starting with end_tli and ending with start_tli.
303* For every timeline calculate LSN-interval that must be parsed.
304*/
305
306tmp_tlinfo = end_tlinfo;
307while (tmp_tlinfo)
308{
309lsnInterval *wal_interval = pgut_malloc(sizeof(lsnInterval));
310wal_interval->tli = tmp_tlinfo->tli;
311
312if (tmp_tlinfo->tli == end_tli)
313{
314wal_interval->begin_lsn = tmp_tlinfo->switchpoint;
315wal_interval->end_lsn = endpoint;
316}
317else if (tmp_tlinfo->tli == start_tli)
318{
319wal_interval->begin_lsn = startpoint;
320wal_interval->end_lsn = prev_switchpoint;
321}
322else
323{
324wal_interval->begin_lsn = tmp_tlinfo->switchpoint;
325wal_interval->end_lsn = prev_switchpoint;
326}
327
328parray_append(interval_list, wal_interval);
329
330if (tmp_tlinfo->tli == start_tli)
331break;
332
333prev_switchpoint = tmp_tlinfo->switchpoint;
334tmp_tlinfo = tmp_tlinfo->parent_link;
335}
336
337for (i = parray_num(interval_list) - 1; i >= 0; i--)
338{
339bool inclusive_endpoint;
340lsnInterval *tmp_interval = (lsnInterval *) parray_get(interval_list, i);
341
342/* In case of replica promotion, endpoints of intermediate
343* timelines can be unreachable.
344*/
345inclusive_endpoint = false;
346
347/* ... but not the end timeline */
348if (tmp_interval->tli == end_tli)
349inclusive_endpoint = true;
350
351extract_isok = RunXLogThreads(archivedir, 0, InvalidTransactionId,
352InvalidXLogRecPtr, tmp_interval->tli, wal_seg_size,
353tmp_interval->begin_lsn, tmp_interval->end_lsn,
354false, extractPageInfo, NULL, inclusive_endpoint);
355if (!extract_isok)
356break;
357
358pg_free(tmp_interval);
359}
360pg_free(interval_list);
361}
362
363return extract_isok;
364}
365
366/*
367* Ensure that the backup has all wal files needed for recovery to consistent
368* state.
369*
370* WAL records reading is processed using threads. Each thread reads single WAL
371* file.
372*/
373static void
374validate_backup_wal_from_start_to_stop(pgBackup *backup,
375const char *archivedir, TimeLineID tli,
376uint32 xlog_seg_size)
377{
378bool got_endpoint;
379
380got_endpoint = RunXLogThreads(archivedir, 0, InvalidTransactionId,
381InvalidXLogRecPtr, tli, xlog_seg_size,
382backup->start_lsn, backup->stop_lsn,
383false, NULL, NULL, true);
384
385if (!got_endpoint)
386{
387/*
388* If we don't have WAL between start_lsn and stop_lsn,
389* the backup is definitely corrupted. Update its status.
390*/
391write_backup_status(backup, BACKUP_STATUS_CORRUPT, true);
392
393elog(WARNING, "There are not enough WAL records to consistenly restore "
394"backup %s from START LSN: %X/%X to STOP LSN: %X/%X",
395backup_id_of(backup),
396(uint32) (backup->start_lsn >> 32),
397(uint32) (backup->start_lsn),
398(uint32) (backup->stop_lsn >> 32),
399(uint32) (backup->stop_lsn));
400}
401}
402
403/*
404* Ensure that the backup has all wal files needed for recovery to consistent
405* state. And check if we have in archive all files needed to restore the backup
406* up to the given recovery target.
407*/
408void
409validate_wal(pgBackup *backup, const char *archivedir,
410time_t target_time, TransactionId target_xid,
411XLogRecPtr target_lsn, TimeLineID tli, uint32 wal_seg_size)
412{
413XLogRecTarget last_rec;
414char last_timestamp[100],
415target_timestamp[100];
416bool all_wal = false;
417
418if (!XRecOffIsValid(backup->start_lsn))
419elog(ERROR, "Invalid start_lsn value %X/%X of backup %s",
420(uint32) (backup->start_lsn >> 32), (uint32) (backup->start_lsn),
421backup_id_of(backup));
422
423if (!XRecOffIsValid(backup->stop_lsn))
424elog(ERROR, "Invalid stop_lsn value %X/%X of backup %s",
425(uint32) (backup->stop_lsn >> 32), (uint32) (backup->stop_lsn),
426backup_id_of(backup));
427
428/*
429* Check that the backup has all wal files needed
430* for recovery to consistent state.
431*/
432if (backup->stream)
433{
434char backup_database_dir[MAXPGPATH];
435char backup_xlog_path[MAXPGPATH];
436
437join_path_components(backup_database_dir, backup->root_dir, DATABASE_DIR);
438join_path_components(backup_xlog_path, backup_database_dir, PG_XLOG_DIR);
439
440validate_backup_wal_from_start_to_stop(backup, backup_xlog_path, tli,
441wal_seg_size);
442}
443else
444validate_backup_wal_from_start_to_stop(backup, (char *) archivedir, tli,
445wal_seg_size);
446
447if (backup->status == BACKUP_STATUS_CORRUPT)
448{
449elog(WARNING, "Backup %s WAL segments are corrupted", backup_id_of(backup));
450return;
451}
452/*
453* If recovery target is provided check that we can restore backup to a
454* recovery target time or xid.
455*/
456if (!TransactionIdIsValid(target_xid) && target_time == 0 &&
457!XRecOffIsValid(target_lsn))
458{
459/* Recovery target is not given so exit */
460elog(INFO, "Backup %s WAL segments are valid", backup_id_of(backup));
461return;
462}
463
464/*
465* If recovery target is provided, ensure that archive files exist in
466* archive directory.
467*/
468if (dir_is_empty(archivedir, FIO_LOCAL_HOST))
469elog(ERROR, "WAL archive is empty. You cannot restore backup to a recovery target without WAL archive.");
470
471/*
472* Check if we have in archive all files needed to restore backup
473* up to the given recovery target.
474* In any case we cannot restore to the point before stop_lsn.
475*/
476
477/* We can restore at least up to the backup end */
478last_rec.rec_time = 0;
479last_rec.rec_xid = backup->recovery_xid;
480last_rec.rec_lsn = backup->stop_lsn;
481
482time2iso(last_timestamp, lengthof(last_timestamp), backup->recovery_time, false);
483
484if ((TransactionIdIsValid(target_xid) && target_xid == last_rec.rec_xid)
485|| (target_time != 0 && backup->recovery_time >= target_time)
486|| (XRecOffIsValid(target_lsn) && last_rec.rec_lsn >= target_lsn))
487all_wal = true;
488
489all_wal = all_wal ||
490RunXLogThreads(archivedir, target_time, target_xid, target_lsn,
491tli, wal_seg_size, backup->stop_lsn,
492InvalidXLogRecPtr, true, validateXLogRecord, &last_rec, true);
493if (last_rec.rec_time > 0)
494time2iso(last_timestamp, lengthof(last_timestamp),
495timestamptz_to_time_t(last_rec.rec_time), false);
496
497/* There are all needed WAL records */
498if (all_wal)
499elog(INFO, "Backup validation completed successfully on time %s, xid " XID_FMT " and LSN %X/%X",
500last_timestamp, last_rec.rec_xid,
501(uint32) (last_rec.rec_lsn >> 32), (uint32) last_rec.rec_lsn);
502/* Some needed WAL records are absent */
503else
504{
505elog(WARNING, "Recovery can be done up to time %s, xid " XID_FMT " and LSN %X/%X",
506last_timestamp, last_rec.rec_xid,
507(uint32) (last_rec.rec_lsn >> 32), (uint32) last_rec.rec_lsn);
508
509if (target_time > 0)
510time2iso(target_timestamp, lengthof(target_timestamp), target_time, false);
511if (TransactionIdIsValid(target_xid) && target_time != 0)
512elog(ERROR, "Not enough WAL records to time %s and xid " XID_FMT,
513target_timestamp, target_xid);
514else if (TransactionIdIsValid(target_xid))
515elog(ERROR, "Not enough WAL records to xid " XID_FMT,
516target_xid);
517else if (target_time != 0)
518elog(ERROR, "Not enough WAL records to time %s",
519target_timestamp);
520else if (XRecOffIsValid(target_lsn))
521elog(ERROR, "Not enough WAL records to lsn %X/%X",
522(uint32) (target_lsn >> 32), (uint32) (target_lsn));
523}
524}
525
526/*
527* Read from archived WAL segments latest recovery time and xid. All necessary
528* segments present at archive folder. We waited **stop_lsn** in
529* pg_stop_backup().
530*/
531bool
532read_recovery_info(const char *archivedir, TimeLineID tli, uint32 wal_seg_size,
533XLogRecPtr start_lsn, XLogRecPtr stop_lsn,
534time_t *recovery_time)
535{
536XLogRecPtr startpoint = stop_lsn;
537XLogReaderState *xlogreader;
538XLogReaderData reader_data;
539bool res;
540
541if (!XRecOffIsValid(start_lsn))
542elog(ERROR, "Invalid start_lsn value %X/%X",
543(uint32) (start_lsn >> 32), (uint32) (start_lsn));
544
545if (!XRecOffIsValid(stop_lsn))
546elog(ERROR, "Invalid stop_lsn value %X/%X",
547(uint32) (stop_lsn >> 32), (uint32) (stop_lsn));
548
549xlogreader = InitXLogPageRead(&reader_data, archivedir, tli, wal_seg_size,
550false, true, true);
551
552/* Read records from stop_lsn down to start_lsn */
553do
554{
555XLogRecord *record;
556TimestampTz last_time = 0;
557char *errormsg;
558
559#if PG_VERSION_NUM >= 130000
560if (XLogRecPtrIsInvalid(startpoint))
561startpoint = SizeOfXLogShortPHD;
562XLogBeginRead(xlogreader, startpoint);
563#endif
564
565record = WalReadRecord(xlogreader, startpoint, &errormsg);
566if (record == NULL)
567{
568XLogRecPtr errptr;
569
570errptr = startpoint ? startpoint : xlogreader->EndRecPtr;
571
572if (errormsg)
573elog(ERROR, "Could not read WAL record at %X/%X: %s",
574(uint32) (errptr >> 32), (uint32) (errptr),
575errormsg);
576else
577elog(ERROR, "Could not read WAL record at %X/%X",
578(uint32) (errptr >> 32), (uint32) (errptr));
579}
580
581/* Read previous record */
582startpoint = record->xl_prev;
583
584if (getRecordTimestamp(xlogreader, &last_time))
585{
586*recovery_time = timestamptz_to_time_t(last_time);
587
588/* Found timestamp in WAL record 'record' */
589res = true;
590goto cleanup;
591}
592} while (startpoint >= start_lsn);
593
594/* Didn't find timestamp from WAL records between start_lsn and stop_lsn */
595res = false;
596
597cleanup:
598CleanupXLogPageRead(xlogreader);
599XLogReaderFree(xlogreader);
600
601return res;
602}
603
604/*
605* Check if there is a WAL segment file in 'archivedir' which contains
606* 'target_lsn'.
607*/
608bool
609wal_contains_lsn(const char *archivedir, XLogRecPtr target_lsn,
610TimeLineID target_tli, uint32 wal_seg_size)
611{
612XLogReaderState *xlogreader;
613XLogReaderData reader_data;
614char *errormsg;
615bool res;
616
617if (!XRecOffIsValid(target_lsn))
618elog(ERROR, "Invalid target_lsn value %X/%X",
619(uint32) (target_lsn >> 32), (uint32) (target_lsn));
620
621xlogreader = InitXLogPageRead(&reader_data, archivedir, target_tli,
622wal_seg_size, false, false, true);
623
624if (xlogreader == NULL)
625elog(ERROR, "Out of memory");
626
627xlogreader->system_identifier = instance_config.system_identifier;
628
629#if PG_VERSION_NUM >= 130000
630if (XLogRecPtrIsInvalid(target_lsn))
631target_lsn = SizeOfXLogShortPHD;
632XLogBeginRead(xlogreader, target_lsn);
633#endif
634
635res = WalReadRecord(xlogreader, target_lsn, &errormsg) != NULL;
636/* Didn't find 'target_lsn' and there is no error, return false */
637
638if (errormsg)
639elog(WARNING, "Could not read WAL record at %X/%X: %s",
640(uint32) (target_lsn >> 32), (uint32) (target_lsn), errormsg);
641
642CleanupXLogPageRead(xlogreader);
643XLogReaderFree(xlogreader);
644
645return res;
646}
647
648/*
649* Get LSN of a first record within the WAL segment with number 'segno'.
650*/
651XLogRecPtr
652get_first_record_lsn(const char *archivedir, XLogSegNo segno,
653TimeLineID tli, uint32 wal_seg_size, int timeout)
654{
655XLogReaderState *xlogreader;
656XLogReaderData reader_data;
657XLogRecPtr record = InvalidXLogRecPtr;
658XLogRecPtr startpoint;
659char wal_segment[MAXFNAMELEN];
660int attempts = 0;
661
662if (segno <= 1)
663elog(ERROR, "Invalid WAL segment number " UINT64_FORMAT, segno);
664
665GetXLogFileName(wal_segment, tli, segno, instance_config.xlog_seg_size);
666
667xlogreader = InitXLogPageRead(&reader_data, archivedir, tli, wal_seg_size,
668false, false, true);
669if (xlogreader == NULL)
670elog(ERROR, "Out of memory");
671xlogreader->system_identifier = instance_config.system_identifier;
672
673/* Set startpoint to 0 in segno */
674GetXLogRecPtr(segno, 0, wal_seg_size, startpoint);
675
676#if PG_VERSION_NUM >= 130000
677if (XLogRecPtrIsInvalid(startpoint))
678startpoint = SizeOfXLogShortPHD;
679XLogBeginRead(xlogreader, startpoint);
680#endif
681
682while (attempts <= timeout)
683{
684record = XLogFindNextRecord(xlogreader, startpoint);
685
686if (XLogRecPtrIsInvalid(record))
687record = InvalidXLogRecPtr;
688else
689{
690elog(LOG, "First record in WAL segment \"%s\": %X/%X", wal_segment,
691(uint32) (record >> 32), (uint32) (record));
692break;
693}
694
695attempts++;
696sleep(1);
697}
698
699/* cleanup */
700CleanupXLogPageRead(xlogreader);
701XLogReaderFree(xlogreader);
702
703return record;
704}
705
706
707/*
708* Get LSN of the record next after target lsn.
709*/
710XLogRecPtr
711get_next_record_lsn(const char *archivedir, XLogSegNo segno,
712TimeLineID tli, uint32 wal_seg_size, int timeout,
713XLogRecPtr target)
714{
715XLogReaderState *xlogreader;
716XLogReaderData reader_data;
717XLogRecPtr startpoint, found;
718XLogRecPtr res = InvalidXLogRecPtr;
719char wal_segment[MAXFNAMELEN];
720int attempts = 0;
721
722if (segno <= 1)
723elog(ERROR, "Invalid WAL segment number " UINT64_FORMAT, segno);
724
725GetXLogFileName(wal_segment, tli, segno, instance_config.xlog_seg_size);
726
727xlogreader = InitXLogPageRead(&reader_data, archivedir, tli, wal_seg_size,
728false, false, true);
729if (xlogreader == NULL)
730elog(ERROR, "Out of memory");
731xlogreader->system_identifier = instance_config.system_identifier;
732
733/* Set startpoint to 0 in segno */
734GetXLogRecPtr(segno, 0, wal_seg_size, startpoint);
735
736#if PG_VERSION_NUM >= 130000
737if (XLogRecPtrIsInvalid(startpoint))
738startpoint = SizeOfXLogShortPHD;
739XLogBeginRead(xlogreader, startpoint);
740#endif
741
742found = XLogFindNextRecord(xlogreader, startpoint);
743
744if (XLogRecPtrIsInvalid(found))
745{
746if (xlogreader->errormsg_buf[0] != '\0')
747elog(WARNING, "Could not read WAL record at %X/%X: %s",
748(uint32) (startpoint >> 32), (uint32) (startpoint),
749xlogreader->errormsg_buf);
750else
751elog(WARNING, "Could not read WAL record at %X/%X",
752(uint32) (startpoint >> 32), (uint32) (startpoint));
753PrintXLogCorruptionMsg(&reader_data, ERROR);
754}
755startpoint = found;
756
757while (attempts <= timeout)
758{
759XLogRecord *record;
760char *errormsg;
761
762if (interrupted)
763elog(ERROR, "Interrupted during WAL reading");
764
765record = WalReadRecord(xlogreader, startpoint, &errormsg);
766
767if (record == NULL)
768{
769XLogRecPtr errptr;
770
771errptr = XLogRecPtrIsInvalid(startpoint) ? xlogreader->EndRecPtr :
772startpoint;
773
774if (errormsg)
775elog(WARNING, "Could not read WAL record at %X/%X: %s",
776(uint32) (errptr >> 32), (uint32) (errptr),
777errormsg);
778else
779elog(WARNING, "Could not read WAL record at %X/%X",
780(uint32) (errptr >> 32), (uint32) (errptr));
781PrintXLogCorruptionMsg(&reader_data, ERROR);
782}
783
784if (xlogreader->ReadRecPtr >= target)
785{
786elog(LOG, "Record %X/%X is next after target LSN %X/%X",
787(uint32) (xlogreader->ReadRecPtr >> 32), (uint32) (xlogreader->ReadRecPtr),
788(uint32) (target >> 32), (uint32) (target));
789res = xlogreader->ReadRecPtr;
790break;
791}
792else
793startpoint = InvalidXLogRecPtr;
794}
795
796/* cleanup */
797CleanupXLogPageRead(xlogreader);
798XLogReaderFree(xlogreader);
799
800return res;
801}
802
803
804/*
805* Get LSN of a record prior to target_lsn.
806* If 'start_lsn' is in the segment with number 'segno' then start from 'start_lsn',
807* otherwise start from offset 0 within the segment.
808*
809* Returns LSN of a record which EndRecPtr is greater or equal to target_lsn.
810* If 'seek_prev_segment' is true, then look for prior record in prior WAL segment.
811*
812* it's unclear that "last" in "last_wal_lsn" refers to the
813* "closest to stop_lsn backward or forward, depending on seek_prev_segment setting".
814*/
815XLogRecPtr
816get_prior_record_lsn(const char *archivedir, XLogRecPtr start_lsn,
817XLogRecPtr stop_lsn, TimeLineID tli, bool seek_prev_segment,
818uint32 wal_seg_size)
819{
820XLogReaderState *xlogreader;
821XLogReaderData reader_data;
822XLogRecPtr startpoint;
823XLogSegNo start_segno;
824XLogSegNo segno;
825XLogRecPtr res = InvalidXLogRecPtr;
826
827GetXLogSegNo(stop_lsn, segno, wal_seg_size);
828
829if (segno <= 1)
830elog(ERROR, "Invalid WAL segment number " UINT64_FORMAT, segno);
831
832if (seek_prev_segment)
833segno = segno - 1;
834
835xlogreader = InitXLogPageRead(&reader_data, archivedir, tli, wal_seg_size,
836false, false, true);
837
838if (xlogreader == NULL)
839elog(ERROR, "Out of memory");
840
841xlogreader->system_identifier = instance_config.system_identifier;
842
843/*
844* Calculate startpoint. Decide: we should use 'start_lsn' or offset 0.
845*/
846GetXLogSegNo(start_lsn, start_segno, wal_seg_size);
847if (start_segno == segno)
848{
849startpoint = start_lsn;
850#if PG_VERSION_NUM >= 130000
851if (XLogRecPtrIsInvalid(startpoint))
852startpoint = SizeOfXLogShortPHD;
853XLogBeginRead(xlogreader, startpoint);
854#endif
855}
856else
857{
858XLogRecPtr found;
859
860GetXLogRecPtr(segno, 0, wal_seg_size, startpoint);
861
862#if PG_VERSION_NUM >= 130000
863if (XLogRecPtrIsInvalid(startpoint))
864startpoint = SizeOfXLogShortPHD;
865XLogBeginRead(xlogreader, startpoint);
866#endif
867
868found = XLogFindNextRecord(xlogreader, startpoint);
869
870if (XLogRecPtrIsInvalid(found))
871{
872if (xlogreader->errormsg_buf[0] != '\0')
873elog(WARNING, "Could not read WAL record at %X/%X: %s",
874(uint32) (startpoint >> 32), (uint32) (startpoint),
875xlogreader->errormsg_buf);
876else
877elog(WARNING, "Could not read WAL record at %X/%X",
878(uint32) (startpoint >> 32), (uint32) (startpoint));
879PrintXLogCorruptionMsg(&reader_data, ERROR);
880}
881startpoint = found;
882}
883
884while (true)
885{
886XLogRecord *record;
887char *errormsg;
888
889if (interrupted)
890elog(ERROR, "Interrupted during WAL reading");
891
892record = WalReadRecord(xlogreader, startpoint, &errormsg);
893if (record == NULL)
894{
895XLogRecPtr errptr;
896
897errptr = XLogRecPtrIsInvalid(startpoint) ? xlogreader->EndRecPtr :
898startpoint;
899
900if (errormsg)
901elog(WARNING, "Could not read WAL record at %X/%X: %s",
902(uint32) (errptr >> 32), (uint32) (errptr),
903errormsg);
904else
905elog(WARNING, "Could not read WAL record at %X/%X",
906(uint32) (errptr >> 32), (uint32) (errptr));
907PrintXLogCorruptionMsg(&reader_data, ERROR);
908}
909
910if (xlogreader->EndRecPtr >= stop_lsn)
911{
912elog(LOG, "Record %X/%X has endpoint %X/%X which is equal or greater than requested LSN %X/%X",
913(uint32) (xlogreader->ReadRecPtr >> 32), (uint32) (xlogreader->ReadRecPtr),
914(uint32) (xlogreader->EndRecPtr >> 32), (uint32) (xlogreader->EndRecPtr),
915(uint32) (stop_lsn >> 32), (uint32) (stop_lsn));
916res = xlogreader->ReadRecPtr;
917break;
918}
919
920/* continue reading at next record */
921startpoint = InvalidXLogRecPtr;
922}
923
924CleanupXLogPageRead(xlogreader);
925XLogReaderFree(xlogreader);
926
927return res;
928}
929
930#ifdef HAVE_LIBZ
931/*
932* Show error during work with compressed file
933*/
934static const char *
935get_gz_error(gzFile gzf)
936{
937int errnum;
938const char *errmsg;
939
940errmsg = fio_gzerror(gzf, &errnum);
941if (errnum == Z_ERRNO)
942return strerror(errno);
943else
944return errmsg;
945}
946#endif
947
948/* XLogreader callback function, to read a WAL page */
949static int
950SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
951int reqLen, XLogRecPtr targetRecPtr, char *readBuf
952#if PG_VERSION_NUM < 130000
953,TimeLineID *pageTLI
954#endif
955)
956{
957XLogReaderData *reader_data;
958uint32 targetPageOff;
959
960reader_data = (XLogReaderData *) xlogreader->private_data;
961targetPageOff = targetPagePtr % wal_seg_size;
962
963if (interrupted || thread_interrupted)
964elog(ERROR, "Thread [%d]: Interrupted during WAL reading",
965reader_data->thread_num);
966
967/*
968* See if we need to switch to a new segment because the requested record
969* is not in the currently open one.
970*/
971if (!IsInXLogSeg(targetPagePtr, reader_data->xlogsegno, wal_seg_size))
972{
973elog(VERBOSE, "Thread [%d]: Need to switch to the next WAL segment, page LSN %X/%X, record being read LSN %X/%X",
974reader_data->thread_num,
975(uint32) (targetPagePtr >> 32), (uint32) (targetPagePtr),
976(uint32) (xlogreader->currRecPtr >> 32),
977(uint32) (xlogreader->currRecPtr ));
978
979/*
980* If the last record on the page is not complete,
981* we must continue reading pages in the same thread
982*/
983if (!XLogRecPtrIsInvalid(xlogreader->currRecPtr) &&
984xlogreader->currRecPtr < targetPagePtr)
985{
986CleanupXLogPageRead(xlogreader);
987
988/*
989* Switch to the next WAL segment after reading contrecord.
990*/
991if (wal_manual_switch)
992reader_data->need_switch = true;
993}
994else
995{
996CleanupXLogPageRead(xlogreader);
997/*
998* Do not switch to next WAL segment in this function. It is
999* manually switched by a thread routine.
1000*/
1001if (wal_manual_switch)
1002{
1003reader_data->need_switch = true;
1004return -1;
1005}
1006}
1007}
1008
1009GetXLogSegNo(targetPagePtr, reader_data->xlogsegno, wal_seg_size);
1010
1011/* Try to switch to the next WAL segment */
1012if (!reader_data->xlogexists)
1013{
1014char xlogfname[MAXFNAMELEN];
1015char partial_file[MAXPGPATH];
1016
1017GetXLogFileName(xlogfname, reader_data->tli, reader_data->xlogsegno, wal_seg_size);
1018
1019join_path_components(reader_data->xlogpath, wal_archivedir, xlogfname);
1020snprintf(reader_data->gz_xlogpath, MAXPGPATH, "%s.gz", reader_data->xlogpath);
1021
1022/* We fall back to using .partial segment in case if we are running
1023* multi-timeline incremental backup right after standby promotion.
1024* TODO: it should be explicitly enabled.
1025*/
1026snprintf(partial_file, MAXPGPATH, "%s.partial", reader_data->xlogpath);
1027
1028/* If segment do not exists, but the same
1029* segment with '.partial' suffix does, use it instead */
1030if (!fileExists(reader_data->xlogpath, FIO_LOCAL_HOST) &&
1031fileExists(partial_file, FIO_LOCAL_HOST))
1032{
1033snprintf(reader_data->xlogpath, MAXPGPATH, "%s", partial_file);
1034}
1035
1036if (fileExists(reader_data->xlogpath, FIO_LOCAL_HOST))
1037{
1038elog(LOG, "Thread [%d]: Opening WAL segment \"%s\"",
1039reader_data->thread_num, reader_data->xlogpath);
1040
1041reader_data->xlogexists = true;
1042reader_data->xlogfile = fio_open(reader_data->xlogpath,
1043O_RDONLY | PG_BINARY, FIO_LOCAL_HOST);
1044
1045if (reader_data->xlogfile < 0)
1046{
1047elog(WARNING, "Thread [%d]: Could not open WAL segment \"%s\": %s",
1048reader_data->thread_num, reader_data->xlogpath,
1049strerror(errno));
1050return -1;
1051}
1052}
1053#ifdef HAVE_LIBZ
1054/* Try to open compressed WAL segment */
1055else if (fileExists(reader_data->gz_xlogpath, FIO_LOCAL_HOST))
1056{
1057elog(LOG, "Thread [%d]: Opening compressed WAL segment \"%s\"",
1058reader_data->thread_num, reader_data->gz_xlogpath);
1059
1060reader_data->xlogexists = true;
1061reader_data->gz_xlogfile = fio_gzopen(reader_data->gz_xlogpath,
1062"rb", -1, FIO_LOCAL_HOST);
1063if (reader_data->gz_xlogfile == NULL)
1064{
1065elog(WARNING, "Thread [%d]: Could not open compressed WAL segment \"%s\": %s",
1066reader_data->thread_num, reader_data->gz_xlogpath,
1067strerror(errno));
1068return -1;
1069}
1070}
1071#endif
1072/* Exit without error if WAL segment doesn't exist */
1073if (!reader_data->xlogexists)
1074return -1;
1075}
1076
1077/*
1078* At this point, we have the right segment open.
1079*/
1080Assert(reader_data->xlogexists);
1081
1082/*
1083* Do not read same page read earlier from the file, read it from the buffer
1084*/
1085if (reader_data->prev_page_off != 0 &&
1086reader_data->prev_page_off == targetPageOff)
1087{
1088memcpy(readBuf, reader_data->page_buf, XLOG_BLCKSZ);
1089#if PG_VERSION_NUM < 130000
1090*pageTLI = reader_data->tli;
1091#endif
1092return XLOG_BLCKSZ;
1093}
1094
1095/* Read the requested page */
1096if (reader_data->xlogfile != -1)
1097{
1098if (fio_seek(reader_data->xlogfile, (off_t) targetPageOff) < 0)
1099{
1100elog(WARNING, "Thread [%d]: Could not seek in WAL segment \"%s\": %s",
1101reader_data->thread_num, reader_data->xlogpath, strerror(errno));
1102return -1;
1103}
1104
1105if (fio_read(reader_data->xlogfile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
1106{
1107elog(WARNING, "Thread [%d]: Could not read from WAL segment \"%s\": %s",
1108reader_data->thread_num, reader_data->xlogpath, strerror(errno));
1109return -1;
1110}
1111}
1112#ifdef HAVE_LIBZ
1113else
1114{
1115if (fio_gzseek(reader_data->gz_xlogfile, (z_off_t) targetPageOff, SEEK_SET) == -1)
1116{
1117elog(WARNING, "Thread [%d]: Could not seek in compressed WAL segment \"%s\": %s",
1118reader_data->thread_num, reader_data->gz_xlogpath,
1119get_gz_error(reader_data->gz_xlogfile));
1120return -1;
1121}
1122
1123if (fio_gzread(reader_data->gz_xlogfile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
1124{
1125elog(WARNING, "Thread [%d]: Could not read from compressed WAL segment \"%s\": %s",
1126reader_data->thread_num, reader_data->gz_xlogpath,
1127get_gz_error(reader_data->gz_xlogfile));
1128return -1;
1129}
1130}
1131#endif
1132
1133memcpy(reader_data->page_buf, readBuf, XLOG_BLCKSZ);
1134reader_data->prev_page_off = targetPageOff;
1135#if PG_VERSION_NUM < 130000
1136*pageTLI = reader_data->tli;
1137#endif
1138return XLOG_BLCKSZ;
1139}
1140
1141/*
1142* Initialize WAL segments reading.
1143*/
1144static XLogReaderState *
1145InitXLogPageRead(XLogReaderData *reader_data, const char *archivedir,
1146TimeLineID tli, uint32 segment_size, bool manual_switch,
1147bool consistent_read, bool allocate_reader)
1148{
1149XLogReaderState *xlogreader = NULL;
1150
1151wal_archivedir = archivedir;
1152wal_seg_size = segment_size;
1153wal_manual_switch = manual_switch;
1154wal_consistent_read = consistent_read;
1155
1156MemSet(reader_data, 0, sizeof(XLogReaderData));
1157reader_data->tli = tli;
1158reader_data->xlogfile = -1;
1159
1160if (allocate_reader)
1161{
1162xlogreader = WalReaderAllocate(wal_seg_size, reader_data);
1163if (xlogreader == NULL)
1164elog(ERROR, "Out of memory");
1165xlogreader->system_identifier = instance_config.system_identifier;
1166}
1167
1168return xlogreader;
1169}
1170
1171/*
1172* Comparison function to sort xlog_thread_arg array.
1173*/
1174static int
1175xlog_thread_arg_comp(const void *a1, const void *a2)
1176{
1177const xlog_thread_arg *arg1 = a1;
1178const xlog_thread_arg *arg2 = a2;
1179
1180return arg1->reader_data.xlogsegno - arg2->reader_data.xlogsegno;
1181}
1182
1183/*
1184* Run WAL processing routines using threads. Start from startpoint up to
1185* endpoint. It is possible to send zero endpoint, threads will read WAL
1186* infinitely in this case.
1187*/
1188static bool
1189RunXLogThreads(const char *archivedir, time_t target_time,
1190TransactionId target_xid, XLogRecPtr target_lsn, TimeLineID tli,
1191uint32 segment_size, XLogRecPtr startpoint, XLogRecPtr endpoint,
1192bool consistent_read, xlog_record_function process_record,
1193XLogRecTarget *last_rec, bool inclusive_endpoint)
1194{
1195pthread_t *threads;
1196xlog_thread_arg *thread_args;
1197int i;
1198int threads_need = 0;
1199XLogSegNo endSegNo = 0;
1200bool result = true;
1201
1202if (!XRecOffIsValid(startpoint) && !XRecOffIsNull(startpoint))
1203elog(ERROR, "Invalid startpoint value %X/%X",
1204(uint32) (startpoint >> 32), (uint32) (startpoint));
1205
1206if (process_record)
1207elog(LOG, "Extracting pagemap from tli %i on range from %X/%X to %X/%X",
1208tli,
1209(uint32) (startpoint >> 32), (uint32) (startpoint),
1210(uint32) (endpoint >> 32), (uint32) (endpoint));
1211
1212if (!XLogRecPtrIsInvalid(endpoint))
1213{
1214// if (XRecOffIsNull(endpoint) && !inclusive_endpoint)
1215if (XRecOffIsNull(endpoint))
1216{
1217GetXLogSegNo(endpoint, endSegNo, segment_size);
1218endSegNo--;
1219}
1220else if (!XRecOffIsValid(endpoint))
1221{
1222elog(ERROR, "Invalid endpoint value %X/%X",
1223(uint32) (endpoint >> 32), (uint32) (endpoint));
1224}
1225else
1226GetXLogSegNo(endpoint, endSegNo, segment_size);
1227}
1228
1229/* Initialize static variables for workers */
1230wal_target_time = target_time;
1231wal_target_xid = target_xid;
1232wal_target_lsn = target_lsn;
1233
1234GetXLogSegNo(startpoint, segno_start, segment_size);
1235segno_target = 0;
1236GetXLogSegNo(startpoint, segno_next, segment_size);
1237segnum_read = 0;
1238segnum_corrupted = 0;
1239
1240threads = (pthread_t *) pgut_malloc(sizeof(pthread_t) * num_threads);
1241thread_args = (xlog_thread_arg *) pgut_malloc(sizeof(xlog_thread_arg) * num_threads);
1242
1243/*
1244* Initialize thread args.
1245*
1246* Each thread works with its own WAL segment and we need to adjust
1247* startpoint value for each thread.
1248*/
1249for (i = 0; i < num_threads; i++)
1250{
1251xlog_thread_arg *arg = &thread_args[i];
1252
1253InitXLogPageRead(&arg->reader_data, archivedir, tli, segment_size, true,
1254consistent_read, false);
1255arg->reader_data.xlogsegno = segno_next;
1256arg->reader_data.thread_num = i + 1;
1257arg->process_record = process_record;
1258arg->startpoint = startpoint;
1259arg->endpoint = endpoint;
1260arg->endSegNo = endSegNo;
1261arg->inclusive_endpoint = inclusive_endpoint;
1262arg->got_target = false;
1263/* By default there is some error */
1264arg->ret = 1;
1265
1266threads_need++;
1267segno_next++;
1268/*
1269* If we need to read less WAL segments than num_threads, create less
1270* threads.
1271*/
1272if (endSegNo != 0 && segno_next > endSegNo)
1273break;
1274GetXLogRecPtr(segno_next, 0, segment_size, startpoint);
1275}
1276
1277/* Run threads */
1278thread_interrupted = false;
1279for (i = 0; i < threads_need; i++)
1280{
1281elog(VERBOSE, "Start WAL reader thread: %d", i + 1);
1282pthread_create(&threads[i], NULL, XLogThreadWorker, &thread_args[i]);
1283}
1284
1285/* Wait for threads */
1286for (i = 0; i < threads_need; i++)
1287{
1288pthread_join(threads[i], NULL);
1289if (thread_args[i].ret == 1)
1290result = false;
1291}
1292thread_interrupted = false;
1293
1294// TODO: we must detect difference between actual error (failed to read WAL) and interrupt signal
1295// if (interrupted)
1296// elog(ERROR, "Interrupted during WAL parsing");
1297
1298/* Release threads here, use thread_args only below */
1299pfree(threads);
1300threads = NULL;
1301
1302if (last_rec)
1303{
1304/*
1305* We need to sort xlog_thread_arg array by xlogsegno to return latest
1306* possible record up to which restore is possible. We need to sort to
1307* detect failed thread between start segment and target segment.
1308*
1309* Loop stops on first failed thread.
1310*/
1311if (threads_need > 1)
1312qsort((void *) thread_args, threads_need, sizeof(xlog_thread_arg),
1313xlog_thread_arg_comp);
1314
1315for (i = 0; i < threads_need; i++)
1316{
1317XLogRecTarget *cur_rec;
1318
1319cur_rec = &thread_args[i].reader_data.cur_rec;
1320/*
1321* If we got the target return minimum possible record.
1322*/
1323if (segno_target > 0)
1324{
1325if (thread_args[i].got_target &&
1326thread_args[i].reader_data.xlogsegno == segno_target)
1327{
1328*last_rec = *cur_rec;
1329break;
1330}
1331}
1332/*
1333* Else return maximum possible record up to which restore is
1334* possible.
1335*/
1336else if (last_rec->rec_lsn < cur_rec->rec_lsn)
1337*last_rec = *cur_rec;
1338
1339/*
1340* We reached failed thread, so stop here. We cannot use following
1341* WAL records after failed segment.
1342*/
1343if (thread_args[i].ret != 0)
1344break;
1345}
1346}
1347
1348pfree(thread_args);
1349
1350return result;
1351}
1352
1353/*
1354* WAL reader worker.
1355*/
1356void *
1357XLogThreadWorker(void *arg)
1358{
1359xlog_thread_arg *thread_arg = (xlog_thread_arg *) arg;
1360XLogReaderData *reader_data = &thread_arg->reader_data;
1361XLogReaderState *xlogreader;
1362XLogSegNo nextSegNo = 0;
1363XLogRecPtr found;
1364uint32 prev_page_off = 0;
1365bool need_read = true;
1366
1367xlogreader = WalReaderAllocate(wal_seg_size, reader_data);
1368
1369if (xlogreader == NULL)
1370elog(ERROR, "Thread [%d]: out of memory", reader_data->thread_num);
1371xlogreader->system_identifier = instance_config.system_identifier;
1372
1373#if PG_VERSION_NUM >= 130000
1374if (XLogRecPtrIsInvalid(thread_arg->startpoint))
1375thread_arg->startpoint = SizeOfXLogShortPHD;
1376XLogBeginRead(xlogreader, thread_arg->startpoint);
1377#endif
1378
1379found = XLogFindNextRecord(xlogreader, thread_arg->startpoint);
1380
1381/*
1382* We get invalid WAL record pointer usually when WAL segment is absent or
1383* is corrupted.
1384*/
1385if (XLogRecPtrIsInvalid(found))
1386{
1387if (wal_consistent_read && XLogWaitForConsistency(xlogreader))
1388need_read = false;
1389else
1390{
1391if (xlogreader->errormsg_buf[0] != '\0')
1392elog(WARNING, "Thread [%d]: Could not read WAL record at %X/%X: %s",
1393reader_data->thread_num,
1394(uint32) (thread_arg->startpoint >> 32),
1395(uint32) (thread_arg->startpoint),
1396xlogreader->errormsg_buf);
1397else
1398elog(WARNING, "Thread [%d]: Could not read WAL record at %X/%X",
1399reader_data->thread_num,
1400(uint32) (thread_arg->startpoint >> 32),
1401(uint32) (thread_arg->startpoint));
1402PrintXLogCorruptionMsg(reader_data, ERROR);
1403}
1404}
1405
1406thread_arg->startpoint = found;
1407
1408elog(VERBOSE, "Thread [%d]: Starting LSN: %X/%X",
1409reader_data->thread_num,
1410(uint32) (thread_arg->startpoint >> 32),
1411(uint32) (thread_arg->startpoint));
1412
1413while (need_read)
1414{
1415XLogRecord *record;
1416char *errormsg;
1417bool stop_reading = false;
1418
1419if (interrupted || thread_interrupted)
1420elog(ERROR, "Thread [%d]: Interrupted during WAL reading",
1421reader_data->thread_num);
1422
1423/*
1424* We need to switch to the next WAL segment after reading previous
1425* record. It may happen if we read contrecord.
1426*/
1427if (reader_data->need_switch &&
1428!SwitchThreadToNextWal(xlogreader, thread_arg))
1429break;
1430
1431record = WalReadRecord(xlogreader, thread_arg->startpoint, &errormsg);
1432
1433if (record == NULL)
1434{
1435XLogRecPtr errptr;
1436
1437/*
1438* There is no record, try to switch to the next WAL segment.
1439* Usually SimpleXLogPageRead() does it by itself. But here we need
1440* to do it manually to support threads.
1441*/
1442#if PG_VERSION_NUM >= 150000
1443if (reader_data->need_switch && (
1444errormsg == NULL ||
1445/*
1446* Pg15 now informs if "contrecord" is missing.
1447* TODO: probably we should abort reading logs at this moment.
1448* But we continue as we did with bug present in Pg < 15.
1449*/
1450!XLogRecPtrIsInvalid(xlogreader->abortedRecPtr)))
1451#else
1452if (reader_data->need_switch && errormsg == NULL)
1453#endif
1454{
1455if (SwitchThreadToNextWal(xlogreader, thread_arg))
1456continue;
1457else
1458break;
1459}
1460
1461/*
1462* XLogWaitForConsistency() is normally used only with threads.
1463* Call it here for just in case.
1464*/
1465if (wal_consistent_read && XLogWaitForConsistency(xlogreader))
1466break;
1467else if (wal_consistent_read)
1468{
1469XLogSegNo segno_report;
1470
1471pthread_lock(&wal_segment_mutex);
1472segno_report = segno_start + segnum_read;
1473pthread_mutex_unlock(&wal_segment_mutex);
1474
1475/*
1476* Report error message if this is the first corrupted WAL.
1477*/
1478if (reader_data->xlogsegno > segno_report)
1479return NULL; /* otherwise just stop the thread */
1480}
1481
1482errptr = thread_arg->startpoint ?
1483thread_arg->startpoint : xlogreader->EndRecPtr;
1484
1485if (errormsg)
1486elog(WARNING, "Thread [%d]: Could not read WAL record at %X/%X: %s",
1487reader_data->thread_num,
1488(uint32) (errptr >> 32), (uint32) (errptr),
1489errormsg);
1490else
1491elog(WARNING, "Thread [%d]: Could not read WAL record at %X/%X",
1492reader_data->thread_num,
1493(uint32) (errptr >> 32), (uint32) (errptr));
1494
1495/* In we failed to read record located at endpoint position,
1496* and endpoint is not inclusive, do not consider this as an error.
1497*/
1498if (!thread_arg->inclusive_endpoint &&
1499errptr == thread_arg->endpoint)
1500{
1501elog(LOG, "Thread [%d]: Endpoint %X/%X is not inclusive, switch to the next timeline",
1502reader_data->thread_num,
1503(uint32) (thread_arg->endpoint >> 32), (uint32) (thread_arg->endpoint));
1504break;
1505}
1506
1507/*
1508* If we don't have all WAL files from prev backup start_lsn to current
1509* start_lsn, we won't be able to build page map and PAGE backup will
1510* be incorrect. Stop it and throw an error.
1511*/
1512PrintXLogCorruptionMsg(reader_data, ERROR);
1513}
1514
1515getRecordTimestamp(xlogreader, &reader_data->cur_rec.rec_time);
1516if (TransactionIdIsValid(XLogRecGetXid(xlogreader)))
1517reader_data->cur_rec.rec_xid = XLogRecGetXid(xlogreader);
1518reader_data->cur_rec.rec_lsn = xlogreader->ReadRecPtr;
1519
1520if (thread_arg->process_record)
1521thread_arg->process_record(xlogreader, reader_data, &stop_reading);
1522if (stop_reading)
1523{
1524thread_arg->got_target = true;
1525
1526pthread_lock(&wal_segment_mutex);
1527/* We should store least target segment number */
1528if (segno_target == 0 || segno_target > reader_data->xlogsegno)
1529segno_target = reader_data->xlogsegno;
1530pthread_mutex_unlock(&wal_segment_mutex);
1531
1532break;
1533}
1534
1535/*
1536* Check if other thread got the target segment. Check it not very
1537* often, only every WAL page.
1538*/
1539if (wal_consistent_read && prev_page_off != 0 &&
1540prev_page_off != reader_data->prev_page_off)
1541{
1542XLogSegNo segno;
1543
1544pthread_lock(&wal_segment_mutex);
1545segno = segno_target;
1546pthread_mutex_unlock(&wal_segment_mutex);
1547
1548if (segno != 0 && segno < reader_data->xlogsegno)
1549break;
1550}
1551prev_page_off = reader_data->prev_page_off;
1552
1553/* continue reading at next record */
1554thread_arg->startpoint = InvalidXLogRecPtr;
1555
1556GetXLogSegNo(xlogreader->EndRecPtr, nextSegNo, wal_seg_size);
1557
1558if (thread_arg->endSegNo != 0 &&
1559!XLogRecPtrIsInvalid(thread_arg->endpoint) &&
1560/*
1561* Consider thread_arg->endSegNo and thread_arg->endpoint only if
1562* they are valid.
1563*/
1564xlogreader->ReadRecPtr >= thread_arg->endpoint &&
1565nextSegNo >= thread_arg->endSegNo)
1566break;
1567}
1568
1569CleanupXLogPageRead(xlogreader);
1570XLogReaderFree(xlogreader);
1571
1572/* Extracting is successful */
1573thread_arg->ret = 0;
1574return NULL;
1575}
1576
1577/*
1578* Do manual switch to the next WAL segment.
1579*
1580* Returns false if the reader reaches the end of a WAL segment list.
1581*/
1582static bool
1583SwitchThreadToNextWal(XLogReaderState *xlogreader, xlog_thread_arg *arg)
1584{
1585XLogReaderData *reader_data;
1586XLogRecPtr found;
1587
1588reader_data = (XLogReaderData *) xlogreader->private_data;
1589reader_data->need_switch = false;
1590
1591/* Critical section */
1592pthread_lock(&wal_segment_mutex);
1593Assert(segno_next);
1594reader_data->xlogsegno = segno_next;
1595segnum_read++;
1596segno_next++;
1597pthread_mutex_unlock(&wal_segment_mutex);
1598
1599/* We've reached the end */
1600if (arg->endSegNo != 0 && reader_data->xlogsegno > arg->endSegNo)
1601return false;
1602
1603/* Adjust next record position */
1604GetXLogRecPtr(reader_data->xlogsegno, 0, wal_seg_size, arg->startpoint);
1605/* We need to close previously opened file if it wasn't closed earlier */
1606CleanupXLogPageRead(xlogreader);
1607/* Skip over the page header and contrecord if any */
1608found = XLogFindNextRecord(xlogreader, arg->startpoint);
1609
1610/*
1611* We get invalid WAL record pointer usually when WAL segment is
1612* absent or is corrupted.
1613*/
1614if (XLogRecPtrIsInvalid(found))
1615{
1616/*
1617* Check if we need to stop reading. We stop if other thread found a
1618* target segment.
1619*/
1620if (wal_consistent_read && XLogWaitForConsistency(xlogreader))
1621return false;
1622else if (wal_consistent_read)
1623{
1624XLogSegNo segno_report;
1625
1626pthread_lock(&wal_segment_mutex);
1627segno_report = segno_start + segnum_read;
1628pthread_mutex_unlock(&wal_segment_mutex);
1629
1630/*
1631* Report error message if this is the first corrupted WAL.
1632*/
1633if (reader_data->xlogsegno > segno_report)
1634return false; /* otherwise just stop the thread */
1635}
1636
1637elog(WARNING, "Thread [%d]: Could not read WAL record at %X/%X",
1638reader_data->thread_num,
1639(uint32) (arg->startpoint >> 32), (uint32) (arg->startpoint));
1640PrintXLogCorruptionMsg(reader_data, ERROR);
1641}
1642arg->startpoint = found;
1643
1644elog(VERBOSE, "Thread [%d]: Switched to LSN %X/%X",
1645reader_data->thread_num,
1646(uint32) (arg->startpoint >> 32), (uint32) (arg->startpoint));
1647
1648return true;
1649}
1650
1651/*
1652* Wait for other threads since the current thread couldn't read its segment.
1653* We need to decide is it fail or not.
1654*
1655* Returns true if there is no failure and previous target segment was found.
1656* Otherwise return false.
1657*/
1658static bool
1659XLogWaitForConsistency(XLogReaderState *xlogreader)
1660{
1661uint32 segnum_need;
1662XLogReaderData *reader_data =(XLogReaderData *) xlogreader->private_data;
1663bool log_message = true;
1664
1665segnum_need = reader_data->xlogsegno - segno_start;
1666while (true)
1667{
1668uint32 segnum_current_read;
1669XLogSegNo segno;
1670
1671if (log_message)
1672{
1673char xlogfname[MAXFNAMELEN];
1674
1675GetXLogFileName(xlogfname, reader_data->tli, reader_data->xlogsegno,
1676wal_seg_size);
1677
1678elog(VERBOSE, "Thread [%d]: Possible WAL corruption in %s. Wait for other threads to decide is this a failure",
1679reader_data->thread_num, xlogfname);
1680log_message = false;
1681}
1682
1683if (interrupted || thread_interrupted)
1684elog(ERROR, "Thread [%d]: Interrupted during WAL reading",
1685reader_data->thread_num);
1686
1687pthread_lock(&wal_segment_mutex);
1688segnum_current_read = segnum_read + segnum_corrupted;
1689segno = segno_target;
1690pthread_mutex_unlock(&wal_segment_mutex);
1691
1692/* Other threads read all previous segments and didn't find target */
1693if (segnum_need <= segnum_current_read)
1694{
1695/* Mark current segment as corrupted */
1696pthread_lock(&wal_segment_mutex);
1697segnum_corrupted++;
1698pthread_mutex_unlock(&wal_segment_mutex);
1699return false;
1700}
1701
1702if (segno != 0 && segno < reader_data->xlogsegno)
1703return true;
1704
1705pg_usleep(500000L); /* 500 ms */
1706}
1707
1708/* We shouldn't reach it */
1709return false;
1710}
1711
1712/*
1713* Cleanup after WAL segment reading.
1714*/
1715static void
1716CleanupXLogPageRead(XLogReaderState *xlogreader)
1717{
1718XLogReaderData *reader_data;
1719
1720reader_data = (XLogReaderData *) xlogreader->private_data;
1721if (reader_data->xlogfile >= 0)
1722{
1723fio_close(reader_data->xlogfile);
1724reader_data->xlogfile = -1;
1725}
1726#ifdef HAVE_LIBZ
1727else if (reader_data->gz_xlogfile != NULL)
1728{
1729fio_gzclose(reader_data->gz_xlogfile);
1730reader_data->gz_xlogfile = NULL;
1731}
1732#endif
1733reader_data->prev_page_off = 0;
1734reader_data->xlogexists = false;
1735}
1736
1737static void
1738PrintXLogCorruptionMsg(XLogReaderData *reader_data, int elevel)
1739{
1740if (reader_data->xlogpath[0] != 0)
1741{
1742/*
1743* XLOG reader couldn't read WAL segment.
1744* We throw a WARNING here to be able to update backup status.
1745*/
1746if (!reader_data->xlogexists)
1747elog(elevel, "Thread [%d]: WAL segment \"%s\" is absent",
1748reader_data->thread_num, reader_data->xlogpath);
1749else if (reader_data->xlogfile != -1)
1750elog(elevel, "Thread [%d]: Possible WAL corruption. "
1751"Error has occured during reading WAL segment \"%s\"",
1752reader_data->thread_num, reader_data->xlogpath);
1753#ifdef HAVE_LIBZ
1754else if (reader_data->gz_xlogfile != NULL)
1755elog(elevel, "Thread [%d]: Possible WAL corruption. "
1756"Error has occured during reading WAL segment \"%s\"",
1757reader_data->thread_num, reader_data->gz_xlogpath);
1758#endif
1759}
1760else
1761{
1762/* Cannot tell what happened specifically */
1763elog(elevel, "Thread [%d]: An error occured during WAL reading",
1764reader_data->thread_num);
1765}
1766}
1767
1768/*
1769* Extract information about blocks modified in this record.
1770*/
1771static void
1772extractPageInfo(XLogReaderState *record, XLogReaderData *reader_data,
1773bool *stop_reading)
1774{
1775uint8 block_id;
1776RmgrId rmid = XLogRecGetRmid(record);
1777uint8 info = XLogRecGetInfo(record);
1778uint8 rminfo = info & ~XLR_INFO_MASK;
1779
1780/* Is this a special record type that I recognize? */
1781
1782if (rmid == RM_DBASE_ID
1783#if PG_VERSION_NUM >= 150000
1784&& (rminfo == XLOG_DBASE_CREATE_WAL_LOG || rminfo == XLOG_DBASE_CREATE_FILE_COPY))
1785#else
1786&& rminfo == XLOG_DBASE_CREATE)
1787#endif
1788{
1789/*
1790* New databases can be safely ignored. They would be completely
1791* copied if found.
1792*/
1793}
1794else if (rmid == RM_DBASE_ID && rminfo == XLOG_DBASE_DROP)
1795{
1796/*
1797* An existing database was dropped. It is fine to ignore that
1798* they will be removed appropriately.
1799*/
1800}
1801else if (rmid == RM_SMGR_ID && rminfo == XLOG_SMGR_CREATE)
1802{
1803/*
1804* We can safely ignore these. The file will be removed when
1805* combining the backups in the case of differential on.
1806*/
1807}
1808else if (rmid == RM_SMGR_ID && rminfo == XLOG_SMGR_TRUNCATE)
1809{
1810/*
1811* We can safely ignore these. When we compare the sizes later on,
1812* we'll notice that they differ, and copy the missing tail from
1813* source system.
1814*/
1815}
1816else if (rmid == RM_XACT_ID &&
1817((rminfo & XLOG_XACT_OPMASK) == XLOG_XACT_COMMIT ||
1818(rminfo & XLOG_XACT_OPMASK) == XLOG_XACT_COMMIT_PREPARED ||
1819(rminfo & XLOG_XACT_OPMASK) == XLOG_XACT_ABORT ||
1820(rminfo & XLOG_XACT_OPMASK) == XLOG_XACT_ABORT_PREPARED))
1821{
1822/*
1823* These records can include "dropped rels". We can safely ignore
1824* them, we will see that they are missing and copy them from the
1825* source.
1826*/
1827}
1828else if (info & XLR_SPECIAL_REL_UPDATE)
1829{
1830/*
1831* This record type modifies a relation file in some special way, but
1832* we don't recognize the type. That's bad - we don't know how to
1833* track that change.
1834*/
1835elog(ERROR, "WAL record modifies a relation, but record type is not recognized\n"
1836"lsn: %X/%X, rmgr: %s, info: %02X",
1837(uint32) (record->ReadRecPtr >> 32), (uint32) (record->ReadRecPtr),
1838RmgrNames[rmid], info);
1839}
1840
1841#if PG_VERSION_NUM >= 150000
1842for (block_id = 0; block_id <= record->record->max_block_id; block_id++)
1843#else
1844for (block_id = 0; block_id <= record->max_block_id; block_id++)
1845#endif
1846{
1847RelFileNode rnode;
1848ForkNumber forknum;
1849BlockNumber blkno;
1850
1851#if PG_VERSION_NUM >= 150000
1852if (!XLogRecGetBlockTagExtended(record, block_id, &rnode, &forknum, &blkno, NULL))
1853#else
1854if (!XLogRecGetBlockTag(record, block_id, &rnode, &forknum, &blkno))
1855#endif
1856continue;
1857
1858/* We only care about the main fork; others are copied as is */
1859if (forknum != MAIN_FORKNUM)
1860continue;
1861
1862process_block_change(forknum, rnode, blkno);
1863}
1864}
1865
1866/*
1867* Check the current read WAL record during validation.
1868*/
1869static void
1870validateXLogRecord(XLogReaderState *record, XLogReaderData *reader_data,
1871bool *stop_reading)
1872{
1873/* Check target xid */
1874if (TransactionIdIsValid(wal_target_xid) &&
1875wal_target_xid == reader_data->cur_rec.rec_xid)
1876*stop_reading = true;
1877/* Check target time */
1878else if (wal_target_time != 0 &&
1879timestamptz_to_time_t(reader_data->cur_rec.rec_time) >= wal_target_time)
1880*stop_reading = true;
1881/* Check target lsn */
1882else if (XRecOffIsValid(wal_target_lsn) &&
1883reader_data->cur_rec.rec_lsn >= wal_target_lsn)
1884*stop_reading = true;
1885}
1886
1887/*
1888* Extract timestamp from WAL record.
1889*
1890* If the record contains a timestamp, returns true, and saves the timestamp
1891* in *recordXtime. If the record type has no timestamp, returns false.
1892* Currently, only transaction commit/abort records and restore points contain
1893* timestamps.
1894*/
1895static bool
1896getRecordTimestamp(XLogReaderState *record, TimestampTz *recordXtime)
1897{
1898uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
1899uint8 xact_info = info & XLOG_XACT_OPMASK;
1900uint8 rmid = XLogRecGetRmid(record);
1901
1902if (rmid == RM_XLOG_ID && info == XLOG_RESTORE_POINT)
1903{
1904*recordXtime = ((xl_restore_point *) XLogRecGetData(record))->rp_time;
1905return true;
1906}
1907else if (rmid == RM_XACT_ID && (xact_info == XLOG_XACT_COMMIT ||
1908xact_info == XLOG_XACT_COMMIT_PREPARED))
1909{
1910*recordXtime = ((xl_xact_commit *) XLogRecGetData(record))->xact_time;
1911return true;
1912}
1913else if (rmid == RM_XACT_ID && (xact_info == XLOG_XACT_ABORT ||
1914xact_info == XLOG_XACT_ABORT_PREPARED))
1915{
1916*recordXtime = ((xl_xact_abort *) XLogRecGetData(record))->xact_time;
1917return true;
1918}
1919
1920return false;
1921}
1922
1923bool validate_wal_segment(TimeLineID tli, XLogSegNo segno, const char *prefetch_dir, uint32 wal_seg_size)
1924{
1925XLogRecPtr startpoint;
1926XLogRecPtr endpoint;
1927
1928bool rc;
1929int tmp_num_threads = num_threads;
1930num_threads = 1;
1931
1932/* calculate startpoint and endpoint */
1933GetXLogRecPtr(segno, 0, wal_seg_size, startpoint);
1934GetXLogRecPtr(segno+1, 0, wal_seg_size, endpoint);
1935
1936/* disable multi-threading */
1937num_threads = 1;
1938
1939rc = RunXLogThreads(prefetch_dir, 0, InvalidTransactionId,
1940InvalidXLogRecPtr, tli, wal_seg_size,
1941startpoint, endpoint, false, NULL, NULL, true);
1942
1943num_threads = tmp_num_threads;
1944
1945return rc;
1946}
1947
1948static XLogRecord* WalReadRecord(XLogReaderState *xlogreader, XLogRecPtr startpoint, char **errormsg)
1949{
1950
1951#if PG_VERSION_NUM >= 130000
1952return XLogReadRecord(xlogreader, errormsg);
1953#else
1954return XLogReadRecord(xlogreader, startpoint, errormsg);
1955#endif
1956
1957}
1958
1959static XLogReaderState* WalReaderAllocate(uint32 wal_seg_size, XLogReaderData *reader_data)
1960{
1961
1962#if PG_VERSION_NUM >= 130000
1963return XLogReaderAllocate(wal_seg_size, NULL,
1964XL_ROUTINE(.page_read = &SimpleXLogPageRead),
1965reader_data);
1966#elif PG_VERSION_NUM >= 110000
1967return XLogReaderAllocate(wal_seg_size, &SimpleXLogPageRead,
1968reader_data);
1969#else
1970return XLogReaderAllocate(&SimpleXLogPageRead, reader_data);
1971#endif
1972}
1973