glusterfs

Форк
0
395 строк · 14.6 Кб
1
#
2
# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com>
3
# This file is part of GlusterFS.
4

5
# This file is licensed to you under your choice of the GNU Lesser
6
# General Public License, version 3 or any later version (LGPLv3 or
7
# later), or the GNU General Public License, version 2 (GPLv2), in all
8
# cases as published by the Free Software Foundation.
9
#
10

11
import os
12
import sys
13
import time
14
import signal
15
import logging
16
import xml.etree.ElementTree as XET
17
from threading import Lock
18
from errno import ECHILD, ESRCH
19
import random
20

21
from resource import SSH
22
import gsyncdconfig as gconf
23
import libgfchangelog
24
from rconf import rconf
25
from syncdutils import (select, waitpid, errno_wrap, lf, grabpidfile,
26
                        set_term_handler, GsyncdError,
27
                        Thread, finalize, Volinfo, VolinfoFromGconf,
28
                        gf_event, EVENT_GEOREP_FAULTY, get_up_nodes,
29
                        unshare_propagation_supported)
30
from gsyncdstatus import GeorepStatus, set_monitor_status
31
import py2py3
32
from py2py3 import pipe
33

34
# Older ElementTree releases have no ParseError; fall back to SyntaxError
# so callers can use a single exception name.
ParseError = getattr(XET, 'ParseError', SyntaxError)
35

36

37
def get_subvol_num(brick_idx, vol, hot):
    """Return the 1-based sub-volume label for the brick at *brick_idx*.

    The sub-volume size is the disperse count when that is positive,
    otherwise the replica count. For tiered volumes the label is
    prefixed with "hot_" or "cold_"; cold-tier indices are first shifted
    down by the number of hot bricks.

    Side effect: stores the volume's distribution count in gconf under
    "primary-distribution-count".
    """
    tiered = vol.is_tier()
    disperse = vol.disperse_count(tiered, hot)
    replica = vol.replica_count(tiered, hot)
    gconf.setconfig("primary-distribution-count",
                    vol.distribution_count(tiered, hot))

    if tiered and not hot:
        # cold bricks are numbered from the start of the cold tier
        brick_idx -= vol.get_hot_bricks_count(tiered)

    group_size = disperse if disperse > 0 else replica
    position = brick_idx + 1

    # round up: a partially filled group still counts as a sub-volume
    number = int(position / group_size)
    if position % group_size > 0:
        number += 1

    if not tiered:
        return str(number)
    prefix = "hot_" if hot else "cold_"
    return prefix + str(number)
59

60

61
class Monitor(object):

    """class which spawns and manages gsyncd workers

    One monitor thread is run per local brick (see multiplex); each
    thread forks a worker, watches its startup and restarts it for as
    long as it exits with status 0 or 1.
    """

    ST_INIT = 'Initializing...'
    ST_STARTED = 'Started'
    ST_STABLE = 'Active'
    ST_FAULTY = 'Faulty'
    ST_INCON = 'inconsistent'
    _ST_ORD = [ST_STABLE, ST_INIT, ST_FAULTY, ST_INCON]

    def __init__(self):
        # serialises worker spawning (pipe + fork) between monitor threads
        self.lock = Lock()
        self.state = {}
        # brick directory -> GeorepStatus of the worker for that brick
        self.status = {}

    @staticmethod
    def terminate():
        """Send SIGTERM to the whole process group, sparing ourselves once."""
        # relax one SIGTERM by setting a handler that sets back
        # standard handler
        set_term_handler(lambda *a: set_term_handler())
        # give a chance to graceful exit
        errno_wrap(os.kill, [-os.getpid(), signal.SIGTERM], [ESRCH])

    @staticmethod
    def _reap(pid, options=0):
        """Wait for child *pid* and return its raw wait status.

        Returns None when waitpid reports no pid (e.g. WNOHANG and the
        child is still running) and -1 when there is no such child any
        more (ECHILD). Any other OSError is re-raised.
        """
        try:
            reaped, status = waitpid(pid, options)
        except OSError as e:
            # no child process, this happens if the child process
            # already died and has been cleaned up
            if e.errno == ECHILD:
                return -1
            raise
        if not reaped:
            return None
        return status

    @staticmethod
    def _exit_signalled(s):
        """Tell whether wait status *s* means termination by SIGUSR1."""
        return (os.WIFSIGNALED(s) and (os.WTERMSIG(s) == signal.SIGUSR1))

    @staticmethod
    def _exit_status(s):
        """Return the exit code in wait status *s*, or 1 if not a normal exit."""
        if os.WIFEXITED(s):
            return os.WEXITSTATUS(s)
        return 1

    @staticmethod
    def _exec_worker(argv, w, feedback_fd, suuid, remote_host, remote_id):
        """Replace the current (forked child) process with a gsyncd worker.

        *w* is the worker spec tuple (brick dict, remote, subvol number,
        is-hot flag); *feedback_fd* is the write end of the pipe the
        monitor selects on. Does not return when the exec succeeds.
        """
        brick = w[0]
        args_to_worker = argv + [
            'worker',
            rconf.args.primary,
            rconf.args.secondary,
            '--feedback-fd', str(feedback_fd),
            '--local-path', brick['dir'],
            '--local-node', brick['host'],
            '--local-node-id', brick['uuid'],
            '--secondary-id', suuid,
            '--subvol-num', str(w[2]),
            '--resource-remote', remote_host,
            '--resource-remote-id', remote_id
        ]

        if rconf.args.config_file is not None:
            args_to_worker += ['-c', rconf.args.config_file]

        if w[3]:
            args_to_worker.append("--is-hottier")

        if rconf.args.debug:
            args_to_worker.append("--debug")

        if gconf.get("access-mount"):
            os.execv(sys.executable, args_to_worker)
        elif unshare_propagation_supported():
            logging.debug("Worker would mount volume privately")
            # run the worker under unshare(1); args_to_worker[0] is the
            # interpreter's basename, which unshare resolves itself
            unshare_cmd = ['unshare', '-m', '--propagation', 'private']
            os.execvp("unshare", unshare_cmd + args_to_worker)
        else:
            logging.debug("Mount is not private. It would be lazy"
                          " umounted")
            os.execv(sys.executable, args_to_worker)

    def monitor(self, w, argv, cpids, secondary_vol, secondary_host, primary,
                suuid, secondarynodes):
        """the monitor loop

        Basic logic is a blatantly simple, blunt heuristic:
        if the spawned worker survives the "connection-timeout"
        window, it's considered OK. This serves us pretty well as
        it's not vulnerable to any kind of irregular behavior of
        the child...

        ... well, except for one: if the child is hung up
        waiting for some event, it can survive aeons and still
        be defunct. So we tweak the above logic to expect the
        worker to send us a signal within the timeout (in the
        form of closing its end of a pipe). The worker does this
        when it's done with the setup stage and ready to enter
        the service loop (note it's the setup stage which is
        vulnerable to hangs -- the full blown worker blows up on
        EPIPE if the net goes down, due to the keep-alive thread)

        *w* is a worker spec tuple as built by distribute();
        *cpids* is the set of worker pids shared by all monitor
        threads. The worker is respawned for as long as its result
        is 0 or 1; the first other exit code is returned, after
        marking the brick's worker status as inconsistent.
        """
        brick = w[0]
        brick_dir = brick['dir']
        if not self.status.get(brick_dir, None):
            self.status[brick_dir] = GeorepStatus(gconf.get("state-file"),
                                                  brick['host'],
                                                  brick_dir,
                                                  brick['uuid'],
                                                  primary,
                                                  "%s::%s" % (secondary_host,
                                                              secondary_vol))
        ret = 0
        conn_timeout = gconf.get("connection-timeout")
        while ret in (0, 1):
            remote_user, remote_host = w[1][0].split("@")
            remote_id = w[1][1]
            # Check the status of the connected secondary node
            # If the connected secondary node is down then try to connect to
            # different up node.
            current_secondary_host = remote_host
            secondary_up_hosts = get_up_nodes(secondarynodes,
                                              gconf.get("ssh-port"))

            if (current_secondary_host, remote_id) not in secondary_up_hosts:
                if len(secondary_up_hosts) > 0:
                    remote_new = random.choice(secondary_up_hosts)
                    # NOTE(review): here remote_host becomes "user@host"
                    # while the non-failover path passes the bare host --
                    # confirm the worker accepts both forms.
                    remote_host = "%s@%s" % (remote_user, remote_new[0])
                    remote_id = remote_new[1]

            # Spawn the worker in lock to avoid fd leak. The context
            # manager guarantees the lock is released even if pipe() or
            # fork() raises, so other monitor threads cannot be wedged.
            with self.lock:
                self.status[brick_dir].set_worker_status(self.ST_INIT)
                logging.info(lf('starting gsyncd worker',
                                brick=brick_dir,
                                secondary_node=remote_host))

                pr, pw = pipe()
                cpid = os.fork()
                if cpid == 0:
                    # child: keep only the write end and become the worker
                    os.close(pr)
                    self._exec_worker(argv, w, pw, suuid,
                                      remote_host, remote_id)

                # NOTE(review): pids are never removed from cpids once the
                # worker is reaped -- confirm pid reuse is not a concern.
                cpids.add(cpid)
                os.close(pw)

            t0 = time.time()
            # readable pipe == worker closed its end (setup done, or died)
            so = select((pr,), (), (), conn_timeout)[0]
            os.close(pr)

            if so:
                ret = self._reap(cpid, os.WNOHANG)

                if ret is not None:
                    logging.info(lf("worker died before establishing "
                                    "connection",
                                    brick=brick_dir))
                else:
                    logging.debug("worker(%s) connected" % brick_dir)
                    # keep polling for the rest of the startup window
                    while time.time() < t0 + conn_timeout:
                        ret = self._reap(cpid, os.WNOHANG)

                        if ret is not None:
                            logging.info(lf("worker died in startup phase",
                                            brick=brick_dir))
                            break

                        time.sleep(1)
            else:
                logging.info(
                    lf("Worker not confirmed after wait, aborting it. "
                       "Gsyncd invocation on remote secondary via SSH or "
                       "gluster primary mount might have hung. Please "
                       "check the above logs for exact issue and check "
                       "primary or secondary volume for errors. Restarting "
                       "primary/secondary volume accordingly might help.",
                       brick=brick_dir,
                       timeout=conn_timeout))
                errno_wrap(os.kill, [cpid, signal.SIGKILL], [ESRCH])
                ret = self._reap(cpid)
            if ret is None:
                # worker survived startup: block until it eventually exits
                ret = self._reap(cpid)
            if self._exit_signalled(ret):
                ret = 0
            else:
                ret = self._exit_status(ret)
                if ret in (0, 1):
                    self.status[brick_dir].set_worker_status(self.ST_FAULTY)
                    gf_event(EVENT_GEOREP_FAULTY,
                             primary_volume=primary.volume,
                             primary_node=brick['host'],
                             primary_node_id=brick['uuid'],
                             secondary_host=secondary_host,
                             secondary_volume=secondary_vol,
                             current_secondary_host=current_secondary_host,
                             brick_path=brick_dir)
            time.sleep(10)
        self.status[brick_dir].set_worker_status(self.ST_INCON)
        return ret

    def multiplex(self, wspx, suuid, secondary_vol, secondary_host, primary,
                  secondarynodes):
        """Run one monitor thread per worker spec in *wspx* and wait for them.

        When any monitor loop gives up, every known worker pid is
        killed and finalize(exval=1) is called.
        """
        argv = [os.path.basename(sys.executable), sys.argv[0]]
        cpids = set()

        def wmon(w):
            # monitor() returns a plain exit code, so its result must not
            # be tuple-unpacked (doing so raised TypeError and skipped the
            # cleanup below).
            self.monitor(w, argv, cpids, secondary_vol,
                         secondary_host, primary, suuid, secondarynodes)
            time.sleep(1)
            with self.lock:
                for pid in cpids:
                    errno_wrap(os.kill, [pid, signal.SIGKILL], [ESRCH])
            finalize(exval=1)

        ta = []
        for wx in wspx:
            t = Thread(target=wmon, args=[wx])
            t.start()
            ta.append(t)

        # monitor status was being updated in each monitor thread. It
        # should not be done as it can cause deadlock for a worker start.
        # set_monitor_status uses flock to synchronize multiple instances
        # updating the file. Since each monitor thread forks worker,
        # these processes can hold the reference to fd of status
        # file causing deadlock to workers which starts later as flock
        # will not be release until all references to same fd is closed.
        # It will also cause fd leaks.

        with self.lock:
            set_monitor_status(gconf.get("state-file"), self.ST_STARTED)
        for t in ta:
            t.join()
297

298

299
def distribute(primary, secondary):
    """Work out which workers this node must run.

    Loads primary and secondary volume info, records the stime xattr
    prefix in gconf when it changed, and builds one worker spec per
    primary brick whose uuid equals rconf.args.local_node_id. Each spec
    is a tuple (brick, (user@host, uuid) of a secondary node,
    sub-volume number, is-hot flag).

    Returns (worker specs, secondary volume uuid, secondary volume name,
    secondary host, primary, set of (host, uuid) secondary nodes).
    """
    from_gconf = rconf.args.use_gconf_volinfo

    if from_gconf:
        mvol = VolinfoFromGconf(primary.volume, primary=True)
    else:
        mvol = Volinfo(primary.volume, primary.host, primary=True)
    logging.debug('primary bricks: ' + repr(mvol.bricks))

    # ssh command line used to reach the secondary gateway
    prelude = [gconf.get("ssh-command")]
    prelude += gconf.get("ssh-options").split()
    prelude += ["-p", str(gconf.get("ssh-port"))]
    prelude.append(secondary.remote_addr)

    logging.debug('secondary SSH gateway: ' + secondary.remote_addr)

    if from_gconf:
        svol = VolinfoFromGconf(secondary.volume, primary=False)
    else:
        svol = Volinfo(secondary.volume, "localhost", prelude, primary=False)

    sbricks = svol.bricks
    suuid = svol.uuid
    secondary_host = secondary.remote_addr.split('@')[-1]
    secondary_vol = secondary.volume

    # save this xattr for the session delete command
    stored_prefix = gconf.get("stime-xattr-prefix", None)
    wanted_prefix = ".".join(["trusted.glusterfs", mvol.uuid, svol.uuid])
    prefix_is_current = bool(stored_prefix) and stored_prefix == wanted_prefix
    if not prefix_is_current:
        gconf.setconfig("stime-xattr-prefix", wanted_prefix)

    logging.debug('secondary bricks: ' + repr(sbricks))

    secondarynodes = set((b['host'], b["uuid"]) for b in sbricks)
    rap = SSH.parse_ssh_address(secondary)
    secondarys = [(rap['user'] + '@' + host, node_id)
                  for host, node_id in secondarynodes]

    workerspex = []
    for idx, brick in enumerate(mvol.bricks):
        if rconf.args.local_node_id != brick['uuid']:
            # brick belongs to another node
            continue
        is_hot = mvol.is_hot(":".join([brick['host'], brick['dir']]))
        # spread local bricks over the secondary nodes round-robin
        target = secondarys[idx % len(secondarys)]
        subvol = get_subvol_num(idx, mvol, is_hot)
        workerspex.append((brick, target, subvol, is_hot))
    logging.debug('worker specs: ' + repr(workerspex))
    return (workerspex, suuid, secondary_vol, secondary_host, primary,
            secondarynodes)
350

351

352
def monitor(local, remote):
    """Start monitoring: build the worker specs and run them to completion.

    Returns whatever Monitor.multiplex returns for the specs produced by
    distribute(local, remote).
    """
    # Check if gsyncd restarted in pause state. If
    # yes, send SIGSTOP to negative of monitor pid
    # to go back to pause state.
    resume_paused = rconf.args.pause_on_start
    if resume_paused:
        own_group = -os.getpid()
        errno_wrap(os.kill, [own_group, signal.SIGSTOP], [ESRCH])

    # oh yeah, actually Monitor is used as singleton, too
    supervisor = Monitor()
    return supervisor.multiplex(*distribute(local, remote))
361

362

363
def startup(go_daemon=True):
    """set up logging, pidfile grabbing, daemonization

    Grabs the pid file (exiting with status 2 if it is already taken).
    When *go_daemon* is true the process then forks: the parent exits,
    the child starts a new session, points stdin/stdout/stderr at the
    null device, re-grabs the pid file under its own pid and waits for
    the parent to terminate before returning.

    Raises GsyncdError if the temporary pid file cannot be grabbed.
    """
    pid_file = gconf.get("pid-file")
    if not grabpidfile():
        sys.stderr.write("pidfile is taken, exiting.\n")
        sys.exit(2)
    rconf.pid_file_owned = True

    if not go_daemon:
        return

    # x/y: pipe used only so the child can detect the parent's exit
    x, y = pipe()
    cpid = os.fork()
    if cpid:
        # parent: keeps y open until it exits, which wakes the child
        os.close(x)
        sys.exit()
    os.close(y)
    os.setsid()

    dn = os.open(os.devnull, os.O_RDWR)
    std_fds = [f.fileno() for f in (sys.stdin, sys.stdout, sys.stderr)]
    for fd in std_fds:
        os.dup2(dn, fd)
    # the duplicates are all we need; don't leak the extra descriptor
    # into the daemon and every worker it forks
    if dn not in std_fds:
        os.close(dn)

    if not grabpidfile(pid_file + '.tmp'):
        raise GsyncdError("cannot grab temporary pidfile")

    os.rename(pid_file + '.tmp', pid_file)

    # wait for parent to terminate
    # so we can start up with
    # no messing from the dirty
    # ol' bustard
    select((x,), (), ())
    os.close(x)
396

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.