8
"github.com/garyburd/redigo/redis"
9
"github.com/patrickmn/go-cache"
13
"go.avito.ru/DO/moira"
16
// DbConnector contains the Redis pool together with several in-memory
// go-cache instances that are created alongside it in NewDatabase.
// NOTE(review): only the cache fields are visible in this view; the pool,
// logger and sync fields used elsewhere in this file are declared on lines
// not shown here.
17
type DbConnector struct {
20
// retentionCache is an in-memory cache.
// Presumably holds metric retention values - confirm against its users.
retentionCache *cache.Cache
21
// metricsCache is an in-memory cache.
// Presumably holds per-metric data - confirm against its users.
metricsCache *cache.Cache
22
// messengersCache is an in-memory cache; NewDatabase creates it with
// cache.NoExpiration as the default expiration.
messengersCache *cache.Cache
26
// NewDatabase creates a Redis pool based on config and returns a DbConnector
// whose caches and redsync instance are built on top of that pool.
27
func NewDatabase(logger moira.Logger, config Config) *DbConnector {
28
// The pool dials "<Host>:<Port>" and receives config.DBID as the database ID.
// NOTE(review): "%s:%s" does not bracket IPv6 literals; net.JoinHostPort
// would - confirm whether IPv6 hosts need to be supported.
pool := newRedisPool(fmt.Sprintf("%s:%s", config.Host, config.Port), config.DBID)
32
// cache.New(defaultExpiration, cleanupInterval): entries expire after one
// minute by default and expired entries are purged every 60 minutes.
retentionCache: cache.New(time.Minute, time.Minute*60),
33
metricsCache: cache.New(time.Minute, time.Minute*60),
34
// Entries never expire by default; the cleanup interval passed here is
// cache.DefaultExpiration (0) - per go-cache docs a non-positive interval
// means no background cleanup runs.
messengersCache: cache.New(cache.NoExpiration, cache.DefaultExpiration),
35
// redsync is given the same pool as its only Redis pool.
sync: redsync.New([]redsync.Pool{pool}),
40
// newRedisPool builds a redigo connection pool whose connections are dialed
// over TCP to redisURI. dbID is variadic; only dbID[0] is read, as the
// argument of the SELECT command issued on a freshly dialed connection.
func newRedisPool(redisURI string, dbID ...int) *redis.Pool {
43
// Idle connections older than this are closed by the pool.
IdleTimeout: 240 * time.Second,
44
// Dial is called by the pool whenever it needs a new connection.
Dial: func() (redis.Conn, error) {
45
c, err := redis.Dial("tcp", redisURI)
50
// NOTE(review): both the reply and the error of SELECT are discarded, so
// a failed SELECT goes unnoticed. The guard that protects dbID[0] when no
// dbID is passed is not visible in this view - confirm it exists.
c.Do("SELECT", dbID[0])
54
// TestOnBorrow lets the pool check an idle connection before reusing it.
TestOnBorrow: func(c redis.Conn, t time.Time) error {
55
// PING round-trip serves as the health check.
_, err := c.Do("PING")
61
// makePubSubConnection takes a connection from the pool, wraps it in a
// redis.PubSubConn and subscribes it to channel. If Subscribe fails it
// returns nil and an error naming the channel and the underlying cause.
func (connector *DbConnector) makePubSubConnection(channel string) (*redis.PubSubConn, error) {
62
c := connector.pool.Get()
63
psc := redis.PubSubConn{Conn: c}
64
if err := psc.Subscribe(channel); err != nil {
65
// NOTE(review): c is not closed on the visible part of this error path;
// confirm the connection is released when Subscribe fails.
return nil, fmt.Errorf("Failed to subscribe to '%s', error: %v", channel, err)
70
// manageSubscriptions subscribes to channel via makePubSubConnection and
// returns a receive-only channel of raw []byte payloads plus an error.
// The visible code type-switches on psc.Receive(), logs subscription
// changes, and on receive problems logs and waits five seconds.
// NOTE(review): parts of this function are not visible in this view; how
// tomb is used and when the returned channel is closed should be confirmed
// against the full source.
func (connector *DbConnector) manageSubscriptions(tomb *tomb.Tomb, channel string) (<-chan []byte, error) {
71
psc, err := connector.makePubSubConnection(channel)
78
connector.logger.InfoF("Calling shutdown, unsubscribe from '%s' redis channels...", channel)
82
// Unbuffered: a send blocks until the consumer receives.
dataChan := make(chan []byte)
86
switch n := psc.Receive().(type) {
89
// Subscription notifications carry the channel name and the current
// subscription count, both of which are logged below.
case redis.Subscription:
92
connector.logger.InfoF("Subscribe to %s channel, current subscriptions is %v", n.Channel, n.Count)
94
connector.logger.InfoF("Unsubscribe from %s channel, current subscriptions is %v", n.Channel, n.Count)
96
connector.logger.InfoF("No more subscriptions, exit...")
102
connector.logger.InfoF("psc.Receive() returned *net.OpError: %s. Reconnecting...", n.Err.Error())
103
// NOTE(review): the reconnect subscribes to metricEventKey, not to the
// channel parameter used for the initial subscription above. If this
// function is ever called with a different channel, the reconnected
// subscription listens on the wrong one - likely should be `channel`.
newPsc, err := connector.makePubSubConnection(metricEventKey)
105
connector.logger.ErrorF("Failed to reconnect to subscription: %v", err)
106
// Blocks this goroutine for five seconds before continuing.
<-time.After(5 * time.Second)
110
<-time.After(5 * time.Second)
112
// Any other value returned by Receive is logged with its dynamic type.
connector.logger.ErrorF("Can not receive message of type '%T': %v", n, n)
113
<-time.After(5 * time.Second)
120
// flush cleans the database. WARNING: use it only for testing!
121
func (connector *DbConnector) flush() {
122
c := connector.pool.Get()