meta_class.go (weaviate)
//                           _       _
// __      _____  __ ___   ___  __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
//  \ V  V /  __/ (_| |\ V /| | (_| | ||  __/
//   \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
//  Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
//  CONTACT: hello@weaviate.io
//

package store

import (
	"fmt"
	"strings"
	"sync"

	command "github.com/weaviate/weaviate/cluster/proto/api"
	"github.com/weaviate/weaviate/entities/models"
	entSchema "github.com/weaviate/weaviate/entities/schema"
	"github.com/weaviate/weaviate/usecases/sharding"
	"golang.org/x/exp/slices"
)

type metaClass struct {
	sync.RWMutex
	Class        models.Class
	ClassVersion uint64
	Sharding     sharding.State
	ShardVersion uint64
}

func (m *metaClass) ClassInfo() (ci ClassInfo) {
	if m == nil {
		return
	}
	m.RLock()
	defer m.RUnlock()
	ci.Exists = true
	ci.Properties = len(m.Class.Properties)
	ci.MultiTenancy, _ = m.MultiTenancyConfig()
	ci.ReplicationFactor = 1
	if m.Class.ReplicationConfig != nil && m.Class.ReplicationConfig.Factor > 1 {
		ci.ReplicationFactor = int(m.Class.ReplicationConfig.Factor)
	}
	ci.Tenants = len(m.Sharding.Physical)
	ci.ClassVersion = m.ClassVersion
	ci.ShardVersion = m.ShardVersion
	return ci
}

func (m *metaClass) version() uint64 {
	if m == nil {
		return 0
	}
	return max(m.ClassVersion, m.ShardVersion)
}

func (m *metaClass) MultiTenancyConfig() (mc models.MultiTenancyConfig, v uint64) {
	if m == nil {
		return
	}
	m.RLock()
	defer m.RUnlock()
	if m.Class.MultiTenancyConfig == nil {
		return
	}

	return *m.Class.MultiTenancyConfig, m.version()
}

// CloneClass returns a shallow copy of the wrapped class
func (m *metaClass) CloneClass() *models.Class {
	m.RLock()
	defer m.RUnlock()
	cp := m.Class
	return &cp
}

// ShardOwner returns the node owning the specified shard
func (m *metaClass) ShardOwner(shard string) (string, uint64, error) {
	m.RLock()
	defer m.RUnlock()
	x, ok := m.Sharding.Physical[shard]

	if !ok {
		return "", 0, errShardNotFound
	}
	if len(x.BelongsToNodes) < 1 || x.BelongsToNodes[0] == "" {
		return "", 0, fmt.Errorf("owner node not found")
	}
	return x.BelongsToNodes[0], m.version(), nil
}

// ShardFromUUID returns the shard name for the provided uuid
func (m *metaClass) ShardFromUUID(uuid []byte) (string, uint64) {
	m.RLock()
	defer m.RUnlock()
	return m.Sharding.PhysicalShard(uuid), m.version()
}

// ShardReplicas returns the replica nodes of a shard
func (m *metaClass) ShardReplicas(shard string) ([]string, uint64, error) {
	m.RLock()
	defer m.RUnlock()
	x, ok := m.Sharding.Physical[shard]
	if !ok {
		return nil, 0, errShardNotFound
	}
	return slices.Clone(x.BelongsToNodes), m.version(), nil
}

// TenantsShards maps each provided tenant name to its activity status.
// Tenants without a matching physical shard are omitted from the result.
func (m *metaClass) TenantsShards(class string, tenants ...string) (map[string]string, uint64) {
	m.RLock()
	defer m.RUnlock()

	v := m.version()
	if !m.Sharding.PartitioningEnabled {
		return nil, v
	}

	res := make(map[string]string, len(tenants))
	for _, t := range tenants {
		if physical, ok := m.Sharding.Physical[t]; ok {
			res[t] = physical.ActivityStatus()
		}
	}
	return res, v
}
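
// Illustrative sketch (not part of the original file): with partitioning
// enabled, TenantsShards maps each known tenant name to its activity status;
// unknown tenants are simply absent from the result. The class and tenant
// names below are hypothetical.
func exampleTenantsShards(mc *metaClass) {
	statuses, version := mc.TenantsShards("Article", "tenantA", "missingTenant")
	// statuses contains an entry only for "tenantA" (e.g. "HOT" or "COLD");
	// "missingTenant" is skipped because it has no physical shard.
	fmt.Println(statuses, version)
}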

// CopyShardingState returns a deep copy of the sharding state
func (m *metaClass) CopyShardingState() (*sharding.State, uint64) {
	m.RLock()
	defer m.RUnlock()
	st := m.Sharding.DeepCopy()
	return &st, m.version()
}

func (m *metaClass) AddProperty(v uint64, props ...*models.Property) error {
	m.Lock()
	defer m.Unlock()

	// update all at once to prevent a race condition with concurrent readers
	mergedProps := MergeProps(m.Class.Properties, props)
	m.Class.Properties = mergedProps
	m.ClassVersion = v
	return nil
}

// MergeProps ensures no duplicates are created by ignoring new props
// whose (case-insensitive) names match existing ones.
// If a property of nested type is present in both the new and the old slice,
// the final property is created by merging the new property into a copy of the old one.
func MergeProps(old, new []*models.Property) []*models.Property {
	mergedProps := make([]*models.Property, len(old), len(old)+len(new))
	copy(mergedProps, old)

	// index existing property names (lower-cased) to detect duplicates
	mem := make(map[string]int, len(old))
	for idx := range old {
		mem[strings.ToLower(old[idx].Name)] = idx
	}

	// append properties not present in the old slice, or merge nested
	// properties if a property with the same name already exists
	for idx := range new {
		if oldIdx, exists := mem[strings.ToLower(new[idx].Name)]; !exists {
			mergedProps = append(mergedProps, new[idx])
		} else {
			nestedProperties, merged := entSchema.MergeRecursivelyNestedProperties(
				mergedProps[oldIdx].NestedProperties,
				new[idx].NestedProperties)
			if merged {
				propCopy := *mergedProps[oldIdx]
				propCopy.NestedProperties = nestedProperties
				mergedProps[oldIdx] = &propCopy
			}
		}
	}

	return mergedProps
}
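
// Illustrative sketch (not part of the original file): MergeProps de-duplicates
// by lower-cased name, so a re-submitted property is dropped while genuinely
// new ones are appended. The property names are hypothetical.
func exampleMergeProps() []*models.Property {
	existing := []*models.Property{{Name: "title"}}
	incoming := []*models.Property{
		{Name: "Title"},   // ignored: matches existing "title" case-insensitively
		{Name: "summary"}, // appended: not present yet
	}
	return MergeProps(existing, incoming) // -> [title, summary]
}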

func (m *metaClass) AddTenants(nodeID string, req *command.AddTenantsRequest, replFactor int64, v uint64) error {
	req.Tenants = removeNilTenants(req.Tenants)
	m.Lock()
	defer m.Unlock()

	// TODO-RAFT: Optimize here and avoid iterating twice over the req.Tenants array
	names := make([]string, len(req.Tenants))
	for i, tenant := range req.Tenants {
		names[i] = tenant.Name
	}
	// First determine the partitions based on the nodes *present at the time the log entry was created*
	partitions, err := m.Sharding.GetPartitions(req.ClusterNodes, names, replFactor)
	if err != nil {
		return fmt.Errorf("get partitions: %w", err)
	}

	// Iterate over the requested tenants and assign each one a partition, if found
	for i, t := range req.Tenants {
		if _, ok := m.Sharding.Physical[t.Name]; ok {
			req.Tenants[i] = nil // already exists
			continue
		}
		// TODO-RAFT: Check in which cases a tenant can end up without an assigned partition
		part, ok := partitions[t.Name]
		if !ok {
			// TODO-RAFT: Do we want to silently continue here or raise an error?
			continue
		}
		p := sharding.Physical{Name: t.Name, Status: t.Status, BelongsToNodes: part}
		m.Sharding.Physical[t.Name] = p
		// TODO-RAFT: Check here why we set =nil if it is "owned by another node"
		if !slices.Contains(part, nodeID) {
			req.Tenants[i] = nil // is owned by another node
		}
	}
	m.ShardVersion = v
	req.Tenants = removeNilTenants(req.Tenants)
	return nil
}

func (m *metaClass) DeleteTenants(req *command.DeleteTenantsRequest, v uint64) error {
	m.Lock()
	defer m.Unlock()

	for _, name := range req.Tenants {
		m.Sharding.DeletePartition(name)
	}
	m.ShardVersion = v
	return nil
}

func (m *metaClass) UpdateTenants(nodeID string, req *command.UpdateTenantsRequest, v uint64) (n int, err error) {
	m.Lock()
	defer m.Unlock()

	missingShards := []string{}
	ps := m.Sharding.Physical
	for i, u := range req.Tenants {
		p, ok := ps[u.Name]
		if !ok {
			missingShards = append(missingShards, u.Name)
			req.Tenants[i] = nil
			continue
		}
		if p.ActivityStatus() == u.Status {
			req.Tenants[i] = nil
			continue
		}
		copy := p.DeepCopy()
		copy.Status = u.Status
		ps[u.Name] = copy
		if !slices.Contains(copy.BelongsToNodes, nodeID) {
			req.Tenants[i] = nil
		}
		n++
	}

	if len(missingShards) > 0 {
		err = fmt.Errorf("%w: %v", errShardNotFound, missingShards)
	}
	m.ShardVersion = v
	req.Tenants = removeNilTenants(req.Tenants)
	return
}

// LockGuard acquires the write lock and passes m to the given mutator,
// providing a convenient way to mutate the state while holding the mutex.
func (m *metaClass) LockGuard(mutator func(*metaClass) error) error {
	m.Lock()
	defer m.Unlock()
	return mutator(m)
}

// RLockGuard acquires the read lock and passes the class and sharding state
// to the given reader, which must not mutate the state.
func (m *metaClass) RLockGuard(reader func(*models.Class, *sharding.State) error) error {
	m.RLock()
	defer m.RUnlock()
	return reader(&m.Class, &m.Sharding)
}
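
// Illustrative sketch (not part of the original file): how callers in this
// package might use the guard helpers. The description text and the read-side
// computation are placeholders.
func exampleGuards(mc *metaClass) (int, error) {
	err := mc.LockGuard(func(m *metaClass) error {
		m.Class.Description = "updated while holding the write lock"
		return nil
	})
	if err != nil {
		return 0, err
	}

	tenants := 0
	err = mc.RLockGuard(func(c *models.Class, s *sharding.State) error {
		tenants = len(s.Physical) // read-only access under the read lock
		return nil
	})
	return tenants, err
}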