moira

Форк
0
/
database.go 
125 строк · 3.3 Кб
1
package redis
2

3
import (
4
	"fmt"
5
	"net"
6
	"time"
7

8
	"github.com/garyburd/redigo/redis"
9
	"github.com/patrickmn/go-cache"
10
	"gopkg.in/redsync.v1"
11
	"gopkg.in/tomb.v2"
12

13
	"go.avito.ru/DO/moira"
14
)
15

16
// DbConnector contains redis pool
17
type DbConnector struct {
18
	pool            *redis.Pool
19
	logger          moira.Logger
20
	retentionCache  *cache.Cache
21
	metricsCache    *cache.Cache
22
	messengersCache *cache.Cache
23
	sync            *redsync.Redsync
24
}
25

26
// NewDatabase creates Redis pool based on config
27
func NewDatabase(logger moira.Logger, config Config) *DbConnector {
28
	pool := newRedisPool(fmt.Sprintf("%s:%s", config.Host, config.Port), config.DBID)
29
	db := DbConnector{
30
		pool:            pool,
31
		logger:          logger,
32
		retentionCache:  cache.New(time.Minute, time.Minute*60),
33
		metricsCache:    cache.New(time.Minute, time.Minute*60),
34
		messengersCache: cache.New(cache.NoExpiration, cache.DefaultExpiration),
35
		sync:            redsync.New([]redsync.Pool{pool}),
36
	}
37
	return &db
38
}
39

40
func newRedisPool(redisURI string, dbID ...int) *redis.Pool {
41
	return &redis.Pool{
42
		MaxIdle:     3,
43
		IdleTimeout: 240 * time.Second,
44
		Dial: func() (redis.Conn, error) {
45
			c, err := redis.Dial("tcp", redisURI)
46
			if err != nil {
47
				return nil, err
48
			}
49
			if len(dbID) > 0 {
50
				c.Do("SELECT", dbID[0])
51
			}
52
			return c, err
53
		},
54
		TestOnBorrow: func(c redis.Conn, t time.Time) error {
55
			_, err := c.Do("PING")
56
			return err
57
		},
58
	}
59
}
60

61
func (connector *DbConnector) makePubSubConnection(channel string) (*redis.PubSubConn, error) {
62
	c := connector.pool.Get()
63
	psc := redis.PubSubConn{Conn: c}
64
	if err := psc.Subscribe(channel); err != nil {
65
		return nil, fmt.Errorf("Failed to subscribe to '%s', error: %v", channel, err)
66
	}
67
	return &psc, nil
68
}
69

70
func (connector *DbConnector) manageSubscriptions(tomb *tomb.Tomb, channel string) (<-chan []byte, error) {
71
	psc, err := connector.makePubSubConnection(channel)
72
	if err != nil {
73
		return nil, err
74
	}
75

76
	go func() {
77
		<-tomb.Dying()
78
		connector.logger.InfoF("Calling shutdown, unsubscribe from '%s' redis channels...", channel)
79
		psc.Unsubscribe()
80
	}()
81

82
	dataChan := make(chan []byte)
83
	go func() {
84
		defer psc.Close()
85
		for {
86
			switch n := psc.Receive().(type) {
87
			case redis.Message:
88
				dataChan <- n.Data
89
			case redis.Subscription:
90
				switch n.Kind {
91
				case "subscribe":
92
					connector.logger.InfoF("Subscribe to %s channel, current subscriptions is %v", n.Channel, n.Count)
93
				case "unsubscribe":
94
					connector.logger.InfoF("Unsubscribe from %s channel, current subscriptions is %v", n.Channel, n.Count)
95
					if n.Count == 0 {
96
						connector.logger.InfoF("No more subscriptions, exit...")
97
						close(dataChan)
98
						return
99
					}
100
				}
101
			case *net.OpError:
102
				connector.logger.InfoF("psc.Receive() returned *net.OpError: %s. Reconnecting...", n.Err.Error())
103
				newPsc, err := connector.makePubSubConnection(metricEventKey)
104
				if err != nil {
105
					connector.logger.ErrorF("Failed to reconnect to subscription: %v", err)
106
					<-time.After(5 * time.Second)
107
					continue
108
				}
109
				psc = newPsc
110
				<-time.After(5 * time.Second)
111
			default:
112
				connector.logger.ErrorF("Can not receive message of type '%T': %v", n, n)
113
				<-time.After(5 * time.Second)
114
			}
115
		}
116
	}()
117
	return dataChan, nil
118
}
119

120
// CLEAN DATABASE! USE IT ONLY FOR TESTING!!!
121
func (connector *DbConnector) flush() {
122
	c := connector.pool.Get()
123
	defer c.Close()
124
	c.Do("FLUSHDB")
125
}
126

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.