pg_probackup

Форк
0
/
parsexlog.c 
1972 строки · 54.7 Кб
1
/*-------------------------------------------------------------------------
2
 *
3
 * parsexlog.c
4
 *	  Functions for reading Write-Ahead-Log
5
 *
6
 * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
7
 * Portions Copyright (c) 1994, Regents of the University of California
8
 * Portions Copyright (c) 2015-2019, Postgres Professional
9
 *
10
 *-------------------------------------------------------------------------
11
 */
12

13
#include "pg_probackup.h"
14

15
#include "access/transam.h"
16
#include "catalog/pg_control.h"
17
#include "commands/dbcommands_xlog.h"
18
#include "catalog/storage_xlog.h"
19

20
#ifdef HAVE_LIBZ
21
#include <zlib.h>
22
#endif
23

24
#include "utils/thread.h"
25
#include <unistd.h>
26
#include <time.h>
27

28
/*
29
 * RmgrNames is an array of resource manager names, to make error messages
30
 * a bit nicer.
31
 */
32
#if PG_VERSION_NUM >= 150000
33
#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
34
  name,
35
#elif PG_VERSION_NUM >= 100000
36
#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \
37
  name,
38
#else
39
#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup) \
40
  name,
41
#endif
42

43
static const char *RmgrNames[RM_MAX_ID + 1] = {
44
#include "access/rmgrlist.h"
45
};
46

47
/*
 * Transaction WAL record opcodes, copied from access/xact.h.
 *
 * The high 4 bits of a WAL record's xl_info field are available to the
 * resource manager: 3 of them hold the opcode, the remaining one is an
 * optional flag. Opcodes 0x60 and 0x70 are currently unused.
 */
#define XLOG_XACT_COMMIT			0x00
#define XLOG_XACT_PREPARE			0x10
#define XLOG_XACT_ABORT				0x20
#define XLOG_XACT_COMMIT_PREPARED	0x30
#define XLOG_XACT_ABORT_PREPARED	0x40
#define XLOG_XACT_ASSIGNMENT		0x50

/* Keeps only the three opcode bits of xl_info. */
#define XLOG_XACT_OPMASK			0x70
63

64
typedef struct xl_xact_commit
65
{
66
	TimestampTz xact_time;		/* time of commit */
67

68
	/* xl_xact_xinfo follows if XLOG_XACT_HAS_INFO */
69
	/* xl_xact_dbinfo follows if XINFO_HAS_DBINFO */
70
	/* xl_xact_subxacts follows if XINFO_HAS_SUBXACT */
71
	/* xl_xact_relfilenodes follows if XINFO_HAS_RELFILENODES */
72
	/* xl_xact_invals follows if XINFO_HAS_INVALS */
73
	/* xl_xact_twophase follows if XINFO_HAS_TWOPHASE */
74
	/* xl_xact_origin follows if XINFO_HAS_ORIGIN, stored unaligned! */
75
} xl_xact_commit;
76

77
typedef struct xl_xact_abort
78
{
79
	TimestampTz xact_time;		/* time of abort */
80

81
	/* xl_xact_xinfo follows if XLOG_XACT_HAS_INFO */
82
	/* No db_info required */
83
	/* xl_xact_subxacts follows if HAS_SUBXACT */
84
	/* xl_xact_relfilenodes follows if HAS_RELFILENODES */
85
	/* No invalidation messages needed. */
86
	/* xl_xact_twophase follows if XINFO_HAS_TWOPHASE */
87
} xl_xact_abort;
88

89
/*
90
 * XLogRecTarget allows to track the last recovery targets. Currently used only
91
 * within validate_wal().
92
 */
93
typedef struct XLogRecTarget
94
{
95
	TimestampTz	rec_time;
96
	TransactionId rec_xid;
97
	XLogRecPtr	rec_lsn;
98
} XLogRecTarget;
99

100
typedef struct XLogReaderData
101
{
102
	int			thread_num;
103
	TimeLineID	tli;
104

105
	XLogRecTarget cur_rec;
106
	XLogSegNo	xlogsegno;
107
	bool		xlogexists;
108

109
	char		 page_buf[XLOG_BLCKSZ];
110
	uint32		 prev_page_off;
111

112
	bool		need_switch;
113

114
	int			xlogfile;
115
	char		xlogpath[MAXPGPATH];
116

117
#ifdef HAVE_LIBZ
118
	gzFile		 gz_xlogfile;
119
	char		 gz_xlogpath[MAXPGPATH];
120
#endif
121
} XLogReaderData;
122

123
/* Function to process a WAL record */
124
typedef void (*xlog_record_function) (XLogReaderState *record,
125
									  XLogReaderData *reader_data,
126
									  bool *stop_reading);
127

128
/* An argument for a thread function */
129
typedef struct
130
{
131
	XLogReaderData reader_data;
132

133
	xlog_record_function process_record;
134

135
	XLogRecPtr	startpoint;
136
	XLogRecPtr	endpoint;
137
	XLogSegNo	endSegNo;
138

139
	/*
140
	 * The thread got the recovery target.
141
	 */
142
	bool		got_target;
143

144
	/* Should we read record, located at endpoint position */
145
	bool        inclusive_endpoint;
146

147
	/*
148
	 * Return value from the thread.
149
	 * 0 means there is no error, 1 - there is an error.
150
	 */
151
	int			ret;
152
} xlog_thread_arg;
153

154
static XLogRecord* WalReadRecord(XLogReaderState *xlogreader, XLogRecPtr startpoint, char **errormsg);
155
static XLogReaderState* WalReaderAllocate(uint32 wal_seg_size, XLogReaderData *reader_data);
156

157
static int SimpleXLogPageRead(XLogReaderState *xlogreader,
158
				   XLogRecPtr targetPagePtr,
159
				   int reqLen, XLogRecPtr targetRecPtr, char *readBuf
160
#if PG_VERSION_NUM < 130000
161
				   ,TimeLineID *pageTLI
162
#endif
163
					);
164
static XLogReaderState *InitXLogPageRead(XLogReaderData *reader_data,
165
										 const char *archivedir,
166
										 TimeLineID tli, uint32 segment_size,
167
										 bool manual_switch,
168
										 bool consistent_read,
169
										 bool allocate_reader);
170
static bool RunXLogThreads(const char *archivedir,
171
						   time_t target_time, TransactionId target_xid,
172
						   XLogRecPtr target_lsn,
173
						   TimeLineID tli, uint32 segment_size,
174
						   XLogRecPtr startpoint, XLogRecPtr endpoint,
175
						   bool consistent_read,
176
						   xlog_record_function process_record,
177
						   XLogRecTarget *last_rec,
178
						   bool inclusive_endpoint);
179
//static XLogReaderState *InitXLogThreadRead(xlog_thread_arg *arg);
180
static bool SwitchThreadToNextWal(XLogReaderState *xlogreader,
181
								  xlog_thread_arg *arg);
182
static bool XLogWaitForConsistency(XLogReaderState *xlogreader);
183
static void *XLogThreadWorker(void *arg);
184
static void CleanupXLogPageRead(XLogReaderState *xlogreader);
185
static void PrintXLogCorruptionMsg(XLogReaderData *reader_data, int elevel);
186

187
static void extractPageInfo(XLogReaderState *record,
188
							XLogReaderData *reader_data, bool *stop_reading);
189
static void validateXLogRecord(XLogReaderState *record,
190
							   XLogReaderData *reader_data, bool *stop_reading);
191
static bool getRecordTimestamp(XLogReaderState *record, TimestampTz *recordXtime);
192

193
static XLogSegNo segno_start = 0;
194
/* Segment number where target record is located */
195
static XLogSegNo segno_target = 0;
196
/* Next segment number to read by a thread */
197
static XLogSegNo segno_next = 0;
198
/* Number of segments already read by threads */
199
static uint32 segnum_read = 0;
200
/* Number of detected corrupted or absent segments */
201
static uint32 segnum_corrupted = 0;
202
static pthread_mutex_t wal_segment_mutex = PTHREAD_MUTEX_INITIALIZER;
203

204
/* copied from timestamp.c */
205
static pg_time_t
206
timestamptz_to_time_t(TimestampTz t)
207
{
208
	pg_time_t	result;
209

210
#ifdef HAVE_INT64_TIMESTAMP
211
	result = (pg_time_t) (t / USECS_PER_SEC +
212
				 ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY));
213
#else
214
	result = (pg_time_t) (t +
215
				 ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY));
216
#endif
217
	return result;
218
}
219

220
static const char	   *wal_archivedir = NULL;
221
static uint32			wal_seg_size = 0;
222
/*
223
 * If true a wal reader thread switches to the next segment using
224
 * segno_next.
225
 */
226
static bool				wal_manual_switch = false;
227
/*
228
 * If true a wal reader thread waits for other threads if the thread met absent
229
 * wal segment.
230
 */
231
static bool				wal_consistent_read = false;
232

233
/*
234
 * Variables used within validate_wal() and validateXLogRecord() to stop workers
235
 */
236
static time_t			wal_target_time = 0;
237
static TransactionId	wal_target_xid = InvalidTransactionId;
238
static XLogRecPtr		wal_target_lsn = InvalidXLogRecPtr;
239

240
/*
241
 * Read WAL from the archive directory, from 'startpoint' to 'endpoint' on the
242
 * given timeline. Collect data blocks touched by the WAL records into a page map.
243
 *
244
 * Pagemap extracting is processed using threads. Each thread reads single WAL
245
 * file.
246
 */
247
bool
248
extractPageMap(const char *archivedir, uint32 wal_seg_size,
249
			   XLogRecPtr startpoint, TimeLineID start_tli,
250
			   XLogRecPtr endpoint, TimeLineID end_tli,
251
			   parray *tli_list)
252
{
253
	bool		extract_isok = false;
254

255
	if (start_tli == end_tli)
256
		/* easy case */
257
		extract_isok = RunXLogThreads(archivedir, 0, InvalidTransactionId,
258
									  InvalidXLogRecPtr, end_tli, wal_seg_size,
259
									  startpoint, endpoint, false, extractPageInfo,
260
									  NULL, true);
261
	else
262
	{
263
		/* We have to process WAL located on several different xlog intervals,
264
		 * located on different timelines.
265
		 *
266
		 * Consider this example:
267
		 * t3               C-----X <!- We are here
268
		 *                 /
269
		 * t2         B---*-->
270
		 *           /
271
		 * t1 -A----*------->
272
		 *
273
		 * A - prev backup START_LSN
274
		 * B - switchpoint for t2, available as t2->switchpoint
275
		 * C - switch for t3, available as t3->switchpoint
276
		 * X - current backup START_LSN
277
		 *
278
		 * Intervals to be parsed:
279
		 *  - [A,B) on t1
280
		 *  - [B,C) on t2
281
		 *  - [C,X] on t3
282
		 */
283
		int i;
284
		parray       *interval_list = parray_new();
285
		timelineInfo *end_tlinfo = NULL;
286
		timelineInfo *tmp_tlinfo = NULL;
287
		XLogRecPtr    prev_switchpoint = InvalidXLogRecPtr;
288

289
		/* We must find TLI information about final timeline (t3 in example) */
290
		for (i = 0; i < parray_num(tli_list); i++)
291
		{
292
			tmp_tlinfo = parray_get(tli_list, i);
293

294
			if (tmp_tlinfo->tli == end_tli)
295
			{
296
				end_tlinfo = tmp_tlinfo;
297
				break;
298
			}
299
		}
300

301
		/* Iterate over timelines backward,
302
		 * starting with end_tli and ending with start_tli.
303
		 * For every timeline calculate LSN-interval that must be parsed.
304
		 */
305

306
		tmp_tlinfo = end_tlinfo;
307
		while (tmp_tlinfo)
308
		{
309
			lsnInterval *wal_interval = pgut_malloc(sizeof(lsnInterval));
310
			wal_interval->tli = tmp_tlinfo->tli;
311

312
			if (tmp_tlinfo->tli == end_tli)
313
			{
314
				wal_interval->begin_lsn = tmp_tlinfo->switchpoint;
315
				wal_interval->end_lsn = endpoint;
316
			}
317
			else if (tmp_tlinfo->tli == start_tli)
318
			{
319
				wal_interval->begin_lsn = startpoint;
320
				wal_interval->end_lsn = prev_switchpoint;
321
			}
322
			else
323
			{
324
				wal_interval->begin_lsn = tmp_tlinfo->switchpoint;
325
				wal_interval->end_lsn = prev_switchpoint;
326
			}
327

328
			parray_append(interval_list, wal_interval);
329

330
			if (tmp_tlinfo->tli == start_tli)
331
				break;
332

333
			prev_switchpoint = tmp_tlinfo->switchpoint;
334
			tmp_tlinfo = tmp_tlinfo->parent_link;
335
		}
336

337
		for (i = parray_num(interval_list) - 1; i >= 0; i--)
338
		{
339
			bool         inclusive_endpoint;
340
			lsnInterval *tmp_interval = (lsnInterval *) parray_get(interval_list, i);
341

342
			/* In case of replica promotion, endpoints of intermediate
343
			 * timelines can be unreachable.
344
			 */
345
			inclusive_endpoint = false;
346

347
			/* ... but not the end timeline */
348
			if (tmp_interval->tli == end_tli)
349
				inclusive_endpoint = true;
350

351
			extract_isok = RunXLogThreads(archivedir, 0, InvalidTransactionId,
352
									  InvalidXLogRecPtr, tmp_interval->tli, wal_seg_size,
353
									  tmp_interval->begin_lsn, tmp_interval->end_lsn,
354
									  false, extractPageInfo, NULL, inclusive_endpoint);
355
			if (!extract_isok)
356
				break;
357

358
			pg_free(tmp_interval);
359
		}
360
		pg_free(interval_list);
361
	}
362

363
	return extract_isok;
364
}
365

366
/*
367
 * Ensure that the backup has all wal files needed for recovery to consistent
368
 * state.
369
 *
370
 * WAL records reading is processed using threads. Each thread reads single WAL
371
 * file.
372
 */
373
static void
374
validate_backup_wal_from_start_to_stop(pgBackup *backup,
375
									   const char *archivedir, TimeLineID tli,
376
									   uint32 xlog_seg_size)
377
{
378
	bool		got_endpoint;
379

380
	got_endpoint = RunXLogThreads(archivedir, 0, InvalidTransactionId,
381
								  InvalidXLogRecPtr, tli, xlog_seg_size,
382
								  backup->start_lsn, backup->stop_lsn,
383
								  false, NULL, NULL, true);
384

385
	if (!got_endpoint)
386
	{
387
		/*
388
		 * If we don't have WAL between start_lsn and stop_lsn,
389
		 * the backup is definitely corrupted. Update its status.
390
		 */
391
		write_backup_status(backup, BACKUP_STATUS_CORRUPT, true);
392

393
		elog(WARNING, "There are not enough WAL records to consistenly restore "
394
			"backup %s from START LSN: %X/%X to STOP LSN: %X/%X",
395
			 backup_id_of(backup),
396
			 (uint32) (backup->start_lsn >> 32),
397
			 (uint32) (backup->start_lsn),
398
			 (uint32) (backup->stop_lsn >> 32),
399
			 (uint32) (backup->stop_lsn));
400
	}
401
}
402

403
/*
404
 * Ensure that the backup has all wal files needed for recovery to consistent
405
 * state. And check if we have in archive all files needed to restore the backup
406
 * up to the given recovery target.
407
 */
408
void
409
validate_wal(pgBackup *backup, const char *archivedir,
410
			 time_t target_time, TransactionId target_xid,
411
			 XLogRecPtr target_lsn, TimeLineID tli, uint32 wal_seg_size)
412
{
413
	XLogRecTarget last_rec;
414
	char		last_timestamp[100],
415
				target_timestamp[100];
416
	bool		all_wal = false;
417

418
	if (!XRecOffIsValid(backup->start_lsn))
419
		elog(ERROR, "Invalid start_lsn value %X/%X of backup %s",
420
			 (uint32) (backup->start_lsn >> 32), (uint32) (backup->start_lsn),
421
			 backup_id_of(backup));
422

423
	if (!XRecOffIsValid(backup->stop_lsn))
424
		elog(ERROR, "Invalid stop_lsn value %X/%X of backup %s",
425
			 (uint32) (backup->stop_lsn >> 32), (uint32) (backup->stop_lsn),
426
			 backup_id_of(backup));
427

428
	/*
429
	 * Check that the backup has all wal files needed
430
	 * for recovery to consistent state.
431
	 */
432
	if (backup->stream)
433
	{
434
		char	backup_database_dir[MAXPGPATH];
435
		char	backup_xlog_path[MAXPGPATH];
436

437
		join_path_components(backup_database_dir, backup->root_dir, DATABASE_DIR);
438
		join_path_components(backup_xlog_path, backup_database_dir, PG_XLOG_DIR);
439

440
		validate_backup_wal_from_start_to_stop(backup, backup_xlog_path, tli,
441
											   wal_seg_size);
442
	}
443
	else
444
		validate_backup_wal_from_start_to_stop(backup, (char *) archivedir, tli,
445
											   wal_seg_size);
446

447
	if (backup->status == BACKUP_STATUS_CORRUPT)
448
	{
449
		elog(WARNING, "Backup %s WAL segments are corrupted", backup_id_of(backup));
450
		return;
451
	}
452
	/*
453
	 * If recovery target is provided check that we can restore backup to a
454
	 * recovery target time or xid.
455
	 */
456
	if (!TransactionIdIsValid(target_xid) && target_time == 0 &&
457
		!XRecOffIsValid(target_lsn))
458
	{
459
		/* Recovery target is not given so exit */
460
		elog(INFO, "Backup %s WAL segments are valid", backup_id_of(backup));
461
		return;
462
	}
463

464
	/*
465
	 * If recovery target is provided, ensure that archive files exist in
466
	 * archive directory.
467
	 */
468
	if (dir_is_empty(archivedir, FIO_LOCAL_HOST))
469
		elog(ERROR, "WAL archive is empty. You cannot restore backup to a recovery target without WAL archive.");
470

471
	/*
472
	 * Check if we have in archive all files needed to restore backup
473
	 * up to the given recovery target.
474
	 * In any case we cannot restore to the point before stop_lsn.
475
	 */
476

477
	/* We can restore at least up to the backup end */
478
	last_rec.rec_time = 0;
479
	last_rec.rec_xid = backup->recovery_xid;
480
	last_rec.rec_lsn = backup->stop_lsn;
481

482
	time2iso(last_timestamp, lengthof(last_timestamp), backup->recovery_time, false);
483

484
	if ((TransactionIdIsValid(target_xid) && target_xid == last_rec.rec_xid)
485
		|| (target_time != 0 && backup->recovery_time >= target_time)
486
		|| (XRecOffIsValid(target_lsn) && last_rec.rec_lsn >= target_lsn))
487
		all_wal = true;
488

489
	all_wal = all_wal ||
490
		RunXLogThreads(archivedir, target_time, target_xid, target_lsn,
491
					   tli, wal_seg_size, backup->stop_lsn,
492
					   InvalidXLogRecPtr, true, validateXLogRecord, &last_rec, true);
493
	if (last_rec.rec_time > 0)
494
		time2iso(last_timestamp, lengthof(last_timestamp),
495
				 timestamptz_to_time_t(last_rec.rec_time), false);
496

497
	/* There are all needed WAL records */
498
	if (all_wal)
499
		elog(INFO, "Backup validation completed successfully on time %s, xid " XID_FMT " and LSN %X/%X",
500
			 last_timestamp, last_rec.rec_xid,
501
			 (uint32) (last_rec.rec_lsn >> 32), (uint32) last_rec.rec_lsn);
502
	/* Some needed WAL records are absent */
503
	else
504
	{
505
		elog(WARNING, "Recovery can be done up to time %s, xid " XID_FMT " and LSN %X/%X",
506
				last_timestamp, last_rec.rec_xid,
507
			 (uint32) (last_rec.rec_lsn >> 32), (uint32) last_rec.rec_lsn);
508

509
		if (target_time > 0)
510
			time2iso(target_timestamp, lengthof(target_timestamp), target_time, false);
511
		if (TransactionIdIsValid(target_xid) && target_time != 0)
512
			elog(ERROR, "Not enough WAL records to time %s and xid " XID_FMT,
513
					target_timestamp, target_xid);
514
		else if (TransactionIdIsValid(target_xid))
515
			elog(ERROR, "Not enough WAL records to xid " XID_FMT,
516
					target_xid);
517
		else if (target_time != 0)
518
			elog(ERROR, "Not enough WAL records to time %s",
519
					target_timestamp);
520
		else if (XRecOffIsValid(target_lsn))
521
			elog(ERROR, "Not enough WAL records to lsn %X/%X",
522
					(uint32) (target_lsn >> 32), (uint32) (target_lsn));
523
	}
524
}
525

526
/*
527
 * Read from archived WAL segments latest recovery time and xid. All necessary
528
 * segments present at archive folder. We waited **stop_lsn** in
529
 * pg_stop_backup().
530
 */
531
bool
532
read_recovery_info(const char *archivedir, TimeLineID tli, uint32 wal_seg_size,
533
				   XLogRecPtr start_lsn, XLogRecPtr stop_lsn,
534
				   time_t *recovery_time)
535
{
536
	XLogRecPtr	startpoint = stop_lsn;
537
	XLogReaderState *xlogreader;
538
	XLogReaderData reader_data;
539
	bool		res;
540

541
	if (!XRecOffIsValid(start_lsn))
542
		elog(ERROR, "Invalid start_lsn value %X/%X",
543
			 (uint32) (start_lsn >> 32), (uint32) (start_lsn));
544

545
	if (!XRecOffIsValid(stop_lsn))
546
		elog(ERROR, "Invalid stop_lsn value %X/%X",
547
			 (uint32) (stop_lsn >> 32), (uint32) (stop_lsn));
548

549
	xlogreader = InitXLogPageRead(&reader_data, archivedir, tli, wal_seg_size,
550
								  false, true, true);
551

552
	/* Read records from stop_lsn down to start_lsn */
553
	do
554
	{
555
		XLogRecord *record;
556
		TimestampTz last_time = 0;
557
		char	   *errormsg;
558

559
#if PG_VERSION_NUM >= 130000
560
		if (XLogRecPtrIsInvalid(startpoint))
561
			startpoint = SizeOfXLogShortPHD;
562
		XLogBeginRead(xlogreader, startpoint);
563
#endif
564

565
		record = WalReadRecord(xlogreader, startpoint, &errormsg);
566
		if (record == NULL)
567
		{
568
			XLogRecPtr	errptr;
569

570
			errptr = startpoint ? startpoint : xlogreader->EndRecPtr;
571

572
			if (errormsg)
573
				elog(ERROR, "Could not read WAL record at %X/%X: %s",
574
					 (uint32) (errptr >> 32), (uint32) (errptr),
575
					 errormsg);
576
			else
577
				elog(ERROR, "Could not read WAL record at %X/%X",
578
					 (uint32) (errptr >> 32), (uint32) (errptr));
579
		}
580

581
		/* Read previous record */
582
		startpoint = record->xl_prev;
583

584
		if (getRecordTimestamp(xlogreader, &last_time))
585
		{
586
			*recovery_time = timestamptz_to_time_t(last_time);
587

588
			/* Found timestamp in WAL record 'record' */
589
			res = true;
590
			goto cleanup;
591
		}
592
	} while (startpoint >= start_lsn);
593

594
	/* Didn't find timestamp from WAL records between start_lsn and stop_lsn */
595
	res = false;
596

597
cleanup:
598
	CleanupXLogPageRead(xlogreader);
599
	XLogReaderFree(xlogreader);
600

601
	return res;
602
}
603

604
/*
605
 * Check if there is a WAL segment file in 'archivedir' which contains
606
 * 'target_lsn'.
607
 */
608
bool
609
wal_contains_lsn(const char *archivedir, XLogRecPtr target_lsn,
610
				 TimeLineID target_tli, uint32 wal_seg_size)
611
{
612
	XLogReaderState *xlogreader;
613
	XLogReaderData reader_data;
614
	char	   *errormsg;
615
	bool		res;
616

617
	if (!XRecOffIsValid(target_lsn))
618
		elog(ERROR, "Invalid target_lsn value %X/%X",
619
			 (uint32) (target_lsn >> 32), (uint32) (target_lsn));
620

621
	xlogreader = InitXLogPageRead(&reader_data, archivedir, target_tli,
622
								  wal_seg_size, false, false, true);
623

624
	if (xlogreader == NULL)
625
			elog(ERROR, "Out of memory");
626

627
	xlogreader->system_identifier = instance_config.system_identifier;
628

629
#if PG_VERSION_NUM >= 130000
630
	if (XLogRecPtrIsInvalid(target_lsn))
631
		target_lsn = SizeOfXLogShortPHD;
632
	XLogBeginRead(xlogreader, target_lsn);
633
#endif
634

635
	res = WalReadRecord(xlogreader, target_lsn, &errormsg) != NULL;
636
	/* Didn't find 'target_lsn' and there is no error, return false */
637

638
	if (errormsg)
639
		elog(WARNING, "Could not read WAL record at %X/%X: %s",
640
				(uint32) (target_lsn >> 32), (uint32) (target_lsn), errormsg);
641

642
	CleanupXLogPageRead(xlogreader);
643
	XLogReaderFree(xlogreader);
644

645
	return res;
646
}
647

648
/*
649
 * Get LSN of a first record within the WAL segment with number 'segno'.
650
 */
651
XLogRecPtr
652
get_first_record_lsn(const char *archivedir, XLogSegNo	segno,
653
					 TimeLineID tli, uint32 wal_seg_size, int timeout)
654
{
655
	XLogReaderState *xlogreader;
656
	XLogReaderData   reader_data;
657
	XLogRecPtr       record = InvalidXLogRecPtr;
658
	XLogRecPtr       startpoint;
659
	char             wal_segment[MAXFNAMELEN];
660
	int              attempts = 0;
661

662
	if (segno <= 1)
663
		elog(ERROR, "Invalid WAL segment number " UINT64_FORMAT, segno);
664

665
	GetXLogFileName(wal_segment, tli, segno, instance_config.xlog_seg_size);
666

667
	xlogreader = InitXLogPageRead(&reader_data, archivedir, tli, wal_seg_size,
668
								  false, false, true);
669
	if (xlogreader == NULL)
670
			elog(ERROR, "Out of memory");
671
	xlogreader->system_identifier = instance_config.system_identifier;
672

673
	/* Set startpoint to 0 in segno */
674
	GetXLogRecPtr(segno, 0, wal_seg_size, startpoint);
675

676
#if PG_VERSION_NUM >= 130000
677
	if (XLogRecPtrIsInvalid(startpoint))
678
		startpoint = SizeOfXLogShortPHD;
679
	XLogBeginRead(xlogreader, startpoint);
680
#endif
681

682
	while (attempts <= timeout)
683
	{
684
		record = XLogFindNextRecord(xlogreader, startpoint);
685

686
		if (XLogRecPtrIsInvalid(record))
687
			record = InvalidXLogRecPtr;
688
		else
689
		{
690
			elog(LOG, "First record in WAL segment \"%s\": %X/%X", wal_segment,
691
					(uint32) (record >> 32), (uint32) (record));
692
			break;
693
		}
694

695
		attempts++;
696
		sleep(1);
697
	}
698

699
	/* cleanup */
700
	CleanupXLogPageRead(xlogreader);
701
	XLogReaderFree(xlogreader);
702

703
	return record;
704
}
705

706

707
/*
708
 * Get LSN of the record next after target lsn.
709
 */
710
XLogRecPtr
711
get_next_record_lsn(const char *archivedir, XLogSegNo	segno,
712
					 TimeLineID tli, uint32 wal_seg_size, int timeout,
713
					 XLogRecPtr target)
714
{
715
	XLogReaderState *xlogreader;
716
	XLogReaderData   reader_data;
717
	XLogRecPtr       startpoint, found;
718
	XLogRecPtr       res = InvalidXLogRecPtr;
719
	char             wal_segment[MAXFNAMELEN];
720
	int              attempts = 0;
721

722
	if (segno <= 1)
723
		elog(ERROR, "Invalid WAL segment number " UINT64_FORMAT, segno);
724

725
	GetXLogFileName(wal_segment, tli, segno, instance_config.xlog_seg_size);
726

727
	xlogreader = InitXLogPageRead(&reader_data, archivedir, tli, wal_seg_size,
728
								  false, false, true);
729
	if (xlogreader == NULL)
730
			elog(ERROR, "Out of memory");
731
	xlogreader->system_identifier = instance_config.system_identifier;
732

733
	/* Set startpoint to 0 in segno */
734
	GetXLogRecPtr(segno, 0, wal_seg_size, startpoint);
735

736
#if PG_VERSION_NUM >= 130000
737
	if (XLogRecPtrIsInvalid(startpoint))
738
		startpoint = SizeOfXLogShortPHD;
739
	XLogBeginRead(xlogreader, startpoint);
740
#endif
741

742
	found = XLogFindNextRecord(xlogreader, startpoint);
743

744
	if (XLogRecPtrIsInvalid(found))
745
	{
746
		if (xlogreader->errormsg_buf[0] != '\0')
747
			elog(WARNING, "Could not read WAL record at %X/%X: %s",
748
				 (uint32) (startpoint >> 32), (uint32) (startpoint),
749
				 xlogreader->errormsg_buf);
750
		else
751
			elog(WARNING, "Could not read WAL record at %X/%X",
752
				 (uint32) (startpoint >> 32), (uint32) (startpoint));
753
		PrintXLogCorruptionMsg(&reader_data, ERROR);
754
	}
755
	startpoint = found;
756

757
	while (attempts <= timeout)
758
	{
759
		XLogRecord *record;
760
		char	   *errormsg;
761

762
		if (interrupted)
763
			elog(ERROR, "Interrupted during WAL reading");
764

765
		record = WalReadRecord(xlogreader, startpoint, &errormsg);
766

767
		if (record == NULL)
768
		{
769
			XLogRecPtr	errptr;
770

771
			errptr = XLogRecPtrIsInvalid(startpoint) ? xlogreader->EndRecPtr :
772
				startpoint;
773

774
			if (errormsg)
775
				elog(WARNING, "Could not read WAL record at %X/%X: %s",
776
					 (uint32) (errptr >> 32), (uint32) (errptr),
777
					 errormsg);
778
			else
779
				elog(WARNING, "Could not read WAL record at %X/%X",
780
					 (uint32) (errptr >> 32), (uint32) (errptr));
781
			PrintXLogCorruptionMsg(&reader_data, ERROR);
782
		}
783

784
		if (xlogreader->ReadRecPtr >= target)
785
		{
786
			elog(LOG, "Record %X/%X is next after target LSN %X/%X",
787
				(uint32) (xlogreader->ReadRecPtr >> 32), (uint32) (xlogreader->ReadRecPtr),
788
				(uint32) (target >> 32), (uint32) (target));
789
			res = xlogreader->ReadRecPtr;
790
			break;
791
		}
792
		else
793
			startpoint = InvalidXLogRecPtr;
794
	}
795

796
	/* cleanup */
797
	CleanupXLogPageRead(xlogreader);
798
	XLogReaderFree(xlogreader);
799

800
	return res;
801
}
802

803

804
/*
805
 * Get LSN of a record prior to target_lsn.
806
 * If 'start_lsn' is in the segment with number 'segno' then start from 'start_lsn',
807
 * otherwise start from offset 0 within the segment.
808
 *
809
 * Returns LSN of a record which EndRecPtr is greater or equal to target_lsn.
810
 * If 'seek_prev_segment' is true, then look for prior record in prior WAL segment.
811
 *
812
 * it's unclear that "last" in "last_wal_lsn" refers to the
813
 * "closest to stop_lsn backward or forward, depending on seek_prev_segment setting".
814
 */
815
XLogRecPtr
816
get_prior_record_lsn(const char *archivedir, XLogRecPtr start_lsn,
817
				 XLogRecPtr stop_lsn, TimeLineID tli, bool seek_prev_segment,
818
				 uint32 wal_seg_size)
819
{
820
	XLogReaderState *xlogreader;
821
	XLogReaderData reader_data;
822
	XLogRecPtr	startpoint;
823
	XLogSegNo	start_segno;
824
	XLogSegNo	segno;
825
	XLogRecPtr	res = InvalidXLogRecPtr;
826

827
	GetXLogSegNo(stop_lsn, segno, wal_seg_size);
828

829
	if (segno <= 1)
830
		elog(ERROR, "Invalid WAL segment number " UINT64_FORMAT, segno);
831

832
	if (seek_prev_segment)
833
		segno = segno - 1;
834

835
	xlogreader = InitXLogPageRead(&reader_data, archivedir, tli, wal_seg_size,
836
								  false, false, true);
837

838
	if (xlogreader == NULL)
839
			elog(ERROR, "Out of memory");
840

841
	xlogreader->system_identifier = instance_config.system_identifier;
842

843
	/*
844
	 * Calculate startpoint. Decide: we should use 'start_lsn' or offset 0.
845
	 */
846
	GetXLogSegNo(start_lsn, start_segno, wal_seg_size);
847
	if (start_segno == segno)
848
	{
849
		startpoint = start_lsn;
850
#if PG_VERSION_NUM >= 130000
851
		if (XLogRecPtrIsInvalid(startpoint))
852
			startpoint = SizeOfXLogShortPHD;
853
		XLogBeginRead(xlogreader, startpoint);
854
#endif
855
	}
856
	else
857
	{
858
		XLogRecPtr	found;
859

860
		GetXLogRecPtr(segno, 0, wal_seg_size, startpoint);
861

862
#if PG_VERSION_NUM >= 130000
863
		if (XLogRecPtrIsInvalid(startpoint))
864
			startpoint = SizeOfXLogShortPHD;
865
		XLogBeginRead(xlogreader, startpoint);
866
#endif
867

868
		found = XLogFindNextRecord(xlogreader, startpoint);
869

870
		if (XLogRecPtrIsInvalid(found))
871
		{
872
			if (xlogreader->errormsg_buf[0] != '\0')
873
				elog(WARNING, "Could not read WAL record at %X/%X: %s",
874
					 (uint32) (startpoint >> 32), (uint32) (startpoint),
875
					 xlogreader->errormsg_buf);
876
			else
877
				elog(WARNING, "Could not read WAL record at %X/%X",
878
					 (uint32) (startpoint >> 32), (uint32) (startpoint));
879
			PrintXLogCorruptionMsg(&reader_data, ERROR);
880
		}
881
		startpoint = found;
882
	}
883

884
	while (true)
885
	{
886
		XLogRecord *record;
887
		char	   *errormsg;
888

889
		if (interrupted)
890
			elog(ERROR, "Interrupted during WAL reading");
891

892
		record = WalReadRecord(xlogreader, startpoint, &errormsg);
893
		if (record == NULL)
894
		{
895
			XLogRecPtr	errptr;
896

897
			errptr = XLogRecPtrIsInvalid(startpoint) ? xlogreader->EndRecPtr :
898
				startpoint;
899

900
			if (errormsg)
901
				elog(WARNING, "Could not read WAL record at %X/%X: %s",
902
					 (uint32) (errptr >> 32), (uint32) (errptr),
903
					 errormsg);
904
			else
905
				elog(WARNING, "Could not read WAL record at %X/%X",
906
					 (uint32) (errptr >> 32), (uint32) (errptr));
907
			PrintXLogCorruptionMsg(&reader_data, ERROR);
908
		}
909

910
		if (xlogreader->EndRecPtr >= stop_lsn)
911
		{
912
			elog(LOG, "Record %X/%X has endpoint %X/%X which is equal or greater than requested LSN %X/%X",
913
				(uint32) (xlogreader->ReadRecPtr >> 32), (uint32) (xlogreader->ReadRecPtr),
914
				(uint32) (xlogreader->EndRecPtr >> 32), (uint32) (xlogreader->EndRecPtr),
915
				(uint32) (stop_lsn >> 32), (uint32) (stop_lsn));
916
			res = xlogreader->ReadRecPtr;
917
			break;
918
		}
919

920
		/* continue reading at next record */
921
		startpoint = InvalidXLogRecPtr;
922
	}
923

924
	CleanupXLogPageRead(xlogreader);
925
	XLogReaderFree(xlogreader);
926

927
	return res;
928
}
929

930
#ifdef HAVE_LIBZ
/*
 * Describe the last error on compressed file 'gzf'.
 *
 * When the reported error number is Z_ERRNO (a system-level error), return
 * the text for errno. Otherwise return the message from fio_gzerror().
 */
static const char *
get_gz_error(gzFile gzf)
{
	int			errnum;
	const char *zmsg = fio_gzerror(gzf, &errnum);

	return (errnum == Z_ERRNO) ? strerror(errno) : zmsg;
}
#endif
947

948
/* XLogreader callback function, to read a WAL page */
949
static int
950
SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
951
				   int reqLen, XLogRecPtr targetRecPtr, char *readBuf
952
#if PG_VERSION_NUM < 130000
953
				   ,TimeLineID *pageTLI
954
#endif
955
				   )
956
{
957
	XLogReaderData *reader_data;
958
	uint32		targetPageOff;
959

960
	reader_data = (XLogReaderData *) xlogreader->private_data;
961
	targetPageOff = targetPagePtr % wal_seg_size;
962

963
	if (interrupted || thread_interrupted)
964
		elog(ERROR, "Thread [%d]: Interrupted during WAL reading",
965
			 reader_data->thread_num);
966

967
	/*
968
	 * See if we need to switch to a new segment because the requested record
969
	 * is not in the currently open one.
970
	 */
971
	if (!IsInXLogSeg(targetPagePtr, reader_data->xlogsegno, wal_seg_size))
972
	{
973
		elog(VERBOSE, "Thread [%d]: Need to switch to the next WAL segment, page LSN %X/%X, record being read LSN %X/%X",
974
			 reader_data->thread_num,
975
			 (uint32) (targetPagePtr >> 32), (uint32) (targetPagePtr),
976
			 (uint32) (xlogreader->currRecPtr >> 32),
977
			 (uint32) (xlogreader->currRecPtr ));
978

979
		/*
980
		 * If the last record on the page is not complete,
981
		 * we must continue reading pages in the same thread
982
		 */
983
		if (!XLogRecPtrIsInvalid(xlogreader->currRecPtr) &&
984
			xlogreader->currRecPtr < targetPagePtr)
985
		{
986
			CleanupXLogPageRead(xlogreader);
987

988
			/*
989
			 * Switch to the next WAL segment after reading contrecord.
990
			 */
991
			if (wal_manual_switch)
992
				reader_data->need_switch = true;
993
		}
994
		else
995
		{
996
			CleanupXLogPageRead(xlogreader);
997
			/*
998
			 * Do not switch to next WAL segment in this function. It is
999
			 * manually switched by a thread routine.
1000
			 */
1001
			if (wal_manual_switch)
1002
			{
1003
				reader_data->need_switch = true;
1004
				return -1;
1005
			}
1006
		}
1007
	}
1008

1009
	GetXLogSegNo(targetPagePtr, reader_data->xlogsegno, wal_seg_size);
1010

1011
	/* Try to switch to the next WAL segment */
1012
	if (!reader_data->xlogexists)
1013
	{
1014
		char		xlogfname[MAXFNAMELEN];
1015
		char		partial_file[MAXPGPATH];
1016

1017
		GetXLogFileName(xlogfname, reader_data->tli, reader_data->xlogsegno, wal_seg_size);
1018

1019
		join_path_components(reader_data->xlogpath, wal_archivedir, xlogfname);
1020
		snprintf(reader_data->gz_xlogpath, MAXPGPATH, "%s.gz", reader_data->xlogpath);
1021

1022
		/* We fall back to using .partial segment in case if we are running
1023
		 * multi-timeline incremental backup right after standby promotion.
1024
		 * TODO: it should be explicitly enabled.
1025
		 */
1026
		snprintf(partial_file, MAXPGPATH, "%s.partial", reader_data->xlogpath);
1027

1028
		/* If segment do not exists, but the same
1029
		 * segment with '.partial' suffix does, use it instead */
1030
		if (!fileExists(reader_data->xlogpath, FIO_LOCAL_HOST) &&
1031
			fileExists(partial_file, FIO_LOCAL_HOST))
1032
		{
1033
			snprintf(reader_data->xlogpath, MAXPGPATH, "%s", partial_file);
1034
		}
1035

1036
		if (fileExists(reader_data->xlogpath, FIO_LOCAL_HOST))
1037
		{
1038
			elog(LOG, "Thread [%d]: Opening WAL segment \"%s\"",
1039
				 reader_data->thread_num, reader_data->xlogpath);
1040

1041
			reader_data->xlogexists = true;
1042
			reader_data->xlogfile = fio_open(reader_data->xlogpath,
1043
											 O_RDONLY | PG_BINARY, FIO_LOCAL_HOST);
1044

1045
			if (reader_data->xlogfile < 0)
1046
			{
1047
				elog(WARNING, "Thread [%d]: Could not open WAL segment \"%s\": %s",
1048
					 reader_data->thread_num, reader_data->xlogpath,
1049
					 strerror(errno));
1050
				return -1;
1051
			}
1052
		}
1053
#ifdef HAVE_LIBZ
1054
		/* Try to open compressed WAL segment */
1055
		else if (fileExists(reader_data->gz_xlogpath, FIO_LOCAL_HOST))
1056
		{
1057
			elog(LOG, "Thread [%d]: Opening compressed WAL segment \"%s\"",
1058
				 reader_data->thread_num, reader_data->gz_xlogpath);
1059

1060
			reader_data->xlogexists = true;
1061
			reader_data->gz_xlogfile = fio_gzopen(reader_data->gz_xlogpath,
1062
													  "rb", -1, FIO_LOCAL_HOST);
1063
			if (reader_data->gz_xlogfile == NULL)
1064
			{
1065
				elog(WARNING, "Thread [%d]: Could not open compressed WAL segment \"%s\": %s",
1066
					 reader_data->thread_num, reader_data->gz_xlogpath,
1067
					 strerror(errno));
1068
				return -1;
1069
			}
1070
		}
1071
#endif
1072
		/* Exit without error if WAL segment doesn't exist */
1073
		if (!reader_data->xlogexists)
1074
			return -1;
1075
	}
1076

1077
	/*
1078
	 * At this point, we have the right segment open.
1079
	 */
1080
	Assert(reader_data->xlogexists);
1081

1082
	/*
1083
	 * Do not read same page read earlier from the file, read it from the buffer
1084
	 */
1085
	if (reader_data->prev_page_off != 0 &&
1086
		reader_data->prev_page_off == targetPageOff)
1087
	{
1088
		memcpy(readBuf, reader_data->page_buf, XLOG_BLCKSZ);
1089
#if PG_VERSION_NUM < 130000
1090
		*pageTLI = reader_data->tli;
1091
#endif
1092
		return XLOG_BLCKSZ;
1093
	}
1094

1095
	/* Read the requested page */
1096
	if (reader_data->xlogfile != -1)
1097
	{
1098
		if (fio_seek(reader_data->xlogfile, (off_t) targetPageOff) < 0)
1099
		{
1100
			elog(WARNING, "Thread [%d]: Could not seek in WAL segment \"%s\": %s",
1101
				 reader_data->thread_num, reader_data->xlogpath, strerror(errno));
1102
			return -1;
1103
		}
1104

1105
		if (fio_read(reader_data->xlogfile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
1106
		{
1107
			elog(WARNING, "Thread [%d]: Could not read from WAL segment \"%s\": %s",
1108
				 reader_data->thread_num, reader_data->xlogpath, strerror(errno));
1109
			return -1;
1110
		}
1111
	}
1112
#ifdef HAVE_LIBZ
1113
	else
1114
	{
1115
		if (fio_gzseek(reader_data->gz_xlogfile, (z_off_t) targetPageOff, SEEK_SET) == -1)
1116
		{
1117
			elog(WARNING, "Thread [%d]: Could not seek in compressed WAL segment \"%s\": %s",
1118
				reader_data->thread_num, reader_data->gz_xlogpath,
1119
				get_gz_error(reader_data->gz_xlogfile));
1120
			return -1;
1121
		}
1122

1123
		if (fio_gzread(reader_data->gz_xlogfile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
1124
		{
1125
			elog(WARNING, "Thread [%d]: Could not read from compressed WAL segment \"%s\": %s",
1126
				reader_data->thread_num, reader_data->gz_xlogpath,
1127
				get_gz_error(reader_data->gz_xlogfile));
1128
			return -1;
1129
		}
1130
	}
1131
#endif
1132

1133
	memcpy(reader_data->page_buf, readBuf, XLOG_BLCKSZ);
1134
	reader_data->prev_page_off = targetPageOff;
1135
#if PG_VERSION_NUM < 130000
1136
	*pageTLI = reader_data->tli;
1137
#endif
1138
	return XLOG_BLCKSZ;
1139
}
1140

1141
/*
1142
 * Initialize WAL segments reading.
1143
 */
1144
static XLogReaderState *
1145
InitXLogPageRead(XLogReaderData *reader_data, const char *archivedir,
1146
				 TimeLineID tli, uint32 segment_size, bool manual_switch,
1147
				 bool consistent_read, bool allocate_reader)
1148
{
1149
	XLogReaderState *xlogreader = NULL;
1150

1151
	wal_archivedir = archivedir;
1152
	wal_seg_size = segment_size;
1153
	wal_manual_switch = manual_switch;
1154
	wal_consistent_read = consistent_read;
1155

1156
	MemSet(reader_data, 0, sizeof(XLogReaderData));
1157
	reader_data->tli = tli;
1158
	reader_data->xlogfile = -1;
1159

1160
	if (allocate_reader)
1161
	{
1162
		xlogreader = WalReaderAllocate(wal_seg_size, reader_data);
1163
		if (xlogreader == NULL)
1164
			elog(ERROR, "Out of memory");
1165
		xlogreader->system_identifier = instance_config.system_identifier;
1166
	}
1167

1168
	return xlogreader;
1169
}
1170

1171
/*
1172
 * Comparison function to sort xlog_thread_arg array.
1173
 */
1174
static int
1175
xlog_thread_arg_comp(const void *a1, const void *a2)
1176
{
1177
	const xlog_thread_arg *arg1 = a1;
1178
	const xlog_thread_arg *arg2 = a2;
1179

1180
	return arg1->reader_data.xlogsegno - arg2->reader_data.xlogsegno;
1181
}
1182

1183
/*
1184
 * Run WAL processing routines using threads. Start from startpoint up to
1185
 * endpoint. It is possible to send zero endpoint, threads will read WAL
1186
 * infinitely in this case.
1187
 */
1188
static bool
1189
RunXLogThreads(const char *archivedir, time_t target_time,
1190
			   TransactionId target_xid, XLogRecPtr target_lsn, TimeLineID tli,
1191
			   uint32 segment_size, XLogRecPtr startpoint, XLogRecPtr endpoint,
1192
			   bool consistent_read, xlog_record_function process_record,
1193
			   XLogRecTarget *last_rec, bool inclusive_endpoint)
1194
{
1195
	pthread_t  *threads;
1196
	xlog_thread_arg *thread_args;
1197
	int			i;
1198
	int			threads_need = 0;
1199
	XLogSegNo	endSegNo = 0;
1200
	bool		result = true;
1201

1202
	if (!XRecOffIsValid(startpoint) && !XRecOffIsNull(startpoint))
1203
		elog(ERROR, "Invalid startpoint value %X/%X",
1204
			 (uint32) (startpoint >> 32), (uint32) (startpoint));
1205

1206
	if (process_record)
1207
		elog(LOG, "Extracting pagemap from tli %i on range from %X/%X to %X/%X",
1208
				tli,
1209
				(uint32) (startpoint >> 32), (uint32) (startpoint),
1210
				(uint32) (endpoint >> 32), (uint32) (endpoint));
1211

1212
	if (!XLogRecPtrIsInvalid(endpoint))
1213
	{
1214
//		if (XRecOffIsNull(endpoint) && !inclusive_endpoint)
1215
		if (XRecOffIsNull(endpoint))
1216
		{
1217
			GetXLogSegNo(endpoint, endSegNo, segment_size);
1218
			endSegNo--;
1219
		}
1220
		else if (!XRecOffIsValid(endpoint))
1221
		{
1222
			elog(ERROR, "Invalid endpoint value %X/%X",
1223
				(uint32) (endpoint >> 32), (uint32) (endpoint));
1224
		}
1225
		else
1226
			GetXLogSegNo(endpoint, endSegNo, segment_size);
1227
	}
1228

1229
	/* Initialize static variables for workers */
1230
	wal_target_time = target_time;
1231
	wal_target_xid = target_xid;
1232
	wal_target_lsn = target_lsn;
1233

1234
	GetXLogSegNo(startpoint, segno_start, segment_size);
1235
	segno_target = 0;
1236
	GetXLogSegNo(startpoint, segno_next, segment_size);
1237
	segnum_read = 0;
1238
	segnum_corrupted = 0;
1239

1240
	threads = (pthread_t *) pgut_malloc(sizeof(pthread_t) * num_threads);
1241
	thread_args = (xlog_thread_arg *) pgut_malloc(sizeof(xlog_thread_arg) * num_threads);
1242

1243
	/*
1244
	 * Initialize thread args.
1245
	 *
1246
	 * Each thread works with its own WAL segment and we need to adjust
1247
	 * startpoint value for each thread.
1248
	 */
1249
	for (i = 0; i < num_threads; i++)
1250
	{
1251
		xlog_thread_arg *arg = &thread_args[i];
1252

1253
		InitXLogPageRead(&arg->reader_data, archivedir, tli, segment_size, true,
1254
						 consistent_read, false);
1255
		arg->reader_data.xlogsegno = segno_next;
1256
		arg->reader_data.thread_num = i + 1;
1257
		arg->process_record = process_record;
1258
		arg->startpoint = startpoint;
1259
		arg->endpoint = endpoint;
1260
		arg->endSegNo = endSegNo;
1261
		arg->inclusive_endpoint = inclusive_endpoint;
1262
		arg->got_target = false;
1263
		/* By default there is some error */
1264
		arg->ret = 1;
1265

1266
		threads_need++;
1267
		segno_next++;
1268
		/*
1269
		 * If we need to read less WAL segments than num_threads, create less
1270
		 * threads.
1271
		 */
1272
		if (endSegNo != 0 && segno_next > endSegNo)
1273
			break;
1274
		GetXLogRecPtr(segno_next, 0, segment_size, startpoint);
1275
	}
1276

1277
	/* Run threads */
1278
	thread_interrupted = false;
1279
	for (i = 0; i < threads_need; i++)
1280
	{
1281
		elog(VERBOSE, "Start WAL reader thread: %d", i + 1);
1282
		pthread_create(&threads[i], NULL, XLogThreadWorker, &thread_args[i]);
1283
	}
1284

1285
	/* Wait for threads */
1286
	for (i = 0; i < threads_need; i++)
1287
	{
1288
		pthread_join(threads[i], NULL);
1289
		if (thread_args[i].ret == 1)
1290
			result = false;
1291
	}
1292
	thread_interrupted = false;
1293

1294
//  TODO: we must detect difference between actual error (failed to read WAL) and interrupt signal
1295
//	if (interrupted)
1296
//		elog(ERROR, "Interrupted during WAL parsing");
1297

1298
	/* Release threads here, use thread_args only below */
1299
	pfree(threads);
1300
	threads = NULL;
1301

1302
	if (last_rec)
1303
	{
1304
		/*
1305
		 * We need to sort xlog_thread_arg array by xlogsegno to return latest
1306
		 * possible record up to which restore is possible. We need to sort to
1307
		 * detect failed thread between start segment and target segment.
1308
		 *
1309
		 * Loop stops on first failed thread.
1310
		 */
1311
		if (threads_need > 1)
1312
			qsort((void *) thread_args, threads_need, sizeof(xlog_thread_arg),
1313
				  xlog_thread_arg_comp);
1314

1315
		for (i = 0; i < threads_need; i++)
1316
		{
1317
			XLogRecTarget *cur_rec;
1318

1319
			cur_rec = &thread_args[i].reader_data.cur_rec;
1320
			/*
1321
			 * If we got the target return minimum possible record.
1322
			 */
1323
			if (segno_target > 0)
1324
			{
1325
				if (thread_args[i].got_target &&
1326
					thread_args[i].reader_data.xlogsegno == segno_target)
1327
				{
1328
					*last_rec = *cur_rec;
1329
					break;
1330
				}
1331
			}
1332
			/*
1333
			 * Else return maximum possible record up to which restore is
1334
			 * possible.
1335
			 */
1336
			else if (last_rec->rec_lsn < cur_rec->rec_lsn)
1337
				*last_rec = *cur_rec;
1338

1339
			/*
1340
			 * We reached failed thread, so stop here. We cannot use following
1341
			 * WAL records after failed segment.
1342
			 */
1343
			if (thread_args[i].ret != 0)
1344
				break;
1345
		}
1346
	}
1347

1348
	pfree(thread_args);
1349

1350
	return result;
1351
}
1352

1353
/*
1354
 * WAL reader worker.
1355
 */
1356
void *
1357
XLogThreadWorker(void *arg)
1358
{
1359
	xlog_thread_arg *thread_arg = (xlog_thread_arg *) arg;
1360
	XLogReaderData *reader_data = &thread_arg->reader_data;
1361
	XLogReaderState *xlogreader;
1362
	XLogSegNo	nextSegNo = 0;
1363
	XLogRecPtr	found;
1364
	uint32		prev_page_off = 0;
1365
	bool		need_read = true;
1366

1367
	xlogreader = WalReaderAllocate(wal_seg_size, reader_data);
1368

1369
	if (xlogreader == NULL)
1370
		elog(ERROR, "Thread [%d]: out of memory", reader_data->thread_num);
1371
	xlogreader->system_identifier = instance_config.system_identifier;
1372

1373
#if PG_VERSION_NUM >= 130000
1374
	if (XLogRecPtrIsInvalid(thread_arg->startpoint))
1375
		thread_arg->startpoint = SizeOfXLogShortPHD;
1376
	XLogBeginRead(xlogreader, thread_arg->startpoint);
1377
#endif
1378

1379
	found = XLogFindNextRecord(xlogreader, thread_arg->startpoint);
1380

1381
	/*
1382
	 * We get invalid WAL record pointer usually when WAL segment is absent or
1383
	 * is corrupted.
1384
	 */
1385
	if (XLogRecPtrIsInvalid(found))
1386
	{
1387
		if (wal_consistent_read && XLogWaitForConsistency(xlogreader))
1388
			need_read = false;
1389
		else
1390
		{
1391
			if (xlogreader->errormsg_buf[0] != '\0')
1392
				elog(WARNING, "Thread [%d]: Could not read WAL record at %X/%X: %s",
1393
					reader_data->thread_num,
1394
					(uint32) (thread_arg->startpoint >> 32),
1395
					(uint32) (thread_arg->startpoint),
1396
					xlogreader->errormsg_buf);
1397
			else
1398
				elog(WARNING, "Thread [%d]: Could not read WAL record at %X/%X",
1399
					reader_data->thread_num,
1400
					(uint32) (thread_arg->startpoint >> 32),
1401
					(uint32) (thread_arg->startpoint));
1402
			PrintXLogCorruptionMsg(reader_data, ERROR);
1403
		}
1404
	}
1405

1406
	thread_arg->startpoint = found;
1407

1408
	elog(VERBOSE, "Thread [%d]: Starting LSN: %X/%X",
1409
		 reader_data->thread_num,
1410
		 (uint32) (thread_arg->startpoint >> 32),
1411
		 (uint32) (thread_arg->startpoint));
1412

1413
	while (need_read)
1414
	{
1415
		XLogRecord *record;
1416
		char	   *errormsg;
1417
		bool		stop_reading = false;
1418

1419
		if (interrupted || thread_interrupted)
1420
			elog(ERROR, "Thread [%d]: Interrupted during WAL reading",
1421
				 reader_data->thread_num);
1422

1423
		/*
1424
		 * We need to switch to the next WAL segment after reading previous
1425
		 * record. It may happen if we read contrecord.
1426
		 */
1427
		if (reader_data->need_switch &&
1428
			!SwitchThreadToNextWal(xlogreader, thread_arg))
1429
			break;
1430

1431
		record = WalReadRecord(xlogreader, thread_arg->startpoint, &errormsg);
1432

1433
		if (record == NULL)
1434
		{
1435
			XLogRecPtr	errptr;
1436

1437
			/*
1438
			 * There is no record, try to switch to the next WAL segment.
1439
			 * Usually SimpleXLogPageRead() does it by itself. But here we need
1440
			 * to do it manually to support threads.
1441
			 */
1442
#if PG_VERSION_NUM >= 150000
1443
			if (reader_data->need_switch && (
1444
					errormsg == NULL ||
1445
					/*
1446
					 * Pg15 now informs if "contrecord" is missing.
1447
					 * TODO: probably we should abort reading logs at this moment.
1448
					 * But we continue as we did with bug present in Pg < 15.
1449
					 */
1450
					!XLogRecPtrIsInvalid(xlogreader->abortedRecPtr)))
1451
#else
1452
			if (reader_data->need_switch && errormsg == NULL)
1453
#endif
1454
			{
1455
				if (SwitchThreadToNextWal(xlogreader, thread_arg))
1456
					continue;
1457
				else
1458
					break;
1459
			}
1460

1461
			/*
1462
			 * XLogWaitForConsistency() is normally used only with threads.
1463
			 * Call it here for just in case.
1464
			 */
1465
			if (wal_consistent_read && XLogWaitForConsistency(xlogreader))
1466
				break;
1467
			else if (wal_consistent_read)
1468
			{
1469
				XLogSegNo	segno_report;
1470

1471
				pthread_lock(&wal_segment_mutex);
1472
				segno_report = segno_start + segnum_read;
1473
				pthread_mutex_unlock(&wal_segment_mutex);
1474

1475
				/*
1476
				 * Report error message if this is the first corrupted WAL.
1477
				*/
1478
				if (reader_data->xlogsegno > segno_report)
1479
					return NULL;	/* otherwise just stop the thread */
1480
			}
1481

1482
			errptr = thread_arg->startpoint ?
1483
				thread_arg->startpoint : xlogreader->EndRecPtr;
1484

1485
			if (errormsg)
1486
				elog(WARNING, "Thread [%d]: Could not read WAL record at %X/%X: %s",
1487
					 reader_data->thread_num,
1488
					 (uint32) (errptr >> 32), (uint32) (errptr),
1489
					 errormsg);
1490
			else
1491
				elog(WARNING, "Thread [%d]: Could not read WAL record at %X/%X",
1492
					 reader_data->thread_num,
1493
					 (uint32) (errptr >> 32), (uint32) (errptr));
1494

1495
			/* In we failed to read record located at endpoint position,
1496
			 * and endpoint is not inclusive, do not consider this as an error.
1497
			 */
1498
			if (!thread_arg->inclusive_endpoint &&
1499
				errptr == thread_arg->endpoint)
1500
			{
1501
				elog(LOG, "Thread [%d]: Endpoint %X/%X is not inclusive, switch to the next timeline",
1502
					reader_data->thread_num,
1503
					(uint32) (thread_arg->endpoint >> 32), (uint32) (thread_arg->endpoint));
1504
				break;
1505
			}
1506

1507
			/*
1508
			 * If we don't have all WAL files from prev backup start_lsn to current
1509
			 * start_lsn, we won't be able to build page map and PAGE backup will
1510
			 * be incorrect. Stop it and throw an error.
1511
			 */
1512
			PrintXLogCorruptionMsg(reader_data, ERROR);
1513
		}
1514

1515
		getRecordTimestamp(xlogreader, &reader_data->cur_rec.rec_time);
1516
		if (TransactionIdIsValid(XLogRecGetXid(xlogreader)))
1517
			reader_data->cur_rec.rec_xid = XLogRecGetXid(xlogreader);
1518
		reader_data->cur_rec.rec_lsn = xlogreader->ReadRecPtr;
1519

1520
		if (thread_arg->process_record)
1521
			thread_arg->process_record(xlogreader, reader_data, &stop_reading);
1522
		if (stop_reading)
1523
		{
1524
			thread_arg->got_target = true;
1525

1526
			pthread_lock(&wal_segment_mutex);
1527
			/* We should store least target segment number */
1528
			if (segno_target == 0 || segno_target > reader_data->xlogsegno)
1529
				segno_target = reader_data->xlogsegno;
1530
			pthread_mutex_unlock(&wal_segment_mutex);
1531

1532
			break;
1533
		}
1534

1535
		/*
1536
		 * Check if other thread got the target segment. Check it not very
1537
		 * often, only every WAL page.
1538
		 */
1539
		if (wal_consistent_read && prev_page_off != 0 &&
1540
			prev_page_off != reader_data->prev_page_off)
1541
		{
1542
			XLogSegNo	segno;
1543

1544
			pthread_lock(&wal_segment_mutex);
1545
			segno = segno_target;
1546
			pthread_mutex_unlock(&wal_segment_mutex);
1547

1548
			if (segno != 0 && segno < reader_data->xlogsegno)
1549
				break;
1550
		}
1551
		prev_page_off = reader_data->prev_page_off;
1552

1553
		/* continue reading at next record */
1554
		thread_arg->startpoint = InvalidXLogRecPtr;
1555

1556
		GetXLogSegNo(xlogreader->EndRecPtr, nextSegNo, wal_seg_size);
1557

1558
		if (thread_arg->endSegNo != 0 &&
1559
			!XLogRecPtrIsInvalid(thread_arg->endpoint) &&
1560
			/*
1561
			 * Consider thread_arg->endSegNo and thread_arg->endpoint only if
1562
			 * they are valid.
1563
			 */
1564
			xlogreader->ReadRecPtr >= thread_arg->endpoint &&
1565
			nextSegNo >= thread_arg->endSegNo)
1566
			break;
1567
	}
1568

1569
	CleanupXLogPageRead(xlogreader);
1570
	XLogReaderFree(xlogreader);
1571

1572
	/* Extracting is successful */
1573
	thread_arg->ret = 0;
1574
	return NULL;
1575
}
1576

1577
/*
1578
 * Do manual switch to the next WAL segment.
1579
 *
1580
 * Returns false if the reader reaches the end of a WAL segment list.
1581
 */
1582
static bool
1583
SwitchThreadToNextWal(XLogReaderState *xlogreader, xlog_thread_arg *arg)
1584
{
1585
	XLogReaderData *reader_data;
1586
	XLogRecPtr	found;
1587

1588
	reader_data = (XLogReaderData *) xlogreader->private_data;
1589
	reader_data->need_switch = false;
1590

1591
	/* Critical section */
1592
	pthread_lock(&wal_segment_mutex);
1593
	Assert(segno_next);
1594
	reader_data->xlogsegno = segno_next;
1595
	segnum_read++;
1596
	segno_next++;
1597
	pthread_mutex_unlock(&wal_segment_mutex);
1598

1599
	/* We've reached the end */
1600
	if (arg->endSegNo != 0 && reader_data->xlogsegno > arg->endSegNo)
1601
		return false;
1602

1603
	/* Adjust next record position */
1604
	GetXLogRecPtr(reader_data->xlogsegno, 0, wal_seg_size, arg->startpoint);
1605
	/* We need to close previously opened file if it wasn't closed earlier */
1606
	CleanupXLogPageRead(xlogreader);
1607
	/* Skip over the page header and contrecord if any */
1608
	found = XLogFindNextRecord(xlogreader, arg->startpoint);
1609

1610
	/*
1611
	 * We get invalid WAL record pointer usually when WAL segment is
1612
	 * absent or is corrupted.
1613
	 */
1614
	if (XLogRecPtrIsInvalid(found))
1615
	{
1616
		/*
1617
		 * Check if we need to stop reading. We stop if other thread found a
1618
		 * target segment.
1619
		 */
1620
		if (wal_consistent_read && XLogWaitForConsistency(xlogreader))
1621
			return false;
1622
		else if (wal_consistent_read)
1623
		{
1624
			XLogSegNo	segno_report;
1625

1626
			pthread_lock(&wal_segment_mutex);
1627
			segno_report = segno_start + segnum_read;
1628
			pthread_mutex_unlock(&wal_segment_mutex);
1629

1630
			/*
1631
			 * Report error message if this is the first corrupted WAL.
1632
			 */
1633
			if (reader_data->xlogsegno > segno_report)
1634
				return false;	/* otherwise just stop the thread */
1635
		}
1636

1637
		elog(WARNING, "Thread [%d]: Could not read WAL record at %X/%X",
1638
			 reader_data->thread_num,
1639
			 (uint32) (arg->startpoint >> 32), (uint32) (arg->startpoint));
1640
		PrintXLogCorruptionMsg(reader_data, ERROR);
1641
	}
1642
	arg->startpoint = found;
1643

1644
	elog(VERBOSE, "Thread [%d]: Switched to LSN %X/%X",
1645
		 reader_data->thread_num,
1646
		 (uint32) (arg->startpoint >> 32), (uint32) (arg->startpoint));
1647

1648
	return true;
1649
}
1650

1651
/*
1652
 * Wait for other threads since the current thread couldn't read its segment.
1653
 * We need to decide is it fail or not.
1654
 *
1655
 * Returns true if there is no failure and previous target segment was found.
1656
 * Otherwise return false.
1657
 */
1658
static bool
1659
XLogWaitForConsistency(XLogReaderState *xlogreader)
1660
{
1661
	uint32		segnum_need;
1662
	XLogReaderData *reader_data =(XLogReaderData *) xlogreader->private_data;
1663
	bool		log_message = true;
1664

1665
	segnum_need = reader_data->xlogsegno - segno_start;
1666
	while (true)
1667
	{
1668
		uint32		segnum_current_read;
1669
		XLogSegNo	segno;
1670

1671
		if (log_message)
1672
		{
1673
			char		xlogfname[MAXFNAMELEN];
1674

1675
			GetXLogFileName(xlogfname, reader_data->tli, reader_data->xlogsegno,
1676
							wal_seg_size);
1677

1678
			elog(VERBOSE, "Thread [%d]: Possible WAL corruption in %s. Wait for other threads to decide is this a failure",
1679
				 reader_data->thread_num, xlogfname);
1680
			log_message = false;
1681
		}
1682

1683
		if (interrupted || thread_interrupted)
1684
			elog(ERROR, "Thread [%d]: Interrupted during WAL reading",
1685
				 reader_data->thread_num);
1686

1687
		pthread_lock(&wal_segment_mutex);
1688
		segnum_current_read = segnum_read + segnum_corrupted;
1689
		segno = segno_target;
1690
		pthread_mutex_unlock(&wal_segment_mutex);
1691

1692
		/* Other threads read all previous segments and didn't find target */
1693
		if (segnum_need <= segnum_current_read)
1694
		{
1695
			/* Mark current segment as corrupted */
1696
			pthread_lock(&wal_segment_mutex);
1697
			segnum_corrupted++;
1698
			pthread_mutex_unlock(&wal_segment_mutex);
1699
			return false;
1700
		}
1701

1702
		if (segno != 0 && segno < reader_data->xlogsegno)
1703
			return true;
1704

1705
		pg_usleep(500000L);	/* 500 ms */
1706
	}
1707

1708
	/* We shouldn't reach it */
1709
	return false;
1710
}
1711

1712
/*
1713
 * Cleanup after WAL segment reading.
1714
 */
1715
static void
1716
CleanupXLogPageRead(XLogReaderState *xlogreader)
1717
{
1718
	XLogReaderData *reader_data;
1719

1720
	reader_data = (XLogReaderData *) xlogreader->private_data;
1721
	if (reader_data->xlogfile >= 0)
1722
	{
1723
		fio_close(reader_data->xlogfile);
1724
		reader_data->xlogfile = -1;
1725
	}
1726
#ifdef HAVE_LIBZ
1727
	else if (reader_data->gz_xlogfile != NULL)
1728
	{
1729
		fio_gzclose(reader_data->gz_xlogfile);
1730
		reader_data->gz_xlogfile = NULL;
1731
	}
1732
#endif
1733
	reader_data->prev_page_off = 0;
1734
	reader_data->xlogexists = false;
1735
}
1736

1737
static void
1738
PrintXLogCorruptionMsg(XLogReaderData *reader_data, int elevel)
1739
{
1740
	if (reader_data->xlogpath[0] != 0)
1741
	{
1742
		/*
1743
		 * XLOG reader couldn't read WAL segment.
1744
		 * We throw a WARNING here to be able to update backup status.
1745
		 */
1746
		if (!reader_data->xlogexists)
1747
			elog(elevel, "Thread [%d]: WAL segment \"%s\" is absent",
1748
				 reader_data->thread_num, reader_data->xlogpath);
1749
		else if (reader_data->xlogfile != -1)
1750
			elog(elevel, "Thread [%d]: Possible WAL corruption. "
1751
						 "Error has occured during reading WAL segment \"%s\"",
1752
				 reader_data->thread_num, reader_data->xlogpath);
1753
#ifdef HAVE_LIBZ
1754
		else if (reader_data->gz_xlogfile != NULL)
1755
			elog(elevel, "Thread [%d]: Possible WAL corruption. "
1756
						 "Error has occured during reading WAL segment \"%s\"",
1757
				 reader_data->thread_num, reader_data->gz_xlogpath);
1758
#endif
1759
	}
1760
	else
1761
	{
1762
		/* Cannot tell what happened specifically */
1763
		elog(elevel, "Thread [%d]: An error occured during WAL reading",
1764
			 reader_data->thread_num);
1765
	}
1766
}
1767

1768
/*
1769
 * Extract information about blocks modified in this record.
1770
 */
1771
static void
1772
extractPageInfo(XLogReaderState *record, XLogReaderData *reader_data,
1773
				bool *stop_reading)
1774
{
1775
	uint8		block_id;
1776
	RmgrId		rmid = XLogRecGetRmid(record);
1777
	uint8		info = XLogRecGetInfo(record);
1778
	uint8		rminfo = info & ~XLR_INFO_MASK;
1779

1780
	/* Is this a special record type that I recognize? */
1781

1782
	if (rmid == RM_DBASE_ID
1783
#if PG_VERSION_NUM >= 150000
1784
		&& (rminfo == XLOG_DBASE_CREATE_WAL_LOG || rminfo == XLOG_DBASE_CREATE_FILE_COPY))
1785
#else
1786
		&& rminfo == XLOG_DBASE_CREATE)
1787
#endif
1788
	{
1789
		/*
1790
		 * New databases can be safely ignored. They would be completely
1791
		 * copied if found.
1792
		 */
1793
	}
1794
	else if (rmid == RM_DBASE_ID && rminfo == XLOG_DBASE_DROP)
1795
	{
1796
		/*
1797
		 * An existing database was dropped. It is fine to ignore that
1798
		 * they will be removed appropriately.
1799
		 */
1800
	}
1801
	else if (rmid == RM_SMGR_ID && rminfo == XLOG_SMGR_CREATE)
1802
	{
1803
		/*
1804
		 * We can safely ignore these. The file will be removed when
1805
		 * combining the backups in the case of differential on.
1806
		 */
1807
	}
1808
	else if (rmid == RM_SMGR_ID && rminfo == XLOG_SMGR_TRUNCATE)
1809
	{
1810
		/*
1811
		 * We can safely ignore these. When we compare the sizes later on,
1812
		 * we'll notice that they differ, and copy the missing tail from
1813
		 * source system.
1814
		 */
1815
	}
1816
	else if (rmid == RM_XACT_ID &&
1817
			 ((rminfo & XLOG_XACT_OPMASK) == XLOG_XACT_COMMIT ||
1818
			  (rminfo & XLOG_XACT_OPMASK) == XLOG_XACT_COMMIT_PREPARED ||
1819
			  (rminfo & XLOG_XACT_OPMASK) == XLOG_XACT_ABORT ||
1820
			  (rminfo & XLOG_XACT_OPMASK) == XLOG_XACT_ABORT_PREPARED))
1821
	{
1822
		/*
1823
		 * These records can include "dropped rels". We can safely ignore
1824
		 * them, we will see that they are missing and copy them from the
1825
		 * source.
1826
		 */
1827
	}
1828
	else if (info & XLR_SPECIAL_REL_UPDATE)
1829
	{
1830
		/*
1831
		 * This record type modifies a relation file in some special way, but
1832
		 * we don't recognize the type. That's bad - we don't know how to
1833
		 * track that change.
1834
		 */
1835
		elog(ERROR, "WAL record modifies a relation, but record type is not recognized\n"
1836
			 "lsn: %X/%X, rmgr: %s, info: %02X",
1837
		  (uint32) (record->ReadRecPtr >> 32), (uint32) (record->ReadRecPtr),
1838
				 RmgrNames[rmid], info);
1839
	}
1840

1841
#if PG_VERSION_NUM >= 150000
1842
	for (block_id = 0; block_id <= record->record->max_block_id; block_id++)
1843
#else
1844
	for (block_id = 0; block_id <= record->max_block_id; block_id++)
1845
#endif
1846
	{
1847
		RelFileNode rnode;
1848
		ForkNumber	forknum;
1849
		BlockNumber blkno;
1850

1851
#if PG_VERSION_NUM >= 150000
1852
		if (!XLogRecGetBlockTagExtended(record, block_id, &rnode, &forknum, &blkno, NULL))
1853
#else
1854
		if (!XLogRecGetBlockTag(record, block_id, &rnode, &forknum, &blkno))
1855
#endif
1856
			continue;
1857

1858
		/* We only care about the main fork; others are copied as is */
1859
		if (forknum != MAIN_FORKNUM)
1860
			continue;
1861

1862
		process_block_change(forknum, rnode, blkno);
1863
	}
1864
}
1865

1866
/*
1867
 * Check the current read WAL record during validation.
1868
 */
1869
static void
1870
validateXLogRecord(XLogReaderState *record, XLogReaderData *reader_data,
1871
				   bool *stop_reading)
1872
{
1873
	/* Check target xid */
1874
	if (TransactionIdIsValid(wal_target_xid) &&
1875
		wal_target_xid == reader_data->cur_rec.rec_xid)
1876
		*stop_reading = true;
1877
	/* Check target time */
1878
	else if (wal_target_time != 0 &&
1879
			 timestamptz_to_time_t(reader_data->cur_rec.rec_time) >= wal_target_time)
1880
		*stop_reading = true;
1881
	/* Check target lsn */
1882
	else if (XRecOffIsValid(wal_target_lsn) &&
1883
			 reader_data->cur_rec.rec_lsn >= wal_target_lsn)
1884
		*stop_reading = true;
1885
}
1886

1887
/*
1888
 * Extract timestamp from WAL record.
1889
 *
1890
 * If the record contains a timestamp, returns true, and saves the timestamp
1891
 * in *recordXtime. If the record type has no timestamp, returns false.
1892
 * Currently, only transaction commit/abort records and restore points contain
1893
 * timestamps.
1894
 */
1895
static bool
1896
getRecordTimestamp(XLogReaderState *record, TimestampTz *recordXtime)
1897
{
1898
	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
1899
	uint8		xact_info = info & XLOG_XACT_OPMASK;
1900
	uint8		rmid = XLogRecGetRmid(record);
1901

1902
	if (rmid == RM_XLOG_ID && info == XLOG_RESTORE_POINT)
1903
	{
1904
		*recordXtime = ((xl_restore_point *) XLogRecGetData(record))->rp_time;
1905
		return true;
1906
	}
1907
	else if (rmid == RM_XACT_ID && (xact_info == XLOG_XACT_COMMIT ||
1908
							   xact_info == XLOG_XACT_COMMIT_PREPARED))
1909
	{
1910
		*recordXtime = ((xl_xact_commit *) XLogRecGetData(record))->xact_time;
1911
		return true;
1912
	}
1913
	else if (rmid == RM_XACT_ID && (xact_info == XLOG_XACT_ABORT ||
1914
							   xact_info == XLOG_XACT_ABORT_PREPARED))
1915
	{
1916
		*recordXtime = ((xl_xact_abort *) XLogRecGetData(record))->xact_time;
1917
		return true;
1918
	}
1919

1920
	return false;
1921
}
1922

1923
/*
 * Check that WAL segment segno of timeline tli, located in prefetch_dir,
 * can be read from its start up to the start of the following segment.
 *
 * The segment is read by a single reader thread, with no record callback and
 * no recovery target.  The global num_threads is forced to 1 for the call and
 * restored afterwards.  Note that the wal_seg_size parameter shadows the
 * file-level variable of the same name; the value passed in is the one used
 * here.
 *
 * Returns the result of RunXLogThreads(): false if the reader thread did not
 * finish successfully.
 */
bool validate_wal_segment(TimeLineID tli, XLogSegNo segno, const char *prefetch_dir, uint32 wal_seg_size)
{
	XLogRecPtr	startpoint;
	XLogRecPtr	endpoint;
	bool		rc;
	int			saved_num_threads = num_threads;

	/* calculate startpoint and endpoint: the whole segment */
	GetXLogRecPtr(segno, 0, wal_seg_size, startpoint);
	GetXLogRecPtr(segno + 1, 0, wal_seg_size, endpoint);

	/* disable multi-threading for the duration of the check */
	num_threads = 1;

	rc = RunXLogThreads(prefetch_dir, 0, InvalidTransactionId,
						InvalidXLogRecPtr, tli, wal_seg_size,
						startpoint, endpoint, false, NULL, NULL, true);

	num_threads = saved_num_threads;

	return rc;
}
1947

1948
/*
 * Version-independent wrapper around XLogReadRecord().
 *
 * Before PostgreSQL 13 the start position is passed to XLogReadRecord()
 * directly.  From 13 on, the reader is positioned beforehand with
 * XLogBeginRead(), so startpoint is ignored.  Returns the record read, or
 * NULL on failure; *errormsg may then point to a description.
 */
static XLogRecord *
WalReadRecord(XLogReaderState *xlogreader, XLogRecPtr startpoint, char **errormsg)
{
#if PG_VERSION_NUM < 130000
	return XLogReadRecord(xlogreader, startpoint, errormsg);
#else
	(void) startpoint;			/* position was set with XLogBeginRead() */
	return XLogReadRecord(xlogreader, errormsg);
#endif
}
1958

1959
/*
 * Version-independent wrapper around XLogReaderAllocate().
 *
 * Installs SimpleXLogPageRead() as the page-read callback and reader_data as
 * the reader's private data.  Returns the new reader, or NULL if it could
 * not be allocated.
 */
static XLogReaderState *
WalReaderAllocate(uint32 wal_seg_size, XLogReaderData *reader_data)
{
	XLogReaderState *reader;

#if PG_VERSION_NUM >= 130000
	reader = XLogReaderAllocate(wal_seg_size, NULL,
								XL_ROUTINE(.page_read = &SimpleXLogPageRead),
								reader_data);
#elif PG_VERSION_NUM >= 110000
	reader = XLogReaderAllocate(wal_seg_size, &SimpleXLogPageRead,
								reader_data);
#else
	/* Before PostgreSQL 11 the segment size is not passed to the reader */
	reader = XLogReaderAllocate(&SimpleXLogPageRead, reader_data);
#endif

	return reader;
}
1973

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.