glusterfs
395 строк · 14.6 Кб
1#
2# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com>
3# This file is part of GlusterFS.
4
5# This file is licensed to you under your choice of the GNU Lesser
6# General Public License, version 3 or any later version (LGPLv3 or
7# later), or the GNU General Public License, version 2 (GPLv2), in all
8# cases as published by the Free Software Foundation.
9#
10
11import os12import sys13import time14import signal15import logging16import xml.etree.ElementTree as XET17from threading import Lock18from errno import ECHILD, ESRCH19import random20
21from resource import SSH22import gsyncdconfig as gconf23import libgfchangelog24from rconf import rconf25from syncdutils import (select, waitpid, errno_wrap, lf, grabpidfile,26set_term_handler, GsyncdError,27Thread, finalize, Volinfo, VolinfoFromGconf,28gf_event, EVENT_GEOREP_FAULTY, get_up_nodes,29unshare_propagation_supported)30from gsyncdstatus import GeorepStatus, set_monitor_status31import py2py332from py2py3 import pipe33
# Compatibility alias: xml.etree.ElementTree grew ParseError in Python 2.7;
# older versions raised SyntaxError instead, so fall back to that.
ParseError = getattr(XET, 'ParseError', SyntaxError)
36
def get_subvol_num(brick_idx, vol, hot):
    """Return the label of the subvolume owning brick number *brick_idx*.

    The label is a plain 1-based ordinal ("1", "2", ...) for regular
    volumes, prefixed with "hot_"/"cold_" for tiered volumes. As a side
    effect, records the volume's distribution count in gconf.
    """
    tiered = vol.is_tier()
    disperse = vol.disperse_count(tiered, hot)
    replica = vol.replica_count(tiered, hot)
    distribute = vol.distribution_count(tiered, hot)
    gconf.setconfig("primary-distribution-count", distribute)

    # Cold-tier bricks are indexed after the hot ones, so rebase the index.
    if tiered and not hot:
        brick_idx -= vol.get_hot_bricks_count(tiered)

    # A subvolume groups disperse-count bricks when dispersed, otherwise
    # replica-count bricks; ceiling division maps brick -> group ordinal.
    group_size = disperse if disperse > 0 else replica
    quotient, remainder = divmod(brick_idx + 1, group_size)
    subvol = quotient + 1 if remainder else quotient

    if tiered:
        return ("hot_" if hot else "cold_") + str(subvol)
    return str(subvol)
60
class Monitor(object):

    """class which spawns and manages gsyncd workers"""

    # Human-readable worker/monitor states written to the status file.
    ST_INIT = 'Initializing...'
    ST_STARTED = 'Started'
    ST_STABLE = 'Active'
    ST_FAULTY = 'Faulty'
    ST_INCON = 'inconsistent'
    # severity ordering of states (most healthy first)
    _ST_ORD = [ST_STABLE, ST_INIT, ST_FAULTY, ST_INCON]

    def __init__(self):
        # serializes worker spawning (see monitor()) and shared-set access
        self.lock = Lock()
        self.state = {}
        # per-brick-dir GeorepStatus handles, created lazily in monitor()
        self.status = {}

    @staticmethod
    def terminate():
        # relax one SIGTERM by setting a handler that sets back
        # standard handler
        set_term_handler(lambda *a: set_term_handler())
        # give a chance to graceful exit by signalling the whole
        # process group (negative pid); ESRCH is tolerated
        errno_wrap(os.kill, [-os.getpid(), signal.SIGTERM], [ESRCH])

    def monitor(self, w, argv, cpids, secondary_vol, secondary_host, primary,
                suuid, secondarynodes):
        """the monitor loop

        Basic logic is blatantly simple, blunt heuristics:
        if the spawned client survives 60 secs, it's considered OK.
        This serves us pretty well as it's not vulnerable to
        any kind of irregular behavior of the child...

        ... well, except for one: if the child is hung up
        waiting for some event, it can survive aeons, still
        will be defunct. So we tweak the above logic to
        expect the worker to send us a signal within 60 secs
        (in the form of closing its end of a pipe). The worker
        does this when it's done with the setup stage
        ready to enter the service loop (note it's the setup
        stage which is vulnerable to hangs -- the full
        blown worker blows up on EPIPE if the net goes down,
        due to the keep-alive thread)

        *w* is a worker spec as built by distribute():
        (brick dict, (remote "user@host", remote uuid), subvol num, is_hot).
        """
        # lazily create the per-brick status file handle
        if not self.status.get(w[0]['dir'], None):
            self.status[w[0]['dir']] = GeorepStatus(gconf.get("state-file"),
                                                    w[0]['host'],
                                                    w[0]['dir'],
                                                    w[0]['uuid'],
                                                    primary,
                                                    "%s::%s" % (secondary_host,
                                                                secondary_vol))
        ret = 0

        def nwait(p, o=0):
            # waitpid wrapper: None if child still running (with WNOHANG),
            # -1 if it was already reaped, else the raw wait status
            try:
                p2, r = waitpid(p, o)
                if not p2:
                    return
                return r
            except OSError as e:
                # no child process, this happens if the child process
                # already died and has been cleaned up
                if e.errno == ECHILD:
                    return -1
                else:
                    raise

        def exit_signalled(s):
            """ child terminated due to receipt of SIGUSR1 """
            return (os.WIFSIGNALED(s) and (os.WTERMSIG(s) == signal.SIGUSR1))

        def exit_status(s):
            # map a wait status to an exit code; non-exit termination -> 1
            if os.WIFEXITED(s):
                return os.WEXITSTATUS(s)
            return 1

        conn_timeout = gconf.get("connection-timeout")
        # keep respawning while the worker exits "recoverably" (0 or 1)
        while ret in (0, 1):
            remote_user, remote_host = w[1][0].split("@")
            remote_id = w[1][1]
            # Check the status of the connected secondary node
            # If the connected secondary node is down then try to connect to
            # different up node.
            current_secondary_host = remote_host
            secondary_up_hosts = get_up_nodes(secondarynodes, gconf.get("ssh-port"))

            if (current_secondary_host, remote_id) not in secondary_up_hosts:
                if len(secondary_up_hosts) > 0:
                    remote_new = random.choice(secondary_up_hosts)
                    remote_host = "%s@%s" % (remote_user, remote_new[0])
                    remote_id = remote_new[1]

            # Spawn the worker in lock to avoid fd leak
            self.lock.acquire()

            self.status[w[0]['dir']].set_worker_status(self.ST_INIT)
            logging.info(lf('starting gsyncd worker',
                            brick=w[0]['dir'],
                            secondary_node=remote_host))

            # pr/pw: feedback pipe; the worker closes pw when its setup
            # stage is done, which we detect via select() on pr below
            pr, pw = pipe()
            cpid = os.fork()
            if cpid == 0:
                # child: becomes the worker via exec; never returns
                os.close(pr)

                args_to_worker = argv + [
                    'worker',
                    rconf.args.primary,
                    rconf.args.secondary,
                    '--feedback-fd', str(pw),
                    '--local-path', w[0]['dir'],
                    '--local-node', w[0]['host'],
                    '--local-node-id', w[0]['uuid'],
                    '--secondary-id', suuid,
                    '--subvol-num', str(w[2]),
                    '--resource-remote', remote_host,
                    '--resource-remote-id', remote_id
                ]

                if rconf.args.config_file is not None:
                    args_to_worker += ['-c', rconf.args.config_file]

                if w[3]:
                    args_to_worker.append("--is-hottier")

                if rconf.args.debug:
                    args_to_worker.append("--debug")

                access_mount = gconf.get("access-mount")
                if access_mount:
                    os.execv(sys.executable, args_to_worker)
                else:
                    # run the worker in a private mount namespace when
                    # possible so its mounts don't leak to the host
                    if unshare_propagation_supported():
                        logging.debug("Worker would mount volume privately")
                        unshare_cmd = ['unshare', '-m', '--propagation',
                                       'private']
                        cmd = unshare_cmd + args_to_worker
                        os.execvp("unshare", cmd)
                    else:
                        logging.debug("Mount is not private. It would be lazy"
                                      " umounted")
                        os.execv(sys.executable, args_to_worker)

            # parent: remember the child so multiplex() can kill it on exit
            cpids.add(cpid)
            os.close(pw)

            self.lock.release()

            t0 = time.time()
            # wait up to conn_timeout for the worker to close its pipe end
            so = select((pr,), (), (), conn_timeout)[0]
            os.close(pr)

            if so:
                # pipe became readable (worker closed it, or worker died)
                ret = nwait(cpid, os.WNOHANG)

                if ret is not None:
                    logging.info(lf("worker died before establishing "
                                    "connection",
                                    brick=w[0]['dir']))
                else:
                    logging.debug("worker(%s) connected" % w[0]['dir'])
                    # probation: poll for the remainder of conn_timeout
                    while time.time() < t0 + conn_timeout:
                        ret = nwait(cpid, os.WNOHANG)

                        if ret is not None:
                            logging.info(lf("worker died in startup phase",
                                            brick=w[0]['dir']))
                            break

                        time.sleep(1)
            else:
                # no feedback within conn_timeout: worker setup hung; kill it
                logging.info(
                    lf("Worker not confirmed after wait, aborting it. "
                       "Gsyncd invocation on remote secondary via SSH or "
                       "gluster primary mount might have hung. Please "
                       "check the above logs for exact issue and check "
                       "primary or secondary volume for errors. Restarting "
                       "primary/secondary volume accordingly might help.",
                       brick=w[0]['dir'],
                       timeout=conn_timeout))
                errno_wrap(os.kill, [cpid, signal.SIGKILL], [ESRCH])
                ret = nwait(cpid)
            if ret is None:
                # worker survived probation; block until it eventually exits
                ret = nwait(cpid)
            if exit_signalled(ret):
                # SIGUSR1 exit is the cooperative shutdown path -> respawn
                ret = 0
            else:
                ret = exit_status(ret)
                if ret in (0, 1):
                    # recoverable failure: mark faulty, emit event, respawn
                    self.status[w[0]['dir']].set_worker_status(self.ST_FAULTY)
                    gf_event(EVENT_GEOREP_FAULTY,
                             primary_volume=primary.volume,
                             primary_node=w[0]['host'],
                             primary_node_id=w[0]['uuid'],
                             secondary_host=secondary_host,
                             secondary_volume=secondary_vol,
                             current_secondary_host=current_secondary_host,
                             brick_path=w[0]['dir'])
            # back off before respawning
            time.sleep(10)
        self.status[w[0]['dir']].set_worker_status(self.ST_INCON)
        return ret

    def multiplex(self, wspx, suuid, secondary_vol, secondary_host, primary, secondarynodes):
        # argv prefix used to re-exec ourselves as a worker process
        argv = [os.path.basename(sys.executable), sys.argv[0]]

        cpids = set()
        ta = []
        for wx in wspx:
            # one monitor thread per local brick / worker spec
            def wmon(w):
                # NOTE(review): self.monitor() returns a single int; this
                # tuple unpack would raise if monitor() ever returned
                # normally -- presumably the Thread wrapper handles the
                # resulting exception. Verify against syncdutils.Thread.
                cpid, _ = self.monitor(w, argv, cpids, secondary_vol,
                                       secondary_host, primary, suuid, secondarynodes)
                time.sleep(1)
                self.lock.acquire()
                # one worker giving up is fatal: kill all siblings and exit
                for cpid in cpids:
                    errno_wrap(os.kill, [cpid, signal.SIGKILL], [ESRCH])
                self.lock.release()
                finalize(exval=1)
            t = Thread(target=wmon, args=[wx])
            t.start()
            ta.append(t)

        # monitor status was being updated in each monitor thread. It
        # should not be done as it can cause deadlock for a worker start.
        # set_monitor_status uses flock to synchronize multiple instances
        # updating the file. Since each monitor thread forks a worker,
        # these processes can hold the reference to the fd of the status
        # file, causing deadlock to workers which start later as flock
        # will not be released until all references to the same fd are
        # closed. It would also cause fd leaks.

        self.lock.acquire()
        set_monitor_status(gconf.get("state-file"), self.ST_STARTED)
        self.lock.release()
        for t in ta:
            t.join()
298
def distribute(primary, secondary):
    """Build the worker specs for the bricks of *primary* local to this node.

    Queries primary and secondary volume info (from gconf or via gluster,
    the latter over SSH for the secondary), persists the session's
    stime-xattr-prefix, and pairs each local primary brick with a secondary
    node round-robin.

    Returns the argument tuple for Monitor.multiplex():
    (workerspex, suuid, secondary_vol, secondary_host, primary,
    secondarynodes), where each worker spec is
    (brick dict, (user@host, uuid), subvol num, is_hot).
    """
    if rconf.args.use_gconf_volinfo:
        mvol = VolinfoFromGconf(primary.volume, primary=True)
    else:
        mvol = Volinfo(primary.volume, primary.host, primary=True)
    logging.debug('primary bricks: ' + repr(mvol.bricks))

    # ssh command line used to run gluster against the secondary remotely
    # (removed dead initializations of prelude/secondary_host/secondary_vol
    # that were overwritten before first use)
    prelude = [gconf.get("ssh-command")] + \
        gconf.get("ssh-options").split() + \
        ["-p", str(gconf.get("ssh-port"))] + \
        [secondary.remote_addr]

    logging.debug('secondary SSH gateway: ' + secondary.remote_addr)

    if rconf.args.use_gconf_volinfo:
        svol = VolinfoFromGconf(secondary.volume, primary=False)
    else:
        svol = Volinfo(secondary.volume, "localhost", prelude, primary=False)

    sbricks = svol.bricks
    suuid = svol.uuid
    secondary_host = secondary.remote_addr.split('@')[-1]
    secondary_vol = secondary.volume

    # save this xattr for the session delete command
    old_stime_xattr_prefix = gconf.get("stime-xattr-prefix", None)
    new_stime_xattr_prefix = "trusted.glusterfs." + mvol.uuid + "." + \
        svol.uuid
    if not old_stime_xattr_prefix or \
       old_stime_xattr_prefix != new_stime_xattr_prefix:
        gconf.setconfig("stime-xattr-prefix", new_stime_xattr_prefix)

    logging.debug('secondary bricks: ' + repr(sbricks))

    secondarynodes = set((b['host'], b["uuid"]) for b in sbricks)
    rap = SSH.parse_ssh_address(secondary)
    secondarys = [(rap['user'] + '@' + h[0], h[1]) for h in secondarynodes]

    # one worker spec per primary brick hosted on this node, with its
    # secondary endpoint assigned round-robin by brick index
    workerspex = []
    for idx, brick in enumerate(mvol.bricks):
        if rconf.args.local_node_id == brick['uuid']:
            is_hot = mvol.is_hot(":".join([brick['host'], brick['dir']]))
            workerspex.append((brick,
                               secondarys[idx % len(secondarys)],
                               get_subvol_num(idx, mvol, is_hot),
                               is_hot))
    logging.debug('worker specs: ' + repr(workerspex))
    return workerspex, suuid, secondary_vol, secondary_host, primary, secondarynodes
351
def monitor(local, remote):
    """Run the Monitor (used as a singleton) over this node's worker specs.

    If gsyncd was restarted while in the paused state, SIGSTOP the whole
    process group first so the session comes back up paused.
    """
    if rconf.args.pause_on_start:
        errno_wrap(os.kill, [-os.getpid(), signal.SIGSTOP], [ESRCH])

    specs = distribute(local, remote)
    return Monitor().multiplex(*specs)
362
def startup(go_daemon=True):
    """set up logging, pidfile grabbing, daemonization"""
    pid_file = gconf.get("pid-file")
    # grab the pidfile in the foreground process first; refuse to run twice
    if not grabpidfile():
        sys.stderr.write("pidfile is taken, exiting.\n")
        sys.exit(2)
    rconf.pid_file_owned = True

    if not go_daemon:
        return

    # classic daemonization: fork, detach from the controlling terminal,
    # and redirect stdio to /dev/null. x/y is a parent-death signalling
    # pipe: the child's select() on x below returns only once the parent
    # has exited (closing the last other reference to the pipe).
    x, y = pipe()
    cpid = os.fork()
    if cpid:
        # parent: keep y open until exit so the child's select() blocks
        os.close(x)
        sys.exit()
    os.close(y)
    os.setsid()
    dn = os.open(os.devnull, os.O_RDWR)
    for f in (sys.stdin, sys.stdout, sys.stderr):
        os.dup2(dn, f.fileno())

    # re-grab the pidfile as the daemon under a temp name, then atomically
    # rename it over the one the (exiting) parent holds
    if not grabpidfile(pid_file + '.tmp'):
        raise GsyncdError("cannot grab temporary pidfile")

    os.rename(pid_file + '.tmp', pid_file)

    # wait for parent to terminate
    # so we can start up with
    # no messing from the dirty
    # ol' bustard
    select((x,), (), ())
    os.close(x)