git — parallel-checkout.c (676 lines, 18.2 KB)
1#define USE_THE_REPOSITORY_VARIABLE2
3#include "git-compat-util.h"4#include "config.h"5#include "entry.h"6#include "gettext.h"7#include "hash.h"8#include "hex.h"9#include "parallel-checkout.h"10#include "pkt-line.h"11#include "progress.h"12#include "read-cache-ll.h"13#include "run-command.h"14#include "sigchain.h"15#include "streaming.h"16#include "symlinks.h"17#include "thread-utils.h"18#include "trace2.h"19
/*
 * A checkout--worker child process together with the bookkeeping needed to
 * match its results to the queue items that were sent to it.
 */
struct pc_worker {
	struct child_process cp;
	/*
	 * Workers answer in the same order the items were sent, so we only
	 * need the id of the next expected item and how many are still
	 * outstanding (see parse_and_save_result()).
	 */
	size_t next_item_to_complete, nr_items_to_complete;
};
struct parallel_checkout {
	enum pc_status status;
	struct parallel_checkout_item *items; /* The parallel checkout queue. */
	size_t nr, alloc;
	/* Optional progress meter shared with the run_parallel_checkout() caller. */
	struct progress *progress;
	unsigned int *progress_cnt;
};

/* Single module-wide instance; its lifecycle is guarded by `status`. */
static struct parallel_checkout parallel_checkout;
/* Report which phase of the parallel-checkout state machine we are in. */
enum pc_status parallel_checkout_status(void)
{
	return parallel_checkout.status;
}
/* Defaults for checkout.workers and checkout.thresholdForParallelism. */
static const int DEFAULT_THRESHOLD_FOR_PARALLELISM = 100;
static const int DEFAULT_NUM_WORKERS = 1;
43void get_parallel_checkout_configs(int *num_workers, int *threshold)44{
45char *env_workers = getenv("GIT_TEST_CHECKOUT_WORKERS");46
47if (env_workers && *env_workers) {48if (strtol_i(env_workers, 10, num_workers)) {49die(_("invalid value for '%s': '%s'"),50"GIT_TEST_CHECKOUT_WORKERS", env_workers);51}52if (*num_workers < 1)53*num_workers = online_cpus();54
55*threshold = 0;56return;57}58
59if (git_config_get_int("checkout.workers", num_workers))60*num_workers = DEFAULT_NUM_WORKERS;61else if (*num_workers < 1)62*num_workers = online_cpus();63
64if (git_config_get_int("checkout.thresholdForParallelism", threshold))65*threshold = DEFAULT_THRESHOLD_FOR_PARALLELISM;66}
67
/*
 * Enter the PC_ACCEPTING_ENTRIES phase, after which enqueue_checkout() may
 * be called. Must not be called while a parallel checkout is in progress.
 */
void init_parallel_checkout(void)
{
	if (parallel_checkout.status != PC_UNINITIALIZED)
		BUG("parallel checkout already initialized");

	parallel_checkout.status = PC_ACCEPTING_ENTRIES;
}
/* Release the queue and reset the module back to PC_UNINITIALIZED. */
static void finish_parallel_checkout(void)
{
	if (parallel_checkout.status == PC_UNINITIALIZED)
		BUG("cannot finish parallel checkout: not initialized yet");

	free(parallel_checkout.items);
	memset(&parallel_checkout, 0, sizeof(parallel_checkout));
}
/*
 * Decide whether `ce` may be checked out by a parallel worker. Returns 1
 * for eligible entries, 0 for entries that must take the sequential path.
 */
static int is_eligible_for_parallel_checkout(const struct cache_entry *ce,
					     const struct conv_attrs *ca)
{
	enum conv_attrs_classification c;
	size_t packed_item_size;

	/*
	 * Symlinks cannot be checked out in parallel as, in case of path
	 * collision, they could racily replace leading directories of other
	 * entries being checked out. Submodules are checked out in child
	 * processes, which have their own parallel checkout queues.
	 */
	if (!S_ISREG(ce->ce_mode))
		return 0;

	/* Must mirror the message-size calculation in send_one_item(). */
	packed_item_size = sizeof(struct pc_item_fixed_portion) + ce->ce_namelen +
		(ca->working_tree_encoding ? strlen(ca->working_tree_encoding) : 0);

	/*
	 * The amount of data we send to the workers per checkout item is
	 * typically small (75~300B). So unless we find an insanely huge path
	 * of 64KB, we should never reach the 65KB limit of one pkt-line. If
	 * that does happen, we let the sequential code handle the item.
	 */
	if (packed_item_size > LARGE_PACKET_DATA_MAX)
		return 0;

	c = classify_conv_attrs(ca);
	switch (c) {
	case CA_CLASS_INCORE:
		return 1;

	case CA_CLASS_INCORE_FILTER:
		/*
		 * It would be safe to allow concurrent instances of
		 * single-file smudge filters, like rot13, but we should not
		 * assume that all filters are parallel-process safe. So we
		 * don't allow this.
		 */
		return 0;

	case CA_CLASS_INCORE_PROCESS:
		/*
		 * The parallel queue and the delayed queue are not compatible,
		 * so they must be kept completely separated. And we can't tell
		 * if a long-running process will delay its response without
		 * actually asking it to perform the filtering. Therefore, this
		 * type of filter is not allowed in parallel checkout.
		 *
		 * Furthermore, there should only be one instance of the
		 * long-running process filter as we don't know how it is
		 * managing its own concurrency. So, spreading the entries that
		 * requisite such a filter among the parallel workers would
		 * require a lot more inter-process communication. We would
		 * probably have to designate a single process to interact with
		 * the filter and send all the necessary data to it, for each
		 * entry.
		 */
		return 0;

	case CA_CLASS_STREAMABLE:
		return 1;

	default:
		BUG("unsupported conv_attrs classification '%d'", c);
	}
}
153int enqueue_checkout(struct cache_entry *ce, struct conv_attrs *ca,154int *checkout_counter)155{
156struct parallel_checkout_item *pc_item;157
158if (parallel_checkout.status != PC_ACCEPTING_ENTRIES ||159!is_eligible_for_parallel_checkout(ce, ca))160return -1;161
162ALLOC_GROW(parallel_checkout.items, parallel_checkout.nr + 1,163parallel_checkout.alloc);164
165pc_item = ¶llel_checkout.items[parallel_checkout.nr];166pc_item->ce = ce;167memcpy(&pc_item->ca, ca, sizeof(pc_item->ca));168pc_item->status = PC_ITEM_PENDING;169pc_item->id = parallel_checkout.nr;170pc_item->checkout_counter = checkout_counter;171parallel_checkout.nr++;172
173return 0;174}
175
/* Number of entries currently enqueued for parallel checkout. */
size_t pc_queue_size(void)
{
	return parallel_checkout.nr;
}
181static void advance_progress_meter(void)182{
183if (parallel_checkout.progress) {184(*parallel_checkout.progress_cnt)++;185display_progress(parallel_checkout.progress,186*parallel_checkout.progress_cnt);187}188}
189
/*
 * Post-process the whole queue after all items were attempted: update the
 * index with collected stat() data, retry collided entries sequentially,
 * and bump the caller's checkout counters. Returns 0 on full success,
 * non-zero if any entry failed or was left pending.
 */
static int handle_results(struct checkout *state)
{
	int ret = 0;
	size_t i;
	int have_pending = 0;

	/*
	 * We first update the successfully written entries with the collected
	 * stat() data, so that they can be found by mark_colliding_entries(),
	 * in the next loop, when necessary.
	 */
	for (i = 0; i < parallel_checkout.nr; i++) {
		struct parallel_checkout_item *pc_item = &parallel_checkout.items[i];
		if (pc_item->status == PC_ITEM_WRITTEN)
			update_ce_after_write(state, pc_item->ce, &pc_item->st);
	}

	for (i = 0; i < parallel_checkout.nr; i++) {
		struct parallel_checkout_item *pc_item = &parallel_checkout.items[i];

		switch(pc_item->status) {
		case PC_ITEM_WRITTEN:
			if (pc_item->checkout_counter)
				(*pc_item->checkout_counter)++;
			break;
		case PC_ITEM_COLLIDED:
			/*
			 * The entry could not be checked out due to a path
			 * collision with another entry. Since there can only
			 * be one entry of each colliding group on the disk, we
			 * could skip trying to check out this one and move on.
			 * However, this would leave the unwritten entries with
			 * null stat() fields on the index, which could
			 * potentially slow down subsequent operations that
			 * require refreshing it: git would not be able to
			 * trust st_size and would have to go to the filesystem
			 * to see if the contents match (see ie_modified()).
			 *
			 * Instead, let's pay the overhead only once, now, and
			 * call checkout_entry_ca() again for this file, to
			 * have its stat() data stored in the index. This also
			 * has the benefit of adding this entry and its
			 * colliding pair to the collision report message.
			 * Additionally, this overwriting behavior is consistent
			 * with what the sequential checkout does, so it doesn't
			 * add any extra overhead.
			 */
			ret |= checkout_entry_ca(pc_item->ce, &pc_item->ca,
						 state, NULL,
						 pc_item->checkout_counter);
			advance_progress_meter();
			break;
		case PC_ITEM_PENDING:
			have_pending = 1;
			/* fall through */
		case PC_ITEM_FAILED:
			ret = -1;
			break;
		default:
			BUG("unknown checkout item status in parallel checkout");
		}
	}

	if (have_pending)
		error("parallel checkout finished with pending entries");

	return ret;
}
/*
 * Rewind `fd` to offset 0 and truncate it, so a failed (partial) write can
 * be retried from scratch. Returns 0 on success, -1 on error (reported).
 */
static int reset_fd(int fd, const char *path)
{
	int ret = 0;

	if (lseek(fd, 0, SEEK_SET) != 0)
		ret = error_errno("failed to rewind descriptor of '%s'", path);
	else if (ftruncate(fd, 0))
		ret = error_errno("failed to truncate file '%s'", path);

	return ret;
}
/*
 * Write the blob for `pc_item`, converted for the working tree, into `fd`.
 * Tries the streaming interface first; on streaming failure, rewinds the
 * file and falls back to an in-core read + convert + write. Returns 0 on
 * success, negative on error (already reported).
 */
static int write_pc_item_to_fd(struct parallel_checkout_item *pc_item, int fd,
			       const char *path)
{
	int ret;
	struct stream_filter *filter;
	struct strbuf buf = STRBUF_INIT;
	char *blob;
	size_t size;
	ssize_t wrote;

	/* Sanity check */
	assert(is_eligible_for_parallel_checkout(pc_item->ce, &pc_item->ca));

	filter = get_stream_filter_ca(&pc_item->ca, &pc_item->ce->oid);
	if (filter) {
		if (stream_blob_to_fd(fd, &pc_item->ce->oid, filter, 1)) {
			/* On error, reset fd to try writing without streaming */
			if (reset_fd(fd, path))
				return -1;
		} else {
			return 0;
		}
	}

	blob = read_blob_entry(pc_item->ce, &size);
	if (!blob)
		return error("cannot read object %s '%s'",
			     oid_to_hex(&pc_item->ce->oid), pc_item->ce->name);

	/*
	 * checkout metadata is used to give context for external process
	 * filters. Files requiring such filters are not eligible for parallel
	 * checkout, so pass NULL. Note: if that changes, the metadata must also
	 * be passed from the main process to the workers.
	 */
	ret = convert_to_working_tree_ca(&pc_item->ca, pc_item->ce->name,
					 blob, size, &buf, NULL);

	if (ret) {
		/* Conversion produced new contents; write those instead. */
		size_t newsize;
		free(blob);
		blob = strbuf_detach(&buf, &newsize);
		size = newsize;
	}

	wrote = write_in_full(fd, blob, size);
	free(blob);
	if (wrote < 0)
		return error("unable to write file '%s'", path);

	return 0;
}
/*
 * Close *fd if it is open and poison it with -1 so it cannot be closed
 * twice. Returns close(2)'s result, or 0 if there was nothing to close.
 */
static int close_and_clear(int *fd)
{
	int to_close = *fd;

	if (to_close < 0)
		return 0;

	*fd = -1;
	return close(to_close);
}
/*
 * Check out a single queue item: create the file, write the (converted)
 * blob contents, and collect stat() data for the index. On any problem the
 * outcome is recorded in pc_item->status (WRITTEN, COLLIDED, or FAILED);
 * collided items are retried sequentially later by handle_results().
 */
void write_pc_item(struct parallel_checkout_item *pc_item,
		   struct checkout *state)
{
	/* Mirror the executable bit: 0777 for executables, 0666 otherwise. */
	unsigned int mode = (pc_item->ce->ce_mode & 0100) ? 0777 : 0666;
	int fd = -1, fstat_done = 0;
	struct strbuf path = STRBUF_INIT;
	const char *dir_sep;

	strbuf_add(&path, state->base_dir, state->base_dir_len);
	strbuf_add(&path, pc_item->ce->name, pc_item->ce->ce_namelen);

	dir_sep = find_last_dir_sep(path.buf);

	/*
	 * The leading dirs should have been already created by now. But, in
	 * case of path collisions, one of the dirs could have been replaced by
	 * a symlink (checked out after we enqueued this entry for parallel
	 * checkout). Thus, we must check the leading dirs again.
	 */
	if (dir_sep && !has_dirs_only_path(path.buf, dir_sep - path.buf,
					   state->base_dir_len)) {
		pc_item->status = PC_ITEM_COLLIDED;
		trace2_data_string("pcheckout", NULL, "collision/dirname", path.buf);
		goto out;
	}

	fd = open(path.buf, O_WRONLY | O_CREAT | O_EXCL, mode);

	if (fd < 0) {
		if (errno == EEXIST || errno == EISDIR) {
			/*
			 * Errors which probably represent a path collision.
			 * Suppress the error message and mark the item to be
			 * retried later, sequentially. ENOTDIR and ENOENT are
			 * also interesting, but the above has_dirs_only_path()
			 * call should have already caught these cases.
			 */
			pc_item->status = PC_ITEM_COLLIDED;
			trace2_data_string("pcheckout", NULL,
					   "collision/basename", path.buf);
		} else {
			error_errno("failed to open file '%s'", path.buf);
			pc_item->status = PC_ITEM_FAILED;
		}
		goto out;
	}

	if (write_pc_item_to_fd(pc_item, fd, path.buf)) {
		/* Error was already reported. */
		pc_item->status = PC_ITEM_FAILED;
		close_and_clear(&fd);
		unlink(path.buf);
		goto out;
	}

	fstat_done = fstat_checkout_output(fd, state, &pc_item->st);

	if (close_and_clear(&fd)) {
		error_errno("unable to close file '%s'", path.buf);
		pc_item->status = PC_ITEM_FAILED;
		goto out;
	}

	/* Fall back to lstat() if the fstat() above was not performed. */
	if (state->refresh_cache && !fstat_done && lstat(path.buf, &pc_item->st) < 0) {
		error_errno("unable to stat just-written file '%s'", path.buf);
		pc_item->status = PC_ITEM_FAILED;
		goto out;
	}

	pc_item->status = PC_ITEM_WRITTEN;

out:
	strbuf_release(&path);
}
/*
 * Serialize one queue item into a single pkt-line for a worker: a
 * pc_item_fixed_portion struct followed by the optional working-tree
 * encoding string and then the entry's path (both without NUL terminators,
 * since their lengths travel in the fixed portion).
 */
static void send_one_item(int fd, struct parallel_checkout_item *pc_item)
{
	size_t len_data;
	char *data, *variant;
	struct pc_item_fixed_portion *fixed_portion;
	const char *working_tree_encoding = pc_item->ca.working_tree_encoding;
	size_t name_len = pc_item->ce->ce_namelen;
	size_t working_tree_encoding_len = working_tree_encoding ?
					   strlen(working_tree_encoding) : 0;

	/*
	 * Any changes in the calculation of the message size must also be made
	 * in is_eligible_for_parallel_checkout().
	 */
	len_data = sizeof(struct pc_item_fixed_portion) + name_len +
		   working_tree_encoding_len;

	data = xmalloc(len_data);

	fixed_portion = (struct pc_item_fixed_portion *)data;
	fixed_portion->id = pc_item->id;
	fixed_portion->ce_mode = pc_item->ce->ce_mode;
	fixed_portion->crlf_action = pc_item->ca.crlf_action;
	fixed_portion->ident = pc_item->ca.ident;
	fixed_portion->name_len = name_len;
	fixed_portion->working_tree_encoding_len = working_tree_encoding_len;
	oidcpy(&fixed_portion->oid, &pc_item->ce->oid);

	/* Variable-length tail: [encoding][name]. */
	variant = data + sizeof(*fixed_portion);
	if (working_tree_encoding_len) {
		memcpy(variant, working_tree_encoding, working_tree_encoding_len);
		variant += working_tree_encoding_len;
	}
	memcpy(variant, pc_item->ce->name, name_len);

	packet_write(fd, data, len_data);

	free(data);
}
448static void send_batch(int fd, size_t start, size_t nr)449{
450size_t i;451sigchain_push(SIGPIPE, SIG_IGN);452for (i = 0; i < nr; i++)453send_one_item(fd, ¶llel_checkout.items[start + i]);454packet_flush(fd);455sigchain_pop(SIGPIPE);456}
457
/*
 * Spawn `num_workers` checkout--worker processes and distribute the whole
 * queue among them in contiguous batches of near-equal size (the first
 * nr % num_workers workers get one extra item). Returns the worker array;
 * the caller must release it with finish_workers().
 */
static struct pc_worker *setup_workers(struct checkout *state, int num_workers)
{
	struct pc_worker *workers;
	int i, workers_with_one_extra_item;
	size_t base_batch_size, batch_beginning = 0;

	ALLOC_ARRAY(workers, num_workers);

	for (i = 0; i < num_workers; i++) {
		struct child_process *cp = &workers[i].cp;

		child_process_init(cp);
		cp->git_cmd = 1;
		cp->in = -1;
		cp->out = -1;
		cp->clean_on_exit = 1;
		strvec_push(&cp->args, "checkout--worker");
		if (state->base_dir_len)
			strvec_pushf(&cp->args, "--prefix=%s", state->base_dir);
		if (start_command(cp))
			die("failed to spawn checkout worker");
	}

	base_batch_size = parallel_checkout.nr / num_workers;
	workers_with_one_extra_item = parallel_checkout.nr % num_workers;

	for (i = 0; i < num_workers; i++) {
		struct pc_worker *worker = &workers[i];
		size_t batch_size = base_batch_size;

		/* distribute the extra work evenly */
		if (i < workers_with_one_extra_item)
			batch_size++;

		send_batch(worker->cp.in, batch_beginning, batch_size);
		worker->next_item_to_complete = batch_beginning;
		worker->nr_items_to_complete = batch_size;

		batch_beginning += batch_size;
	}

	return workers;
}
502static void finish_workers(struct pc_worker *workers, int num_workers)503{
504int i;505
506/*507* Close pipes before calling finish_command() to let the workers
508* exit asynchronously and avoid spending extra time on wait().
509*/
510for (i = 0; i < num_workers; i++) {511struct child_process *cp = &workers[i].cp;512if (cp->in >= 0)513close(cp->in);514if (cp->out >= 0)515close(cp->out);516}517
518for (i = 0; i < num_workers; i++) {519int rc = finish_command(&workers[i].cp);520if (rc > 128) {521/*522* For a normal non-zero exit, the worker should have
523* already printed something useful to stderr. But a
524* death by signal should be mentioned to the user.
525*/
526error("checkout worker %d died of signal %d", i, rc - 128);527}528}529
530free(workers);531}
532
/* Abort with a BUG if a worker's result pkt-line has an unexpected size. */
static inline void assert_pc_item_result_size(int got, int exp)
{
	if (got == exp)
		return;

	BUG("wrong result size from checkout worker (got %dB, exp %dB)",
	    got, exp);
}
/*
 * Deserialize one result pkt-line from a worker and record it in the
 * corresponding queue item. Results must arrive in the same order the
 * items were sent to that worker; any deviation is a BUG.
 */
static void parse_and_save_result(const char *buffer, int len,
				  struct pc_worker *worker)
{
	struct pc_item_result *res;
	struct parallel_checkout_item *pc_item;
	struct stat *st = NULL;

	if (len < PC_ITEM_RESULT_BASE_SIZE)
		BUG("too short result from checkout worker (got %dB, exp >=%dB)",
		    len, (int)PC_ITEM_RESULT_BASE_SIZE);

	res = (struct pc_item_result *)buffer;

	/*
	 * Worker should send either the full result struct on success, or
	 * just the base (i.e. no stat data), otherwise.
	 */
	if (res->status == PC_ITEM_WRITTEN) {
		assert_pc_item_result_size(len, (int)sizeof(struct pc_item_result));
		st = &res->st;
	} else {
		assert_pc_item_result_size(len, (int)PC_ITEM_RESULT_BASE_SIZE);
	}

	if (!worker->nr_items_to_complete)
		BUG("received result from supposedly finished checkout worker");
	if (res->id != worker->next_item_to_complete)
		BUG("unexpected item id from checkout worker (got %"PRIuMAX", exp %"PRIuMAX")",
		    (uintmax_t)res->id, (uintmax_t)worker->next_item_to_complete);

	worker->next_item_to_complete++;
	worker->nr_items_to_complete--;

	pc_item = &parallel_checkout.items[res->id];
	pc_item->status = res->status;
	if (st)
		pc_item->st = *st;

	/*
	 * Collided items are not counted here: handle_results() retries them
	 * sequentially and advances the meter at that point.
	 */
	if (res->status != PC_ITEM_COLLIDED)
		advance_progress_meter();
}
/*
 * poll() the output pipes of all workers, consuming result pkt-lines as
 * they arrive, until every worker has signaled completion (a flush packet,
 * i.e. packet_read() returning 0) or hung up.
 */
static void gather_results_from_workers(struct pc_worker *workers,
					int num_workers)
{
	int i, active_workers = num_workers;
	struct pollfd *pfds;

	CALLOC_ARRAY(pfds, num_workers);
	for (i = 0; i < num_workers; i++) {
		pfds[i].fd = workers[i].cp.out;
		pfds[i].events = POLLIN;
	}

	while (active_workers) {
		/* Block indefinitely until at least one fd has an event. */
		int nr = poll(pfds, num_workers, -1);

		if (nr < 0) {
			if (errno == EINTR)
				continue;
			die_errno("failed to poll checkout workers");
		}

		for (i = 0; i < num_workers && nr > 0; i++) {
			struct pc_worker *worker = &workers[i];
			struct pollfd *pfd = &pfds[i];

			if (!pfd->revents)
				continue;

			if (pfd->revents & POLLIN) {
				int len = packet_read(pfd->fd, packet_buffer,
						      sizeof(packet_buffer), 0);

				if (len < 0) {
					BUG("packet_read() returned negative value");
				} else if (!len) {
					/* Flush packet: this worker is done. */
					pfd->fd = -1;
					active_workers--;
				} else {
					parse_and_save_result(packet_buffer,
							      len, worker);
				}
			} else if (pfd->revents & POLLHUP) {
				pfd->fd = -1;
				active_workers--;
			} else if (pfd->revents & (POLLNVAL | POLLERR)) {
				die("error polling from checkout worker");
			}

			nr--;
		}
	}

	free(pfds);
}
637static void write_items_sequentially(struct checkout *state)638{
639size_t i;640
641for (i = 0; i < parallel_checkout.nr; i++) {642struct parallel_checkout_item *pc_item = ¶llel_checkout.items[i];643write_pc_item(pc_item, state);644if (pc_item->status != PC_ITEM_COLLIDED)645advance_progress_meter();646}647}
648
/*
 * Write all enqueued entries, using worker processes when there is enough
 * work (num_workers > 1 and at least `threshold` items), or sequentially
 * otherwise. Returns the combined result of handle_results() and resets
 * the module to PC_UNINITIALIZED.
 */
int run_parallel_checkout(struct checkout *state, int num_workers, int threshold,
			  struct progress *progress, unsigned int *progress_cnt)
{
	int ret;

	if (parallel_checkout.status != PC_ACCEPTING_ENTRIES)
		BUG("cannot run parallel checkout: uninitialized or already running");

	parallel_checkout.status = PC_RUNNING;
	parallel_checkout.progress = progress;
	parallel_checkout.progress_cnt = progress_cnt;

	/* No point in spawning more workers than there are queued items. */
	if (parallel_checkout.nr < num_workers)
		num_workers = parallel_checkout.nr;

	if (num_workers <= 1 || parallel_checkout.nr < threshold) {
		write_items_sequentially(state);
	} else {
		struct pc_worker *workers = setup_workers(state, num_workers);
		gather_results_from_workers(workers, num_workers);
		finish_workers(workers, num_workers);
	}

	ret = handle_results(state);

	finish_parallel_checkout();
	return ret;
}