git
/
streaming.c
551 строка · 12.6 Кб
1/*
2* Copyright (c) 2011, Google Inc.
3*/
4
5#define USE_THE_REPOSITORY_VARIABLE6
7#include "git-compat-util.h"8#include "convert.h"9#include "environment.h"10#include "streaming.h"11#include "repository.h"12#include "object-file.h"13#include "object-store-ll.h"14#include "replace-object.h"15#include "packfile.h"16
17typedef int (*open_istream_fn)(struct git_istream *,18struct repository *,19const struct object_id *,20enum object_type *);21typedef int (*close_istream_fn)(struct git_istream *);22typedef ssize_t (*read_istream_fn)(struct git_istream *, char *, size_t);23
24#define FILTER_BUFFER (1024*16)25
26struct filtered_istream {27struct git_istream *upstream;28struct stream_filter *filter;29char ibuf[FILTER_BUFFER];30char obuf[FILTER_BUFFER];31int i_end, i_ptr;32int o_end, o_ptr;33int input_finished;34};35
36struct git_istream {37open_istream_fn open;38close_istream_fn close;39read_istream_fn read;40
41unsigned long size; /* inflated size of full object */42git_zstream z;43enum { z_unused, z_used, z_done, z_error } z_state;44
45union {46struct {47char *buf; /* from oid_object_info_extended() */48unsigned long read_ptr;49} incore;50
51struct {52void *mapped;53unsigned long mapsize;54char hdr[32];55int hdr_avail;56int hdr_used;57} loose;58
59struct {60struct packed_git *pack;61off_t pos;62} in_pack;63
64struct filtered_istream filtered;65} u;66};67
68/*****************************************************************
69*
70* Common helpers
71*
72*****************************************************************/
73
74static void close_deflated_stream(struct git_istream *st)75{
76if (st->z_state == z_used)77git_inflate_end(&st->z);78}
79
80
81/*****************************************************************
82*
83* Filtered stream
84*
85*****************************************************************/
86
87static int close_istream_filtered(struct git_istream *st)88{
89free_stream_filter(st->u.filtered.filter);90return close_istream(st->u.filtered.upstream);91}
92
93static ssize_t read_istream_filtered(struct git_istream *st, char *buf,94size_t sz)95{
96struct filtered_istream *fs = &(st->u.filtered);97size_t filled = 0;98
99while (sz) {100/* do we already have filtered output? */101if (fs->o_ptr < fs->o_end) {102size_t to_move = fs->o_end - fs->o_ptr;103if (sz < to_move)104to_move = sz;105memcpy(buf + filled, fs->obuf + fs->o_ptr, to_move);106fs->o_ptr += to_move;107sz -= to_move;108filled += to_move;109continue;110}111fs->o_end = fs->o_ptr = 0;112
113/* do we have anything to feed the filter with? */114if (fs->i_ptr < fs->i_end) {115size_t to_feed = fs->i_end - fs->i_ptr;116size_t to_receive = FILTER_BUFFER;117if (stream_filter(fs->filter,118fs->ibuf + fs->i_ptr, &to_feed,119fs->obuf, &to_receive))120return -1;121fs->i_ptr = fs->i_end - to_feed;122fs->o_end = FILTER_BUFFER - to_receive;123continue;124}125
126/* tell the filter to drain upon no more input */127if (fs->input_finished) {128size_t to_receive = FILTER_BUFFER;129if (stream_filter(fs->filter,130NULL, NULL,131fs->obuf, &to_receive))132return -1;133fs->o_end = FILTER_BUFFER - to_receive;134if (!fs->o_end)135break;136continue;137}138fs->i_end = fs->i_ptr = 0;139
140/* refill the input from the upstream */141if (!fs->input_finished) {142fs->i_end = read_istream(fs->upstream, fs->ibuf, FILTER_BUFFER);143if (fs->i_end < 0)144return -1;145if (fs->i_end)146continue;147}148fs->input_finished = 1;149}150return filled;151}
152
153static struct git_istream *attach_stream_filter(struct git_istream *st,154struct stream_filter *filter)155{
156struct git_istream *ifs = xmalloc(sizeof(*ifs));157struct filtered_istream *fs = &(ifs->u.filtered);158
159ifs->close = close_istream_filtered;160ifs->read = read_istream_filtered;161fs->upstream = st;162fs->filter = filter;163fs->i_end = fs->i_ptr = 0;164fs->o_end = fs->o_ptr = 0;165fs->input_finished = 0;166ifs->size = -1; /* unknown */167return ifs;168}
169
170/*****************************************************************
171*
172* Loose object stream
173*
174*****************************************************************/
175
176static ssize_t read_istream_loose(struct git_istream *st, char *buf, size_t sz)177{
178size_t total_read = 0;179
180switch (st->z_state) {181case z_done:182return 0;183case z_error:184return -1;185default:186break;187}188
189if (st->u.loose.hdr_used < st->u.loose.hdr_avail) {190size_t to_copy = st->u.loose.hdr_avail - st->u.loose.hdr_used;191if (sz < to_copy)192to_copy = sz;193memcpy(buf, st->u.loose.hdr + st->u.loose.hdr_used, to_copy);194st->u.loose.hdr_used += to_copy;195total_read += to_copy;196}197
198while (total_read < sz) {199int status;200
201st->z.next_out = (unsigned char *)buf + total_read;202st->z.avail_out = sz - total_read;203status = git_inflate(&st->z, Z_FINISH);204
205total_read = st->z.next_out - (unsigned char *)buf;206
207if (status == Z_STREAM_END) {208git_inflate_end(&st->z);209st->z_state = z_done;210break;211}212if (status != Z_OK && (status != Z_BUF_ERROR || total_read < sz)) {213git_inflate_end(&st->z);214st->z_state = z_error;215return -1;216}217}218return total_read;219}
220
221static int close_istream_loose(struct git_istream *st)222{
223close_deflated_stream(st);224munmap(st->u.loose.mapped, st->u.loose.mapsize);225return 0;226}
227
228static int open_istream_loose(struct git_istream *st, struct repository *r,229const struct object_id *oid,230enum object_type *type)231{
232struct object_info oi = OBJECT_INFO_INIT;233oi.sizep = &st->size;234oi.typep = type;235
236st->u.loose.mapped = map_loose_object(r, oid, &st->u.loose.mapsize);237if (!st->u.loose.mapped)238return -1;239switch (unpack_loose_header(&st->z, st->u.loose.mapped,240st->u.loose.mapsize, st->u.loose.hdr,241sizeof(st->u.loose.hdr), NULL)) {242case ULHR_OK:243break;244case ULHR_BAD:245case ULHR_TOO_LONG:246goto error;247}248if (parse_loose_header(st->u.loose.hdr, &oi) < 0 || *type < 0)249goto error;250
251st->u.loose.hdr_used = strlen(st->u.loose.hdr) + 1;252st->u.loose.hdr_avail = st->z.total_out;253st->z_state = z_used;254st->close = close_istream_loose;255st->read = read_istream_loose;256
257return 0;258error:259git_inflate_end(&st->z);260munmap(st->u.loose.mapped, st->u.loose.mapsize);261return -1;262}
263
264
265/*****************************************************************
266*
267* Non-delta packed object stream
268*
269*****************************************************************/
270
271static ssize_t read_istream_pack_non_delta(struct git_istream *st, char *buf,272size_t sz)273{
274size_t total_read = 0;275
276switch (st->z_state) {277case z_unused:278memset(&st->z, 0, sizeof(st->z));279git_inflate_init(&st->z);280st->z_state = z_used;281break;282case z_done:283return 0;284case z_error:285return -1;286case z_used:287break;288}289
290while (total_read < sz) {291int status;292struct pack_window *window = NULL;293unsigned char *mapped;294
295mapped = use_pack(st->u.in_pack.pack, &window,296st->u.in_pack.pos, &st->z.avail_in);297
298st->z.next_out = (unsigned char *)buf + total_read;299st->z.avail_out = sz - total_read;300st->z.next_in = mapped;301status = git_inflate(&st->z, Z_FINISH);302
303st->u.in_pack.pos += st->z.next_in - mapped;304total_read = st->z.next_out - (unsigned char *)buf;305unuse_pack(&window);306
307if (status == Z_STREAM_END) {308git_inflate_end(&st->z);309st->z_state = z_done;310break;311}312
313/*314* Unlike the loose object case, we do not have to worry here
315* about running out of input bytes and spinning infinitely. If
316* we get Z_BUF_ERROR due to too few input bytes, then we'll
317* replenish them in the next use_pack() call when we loop. If
318* we truly hit the end of the pack (i.e., because it's corrupt
319* or truncated), then use_pack() catches that and will die().
320*/
321if (status != Z_OK && status != Z_BUF_ERROR) {322git_inflate_end(&st->z);323st->z_state = z_error;324return -1;325}326}327return total_read;328}
329
330static int close_istream_pack_non_delta(struct git_istream *st)331{
332close_deflated_stream(st);333return 0;334}
335
336static int open_istream_pack_non_delta(struct git_istream *st,337struct repository *r UNUSED,338const struct object_id *oid UNUSED,339enum object_type *type UNUSED)340{
341struct pack_window *window;342enum object_type in_pack_type;343
344window = NULL;345
346in_pack_type = unpack_object_header(st->u.in_pack.pack,347&window,348&st->u.in_pack.pos,349&st->size);350unuse_pack(&window);351switch (in_pack_type) {352default:353return -1; /* we do not do deltas for now */354case OBJ_COMMIT:355case OBJ_TREE:356case OBJ_BLOB:357case OBJ_TAG:358break;359}360st->z_state = z_unused;361st->close = close_istream_pack_non_delta;362st->read = read_istream_pack_non_delta;363
364return 0;365}
366
367
368/*****************************************************************
369*
370* In-core stream
371*
372*****************************************************************/
373
374static int close_istream_incore(struct git_istream *st)375{
376free(st->u.incore.buf);377return 0;378}
379
380static ssize_t read_istream_incore(struct git_istream *st, char *buf, size_t sz)381{
382size_t read_size = sz;383size_t remainder = st->size - st->u.incore.read_ptr;384
385if (remainder <= read_size)386read_size = remainder;387if (read_size) {388memcpy(buf, st->u.incore.buf + st->u.incore.read_ptr, read_size);389st->u.incore.read_ptr += read_size;390}391return read_size;392}
393
394static int open_istream_incore(struct git_istream *st, struct repository *r,395const struct object_id *oid, enum object_type *type)396{
397struct object_info oi = OBJECT_INFO_INIT;398
399st->u.incore.read_ptr = 0;400st->close = close_istream_incore;401st->read = read_istream_incore;402
403oi.typep = type;404oi.sizep = &st->size;405oi.contentp = (void **)&st->u.incore.buf;406return oid_object_info_extended(r, oid, &oi,407OBJECT_INFO_DIE_IF_CORRUPT);408}
409
410/*****************************************************************************
411* static helpers variables and functions for users of streaming interface
412*****************************************************************************/
413
414static int istream_source(struct git_istream *st,415struct repository *r,416const struct object_id *oid,417enum object_type *type)418{
419unsigned long size;420int status;421struct object_info oi = OBJECT_INFO_INIT;422
423oi.typep = type;424oi.sizep = &size;425status = oid_object_info_extended(r, oid, &oi, 0);426if (status < 0)427return status;428
429switch (oi.whence) {430case OI_LOOSE:431st->open = open_istream_loose;432return 0;433case OI_PACKED:434if (!oi.u.packed.is_delta && big_file_threshold < size) {435st->u.in_pack.pack = oi.u.packed.pack;436st->u.in_pack.pos = oi.u.packed.offset;437st->open = open_istream_pack_non_delta;438return 0;439}440/* fallthru */441default:442st->open = open_istream_incore;443return 0;444}445}
446
447/****************************************************************
448* Users of streaming interface
449****************************************************************/
450
451int close_istream(struct git_istream *st)452{
453int r = st->close(st);454free(st);455return r;456}
457
458ssize_t read_istream(struct git_istream *st, void *buf, size_t sz)459{
460return st->read(st, buf, sz);461}
462
463struct git_istream *open_istream(struct repository *r,464const struct object_id *oid,465enum object_type *type,466unsigned long *size,467struct stream_filter *filter)468{
469struct git_istream *st = xmalloc(sizeof(*st));470const struct object_id *real = lookup_replace_object(r, oid);471int ret = istream_source(st, r, real, type);472
473if (ret) {474free(st);475return NULL;476}477
478if (st->open(st, r, real, type)) {479if (open_istream_incore(st, r, real, type)) {480free(st);481return NULL;482}483}484if (filter) {485/* Add "&& !is_null_stream_filter(filter)" for performance */486struct git_istream *nst = attach_stream_filter(st, filter);487if (!nst) {488close_istream(st);489return NULL;490}491st = nst;492}493
494*size = st->size;495return st;496}
497
498int stream_blob_to_fd(int fd, const struct object_id *oid, struct stream_filter *filter,499int can_seek)500{
501struct git_istream *st;502enum object_type type;503unsigned long sz;504ssize_t kept = 0;505int result = -1;506
507st = open_istream(the_repository, oid, &type, &sz, filter);508if (!st) {509if (filter)510free_stream_filter(filter);511return result;512}513if (type != OBJ_BLOB)514goto close_and_exit;515for (;;) {516char buf[1024 * 16];517ssize_t wrote, holeto;518ssize_t readlen = read_istream(st, buf, sizeof(buf));519
520if (readlen < 0)521goto close_and_exit;522if (!readlen)523break;524if (can_seek && sizeof(buf) == readlen) {525for (holeto = 0; holeto < readlen; holeto++)526if (buf[holeto])527break;528if (readlen == holeto) {529kept += holeto;530continue;531}532}533
534if (kept && lseek(fd, kept, SEEK_CUR) == (off_t) -1)535goto close_and_exit;536else537kept = 0;538wrote = write_in_full(fd, buf, readlen);539
540if (wrote < 0)541goto close_and_exit;542}543if (kept && (lseek(fd, kept - 1, SEEK_CUR) == (off_t) -1 ||544xwrite(fd, "", 1) != 1))545goto close_and_exit;546result = 0;547
548close_and_exit:549close_istream(st);550return result;551}
552