cubefs

Форк
0
/
k_faster_random_selector.go 
198 строк · 5.1 Кб
1
// Copyright 2018 The CubeFS Authors.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
14

15
package wrapper
16

17
import (
18
	"fmt"
19
	"math/rand"
20
	"strconv"
21
	"sync"
22
	"time"
23

24
	"github.com/cubefs/cubefs/util/log"
25
)
26

27
const (
28
	KFasterRandomSelectorName = "kfaster"
29
)
30

31
func init() {
32
	_ = RegisterDataPartitionSelector(KFasterRandomSelectorName, newKFasterRandomSelector)
33
}
34

35
func newKFasterRandomSelector(selectorParam string) (selector DataPartitionSelector, e error) {
36
	param, err := strconv.Atoi(selectorParam)
37
	if err != nil {
38
		return nil, fmt.Errorf("KFasterRandomSelector: get param failed[%v]", err)
39
	}
40

41
	if (param <= 0) || (param >= 100) {
42
		return nil, fmt.Errorf("KFasterRandomSelector: invalid param[%v]", param)
43
	}
44

45
	selector = &KFasterRandomSelector{
46
		kValueHundred: param,
47
		partitions:    make([]*DataPartition, 0),
48
	}
49
	log.LogInfof("KFasterRandomSelector: init selector success, kValueHundred is %v", param)
50
	return
51
}
52

53
type KFasterRandomSelector struct {
54
	sync.RWMutex
55
	kValueHundred int
56
	kValue        int
57
	partitions    []*DataPartition
58
}
59

60
func (s *KFasterRandomSelector) Name() string {
61
	return KFasterRandomSelectorName
62
}
63

64
func (s *KFasterRandomSelector) Refresh(partitions []*DataPartition) (err error) {
65
	kValue := (len(partitions)-1)*s.kValueHundred/100 + 1
66
	selectKminDataPartition(partitions, kValue)
67

68
	s.Lock()
69
	defer s.Unlock()
70

71
	s.kValue = kValue
72
	s.partitions = partitions
73
	return
74
}
75

76
func (s *KFasterRandomSelector) Select(exclude map[string]struct{}) (dp *DataPartition, err error) {
77
	s.RLock()
78
	partitions := s.partitions
79
	kValue := s.kValue
80
	s.RUnlock()
81

82
	if len(partitions) == 0 {
83
		log.LogError("KFasterRandomSelector: no writable data partition with empty partitions")
84
		return nil, fmt.Errorf("no writable data partition")
85
	}
86

87
	// select random dataPartition from fasterRwPartitions
88
	rand.Seed(time.Now().UnixNano())
89
	index := rand.Intn(kValue)
90
	dp = partitions[index]
91
	if !isExcluded(dp, exclude) {
92
		log.LogDebugf("KFasterRandomSelector: select faster dp[%v], index %v, kValue(%v/%v)",
93
			dp, index, kValue, len(partitions))
94
		return dp, nil
95
	}
96

97
	log.LogWarnf("KFasterRandomSelector: first random fasterRwPartition was excluded, get partition from other faster")
98

99
	// if partitions[index] is excluded, select next in fasterRwPartitions
100
	for i := 1; i < kValue; i++ {
101
		dp = partitions[(index+i)%kValue]
102
		if !isExcluded(dp, exclude) {
103
			log.LogDebugf("KFasterRandomSelector: select faster dp[%v], index %v, kValue(%v/%v)",
104
				dp, (index+i)%kValue, kValue, len(partitions))
105
			return dp, nil
106
		}
107
	}
108

109
	log.LogWarnf("KFasterRandomSelector: all fasterRwPartitions were excluded, get partition from slower")
110

111
	// if all fasterRwPartitions are excluded, select random dataPartition in slowerRwPartitions
112
	slowerRwPartitionsNum := len(partitions) - kValue
113
	for i := 0; i < slowerRwPartitionsNum; i++ {
114
		dp = partitions[(index+i)%slowerRwPartitionsNum+kValue]
115
		if !isExcluded(dp, exclude) {
116
			log.LogDebugf("KFasterRandomSelector: select slower dp[%v], index %v, kValue(%v/%v)",
117
				dp, (index+i)%slowerRwPartitionsNum+kValue, kValue, len(partitions))
118
			return dp, nil
119
		}
120
	}
121
	log.LogErrorf("KFasterRandomSelector: no writable data partition with %v partitions and exclude(%v)",
122
		len(partitions), exclude)
123
	return nil, fmt.Errorf("no writable data partition")
124
}
125

126
func (s *KFasterRandomSelector) RemoveDP(partitionID uint64) {
127
	s.RLock()
128
	partitions := s.partitions
129
	s.RUnlock()
130

131
	var i int
132
	for i = 0; i < len(partitions); i++ {
133
		if partitions[i].PartitionID == partitionID {
134
			break
135
		}
136
	}
137
	if i >= len(partitions) {
138
		return
139
	}
140
	newRwPartition := make([]*DataPartition, 0)
141
	newRwPartition = append(newRwPartition, partitions[:i]...)
142
	newRwPartition = append(newRwPartition, partitions[i+1:]...)
143

144
	s.Refresh(newRwPartition)
145

146
	return
147
}
148

149
func (s *KFasterRandomSelector) Count() int {
150
	s.RLock()
151
	defer s.RUnlock()
152
	return len(s.partitions)
153
}
154

155
func swap(s []*DataPartition, i int, j int) {
156
	s[i], s[j] = s[j], s[i]
157
}
158

159
func partByPrivot(partitions []*DataPartition, low, high int) int {
160
	var i, j int
161
	for {
162
		for i = low + 1; i < high; i++ {
163
			if partitions[i].GetAvgWrite() > partitions[low].GetAvgWrite() {
164
				break
165
			}
166
		}
167
		for j = high; j > low; j-- {
168
			if partitions[j].GetAvgWrite() <= partitions[low].GetAvgWrite() {
169
				break
170
			}
171
		}
172
		if i >= j {
173
			break
174
		}
175
		swap(partitions, i, j)
176
	}
177
	if low != j {
178
		swap(partitions, low, j)
179
	}
180
	return j
181
}
182

183
func selectKminDataPartition(partitions []*DataPartition, k int) int {
184
	if len(partitions) <= 1 {
185
		return k
186
	}
187
	low, high := 0, len(partitions)-1
188
	for {
189
		privot := partByPrivot(partitions, low, high)
190
		if privot < k {
191
			low = privot + 1
192
		} else if privot > k {
193
			high = privot - 1
194
		} else {
195
			return k
196
		}
197
	}
198
}
199

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.