weaviate

Форк
0
/
transport.go 
86 строк · 2.4 Кб
1
//                           _       _
2
// __      _____  __ ___   ___  __ _| |_ ___
3
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
4
//  \ V  V /  __/ (_| |\ V /| | (_| | ||  __/
5
//   \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
6
//
7
//  Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
8
//
9
//  CONTACT: hello@weaviate.io
10
//
11

12
package store
13

14
import (
15
	"fmt"
16
	"net"
17
	"sync"
18
	"time"
19

20
	"github.com/hashicorp/raft"
21
	"github.com/sirupsen/logrus"
22
)
23

24
// addressResolver resolves server id into an ip
25
type addressResolver interface {
26
	// NodeAddress resolves node id into an ip address without the port
27
	NodeAddress(id string) string
28
}
29

30
type addrResolver struct {
31
	addressResolver
32
	RaftPort int
33
	// IsLocalCluster is cluster running Weaviate from the console in localhost
34
	IsLocalCluster   bool
35
	NodeName2PortMap map[string]int
36

37
	nodesLock        sync.Mutex
38
	notResolvedNodes map[raft.ServerID]struct{}
39
}
40

41
func newAddrResolver(cfg *Config) *addrResolver {
42
	return &addrResolver{
43
		addressResolver:  cfg.AddrResolver,
44
		RaftPort:         cfg.RaftPort,
45
		IsLocalCluster:   cfg.IsLocalHost,
46
		NodeName2PortMap: cfg.ServerName2PortMap,
47
		notResolvedNodes: make(map[raft.ServerID]struct{}),
48
	}
49
}
50

51
// ServerAddr resolves server ID to a RAFT address
52
func (a *addrResolver) ServerAddr(id raft.ServerID) (raft.ServerAddress, error) {
53
	addr := a.addressResolver.NodeAddress(string(id))
54

55
	a.nodesLock.Lock()
56
	defer a.nodesLock.Unlock()
57
	if addr == "" {
58
		a.notResolvedNodes[id] = struct{}{}
59
		return "", fmt.Errorf("could not resolve server id %s", id)
60
	}
61
	delete(a.notResolvedNodes, id)
62

63
	if !a.IsLocalCluster {
64
		return raft.ServerAddress(fmt.Sprintf("%s:%d", addr, a.RaftPort)), nil
65
	}
66

67
	// This is only necessary for running Weaviate from the console in localhost
68
	return raft.ServerAddress(fmt.Sprintf("%s:%d", addr, a.NodeName2PortMap[string(id)])), nil
69
}
70

71
// NewTCPTransport returns a new raft.NetworkTransportConfig that utilizes
72
// this resolver to resolve addresses based on server IDs.
73
// This is particularly crucial as K8s assigns new IPs on each node restart.
74
func (a *addrResolver) NewTCPTransport(
75
	bindAddr string, advertise net.Addr,
76
	maxPool int, timeout time.Duration, logger *logrus.Logger,
77
) (*raft.NetworkTransport, error) {
78
	cfg := &raft.NetworkTransportConfig{
79
		ServerAddressProvider: a,
80
		MaxPool:               tcpMaxPool,
81
		Timeout:               tcpTimeout,
82
		Logger:                NewHCLogrusLogger("raft-net", logger),
83
	}
84

85
	return raft.NewTCPTransportWithConfig(bindAddr, advertise, cfg)
86
}
87

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.