cubefs

Форк
0
/
data_partition.go 
159 строк · 4.0 Кб
1
// Copyright 2018 The CubeFS Authors.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
14

15
package wrapper
16

17
import (
18
	"fmt"
19
	"net"
20
	"strings"
21
	"sync"
22
	"syscall"
23
	"time"
24

25
	"github.com/cubefs/cubefs/proto"
26
	"github.com/cubefs/cubefs/util"
27
	"github.com/cubefs/cubefs/util/log"
28
)
29

30
// DataPartition defines the wrapper of the data partition.
31
type DataPartition struct {
32
	// Will not be changed
33
	proto.DataPartitionResponse
34
	RandomWrite   bool
35
	NearHosts     []string
36
	ClientWrapper *Wrapper
37
	Metrics       *DataPartitionMetrics
38
}
39

40
// DataPartitionMetrics defines the wrapper of the metrics related to the data partition.
41
type DataPartitionMetrics struct {
42
	sync.RWMutex
43
	AvgReadLatencyNano  int64
44
	AvgWriteLatencyNano int64
45
	SumReadLatencyNano  int64
46
	SumWriteLatencyNano int64
47
	ReadOpNum           int64
48
	WriteOpNum          int64
49
}
50

51
func (dp *DataPartition) RecordWrite(startT int64) {
52
	if startT == 0 {
53
		log.LogWarnf("RecordWrite: invalid start time")
54
		return
55
	}
56
	cost := time.Now().UnixNano() - startT
57

58
	dp.Metrics.Lock()
59
	defer dp.Metrics.Unlock()
60

61
	dp.Metrics.WriteOpNum++
62
	dp.Metrics.SumWriteLatencyNano += cost
63

64
	return
65
}
66

67
func (dp *DataPartition) MetricsRefresh() {
68
	dp.Metrics.Lock()
69
	defer dp.Metrics.Unlock()
70

71
	if dp.Metrics.ReadOpNum != 0 {
72
		dp.Metrics.AvgReadLatencyNano = dp.Metrics.SumReadLatencyNano / dp.Metrics.ReadOpNum
73
	} else {
74
		dp.Metrics.AvgReadLatencyNano = 0
75
	}
76

77
	if dp.Metrics.WriteOpNum != 0 {
78
		dp.Metrics.AvgWriteLatencyNano = dp.Metrics.SumWriteLatencyNano / dp.Metrics.WriteOpNum
79
	} else {
80
		dp.Metrics.AvgWriteLatencyNano = 0
81
	}
82

83
	dp.Metrics.SumReadLatencyNano = 0
84
	dp.Metrics.SumWriteLatencyNano = 0
85
	dp.Metrics.ReadOpNum = 0
86
	dp.Metrics.WriteOpNum = 0
87
}
88

89
func (dp *DataPartition) GetAvgRead() int64 {
90
	dp.Metrics.RLock()
91
	defer dp.Metrics.RUnlock()
92

93
	return dp.Metrics.AvgReadLatencyNano
94
}
95

96
func (dp *DataPartition) GetAvgWrite() int64 {
97
	dp.Metrics.RLock()
98
	defer dp.Metrics.RUnlock()
99

100
	return dp.Metrics.AvgWriteLatencyNano
101
}
102

103
type DataPartitionSorter []*DataPartition
104

105
func (ds DataPartitionSorter) Len() int {
106
	return len(ds)
107
}
108

109
func (ds DataPartitionSorter) Swap(i, j int) {
110
	ds[i], ds[j] = ds[j], ds[i]
111
}
112

113
func (ds DataPartitionSorter) Less(i, j int) bool {
114
	return ds[i].Metrics.AvgWriteLatencyNano < ds[j].Metrics.AvgWriteLatencyNano
115
}
116

117
// NewDataPartitionMetrics returns a new DataPartitionMetrics instance.
118
func NewDataPartitionMetrics() *DataPartitionMetrics {
119
	metrics := new(DataPartitionMetrics)
120
	return metrics
121
}
122

123
// String returns the string format of the data partition.
124
func (dp *DataPartition) String() string {
125
	return fmt.Sprintf("PartitionID(%v) Type(%v), Status(%v) ReplicaNum(%v) Hosts(%v) NearHosts(%v)",
126
		dp.PartitionID, dp.PartitionType, dp.Status, dp.ReplicaNum, dp.Hosts, dp.NearHosts)
127
}
128

129
func (dp *DataPartition) CheckAllHostsIsAvail(exclude map[string]struct{}) {
130
	var (
131
		conn net.Conn
132
		err  error
133
	)
134
	for i := 0; i < len(dp.Hosts); i++ {
135
		host := dp.Hosts[i]
136
		if conn, err = util.DailTimeOut(host, proto.ReadDeadlineTime*time.Second); err != nil {
137
			log.LogWarnf("CheckAllHostsIsAvail: dial host (%v) err(%v)", host, err)
138
			if strings.Contains(err.Error(), syscall.ECONNREFUSED.Error()) {
139
				exclude[host] = struct{}{}
140
			}
141
			continue
142
		}
143
		conn.Close()
144
	}
145
}
146

147
// GetAllAddrs returns the addresses of all the replicas of the data partition.
148
func (dp *DataPartition) GetAllAddrs() string {
149
	return strings.Join(dp.Hosts[1:], proto.AddrSplit) + proto.AddrSplit
150
}
151

152
func isExcluded(dp *DataPartition, exclude map[string]struct{}) bool {
153
	for _, host := range dp.Hosts {
154
		if _, exist := exclude[host]; exist {
155
			return true
156
		}
157
	}
158
	return false
159
}
160

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.