glusterfs
253 строки · 7.0 Кб
1#
2# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com>
3# This file is part of GlusterFS.
4
5# This file is licensed to you under your choice of the GNU Lesser
6# General Public License, version 3 or any later version (LGPLv3 or
7# later), or the GNU General Public License, version 2 (GPLv2), in all
8# cases as published by the Free Software Foundation.
9#
10
11import os
12import sys
13import time
14import logging
15from threading import Condition
16try:
17import _thread as thread
18except ImportError:
19import thread
20try:
21from queue import Queue
22except ImportError:
23from Queue import Queue
24try:
25import cPickle as pickle
26except ImportError:
27import pickle
28
29from syncdutils import Thread, select, lf
30
31pickle_proto = 2
32repce_version = 1.0
33
34
35def ioparse(i, o):
36if isinstance(i, int):
37i = os.fdopen(i, 'rb')
38# rely on duck typing for recognizing
39# streams as that works uniformly
40# in py2 and py3
41if hasattr(o, 'fileno'):
42o = o.fileno()
43return (i, o)
44
45
46def send(out, *args):
47"""pickle args and write out wholly in one syscall
48
49ie. not use the ability of pickle to dump directly to
50a stream, as that would potentially mess up messages
51by interleaving them
52"""
53os.write(out, pickle.dumps(args, pickle_proto))
54
55
56def recv(inf):
57"""load an object from input stream
58python2 and python3 compatibility, inf is sys.stdin
59and is opened as text stream by default. Hence using the
60buffer attribute in python3
61"""
62if hasattr(inf, "buffer"):
63return pickle.load(inf.buffer)
64else:
65return pickle.load(inf)
66
67
68class RepceServer(object):
69
70"""RePCe is Hungarian for canola, http://hu.wikipedia.org/wiki/Repce
71
72... also our homebrewed RPC backend where the transport layer is
73reduced to a pair of filehandles.
74
75This is the server component.
76"""
77
78def __init__(self, obj, i, o, wnum=6):
79"""register a backend object .obj to which incoming messages
80are dispatched, also incoming/outcoming streams
81"""
82self.obj = obj
83self.inf, self.out = ioparse(i, o)
84self.wnum = wnum
85self.q = Queue()
86
87def service_loop(self):
88"""fire up worker threads, get messages and dispatch among them"""
89for i in range(self.wnum):
90t = Thread(target=self.worker)
91t.start()
92try:
93while True:
94self.q.put(recv(self.inf))
95except EOFError:
96logging.info("terminating on reaching EOF.")
97
98def worker(self):
99"""life of a worker
100
101Get message, extract its id, method name and arguments
102(kwargs not supported), call method on .obj.
103Send back message id + return value.
104If method call throws an exception, rescue it, and send
105back the exception as result (with flag marking it as
106exception).
107"""
108while True:
109in_data = self.q.get(True)
110rid = in_data[0]
111rmeth = in_data[1]
112exc = False
113if rmeth == '__repce_version__':
114res = repce_version
115else:
116try:
117res = getattr(self.obj, rmeth)(*in_data[2:])
118except:
119res = sys.exc_info()[1]
120exc = True
121logging.exception("call failed: ")
122send(self.out, rid, exc, res)
123
124
125class RepceJob(object):
126
127"""class representing message status we can use
128for waiting on reply"""
129
130def __init__(self, cbk):
131"""
132- .rid: (process-wise) unique id
133- .cbk: what we do upon receiving reply
134"""
135self.rid = (os.getpid(), thread.get_ident(), time.time())
136self.cbk = cbk
137self.lever = Condition()
138self.done = False
139
140def __repr__(self):
141return ':'.join([str(x) for x in self.rid])
142
143def wait(self):
144self.lever.acquire()
145if not self.done:
146self.lever.wait()
147self.lever.release()
148return self.result
149
150def wakeup(self, data):
151self.result = data
152self.lever.acquire()
153self.done = True
154self.lever.notify()
155self.lever.release()
156
157
158class RepceClient(object):
159
160"""RePCe is Hungarian for canola, http://hu.wikipedia.org/wiki/Repce
161
162... also our homebrewed RPC backend where the transport layer is
163reduced to a pair of filehandles.
164
165This is the client component.
166"""
167
168def __init__(self, i, o):
169self.inf, self.out = ioparse(i, o)
170self.jtab = {}
171t = Thread(target=self.listen)
172t.start()
173
174def listen(self):
175while True:
176select((self.inf,), (), ())
177rid, exc, res = recv(self.inf)
178rjob = self.jtab.pop(rid)
179if rjob.cbk:
180rjob.cbk(rjob, [exc, res])
181
182def push(self, meth, *args, **kw):
183"""wrap arguments in a RepceJob, send them to server
184and return the RepceJob
185
186@cbk to pass on RepceJob can be given as kwarg.
187"""
188cbk = kw.get('cbk')
189if not cbk:
190def cbk(rj, res):
191if res[0]:
192raise res[1]
193rjob = RepceJob(cbk)
194self.jtab[rjob.rid] = rjob
195logging.debug("call %s %s%s ..." % (repr(rjob), meth, repr(args)))
196send(self.out, rjob.rid, meth, *args)
197return rjob
198
199def __call__(self, meth, *args):
200"""RePCe client is callabe, calling it implements a synchronous
201remote call.
202
203We do a .push with a cbk which does a wakeup upon receiving answer,
204then wait on the RepceJob.
205"""
206rjob = self.push(
207meth, *args, **{'cbk': lambda rj, res: rj.wakeup(res)})
208exc, res = rjob.wait()
209if exc:
210logging.error(lf('call failed',
211call=repr(rjob),
212method=meth,
213error=str(type(res).__name__)))
214raise res
215logging.debug("call %s %s -> %s" % (repr(rjob), meth, repr(res)))
216return res
217
218class mprx(object):
219
220"""method proxy, standard trick to implement rubyesque
221method_missing in Python
222
223A class is a closure factory, you know what I mean, or go read
224some SICP.
225"""
226
227def __init__(self, ins, meth):
228self.ins = ins
229self.meth = meth
230
231def __call__(self, *a):
232return self.ins(self.meth, *a)
233
234def __getattr__(self, meth):
235"""this implements transparent method dispatch to remote object,
236so that you don't need to call the RepceClient instance like
237
238rclient('how_old_are_you_if_born_in', 1979)
239
240but you can make it into an ordinary method call like
241
242rclient.how_old_are_you_if_born_in(1979)
243"""
244return self.mprx(self, meth)
245
246def __version__(self):
247"""used in handshake to verify compatibility"""
248d = {'proto': self('__repce_version__')}
249try:
250d['object'] = self('version')
251except AttributeError:
252pass
253return d
254