glusterfs

Форк
0
419 строк · 15.3 Кб
1
#!/usr/bin/python3
2
#
3
# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com>
4
# This file is part of GlusterFS.
5

6
# This file is licensed to you under your choice of the GNU Lesser
7
# General Public License, version 3 or any later version (LGPLv3 or
8
# later), or the GNU General Public License, version 2 (GPLv2), in all
9
# cases as published by the Free Software Foundation.
10
#
11

12
from __future__ import print_function
13
import fcntl
14
import os
15
import tempfile
16
try:
17
    import urllib.parse as urllib
18
except ImportError:
19
    import urllib
20
import json
21
import time
22
from datetime import datetime
23
from errno import EACCES, EAGAIN, ENOENT
24
import logging
25

26
from syncdutils import (EVENT_GEOREP_ACTIVE, EVENT_GEOREP_PASSIVE, gf_event,
27
                        EVENT_GEOREP_CHECKPOINT_COMPLETED, lf)
28

29
# Placeholder shown for any status field whose real value is unknown or unset.
DEFAULT_STATUS = "N/A"
# Valid states of the session-wide monitor process (stored as plain text
# in the monitor status file).
MONITOR_STATUS = ("Created", "Started", "Paused", "Stopped")
# Valid per-brick worker status values reported in the brick status file.
STATUS_VALUES = (DEFAULT_STATUS,
                 "Initializing...",
                 "Active",
                 "Passive",
                 "Faulty")

# Valid crawl phases a worker can be in while syncing changes.
CRAWL_STATUS_VALUES = (DEFAULT_STATUS,
                       "Hybrid Crawl",
                       "History Crawl",
                       "Changelog Crawl")
41

42

43
def human_time(ts):
    """Format a UNIX timestamp *ts* as "YYYY-mm-dd HH:MM:SS" in local time.

    Returns DEFAULT_STATUS when *ts* cannot be converted to a float.
    """
    try:
        stamp = datetime.fromtimestamp(float(ts))
    except ValueError:
        return DEFAULT_STATUS
    return stamp.strftime("%Y-%m-%d %H:%M:%S")
48

49

50
def human_time_utc(ts):
    """Format a UNIX timestamp *ts* as "YYYY-mm-dd HH:MM:SS" in UTC.

    Returns DEFAULT_STATUS when *ts* cannot be converted to a float.
    """
    # NOTE(review): datetime.utcfromtimestamp() is deprecated since
    # Python 3.12; kept because this file still maintains Python 2
    # compatibility (see the urllib import fallback at the top).
    try:
        stamp = datetime.utcfromtimestamp(float(ts))
    except ValueError:
        return DEFAULT_STATUS
    return stamp.strftime("%Y-%m-%d %H:%M:%S")
56

57

58
def get_default_values():
    """Return a new dict holding the default value of every brick status field.

    String-valued fields start as DEFAULT_STATUS ("N/A"); counters and
    timestamps start at zero.
    """
    return dict(
        secondary_node=DEFAULT_STATUS,
        worker_status=DEFAULT_STATUS,
        last_synced=0,
        last_synced_entry=0,
        crawl_status=DEFAULT_STATUS,
        entry=0,
        data=0,
        meta=0,
        failures=0,
        checkpoint_completed=DEFAULT_STATUS,
        checkpoint_time=0,
        checkpoint_completion_time=0,
    )
72

73

74
class LockedOpen(object):
    """Context manager: open a file and hold an exclusive flock() on it.

    Yields the open file object on entry; unlocks and closes it on exit.
    Extra positional/keyword arguments are forwarded to open().
    """

    def __init__(self, filename, *args, **kwargs):
        self.filename = filename
        self.open_args = args
        self.open_kwargs = kwargs
        self.fileobj = None

    def __enter__(self):
        """
        Acquire the lock, re-opening until the locked fd matches the path.

        If two processes compete to update the file, the loser blocks in
        fcntl.flock() while the winner atomically replaces the file and
        unlocks. The loser's descriptor then refers to a "ghost" inode no
        longer reachable by any path name, holding stale contents. To
        detect that, compare the locked descriptor against a fresh open
        of the same path; only when both refer to the same file is the
        lock valid for the current file.
        """
        fobj = open(self.filename, *self.open_args, **self.open_kwargs)
        while True:
            fcntl.flock(fobj, fcntl.LOCK_EX)
            probe = open(self.filename, *self.open_args, **self.open_kwargs)
            if os.path.sameopenfile(fobj.fileno(), probe.fileno()):
                probe.close()
                self.fileobj = fobj
                return fobj
            # Locked a ghost inode: retry with the freshly opened file.
            fobj.close()
            fobj = probe

    def __exit__(self, _exc_type, _exc_value, _traceback):
        # Release the lock explicitly before closing the descriptor.
        fcntl.flock(self.fileobj, fcntl.LOCK_UN)
        self.fileobj.close()
108

109

110
def set_monitor_status(status_file, status):
    """Atomically replace the contents of *status_file* with *status*.

    Creates the file if missing, then, under an exclusive lock, writes
    the new status to a temporary file in the same directory, renames it
    over the original, and fsyncs the directory so the rename is durable.
    """
    # Pre-create the file so LockedOpen can open it with mode 'r+'.
    os.close(os.open(status_file, os.O_CREAT | os.O_RDWR))

    with LockedOpen(status_file, 'r+'):
        tf = tempfile.NamedTemporaryFile('w',
                                         dir=os.path.dirname(status_file),
                                         delete=False)
        with tf:
            tf.write(status)

        os.rename(tf.name, status_file)
        # Persist the directory entry updated by the rename.
        dirfd = os.open(os.path.dirname(os.path.abspath(status_file)),
                        os.O_DIRECTORY)
        os.fsync(dirfd)
        os.close(dirfd)
124

125

126
class GeorepStatus(object):
    """Persists and reports the status of one Geo-replication brick worker.

    Two files are maintained:

    * ``monitor_status_file`` -- plain-text state of the whole session
      ("Created"/"Started"/"Paused"/"Stopped"), written by the monitor.
    * ``brick_<quoted-brick-path>.status`` -- JSON document holding the
      per-brick fields listed in get_default_values().

    All writers funnel through _update(), which serializes access with
    an exclusive lock and replaces the file atomically (temp file in the
    same directory + rename + directory fsync).
    """

    def __init__(self, monitor_status_file, primary_node, brick, primary_node_id,
                 primary, secondary, monitor_pid_file=None):
        self.primary = primary
        slv_data = secondary.split("::")
        self.secondary_host = slv_data[0]
        self.secondary_volume = slv_data[1].split(":")[0]  # Remove Secondary UUID
        self.work_dir = os.path.dirname(monitor_status_file)
        self.monitor_status_file = monitor_status_file
        # Percent-encode the brick path so it is safe inside a filename.
        self.filename = os.path.join(self.work_dir,
                                     "brick_%s.status"
                                     % urllib.quote_plus(brick))

        # Pre-create both files so later 'r+' opens cannot fail with ENOENT.
        fd = os.open(self.filename, os.O_CREAT | os.O_RDWR)
        os.close(fd)
        fd = os.open(self.monitor_status_file, os.O_CREAT | os.O_RDWR)
        os.close(fd)
        self.primary_node = primary_node
        self.primary_node_id = primary_node_id
        self.brick = brick
        self.default_values = get_default_values()
        self.monitor_pid_file = monitor_pid_file

    def send_event(self, event_type, **kwargs):
        """Emit a Gluster event tagged with this session's identifiers."""
        gf_event(event_type,
                 primary_volume=self.primary,
                 primary_node=self.primary_node,
                 primary_node_id=self.primary_node_id,
                 secondary_host=self.secondary_host,
                 secondary_volume=self.secondary_volume,
                 brick_path=self.brick,
                 **kwargs)

    def _update(self, mergerfunc):
        """Read-modify-write the brick status file under an exclusive lock.

        *mergerfunc* receives the current status dict and must return the
        JSON string to store, or a falsy value meaning "nothing changed".
        Returns True if the file was rewritten, False otherwise.
        """
        # Work on a copy: binding the shared dict directly (the previous
        # code did `data = self.default_values`) and then mutating it
        # leaks every update into the defaults used by all later calls.
        data = self.default_values.copy()
        with LockedOpen(self.filename, 'r+') as f:
            try:
                data.update(json.load(f))
            except ValueError:
                # Empty or corrupt status file: start from the defaults.
                pass

            data = mergerfunc(data)
            # If Data is not changed by merger func
            if not data:
                return False

            # Write to a temp file in the same directory and rename, so
            # readers see either the old or the new complete document.
            with tempfile.NamedTemporaryFile(
                    'w',
                    dir=os.path.dirname(self.filename),
                    delete=False) as tf:
                tf.write(data)
                tempname = tf.name

            os.rename(tempname, self.filename)
            dirfd = os.open(os.path.dirname(os.path.abspath(self.filename)),
                            os.O_DIRECTORY)
            os.fsync(dirfd)
            os.close(dirfd)
            return True

    def reset_on_worker_start(self):
        """Clear per-crawl fields and counters when a worker (re)starts."""
        def merger(data):
            data["secondary_node"] = DEFAULT_STATUS
            data["crawl_status"] = DEFAULT_STATUS
            data["entry"] = 0
            data["data"] = 0
            data["meta"] = 0
            return json.dumps(data)

        self._update(merger)

    def set_field(self, key, value):
        """Set one status field; returns True only if the value changed."""
        def merger(data):
            # Current data and prev data is same
            if data[key] == value:
                return {}

            data[key] = value
            return json.dumps(data)

        return self._update(merger)

    def trigger_gf_event_checkpoint_completion(self, checkpoint_time,
                                               checkpoint_completion_time):
        """Emit the checkpoint-completed Gluster event."""
        self.send_event(EVENT_GEOREP_CHECKPOINT_COMPLETED,
                        checkpoint_time=checkpoint_time,
                        checkpoint_completion_time=checkpoint_completion_time)

    def set_last_synced(self, value, checkpoint_time):
        """Record the last synced time and maintain checkpoint state.

        Only value[0] is stored as last_synced. *checkpoint_time* is the
        currently configured checkpoint (0 when unset). When the synced
        time crosses the checkpoint for the first time, completion is
        recorded, logged and an event is emitted.
        """
        def merger(data):
            data["last_synced"] = value[0]

            # If checkpoint is not set or reset
            # or if last set checkpoint is changed
            if checkpoint_time == 0 or \
               checkpoint_time != data["checkpoint_time"]:
                data["checkpoint_time"] = 0
                data["checkpoint_completion_time"] = 0
                data["checkpoint_completed"] = "No"

            # If checkpoint is completed and not marked as completed
            # previously then update the checkpoint completed time
            if checkpoint_time > 0 and checkpoint_time <= value[0]:
                if data["checkpoint_completed"] == "No":
                    curr_time = int(time.time())
                    data["checkpoint_time"] = checkpoint_time
                    data["checkpoint_completion_time"] = curr_time
                    data["checkpoint_completed"] = "Yes"
                    logging.info(lf("Checkpoint completed",
                                    checkpoint_time=human_time_utc(
                                        checkpoint_time),
                                    completion_time=human_time_utc(curr_time)))
                    self.trigger_gf_event_checkpoint_completion(
                        checkpoint_time, curr_time)

            return json.dumps(data)

        self._update(merger)

    def set_worker_status(self, status):
        """Set worker_status, logging only on an actual change."""
        if self.set_field("worker_status", status):
            logging.info(lf("Worker Status Change",
                            status=status))

    def set_worker_crawl_status(self, status):
        """Set crawl_status, logging only on an actual change."""
        if self.set_field("crawl_status", status):
            logging.info(lf("Crawl Status Change",
                            status=status))

    def set_secondary_node(self, secondary_node):
        """Record which secondary node this worker is connected to."""
        def merger(data):
            data["secondary_node"] = secondary_node
            return json.dumps(data)

        self._update(merger)

    def inc_value(self, key, value):
        """Increment numeric counter *key* by *value*."""
        def merger(data):
            data[key] = data.get(key, 0) + value
            return json.dumps(data)

        self._update(merger)

    def dec_value(self, key, value):
        """Decrement numeric counter *key* by *value*, clamping at zero."""
        def merger(data):
            data[key] = data.get(key, 0) - value
            if data[key] < 0:
                data[key] = 0
            return json.dumps(data)

        self._update(merger)

    def set_active(self):
        """Mark the worker Active; log and emit an event on transition."""
        if self.set_field("worker_status", "Active"):
            logging.info(lf("Worker Status Change",
                            status="Active"))
            self.send_event(EVENT_GEOREP_ACTIVE)

    def set_passive(self):
        """Mark the worker Passive; log and emit an event on transition."""
        if self.set_field("worker_status", "Passive"):
            logging.info(lf("Worker Status Change",
                            status="Passive"))
            self.send_event(EVENT_GEOREP_PASSIVE)

    def get_monitor_status(self):
        """Return the stripped contents of the monitor status file."""
        data = ""
        with open(self.monitor_status_file, "r") as f:
            data = f.read().strip()
        return data

    def get_status(self, checkpoint_time=0):
        """
        Monitor Status --->        Created    Started  Paused      Stopped
        ----------------------------------------------------------------------
        secondary_node                 N/A        VALUE    VALUE       N/A
        status                     Created    VALUE    Paused      Stopped
        last_synced                N/A        VALUE    VALUE       VALUE
        last_synced_entry          N/A        VALUE    VALUE       VALUE
        crawl_status               N/A        VALUE    N/A         N/A
        entry                      N/A        VALUE    N/A         N/A
        data                       N/A        VALUE    N/A         N/A
        meta                       N/A        VALUE    N/A         N/A
        failures                   N/A        VALUE    VALUE       VALUE
        checkpoint_completed       N/A        VALUE    VALUE       VALUE
        checkpoint_time            N/A        VALUE    VALUE       VALUE
        checkpoint_completed_time  N/A        VALUE    VALUE       VALUE
        """
        # Work on a copy so this read does not pollute the shared
        # defaults with on-disk values or the "N/A" overrides below
        # (the previous code aliased self.default_values directly).
        data = self.default_values.copy()
        with open(self.filename) as f:
            try:
                data.update(json.load(f))
            except ValueError:
                pass
        monitor_status = self.get_monitor_status()

        # Verifying whether monitor process running and adjusting status
        if monitor_status in ["Started", "Paused"]:
            try:
                # If we can grab the pid-file lock, the monitor is gone.
                with open(self.monitor_pid_file, "r+") as f:
                    fcntl.lockf(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
                    monitor_status = "Stopped"
            except (IOError, OSError) as e:
                # If pid file not exists, either monitor died or Geo-rep
                # not even started once
                if e.errno == ENOENT:
                    monitor_status = "Stopped"
                elif e.errno in (EACCES, EAGAIN):
                    # cannot grab. so, monitor process still running..move on
                    pass
                else:
                    raise

        if monitor_status in ["Created", "Paused", "Stopped"]:
            data["worker_status"] = monitor_status

        if monitor_status == "":
            data["worker_status"] = "Stopped"

        # Checkpoint adjustments
        if checkpoint_time == 0:
            data["checkpoint_completed"] = DEFAULT_STATUS
            data["checkpoint_time"] = DEFAULT_STATUS
            data["checkpoint_completion_time"] = DEFAULT_STATUS
        else:
            if checkpoint_time != data["checkpoint_time"]:
                if checkpoint_time <= data["last_synced"]:
                    data["checkpoint_completed"] = "Yes"
                    data["checkpoint_time"] = checkpoint_time
                    data["checkpoint_completion_time"] = data["last_synced"]
                else:
                    data["checkpoint_completed"] = "No"
                    data["checkpoint_time"] = checkpoint_time
                    data["checkpoint_completion_time"] = DEFAULT_STATUS

        # Convert raw timestamps to human-readable local/UTC strings.
        if data["checkpoint_time"] not in [0, DEFAULT_STATUS]:
            chkpt_time = data["checkpoint_time"]
            data["checkpoint_time"] = human_time(chkpt_time)
            data["checkpoint_time_utc"] = human_time_utc(chkpt_time)

        if data["checkpoint_completion_time"] not in [0, DEFAULT_STATUS]:
            chkpt_completion_time = data["checkpoint_completion_time"]
            data["checkpoint_completion_time"] = human_time(
                chkpt_completion_time)
            data["checkpoint_completion_time_utc"] = human_time_utc(
                chkpt_completion_time)

        if data["last_synced"] == 0:
            data["last_synced"] = DEFAULT_STATUS
            data["last_synced_utc"] = DEFAULT_STATUS
        else:
            last_synced = data["last_synced"]
            data["last_synced"] = human_time(last_synced)
            data["last_synced_utc"] = human_time_utc(last_synced)

        # Non-active workers report only the session-level fields.
        if data["worker_status"] != "Active":
            data["last_synced"] = DEFAULT_STATUS
            data["last_synced_utc"] = DEFAULT_STATUS
            data["crawl_status"] = DEFAULT_STATUS
            data["entry"] = DEFAULT_STATUS
            data["data"] = DEFAULT_STATUS
            data["meta"] = DEFAULT_STATUS
            data["failures"] = DEFAULT_STATUS
            data["checkpoint_completed"] = DEFAULT_STATUS
            data["checkpoint_time"] = DEFAULT_STATUS
            # NOTE(review): key name "checkpoint_completed_time" differs from
            # "checkpoint_completion_time" used everywhere else; kept as-is
            # because external consumers may read this exact key.
            data["checkpoint_completed_time"] = DEFAULT_STATUS
            data["checkpoint_time_utc"] = DEFAULT_STATUS
            data["checkpoint_completion_time_utc"] = DEFAULT_STATUS

        if data["worker_status"] not in ["Active", "Passive"]:
            data["secondary_node"] = DEFAULT_STATUS

        if data.get("last_synced_utc", 0) == 0:
            data["last_synced_utc"] = DEFAULT_STATUS

        if data.get("checkpoint_completion_time_utc", 0) == 0:
            data["checkpoint_completion_time_utc"] = DEFAULT_STATUS

        if data.get("checkpoint_time_utc", 0) == 0:
            data["checkpoint_time_utc"] = DEFAULT_STATUS

        return data

    def print_status(self, checkpoint_time=0, json_output=False):
        """Print the computed status, as JSON or as "key: value" lines."""
        status_out = self.get_status(checkpoint_time)
        if json_output:
            out = {}
            # Convert all values as string
            for k, v in status_out.items():
                out[k] = str(v)
            print(json.dumps(out))
            return

        for key, value in status_out.items():
            print("%s: %s" % (key, value))
420

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.