/*
 * Background jobs (long-running operations)
 *
 * Copyright (c) 2011 IBM Corp.
 * Copyright (c) 2012, 2018 Red Hat, Inc.
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 */

#include "qemu/osdep.h"
#include "qapi/error.h"
#include "qemu/job.h"
#include "qemu/id.h"
#include "qemu/main-loop.h"
#include "block/aio-wait.h"
#include "trace/trace-root.h"
#include "qapi/qapi-events-job.h"

/*
 * The job API is composed of two categories of functions.
 *
 * The first includes functions used by the monitor. The monitor is
 * peculiar in that it accesses the job list with job_get, and
 * therefore needs consistency across job_get and the actual operation
 * (e.g. job_user_cancel). To achieve this consistency, the caller
 * calls job_lock/job_unlock itself around the whole operation.
 *
 * The second includes functions used by the job drivers and sometimes
 * by the core block layer. These delegate the locking to the callee instead.
 */

/*
 * job_mutex protects the jobs list, but also makes the
 * struct job fields thread-safe.
 */
QemuMutex job_mutex;

/* The global list of all existing jobs.  Protected by job_mutex. */
static QLIST_HEAD(, Job) jobs = QLIST_HEAD_INITIALIZER(jobs);

/*
 * Job State Transition Table:
 * JobSTT[cur][next] is true when a transition from status @cur to
 * status @next is allowed (asserted in job_state_transition_locked()).
 */
bool JobSTT[JOB_STATUS__MAX][JOB_STATUS__MAX] = {
                                    /* U, C, R, P, Y, S, W, D, X, E, N */
    /* U: */ [JOB_STATUS_UNDEFINED] = {0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0},
    /* C: */ [JOB_STATUS_CREATED]   = {0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 1},
    /* R: */ [JOB_STATUS_RUNNING]   = {0, 0, 0, 1, 1, 0, 1, 0, 1, 0, 0},
    /* P: */ [JOB_STATUS_PAUSED]    = {0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0},
    /* Y: */ [JOB_STATUS_READY]     = {0, 0, 0, 0, 0, 1, 1, 0, 1, 0, 0},
    /* S: */ [JOB_STATUS_STANDBY]   = {0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0},
    /* W: */ [JOB_STATUS_WAITING]   = {0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0},
    /* D: */ [JOB_STATUS_PENDING]   = {0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0},
    /* X: */ [JOB_STATUS_ABORTING]  = {0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0},
    /* E: */ [JOB_STATUS_CONCLUDED] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1},
    /* N: */ [JOB_STATUS_NULL]      = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
};

/*
 * JobVerbTable[verb][status] is true when user verb @verb may be
 * applied to a job in status @status (checked in job_apply_verb_locked()).
 */
bool JobVerbTable[JOB_VERB__MAX][JOB_STATUS__MAX] = {
                          /* U, C, R, P, Y, S, W, D, X, E, N */
    [JOB_VERB_CANCEL]    = {0, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0},
    [JOB_VERB_PAUSE]     = {0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0},
    [JOB_VERB_RESUME]    = {0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0},
    [JOB_VERB_SET_SPEED] = {0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0},
    [JOB_VERB_COMPLETE]  = {0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0},
    [JOB_VERB_FINALIZE]  = {0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0},
    [JOB_VERB_DISMISS]   = {0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0},
    [JOB_VERB_CHANGE]    = {0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0},
};

/* Transactional group of jobs.  All fields are protected by job_mutex. */
struct JobTxn {

    /* Is this txn being cancelled? */
    bool aborting;

    /* List of jobs (linked through Job.txn_list) */
    QLIST_HEAD(, Job) jobs;

    /* Reference count; the txn is freed when it drops to zero */
    int refcnt;
};

/* Acquire the global job_mutex. */
void job_lock(void)
{
    qemu_mutex_lock(&job_mutex);
}

/* Release the global job_mutex. */
void job_unlock(void)
{
    qemu_mutex_unlock(&job_mutex);
}

/* Initialize job_mutex before main() runs. */
static void __attribute__((__constructor__)) job_init(void)
{
    qemu_mutex_init(&job_mutex);
}

114JobTxn *job_txn_new(void)
115{
116JobTxn *txn = g_new0(JobTxn, 1);
117QLIST_INIT(&txn->jobs);
118txn->refcnt = 1;
119return txn;
120}
121
/* Take an extra reference on @txn.  Called with job_mutex held. */
static void job_txn_ref_locked(JobTxn *txn)
{
    txn->refcnt++;
}

128void job_txn_unref_locked(JobTxn *txn)
129{
130if (txn && --txn->refcnt == 0) {
131g_free(txn);
132}
133}
134
/* Unlocked wrapper for job_txn_unref_locked(). */
void job_txn_unref(JobTxn *txn)
{
    JOB_LOCK_GUARD();
    job_txn_unref_locked(txn);
}

/**
 * @txn: The transaction (may be NULL)
 * @job: Job to add to the transaction
 *
 * Add @job to the transaction. The @job must not already be in a transaction.
 * The caller must call either job_txn_unref() or job_completed() to release
 * the reference that is automatically grabbed here.
 *
 * If @txn is NULL, the function does nothing.
 *
 * Called with job_mutex held.
 */
static void job_txn_add_job_locked(JobTxn *txn, Job *job)
{
    if (!txn) {
        return;
    }

    assert(!job->txn);
    job->txn = txn;

    QLIST_INSERT_HEAD(&txn->jobs, job, txn_list);
    job_txn_ref_locked(txn);
}

166/* Called with job_mutex held. */
167static void job_txn_del_job_locked(Job *job)
168{
169if (job->txn) {
170QLIST_REMOVE(job, txn_list);
171job_txn_unref_locked(job->txn);
172job->txn = NULL;
173}
174}
175
/*
 * Apply @fn to every job in @job's transaction, stopping at the first
 * non-zero return value, which is then returned.
 *
 * Called with job_mutex held; @fn itself may release the lock
 * temporarily (e.g. job_prepare_locked() drops it around the driver
 * callback), so take a reference on @job to keep it and its
 * transaction alive across those windows.
 */
static int job_txn_apply_locked(Job *job, int fn(Job *))
{
    Job *other_job, *next;
    JobTxn *txn = job->txn;
    int rc = 0;

    job_ref_locked(job);

    QLIST_FOREACH_SAFE(other_job, &txn->jobs, txn_list, next) {
        rc = fn(other_job);
        if (rc) {
            break;
        }
    }

    job_unref_locked(job);
    return rc;
}

202bool job_is_internal(Job *job)
203{
204return (job->id == NULL);
205}
206
/*
 * Move @job to status @s1, asserting that the transition is allowed by
 * JobSTT, and emit a QMP JOB_STATUS_CHANGE event for user-visible jobs.
 * Called with job_mutex held.
 */
static void job_state_transition_locked(Job *job, JobStatus s1)
{
    JobStatus s0 = job->status;
    assert(s1 >= 0 && s1 < JOB_STATUS__MAX);
    trace_job_state_transition(job, job->ret,
                               JobSTT[s0][s1] ? "allowed" : "disallowed",
                               JobStatus_str(s0), JobStatus_str(s1));
    assert(JobSTT[s0][s1]);
    job->status = s1;

    /* Internal jobs do not emit events; self-transitions are silent too. */
    if (!job_is_internal(job) && s1 != s0) {
        qapi_event_send_job_status_change(job->id, job->status);
    }
}

223int job_apply_verb_locked(Job *job, JobVerb verb, Error **errp)
224{
225JobStatus s0 = job->status;
226assert(verb >= 0 && verb < JOB_VERB__MAX);
227trace_job_apply_verb(job, JobStatus_str(s0), JobVerb_str(verb),
228JobVerbTable[verb][s0] ? "allowed" : "prohibited");
229if (JobVerbTable[verb][s0]) {
230return 0;
231}
232error_setg(errp, "Job '%s' in state '%s' cannot accept command verb '%s'",
233job->id, JobStatus_str(s0), JobVerb_str(verb));
234return -EPERM;
235}
236
/* Return the driver-declared type of @job. */
JobType job_type(const Job *job)
{
    return job->driver->job_type;
}

/* Return the string name of @job's type. */
const char *job_type_str(const Job *job)
{
    return JobType_str(job_type(job));
}

/*
 * A job counts as cancelled here only if it was *force*-cancelled;
 * a soft cancel request alone does not make this true.
 */
bool job_is_cancelled_locked(Job *job)
{
    /* force_cancel may be true only if cancelled is true, too */
    assert(job->cancelled || !job->force_cancel);
    return job->force_cancel;
}

bool job_is_cancelled(Job *job)
{
    JOB_LOCK_GUARD();
    return job_is_cancelled_locked(job);
}

/*
 * True if any cancel (soft or forced) has been requested.
 * Called with job_mutex held.
 */
static bool job_cancel_requested_locked(Job *job)
{
    return job->cancelled;
}

bool job_cancel_requested(Job *job)
{
    JOB_LOCK_GUARD();
    return job_cancel_requested_locked(job);
}

272bool job_is_ready_locked(Job *job)
273{
274switch (job->status) {
275case JOB_STATUS_UNDEFINED:
276case JOB_STATUS_CREATED:
277case JOB_STATUS_RUNNING:
278case JOB_STATUS_PAUSED:
279case JOB_STATUS_WAITING:
280case JOB_STATUS_PENDING:
281case JOB_STATUS_ABORTING:
282case JOB_STATUS_CONCLUDED:
283case JOB_STATUS_NULL:
284return false;
285case JOB_STATUS_READY:
286case JOB_STATUS_STANDBY:
287return true;
288default:
289g_assert_not_reached();
290}
291return false;
292}
293
/* Unlocked wrapper for job_is_ready_locked(). */
bool job_is_ready(Job *job)
{
    JOB_LOCK_GUARD();
    return job_is_ready_locked(job);
}

300bool job_is_completed_locked(Job *job)
301{
302switch (job->status) {
303case JOB_STATUS_UNDEFINED:
304case JOB_STATUS_CREATED:
305case JOB_STATUS_RUNNING:
306case JOB_STATUS_PAUSED:
307case JOB_STATUS_READY:
308case JOB_STATUS_STANDBY:
309return false;
310case JOB_STATUS_WAITING:
311case JOB_STATUS_PENDING:
312case JOB_STATUS_ABORTING:
313case JOB_STATUS_CONCLUDED:
314case JOB_STATUS_NULL:
315return true;
316default:
317g_assert_not_reached();
318}
319return false;
320}
321
/* Unlocked wrapper for job_is_completed_locked(). */
static bool job_is_completed(Job *job)
{
    JOB_LOCK_GUARD();
    return job_is_completed_locked(job);
}

/* A job counts as started once its coroutine has been created. */
static bool job_started_locked(Job *job)
{
    return job->co;
}

/* Called with job_mutex held. */
static bool job_should_pause_locked(Job *job)
{
    return job->pause_count > 0;
}

339Job *job_next_locked(Job *job)
340{
341if (!job) {
342return QLIST_FIRST(&jobs);
343}
344return QLIST_NEXT(job, job_list);
345}
346
/* Unlocked wrapper for job_next_locked(). */
Job *job_next(Job *job)
{
    JOB_LOCK_GUARD();
    return job_next_locked(job);
}

353Job *job_get_locked(const char *id)
354{
355Job *job;
356
357QLIST_FOREACH(job, &jobs, job_list) {
358if (job->id && !strcmp(id, job->id)) {
359return job;
360}
361}
362
363return NULL;
364}
365
/* Change the AioContext the job runs in.  Main-loop only. */
void job_set_aio_context(Job *job, AioContext *ctx)
{
    /* protect against read in job_finish_sync_locked and job_start */
    GLOBAL_STATE_CODE();
    /* protect against read in job_do_yield_locked */
    JOB_LOCK_GUARD();
    /* ensure the job is quiescent while the AioContext is changed */
    assert(job->paused || job_is_completed_locked(job));
    job->aio_context = ctx;
}

/*
 * Sleep-timer callback: re-enter the job coroutine.
 * Called with job_mutex *not* held.
 */
static void job_sleep_timer_cb(void *opaque)
{
    Job *job = opaque;

    job_enter(job);
}

/*
 * Allocate, initialize and register a new job.
 *
 * @job_id: user-visible ID; must be NULL for internal jobs
 *          (JOB_INTERNAL in @flags) and well-formed and unique otherwise.
 * @driver: defines the job's type and callbacks; its instance_size
 *          determines the allocation size.
 * @txn:    optional transaction to join; when NULL, a private
 *          single-job transaction is created.
 * @cb/@opaque: completion callback and its argument.
 *
 * Returns the new job (cast by the caller to its concrete type), or
 * NULL with @errp set on invalid/duplicate ID.
 */
void *job_create(const char *job_id, const JobDriver *driver, JobTxn *txn,
                 AioContext *ctx, int flags, BlockCompletionFunc *cb,
                 void *opaque, Error **errp)
{
    Job *job;

    JOB_LOCK_GUARD();

    if (job_id) {
        if (flags & JOB_INTERNAL) {
            error_setg(errp, "Cannot specify job ID for internal job");
            return NULL;
        }
        if (!id_wellformed(job_id)) {
            error_setg(errp, "Invalid job ID '%s'", job_id);
            return NULL;
        }
        if (job_get_locked(job_id)) {
            error_setg(errp, "Job ID '%s' already in use", job_id);
            return NULL;
        }
    } else if (!(flags & JOB_INTERNAL)) {
        error_setg(errp, "An explicit job ID is required");
        return NULL;
    }

    job = g_malloc0(driver->instance_size);
    job->driver        = driver;
    job->id            = g_strdup(job_id);
    job->refcnt        = 1;
    job->aio_context   = ctx;
    job->busy          = false;
    /* The job starts paused; job_start() drops this initial pause. */
    job->paused        = true;
    job->pause_count   = 1;
    job->auto_finalize = !(flags & JOB_MANUAL_FINALIZE);
    job->auto_dismiss  = !(flags & JOB_MANUAL_DISMISS);
    job->cb            = cb;
    job->opaque        = opaque;

    progress_init(&job->progress);

    notifier_list_init(&job->on_finalize_cancelled);
    notifier_list_init(&job->on_finalize_completed);
    notifier_list_init(&job->on_pending);
    notifier_list_init(&job->on_ready);
    notifier_list_init(&job->on_idle);

    job_state_transition_locked(job, JOB_STATUS_CREATED);
    aio_timer_init(qemu_get_aio_context(), &job->sleep_timer,
                   QEMU_CLOCK_REALTIME, SCALE_NS,
                   job_sleep_timer_cb, job);

    QLIST_INSERT_HEAD(&jobs, job, job_list);

    /* Single jobs are modeled as single-job transactions for sake of
     * consolidating the job management logic */
    if (!txn) {
        txn = job_txn_new();
        job_txn_add_job_locked(txn, job);
        job_txn_unref_locked(txn);
    } else {
        job_txn_add_job_locked(txn, job);
    }

    return job;
}

/* Take an extra reference on @job.  Called with job_mutex held. */
void job_ref_locked(Job *job)
{
    ++job->refcnt;
}

/*
 * Drop a reference on @job; the last reference frees the job, which
 * must already be in JOB_STATUS_NULL and detached from any transaction.
 * Called with job_mutex held, but releases it temporarily around the
 * driver's free() callback.
 */
void job_unref_locked(Job *job)
{
    GLOBAL_STATE_CODE();

    if (--job->refcnt == 0) {
        assert(job->status == JOB_STATUS_NULL);
        assert(!timer_pending(&job->sleep_timer));
        assert(!job->txn);

        if (job->driver->free) {
            /* The driver callback runs without job_mutex held. */
            job_unlock();
            job->driver->free(job);
            job_lock();
        }

        QLIST_REMOVE(job, job_list);

        progress_destroy(&job->progress);
        error_free(job->err);
        g_free(job->id);
        g_free(job);
    }
}

/* Record @done units of work as completed. */
void job_progress_update(Job *job, uint64_t done)
{
    progress_work_done(&job->progress, done);
}

/* Set the total amount of work still expected. */
void job_progress_set_remaining(Job *job, uint64_t remaining)
{
    progress_set_remaining(&job->progress, remaining);
}

/* Add @delta to the amount of work still expected. */
void job_progress_increase_remaining(Job *job, uint64_t delta)
{
    progress_increase_remaining(&job->progress, delta);
}

/**
 * To be called when a cancelled job is finalised.
 * Called with job_mutex held.
 */
static void job_event_cancelled_locked(Job *job)
{
    notifier_list_notify(&job->on_finalize_cancelled, job);
}

/**
 * To be called when a successfully completed job is finalised.
 * Called with job_mutex held.
 */
static void job_event_completed_locked(Job *job)
{
    notifier_list_notify(&job->on_finalize_completed, job);
}

/* Fired on transition to PENDING.  Called with job_mutex held. */
static void job_event_pending_locked(Job *job)
{
    notifier_list_notify(&job->on_pending, job);
}

/* Fired on transition to READY.  Called with job_mutex held. */
static void job_event_ready_locked(Job *job)
{
    notifier_list_notify(&job->on_ready, job);
}

/* Fired when the job coroutine goes idle.  Called with job_mutex held. */
static void job_event_idle_locked(Job *job)
{
    notifier_list_notify(&job->on_idle, job);
}

/*
 * Re-enter the job coroutine, if it is startable and not already
 * active, and only if @fn (when non-NULL) returns true.
 * Called with job_mutex held, but releases it temporarily around
 * aio_co_wake().
 */
void job_enter_cond_locked(Job *job, bool(*fn)(Job *job))
{
    /* No coroutine exists before job_start(). */
    if (!job_started_locked(job)) {
        return;
    }
    /* The job has finished its work and deferred to the main loop. */
    if (job->deferred_to_main_loop) {
        return;
    }

    /* Already active; nothing to wake. */
    if (job->busy) {
        return;
    }

    if (fn && !fn(job)) {
        return;
    }

    assert(!job->deferred_to_main_loop);
    /* A pending sleep timer would re-enter the job a second time. */
    timer_del(&job->sleep_timer);
    job->busy = true;
    job_unlock();
    aio_co_wake(job->co);
    job_lock();
}

void job_enter(Job *job)
{
    JOB_LOCK_GUARD();
    job_enter_cond_locked(job, NULL);
}

/* Yield, and schedule a timer to reenter the coroutine after @ns nanoseconds.
 * Reentering the job coroutine with job_enter() before the timer has expired
 * is allowed and cancels the timer.
 *
 * If @ns is (uint64_t) -1, no timer is scheduled and job_enter() must be
 * called explicitly.
 *
 * Called with job_mutex held, but releases it temporarily.
 */
static void coroutine_fn job_do_yield_locked(Job *job, uint64_t ns)
{
    AioContext *next_aio_context;

    if (ns != -1) {
        timer_mod(&job->sleep_timer, ns);
    }
    job->busy = false;
    job_event_idle_locked(job);
    job_unlock();
    qemu_coroutine_yield();
    job_lock();

    next_aio_context = job->aio_context;
    /*
     * Coroutine has resumed, but in the meanwhile the job AioContext
     * might have changed via bdrv_try_change_aio_context(), so we need to move
     * the coroutine too in the new aiocontext.
     */
    while (qemu_get_current_aio_context() != next_aio_context) {
        job_unlock();
        aio_co_reschedule_self(next_aio_context);
        job_lock();
        /* Re-read: the context may have changed again while unlocked. */
        next_aio_context = job->aio_context;
    }

    /* Set by job_enter_cond_locked() before re-entering the coroutine. */
    assert(job->busy);
}

/*
 * Pause point: if a pause was requested, transition to PAUSED (or
 * STANDBY when READY), yield until resumed, and restore the previous
 * status.  Cancelled jobs do not pause.  The driver's pause/resume
 * callbacks run without job_mutex held.
 *
 * Called with job_mutex held, but releases it temporarily.
 */
static void coroutine_fn job_pause_point_locked(Job *job)
{
    assert(job && job_started_locked(job));

    if (!job_should_pause_locked(job)) {
        return;
    }
    if (job_is_cancelled_locked(job)) {
        return;
    }

    if (job->driver->pause) {
        job_unlock();
        job->driver->pause(job);
        job_lock();
    }

    /* Re-check: the situation may have changed while unlocked above. */
    if (job_should_pause_locked(job) && !job_is_cancelled_locked(job)) {
        JobStatus status = job->status;
        job_state_transition_locked(job, status == JOB_STATUS_READY
                                         ? JOB_STATUS_STANDBY
                                         : JOB_STATUS_PAUSED);
        job->paused = true;
        job_do_yield_locked(job, -1);
        job->paused = false;
        job_state_transition_locked(job, status);
    }

    if (job->driver->resume) {
        job_unlock();
        job->driver->resume(job);
        job_lock();
    }
}

/* Unlocked wrapper for job_pause_point_locked(). */
void coroutine_fn job_pause_point(Job *job)
{
    JOB_LOCK_GUARD();
    job_pause_point_locked(job);
}

/*
 * Yield the job coroutine until re-entered with job_enter(); also acts
 * as a pause point.  Returns immediately if the job is cancelled.
 */
void coroutine_fn job_yield(Job *job)
{
    JOB_LOCK_GUARD();
    assert(job->busy);

    /* Check cancellation *before* setting busy = false, too! */
    if (job_is_cancelled_locked(job)) {
        return;
    }

    if (!job_should_pause_locked(job)) {
        job_do_yield_locked(job, -1);
    }

    job_pause_point_locked(job);
}

/*
 * Sleep for @ns nanoseconds (re-enterable earlier via job_enter());
 * also acts as a pause point.  Returns immediately if cancelled.
 */
void coroutine_fn job_sleep_ns(Job *job, int64_t ns)
{
    JOB_LOCK_GUARD();
    assert(job->busy);

    /* Check cancellation *before* setting busy = false, too! */
    if (job_is_cancelled_locked(job)) {
        return;
    }

    if (!job_should_pause_locked(job)) {
        job_do_yield_locked(job, qemu_clock_get_ns(QEMU_CLOCK_REALTIME) + ns);
    }

    job_pause_point_locked(job);
}

/*
 * True when no sleep timer is armed; used as a condition for
 * job_enter_cond_locked().  Assumes the job_mutex is held.
 */
static bool job_timer_not_pending_locked(Job *job)
{
    return !timer_pending(&job->sleep_timer);
}

684void job_pause_locked(Job *job)
685{
686job->pause_count++;
687if (!job->paused) {
688job_enter_cond_locked(job, NULL);
689}
690}
691
/* Unlocked wrapper for job_pause_locked(). */
void job_pause(Job *job)
{
    JOB_LOCK_GUARD();
    job_pause_locked(job);
}

698void job_resume_locked(Job *job)
699{
700assert(job->pause_count > 0);
701job->pause_count--;
702if (job->pause_count) {
703return;
704}
705
706/* kick only if no timer is pending */
707job_enter_cond_locked(job, job_timer_not_pending_locked);
708}
709
/* Unlocked wrapper for job_resume_locked(). */
void job_resume(Job *job)
{
    JOB_LOCK_GUARD();
    job_resume_locked(job);
}

/*
 * User-initiated pause (QMP job-pause).  Fails if the verb is not
 * permitted in the current state or the job is already user-paused.
 * Called with job_mutex held.
 */
void job_user_pause_locked(Job *job, Error **errp)
{
    if (job_apply_verb_locked(job, JOB_VERB_PAUSE, errp)) {
        return;
    }
    if (job->user_paused) {
        error_setg(errp, "Job is already paused");
        return;
    }
    job->user_paused = true;
    job_pause_locked(job);
}

/* True if the user has paused this job.  Called with job_mutex held. */
bool job_user_paused_locked(Job *job)
{
    return job->user_paused;
}

/*
 * User-initiated resume (QMP job-resume).  Only valid for jobs the
 * user previously paused.  The driver's user_resume callback runs
 * without job_mutex held.  Called with job_mutex held.
 */
void job_user_resume_locked(Job *job, Error **errp)
{
    assert(job);
    GLOBAL_STATE_CODE();
    if (!job->user_paused || job->pause_count <= 0) {
        error_setg(errp, "Can't resume a job that was not paused");
        return;
    }
    if (job_apply_verb_locked(job, JOB_VERB_RESUME, errp)) {
        return;
    }
    if (job->driver->user_resume) {
        job_unlock();
        job->driver->user_resume(job);
        job_lock();
    }
    job->user_paused = false;
    job_resume_locked(job);
}

/*
 * Detach the job from its transaction, transition it to NULL and drop
 * the list reference.  Called with job_mutex held, but releases it
 * temporarily (via job_unref_locked() when it frees the job).
 */
static void job_do_dismiss_locked(Job *job)
{
    assert(job);
    job->busy = false;
    job->paused = false;
    job->deferred_to_main_loop = true;

    job_txn_del_job_locked(job);

    job_state_transition_locked(job, JOB_STATUS_NULL);
    job_unref_locked(job);
}

/*
 * QMP job-dismiss: remove a CONCLUDED job.  On success *jobptr is
 * cleared.  Called with job_mutex held.
 */
void job_dismiss_locked(Job **jobptr, Error **errp)
{
    Job *job = *jobptr;
    /* similarly to _complete, this is QMP-interface only. */
    assert(job->id);
    if (job_apply_verb_locked(job, JOB_VERB_DISMISS, errp)) {
        return;
    }

    job_do_dismiss_locked(job);
    *jobptr = NULL;
}

/* Tear down a job that failed before it was ever started. */
void job_early_fail(Job *job)
{
    JOB_LOCK_GUARD();
    assert(job->status == JOB_STATUS_CREATED);
    job_do_dismiss_locked(job);
}

/*
 * Transition to CONCLUDED; auto-dismissing jobs (and jobs that never
 * started) are dismissed immediately.  Called with job_mutex held.
 */
static void job_conclude_locked(Job *job)
{
    job_state_transition_locked(job, JOB_STATUS_CONCLUDED);
    if (job->auto_dismiss || !job_started_locked(job)) {
        job_do_dismiss_locked(job);
    }
}

/*
 * Fold cancellation into job->ret, synthesize an Error from the code
 * if none is set, and move failing jobs to ABORTING.
 * Called with job_mutex held.
 */
static void job_update_rc_locked(Job *job)
{
    if (!job->ret && job_is_cancelled_locked(job)) {
        job->ret = -ECANCELED;
    }
    if (job->ret) {
        if (!job->err) {
            error_setg(&job->err, "%s", strerror(-job->ret));
        }
        job_state_transition_locked(job, JOB_STATUS_ABORTING);
    }
}

/* Driver commit callback; only for jobs that succeeded (ret == 0). */
static void job_commit(Job *job)
{
    assert(!job->ret);
    GLOBAL_STATE_CODE();
    if (job->driver->commit) {
        job->driver->commit(job);
    }
}

/* Driver abort callback; only for jobs that failed (ret != 0). */
static void job_abort(Job *job)
{
    assert(job->ret);
    GLOBAL_STATE_CODE();
    if (job->driver->abort) {
        job->driver->abort(job);
    }
}

/* Driver clean callback; runs after commit or abort. */
static void job_clean(Job *job)
{
    GLOBAL_STATE_CODE();
    if (job->driver->clean) {
        job->driver->clean(job);
    }
}

/*
 * Finalize one completed job: run the driver's commit/abort and clean
 * callbacks and the completion callback (all without job_mutex held),
 * emit the finalize event, detach from the transaction and conclude.
 * Always returns 0 (so job_txn_apply_locked() visits every job).
 *
 * Called with job_mutex held, but releases it temporarily.
 */
static int job_finalize_single_locked(Job *job)
{
    int job_ret;

    assert(job_is_completed_locked(job));

    /* Ensure abort is called for late-transactional failures */
    job_update_rc_locked(job);

    job_ret = job->ret;
    job_unlock();

    if (!job_ret) {
        job_commit(job);
    } else {
        job_abort(job);
    }
    job_clean(job);

    if (job->cb) {
        job->cb(job->opaque, job_ret);
    }

    job_lock();

    /* Emit events only if we actually started */
    if (job_started_locked(job)) {
        if (job_is_cancelled_locked(job)) {
            job_event_cancelled_locked(job);
        } else {
            job_event_completed_locked(job);
        }
    }

    job_txn_del_job_locked(job);
    job_conclude_locked(job);
    return 0;
}

/*
 * Asynchronously request cancellation.  The driver's cancel() callback
 * (run without job_mutex held) may override @force; without a cancel()
 * callback the job behaves as if force-cancelled.  Also lifts any user
 * pause so the job can make progress toward termination.
 *
 * Called with job_mutex held, but releases it temporarily.
 */
static void job_cancel_async_locked(Job *job, bool force)
{
    GLOBAL_STATE_CODE();
    if (job->driver->cancel) {
        job_unlock();
        force = job->driver->cancel(job, force);
        job_lock();
    } else {
        /* No .cancel() means the job will behave as if force-cancelled */
        force = true;
    }

    if (job->user_paused) {
        /* Do not call job_enter here, the caller will handle it. */
        if (job->driver->user_resume) {
            job_unlock();
            job->driver->user_resume(job);
            job_lock();
        }
        job->user_paused = false;
        assert(job->pause_count > 0);
        job->pause_count--;
    }

    /*
     * Ignore soft cancel requests after the job is already done
     * (We will still invoke job->driver->cancel() above, but if the
     * job driver supports soft cancelling and the job is done, that
     * should be a no-op, too. We still call it so it can override
     * @force.)
     */
    if (force || !job->deferred_to_main_loop) {
        job->cancelled = true;
        /* To prevent 'force == false' overriding a previous 'force == true' */
        job->force_cancel |= force;
    }
}

/*
 * Abort every job in @job's transaction because @job failed or was
 * cancelled: force-cancel the others, wait for them to finish, then
 * finalize them all.  Re-entrant calls (while txn->aborting) return
 * immediately.
 *
 * Called with job_mutex held, but releases it temporarily.
 */
static void job_completed_txn_abort_locked(Job *job)
{
    JobTxn *txn = job->txn;
    Job *other_job;

    if (txn->aborting) {
        /*
         * We are cancelled by another job, which will handle everything.
         */
        return;
    }
    txn->aborting = true;
    /* Keep the txn and @job alive across the temporary unlocks below. */
    job_txn_ref_locked(txn);

    job_ref_locked(job);

    /* Other jobs are effectively cancelled by us, set the status for
     * them; this job, however, may or may not be cancelled, depending
     * on the caller, so leave it. */
    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
        if (other_job != job) {
            /*
             * This is a transaction: If one job failed, no result will matter.
             * Therefore, pass force=true to terminate all other jobs as quickly
             * as possible.
             */
            job_cancel_async_locked(other_job, true);
        }
    }
    while (!QLIST_EMPTY(&txn->jobs)) {
        other_job = QLIST_FIRST(&txn->jobs);
        if (!job_is_completed_locked(other_job)) {
            assert(job_cancel_requested_locked(other_job));
            job_finish_sync_locked(other_job, NULL, NULL);
        }
        /* Finalizing detaches the job from the txn, shrinking the list. */
        job_finalize_single_locked(other_job);
    }

    job_unref_locked(job);
    job_txn_unref_locked(txn);
}

/*
 * Run the driver's prepare() callback (without job_mutex held) for a
 * job that has succeeded so far, folding its result into job->ret.
 * Returns the job's current return code.
 *
 * Called with job_mutex held, but releases it temporarily.
 */
static int job_prepare_locked(Job *job)
{
    int ret;

    GLOBAL_STATE_CODE();

    if (job->ret == 0 && job->driver->prepare) {
        job_unlock();
        ret = job->driver->prepare(job);
        job_lock();
        job->ret = ret;
        job_update_rc_locked(job);
    }

    return job->ret;
}

983/* Called with job_mutex held */
984static int job_needs_finalize_locked(Job *job)
985{
986return !job->auto_finalize;
987}
988
/*
 * Finalize the whole transaction: prepare every job, then either abort
 * the transaction (first failure) or finalize every job.
 * Called with job_mutex held.
 */
static void job_do_finalize_locked(Job *job)
{
    int rc;
    assert(job && job->txn);

    /* prepare the transaction to complete */
    rc = job_txn_apply_locked(job, job_prepare_locked);
    if (rc) {
        job_completed_txn_abort_locked(job);
    } else {
        job_txn_apply_locked(job, job_finalize_single_locked);
    }
}

/*
 * QMP job-finalize: explicitly finalize a PENDING job.
 * Called with job_mutex held.
 */
void job_finalize_locked(Job *job, Error **errp)
{
    assert(job && job->id);
    if (job_apply_verb_locked(job, JOB_VERB_FINALIZE, errp)) {
        return;
    }
    job_do_finalize_locked(job);
}

/*
 * Transition to PENDING, notifying on_pending only when the user must
 * issue job-finalize manually.  Always returns 0 (for
 * job_txn_apply_locked()).  Called with job_mutex held.
 */
static int job_transition_to_pending_locked(Job *job)
{
    job_state_transition_locked(job, JOB_STATUS_PENDING);
    if (!job->auto_finalize) {
        job_event_pending_locked(job);
    }
    return 0;
}

/* Transition to READY and notify the on_ready notifiers. */
void job_transition_to_ready(Job *job)
{
    JOB_LOCK_GUARD();
    job_state_transition_locked(job, JOB_STATUS_READY);
    job_event_ready_locked(job);
}

/*
 * Handle successful completion of @job: move it to WAITING, and once
 * every job in the transaction has completed, move them all to PENDING
 * and auto-finalize when allowed.  Called with job_mutex held.
 */
static void job_completed_txn_success_locked(Job *job)
{
    JobTxn *txn = job->txn;
    Job *other_job;

    job_state_transition_locked(job, JOB_STATUS_WAITING);

    /*
     * Successful completion, see if there are other running jobs in this
     * txn.
     */
    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
        if (!job_is_completed_locked(other_job)) {
            return;
        }
        assert(other_job->ret == 0);
    }

    job_txn_apply_locked(job, job_transition_to_pending_locked);

    /* If no jobs need manual finalization, automatically do so */
    if (job_txn_apply_locked(job, job_needs_finalize_locked) == 0) {
        job_do_finalize_locked(job);
    }
}

/*
 * Dispatch completion of @job to the success or abort path depending
 * on its final return code.  Called with job_mutex held.
 */
static void job_completed_locked(Job *job)
{
    assert(job && job->txn && !job_is_completed_locked(job));

    job_update_rc_locked(job);
    trace_job_completed(job, job->ret);
    if (job->ret) {
        job_completed_txn_abort_locked(job);
    } else {
        job_completed_txn_success_locked(job);
    }
}

/**
 * Main-loop bottom half that completes a job after its coroutine
 * returned.  Useful only as a type shim for aio_bh_schedule_oneshot.
 * Called with job_mutex *not* held.
 */
static void job_exit(void *opaque)
{
    Job *job = (Job *)opaque;
    JOB_LOCK_GUARD();
    job_ref_locked(job);

    /* This is a lie, we're not quiescent, but still doing the completion
     * callbacks. However, completion callbacks tend to involve operations that
     * drain block nodes, and if .drained_poll still returned true, we would
     * deadlock. */
    job->busy = false;
    job_event_idle_locked(job);

    job_completed_locked(job);
    job_unref_locked(job);
}

/**
 * Coroutine entry point for all jobs: pause point, then the driver's
 * run() callback, then hand completion off to the main loop.
 *
 * All jobs must allow a pause point before entering their job proper. This
 * ensures that jobs can be paused prior to being started, then resumed later.
 */
static void coroutine_fn job_co_entry(void *opaque)
{
    Job *job = opaque;
    int ret;

    assert(job && job->driver && job->driver->run);
    WITH_JOB_LOCK_GUARD() {
        assert(job->aio_context == qemu_get_current_aio_context());
        job_pause_point_locked(job);
    }
    /* The driver's run() executes without job_mutex held. */
    ret = job->driver->run(job, &job->err);
    WITH_JOB_LOCK_GUARD() {
        job->ret = ret;
        job->deferred_to_main_loop = true;
        /* Stay "busy" so job_enter_cond_locked() will not re-enter us. */
        job->busy = true;
    }
    aio_bh_schedule_oneshot(qemu_get_aio_context(), job_exit, job);
}

/* Create the job coroutine and start running it.  Main-loop only. */
void job_start(Job *job)
{
    assert(qemu_in_main_thread());

    WITH_JOB_LOCK_GUARD() {
        assert(job && !job_started_locked(job) && job->paused &&
               job->driver && job->driver->run);
        job->co = qemu_coroutine_create(job_co_entry, job);
        /* Drop the initial pause taken in job_create(). */
        job->pause_count--;
        job->busy = true;
        job->paused = false;
        job_state_transition_locked(job, JOB_STATUS_RUNNING);
    }
    aio_co_enter(job->aio_context, job->co);
}

/*
 * Cancel @job (forcibly if @force), handling every lifecycle phase:
 * CONCLUDED jobs are dismissed, unstarted jobs are completed directly,
 * finished-but-deferred jobs go through txn abort, and running jobs
 * are kicked so they notice the request.  Called with job_mutex held.
 */
void job_cancel_locked(Job *job, bool force)
{
    if (job->status == JOB_STATUS_CONCLUDED) {
        job_do_dismiss_locked(job);
        return;
    }
    job_cancel_async_locked(job, force);
    if (!job_started_locked(job)) {
        job_completed_locked(job);
    } else if (job->deferred_to_main_loop) {
        /*
         * job_cancel_async() ignores soft-cancel requests for jobs
         * that are already done (i.e. deferred to the main loop). We
         * have to check again whether the job is really cancelled.
         * (job_cancel_requested() and job_is_cancelled() are equivalent
         * here, because job_cancel_async() will make soft-cancel
         * requests no-ops when deferred_to_main_loop is true. We
         * choose to call job_is_cancelled() to show that we invoke
         * job_completed_txn_abort() only for force-cancelled jobs.)
         */
        if (job_is_cancelled_locked(job)) {
            job_completed_txn_abort_locked(job);
        }
    } else {
        job_enter_cond_locked(job, NULL);
    }
}

/*
 * QMP job-cancel: verb-checked wrapper around job_cancel_locked().
 * Called with job_mutex held.
 */
void job_user_cancel_locked(Job *job, bool force, Error **errp)
{
    if (job_apply_verb_locked(job, JOB_VERB_CANCEL, errp)) {
        return;
    }
    job_cancel_locked(job, force);
}

/* A wrapper around job_cancel_locked() taking an Error ** parameter so it may
 * be used with job_finish_sync_locked() without the need for (rather nasty)
 * function pointer casts there.
 *
 * Called with job_mutex held.
 */
static void job_cancel_err_locked(Job *job, Error **errp)
{
    job_cancel_locked(job, false);
}

/**
 * Same as job_cancel_err(), but force-cancel.
 * Called with job_mutex held.
 */
static void job_force_cancel_err_locked(Job *job, Error **errp)
{
    job_cancel_locked(job, true);
}

1187int job_cancel_sync_locked(Job *job, bool force)
1188{
1189if (force) {
1190return job_finish_sync_locked(job, &job_force_cancel_err_locked, NULL);
1191} else {
1192return job_finish_sync_locked(job, &job_cancel_err_locked, NULL);
1193}
1194}
1195
/* Unlocked wrapper for job_cancel_sync_locked(). */
int job_cancel_sync(Job *job, bool force)
{
    JOB_LOCK_GUARD();
    return job_cancel_sync_locked(job, force);
}

/* Force-cancel every job and wait for all of them to finish. */
void job_cancel_sync_all(void)
{
    Job *job;
    JOB_LOCK_GUARD();

    /* Each iteration removes the job from the list, so restart at head. */
    while ((job = job_next_locked(NULL))) {
        job_cancel_sync_locked(job, true);
    }
}

/*
 * Complete @job and wait synchronously for it to finish.
 * Called with job_mutex held.
 */
int job_complete_sync_locked(Job *job, Error **errp)
{
    return job_finish_sync_locked(job, job_complete_locked, errp);
}

/*
 * QMP job-complete: ask a READY job to complete via the driver's
 * complete() callback (run without job_mutex held).
 * Called with job_mutex held.
 */
void job_complete_locked(Job *job, Error **errp)
{
    /* Should not be reachable via external interface for internal jobs */
    assert(job->id);
    GLOBAL_STATE_CODE();
    if (job_apply_verb_locked(job, JOB_VERB_COMPLETE, errp)) {
        return;
    }
    if (job_cancel_requested_locked(job) || !job->driver->complete) {
        error_setg(errp, "The active block job '%s' cannot be completed",
                   job->id);
        return;
    }

    job_unlock();
    job->driver->complete(job, errp);
    job_lock();
}

/*
 * Run @finish on @job (if given) and then block until the job has
 * completed, repeatedly kicking it.  Returns -EBUSY if @finish fails,
 * -ECANCELED for force-cancelled jobs, or the job's return code.
 *
 * Called with job_mutex held, but releases it temporarily while
 * polling in AIO_WAIT_WHILE_UNLOCKED.
 */
int job_finish_sync_locked(Job *job,
                           void (*finish)(Job *, Error **errp),
                           Error **errp)
{
    Error *local_err = NULL;
    int ret;
    GLOBAL_STATE_CODE();

    /* Keep the job alive while we wait for it to complete. */
    job_ref_locked(job);

    if (finish) {
        finish(job, &local_err);
    }
    if (local_err) {
        error_propagate(errp, local_err);
        job_unref_locked(job);
        return -EBUSY;
    }

    job_unlock();
    AIO_WAIT_WHILE_UNLOCKED(job->aio_context,
                            (job_enter(job), !job_is_completed(job)));
    job_lock();

    ret = (job_is_cancelled_locked(job) && job->ret == 0)
          ? -ECANCELED : job->ret;
    job_unref_locked(job);
    return ret;
}