#!/usr/bin/python3
#
# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com>
# This file is part of GlusterFS.

# This file is licensed to you under your choice of the GNU Lesser
# General Public License, version 3 or any later version (LGPLv3 or
# later), or the GNU General Public License, version 2 (GPLv2), in all
# cases as published by the Free Software Foundation.
#

from __future__ import print_function
import fcntl
import os
import tempfile
try:
    import urllib.parse as urllib
except ImportError:
    import urllib
import json
import time
from datetime import datetime
from errno import EACCES, EAGAIN, ENOENT
import logging

from syncdutils import (EVENT_GEOREP_ACTIVE, EVENT_GEOREP_PASSIVE, gf_event,
                        EVENT_GEOREP_CHECKPOINT_COMPLETED, lf)

DEFAULT_STATUS = "N/A"
MONITOR_STATUS = ("Created", "Started", "Paused", "Stopped")
STATUS_VALUES = (DEFAULT_STATUS,
                 "Initializing...",
                 "Active",
                 "Passive",
                 "Faulty")

CRAWL_STATUS_VALUES = (DEFAULT_STATUS,
                       "Hybrid Crawl",
                       "History Crawl",
                       "Changelog Crawl")


def human_time(ts):
    try:
        return datetime.fromtimestamp(float(ts)).strftime("%Y-%m-%d %H:%M:%S")
    except ValueError:
        return DEFAULT_STATUS


def human_time_utc(ts):
    try:
        return datetime.utcfromtimestamp(
            float(ts)).strftime("%Y-%m-%d %H:%M:%S")
    except ValueError:
        return DEFAULT_STATUS


def get_default_values():
    return {
        "secondary_node": DEFAULT_STATUS,
        "worker_status": DEFAULT_STATUS,
        "last_synced": 0,
        "last_synced_entry": 0,
        "crawl_status": DEFAULT_STATUS,
        "entry": 0,
        "data": 0,
        "meta": 0,
        "failures": 0,
        "checkpoint_completed": DEFAULT_STATUS,
        "checkpoint_time": 0,
        "checkpoint_completion_time": 0}


class LockedOpen(object):

    def __init__(self, filename, *args, **kwargs):
        self.filename = filename
        self.open_args = args
        self.open_kwargs = kwargs
        self.fileobj = None

    def __enter__(self):
        """
        If two processes compete to update a file, the first process
        gets the lock and the second is blocked in the fcntl.flock()
        call. When the first process replaces the file and releases the
        lock, the already open file descriptor in the second process now
        points to a "ghost" file (not reachable by any path name) with
        stale contents. To avoid that, check whether the locked fd still
        refers to the current file, and reopen if it does not.
        """
        f = open(self.filename, *self.open_args, **self.open_kwargs)
        while True:
            fcntl.flock(f, fcntl.LOCK_EX)
            fnew = open(self.filename, *self.open_args, **self.open_kwargs)
            if os.path.sameopenfile(f.fileno(), fnew.fileno()):
                fnew.close()
                break
            else:
                f.close()
                f = fnew
        self.fileobj = f
        return f

    def __exit__(self, _exc_type, _exc_value, _traceback):
        fcntl.flock(self.fileobj, fcntl.LOCK_UN)
        self.fileobj.close()

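# A minimal usage sketch (illustration only, not part of the original
# module): writers take the lock, write a temp file in the same directory,
# then rename it over the original so readers never observe a partial
# write. "example.status" is a hypothetical path.
#
#     with LockedOpen("example.status", 'r+'):
#         with tempfile.NamedTemporaryFile('w', dir=".", delete=False) as tf:
#             tf.write("Started")
#             tempname = tf.name
#         os.rename(tempname, "example.status")
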
def set_monitor_status(status_file, status):
    fd = os.open(status_file, os.O_CREAT | os.O_RDWR)
    os.close(fd)
    with LockedOpen(status_file, 'r+'):
        with tempfile.NamedTemporaryFile('w',
                                         dir=os.path.dirname(status_file),
                                         delete=False) as tf:
            tf.write(status)
            tempname = tf.name

        os.rename(tempname, status_file)
        dirfd = os.open(os.path.dirname(os.path.abspath(status_file)),
                        os.O_DIRECTORY)
        os.fsync(dirfd)
        os.close(dirfd)

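# For instance (hypothetical path), the monitor would persist a state
# transition with:
#
#     set_monitor_status("/var/lib/misc/gsyncd/monitor.status", "Started")
#
# where the status string is one of MONITOR_STATUS defined above.
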
class GeorepStatus(object):
    def __init__(self, monitor_status_file, primary_node, brick,
                 primary_node_id, primary, secondary, monitor_pid_file=None):
        self.primary = primary
        slv_data = secondary.split("::")
        self.secondary_host = slv_data[0]
        # Remove Secondary UUID, if present
        self.secondary_volume = slv_data[1].split(":")[0]
        self.work_dir = os.path.dirname(monitor_status_file)
        self.monitor_status_file = monitor_status_file
        self.filename = os.path.join(self.work_dir,
                                     "brick_%s.status"
                                     % urllib.quote_plus(brick))

        fd = os.open(self.filename, os.O_CREAT | os.O_RDWR)
        os.close(fd)
        fd = os.open(self.monitor_status_file, os.O_CREAT | os.O_RDWR)
        os.close(fd)
        self.primary_node = primary_node
        self.primary_node_id = primary_node_id
        self.brick = brick
        self.default_values = get_default_values()
        self.monitor_pid_file = monitor_pid_file

    def send_event(self, event_type, **kwargs):
        gf_event(event_type,
                 primary_volume=self.primary,
                 primary_node=self.primary_node,
                 primary_node_id=self.primary_node_id,
                 secondary_host=self.secondary_host,
                 secondary_volume=self.secondary_volume,
                 brick_path=self.brick,
                 **kwargs)

    def _update(self, mergerfunc):
        data = self.default_values
        with LockedOpen(self.filename, 'r+') as f:
            try:
                data.update(json.load(f))
            except ValueError:
                pass

            data = mergerfunc(data)
            # If data is not changed by the merger func, skip the write
            if not data:
                return False

            with tempfile.NamedTemporaryFile(
                    'w',
                    dir=os.path.dirname(self.filename),
                    delete=False) as tf:
                tf.write(data)
                tempname = tf.name

            os.rename(tempname, self.filename)
            dirfd = os.open(os.path.dirname(os.path.abspath(self.filename)),
                            os.O_DIRECTORY)
            os.fsync(dirfd)
            os.close(dirfd)
            return True

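    # A sketch of the merger contract (illustration, not original code):
    # a merger receives the current status dict and returns the serialized
    # replacement, or a falsy value to skip the disk write entirely.
    #
    #     def merger(data):
    #         if data["failures"] == 0:
    #             return {}              # unchanged; skip the rewrite
    #         data["failures"] = 0
    #         return json.dumps(data)
    #
    #     self._update(merger)
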
    def reset_on_worker_start(self):
        def merger(data):
            data["secondary_node"] = DEFAULT_STATUS
            data["crawl_status"] = DEFAULT_STATUS
            data["entry"] = 0
            data["data"] = 0
            data["meta"] = 0
            return json.dumps(data)

        self._update(merger)

    def set_field(self, key, value):
        def merger(data):
            # Current data and previous data are the same; nothing to write
            if data[key] == value:
                return {}

            data[key] = value
            return json.dumps(data)

        return self._update(merger)

    def trigger_gf_event_checkpoint_completion(self, checkpoint_time,
                                               checkpoint_completion_time):
        self.send_event(EVENT_GEOREP_CHECKPOINT_COMPLETED,
                        checkpoint_time=checkpoint_time,
                        checkpoint_completion_time=checkpoint_completion_time)

    def set_last_synced(self, value, checkpoint_time):
        def merger(data):
            data["last_synced"] = value[0]

            # If checkpoint is not set or was reset,
            # or if the last set checkpoint has changed
            if checkpoint_time == 0 or \
               checkpoint_time != data["checkpoint_time"]:
                data["checkpoint_time"] = 0
                data["checkpoint_completion_time"] = 0
                data["checkpoint_completed"] = "No"

            # If checkpoint is completed and was not previously marked as
            # completed, record the checkpoint completion time
            if checkpoint_time > 0 and checkpoint_time <= value[0]:
                if data["checkpoint_completed"] == "No":
                    curr_time = int(time.time())
                    data["checkpoint_time"] = checkpoint_time
                    data["checkpoint_completion_time"] = curr_time
                    data["checkpoint_completed"] = "Yes"
                    logging.info(lf("Checkpoint completed",
                                    checkpoint_time=human_time_utc(
                                        checkpoint_time),
                                    completion_time=human_time_utc(curr_time)))
                    self.trigger_gf_event_checkpoint_completion(
                        checkpoint_time, curr_time)

            return json.dumps(data)

        self._update(merger)

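    # Worked example (hypothetical timestamps): with checkpoint_time set to
    # 1700000000 and value[0] (the last synced changelog time) at
    # 1700000100, checkpoint_time <= value[0], so the merger above marks the
    # checkpoint "Yes", records the wall-clock completion time, and emits
    # EVENT_GEOREP_CHECKPOINT_COMPLETED exactly once.
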
    def set_worker_status(self, status):
        if self.set_field("worker_status", status):
            logging.info(lf("Worker Status Change",
                            status=status))

    def set_worker_crawl_status(self, status):
        if self.set_field("crawl_status", status):
            logging.info(lf("Crawl Status Change",
                            status=status))

    def set_secondary_node(self, secondary_node):
        def merger(data):
            data["secondary_node"] = secondary_node
            return json.dumps(data)

        self._update(merger)

    def inc_value(self, key, value):
        def merger(data):
            data[key] = data.get(key, 0) + value
            return json.dumps(data)

        self._update(merger)

    def dec_value(self, key, value):
        def merger(data):
            data[key] = data.get(key, 0) - value
            if data[key] < 0:
                data[key] = 0
            return json.dumps(data)

        self._update(merger)

    def set_active(self):
        if self.set_field("worker_status", "Active"):
            logging.info(lf("Worker Status Change",
                            status="Active"))
            self.send_event(EVENT_GEOREP_ACTIVE)

    def set_passive(self):
        if self.set_field("worker_status", "Passive"):
            logging.info(lf("Worker Status Change",
                            status="Passive"))
            self.send_event(EVENT_GEOREP_PASSIVE)

    def get_monitor_status(self):
        data = ""
        with open(self.monitor_status_file, "r") as f:
            data = f.read().strip()
        return data

    def get_status(self, checkpoint_time=0):
        """
        Monitor Status --->        Created  Started  Paused   Stopped
        --------------------------------------------------------------
        secondary_node             N/A      VALUE    VALUE    N/A
        status                     Created  VALUE    Paused   Stopped
        last_synced                N/A      VALUE    VALUE    VALUE
        last_synced_entry          N/A      VALUE    VALUE    VALUE
        crawl_status               N/A      VALUE    N/A      N/A
        entry                      N/A      VALUE    N/A      N/A
        data                       N/A      VALUE    N/A      N/A
        meta                       N/A      VALUE    N/A      N/A
        failures                   N/A      VALUE    VALUE    VALUE
        checkpoint_completed       N/A      VALUE    VALUE    VALUE
        checkpoint_time            N/A      VALUE    VALUE    VALUE
        checkpoint_completed_time  N/A      VALUE    VALUE    VALUE
        """
        data = self.default_values
        with open(self.filename) as f:
            try:
                data.update(json.load(f))
            except ValueError:
                pass
        monitor_status = self.get_monitor_status()

        # Verify whether the monitor process is running and adjust the status
        if monitor_status in ["Started", "Paused"]:
            try:
                with open(self.monitor_pid_file, "r+") as f:
                    fcntl.lockf(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
                    monitor_status = "Stopped"
            except (IOError, OSError) as e:
                # If the pid file does not exist, either the monitor died or
                # Geo-rep was never started
                if e.errno == ENOENT:
                    monitor_status = "Stopped"
                elif e.errno in (EACCES, EAGAIN):
                    # Cannot grab the lock, so the monitor process is still
                    # running; move on
                    pass
                else:
                    raise

        if monitor_status in ["Created", "Paused", "Stopped"]:
            data["worker_status"] = monitor_status

        if monitor_status == "":
            data["worker_status"] = "Stopped"

        # Checkpoint adjustments
        if checkpoint_time == 0:
            data["checkpoint_completed"] = DEFAULT_STATUS
            data["checkpoint_time"] = DEFAULT_STATUS
            data["checkpoint_completion_time"] = DEFAULT_STATUS
        else:
            if checkpoint_time != data["checkpoint_time"]:
                if checkpoint_time <= data["last_synced"]:
                    data["checkpoint_completed"] = "Yes"
                    data["checkpoint_time"] = checkpoint_time
                    data["checkpoint_completion_time"] = data["last_synced"]
                else:
                    data["checkpoint_completed"] = "No"
                    data["checkpoint_time"] = checkpoint_time
                    data["checkpoint_completion_time"] = DEFAULT_STATUS

        if data["checkpoint_time"] not in [0, DEFAULT_STATUS]:
            chkpt_time = data["checkpoint_time"]
            data["checkpoint_time"] = human_time(chkpt_time)
            data["checkpoint_time_utc"] = human_time_utc(chkpt_time)

        if data["checkpoint_completion_time"] not in [0, DEFAULT_STATUS]:
            chkpt_completion_time = data["checkpoint_completion_time"]
            data["checkpoint_completion_time"] = human_time(
                chkpt_completion_time)
            data["checkpoint_completion_time_utc"] = human_time_utc(
                chkpt_completion_time)

        if data["last_synced"] == 0:
            data["last_synced"] = DEFAULT_STATUS
            data["last_synced_utc"] = DEFAULT_STATUS
        else:
            last_synced = data["last_synced"]
            data["last_synced"] = human_time(last_synced)
            data["last_synced_utc"] = human_time_utc(last_synced)

        if data["worker_status"] != "Active":
            data["last_synced"] = DEFAULT_STATUS
            data["last_synced_utc"] = DEFAULT_STATUS
            data["crawl_status"] = DEFAULT_STATUS
            data["entry"] = DEFAULT_STATUS
            data["data"] = DEFAULT_STATUS
            data["meta"] = DEFAULT_STATUS
            data["failures"] = DEFAULT_STATUS
            data["checkpoint_completed"] = DEFAULT_STATUS
            data["checkpoint_time"] = DEFAULT_STATUS
            data["checkpoint_completed_time"] = DEFAULT_STATUS
            data["checkpoint_time_utc"] = DEFAULT_STATUS
            data["checkpoint_completion_time_utc"] = DEFAULT_STATUS

        if data["worker_status"] not in ["Active", "Passive"]:
            data["secondary_node"] = DEFAULT_STATUS

        if data.get("last_synced_utc", 0) == 0:
            data["last_synced_utc"] = DEFAULT_STATUS

        if data.get("checkpoint_completion_time_utc", 0) == 0:
            data["checkpoint_completion_time_utc"] = DEFAULT_STATUS

        if data.get("checkpoint_time_utc", 0) == 0:
            data["checkpoint_time_utc"] = DEFAULT_STATUS

        return data

    def print_status(self, checkpoint_time=0, json_output=False):
        status_out = self.get_status(checkpoint_time)
        if json_output:
            out = {}
            # Convert all values to strings
            for k, v in status_out.items():
                out[k] = str(v)
            print(json.dumps(out))
            return

        for key, value in status_out.items():
            print("%s: %s" % (key, value))
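
if __name__ == "__main__":
    # Ad-hoc demonstration, not part of the original module. It exercises
    # the status plumbing with throwaway paths; every name below is a
    # hypothetical example, and syncdutils must be importable for the
    # module to load at all.
    demo_dir = tempfile.mkdtemp()
    monitor_file = os.path.join(demo_dir, "monitor.status")

    # Persist a monitor state; "Created" avoids the pid-file liveness
    # check in get_status(), which this demo does not set up.
    set_monitor_status(monitor_file, "Created")

    status = GeorepStatus(monitor_file,
                          "primary.example.com",  # primary node
                          "/bricks/brick1",       # brick path
                          "00000000-0000-0000-0000-000000000000",
                          "primary-vol",
                          "secondary.example.com::secondary-vol")
    status.set_worker_status("Initializing...")
    status.print_status()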