/*
 * Copyright (c) 2011, Google Inc.
 */
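
/*
 * Bulk check-in: stream blobs directly into a single temporary packfile
 * instead of writing individual loose objects, and batch the fsync of
 * loose objects written during an ODB transaction (see
 * begin_odb_transaction()/end_odb_transaction() below) through a
 * temporary object directory so a single hardware flush covers them.
 */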

#define USE_THE_REPOSITORY_VARIABLE

#include "git-compat-util.h"
#include "bulk-checkin.h"
#include "environment.h"
#include "gettext.h"
#include "hex.h"
#include "lockfile.h"
#include "repository.h"
#include "csum-file.h"
#include "pack.h"
#include "strbuf.h"
#include "tmp-objdir.h"
#include "packfile.h"
#include "object-file.h"
#include "object-store-ll.h"

static int odb_transaction_nesting;

static struct tmp_objdir *bulk_fsync_objdir;

static struct bulk_checkin_packfile {
	char *pack_tmp_name;
	struct hashfile *f;
	off_t offset;
	struct pack_idx_option pack_idx_opts;

	struct pack_idx_entry **written;
	uint32_t alloc_written;
	uint32_t nr_written;
} bulk_checkin_packfile;

static void finish_tmp_packfile(struct strbuf *basename,
				const char *pack_tmp_name,
				struct pack_idx_entry **written_list,
				uint32_t nr_written,
				struct pack_idx_option *pack_idx_opts,
				unsigned char hash[])
{
	char *idx_tmp_name = NULL;

	stage_tmp_packfiles(basename, pack_tmp_name, written_list, nr_written,
			    NULL, pack_idx_opts, hash, &idx_tmp_name);
	rename_tmp_packfile_idx(basename, &idx_tmp_name);

	free(idx_tmp_name);
}

static void flush_bulk_checkin_packfile(struct bulk_checkin_packfile *state)
{
	unsigned char hash[GIT_MAX_RAWSZ];
	struct strbuf packname = STRBUF_INIT;
	int i;

	if (!state->f)
		return;

	if (state->nr_written == 0) {
		close(state->f->fd);
		free_hashfile(state->f);
		unlink(state->pack_tmp_name);
		goto clear_exit;
	} else if (state->nr_written == 1) {
		finalize_hashfile(state->f, hash, FSYNC_COMPONENT_PACK,
				  CSUM_HASH_IN_STREAM | CSUM_FSYNC | CSUM_CLOSE);
	} else {
		int fd = finalize_hashfile(state->f, hash, FSYNC_COMPONENT_PACK, 0);
		fixup_pack_header_footer(fd, hash, state->pack_tmp_name,
					 state->nr_written, hash,
					 state->offset);
		close(fd);
	}

	strbuf_addf(&packname, "%s/pack/pack-%s.", get_object_directory(),
		    hash_to_hex(hash));
	finish_tmp_packfile(&packname, state->pack_tmp_name,
			    state->written, state->nr_written,
			    &state->pack_idx_opts, hash);
	for (i = 0; i < state->nr_written; i++)
		free(state->written[i]);

clear_exit:
	free(state->pack_tmp_name);
	free(state->written);
	memset(state, 0, sizeof(*state));

	strbuf_release(&packname);
	/* Make objects we just wrote available to ourselves */
	reprepare_packed_git(the_repository);
}

/*
 * Cleanup after batch-mode fsync_object_files.
 */
static void flush_batch_fsync(void)
{
	struct strbuf temp_path = STRBUF_INIT;
	struct tempfile *temp;

	if (!bulk_fsync_objdir)
		return;

	/*
	 * Issue a full hardware flush against a temporary file to ensure
	 * that all objects are durable before any renames occur. The code in
	 * fsync_loose_object_bulk_checkin has already issued a writeout
	 * request, but it has not flushed any writeback cache in the storage
	 * hardware or any filesystem logs. This fsync call acts as a barrier
	 * to ensure that the data in each new object file is durable before
	 * the final name is visible.
	 */
	strbuf_addf(&temp_path, "%s/bulk_fsync_XXXXXX", get_object_directory());
	temp = xmks_tempfile(temp_path.buf);
	fsync_or_die(get_tempfile_fd(temp), get_tempfile_path(temp));
	delete_tempfile(&temp);
	strbuf_release(&temp_path);

	/*
	 * Make the object files visible in the primary ODB after their data is
	 * fully durable.
	 */
	tmp_objdir_migrate(bulk_fsync_objdir);
	bulk_fsync_objdir = NULL;
}

static int already_written(struct bulk_checkin_packfile *state, struct object_id *oid)
{
	int i;

	/* The object may already exist in the repository */
	if (repo_has_object_file(the_repository, oid))
		return 1;

	/* Might want to keep the list sorted */
	for (i = 0; i < state->nr_written; i++)
		if (oideq(&state->written[i]->oid, oid))
			return 1;

	/* This is a new object we need to keep */
	return 0;
}

/*
 * Read the contents from fd for size bytes, streaming it to the
 * packfile in state while updating the hash in ctx. Signal a failure
 * by returning a negative value when the resulting pack would exceed
 * the pack size limit and this is not the first object in the pack,
 * so that the caller can discard what we wrote from the current pack
 * by truncating it and opening a new one. The caller will then call
 * us again after rewinding the input fd.
 *
 * The already_hashed_to pointer is kept untouched by the caller to
 * make sure we do not hash the same byte when we are called
 * again. This way, the caller does not have to checkpoint its hash
 * status before calling us just in case we ask it to call us again
 * with a new pack.
 */
static int stream_blob_to_pack(struct bulk_checkin_packfile *state,
			       git_hash_ctx *ctx, off_t *already_hashed_to,
			       int fd, size_t size, const char *path,
			       unsigned flags)
{
	git_zstream s;
	unsigned char ibuf[16384];
	unsigned char obuf[16384];
	unsigned hdrlen;
	int status = Z_OK;
	int write_object = (flags & HASH_WRITE_OBJECT);
	off_t offset = 0;

	git_deflate_init(&s, pack_compression_level);

	hdrlen = encode_in_pack_object_header(obuf, sizeof(obuf), OBJ_BLOB, size);
	s.next_out = obuf + hdrlen;
	s.avail_out = sizeof(obuf) - hdrlen;

	while (status != Z_STREAM_END) {
		if (size && !s.avail_in) {
			ssize_t rsize = size < sizeof(ibuf) ? size : sizeof(ibuf);
			ssize_t read_result = read_in_full(fd, ibuf, rsize);
			if (read_result < 0)
				die_errno("failed to read from '%s'", path);
			if (read_result != rsize)
				die("failed to read %d bytes from '%s'",
				    (int)rsize, path);
			offset += rsize;
			if (*already_hashed_to < offset) {
				size_t hsize = offset - *already_hashed_to;
				if (rsize < hsize)
					hsize = rsize;
				if (hsize)
					the_hash_algo->update_fn(ctx, ibuf, hsize);
				*already_hashed_to = offset;
			}
			s.next_in = ibuf;
			s.avail_in = rsize;
			size -= rsize;
		}

		status = git_deflate(&s, size ? 0 : Z_FINISH);

		if (!s.avail_out || status == Z_STREAM_END) {
			if (write_object) {
				size_t written = s.next_out - obuf;

				/* would we bust the size limit? */
				if (state->nr_written &&
				    pack_size_limit_cfg &&
				    pack_size_limit_cfg < state->offset + written) {
					git_deflate_abort(&s);
					return -1;
				}

				hashwrite(state->f, obuf, written);
				state->offset += written;
			}
			s.next_out = obuf;
			s.avail_out = sizeof(obuf);
		}

		switch (status) {
		case Z_OK:
		case Z_BUF_ERROR:
		case Z_STREAM_END:
			continue;
		default:
			die("unexpected deflate failure: %d", status);
		}
	}
	git_deflate_end(&s);
	return 0;
}

/* Lazily create backing packfile for the state */
static void prepare_to_stream(struct bulk_checkin_packfile *state,
			      unsigned flags)
{
	if (!(flags & HASH_WRITE_OBJECT) || state->f)
		return;

	state->f = create_tmp_packfile(&state->pack_tmp_name);
	reset_pack_idx_option(&state->pack_idx_opts);

	/* Pretend we are going to write only one object */
	state->offset = write_pack_header(state->f, 1);
	if (!state->offset)
		die_errno("unable to write pack header");
}

static int deflate_blob_to_pack(struct bulk_checkin_packfile *state,
				struct object_id *result_oid,
				int fd, size_t size,
				const char *path, unsigned flags)
{
	off_t seekback, already_hashed_to;
	git_hash_ctx ctx;
	unsigned char obuf[16384];
	unsigned header_len;
	struct hashfile_checkpoint checkpoint = {0};
	struct pack_idx_entry *idx = NULL;

	seekback = lseek(fd, 0, SEEK_CUR);
	if (seekback == (off_t) -1)
		return error("cannot find the current offset");

	header_len = format_object_header((char *)obuf, sizeof(obuf),
					  OBJ_BLOB, size);
	the_hash_algo->init_fn(&ctx);
	the_hash_algo->update_fn(&ctx, obuf, header_len);
	the_hash_algo->init_fn(&checkpoint.ctx);

	/* Note: idx is non-NULL when we are writing */
	if ((flags & HASH_WRITE_OBJECT) != 0)
		CALLOC_ARRAY(idx, 1);

	already_hashed_to = 0;

	while (1) {
		prepare_to_stream(state, flags);
		if (idx) {
			hashfile_checkpoint(state->f, &checkpoint);
			idx->offset = state->offset;
			crc32_begin(state->f);
		}
		if (!stream_blob_to_pack(state, &ctx, &already_hashed_to,
					 fd, size, path, flags))
			break;
		/*
		 * Writing this object to the current pack will make
		 * it too big; we need to truncate it, start a new
		 * pack, and write into it.
		 */
		if (!idx)
			BUG("should not happen");
		hashfile_truncate(state->f, &checkpoint);
		state->offset = checkpoint.offset;
		flush_bulk_checkin_packfile(state);
		if (lseek(fd, seekback, SEEK_SET) == (off_t) -1)
			return error("cannot seek back");
	}
	the_hash_algo->final_oid_fn(result_oid, &ctx);
	if (!idx)
		return 0;

	idx->crc32 = crc32_end(state->f);
	if (already_written(state, result_oid)) {
		hashfile_truncate(state->f, &checkpoint);
		state->offset = checkpoint.offset;
		free(idx);
	} else {
		oidcpy(&idx->oid, result_oid);
		ALLOC_GROW(state->written,
			   state->nr_written + 1,
			   state->alloc_written);
		state->written[state->nr_written++] = idx;
	}
	return 0;
}

void prepare_loose_object_bulk_checkin(void)
{
	/*
	 * We lazily create the temporary object directory
	 * the first time an object might be added, since
	 * callers may not know whether any objects will be
	 * added at the time they call begin_odb_transaction.
	 */
	if (!odb_transaction_nesting || bulk_fsync_objdir)
		return;

	bulk_fsync_objdir = tmp_objdir_create("bulk-fsync");
	if (bulk_fsync_objdir)
		tmp_objdir_replace_primary_odb(bulk_fsync_objdir, 0);
}

void fsync_loose_object_bulk_checkin(int fd, const char *filename)
{
	/*
	 * If we have an active ODB transaction, we issue a call that
	 * cleans the filesystem page cache but avoids a hardware flush
	 * command. Later on we will issue a single hardware flush
	 * before renaming the objects to their final names as part of
	 * flush_batch_fsync.
	 */
	if (!bulk_fsync_objdir ||
	    git_fsync(fd, FSYNC_WRITEOUT_ONLY) < 0) {
		if (errno == ENOSYS)
			warning(_("core.fsyncMethod = batch is unsupported on this platform"));
		fsync_or_die(fd, filename);
	}
}

int index_blob_bulk_checkin(struct object_id *oid,
			    int fd, size_t size,
			    const char *path, unsigned flags)
{
	int status = deflate_blob_to_pack(&bulk_checkin_packfile, oid, fd, size,
					  path, flags);
	if (!odb_transaction_nesting)
		flush_bulk_checkin_packfile(&bulk_checkin_packfile);
	return status;
}

void begin_odb_transaction(void)
{
	odb_transaction_nesting += 1;
}

void flush_odb_transaction(void)
{
	flush_batch_fsync();
	flush_bulk_checkin_packfile(&bulk_checkin_packfile);
}

void end_odb_transaction(void)
{
	odb_transaction_nesting -= 1;
	if (odb_transaction_nesting < 0)
		BUG("Unbalanced ODB transaction nesting");

	if (odb_transaction_nesting)
		return;

	flush_odb_transaction();
}
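
/*
 * Usage sketch (illustrative only, not part of the original file):
 * a caller batching several blobs through the functions above so that
 * they share one packfile and a single flush at the end. nr_blobs and
 * the blob_fd/blob_size/blob_path arrays are hypothetical inputs
 * obtained elsewhere.
 *
 *	struct object_id oid;
 *	size_t i;
 *
 *	begin_odb_transaction();
 *	for (i = 0; i < nr_blobs; i++)
 *		index_blob_bulk_checkin(&oid, blob_fd[i], blob_size[i],
 *					blob_path[i], HASH_WRITE_OBJECT);
 *	end_odb_transaction();
 */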
