moira
62 строки · 1.3 Кб
1package connection2
3import (4"bufio"5"io"6"net"7"sync"8
9"go.avito.ru/DO/moira"10)
11
12// Handler handling connection data and shift it to lineChan channel
13type Handler struct {14logger moira.Logger15wg sync.WaitGroup16terminate chan bool17}
18
19// NewConnectionsHandler creates new Handler
20func NewConnectionsHandler(logger moira.Logger) *Handler {21return &Handler{22logger: logger,23terminate: make(chan bool, 1),24}25}
26
27// HandleConnection convert every line from connection to metric and send it to lineChan channel
28func (handler *Handler) HandleConnection(connection net.Conn, lineChan chan<- []byte) {29handler.wg.Add(1)30go func() {31defer handler.wg.Done()32handler.handle(connection, lineChan)33}()34}
35
36func (handler *Handler) handle(connection net.Conn, lineChan chan<- []byte) {37buffer := bufio.NewReader(connection)38
39go func(conn net.Conn) {40<-handler.terminate41conn.Close()42}(connection)43
44for {45lineBytes, err := buffer.ReadBytes('\n')46if err != nil {47connection.Close()48if err != io.EOF {49handler.logger.ErrorF("read failed: %s", err)50}51break52}53lineBytes = lineBytes[:len(lineBytes)-1]54lineChan <- lineBytes55}56}
57
58// StopHandlingConnections closes all open connections and wait for handling ramaining metrics
59func (handler *Handler) StopHandlingConnections() {60close(handler.terminate)61handler.wg.Wait()62}
63