cubefs

Форк
0
/
default_random_selector.go 
172 строки · 4.4 Кб
1
// Copyright 2020 The CubeFS Authors.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
14

15
package wrapper
16

17
import (
18
	"fmt"
19
	"math/rand"
20
	"strings"
21
	"sync"
22
	"time"
23

24
	"github.com/cubefs/cubefs/util/log"
25
)
26

27
const (
28
	DefaultRandomSelectorName = "default"
29
)
30

31
func init() {
32
	_ = RegisterDataPartitionSelector(DefaultRandomSelectorName, newDefaultRandomSelector)
33
}
34

35
func newDefaultRandomSelector(_ string) (selector DataPartitionSelector, e error) {
36
	selector = &DefaultRandomSelector{
37
		localLeaderPartitions: make([]*DataPartition, 0),
38
		partitions:            make([]*DataPartition, 0),
39
	}
40
	return
41
}
42

43
type DefaultRandomSelector struct {
44
	sync.RWMutex
45
	localLeaderPartitions []*DataPartition
46
	partitions            []*DataPartition
47
}
48

49
func (s *DefaultRandomSelector) Name() string {
50
	return DefaultRandomSelectorName
51
}
52

53
func (s *DefaultRandomSelector) Refresh(partitions []*DataPartition) (err error) {
54
	var localLeaderPartitions []*DataPartition
55
	for i := 0; i < len(partitions); i++ {
56
		if strings.Split(partitions[i].Hosts[0], ":")[0] == LocalIP {
57
			localLeaderPartitions = append(localLeaderPartitions, partitions[i])
58
		}
59
	}
60

61
	s.Lock()
62
	defer s.Unlock()
63

64
	s.localLeaderPartitions = localLeaderPartitions
65
	s.partitions = partitions
66
	return
67
}
68

69
func (s *DefaultRandomSelector) Select(exclude map[string]struct{}) (dp *DataPartition, err error) {
70
	dp = s.getLocalLeaderDataPartition(exclude)
71
	if dp != nil {
72
		return dp, nil
73
	}
74

75
	s.RLock()
76
	partitions := s.partitions
77
	s.RUnlock()
78

79
	dp = s.getRandomDataPartition(partitions, exclude)
80

81
	if dp != nil {
82
		return dp, nil
83
	}
84
	log.LogErrorf("DefaultRandomSelector: no writable data partition with %v partitions and exclude(%v)",
85
		len(partitions), exclude)
86
	return nil, fmt.Errorf("no writable data partition")
87
}
88

89
func (s *DefaultRandomSelector) RemoveDP(partitionID uint64) {
90
	s.RLock()
91
	rwPartitionGroups := s.partitions
92
	localLeaderPartitions := s.localLeaderPartitions
93
	s.RUnlock()
94

95
	var i int
96
	for i = 0; i < len(rwPartitionGroups); i++ {
97
		if rwPartitionGroups[i].PartitionID == partitionID {
98
			break
99
		}
100
	}
101
	if i >= len(rwPartitionGroups) {
102
		return
103
	}
104
	newRwPartition := make([]*DataPartition, 0)
105
	newRwPartition = append(newRwPartition, rwPartitionGroups[:i]...)
106
	newRwPartition = append(newRwPartition, rwPartitionGroups[i+1:]...)
107

108
	defer func() {
109
		s.Lock()
110
		s.partitions = newRwPartition
111
		s.Unlock()
112
	}()
113

114
	for i = 0; i < len(localLeaderPartitions); i++ {
115
		if localLeaderPartitions[i].PartitionID == partitionID {
116
			break
117
		}
118
	}
119
	if i >= len(localLeaderPartitions) {
120
		return
121
	}
122
	newLocalLeaderPartitions := make([]*DataPartition, 0)
123
	newLocalLeaderPartitions = append(newLocalLeaderPartitions, localLeaderPartitions[:i]...)
124
	newLocalLeaderPartitions = append(newLocalLeaderPartitions, localLeaderPartitions[i+1:]...)
125

126
	s.Lock()
127
	defer s.Unlock()
128
	s.localLeaderPartitions = newLocalLeaderPartitions
129

130
	return
131
}
132

133
func (s *DefaultRandomSelector) Count() int {
134
	s.RLock()
135
	defer s.RUnlock()
136
	return len(s.partitions)
137
}
138

139
func (s *DefaultRandomSelector) getLocalLeaderDataPartition(exclude map[string]struct{}) *DataPartition {
140
	s.RLock()
141
	localLeaderPartitions := s.localLeaderPartitions
142
	s.RUnlock()
143
	return s.getRandomDataPartition(localLeaderPartitions, exclude)
144
}
145

146
func (s *DefaultRandomSelector) getRandomDataPartition(partitions []*DataPartition, exclude map[string]struct{}) (
147
	dp *DataPartition) {
148
	length := len(partitions)
149
	if length == 0 {
150
		return nil
151
	}
152

153
	rand.Seed(time.Now().UnixNano())
154
	index := rand.Intn(length)
155
	dp = partitions[index]
156
	if !isExcluded(dp, exclude) {
157
		log.LogDebugf("DefaultRandomSelector: select dp[%v] address[%p], index %v", dp, dp, index)
158
		return dp
159
	}
160

161
	log.LogWarnf("DefaultRandomSelector: first random partition was excluded, get partition from others")
162

163
	var currIndex int
164
	for i := 0; i < length; i++ {
165
		currIndex = (index + i) % length
166
		if !isExcluded(partitions[currIndex], exclude) {
167
			log.LogDebugf("DefaultRandomSelector: select dp[%v], index %v", partitions[currIndex], currIndex)
168
			return partitions[currIndex]
169
		}
170
	}
171
	return nil
172
}
173

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.