glusterfs

Форк
0
1028 строк · 26.2 Кб
1
/*
2
  Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com>
3
  This file is part of GlusterFS.
4

5
  This file is licensed to you under your choice of the GNU Lesser
6
  General Public License, version 3 or any later version (LGPLv3 or
7
  later), or the GNU General Public License, version 2 (GPLv2), in all
8
  cases as published by the Free Software Foundation.
9
*/
10

11
#include <glusterfs/glusterfs.h>
12
#include <glusterfs/logging.h>
13
#include <glusterfs/dict.h>
14
#include "io-cache.h"
15
#include "ioc-mem-types.h"
16
#include <assert.h>
17
#include <sys/time.h>
18
#include "io-cache-messages.h"
19
char
20
ioc_empty(struct ioc_cache *cache)
21
{
22
    char is_empty = -1;
23

24
    GF_VALIDATE_OR_GOTO("io-cache", cache, out);
25

26
    is_empty = list_empty(&cache->page_lru);
27

28
out:
29
    return is_empty;
30
}
31

32
ioc_page_t *
33
__ioc_page_get(ioc_inode_t *ioc_inode, off_t offset)
34
{
35
    ioc_page_t *page = NULL;
36
    ioc_table_t *table = NULL;
37
    off_t rounded_offset = 0;
38

39
    GF_VALIDATE_OR_GOTO("io-cache", ioc_inode, out);
40

41
    table = ioc_inode->table;
42
    GF_VALIDATE_OR_GOTO("io-cache", ioc_inode, out);
43

44
    rounded_offset = gf_floor(offset, table->page_size);
45

46
    page = rbthash_get(ioc_inode->cache.page_table, &rounded_offset,
47
                       sizeof(rounded_offset));
48

49
    if (page != NULL) {
50
        /* push the page to the end of the lru list */
51
        list_move_tail(&page->page_lru, &ioc_inode->cache.page_lru);
52
    }
53

54
out:
55
    return page;
56
}
57

58
ioc_page_t *
59
ioc_page_get(ioc_inode_t *ioc_inode, off_t offset)
60
{
61
    ioc_page_t *page = NULL;
62

63
    if (ioc_inode == NULL) {
64
        goto out;
65
    }
66

67
    ioc_inode_lock(ioc_inode);
68
    {
69
        page = __ioc_page_get(ioc_inode, offset);
70
    }
71
    ioc_inode_unlock(ioc_inode);
72

73
out:
74
    return page;
75
}
76

77
/*
 * __ioc_page_destroy - free a page (or mark it stale) and report how much
 * cache memory was released.
 *
 * @page: page to destroy; the caller must hold the owning inode's lock.
 *
 * Returns the number of bytes released (size of the page's iobref, 0 if
 * none), or -1 when frames are still waiting on the page — in that case
 * the page is only marked stale and __ioc_page_wakeup() reaps it later.
 */
int64_t
__ioc_page_destroy(ioc_page_t *page)
{
    int64_t page_size = 0;

    GF_VALIDATE_OR_GOTO("io-cache", page, out);

    if (page->iobref)
        page_size = iobref_size(page->iobref);

    if (page->waitq) {
        /* frames waiting on this page, do not destroy this page */
        page_size = -1;
        page->stale = 1;
    } else {
        /* unlink from the inode's page hash and LRU before freeing */
        rbthash_remove(page->inode->cache.page_table, &page->offset,
                       sizeof(page->offset));
        list_del(&page->page_lru);

        gf_msg_trace(page->inode->table->xl->name, 0,
                     "destroying page = %p, offset = %" PRId64
                     " "
                     "&& inode = %p",
                     page, page->offset, page->inode);

        if (page->vector) {
            iobref_unref(page->iobref);
            GF_FREE(page->vector);
            page->vector = NULL;
        }

        page->inode = NULL;
    }

    /* -1 doubles as "kept alive": only free when the page was unlinked */
    if (page_size != -1) {
        pthread_mutex_destroy(&page->page_lock);
        GF_FREE(page);
    }

out:
    return page_size;
}
125

126
int64_t
127
ioc_page_destroy(ioc_page_t *page)
128
{
129
    int64_t ret = 0;
130
    struct ioc_inode *inode = NULL;
131

132
    if (page == NULL) {
133
        goto out;
134
    }
135

136
    ioc_inode_lock(page->inode);
137
    {
138
        inode = page->inode;
139
        ret = __ioc_page_destroy(page);
140
    }
141
    ioc_inode_unlock(inode);
142

143
out:
144
    return ret;
145
}
146

147
/*
 * __ioc_inode_prune - destroy pages of one inode, LRU-first, until at
 * least @size_to_prune bytes have been accounted in @size_pruned.
 *
 * @curr: inode to prune; the caller must hold its lock.
 * @size_pruned: running total (in/out) of bytes pruned so far.
 * @size_to_prune: target number of bytes to release overall.
 * @index: priority-list index, used only for trace logging.
 *
 * Also drops @curr from the table's inode LRU when its cache becomes
 * empty. Always returns 0.
 */
int32_t
__ioc_inode_prune(ioc_inode_t *curr, uint64_t *size_pruned,
                  uint64_t size_to_prune, uint32_t index)
{
    ioc_page_t *page = NULL, *next = NULL;
    int32_t ret = 0;
    ioc_table_t *table = NULL;

    if (curr == NULL) {
        goto out;
    }

    table = curr->table;

    list_for_each_entry_safe(page, next, &curr->cache.page_lru, page_lru)
    {
        /* NOTE(review): page->size is added even when the destroy below
         * returns -1 (page stale, kept alive) — the running total may
         * overstate what was actually released; confirm intent */
        *size_pruned += page->size;
        ret = __ioc_page_destroy(page);

        /* -1 means the page had waiters and was only marked stale;
         * nothing was released, so don't touch the accounting */
        if (ret != -1)
            table->cache_used -= ret;

        gf_msg_trace(table->xl->name, 0,
                     "index = %d && "
                     "table->cache_used = %" PRIu64
                     " && table->"
                     "cache_size = %" PRIu64,
                     index, table->cache_used, table->cache_size);

        if ((*size_pruned) >= size_to_prune)
            break;
    }

    /* no pages left: this inode no longer competes for LRU eviction */
    if (ioc_empty(&curr->cache)) {
        list_del_init(&curr->inode_lru);
    }

out:
    return 0;
}
187
/*
188
 * ioc_prune - prune the cache. we have a limit to the number of pages we
189
 *             can have in-memory.
190
 *
191
 * @table: ioc_table_t of this translator
192
 *
193
 */
194
int32_t
195
ioc_prune(ioc_table_t *table)
196
{
197
    ioc_inode_t *curr = NULL, *next_ioc_inode = NULL;
198
    int32_t index = 0;
199
    uint64_t size_to_prune = 0;
200
    uint64_t size_pruned = 0;
201

202
    GF_VALIDATE_OR_GOTO("io-cache", table, out);
203

204
    ioc_table_lock(table);
205
    {
206
        size_to_prune = table->cache_used - table->cache_size;
207
        /* take out the least recently used inode */
208
        for (index = 0; index < table->max_pri; index++) {
209
            list_for_each_entry_safe(curr, next_ioc_inode,
210
                                     &table->inode_lru[index], inode_lru)
211
            {
212
                /* prune page-by-page for this inode, till
213
                 * we reach the equilibrium */
214
                ioc_inode_lock(curr);
215
                {
216
                    __ioc_inode_prune(curr, &size_pruned, size_to_prune, index);
217
                }
218
                ioc_inode_unlock(curr);
219

220
                if (size_pruned >= size_to_prune)
221
                    break;
222
            } /* list_for_each_entry_safe (curr...) */
223

224
            if (size_pruned >= size_to_prune)
225
                break;
226
        } /* for(index=0;...) */
227

228
    } /* ioc_inode_table locked region end */
229
    ioc_table_unlock(table);
230

231
out:
232
    return 0;
233
}
234

235
/*
236
 * __ioc_page_create - create a new page.
237
 *
238
 * @ioc_inode:
239
 * @offset:
240
 *
241
 */
242
ioc_page_t *
243
__ioc_page_create(ioc_inode_t *ioc_inode, off_t offset)
244
{
245
    ioc_table_t *table = NULL;
246
    ioc_page_t *page = NULL;
247
    off_t rounded_offset = 0;
248
    ioc_page_t *newpage = NULL;
249

250
    GF_VALIDATE_OR_GOTO("io-cache", ioc_inode, out);
251

252
    table = ioc_inode->table;
253
    GF_VALIDATE_OR_GOTO("io-cache", table, out);
254

255
    rounded_offset = gf_floor(offset, table->page_size);
256

257
    newpage = GF_CALLOC(1, sizeof(*newpage), gf_ioc_mt_ioc_newpage_t);
258
    if (newpage == NULL) {
259
        goto out;
260
    }
261

262
    if (!ioc_inode) {
263
        GF_FREE(newpage);
264
        newpage = NULL;
265
        goto out;
266
    }
267

268
    newpage->offset = rounded_offset;
269
    newpage->inode = ioc_inode;
270
    pthread_mutex_init(&newpage->page_lock, NULL);
271

272
    rbthash_insert(ioc_inode->cache.page_table, newpage, &rounded_offset,
273
                   sizeof(rounded_offset));
274

275
    list_add_tail(&newpage->page_lru, &ioc_inode->cache.page_lru);
276

277
    page = newpage;
278

279
    gf_msg_trace("io-cache", 0, "returning new page %p", page);
280

281
out:
282
    return page;
283
}
284

285
/*
286
 * ioc_wait_on_page - pause a frame to wait till the arrival of a page.
287
 * here we need to handle the case when the frame who calls wait_on_page
288
 * himself has caused page_fault
289
 *
290
 * @page: page to wait on
291
 * @frame: call frame who is waiting on page
292
 *
293
 */
294
void
295
__ioc_wait_on_page(ioc_page_t *page, call_frame_t *frame, off_t offset,
296
                   size_t size)
297
{
298
    ioc_waitq_t *waitq = NULL;
299
    ioc_local_t *local = NULL;
300

301
    GF_VALIDATE_OR_GOTO("io-cache", frame, out);
302
    local = frame->local;
303

304
    GF_VALIDATE_OR_GOTO(frame->this->name, local, out);
305

306
    if (page == NULL) {
307
        local->op_ret = -1;
308
        local->op_errno = ENOMEM;
309
        gf_smsg(frame->this->name, GF_LOG_WARNING, 0,
310
                IO_CACHE_MSG_NULL_PAGE_WAIT, NULL);
311
        goto out;
312
    }
313

314
    waitq = GF_CALLOC(1, sizeof(*waitq), gf_ioc_mt_ioc_waitq_t);
315
    if (waitq == NULL) {
316
        local->op_ret = -1;
317
        local->op_errno = ENOMEM;
318
        goto out;
319
    }
320

321
    gf_msg_trace(frame->this->name, 0,
322
                 "frame(%p) waiting on page = %p, offset=%" PRId64
323
                 ", "
324
                 "size=%" GF_PRI_SIZET "",
325
                 frame, page, offset, size);
326

327
    waitq->data = frame;
328
    waitq->next = page->waitq;
329
    waitq->pending_offset = offset;
330
    waitq->pending_size = size;
331
    page->waitq = waitq;
332
    /* one frame can wait only once on a given page,
333
     * local->wait_count is number of pages a frame is waiting on */
334
    ioc_local_lock(local);
335
    {
336
        local->wait_count++;
337
    }
338
    ioc_local_unlock(local);
339

340
out:
341
    return;
342
}
343

344
/*
345
 * ioc_cache_still_valid - see if cached pages ioc_inode are still valid
346
 * against given stbuf
347
 *
348
 * @ioc_inode:
349
 * @stbuf:
350
 *
351
 * assumes ioc_inode is locked
352
 */
353
int8_t
354
ioc_cache_still_valid(ioc_inode_t *ioc_inode, struct iatt *stbuf)
355
{
356
    int8_t cache_still_valid = 1;
357

358
    GF_VALIDATE_OR_GOTO("io-cache", ioc_inode, out);
359

360
#if 0
361
        if (!stbuf || (stbuf->ia_mtime != ioc_inode->cache.mtime) ||
362
            (stbuf->st_mtim.tv_nsec != ioc_inode->stbuf.st_mtim.tv_nsec))
363
                cache_still_valid = 0;
364

365
#else
366
    if (!stbuf || (stbuf->ia_mtime != ioc_inode->cache.mtime) ||
367
        (stbuf->ia_mtime_nsec != ioc_inode->cache.mtime_nsec))
368
        cache_still_valid = 0;
369

370
#endif
371

372
#if 0
373
        /* talk with avati@gluster.com to enable this section */
374
        if (!ioc_inode->mtime && stbuf) {
375
                cache_still_valid = 1;
376
                ioc_inode->mtime = stbuf->ia_mtime;
377
        }
378
#endif
379

380
out:
381
    return cache_still_valid;
382
}
383

384
void
385
ioc_waitq_return(ioc_waitq_t *waitq)
386
{
387
    ioc_waitq_t *trav = NULL;
388
    ioc_waitq_t *next = NULL;
389
    call_frame_t *frame = NULL;
390

391
    for (trav = waitq; trav; trav = next) {
392
        next = trav->next;
393

394
        frame = trav->data;
395
        ioc_frame_return(frame);
396
        GF_FREE(trav);
397
    }
398
}
399

400
/*
 * ioc_fault_cbk - readv callback for a page fault issued by
 * ioc_page_fault(). Validates/refreshes the inode's cache, stores the
 * returned data in the faulted page, wakes all waiting frames, updates
 * table-wide cache accounting, and destroys the fault frame.
 *
 * Runs on the fault frame created in ioc_page_fault(); local->fd and
 * local->xattr_req are released here, and the frame's stack destroyed.
 * Always returns 0.
 */
int
ioc_fault_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
              int32_t op_errno, struct iovec *vector, int32_t count,
              struct iatt *stbuf, struct iobref *iobref, dict_t *xdata)
{
    ioc_local_t *local = NULL;
    off_t offset = 0;
    ioc_inode_t *ioc_inode = NULL;
    ioc_table_t *table = NULL;
    ioc_page_t *page = NULL;
    int32_t destroy_size = 0;
    size_t page_size = 0;
    ioc_waitq_t *waitq = NULL;
    size_t iobref_page_size = 0;
    char zero_filled = 0;

    GF_ASSERT(frame);

    local = frame->local;
    GF_ASSERT(local);

    offset = local->pending_offset;
    ioc_inode = local->inode;
    GF_ASSERT(ioc_inode);

    table = ioc_inode->table;
    GF_ASSERT(table);

    /* mtime == 0 marks a file with no usable timestamp; treated as a
     * special case so the cache is neither flushed nor stamped below */
    zero_filled = ((op_ret >= 0) && (stbuf->ia_mtime == 0));

    ioc_inode_lock(ioc_inode);
    {
        /* readv failed or mtime changed under us: every cached page of
         * this inode is suspect, flush them all */
        if (op_ret == -1 ||
            !(zero_filled || ioc_cache_still_valid(ioc_inode, stbuf))) {
            gf_msg_trace(ioc_inode->table->xl->name, 0,
                         "cache for inode(%p) is invalid. flushing "
                         "all pages",
                         ioc_inode);
            destroy_size = __ioc_inode_flush(ioc_inode);
        }

        if ((op_ret >= 0) && !zero_filled) {
            ioc_inode->cache.mtime = stbuf->ia_mtime;
            ioc_inode->cache.mtime_nsec = stbuf->ia_mtime_nsec;
        }

        ioc_inode->cache.last_revalidate = gf_time();

        if (op_ret < 0) {
            /* error, readv returned -1 */
            page = __ioc_page_get(ioc_inode, offset);
            if (page)
                waitq = __ioc_page_error(page, op_ret, op_errno);
        } else {
            gf_msg_trace(ioc_inode->table->xl->name, 0, "op_ret = %d", op_ret);
            page = __ioc_page_get(ioc_inode, offset);
            if (!page) {
                /* page was flushed */
                /* some serious bug ? */
                gf_smsg(frame->this->name, GF_LOG_WARNING, 0,
                        IO_CACHE_MSG_WASTED_COPY, "offset=%" PRId64, offset,
                        "page-size=%" PRId64, table->page_size, "ioc_inode=%p",
                        ioc_inode, NULL);
            } else {
                /* drop any stale copy before installing the new data */
                if (page->vector) {
                    iobref_unref(page->iobref);
                    GF_FREE(page->vector);
                    page->vector = NULL;
                    page->iobref = NULL;
                }

                /* keep a copy of the page for our cache */
                page->vector = iov_dup(vector, count);
                if (page->vector == NULL) {
                    page = __ioc_page_get(ioc_inode, offset);
                    if (page != NULL)
                        waitq = __ioc_page_error(page, -1, ENOMEM);
                    goto unlock;
                }

                page->count = count;
                if (iobref) {
                    page->iobref = iobref_ref(iobref);
                } else {
                    /* TODO: we have got a response to
                     * our request and no data */
                    gf_smsg(frame->this->name, GF_LOG_CRITICAL, ENOMEM,
                            IO_CACHE_MSG_FRAME_NULL, NULL);
                } /* if(frame->root->rsp_refs) */

                /* page->size should indicate exactly how
                 * much the readv call to the child
                 * translator returned. earlier op_ret
                 * from child translator was used, which
                 * gave rise to a bug where reads from
                 * io-cached volume were resulting in 0
                 * byte replies */
                page_size = iov_length(vector, count);
                page->size = page_size;
                page->op_errno = op_errno;

                iobref_page_size = iobref_size(page->iobref);

                if (page->waitq) {
                    /* wake up all the frames waiting on
                     * this page, including
                     * the frame which triggered fault */
                    waitq = __ioc_page_wakeup(page, op_errno);
                } /* if(page->waitq) */
            }     /* if(!page)...else */
        }         /* if(op_ret < 0)...else */
    }             /* ioc_inode locked region end */
unlock:
    ioc_inode_unlock(ioc_inode);

    /* resume waiters only after dropping the inode lock */
    ioc_waitq_return(waitq);

    if (iobref_page_size) {
        ioc_table_lock(table);
        {
            table->cache_used += iobref_page_size;
        }
        ioc_table_unlock(table);
    }

    if (destroy_size) {
        ioc_table_lock(table);
        {
            table->cache_used -= destroy_size;
        }
        ioc_table_unlock(table);
    }

    if (ioc_need_prune(ioc_inode->table)) {
        ioc_prune(ioc_inode->table);
    }

    gf_msg_trace(frame->this->name, 0, "fault frame %p returned", frame);
    pthread_mutex_destroy(&local->local_lock);

    /* release the references taken in ioc_page_fault() */
    fd_unref(local->fd);
    if (local->xattr_req)
        dict_unref(local->xattr_req);

    STACK_DESTROY(frame->root);
    return 0;
}
547

548
/*
 * ioc_page_fault - wind a one-page readv to the child translator to fill
 * a cache miss; the reply is handled by ioc_fault_cbk().
 *
 * @ioc_inode: inode owning the missed page.
 * @frame: the original read frame (copied; not consumed here).
 * @fd: fd to read from; a new reference is taken and released in the cbk.
 * @offset: page-aligned offset to fault in.
 *
 * On any setup failure, the page at @offset (if present) is errored and
 * all frames waiting on it are resumed.
 */
void
ioc_page_fault(ioc_inode_t *ioc_inode, call_frame_t *frame, fd_t *fd,
               off_t offset)
{
    ioc_table_t *table = NULL;
    call_frame_t *fault_frame = NULL;
    ioc_local_t *fault_local = NULL;
    ioc_local_t *local = NULL;
    int32_t op_ret = -1, op_errno = -1;
    ioc_waitq_t *waitq = NULL;
    ioc_page_t *page = NULL;

    GF_ASSERT(ioc_inode);
    if (frame == NULL) {
        op_ret = -1;
        op_errno = EINVAL;
        gf_smsg("io-cache", GF_LOG_WARNING, EINVAL, IO_CACHE_MSG_PAGE_FAULT,
                NULL);
        goto err;
    }

    table = ioc_inode->table;
    fault_frame = copy_frame(frame);
    if (fault_frame == NULL) {
        op_ret = -1;
        op_errno = ENOMEM;
        goto err;
    }

    local = frame->local;
    fault_local = mem_get0(THIS->local_pool);
    if (fault_local == NULL) {
        op_ret = -1;
        op_errno = ENOMEM;
        /* undo the frame copy made above before bailing out */
        STACK_DESTROY(fault_frame->root);
        goto err;
    }

    /* NOTE: copy_frame() means, the frame the fop whose fd_ref we
     * are using till now won't be valid till we get reply from server.
     * we unref this fd, in fault_cbk */
    fault_local->fd = fd_ref(fd);

    fault_frame->local = fault_local;
    pthread_mutex_init(&fault_local->local_lock, NULL);

    INIT_LIST_HEAD(&fault_local->fill_list);
    fault_local->pending_offset = offset;
    fault_local->pending_size = table->page_size;
    fault_local->inode = ioc_inode;

    /* carry the caller's xattr_req along; unref'd in ioc_fault_cbk */
    if (local && local->xattr_req)
        fault_local->xattr_req = dict_ref(local->xattr_req);

    gf_msg_trace(frame->this->name, 0,
                 "stack winding page fault for offset = %" PRId64
                 " with "
                 "frame %p",
                 offset, fault_frame);

    STACK_WIND(fault_frame, ioc_fault_cbk, FIRST_CHILD(fault_frame->this),
               FIRST_CHILD(fault_frame->this)->fops->readv, fd,
               table->page_size, offset, 0, fault_local->xattr_req);
    return;

err:
    /* fault could not be issued: error the page so waiters don't hang */
    ioc_inode_lock(ioc_inode);
    {
        page = __ioc_page_get(ioc_inode, offset);
        if (page != NULL) {
            waitq = __ioc_page_error(page, op_ret, op_errno);
        }
    }
    ioc_inode_unlock(ioc_inode);

    if (waitq != NULL) {
        ioc_waitq_return(waitq);
    }
}
636

637
/*
 * __ioc_frame_fill - copy the slice of @page that satisfies a frame's
 * pending read into the frame's fill list, kept sorted by file offset.
 *
 * @page: ready page holding cached data; caller holds the inode lock.
 * @frame: read frame being served; its local accumulates ioc_fill_t's.
 * @offset: file offset of the frame's pending read.
 * @size: size of the frame's pending read.
 * @op_errno: errno to propagate to the frame's local.
 *
 * Advances local->op_ret by the number of bytes contributed. Returns 0
 * on success, -1 on invalid args or allocation failure (local->op_ret /
 * op_errno are set accordingly).
 */
int32_t
__ioc_frame_fill(ioc_page_t *page, call_frame_t *frame, off_t offset,
                 size_t size, int32_t op_errno)
{
    ioc_local_t *local = NULL;
    ioc_fill_t *fill = NULL;
    off_t src_offset = 0;
    off_t dst_offset = 0;
    ssize_t copy_size = 0;
    ioc_inode_t *ioc_inode = NULL;
    ioc_fill_t *new = NULL;
    int8_t found = 0;
    int32_t ret = -1;

    GF_VALIDATE_OR_GOTO("io-cache", frame, out);

    local = frame->local;
    GF_VALIDATE_OR_GOTO(frame->this->name, local, out);

    if (page == NULL) {
        gf_smsg(frame->this->name, GF_LOG_WARNING, 0,
                IO_CACHE_MSG_SERVE_READ_REQUEST, NULL);
        local->op_ret = -1;
        local->op_errno = EINVAL;
        goto out;
    }

    ioc_inode = page->inode;

    gf_msg_trace(frame->this->name, 0,
                 "frame (%p) offset = %" PRId64 " && size = %" GF_PRI_SIZET
                 " "
                 "&& page->size = %" GF_PRI_SIZET " && wait_count = %d",
                 frame, offset, size, page->size, local->wait_count);

    /* immediately move this page to the end of the page_lru list */
    list_move_tail(&page->page_lru, &ioc_inode->cache.page_lru);
    /* fill local->pending_size bytes from local->pending_offset */
    if (local->op_ret != -1) {
        local->op_errno = op_errno;

        /* empty page contributes nothing; still counts as success */
        if (page->size == 0) {
            goto done;
        }

        if (offset > page->offset)
            /* offset is offset in file, convert it to offset in
             * page */
            src_offset = offset - page->offset;
        /*FIXME: since offset is the offset within page is the
         * else case valid? */
        else
            /* local->pending_offset is in previous page. do not
             * fill until we have filled all previous pages */
            dst_offset = page->offset - offset;

        /* we have to copy from offset to either end of this page
         * or till the requested size */
        copy_size = min(page->size - src_offset, size - dst_offset);

        if (copy_size < 0) {
            /* if page contains fewer bytes and the required offset
               is beyond the page size in the page */
            copy_size = src_offset = 0;
        }

        gf_msg_trace(page->inode->table->xl->name, 0,
                     "copy_size = %" GF_PRI_SIZET
                     " && src_offset = "
                     "%" PRId64 " && dst_offset = %" PRId64 "",
                     copy_size, src_offset, dst_offset);

        {
            new = GF_CALLOC(1, sizeof(*new), gf_ioc_mt_ioc_fill_t);
            if (new == NULL) {
                local->op_ret = -1;
                local->op_errno = ENOMEM;
                goto out;
            }

            new->offset = page->offset;
            new->size = copy_size;
            /* the fill shares the page's buffers via a new iobref ref;
             * released when the fill is consumed in ioc_frame_unwind */
            new->iobref = iobref_ref(page->iobref);
            new->count = iov_subset(page->vector, page->count, src_offset,
                                    copy_size, &new->vector, 0);
            if (new->count < 0) {
                local->op_ret = -1;
                local->op_errno = ENOMEM;

                iobref_unref(new->iobref);
                GF_FREE(new);
                goto out;
            }

            /* add the ioc_fill to fill_list for this frame */
            if (list_empty(&local->fill_list)) {
                /* if list is empty, then this is the first
                 * time we are filling frame, add the
                 * ioc_fill_t to the end of list */
                list_add_tail(&new->list, &local->fill_list);
            } else {
                found = 0;
                /* list is not empty, we need to look for
                 * where this offset fits in list */
                list_for_each_entry(fill, &local->fill_list, list)
                {
                    if (fill->offset > new->offset) {
                        found = 1;
                        break;
                    }
                }

                /* insert before the first larger offset (keeps the
                 * list sorted), or append when none is larger */
                if (found) {
                    list_add_tail(&new->list, &fill->list);
                } else {
                    list_add_tail(&new->list, &local->fill_list);
                }
            }
        }

        local->op_ret += copy_size;
    }

done:
    ret = 0;
out:
    return ret;
}
765

766
/*
 * ioc_frame_unwind - frame unwinds only from here
 *
 * @frame: call frame to unwind
 *
 * Flattens the frame's sorted fill list into a single iovec array plus a
 * merged iobref and unwinds the readv with them; frees the local.
 *
 * to be used only by ioc_frame_return(), when a frame has
 * finished waiting on all pages, required
 */
static void
ioc_frame_unwind(call_frame_t *frame)
{
    ioc_local_t *local = NULL;
    ioc_fill_t *fill = NULL, *next = NULL;
    int32_t count = 0;
    struct iovec *vector = NULL;
    int32_t copied = 0;
    struct iobref *iobref = NULL;
    struct iatt stbuf = {
        0,
    };
    int32_t op_ret = 0, op_errno = 0;

    GF_ASSERT(frame);

    local = frame->local;
    if (local == NULL) {
        gf_smsg(frame->this->name, GF_LOG_WARNING, ENOMEM,
                IO_CACHE_MSG_LOCAL_NULL, NULL);
        op_ret = -1;
        op_errno = ENOMEM;
        goto unwind;
    }

    /* gnfs requires op_errno to determine is_eof */
    op_errno = local->op_errno;

    if (local->op_ret < 0) {
        op_ret = local->op_ret;
        goto unwind;
    }

    //  ioc_local_lock (local);
    iobref = iobref_new();
    if (iobref == NULL) {
        op_ret = -1;
        op_errno = ENOMEM;
    }

    if (list_empty(&local->fill_list)) {
        gf_msg_trace(frame->this->name, 0,
                     "frame(%p) has 0 entries in local->fill_list "
                     "(offset = %" PRId64 " && size = %" GF_PRI_SIZET ")",
                     frame, local->offset, local->size);
    }

    /* first pass: total iovec count across all fills */
    list_for_each_entry(fill, &local->fill_list, list) { count += fill->count; }

    vector = GF_CALLOC(count, sizeof(*vector), gf_ioc_mt_iovec);
    if (vector == NULL) {
        op_ret = -1;
        op_errno = ENOMEM;
    }

    /* second pass: concatenate iovecs, merge iobrefs, free each fill.
     * the list is drained even on allocation failure so nothing leaks */
    list_for_each_entry_safe(fill, next, &local->fill_list, list)
    {
        /* # TODO: check why this if clause is needed at all. */
        if ((vector != NULL) && (iobref != NULL)) {
            /* `copied` is a byte offset into the vector array */
            memcpy(((char *)vector) + copied, fill->vector,
                   fill->count * sizeof(*vector));

            copied += (fill->count * sizeof(*vector));

            if (iobref_merge(iobref, fill->iobref)) {
                op_ret = -1;
                op_errno = ENOMEM;
            }
        }

        list_del(&fill->list);
        iobref_unref(fill->iobref);
        GF_FREE(fill->vector);
        GF_FREE(fill);
    }

    if (op_ret != -1) {
        op_ret = iov_length(vector, count);
    }

unwind:
    gf_msg_trace(frame->this->name, 0, "frame(%p) unwinding with op_ret=%d",
                 frame, op_ret);

    //  ioc_local_unlock (local);

    /* detach local before unwinding so the stack machinery doesn't
     * free it; we release it ourselves below */
    frame->local = NULL;
    STACK_UNWIND_STRICT(readv, frame, op_ret, op_errno, vector, count, &stbuf,
                        iobref, NULL);

    if (iobref != NULL) {
        iobref_unref(iobref);
    }

    if (vector != NULL) {
        GF_FREE(vector);
        vector = NULL;
    }

    if (local) {
        if (local->xattr_req)
            dict_unref(local->xattr_req);
        pthread_mutex_destroy(&local->local_lock);
        mem_put(local);
    }
    return;
}
882

883
/*
884
 * ioc_frame_return -
885
 * @frame:
886
 *
887
 * to be called only when a frame is waiting on an in-transit page
888
 */
889
void
890
ioc_frame_return(call_frame_t *frame)
891
{
892
    ioc_local_t *local = NULL;
893
    int32_t wait_count = 0;
894

895
    GF_ASSERT(frame);
896

897
    local = frame->local;
898
    GF_ASSERT(local->wait_count > 0);
899

900
    ioc_local_lock(local);
901
    {
902
        wait_count = --local->wait_count;
903
    }
904
    ioc_local_unlock(local);
905

906
    if (!wait_count) {
907
        ioc_frame_unwind(frame);
908
    }
909

910
    return;
911
}
912

913
/*
 * __ioc_page_wakeup - detach @page's wait queue, fill every waiting frame
 * from the now-ready page, and reap the page if it was marked stale.
 *
 * @page: page whose fault just completed; caller holds the inode lock.
 * @op_errno: errno to record on each filled frame.
 *
 * Returns the detached wait queue; the caller must pass it to
 * ioc_waitq_return() AFTER dropping the inode lock.
 *
 * to be called only when a frame is waiting on an in-transit page
 */
ioc_waitq_t *
__ioc_page_wakeup(ioc_page_t *page, int32_t op_errno)
{
    ioc_waitq_t *waitq = NULL, *trav = NULL;
    call_frame_t *frame = NULL;
    int32_t ret = -1;

    GF_VALIDATE_OR_GOTO("io-cache", page, out);

    /* detach the queue so new waiters can't race with this wakeup */
    waitq = page->waitq;
    page->waitq = NULL;

    page->ready = 1;

    gf_msg_trace(page->inode->table->xl->name, 0, "page is %p && waitq = %p",
                 page, waitq);

    for (trav = waitq; trav; trav = trav->next) {
        frame = trav->data;
        ret = __ioc_frame_fill(page, frame, trav->pending_offset,
                               trav->pending_size, op_errno);
        if (ret == -1) {
            break;
        }
    }

    /* page was slated for destruction while frames waited on it;
     * with the queue now empty it can actually be destroyed */
    if (page->stale) {
        __ioc_page_destroy(page);
    }

out:
    return waitq;
}
952

953
/*
 * __ioc_page_error - fail every frame waiting on @page and destroy it.
 *
 * @page: page whose fault failed; caller holds the inode lock.
 * @op_ret: return value to record on each waiting frame.
 * @op_errno: errno to record on each waiting frame.
 *
 * Returns the detached wait queue; the caller must hand it to
 * ioc_waitq_return() AFTER dropping the inode lock.
 */
ioc_waitq_t *
__ioc_page_error(ioc_page_t *page, int32_t op_ret, int32_t op_errno)
{
    ioc_waitq_t *waitq = NULL, *trav = NULL;
    call_frame_t *frame = NULL;
    int64_t ret = 0;
    ioc_table_t *table = NULL;
    ioc_local_t *local = NULL;

    GF_VALIDATE_OR_GOTO("io-cache", page, out);

    /* detach the queue before touching the waiters */
    waitq = page->waitq;
    page->waitq = NULL;

    gf_msg_debug(page->inode->table->xl->name, 0,
                 "page error for page = %p & waitq = %p", page, waitq);

    for (trav = waitq; trav; trav = trav->next) {
        frame = trav->data;

        local = frame->local;
        ioc_local_lock(local);
        {
            /* record only the first error seen by this frame */
            if (local->op_ret != -1) {
                local->op_ret = op_ret;
                local->op_errno = op_errno;
            }
        }
        ioc_local_unlock(local);
    }

    /* grab the table before the destroy below frees the page */
    table = page->inode->table;
    ret = __ioc_page_destroy(page);

    /* -1 would mean waiters remained, but the queue was emptied above */
    if (ret != -1) {
        table->cache_used -= ret;
    }

out:
    return waitq;
}
1001

1002
/*
1003
 * ioc_page_error -
1004
 * @page:
1005
 * @op_ret:
1006
 * @op_errno:
1007
 *
1008
 */
1009
ioc_waitq_t *
1010
ioc_page_error(ioc_page_t *page, int32_t op_ret, int32_t op_errno)
1011
{
1012
    ioc_waitq_t *waitq = NULL;
1013
    struct ioc_inode *inode = NULL;
1014

1015
    if (page == NULL) {
1016
        goto out;
1017
    }
1018

1019
    ioc_inode_lock(page->inode);
1020
    {
1021
        inode = page->inode;
1022
        waitq = __ioc_page_error(page, op_ret, op_errno);
1023
    }
1024
    ioc_inode_unlock(inode);
1025

1026
out:
1027
    return waitq;
1028
}
1029

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.