glusterfs
1028 строк · 26.2 Кб
1/*
2Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com>
3This file is part of GlusterFS.
4
5This file is licensed to you under your choice of the GNU Lesser
6General Public License, version 3 or any later version (LGPLv3 or
7later), or the GNU General Public License, version 2 (GPLv2), in all
8cases as published by the Free Software Foundation.
9*/
10
11#include <glusterfs/glusterfs.h>12#include <glusterfs/logging.h>13#include <glusterfs/dict.h>14#include "io-cache.h"15#include "ioc-mem-types.h"16#include <assert.h>17#include <sys/time.h>18#include "io-cache-messages.h"19char
20ioc_empty(struct ioc_cache *cache)21{
22char is_empty = -1;23
24GF_VALIDATE_OR_GOTO("io-cache", cache, out);25
26is_empty = list_empty(&cache->page_lru);27
28out:29return is_empty;30}
31
32ioc_page_t *33__ioc_page_get(ioc_inode_t *ioc_inode, off_t offset)34{
35ioc_page_t *page = NULL;36ioc_table_t *table = NULL;37off_t rounded_offset = 0;38
39GF_VALIDATE_OR_GOTO("io-cache", ioc_inode, out);40
41table = ioc_inode->table;42GF_VALIDATE_OR_GOTO("io-cache", ioc_inode, out);43
44rounded_offset = gf_floor(offset, table->page_size);45
46page = rbthash_get(ioc_inode->cache.page_table, &rounded_offset,47sizeof(rounded_offset));48
49if (page != NULL) {50/* push the page to the end of the lru list */51list_move_tail(&page->page_lru, &ioc_inode->cache.page_lru);52}53
54out:55return page;56}
57
58ioc_page_t *59ioc_page_get(ioc_inode_t *ioc_inode, off_t offset)60{
61ioc_page_t *page = NULL;62
63if (ioc_inode == NULL) {64goto out;65}66
67ioc_inode_lock(ioc_inode);68{69page = __ioc_page_get(ioc_inode, offset);70}71ioc_inode_unlock(ioc_inode);72
73out:74return page;75}
76
77/*
78* __ioc_page_destroy -
79*
80* @page:
81*
82*/
83int64_t
84__ioc_page_destroy(ioc_page_t *page)85{
86int64_t page_size = 0;87
88GF_VALIDATE_OR_GOTO("io-cache", page, out);89
90if (page->iobref)91page_size = iobref_size(page->iobref);92
93if (page->waitq) {94/* frames waiting on this page, do not destroy this page */95page_size = -1;96page->stale = 1;97} else {98rbthash_remove(page->inode->cache.page_table, &page->offset,99sizeof(page->offset));100list_del(&page->page_lru);101
102gf_msg_trace(page->inode->table->xl->name, 0,103"destroying page = %p, offset = %" PRId64104" "105"&& inode = %p",106page, page->offset, page->inode);107
108if (page->vector) {109iobref_unref(page->iobref);110GF_FREE(page->vector);111page->vector = NULL;112}113
114page->inode = NULL;115}116
117if (page_size != -1) {118pthread_mutex_destroy(&page->page_lock);119GF_FREE(page);120}121
122out:123return page_size;124}
125
126int64_t
127ioc_page_destroy(ioc_page_t *page)128{
129int64_t ret = 0;130struct ioc_inode *inode = NULL;131
132if (page == NULL) {133goto out;134}135
136ioc_inode_lock(page->inode);137{138inode = page->inode;139ret = __ioc_page_destroy(page);140}141ioc_inode_unlock(inode);142
143out:144return ret;145}
146
147int32_t
148__ioc_inode_prune(ioc_inode_t *curr, uint64_t *size_pruned,149uint64_t size_to_prune, uint32_t index)150{
151ioc_page_t *page = NULL, *next = NULL;152int32_t ret = 0;153ioc_table_t *table = NULL;154
155if (curr == NULL) {156goto out;157}158
159table = curr->table;160
161list_for_each_entry_safe(page, next, &curr->cache.page_lru, page_lru)162{163*size_pruned += page->size;164ret = __ioc_page_destroy(page);165
166if (ret != -1)167table->cache_used -= ret;168
169gf_msg_trace(table->xl->name, 0,170"index = %d && "171"table->cache_used = %" PRIu64172" && table->"173"cache_size = %" PRIu64,174index, table->cache_used, table->cache_size);175
176if ((*size_pruned) >= size_to_prune)177break;178}179
180if (ioc_empty(&curr->cache)) {181list_del_init(&curr->inode_lru);182}183
184out:185return 0;186}
187/*
188* ioc_prune - prune the cache. we have a limit to the number of pages we
189* can have in-memory.
190*
191* @table: ioc_table_t of this translator
192*
193*/
194int32_t
195ioc_prune(ioc_table_t *table)196{
197ioc_inode_t *curr = NULL, *next_ioc_inode = NULL;198int32_t index = 0;199uint64_t size_to_prune = 0;200uint64_t size_pruned = 0;201
202GF_VALIDATE_OR_GOTO("io-cache", table, out);203
204ioc_table_lock(table);205{206size_to_prune = table->cache_used - table->cache_size;207/* take out the least recently used inode */208for (index = 0; index < table->max_pri; index++) {209list_for_each_entry_safe(curr, next_ioc_inode,210&table->inode_lru[index], inode_lru)211{212/* prune page-by-page for this inode, till213* we reach the equilibrium */
214ioc_inode_lock(curr);215{216__ioc_inode_prune(curr, &size_pruned, size_to_prune, index);217}218ioc_inode_unlock(curr);219
220if (size_pruned >= size_to_prune)221break;222} /* list_for_each_entry_safe (curr...) */223
224if (size_pruned >= size_to_prune)225break;226} /* for(index=0;...) */227
228} /* ioc_inode_table locked region end */229ioc_table_unlock(table);230
231out:232return 0;233}
234
235/*
236* __ioc_page_create - create a new page.
237*
238* @ioc_inode:
239* @offset:
240*
241*/
242ioc_page_t *243__ioc_page_create(ioc_inode_t *ioc_inode, off_t offset)244{
245ioc_table_t *table = NULL;246ioc_page_t *page = NULL;247off_t rounded_offset = 0;248ioc_page_t *newpage = NULL;249
250GF_VALIDATE_OR_GOTO("io-cache", ioc_inode, out);251
252table = ioc_inode->table;253GF_VALIDATE_OR_GOTO("io-cache", table, out);254
255rounded_offset = gf_floor(offset, table->page_size);256
257newpage = GF_CALLOC(1, sizeof(*newpage), gf_ioc_mt_ioc_newpage_t);258if (newpage == NULL) {259goto out;260}261
262if (!ioc_inode) {263GF_FREE(newpage);264newpage = NULL;265goto out;266}267
268newpage->offset = rounded_offset;269newpage->inode = ioc_inode;270pthread_mutex_init(&newpage->page_lock, NULL);271
272rbthash_insert(ioc_inode->cache.page_table, newpage, &rounded_offset,273sizeof(rounded_offset));274
275list_add_tail(&newpage->page_lru, &ioc_inode->cache.page_lru);276
277page = newpage;278
279gf_msg_trace("io-cache", 0, "returning new page %p", page);280
281out:282return page;283}
284
285/*
286* ioc_wait_on_page - pause a frame to wait till the arrival of a page.
287* here we need to handle the case when the frame who calls wait_on_page
288* himself has caused page_fault
289*
290* @page: page to wait on
291* @frame: call frame who is waiting on page
292*
293*/
294void
295__ioc_wait_on_page(ioc_page_t *page, call_frame_t *frame, off_t offset,296size_t size)297{
298ioc_waitq_t *waitq = NULL;299ioc_local_t *local = NULL;300
301GF_VALIDATE_OR_GOTO("io-cache", frame, out);302local = frame->local;303
304GF_VALIDATE_OR_GOTO(frame->this->name, local, out);305
306if (page == NULL) {307local->op_ret = -1;308local->op_errno = ENOMEM;309gf_smsg(frame->this->name, GF_LOG_WARNING, 0,310IO_CACHE_MSG_NULL_PAGE_WAIT, NULL);311goto out;312}313
314waitq = GF_CALLOC(1, sizeof(*waitq), gf_ioc_mt_ioc_waitq_t);315if (waitq == NULL) {316local->op_ret = -1;317local->op_errno = ENOMEM;318goto out;319}320
321gf_msg_trace(frame->this->name, 0,322"frame(%p) waiting on page = %p, offset=%" PRId64323", "324"size=%" GF_PRI_SIZET "",325frame, page, offset, size);326
327waitq->data = frame;328waitq->next = page->waitq;329waitq->pending_offset = offset;330waitq->pending_size = size;331page->waitq = waitq;332/* one frame can wait only once on a given page,333* local->wait_count is number of pages a frame is waiting on */
334ioc_local_lock(local);335{336local->wait_count++;337}338ioc_local_unlock(local);339
340out:341return;342}
343
344/*
345* ioc_cache_still_valid - see if cached pages ioc_inode are still valid
346* against given stbuf
347*
348* @ioc_inode:
349* @stbuf:
350*
351* assumes ioc_inode is locked
352*/
353int8_t
354ioc_cache_still_valid(ioc_inode_t *ioc_inode, struct iatt *stbuf)355{
356int8_t cache_still_valid = 1;357
358GF_VALIDATE_OR_GOTO("io-cache", ioc_inode, out);359
360#if 0361if (!stbuf || (stbuf->ia_mtime != ioc_inode->cache.mtime) ||362(stbuf->st_mtim.tv_nsec != ioc_inode->stbuf.st_mtim.tv_nsec))363cache_still_valid = 0;364
365#else366if (!stbuf || (stbuf->ia_mtime != ioc_inode->cache.mtime) ||367(stbuf->ia_mtime_nsec != ioc_inode->cache.mtime_nsec))368cache_still_valid = 0;369
370#endif371
372#if 0373/* talk with avati@gluster.com to enable this section */374if (!ioc_inode->mtime && stbuf) {375cache_still_valid = 1;376ioc_inode->mtime = stbuf->ia_mtime;377}378#endif379
380out:381return cache_still_valid;382}
383
384void
385ioc_waitq_return(ioc_waitq_t *waitq)386{
387ioc_waitq_t *trav = NULL;388ioc_waitq_t *next = NULL;389call_frame_t *frame = NULL;390
391for (trav = waitq; trav; trav = next) {392next = trav->next;393
394frame = trav->data;395ioc_frame_return(frame);396GF_FREE(trav);397}398}
399
400int
401ioc_fault_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,402int32_t op_errno, struct iovec *vector, int32_t count,403struct iatt *stbuf, struct iobref *iobref, dict_t *xdata)404{
405ioc_local_t *local = NULL;406off_t offset = 0;407ioc_inode_t *ioc_inode = NULL;408ioc_table_t *table = NULL;409ioc_page_t *page = NULL;410int32_t destroy_size = 0;411size_t page_size = 0;412ioc_waitq_t *waitq = NULL;413size_t iobref_page_size = 0;414char zero_filled = 0;415
416GF_ASSERT(frame);417
418local = frame->local;419GF_ASSERT(local);420
421offset = local->pending_offset;422ioc_inode = local->inode;423GF_ASSERT(ioc_inode);424
425table = ioc_inode->table;426GF_ASSERT(table);427
428zero_filled = ((op_ret >= 0) && (stbuf->ia_mtime == 0));429
430ioc_inode_lock(ioc_inode);431{432if (op_ret == -1 ||433!(zero_filled || ioc_cache_still_valid(ioc_inode, stbuf))) {434gf_msg_trace(ioc_inode->table->xl->name, 0,435"cache for inode(%p) is invalid. flushing "436"all pages",437ioc_inode);438destroy_size = __ioc_inode_flush(ioc_inode);439}440
441if ((op_ret >= 0) && !zero_filled) {442ioc_inode->cache.mtime = stbuf->ia_mtime;443ioc_inode->cache.mtime_nsec = stbuf->ia_mtime_nsec;444}445
446ioc_inode->cache.last_revalidate = gf_time();447
448if (op_ret < 0) {449/* error, readv returned -1 */450page = __ioc_page_get(ioc_inode, offset);451if (page)452waitq = __ioc_page_error(page, op_ret, op_errno);453} else {454gf_msg_trace(ioc_inode->table->xl->name, 0, "op_ret = %d", op_ret);455page = __ioc_page_get(ioc_inode, offset);456if (!page) {457/* page was flushed */458/* some serious bug ? */459gf_smsg(frame->this->name, GF_LOG_WARNING, 0,460IO_CACHE_MSG_WASTED_COPY, "offset=%" PRId64, offset,461"page-size=%" PRId64, table->page_size, "ioc_inode=%p",462ioc_inode, NULL);463} else {464if (page->vector) {465iobref_unref(page->iobref);466GF_FREE(page->vector);467page->vector = NULL;468page->iobref = NULL;469}470
471/* keep a copy of the page for our cache */472page->vector = iov_dup(vector, count);473if (page->vector == NULL) {474page = __ioc_page_get(ioc_inode, offset);475if (page != NULL)476waitq = __ioc_page_error(page, -1, ENOMEM);477goto unlock;478}479
480page->count = count;481if (iobref) {482page->iobref = iobref_ref(iobref);483} else {484/* TODO: we have got a response to485* our request and no data */
486gf_smsg(frame->this->name, GF_LOG_CRITICAL, ENOMEM,487IO_CACHE_MSG_FRAME_NULL, NULL);488} /* if(frame->root->rsp_refs) */489
490/* page->size should indicate exactly how491* much the readv call to the child
492* translator returned. earlier op_ret
493* from child translator was used, which
494* gave rise to a bug where reads from
495* io-cached volume were resulting in 0
496* byte replies */
497page_size = iov_length(vector, count);498page->size = page_size;499page->op_errno = op_errno;500
501iobref_page_size = iobref_size(page->iobref);502
503if (page->waitq) {504/* wake up all the frames waiting on505* this page, including
506* the frame which triggered fault */
507waitq = __ioc_page_wakeup(page, op_errno);508} /* if(page->waitq) */509} /* if(!page)...else */510} /* if(op_ret < 0)...else */511} /* ioc_inode locked region end */512unlock:513ioc_inode_unlock(ioc_inode);514
515ioc_waitq_return(waitq);516
517if (iobref_page_size) {518ioc_table_lock(table);519{520table->cache_used += iobref_page_size;521}522ioc_table_unlock(table);523}524
525if (destroy_size) {526ioc_table_lock(table);527{528table->cache_used -= destroy_size;529}530ioc_table_unlock(table);531}532
533if (ioc_need_prune(ioc_inode->table)) {534ioc_prune(ioc_inode->table);535}536
537gf_msg_trace(frame->this->name, 0, "fault frame %p returned", frame);538pthread_mutex_destroy(&local->local_lock);539
540fd_unref(local->fd);541if (local->xattr_req)542dict_unref(local->xattr_req);543
544STACK_DESTROY(frame->root);545return 0;546}
547
548/*
549* ioc_page_fault -
550*
551* @ioc_inode:
552* @frame:
553* @fd:
554* @offset:
555*
556*/
557void
558ioc_page_fault(ioc_inode_t *ioc_inode, call_frame_t *frame, fd_t *fd,559off_t offset)560{
561ioc_table_t *table = NULL;562call_frame_t *fault_frame = NULL;563ioc_local_t *fault_local = NULL;564ioc_local_t *local = NULL;565int32_t op_ret = -1, op_errno = -1;566ioc_waitq_t *waitq = NULL;567ioc_page_t *page = NULL;568
569GF_ASSERT(ioc_inode);570if (frame == NULL) {571op_ret = -1;572op_errno = EINVAL;573gf_smsg("io-cache", GF_LOG_WARNING, EINVAL, IO_CACHE_MSG_PAGE_FAULT,574NULL);575goto err;576}577
578table = ioc_inode->table;579fault_frame = copy_frame(frame);580if (fault_frame == NULL) {581op_ret = -1;582op_errno = ENOMEM;583goto err;584}585
586local = frame->local;587fault_local = mem_get0(THIS->local_pool);588if (fault_local == NULL) {589op_ret = -1;590op_errno = ENOMEM;591STACK_DESTROY(fault_frame->root);592goto err;593}594
595/* NOTE: copy_frame() means, the frame the fop whose fd_ref we596* are using till now won't be valid till we get reply from server.
597* we unref this fd, in fault_cbk */
598fault_local->fd = fd_ref(fd);599
600fault_frame->local = fault_local;601pthread_mutex_init(&fault_local->local_lock, NULL);602
603INIT_LIST_HEAD(&fault_local->fill_list);604fault_local->pending_offset = offset;605fault_local->pending_size = table->page_size;606fault_local->inode = ioc_inode;607
608if (local && local->xattr_req)609fault_local->xattr_req = dict_ref(local->xattr_req);610
611gf_msg_trace(frame->this->name, 0,612"stack winding page fault for offset = %" PRId64613" with "614"frame %p",615offset, fault_frame);616
617STACK_WIND(fault_frame, ioc_fault_cbk, FIRST_CHILD(fault_frame->this),618FIRST_CHILD(fault_frame->this)->fops->readv, fd,619table->page_size, offset, 0, fault_local->xattr_req);620return;621
622err:623ioc_inode_lock(ioc_inode);624{625page = __ioc_page_get(ioc_inode, offset);626if (page != NULL) {627waitq = __ioc_page_error(page, op_ret, op_errno);628}629}630ioc_inode_unlock(ioc_inode);631
632if (waitq != NULL) {633ioc_waitq_return(waitq);634}635}
636
637int32_t
638__ioc_frame_fill(ioc_page_t *page, call_frame_t *frame, off_t offset,639size_t size, int32_t op_errno)640{
641ioc_local_t *local = NULL;642ioc_fill_t *fill = NULL;643off_t src_offset = 0;644off_t dst_offset = 0;645ssize_t copy_size = 0;646ioc_inode_t *ioc_inode = NULL;647ioc_fill_t *new = NULL;648int8_t found = 0;649int32_t ret = -1;650
651GF_VALIDATE_OR_GOTO("io-cache", frame, out);652
653local = frame->local;654GF_VALIDATE_OR_GOTO(frame->this->name, local, out);655
656if (page == NULL) {657gf_smsg(frame->this->name, GF_LOG_WARNING, 0,658IO_CACHE_MSG_SERVE_READ_REQUEST, NULL);659local->op_ret = -1;660local->op_errno = EINVAL;661goto out;662}663
664ioc_inode = page->inode;665
666gf_msg_trace(frame->this->name, 0,667"frame (%p) offset = %" PRId64 " && size = %" GF_PRI_SIZET668" "669"&& page->size = %" GF_PRI_SIZET " && wait_count = %d",670frame, offset, size, page->size, local->wait_count);671
672/* immediately move this page to the end of the page_lru list */673list_move_tail(&page->page_lru, &ioc_inode->cache.page_lru);674/* fill local->pending_size bytes from local->pending_offset */675if (local->op_ret != -1) {676local->op_errno = op_errno;677
678if (page->size == 0) {679goto done;680}681
682if (offset > page->offset)683/* offset is offset in file, convert it to offset in684* page */
685src_offset = offset - page->offset;686/*FIXME: since offset is the offset within page is the687* else case valid? */
688else689/* local->pending_offset is in previous page. do not690* fill until we have filled all previous pages */
691dst_offset = page->offset - offset;692
693/* we have to copy from offset to either end of this page694* or till the requested size */
695copy_size = min(page->size - src_offset, size - dst_offset);696
697if (copy_size < 0) {698/* if page contains fewer bytes and the required offset699is beyond the page size in the page */
700copy_size = src_offset = 0;701}702
703gf_msg_trace(page->inode->table->xl->name, 0,704"copy_size = %" GF_PRI_SIZET705" && src_offset = "706"%" PRId64 " && dst_offset = %" PRId64 "",707copy_size, src_offset, dst_offset);708
709{710new = GF_CALLOC(1, sizeof(*new), gf_ioc_mt_ioc_fill_t);711if (new == NULL) {712local->op_ret = -1;713local->op_errno = ENOMEM;714goto out;715}716
717new->offset = page->offset;718new->size = copy_size;719new->iobref = iobref_ref(page->iobref);720new->count = iov_subset(page->vector, page->count, src_offset,721copy_size, &new->vector, 0);722if (new->count < 0) {723local->op_ret = -1;724local->op_errno = ENOMEM;725
726iobref_unref(new->iobref);727GF_FREE(new);728goto out;729}730
731/* add the ioc_fill to fill_list for this frame */732if (list_empty(&local->fill_list)) {733/* if list is empty, then this is the first734* time we are filling frame, add the
735* ioc_fill_t to the end of list */
736list_add_tail(&new->list, &local->fill_list);737} else {738found = 0;739/* list is not empty, we need to look for740* where this offset fits in list */
741list_for_each_entry(fill, &local->fill_list, list)742{743if (fill->offset > new->offset) {744found = 1;745break;746}747}748
749if (found) {750list_add_tail(&new->list, &fill->list);751} else {752list_add_tail(&new->list, &local->fill_list);753}754}755}756
757local->op_ret += copy_size;758}759
760done:761ret = 0;762out:763return ret;764}
765
766/*
767* ioc_frame_unwind - frame unwinds only from here
768*
769* @frame: call frame to unwind
770*
771* to be used only by ioc_frame_return(), when a frame has
772* finished waiting on all pages, required
773*
774*/
775static void776ioc_frame_unwind(call_frame_t *frame)777{
778ioc_local_t *local = NULL;779ioc_fill_t *fill = NULL, *next = NULL;780int32_t count = 0;781struct iovec *vector = NULL;782int32_t copied = 0;783struct iobref *iobref = NULL;784struct iatt stbuf = {7850,786};787int32_t op_ret = 0, op_errno = 0;788
789GF_ASSERT(frame);790
791local = frame->local;792if (local == NULL) {793gf_smsg(frame->this->name, GF_LOG_WARNING, ENOMEM,794IO_CACHE_MSG_LOCAL_NULL, NULL);795op_ret = -1;796op_errno = ENOMEM;797goto unwind;798}799
800/* gnfs requires op_errno to determine is_eof */801op_errno = local->op_errno;802
803if (local->op_ret < 0) {804op_ret = local->op_ret;805goto unwind;806}807
808// ioc_local_lock (local);809iobref = iobref_new();810if (iobref == NULL) {811op_ret = -1;812op_errno = ENOMEM;813}814
815if (list_empty(&local->fill_list)) {816gf_msg_trace(frame->this->name, 0,817"frame(%p) has 0 entries in local->fill_list "818"(offset = %" PRId64 " && size = %" GF_PRI_SIZET ")",819frame, local->offset, local->size);820}821
822list_for_each_entry(fill, &local->fill_list, list) { count += fill->count; }823
824vector = GF_CALLOC(count, sizeof(*vector), gf_ioc_mt_iovec);825if (vector == NULL) {826op_ret = -1;827op_errno = ENOMEM;828}829
830list_for_each_entry_safe(fill, next, &local->fill_list, list)831{832/* # TODO: check why this if clause is needed at all. */833if ((vector != NULL) && (iobref != NULL)) {834memcpy(((char *)vector) + copied, fill->vector,835fill->count * sizeof(*vector));836
837copied += (fill->count * sizeof(*vector));838
839if (iobref_merge(iobref, fill->iobref)) {840op_ret = -1;841op_errno = ENOMEM;842}843}844
845list_del(&fill->list);846iobref_unref(fill->iobref);847GF_FREE(fill->vector);848GF_FREE(fill);849}850
851if (op_ret != -1) {852op_ret = iov_length(vector, count);853}854
855unwind:856gf_msg_trace(frame->this->name, 0, "frame(%p) unwinding with op_ret=%d",857frame, op_ret);858
859// ioc_local_unlock (local);860
861frame->local = NULL;862STACK_UNWIND_STRICT(readv, frame, op_ret, op_errno, vector, count, &stbuf,863iobref, NULL);864
865if (iobref != NULL) {866iobref_unref(iobref);867}868
869if (vector != NULL) {870GF_FREE(vector);871vector = NULL;872}873
874if (local) {875if (local->xattr_req)876dict_unref(local->xattr_req);877pthread_mutex_destroy(&local->local_lock);878mem_put(local);879}880return;881}
882
883/*
884* ioc_frame_return -
885* @frame:
886*
887* to be called only when a frame is waiting on an in-transit page
888*/
889void
890ioc_frame_return(call_frame_t *frame)891{
892ioc_local_t *local = NULL;893int32_t wait_count = 0;894
895GF_ASSERT(frame);896
897local = frame->local;898GF_ASSERT(local->wait_count > 0);899
900ioc_local_lock(local);901{902wait_count = --local->wait_count;903}904ioc_local_unlock(local);905
906if (!wait_count) {907ioc_frame_unwind(frame);908}909
910return;911}
912
913/*
914* ioc_page_wakeup -
915* @page:
916*
917* to be called only when a frame is waiting on an in-transit page
918*/
919ioc_waitq_t *920__ioc_page_wakeup(ioc_page_t *page, int32_t op_errno)921{
922ioc_waitq_t *waitq = NULL, *trav = NULL;923call_frame_t *frame = NULL;924int32_t ret = -1;925
926GF_VALIDATE_OR_GOTO("io-cache", page, out);927
928waitq = page->waitq;929page->waitq = NULL;930
931page->ready = 1;932
933gf_msg_trace(page->inode->table->xl->name, 0, "page is %p && waitq = %p",934page, waitq);935
936for (trav = waitq; trav; trav = trav->next) {937frame = trav->data;938ret = __ioc_frame_fill(page, frame, trav->pending_offset,939trav->pending_size, op_errno);940if (ret == -1) {941break;942}943}944
945if (page->stale) {946__ioc_page_destroy(page);947}948
949out:950return waitq;951}
952
953/*
954* ioc_page_error -
955* @page:
956* @op_ret:
957* @op_errno:
958*
959*/
960ioc_waitq_t *961__ioc_page_error(ioc_page_t *page, int32_t op_ret, int32_t op_errno)962{
963ioc_waitq_t *waitq = NULL, *trav = NULL;964call_frame_t *frame = NULL;965int64_t ret = 0;966ioc_table_t *table = NULL;967ioc_local_t *local = NULL;968
969GF_VALIDATE_OR_GOTO("io-cache", page, out);970
971waitq = page->waitq;972page->waitq = NULL;973
974gf_msg_debug(page->inode->table->xl->name, 0,975"page error for page = %p & waitq = %p", page, waitq);976
977for (trav = waitq; trav; trav = trav->next) {978frame = trav->data;979
980local = frame->local;981ioc_local_lock(local);982{983if (local->op_ret != -1) {984local->op_ret = op_ret;985local->op_errno = op_errno;986}987}988ioc_local_unlock(local);989}990
991table = page->inode->table;992ret = __ioc_page_destroy(page);993
994if (ret != -1) {995table->cache_used -= ret;996}997
998out:999return waitq;1000}
1001
1002/*
1003* ioc_page_error -
1004* @page:
1005* @op_ret:
1006* @op_errno:
1007*
1008*/
1009ioc_waitq_t *1010ioc_page_error(ioc_page_t *page, int32_t op_ret, int32_t op_errno)1011{
1012ioc_waitq_t *waitq = NULL;1013struct ioc_inode *inode = NULL;1014
1015if (page == NULL) {1016goto out;1017}1018
1019ioc_inode_lock(page->inode);1020{1021inode = page->inode;1022waitq = __ioc_page_error(page, op_ret, op_errno);1023}1024ioc_inode_unlock(inode);1025
1026out:1027return waitq;1028}
1029