pg_probackup

Форк
0
/
stream.c 
779 строк · 22.8 Кб
1
/*-------------------------------------------------------------------------
2
 *
3
 * stream.c: pg_probackup specific code for WAL streaming
4
 *
5
 * Portions Copyright (c) 2015-2020, Postgres Professional
6
 *
7
 *-------------------------------------------------------------------------
8
 */
9

10
#include "pg_probackup.h"
11
#include "receivelog.h"
12
#include "streamutil.h"
13
#include "access/timeline.h"
14

15
#include <time.h>
16
#include <unistd.h>
17

18
/*
19
 * global variable needed by ReceiveXlogStream()
20
 *
21
 * standby_message_timeout controls how often we send a message
22
 * back to the primary letting it know our progress, in milliseconds.
23
 *
24
 * in pg_probackup we use a default setting = 10 sec
25
 */
26
static int	standby_message_timeout = 10 * 1000;
27

28
/* stop_backup_lsn is set by pg_stop_backup() to stop streaming */
29
XLogRecPtr stop_backup_lsn = InvalidXLogRecPtr;
30
static XLogRecPtr stop_stream_lsn = InvalidXLogRecPtr;
31

32
/*
33
 * How long we should wait for streaming end in seconds.
34
 * Retrieved as checkpoint_timeout + checkpoint_timeout * 0.1
35
 */
36
static uint32 stream_stop_timeout = 0;
37
/* Time in which we started to wait for streaming end */
38
static time_t stream_stop_begin = 0;
39

40
/*
41
 * We need to wait end of WAL streaming before execute pg_stop_backup().
42
 */
43
typedef struct
44
{
45
	char       basedir[MAXPGPATH];
46
	PGconn	   *conn;
47

48
	/*
49
	 * Return value from the thread.
50
	 * 0 means there is no error, 1 - there is an error.
51
	 */
52
	int			ret;
53

54
	XLogRecPtr	startpos;
55
	TimeLineID	starttli;
56
} StreamThreadArg;
57

58
static pthread_t stream_thread;
59
static StreamThreadArg stream_thread_arg = {"", NULL, 1};
60

61
static parray *xlog_files_list = NULL;
62
static bool do_crc = true;
63

64
static void IdentifySystem(StreamThreadArg *stream_thread_arg);
65
static int checkpoint_timeout(PGconn *backup_conn);
66
static void *StreamLog(void *arg);
67
static bool stop_streaming(XLogRecPtr xlogpos, uint32 timeline,
68
                           bool segment_finished);
69
static void add_walsegment_to_filelist(parray *filelist, uint32 timeline,
70
                                       XLogRecPtr xlogpos, char *basedir,
71
                                       uint32 xlog_seg_size);
72
static void add_history_file_to_filelist(parray *filelist, uint32 timeline,
73
										 char *basedir);
74

75
/*
76
 * Run IDENTIFY_SYSTEM through a given connection and
77
 * check system identifier and timeline are matching
78
 */
79
static void
80
IdentifySystem(StreamThreadArg *stream_thread_arg)
81
{
82
	PGresult	*res;
83

84
	uint64 stream_conn_sysidentifier = 0;
85
	char *stream_conn_sysidentifier_str;
86
	TimeLineID stream_conn_tli = 0;
87

88
	if (!CheckServerVersionForStreaming(stream_thread_arg->conn))
89
	{
90
		PQfinish(stream_thread_arg->conn);
91
		/*
92
		 * Error message already written in CheckServerVersionForStreaming().
93
		 * There's no hope of recovering from a version mismatch, so don't
94
		 * retry.
95
		 */
96
		elog(ERROR, "Cannot continue backup because stream connect has failed.");
97
	}
98

99
	/*
100
	 * Identify server, obtain server system identifier and timeline
101
	 */
102
	res = pgut_execute(stream_thread_arg->conn, "IDENTIFY_SYSTEM", 0, NULL);
103

104
	if (PQresultStatus(res) != PGRES_TUPLES_OK)
105
	{
106
		elog(WARNING,"Could not send replication command \"%s\": %s",
107
						"IDENTIFY_SYSTEM", PQerrorMessage(stream_thread_arg->conn));
108
		PQfinish(stream_thread_arg->conn);
109
		elog(ERROR, "Cannot continue backup because stream connect has failed.");
110
	}
111

112
	stream_conn_sysidentifier_str = PQgetvalue(res, 0, 0);
113
	stream_conn_tli = atoll(PQgetvalue(res, 0, 1));
114

115
	/* Additional sanity, primary for PG 9.5,
116
	 * where system id can be obtained only via "IDENTIFY SYSTEM"
117
	 */
118
	if (!parse_uint64(stream_conn_sysidentifier_str, &stream_conn_sysidentifier, 0))
119
		elog(ERROR, "%s is not system_identifier", stream_conn_sysidentifier_str);
120

121
	if (stream_conn_sysidentifier != instance_config.system_identifier)
122
		elog(ERROR, "System identifier mismatch. Connected PostgreSQL instance has system id: "
123
			"" UINT64_FORMAT ". Expected: " UINT64_FORMAT ".",
124
					stream_conn_sysidentifier, instance_config.system_identifier);
125

126
	if (stream_conn_tli != current.tli)
127
		elog(ERROR, "Timeline identifier mismatch. "
128
			"Connected PostgreSQL instance has timeline id: %X. Expected: %X.",
129
			stream_conn_tli, current.tli);
130

131
	PQclear(res);
132
}
133

134
/*
135
 * Retrieve checkpoint_timeout GUC value in seconds.
136
 */
137
static int
138
checkpoint_timeout(PGconn *backup_conn)
139
{
140
	PGresult   *res;
141
	const char *val;
142
	const char *hintmsg;
143
	int			val_int;
144

145
	res = pgut_execute(backup_conn, "show checkpoint_timeout", 0, NULL);
146
	val = PQgetvalue(res, 0, 0);
147

148
	if (!parse_int(val, &val_int, OPTION_UNIT_S, &hintmsg))
149
	{
150
		PQclear(res);
151
		if (hintmsg)
152
			elog(ERROR, "Invalid value of checkout_timeout %s: %s", val,
153
				 hintmsg);
154
		else
155
			elog(ERROR, "Invalid value of checkout_timeout %s", val);
156
	}
157

158
	PQclear(res);
159

160
	return val_int;
161
}
162

163
/*
164
 * CreateReplicationSlot_compat() -- wrapper for CreateReplicationSlot() used in StreamLog()
165
 * src/bin/pg_basebackup/streamutil.c
166
 * CreateReplicationSlot() has different signatures on different PG versions:
167
 * PG 15
168
 * bool
169
 * CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
170
 *                                           bool is_temporary, bool is_physical, bool reserve_wal,
171
 *                                           bool slot_exists_ok, bool two_phase)
172
 * PG 11-14
173
 * bool
174
 * CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
175
 *                                           bool is_temporary, bool is_physical, bool reserve_wal,
176
 *                                           bool slot_exists_ok)
177
 * PG 9.5-10
178
 * CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
179
 *                                           bool is_physical, bool slot_exists_ok)
180
 * NOTE: PG 9.6 and 10 support reserve_wal in
181
 * pg_catalog.pg_create_physical_replication_slot(slot_name name [, immediately_reserve boolean])
182
 * and
183
 * CREATE_REPLICATION_SLOT slot_name { PHYSICAL [ RESERVE_WAL ] | LOGICAL output_plugin }
184
 * replication protocol command, but CreateReplicationSlot() C function doesn't
185
 */
186
static bool
187
CreateReplicationSlot_compat(PGconn *conn, const char *slot_name, const char *plugin,
188
                                          bool is_temporary, bool is_physical,
189
                                          bool slot_exists_ok)
190
{
191
#if PG_VERSION_NUM >= 150000
192
	return CreateReplicationSlot(conn, slot_name, plugin, is_temporary, is_physical,
193
		/* reserve_wal = */ true, slot_exists_ok, /* two_phase = */ false);
194
#elif PG_VERSION_NUM >= 110000
195
	return CreateReplicationSlot(conn, slot_name, plugin, is_temporary, is_physical,
196
		/* reserve_wal = */ true, slot_exists_ok);
197
#elif PG_VERSION_NUM >= 100000
198
	/*
199
	 * PG-10 doesn't support creating temp_slot by calling CreateReplicationSlot(), but
200
	 * it will be created by setting StreamCtl.temp_slot later in StreamLog()
201
	 */
202
	if (!is_temporary)
203
		return CreateReplicationSlot(conn, slot_name, plugin, /*is_temporary,*/ is_physical, /*reserve_wal,*/ slot_exists_ok);
204
	else
205
		return true;
206
#else
207
	/* these parameters not supported in PG < 10 */
208
	Assert(!is_temporary);
209
	return CreateReplicationSlot(conn, slot_name, plugin, /*is_temporary,*/ is_physical, /*reserve_wal,*/ slot_exists_ok);
210
#endif
211
}
212

213
/*
214
 * Start the log streaming
215
 */
216
static void *
217
StreamLog(void *arg)
218
{
219
	StreamThreadArg *stream_arg = (StreamThreadArg *) arg;
220

221
	/*
222
	 * Always start streaming at the beginning of a segment
223
	 */
224
	stream_arg->startpos -= stream_arg->startpos % instance_config.xlog_seg_size;
225

226
	xlog_files_list = parray_new();
227

228
	/* Initialize timeout */
229
	stream_stop_begin = 0;
230

231
	/* Create repslot */
232
#if PG_VERSION_NUM >= 100000
233
	if (temp_slot || perm_slot)
234
		if (!CreateReplicationSlot_compat(stream_arg->conn, replication_slot, NULL, temp_slot, true, false))
235
#else
236
	if (perm_slot)
237
		if (!CreateReplicationSlot_compat(stream_arg->conn, replication_slot, NULL, false, true, false))
238
#endif
239
		{
240
			interrupted = true;
241
			elog(ERROR, "Couldn't create physical replication slot %s", replication_slot);
242
		}
243

244
	/*
245
	 * Start the replication
246
	 */
247
	if (replication_slot)
248
		elog(LOG, "started streaming WAL at %X/%X (timeline %u) using%s slot %s",
249
			(uint32) (stream_arg->startpos >> 32), (uint32) stream_arg->startpos,
250
			stream_arg->starttli,
251
#if PG_VERSION_NUM >= 100000
252
			temp_slot ? " temporary" : "",
253
#else
254
			"",
255
#endif
256
			replication_slot);
257
	else
258
		elog(LOG, "started streaming WAL at %X/%X (timeline %u)",
259
			 (uint32) (stream_arg->startpos >> 32), (uint32) stream_arg->startpos,
260
			  stream_arg->starttli);
261

262
#if PG_VERSION_NUM >= 90600
263
	{
264
		StreamCtl	ctl;
265

266
		MemSet(&ctl, 0, sizeof(ctl));
267

268
		ctl.startpos = stream_arg->startpos;
269
		ctl.timeline = stream_arg->starttli;
270
		ctl.sysidentifier = NULL;
271
		ctl.stream_stop = stop_streaming;
272
		ctl.standby_message_timeout = standby_message_timeout;
273
		ctl.partial_suffix = NULL;
274
		ctl.synchronous = false;
275
		ctl.mark_done = false;
276

277
#if PG_VERSION_NUM >= 100000
278
#if PG_VERSION_NUM >= 150000
279
		ctl.walmethod = CreateWalDirectoryMethod(
280
			stream_arg->basedir,
281
			PG_COMPRESSION_NONE,
282
			0,
283
			false);
284
#else /* PG_VERSION_NUM >= 100000 && PG_VERSION_NUM < 150000 */
285
		ctl.walmethod = CreateWalDirectoryMethod(
286
			stream_arg->basedir,
287
//			(instance_config.compress_alg == NONE_COMPRESS) ? 0 : instance_config.compress_level,
288
			0,
289
			false);
290
#endif /* PG_VERSION_NUM >= 150000 */
291
		ctl.replication_slot = replication_slot;
292
		ctl.stop_socket = PGINVALID_SOCKET;
293
		ctl.do_sync = false; /* We sync all files at the end of backup */
294
//		ctl.mark_done        /* for future use in s3 */
295
#if PG_VERSION_NUM >= 100000 && PG_VERSION_NUM < 110000
296
		/* StreamCtl.temp_slot used only for PG-10, in PG>10, temp_slots are created by calling CreateReplicationSlot() */
297
		ctl.temp_slot = temp_slot;
298
#endif /* PG_VERSION_NUM >= 100000 && PG_VERSION_NUM < 110000 */
299
#else /* PG_VERSION_NUM < 100000 */
300
		ctl.basedir = (char *) stream_arg->basedir;
301
#endif /* PG_VERSION_NUM >= 100000 */
302

303
		if (ReceiveXlogStream(stream_arg->conn, &ctl) == false)
304
		{
305
			interrupted = true;
306
			elog(ERROR, "Problem in receivexlog");
307
		}
308

309
#if PG_VERSION_NUM >= 100000
310
#if PG_VERSION_NUM >= 160000
311
	if (!ctl.walmethod->ops->finish(ctl.walmethod))
312
#else
313
		if (!ctl.walmethod->finish())
314
#endif
315
		{
316
			interrupted = true;
317
			elog(ERROR, "Could not finish writing WAL files: %s",
318
				 strerror(errno));
319
		}
320
#endif /* PG_VERSION_NUM >= 100000 */
321
	}
322
#else /* PG_VERSION_NUM < 90600 */
323
	/* PG-9.5 */
324
	if (ReceiveXlogStream(stream_arg->conn, stream_arg->startpos, stream_arg->starttli,
325
						NULL, (char *) stream_arg->basedir, stop_streaming,
326
						standby_message_timeout, NULL, false, false) == false)
327
	{
328
		interrupted = true;
329
		elog(ERROR, "Problem in receivexlog");
330
	}
331
#endif /* PG_VERSION_NUM >= 90600 */
332

333
	/* be paranoid and sort xlog_files_list,
334
	 * so if stop_lsn segno is already in the list,
335
	 * then list must be sorted to detect duplicates.
336
	 */
337
	parray_qsort(xlog_files_list, pgFileCompareRelPathWithExternal);
338

339
	/* Add the last segment to the list */
340
	add_walsegment_to_filelist(xlog_files_list, stream_arg->starttli,
341
                               stop_stream_lsn, (char *) stream_arg->basedir,
342
                               instance_config.xlog_seg_size);
343

344
	/* append history file to walsegment filelist */
345
	add_history_file_to_filelist(xlog_files_list, stream_arg->starttli, (char *) stream_arg->basedir);
346

347
	/*
348
	 * TODO: remove redundant WAL segments
349
	 * walk pg_wal and remove files with segno greater that of stop_lsn`s segno +1
350
	 */
351

352
	elog(LOG, "finished streaming WAL at %X/%X (timeline %u)",
353
		 (uint32) (stop_stream_lsn >> 32), (uint32) stop_stream_lsn, stream_arg->starttli);
354
	stream_arg->ret = 0;
355

356
	PQfinish(stream_arg->conn);
357
	stream_arg->conn = NULL;
358

359
	return NULL;
360
}
361

362
/*
363
 * for ReceiveXlogStream
364
 *
365
 * The stream_stop callback will be called every time data
366
 * is received, and whenever a segment is completed. If it returns
367
 * true, the streaming will stop and the function
368
 * return. As long as it returns false, streaming will continue
369
 * indefinitely.
370
 *
371
 * Stop WAL streaming if current 'xlogpos' exceeds 'stop_backup_lsn', which is
372
 * set by pg_stop_backup().
373
 *
374
 */
375
static bool
376
stop_streaming(XLogRecPtr xlogpos, uint32 timeline, bool segment_finished)
377
{
378
	static uint32 prevtimeline = 0;
379
	static XLogRecPtr prevpos = InvalidXLogRecPtr;
380

381
	/* check for interrupt */
382
	if (interrupted || thread_interrupted)
383
		elog(ERROR, "Interrupted during WAL streaming");
384

385
	/* we assume that we get called once at the end of each segment */
386
	if (segment_finished)
387
	{
388
		elog(VERBOSE, _("finished segment at %X/%X (timeline %u)"),
389
		     (uint32) (xlogpos >> 32), (uint32) xlogpos, timeline);
390

391
		add_walsegment_to_filelist(xlog_files_list, timeline, xlogpos,
392
		                           (char*) stream_thread_arg.basedir,
393
		                           instance_config.xlog_seg_size);
394
	}
395

396
	/*
397
	 * Note that we report the previous, not current, position here. After a
398
	 * timeline switch, xlogpos points to the beginning of the segment because
399
	 * that's where we always begin streaming. Reporting the end of previous
400
	 * timeline isn't totally accurate, because the next timeline can begin
401
	 * slightly before the end of the WAL that we received on the previous
402
	 * timeline, but it's close enough for reporting purposes.
403
	 */
404
	if (prevtimeline != 0 && prevtimeline != timeline)
405
		elog(LOG, _("switched to timeline %u at %X/%X\n"),
406
			 timeline, (uint32) (prevpos >> 32), (uint32) prevpos);
407

408
	if (!XLogRecPtrIsInvalid(stop_backup_lsn))
409
	{
410
		if (xlogpos >= stop_backup_lsn)
411
		{
412
			stop_stream_lsn = xlogpos;
413
			return true;
414
		}
415

416
		/* pg_stop_backup() was executed, wait for the completion of stream */
417
		if (stream_stop_begin == 0)
418
		{
419
			elog(INFO, "Wait for LSN %X/%X to be streamed",
420
				 (uint32) (stop_backup_lsn >> 32), (uint32) stop_backup_lsn);
421

422
			stream_stop_begin = time(NULL);
423
		}
424

425
		if (time(NULL) - stream_stop_begin > stream_stop_timeout)
426
			elog(ERROR, "Target LSN %X/%X could not be streamed in %d seconds",
427
				 (uint32) (stop_backup_lsn >> 32), (uint32) stop_backup_lsn,
428
				 stream_stop_timeout);
429
	}
430

431
	prevtimeline = timeline;
432
	prevpos = xlogpos;
433

434
	return false;
435
}
436

437

438
/* --- External API --- */
439

440
/*
441
 * Maybe add a StreamOptions struct ?
442
 * Backup conn only needed to calculate stream_stop_timeout. Think about refactoring it.
443
 */
444
parray*
445
get_history_streaming(ConnectionOptions *conn_opt, TimeLineID tli, parray *backup_list)
446
{
447
	PGresult     *res;
448
	PGconn	     *conn;
449
	char         *history;
450
	char          query[128];
451
	parray	     *result = NULL;
452
	parray       *tli_list = NULL;
453
	timelineInfo *tlinfo = NULL;
454
	int           i,j;
455

456
	snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", tli);
457

458
	/*
459
	 * Connect in replication mode to the server.
460
	 */
461
	conn = pgut_connect_replication(conn_opt->pghost,
462
									conn_opt->pgport,
463
									conn_opt->pgdatabase,
464
									conn_opt->pguser,
465
									false);
466

467
	if (!conn)
468
		return NULL;
469

470
	res = PQexec(conn, query);
471
	PQfinish(conn);
472

473
	if (PQresultStatus(res) != PGRES_TUPLES_OK)
474
	{
475
		elog(WARNING, "Could not send replication command \"%s\": %s",
476
					query, PQresultErrorMessage(res));
477
		PQclear(res);
478
		return NULL;
479
	}
480

481
	/*
482
	 * The response to TIMELINE_HISTORY is a single row result set
483
	 * with two fields: filename and content
484
	 */
485

486
	if (PQnfields(res) != 2 || PQntuples(res) != 1)
487
	{
488
		elog(WARNING, "Unexpected response to TIMELINE_HISTORY command: "
489
				"got %d rows and %d fields, expected %d rows and %d fields",
490
				PQntuples(res), PQnfields(res), 1, 2);
491
		PQclear(res);
492
		return NULL;
493
	}
494

495
	history = pgut_strdup(PQgetvalue(res, 0, 1));
496
	result = parse_tli_history_buffer(history, tli);
497

498
	/* some cleanup */
499
	pg_free(history);
500
	PQclear(res);
501

502
	if (result)
503
		tlinfo = timelineInfoNew(tli);
504
	else
505
		return NULL;
506

507
	/* transform TimeLineHistoryEntry into timelineInfo */
508
	for (i = parray_num(result) -1; i >= 0; i--)
509
	{
510
		TimeLineHistoryEntry *tln = (TimeLineHistoryEntry *) parray_get(result, i);
511

512
		tlinfo->parent_tli = tln->tli;
513
		tlinfo->switchpoint = tln->end;
514

515
		if (!tli_list)
516
			tli_list = parray_new();
517

518
		parray_append(tli_list, tlinfo);
519

520
		/* Next tli */
521
		tlinfo = timelineInfoNew(tln->tli);
522

523
		/* oldest tli */
524
		if (i == 0)
525
		{
526
			tlinfo->tli = tln->tli;
527
			tlinfo->parent_tli = 0;
528
			tlinfo->switchpoint = 0;
529
			parray_append(tli_list, tlinfo);
530
		}
531
	}
532

533
	/* link parent to child */
534
	for (i = 0; i < parray_num(tli_list); i++)
535
	{
536
		tlinfo = (timelineInfo *) parray_get(tli_list, i);
537

538
		for (j = 0; j < parray_num(tli_list); j++)
539
		{
540
			timelineInfo *tlinfo_parent = (timelineInfo *) parray_get(tli_list, j);
541

542
			if (tlinfo->parent_tli == tlinfo_parent->tli)
543
			{
544
				tlinfo->parent_link = tlinfo_parent;
545
				break;
546
			}
547
		}
548
	}
549

550
	/* add backups to each timeline info */
551
	for (i = 0; i < parray_num(tli_list); i++)
552
	{
553
		tlinfo = parray_get(tli_list, i);
554
		for (j = 0; j < parray_num(backup_list); j++)
555
		{
556
			pgBackup *backup = parray_get(backup_list, j);
557
			if (tlinfo->tli == backup->tli)
558
			{
559
				if (tlinfo->backups == NULL)
560
					tlinfo->backups = parray_new();
561
				parray_append(tlinfo->backups, backup);
562
			}
563
		}
564
	}
565

566
	/* cleanup */
567
	parray_walk(result, pg_free);
568
	pg_free(result);
569

570
	return tli_list;
571
}
572

573
parray*
574
parse_tli_history_buffer(char *history, TimeLineID tli)
575
{
576
	char   *curLine = history;
577
	TimeLineHistoryEntry *entry;
578
	TimeLineHistoryEntry *last_timeline = NULL;
579
	parray *result = NULL;
580

581
	/* Parse timeline history buffer string by string */
582
	while (curLine)
583
	{
584
		char    tempStr[1024];
585
		char   *nextLine = strchr(curLine, '\n');
586
		int     curLineLen = nextLine ? (nextLine-curLine) : strlen(curLine);
587

588
		memcpy(tempStr, curLine, curLineLen);
589
		tempStr[curLineLen] = '\0';  // NUL-terminate!
590
		curLine = nextLine ? (nextLine+1) : NULL;
591

592
		if (curLineLen > 0)
593
		{
594
			char	   *ptr;
595
			TimeLineID	tli;
596
			uint32		switchpoint_hi;
597
			uint32		switchpoint_lo;
598
			int			nfields;
599

600
			for (ptr = tempStr; *ptr; ptr++)
601
			{
602
				if (!isspace((unsigned char) *ptr))
603
					break;
604
			}
605
			if (*ptr == '\0' || *ptr == '#')
606
				continue;
607

608
			nfields = sscanf(tempStr, "%u\t%X/%X", &tli, &switchpoint_hi, &switchpoint_lo);
609

610
			if (nfields < 1)
611
			{
612
				/* expect a numeric timeline ID as first field of line */
613
				elog(ERROR, "Syntax error in timeline history: \"%s\". Expected a numeric timeline ID.", tempStr);
614
			}
615
			if (nfields != 3)
616
				elog(ERROR, "Syntax error in timeline history: \"%s\". Expected a transaction log switchpoint location.", tempStr);
617

618
			if (last_timeline && tli <= last_timeline->tli)
619
				elog(ERROR, "Timeline IDs must be in increasing sequence: \"%s\"", tempStr);
620

621
			entry = pgut_new(TimeLineHistoryEntry);
622
			entry->tli = tli;
623
			entry->end = ((uint64) switchpoint_hi << 32) | switchpoint_lo;
624

625
			last_timeline = entry;
626
			/* Build list with newest item first */
627
			if (!result)
628
				result = parray_new();
629
			parray_append(result, entry);
630
			elog(VERBOSE, "parse_tli_history_buffer() found entry: tli = %X, end = %X/%X",
631
				tli, switchpoint_hi, switchpoint_lo);
632

633
			/* we ignore the remainder of each line */
634
		}
635
	}
636

637
	return result;
638
}
639

640
/*
641
 * Maybe add a StreamOptions struct ?
642
 * Backup conn only needed to calculate stream_stop_timeout. Think about refactoring it.
643
 */
644
void
645
start_WAL_streaming(PGconn *backup_conn, char *stream_dst_path, ConnectionOptions *conn_opt,
646
					XLogRecPtr startpos, TimeLineID starttli, bool is_backup)
647
{
648
	/* calculate crc only when running backup, catchup has no need for it */
649
	do_crc = is_backup;
650
	/* How long we should wait for streaming end after pg_stop_backup */
651
	stream_stop_timeout = checkpoint_timeout(backup_conn);
652
	//TODO Add a comment about this calculation
653
	stream_stop_timeout = stream_stop_timeout + stream_stop_timeout * 0.1;
654

655
	strlcpy(stream_thread_arg.basedir, stream_dst_path, sizeof(stream_thread_arg.basedir));
656

657
	/*
658
	 * Connect in replication mode to the server.
659
	 */
660
	stream_thread_arg.conn = pgut_connect_replication(conn_opt->pghost,
661
													  conn_opt->pgport,
662
													  conn_opt->pgdatabase,
663
													  conn_opt->pguser,
664
													  true);
665
	/* sanity check*/
666
	IdentifySystem(&stream_thread_arg);
667

668
	/* Set error exit code as default */
669
	stream_thread_arg.ret = 1;
670
	/* we must use startpos as start_lsn from start_backup */
671
	stream_thread_arg.startpos = startpos;
672
	stream_thread_arg.starttli = starttli;
673

674
	thread_interrupted = false;
675
	pthread_create(&stream_thread, NULL, StreamLog, &stream_thread_arg);
676
}
677

678
/*
679
 * Wait for the completion of stream
680
 * append list of streamed xlog files
681
 * into backup_files_list (if it is not NULL)
682
 */
683
int
684
wait_WAL_streaming_end(parray *backup_files_list)
685
{
686
    pthread_join(stream_thread, NULL);
687

688
    if(backup_files_list != NULL)
689
        parray_concat(backup_files_list, xlog_files_list);
690
    parray_free(xlog_files_list);
691
    return stream_thread_arg.ret;
692
}
693

694
/* Append streamed WAL segment to filelist  */
695
void
696
add_walsegment_to_filelist(parray *filelist, uint32 timeline, XLogRecPtr xlogpos, char *basedir, uint32 xlog_seg_size)
697
{
698
    XLogSegNo xlog_segno;
699
    char wal_segment_name[MAXFNAMELEN];
700
    char wal_segment_relpath[MAXPGPATH];
701
    char wal_segment_fullpath[MAXPGPATH];
702
    pgFile *file = NULL;
703
    pgFile **existing_file = NULL;
704

705
    GetXLogSegNo(xlogpos, xlog_segno, xlog_seg_size);
706

707
    /*
708
     * When xlogpos points to the zero offset (0/3000000),
709
     * it means that previous segment was just successfully streamed.
710
     * When xlogpos points to the positive offset,
711
     * then current segment is successfully streamed.
712
     */
713
    if (WalSegmentOffset(xlogpos, xlog_seg_size) == 0)
714
        xlog_segno--;
715

716
    GetXLogFileName(wal_segment_name, timeline, xlog_segno, xlog_seg_size);
717

718
    join_path_components(wal_segment_fullpath, basedir, wal_segment_name);
719
    join_path_components(wal_segment_relpath, PG_XLOG_DIR, wal_segment_name);
720

721
    file = pgFileNew(wal_segment_fullpath, wal_segment_relpath, false, 0, FIO_BACKUP_HOST);
722

723
    /*
724
     * Check if file is already in the list
725
     * stop_lsn segment can be added to this list twice, so
726
     * try not to add duplicates
727
     */
728

729
    existing_file = (pgFile **) parray_bsearch(filelist, file, pgFileCompareRelPathWithExternal);
730

731
    if (existing_file)
732
    {
733
        if (do_crc)
734
            (*existing_file)->crc = pgFileGetCRC(wal_segment_fullpath, true, false);
735
        (*existing_file)->write_size = xlog_seg_size;
736
        (*existing_file)->uncompressed_size = xlog_seg_size;
737

738
        return;
739
    }
740

741
    if (do_crc)
742
        file->crc = pgFileGetCRC(wal_segment_fullpath, true, false);
743

744
    /* Should we recheck it using stat? */
745
    file->write_size = xlog_seg_size;
746
    file->uncompressed_size = xlog_seg_size;
747

748
    /* append file to filelist */
749
    parray_append(filelist, file);
750
}
751

752
/* Append history file to filelist  */
753
void
754
add_history_file_to_filelist(parray *filelist, uint32 timeline, char *basedir)
755
{
756
    char filename[MAXFNAMELEN];
757
    char fullpath[MAXPGPATH];
758
    char relpath[MAXPGPATH];
759
    pgFile *file = NULL;
760

761
    /* Timeline 1 does not have a history file */
762
    if (timeline == 1)
763
        return;
764

765
    snprintf(filename, lengthof(filename), "%08X.history", timeline);
766
    join_path_components(fullpath, basedir, filename);
767
    join_path_components(relpath, PG_XLOG_DIR, filename);
768

769
    file = pgFileNew(fullpath, relpath, false, 0, FIO_BACKUP_HOST);
770

771
    /* calculate crc */
772
    if (do_crc)
773
        file->crc = pgFileGetCRC(fullpath, true, false);
774
    file->write_size = file->size;
775
    file->uncompressed_size = file->size;
776

777
    /* append file to filelist */
778
    parray_append(filelist, file);
779
}
780

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.