glusterfs

Форк
0
253 строки · 7.0 Кб
1
#
2
# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com>
3
# This file is part of GlusterFS.
4

5
# This file is licensed to you under your choice of the GNU Lesser
6
# General Public License, version 3 or any later version (LGPLv3 or
7
# later), or the GNU General Public License, version 2 (GPLv2), in all
8
# cases as published by the Free Software Foundation.
9
#
10

11
import os
12
import sys
13
import time
14
import logging
15
from threading import Condition
16
try:
17
    import _thread as thread
18
except ImportError:
19
    import thread
20
try:
21
    from queue import Queue
22
except ImportError:
23
    from Queue import Queue
24
try:
25
    import cPickle as pickle
26
except ImportError:
27
    import pickle
28

29
from syncdutils import Thread, select, lf
30

31
pickle_proto = 2
32
repce_version = 1.0
33

34

35
def ioparse(i, o):
36
    if isinstance(i, int):
37
        i = os.fdopen(i, 'rb')
38
    # rely on duck typing for recognizing
39
    # streams as that works uniformly
40
    # in py2 and py3
41
    if hasattr(o, 'fileno'):
42
        o = o.fileno()
43
    return (i, o)
44

45

46
def send(out, *args):
47
    """pickle args and write out wholly in one syscall
48

49
    ie. not use the ability of pickle to dump directly to
50
    a stream, as that would potentially mess up messages
51
    by interleaving them
52
    """
53
    os.write(out, pickle.dumps(args, pickle_proto))
54

55

56
def recv(inf):
57
    """load an object from input stream
58
    python2 and python3 compatibility, inf is sys.stdin
59
    and is opened as text stream by default. Hence using the
60
    buffer attribute in python3
61
    """
62
    if hasattr(inf, "buffer"):
63
        return pickle.load(inf.buffer)
64
    else:
65
        return pickle.load(inf)
66

67

68
class RepceServer(object):
69

70
    """RePCe is Hungarian for canola, http://hu.wikipedia.org/wiki/Repce
71

72
    ... also our homebrewed RPC backend where the transport layer is
73
    reduced to a pair of filehandles.
74

75
    This is the server component.
76
    """
77

78
    def __init__(self, obj, i, o, wnum=6):
79
        """register a backend object .obj to which incoming messages
80
           are dispatched, also incoming/outcoming streams
81
        """
82
        self.obj = obj
83
        self.inf, self.out = ioparse(i, o)
84
        self.wnum = wnum
85
        self.q = Queue()
86

87
    def service_loop(self):
88
        """fire up worker threads, get messages and dispatch among them"""
89
        for i in range(self.wnum):
90
            t = Thread(target=self.worker)
91
            t.start()
92
        try:
93
            while True:
94
                self.q.put(recv(self.inf))
95
        except EOFError:
96
            logging.info("terminating on reaching EOF.")
97

98
    def worker(self):
99
        """life of a worker
100

101
        Get message, extract its id, method name and arguments
102
        (kwargs not supported), call method on .obj.
103
        Send back message id + return value.
104
        If method call throws an exception, rescue it, and send
105
        back the exception as result (with flag marking it as
106
        exception).
107
        """
108
        while True:
109
            in_data = self.q.get(True)
110
            rid = in_data[0]
111
            rmeth = in_data[1]
112
            exc = False
113
            if rmeth == '__repce_version__':
114
                res = repce_version
115
            else:
116
                try:
117
                    res = getattr(self.obj, rmeth)(*in_data[2:])
118
                except:
119
                    res = sys.exc_info()[1]
120
                    exc = True
121
                    logging.exception("call failed: ")
122
            send(self.out, rid, exc, res)
123

124

125
class RepceJob(object):
126

127
    """class representing message status we can use
128
    for waiting on reply"""
129

130
    def __init__(self, cbk):
131
        """
132
        - .rid: (process-wise) unique id
133
        - .cbk: what we do upon receiving reply
134
        """
135
        self.rid = (os.getpid(), thread.get_ident(), time.time())
136
        self.cbk = cbk
137
        self.lever = Condition()
138
        self.done = False
139

140
    def __repr__(self):
141
        return ':'.join([str(x) for x in self.rid])
142

143
    def wait(self):
144
        self.lever.acquire()
145
        if not self.done:
146
            self.lever.wait()
147
        self.lever.release()
148
        return self.result
149

150
    def wakeup(self, data):
151
        self.result = data
152
        self.lever.acquire()
153
        self.done = True
154
        self.lever.notify()
155
        self.lever.release()
156

157

158
class RepceClient(object):
159

160
    """RePCe is Hungarian for canola, http://hu.wikipedia.org/wiki/Repce
161

162
    ... also our homebrewed RPC backend where the transport layer is
163
    reduced to a pair of filehandles.
164

165
    This is the client component.
166
    """
167

168
    def __init__(self, i, o):
169
        self.inf, self.out = ioparse(i, o)
170
        self.jtab = {}
171
        t = Thread(target=self.listen)
172
        t.start()
173

174
    def listen(self):
175
        while True:
176
            select((self.inf,), (), ())
177
            rid, exc, res = recv(self.inf)
178
            rjob = self.jtab.pop(rid)
179
            if rjob.cbk:
180
                rjob.cbk(rjob, [exc, res])
181

182
    def push(self, meth, *args, **kw):
183
        """wrap arguments in a RepceJob, send them to server
184
           and return the RepceJob
185

186
           @cbk to pass on RepceJob can be given as kwarg.
187
        """
188
        cbk = kw.get('cbk')
189
        if not cbk:
190
            def cbk(rj, res):
191
                if res[0]:
192
                    raise res[1]
193
        rjob = RepceJob(cbk)
194
        self.jtab[rjob.rid] = rjob
195
        logging.debug("call %s %s%s ..." % (repr(rjob), meth, repr(args)))
196
        send(self.out, rjob.rid, meth, *args)
197
        return rjob
198

199
    def __call__(self, meth, *args):
200
        """RePCe client is callabe, calling it implements a synchronous
201
        remote call.
202

203
        We do a .push with a cbk which does a wakeup upon receiving answer,
204
        then wait on the RepceJob.
205
        """
206
        rjob = self.push(
207
            meth, *args, **{'cbk': lambda rj, res: rj.wakeup(res)})
208
        exc, res = rjob.wait()
209
        if exc:
210
            logging.error(lf('call failed',
211
                             call=repr(rjob),
212
                             method=meth,
213
                             error=str(type(res).__name__)))
214
            raise res
215
        logging.debug("call %s %s -> %s" % (repr(rjob), meth, repr(res)))
216
        return res
217

218
    class mprx(object):
219

220
        """method proxy, standard trick to implement rubyesque
221
        method_missing in Python
222

223
        A class is a closure factory, you know what I mean, or go read
224
        some SICP.
225
        """
226

227
        def __init__(self, ins, meth):
228
            self.ins = ins
229
            self.meth = meth
230

231
        def __call__(self, *a):
232
            return self.ins(self.meth, *a)
233

234
    def __getattr__(self, meth):
235
        """this implements transparent method dispatch to remote object,
236
           so that you don't need to call the RepceClient instance like
237

238
             rclient('how_old_are_you_if_born_in', 1979)
239

240
           but you can make it into an ordinary method call like
241

242
             rclient.how_old_are_you_if_born_in(1979)
243
        """
244
        return self.mprx(self, meth)
245

246
    def __version__(self):
247
        """used in handshake to verify compatibility"""
248
        d = {'proto': self('__repce_version__')}
249
        try:
250
            d['object'] = self('version')
251
        except AttributeError:
252
            pass
253
        return d
254

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.