// Copyright 2019 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package promql

import (
	"context"
	"encoding/json"
	"io"
	"os"
	"path/filepath"
	"strings"
	"time"
	"unicode/utf8"

	"github.com/edsrzf/mmap-go"
	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
)

// ActiveQueryTracker records the queries currently being executed in a
// memory-mapped file, so that queries still active when the process dies
// can be reported on the next start-up (see logUnfinishedQueries).
type ActiveQueryTracker struct {
	mmapedFile    []byte   // mmap-ed contents of the query log file
	getNextIndex  chan int // pool of free entry offsets within mmapedFile
	logger        log.Logger
	closer        io.Closer // closes the file backing mmapedFile
	maxConcurrent int       // number of fixed-size entry slots in the file
}

// Entry is the JSON record written into one slot of the query log file
// for a single active query.
type Entry struct {
	Query     string `json:"query"`         // query text, trimmed to fit the slot (see newJSONEntry)
	Timestamp int64  `json:"timestamp_sec"` // unix time in seconds when the query was inserted
}

const (
	// entrySize is the fixed byte width reserved in the mmap-ed file for a
	// single query entry: JSON payload, zero padding, and a trailing ','.
	entrySize int = 1000
)

// parseBrokenJSON repairs the raw contents of a leftover query log file —
// a '[' followed by comma-terminated JSON entries and zero padding — into a
// valid JSON array. The boolean result reports whether any queries were
// present.
func parseBrokenJSON(brokenJSON []byte) (string, bool) {
	stripped := strings.ReplaceAll(string(brokenJSON), "\x00", "")
	if stripped == "" {
		return "[]", false
	}

	// Replace the trailing separator of the last entry with a closing bracket.
	stripped = stripped[:len(stripped)-1] + "]"

	// A length of 1 means the file held nothing but the opening '['.
	if len(stripped) <= 1 {
		return "[]", false
	}

	return stripped, true
}

func logUnfinishedQueries(filename string, filesize int, logger log.Logger) {
63
	if _, err := os.Stat(filename); err == nil {
64
		fd, err := os.Open(filename)
65
		if err != nil {
66
			level.Error(logger).Log("msg", "Failed to open query log file", "err", err)
67
			return
68
		}
69
		defer fd.Close()
70

71
		brokenJSON := make([]byte, filesize)
72
		_, err = fd.Read(brokenJSON)
73
		if err != nil {
74
			level.Error(logger).Log("msg", "Failed to read query log file", "err", err)
75
			return
76
		}
77

78
		queries, queriesExist := parseBrokenJSON(brokenJSON)
79
		if !queriesExist {
80
			return
81
		}
82
		level.Info(logger).Log("msg", "These queries didn't finish in prometheus' last run:", "queries", queries)
83
	}
84
}
85

86
func getMMapedFile(filename string, filesize int, logger log.Logger) ([]byte, io.Closer, error) {
87
	file, err := os.OpenFile(filename, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0o666)
88
	if err != nil {
89
		absPath, pathErr := filepath.Abs(filename)
90
		if pathErr != nil {
91
			absPath = filename
92
		}
93
		level.Error(logger).Log("msg", "Error opening query log file", "file", absPath, "err", err)
94
		return nil, nil, err
95
	}
96

97
	err = file.Truncate(int64(filesize))
98
	if err != nil {
99
		file.Close()
100
		level.Error(logger).Log("msg", "Error setting filesize.", "filesize", filesize, "err", err)
101
		return nil, nil, err
102
	}
103

104
	fileAsBytes, err := mmap.Map(file, mmap.RDWR, 0)
105
	if err != nil {
106
		file.Close()
107
		level.Error(logger).Log("msg", "Failed to mmap", "file", filename, "Attempted size", filesize, "err", err)
108
		return nil, nil, err
109
	}
110

111
	return fileAsBytes, file, err
112
}
113

114
func NewActiveQueryTracker(localStoragePath string, maxConcurrent int, logger log.Logger) *ActiveQueryTracker {
115
	err := os.MkdirAll(localStoragePath, 0o777)
116
	if err != nil {
117
		level.Error(logger).Log("msg", "Failed to create directory for logging active queries")
118
	}
119

120
	filename, filesize := filepath.Join(localStoragePath, "queries.active"), 1+maxConcurrent*entrySize
121
	logUnfinishedQueries(filename, filesize, logger)
122

123
	fileAsBytes, closer, err := getMMapedFile(filename, filesize, logger)
124
	if err != nil {
125
		panic("Unable to create mmap-ed active query log")
126
	}
127

128
	copy(fileAsBytes, "[")
129
	activeQueryTracker := ActiveQueryTracker{
130
		mmapedFile:    fileAsBytes,
131
		closer:        closer,
132
		getNextIndex:  make(chan int, maxConcurrent),
133
		logger:        logger,
134
		maxConcurrent: maxConcurrent,
135
	}
136

137
	activeQueryTracker.generateIndices(maxConcurrent)
138

139
	return &activeQueryTracker
140
}
141

142
// trimStringByBytes returns str truncated to at most size bytes, moving the
// cut point left as needed so that a multi-byte UTF-8 rune is never split.
func trimStringByBytes(str string, size int) string {
	raw := []byte(str)

	// Nothing to trim when the string already fits.
	if size >= len(raw) {
		return string(raw)
	}

	// Back up until the byte at the cut position starts a rune.
	for !utf8.RuneStart(raw[size]) {
		size--
	}

	return string(raw[:size])
}

func _newJSONEntry(query string, timestamp int64, logger log.Logger) []byte {
157
	entry := Entry{query, timestamp}
158
	jsonEntry, err := json.Marshal(entry)
159
	if err != nil {
160
		level.Error(logger).Log("msg", "Cannot create json of query", "query", query)
161
		return []byte{}
162
	}
163

164
	return jsonEntry
165
}
166

167
func newJSONEntry(query string, logger log.Logger) []byte {
168
	timestamp := time.Now().Unix()
169
	minEntryJSON := _newJSONEntry("", timestamp, logger)
170

171
	query = trimStringByBytes(query, entrySize-(len(minEntryJSON)+1))
172
	jsonEntry := _newJSONEntry(query, timestamp, logger)
173

174
	return jsonEntry
175
}
176

177
func (tracker ActiveQueryTracker) generateIndices(maxConcurrent int) {
178
	for i := 0; i < maxConcurrent; i++ {
179
		tracker.getNextIndex <- 1 + (i * entrySize)
180
	}
181
}
182

183
// GetMaxConcurrent returns the maximum number of queries the tracker can
// record at once (the number of entry slots in the query log file).
func (tracker ActiveQueryTracker) GetMaxConcurrent() int {
	return tracker.maxConcurrent
}

func (tracker ActiveQueryTracker) Delete(insertIndex int) {
188
	copy(tracker.mmapedFile[insertIndex:], strings.Repeat("\x00", entrySize))
189
	tracker.getNextIndex <- insertIndex
190
}
191

192
func (tracker ActiveQueryTracker) Insert(ctx context.Context, query string) (int, error) {
193
	select {
194
	case i := <-tracker.getNextIndex:
195
		fileBytes := tracker.mmapedFile
196
		entry := newJSONEntry(query, tracker.logger)
197
		start, end := i, i+entrySize
198

199
		copy(fileBytes[start:], entry)
200
		copy(fileBytes[end-1:], ",")
201
		return i, nil
202
	case <-ctx.Done():
203
		return 0, ctx.Err()
204
	}
205
}
206

207
func (tracker *ActiveQueryTracker) Close() {
208
	if tracker == nil || tracker.closer == nil {
209
		return
210
	}
211
	tracker.closer.Close()
212
}
213

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.