git

Форк
0
/
streaming.c 
551 строка · 12.6 Кб
1
/*
2
 * Copyright (c) 2011, Google Inc.
3
 */
4

5
#define USE_THE_REPOSITORY_VARIABLE
6

7
#include "git-compat-util.h"
8
#include "convert.h"
9
#include "environment.h"
10
#include "streaming.h"
11
#include "repository.h"
12
#include "object-file.h"
13
#include "object-store-ll.h"
14
#include "replace-object.h"
15
#include "packfile.h"
16

17
/*
 * Hooks stored in every "struct git_istream".
 *
 *  - open:  given the stream, a repository, an object id and a type
 *           out-parameter; the implementations in this file return 0 on
 *           success and -1 on failure.
 *  - close: given only the stream; returns an int status.
 *  - read:  given the stream, a destination buffer and its size;
 *           returns the number of bytes produced, 0 at end of stream,
 *           or a negative value on error.
 */
typedef int (*open_istream_fn)(struct git_istream *,
			       struct repository *,
			       const struct object_id *,
			       enum object_type *);
typedef int (*close_istream_fn)(struct git_istream *);
typedef ssize_t (*read_istream_fn)(struct git_istream *, char *, size_t);
23

24
/* Size in bytes of each staging buffer used by a filtered stream. */
#define FILTER_BUFFER (1024*16)

/*
 * State of a stream whose data is passed through a stream_filter.
 *
 * Bytes read from "upstream" are staged in ibuf[i_ptr..i_end) until the
 * filter consumes them; bytes produced by the filter are staged in
 * obuf[o_ptr..o_end) until the reader takes them.
 */
struct filtered_istream {
	struct git_istream *upstream;	/* stream the raw bytes come from */
	struct stream_filter *filter;	/* filter applied to those bytes */
	char ibuf[FILTER_BUFFER];	/* raw input waiting for the filter */
	char obuf[FILTER_BUFFER];	/* filtered output waiting for the reader */
	int i_end, i_ptr;		/* valid end / next unread index in ibuf */
	int o_end, o_ptr;		/* valid end / next unread index in obuf */
	int input_finished;		/* set once upstream has reported EOF */
};
35

36
struct git_istream {
37
	open_istream_fn open;
38
	close_istream_fn close;
39
	read_istream_fn read;
40

41
	unsigned long size; /* inflated size of full object */
42
	git_zstream z;
43
	enum { z_unused, z_used, z_done, z_error } z_state;
44

45
	union {
46
		struct {
47
			char *buf; /* from oid_object_info_extended() */
48
			unsigned long read_ptr;
49
		} incore;
50

51
		struct {
52
			void *mapped;
53
			unsigned long mapsize;
54
			char hdr[32];
55
			int hdr_avail;
56
			int hdr_used;
57
		} loose;
58

59
		struct {
60
			struct packed_git *pack;
61
			off_t pos;
62
		} in_pack;
63

64
		struct filtered_istream filtered;
65
	} u;
66
};
67

68
/*****************************************************************
69
 *
70
 * Common helpers
71
 *
72
 *****************************************************************/
73

74
static void close_deflated_stream(struct git_istream *st)
75
{
76
	if (st->z_state == z_used)
77
		git_inflate_end(&st->z);
78
}
79

80

81
/*****************************************************************
82
 *
83
 * Filtered stream
84
 *
85
 *****************************************************************/
86

87
static int close_istream_filtered(struct git_istream *st)
88
{
89
	free_stream_filter(st->u.filtered.filter);
90
	return close_istream(st->u.filtered.upstream);
91
}
92

93
static ssize_t read_istream_filtered(struct git_istream *st, char *buf,
94
				     size_t sz)
95
{
96
	struct filtered_istream *fs = &(st->u.filtered);
97
	size_t filled = 0;
98

99
	while (sz) {
100
		/* do we already have filtered output? */
101
		if (fs->o_ptr < fs->o_end) {
102
			size_t to_move = fs->o_end - fs->o_ptr;
103
			if (sz < to_move)
104
				to_move = sz;
105
			memcpy(buf + filled, fs->obuf + fs->o_ptr, to_move);
106
			fs->o_ptr += to_move;
107
			sz -= to_move;
108
			filled += to_move;
109
			continue;
110
		}
111
		fs->o_end = fs->o_ptr = 0;
112

113
		/* do we have anything to feed the filter with? */
114
		if (fs->i_ptr < fs->i_end) {
115
			size_t to_feed = fs->i_end - fs->i_ptr;
116
			size_t to_receive = FILTER_BUFFER;
117
			if (stream_filter(fs->filter,
118
					  fs->ibuf + fs->i_ptr, &to_feed,
119
					  fs->obuf, &to_receive))
120
				return -1;
121
			fs->i_ptr = fs->i_end - to_feed;
122
			fs->o_end = FILTER_BUFFER - to_receive;
123
			continue;
124
		}
125

126
		/* tell the filter to drain upon no more input */
127
		if (fs->input_finished) {
128
			size_t to_receive = FILTER_BUFFER;
129
			if (stream_filter(fs->filter,
130
					  NULL, NULL,
131
					  fs->obuf, &to_receive))
132
				return -1;
133
			fs->o_end = FILTER_BUFFER - to_receive;
134
			if (!fs->o_end)
135
				break;
136
			continue;
137
		}
138
		fs->i_end = fs->i_ptr = 0;
139

140
		/* refill the input from the upstream */
141
		if (!fs->input_finished) {
142
			fs->i_end = read_istream(fs->upstream, fs->ibuf, FILTER_BUFFER);
143
			if (fs->i_end < 0)
144
				return -1;
145
			if (fs->i_end)
146
				continue;
147
		}
148
		fs->input_finished = 1;
149
	}
150
	return filled;
151
}
152

153
static struct git_istream *attach_stream_filter(struct git_istream *st,
154
						struct stream_filter *filter)
155
{
156
	struct git_istream *ifs = xmalloc(sizeof(*ifs));
157
	struct filtered_istream *fs = &(ifs->u.filtered);
158

159
	ifs->close = close_istream_filtered;
160
	ifs->read = read_istream_filtered;
161
	fs->upstream = st;
162
	fs->filter = filter;
163
	fs->i_end = fs->i_ptr = 0;
164
	fs->o_end = fs->o_ptr = 0;
165
	fs->input_finished = 0;
166
	ifs->size = -1; /* unknown */
167
	return ifs;
168
}
169

170
/*****************************************************************
171
 *
172
 * Loose object stream
173
 *
174
 *****************************************************************/
175

176
static ssize_t read_istream_loose(struct git_istream *st, char *buf, size_t sz)
177
{
178
	size_t total_read = 0;
179

180
	switch (st->z_state) {
181
	case z_done:
182
		return 0;
183
	case z_error:
184
		return -1;
185
	default:
186
		break;
187
	}
188

189
	if (st->u.loose.hdr_used < st->u.loose.hdr_avail) {
190
		size_t to_copy = st->u.loose.hdr_avail - st->u.loose.hdr_used;
191
		if (sz < to_copy)
192
			to_copy = sz;
193
		memcpy(buf, st->u.loose.hdr + st->u.loose.hdr_used, to_copy);
194
		st->u.loose.hdr_used += to_copy;
195
		total_read += to_copy;
196
	}
197

198
	while (total_read < sz) {
199
		int status;
200

201
		st->z.next_out = (unsigned char *)buf + total_read;
202
		st->z.avail_out = sz - total_read;
203
		status = git_inflate(&st->z, Z_FINISH);
204

205
		total_read = st->z.next_out - (unsigned char *)buf;
206

207
		if (status == Z_STREAM_END) {
208
			git_inflate_end(&st->z);
209
			st->z_state = z_done;
210
			break;
211
		}
212
		if (status != Z_OK && (status != Z_BUF_ERROR || total_read < sz)) {
213
			git_inflate_end(&st->z);
214
			st->z_state = z_error;
215
			return -1;
216
		}
217
	}
218
	return total_read;
219
}
220

221
static int close_istream_loose(struct git_istream *st)
222
{
223
	close_deflated_stream(st);
224
	munmap(st->u.loose.mapped, st->u.loose.mapsize);
225
	return 0;
226
}
227

228
static int open_istream_loose(struct git_istream *st, struct repository *r,
229
			      const struct object_id *oid,
230
			      enum object_type *type)
231
{
232
	struct object_info oi = OBJECT_INFO_INIT;
233
	oi.sizep = &st->size;
234
	oi.typep = type;
235

236
	st->u.loose.mapped = map_loose_object(r, oid, &st->u.loose.mapsize);
237
	if (!st->u.loose.mapped)
238
		return -1;
239
	switch (unpack_loose_header(&st->z, st->u.loose.mapped,
240
				    st->u.loose.mapsize, st->u.loose.hdr,
241
				    sizeof(st->u.loose.hdr), NULL)) {
242
	case ULHR_OK:
243
		break;
244
	case ULHR_BAD:
245
	case ULHR_TOO_LONG:
246
		goto error;
247
	}
248
	if (parse_loose_header(st->u.loose.hdr, &oi) < 0 || *type < 0)
249
		goto error;
250

251
	st->u.loose.hdr_used = strlen(st->u.loose.hdr) + 1;
252
	st->u.loose.hdr_avail = st->z.total_out;
253
	st->z_state = z_used;
254
	st->close = close_istream_loose;
255
	st->read = read_istream_loose;
256

257
	return 0;
258
error:
259
	git_inflate_end(&st->z);
260
	munmap(st->u.loose.mapped, st->u.loose.mapsize);
261
	return -1;
262
}
263

264

265
/*****************************************************************
266
 *
267
 * Non-delta packed object stream
268
 *
269
 *****************************************************************/
270

271
static ssize_t read_istream_pack_non_delta(struct git_istream *st, char *buf,
272
					   size_t sz)
273
{
274
	size_t total_read = 0;
275

276
	switch (st->z_state) {
277
	case z_unused:
278
		memset(&st->z, 0, sizeof(st->z));
279
		git_inflate_init(&st->z);
280
		st->z_state = z_used;
281
		break;
282
	case z_done:
283
		return 0;
284
	case z_error:
285
		return -1;
286
	case z_used:
287
		break;
288
	}
289

290
	while (total_read < sz) {
291
		int status;
292
		struct pack_window *window = NULL;
293
		unsigned char *mapped;
294

295
		mapped = use_pack(st->u.in_pack.pack, &window,
296
				  st->u.in_pack.pos, &st->z.avail_in);
297

298
		st->z.next_out = (unsigned char *)buf + total_read;
299
		st->z.avail_out = sz - total_read;
300
		st->z.next_in = mapped;
301
		status = git_inflate(&st->z, Z_FINISH);
302

303
		st->u.in_pack.pos += st->z.next_in - mapped;
304
		total_read = st->z.next_out - (unsigned char *)buf;
305
		unuse_pack(&window);
306

307
		if (status == Z_STREAM_END) {
308
			git_inflate_end(&st->z);
309
			st->z_state = z_done;
310
			break;
311
		}
312

313
		/*
314
		 * Unlike the loose object case, we do not have to worry here
315
		 * about running out of input bytes and spinning infinitely. If
316
		 * we get Z_BUF_ERROR due to too few input bytes, then we'll
317
		 * replenish them in the next use_pack() call when we loop. If
318
		 * we truly hit the end of the pack (i.e., because it's corrupt
319
		 * or truncated), then use_pack() catches that and will die().
320
		 */
321
		if (status != Z_OK && status != Z_BUF_ERROR) {
322
			git_inflate_end(&st->z);
323
			st->z_state = z_error;
324
			return -1;
325
		}
326
	}
327
	return total_read;
328
}
329

330
/*
 * Close hook for non-delta packed objects: release the zlib state if it
 * is still active.  Always returns 0.
 */
static int close_istream_pack_non_delta(struct git_istream *st)
{
	close_deflated_stream(st);
	return 0;
}
335

336
static int open_istream_pack_non_delta(struct git_istream *st,
337
				       struct repository *r UNUSED,
338
				       const struct object_id *oid UNUSED,
339
				       enum object_type *type UNUSED)
340
{
341
	struct pack_window *window;
342
	enum object_type in_pack_type;
343

344
	window = NULL;
345

346
	in_pack_type = unpack_object_header(st->u.in_pack.pack,
347
					    &window,
348
					    &st->u.in_pack.pos,
349
					    &st->size);
350
	unuse_pack(&window);
351
	switch (in_pack_type) {
352
	default:
353
		return -1; /* we do not do deltas for now */
354
	case OBJ_COMMIT:
355
	case OBJ_TREE:
356
	case OBJ_BLOB:
357
	case OBJ_TAG:
358
		break;
359
	}
360
	st->z_state = z_unused;
361
	st->close = close_istream_pack_non_delta;
362
	st->read = read_istream_pack_non_delta;
363

364
	return 0;
365
}
366

367

368
/*****************************************************************
369
 *
370
 * In-core stream
371
 *
372
 *****************************************************************/
373

374
static int close_istream_incore(struct git_istream *st)
375
{
376
	free(st->u.incore.buf);
377
	return 0;
378
}
379

380
static ssize_t read_istream_incore(struct git_istream *st, char *buf, size_t sz)
381
{
382
	size_t read_size = sz;
383
	size_t remainder = st->size - st->u.incore.read_ptr;
384

385
	if (remainder <= read_size)
386
		read_size = remainder;
387
	if (read_size) {
388
		memcpy(buf, st->u.incore.buf + st->u.incore.read_ptr, read_size);
389
		st->u.incore.read_ptr += read_size;
390
	}
391
	return read_size;
392
}
393

394
static int open_istream_incore(struct git_istream *st, struct repository *r,
395
			       const struct object_id *oid, enum object_type *type)
396
{
397
	struct object_info oi = OBJECT_INFO_INIT;
398

399
	st->u.incore.read_ptr = 0;
400
	st->close = close_istream_incore;
401
	st->read = read_istream_incore;
402

403
	oi.typep = type;
404
	oi.sizep = &st->size;
405
	oi.contentp = (void **)&st->u.incore.buf;
406
	return oid_object_info_extended(r, oid, &oi,
407
					OBJECT_INFO_DIE_IF_CORRUPT);
408
}
409

410
/*****************************************************************************
411
 * static helpers variables and functions for users of streaming interface
412
 *****************************************************************************/
413

414
static int istream_source(struct git_istream *st,
415
			  struct repository *r,
416
			  const struct object_id *oid,
417
			  enum object_type *type)
418
{
419
	unsigned long size;
420
	int status;
421
	struct object_info oi = OBJECT_INFO_INIT;
422

423
	oi.typep = type;
424
	oi.sizep = &size;
425
	status = oid_object_info_extended(r, oid, &oi, 0);
426
	if (status < 0)
427
		return status;
428

429
	switch (oi.whence) {
430
	case OI_LOOSE:
431
		st->open = open_istream_loose;
432
		return 0;
433
	case OI_PACKED:
434
		if (!oi.u.packed.is_delta && big_file_threshold < size) {
435
			st->u.in_pack.pack = oi.u.packed.pack;
436
			st->u.in_pack.pos = oi.u.packed.offset;
437
			st->open = open_istream_pack_non_delta;
438
			return 0;
439
		}
440
		/* fallthru */
441
	default:
442
		st->open = open_istream_incore;
443
		return 0;
444
	}
445
}
446

447
/****************************************************************
448
 * Users of streaming interface
449
 ****************************************************************/
450

451
int close_istream(struct git_istream *st)
452
{
453
	int r = st->close(st);
454
	free(st);
455
	return r;
456
}
457

458
ssize_t read_istream(struct git_istream *st, void *buf, size_t sz)
459
{
460
	return st->read(st, buf, sz);
461
}
462

463
struct git_istream *open_istream(struct repository *r,
464
				 const struct object_id *oid,
465
				 enum object_type *type,
466
				 unsigned long *size,
467
				 struct stream_filter *filter)
468
{
469
	struct git_istream *st = xmalloc(sizeof(*st));
470
	const struct object_id *real = lookup_replace_object(r, oid);
471
	int ret = istream_source(st, r, real, type);
472

473
	if (ret) {
474
		free(st);
475
		return NULL;
476
	}
477

478
	if (st->open(st, r, real, type)) {
479
		if (open_istream_incore(st, r, real, type)) {
480
			free(st);
481
			return NULL;
482
		}
483
	}
484
	if (filter) {
485
		/* Add "&& !is_null_stream_filter(filter)" for performance */
486
		struct git_istream *nst = attach_stream_filter(st, filter);
487
		if (!nst) {
488
			close_istream(st);
489
			return NULL;
490
		}
491
		st = nst;
492
	}
493

494
	*size = st->size;
495
	return st;
496
}
497

498
int stream_blob_to_fd(int fd, const struct object_id *oid, struct stream_filter *filter,
499
		      int can_seek)
500
{
501
	struct git_istream *st;
502
	enum object_type type;
503
	unsigned long sz;
504
	ssize_t kept = 0;
505
	int result = -1;
506

507
	st = open_istream(the_repository, oid, &type, &sz, filter);
508
	if (!st) {
509
		if (filter)
510
			free_stream_filter(filter);
511
		return result;
512
	}
513
	if (type != OBJ_BLOB)
514
		goto close_and_exit;
515
	for (;;) {
516
		char buf[1024 * 16];
517
		ssize_t wrote, holeto;
518
		ssize_t readlen = read_istream(st, buf, sizeof(buf));
519

520
		if (readlen < 0)
521
			goto close_and_exit;
522
		if (!readlen)
523
			break;
524
		if (can_seek && sizeof(buf) == readlen) {
525
			for (holeto = 0; holeto < readlen; holeto++)
526
				if (buf[holeto])
527
					break;
528
			if (readlen == holeto) {
529
				kept += holeto;
530
				continue;
531
			}
532
		}
533

534
		if (kept && lseek(fd, kept, SEEK_CUR) == (off_t) -1)
535
			goto close_and_exit;
536
		else
537
			kept = 0;
538
		wrote = write_in_full(fd, buf, readlen);
539

540
		if (wrote < 0)
541
			goto close_and_exit;
542
	}
543
	if (kept && (lseek(fd, kept - 1, SEEK_CUR) == (off_t) -1 ||
544
		     xwrite(fd, "", 1) != 1))
545
		goto close_and_exit;
546
	result = 0;
547

548
 close_and_exit:
549
	close_istream(st);
550
	return result;
551
}
552

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.