1
// Copyright 2018 The CubeFS Authors.
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
7
// http://www.apache.org/licenses/LICENSE-2.0
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
25
"github.com/cubefs/cubefs/proto"
26
"github.com/cubefs/cubefs/util"
27
"github.com/cubefs/cubefs/util/log"
30
// DataPartition defines the wrapper of the data partition.
31
type DataPartition struct {
32
// Will not be changed
33
proto.DataPartitionResponse
36
ClientWrapper *Wrapper
37
Metrics *DataPartitionMetrics
40
// DataPartitionMetrics defines the wrapper of the metrics related to the data partition.
41
type DataPartitionMetrics struct {
43
AvgReadLatencyNano int64
44
AvgWriteLatencyNano int64
45
SumReadLatencyNano int64
46
SumWriteLatencyNano int64
51
func (dp *DataPartition) RecordWrite(startT int64) {
53
log.LogWarnf("RecordWrite: invalid start time")
56
cost := time.Now().UnixNano() - startT
59
defer dp.Metrics.Unlock()
61
dp.Metrics.WriteOpNum++
62
dp.Metrics.SumWriteLatencyNano += cost
67
func (dp *DataPartition) MetricsRefresh() {
69
defer dp.Metrics.Unlock()
71
if dp.Metrics.ReadOpNum != 0 {
72
dp.Metrics.AvgReadLatencyNano = dp.Metrics.SumReadLatencyNano / dp.Metrics.ReadOpNum
74
dp.Metrics.AvgReadLatencyNano = 0
77
if dp.Metrics.WriteOpNum != 0 {
78
dp.Metrics.AvgWriteLatencyNano = dp.Metrics.SumWriteLatencyNano / dp.Metrics.WriteOpNum
80
dp.Metrics.AvgWriteLatencyNano = 0
83
dp.Metrics.SumReadLatencyNano = 0
84
dp.Metrics.SumWriteLatencyNano = 0
85
dp.Metrics.ReadOpNum = 0
86
dp.Metrics.WriteOpNum = 0
89
func (dp *DataPartition) GetAvgRead() int64 {
91
defer dp.Metrics.RUnlock()
93
return dp.Metrics.AvgReadLatencyNano
96
func (dp *DataPartition) GetAvgWrite() int64 {
98
defer dp.Metrics.RUnlock()
100
return dp.Metrics.AvgWriteLatencyNano
103
type DataPartitionSorter []*DataPartition
105
func (ds DataPartitionSorter) Len() int {
109
func (ds DataPartitionSorter) Swap(i, j int) {
110
ds[i], ds[j] = ds[j], ds[i]
113
func (ds DataPartitionSorter) Less(i, j int) bool {
114
return ds[i].Metrics.AvgWriteLatencyNano < ds[j].Metrics.AvgWriteLatencyNano
117
// NewDataPartitionMetrics returns a new DataPartitionMetrics instance.
118
func NewDataPartitionMetrics() *DataPartitionMetrics {
119
metrics := new(DataPartitionMetrics)
123
// String returns the string format of the data partition.
124
func (dp *DataPartition) String() string {
125
return fmt.Sprintf("PartitionID(%v) Type(%v), Status(%v) ReplicaNum(%v) Hosts(%v) NearHosts(%v)",
126
dp.PartitionID, dp.PartitionType, dp.Status, dp.ReplicaNum, dp.Hosts, dp.NearHosts)
129
func (dp *DataPartition) CheckAllHostsIsAvail(exclude map[string]struct{}) {
134
for i := 0; i < len(dp.Hosts); i++ {
136
if conn, err = util.DailTimeOut(host, proto.ReadDeadlineTime*time.Second); err != nil {
137
log.LogWarnf("CheckAllHostsIsAvail: dial host (%v) err(%v)", host, err)
138
if strings.Contains(err.Error(), syscall.ECONNREFUSED.Error()) {
139
exclude[host] = struct{}{}
147
// GetAllAddrs returns the addresses of all the replicas of the data partition.
148
func (dp *DataPartition) GetAllAddrs() string {
149
return strings.Join(dp.Hosts[1:], proto.AddrSplit) + proto.AddrSplit
152
func isExcluded(dp *DataPartition, exclude map[string]struct{}) bool {
153
for _, host := range dp.Hosts {
154
if _, exist := exclude[host]; exist {