19
command "github.com/weaviate/weaviate/cluster/proto/api"
20
"github.com/weaviate/weaviate/entities/models"
21
entSchema "github.com/weaviate/weaviate/entities/schema"
22
"github.com/weaviate/weaviate/usecases/sharding"
23
"golang.org/x/exp/slices"
26
// metaClass is the per-class metadata record that the methods in this file
// operate on. Only the Sharding field is visible in this excerpt; the methods
// below also read m.Class, m.ClassVersion and m.ShardVersion, so those fields
// are declared in the part of the struct that is not shown here.
type metaClass struct {
30
// Sharding is the sharding state of the class; its Physical map is indexed
// by shard (tenant) name throughout this file.
Sharding sharding.State
34
// ClassInfo fills the named result ci with a summary of the class: number of
// properties, multi-tenancy configuration, replication factor, number of
// physical shards (tenants) and the class and shard version counters.
// NOTE(review): the lines before the first assignment and the final return
// are not visible in this excerpt.
func (m *metaClass) ClassInfo() (ci ClassInfo) {
41
ci.Properties = len(m.Class.Properties)
42
// Only the config is kept; the version returned alongside it is discarded.
ci.MultiTenancy, _ = m.MultiTenancyConfig()
43
// The replication factor defaults to 1 and is overridden only when a
// replication config exists and asks for a factor greater than 1.
ci.ReplicationFactor = 1
44
if m.Class.ReplicationConfig != nil && m.Class.ReplicationConfig.Factor > 1 {
45
ci.ReplicationFactor = int(m.Class.ReplicationConfig.Factor)
47
// Tenants is the number of entries in the physical shard map.
ci.Tenants = len(m.Sharding.Physical)
48
ci.ClassVersion = m.ClassVersion
49
ci.ShardVersion = m.ShardVersion
53
// version returns the larger of the class version and the shard version.
func (m *metaClass) version() uint64 {
57
return max(m.ClassVersion, m.ShardVersion)
60
// MultiTenancyConfig returns the class's multi-tenancy configuration by value
// (mc) together with the version reported by m.version() (v).
// NOTE(review): what is returned when Class.MultiTenancyConfig is nil is not
// visible in this excerpt — confirm against the full file.
func (m *metaClass) MultiTenancyConfig() (mc models.MultiTenancyConfig, v uint64) {
66
// Nil check ahead of the pointer dereference in the return below.
if m.Class.MultiTenancyConfig == nil {
70
return *m.Class.MultiTenancyConfig, m.version()
74
// CloneClass returns a *models.Class.
// NOTE(review): the body is not visible in this excerpt; judging by the name
// it presumably returns a copy of m.Class — confirm against the full file.
func (m *metaClass) CloneClass() *models.Class {
82
// ShardOwner returns the first node listed in BelongsToNodes for the given
// shard, together with the version reported by m.version().
//
// Errors visible in this excerpt:
//   - errShardNotFound (presumably when shard is absent from the physical
//     map; the guarding condition is not visible here — TODO confirm)
//   - "owner node not found" when the shard lists no nodes or its first node
//     name is empty
//
// On error the returned name is "" and the version is 0.
func (m *metaClass) ShardOwner(shard string) (string, uint64, error) {
85
x, ok := m.Sharding.Physical[shard]
88
return "", 0, errShardNotFound
90
// The owner is the first entry; an empty list or empty name means no owner.
if len(x.BelongsToNodes) < 1 || x.BelongsToNodes[0] == "" {
91
return "", 0, fmt.Errorf("owner node not found")
93
return x.BelongsToNodes[0], m.version(), nil
97
// ShardFromUUID returns the result of m.Sharding.PhysicalShard(uuid) — the
// name of the physical shard for that UUID — together with the version
// reported by m.version().
func (m *metaClass) ShardFromUUID(uuid []byte) (string, uint64) {
100
return m.Sharding.PhysicalShard(uuid), m.version()
104
// ShardReplicas returns the nodes listed in BelongsToNodes for the given
// shard, together with the version reported by m.version().
// It returns errShardNotFound with a nil slice and version 0 (presumably when
// shard is absent from the physical map; the guarding condition is not
// visible here — TODO confirm).
func (m *metaClass) ShardReplicas(shard string) ([]string, uint64, error) {
107
x, ok := m.Sharding.Physical[shard]
109
return nil, 0, errShardNotFound
111
// A clone is returned, so the caller receives its own slice rather than the
// one stored in the sharding state.
return slices.Clone(x.BelongsToNodes), m.version(), nil
115
// TenantsShards builds a map from tenant name to that tenant's activity
// status for every requested tenant that exists in the physical shard map;
// tenants without a physical shard get no entry.
// NOTE(review): the branch taken when partitioning is disabled, the final
// return, and any use of the class parameter are not visible in this excerpt.
func (m *metaClass) TenantsShards(class string, tenants ...string) (map[string]string, uint64) {
120
if !m.Sharding.PartitioningEnabled {
124
// Sized for the worst case in which every requested tenant is found.
res := make(map[string]string, len(tenants))
125
for _, t := range tenants {
126
if physical, ok := m.Sharding.Physical[t]; ok {
127
res[t] = physical.ActivityStatus()
134
// CopyShardingState returns a pointer to a deep copy of the sharding state
// (m.Sharding.DeepCopy()), together with the version reported by m.version().
func (m *metaClass) CopyShardingState() (*sharding.State, uint64) {
137
st := m.Sharding.DeepCopy()
138
return &st, m.version()
141
// AddProperty merges props into the class's existing properties using
// MergeProps and stores the merged slice back on m.Class.Properties.
// NOTE(review): how v is used and what is returned are not visible in this
// excerpt; v is presumably recorded as the new class version — confirm.
func (m *metaClass) AddProperty(v uint64, props ...*models.Property) error {
146
mergedProps := MergeProps(m.Class.Properties, props)
147
m.Class.Properties = mergedProps
156
// MergeProps combines the properties in new into those in old and works on a
// fresh slice, so the old slice itself is not modified by the visible code.
//
// Property names are matched case-insensitively (both sides are lower-cased).
// A new property whose name matches no old property is appended. For a
// matching name, the nested properties of both are merged with
// entSchema.MergeRecursivelyNestedProperties and the existing entry is
// replaced by a shallow copy that carries the merged nested properties, so
// the original *models.Property is not mutated.
//
// NOTE(review): the line separating the two branches, the use of the merged
// flag, and the final return are not visible in this excerpt.
func MergeProps(old, new []*models.Property) []*models.Property {
157
// Length len(old) with room for every new property, to avoid regrowth.
mergedProps := make([]*models.Property, len(old), len(old)+len(new))
158
copy(mergedProps, old)
161
// mem maps a lower-cased old property name to its index in old/mergedProps.
mem := make(map[string]int, len(old))
162
for idx := range old {
163
mem[strings.ToLower(old[idx].Name)] = idx
168
for idx := range new {
169
if oldIdx, exists := mem[strings.ToLower(new[idx].Name)]; !exists {
170
mergedProps = append(mergedProps, new[idx])
172
nestedProperties, merged := entSchema.MergeRecursivelyNestedProperties(
173
mergedProps[oldIdx].NestedProperties,
174
new[idx].NestedProperties)
176
// Copy the struct before changing it so the pointer shared with old keeps
// its original nested properties.
propCopy := *mergedProps[oldIdx]
177
propCopy.NestedProperties = nestedProperties
178
mergedProps[oldIdx] = &propCopy
186
// AddTenants registers the tenants in req as physical shards of this class.
//
// Visible steps: nil tenants are removed from req.Tenants, node partitions
// are computed for all tenant names via m.Sharding.GetPartitions using
// req.ClusterNodes and replFactor, and each tenant is stored in
// m.Sharding.Physical with its name, status and assigned nodes.
// req.Tenants is then filtered with removeNilTenants once more.
//
// A failure of GetPartitions is returned wrapped as "get partitions: ...".
//
// NOTE(review): the bodies of the "already exists", "no partition" and "not
// on this node" branches, and the use of v, are not visible in this excerpt.
// The loop index i and the trailing removeNilTenants suggest those branches
// set entries of req.Tenants to nil — confirm against the full file.
func (m *metaClass) AddTenants(nodeID string, req *command.AddTenantsRequest, replFactor int64, v uint64) error {
187
req.Tenants = removeNilTenants(req.Tenants)
192
names := make([]string, len(req.Tenants))
193
for i, tenant := range req.Tenants {
194
names[i] = tenant.Name
197
partitions, err := m.Sharding.GetPartitions(req.ClusterNodes, names, replFactor)
199
return fmt.Errorf("get partitions: %w", err)
203
for i, t := range req.Tenants {
204
// Tenant already has a physical shard.
if _, ok := m.Sharding.Physical[t.Name]; ok {
209
part, ok := partitions[t.Name]
214
p := sharding.Physical{Name: t.Name, Status: t.Status, BelongsToNodes: part}
215
m.Sharding.Physical[t.Name] = p
217
// The tenant was not assigned to the node identified by nodeID.
if !slices.Contains(part, nodeID) {
222
req.Tenants = removeNilTenants(req.Tenants)
226
// DeleteTenants calls m.Sharding.DeletePartition for every tenant name listed
// in req.Tenants.
// NOTE(review): the use of v and the returned error are not visible in this
// excerpt.
func (m *metaClass) DeleteTenants(req *command.DeleteTenantsRequest, v uint64) error {
230
for _, name := range req.Tenants {
231
m.Sharding.DeletePartition(name)
237
// UpdateTenants applies the requested status to the tenants in req.
//
// Visible behaviour: names collected in missingShards are reported through
// err as errShardNotFound wrapped together with the list of names; a tenant
// whose current activity status already equals the requested one is detected;
// otherwise the requested status is written to a copy of the shard. Finally
// req.Tenants is filtered with removeNilTenants.
//
// NOTE(review): the declarations of p and copy, the bodies of the branches,
// the write-back into the physical map, the meaning of n and the use of v are
// not visible in this excerpt. The loop index i and the trailing
// removeNilTenants suggest some entries of req.Tenants are set to nil —
// confirm against the full file.
func (m *metaClass) UpdateTenants(nodeID string, req *command.UpdateTenantsRequest, v uint64) (n int, err error) {
241
missingShards := []string{}
242
ps := m.Sharding.Physical
243
for i, u := range req.Tenants {
246
missingShards = append(missingShards, u.Name)
250
// Requested status equals the current one.
if p.ActivityStatus() == u.Status {
255
copy.Status = u.Status
257
// The shard does not list the node identified by nodeID.
if !slices.Contains(copy.BelongsToNodes, nodeID) {
263
// Missing shards are reported only after the loop over req.Tenants.
if len(missingShards) > 0 {
264
err = fmt.Errorf("%w: %v", errShardNotFound, missingShards)
267
req.Tenants = removeNilTenants(req.Tenants)
272
// LockGuard takes a mutator callback that receives a *metaClass and returns
// an error.
// NOTE(review): the body is not visible in this excerpt; judging by the name
// it presumably runs mutator on m while holding a write lock — confirm.
func (m *metaClass) LockGuard(mutator func(*metaClass) error) error {
279
// RLockGuard calls reader with pointers to m.Class and m.Sharding and returns
// the callback's error. The callback receives the live values, not copies.
// NOTE(review): the locking lines are not visible in this excerpt; judging by
// the name the call presumably happens under a read lock — confirm.
func (m *metaClass) RLockGuard(reader func(*models.Class, *sharding.State) error) error {
282
return reader(&m.Class, &m.Sharding)