// prometheus promql active query logger (212 lines, 5.5 KB)
1// Copyright 2019 The Prometheus Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14package promql
15
16import (
17"context"
18"encoding/json"
19"io"
20"os"
21"path/filepath"
22"strings"
23"time"
24"unicode/utf8"
25
26"github.com/edsrzf/mmap-go"
27"github.com/go-kit/log"
28"github.com/go-kit/log/level"
29)
30
// ActiveQueryTracker logs currently-executing queries into an mmap-backed
// file so that queries still in flight when the process dies can be
// reported on the next start-up.
type ActiveQueryTracker struct {
	mmapedFile    []byte     // mmaped contents of the active-query log file
	getNextIndex  chan int   // pool of free entry byte-offsets into mmapedFile
	logger        log.Logger // destination for error/info log lines
	closer        io.Closer  // closes the underlying log file / mapping
	maxConcurrent int        // number of fixed-size entry slots in the file
}
38
// Entry is the JSON representation of a single active query as written
// into one slot of the query log file.
type Entry struct {
	Query     string `json:"query"`
	Timestamp int64  `json:"timestamp_sec"`
}
43
const (
	// entrySize is the fixed byte size reserved per query entry in the
	// mmaped file; shorter entries are NUL-padded up to this size.
	entrySize int = 1000
)
47
// parseBrokenJSON recovers the list of in-flight queries from the raw
// contents of the active-query log. The file holds a '[' followed by
// comma-terminated JSON entries padded with NUL bytes; stripping the
// padding and swapping the final byte for ']' yields valid JSON.
// The boolean result is false when no queries were recorded.
func parseBrokenJSON(brokenJSON []byte) (string, bool) {
	repaired := strings.ReplaceAll(string(brokenJSON), "\x00", "")
	if repaired != "" {
		// Replace the trailing comma (or lone '[') with a closing
		// bracket to terminate the JSON array.
		repaired = repaired[:len(repaired)-1] + "]"
	}

	// A length of 1 means the file consisted of nothing but the opening
	// '[' — there were no unfinished queries.
	if len(repaired) <= 1 {
		return "[]", false
	}

	return repaired, true
}
61
62func logUnfinishedQueries(filename string, filesize int, logger log.Logger) {
63if _, err := os.Stat(filename); err == nil {
64fd, err := os.Open(filename)
65if err != nil {
66level.Error(logger).Log("msg", "Failed to open query log file", "err", err)
67return
68}
69defer fd.Close()
70
71brokenJSON := make([]byte, filesize)
72_, err = fd.Read(brokenJSON)
73if err != nil {
74level.Error(logger).Log("msg", "Failed to read query log file", "err", err)
75return
76}
77
78queries, queriesExist := parseBrokenJSON(brokenJSON)
79if !queriesExist {
80return
81}
82level.Info(logger).Log("msg", "These queries didn't finish in prometheus' last run:", "queries", queries)
83}
84}
85
86func getMMapedFile(filename string, filesize int, logger log.Logger) ([]byte, io.Closer, error) {
87file, err := os.OpenFile(filename, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0o666)
88if err != nil {
89absPath, pathErr := filepath.Abs(filename)
90if pathErr != nil {
91absPath = filename
92}
93level.Error(logger).Log("msg", "Error opening query log file", "file", absPath, "err", err)
94return nil, nil, err
95}
96
97err = file.Truncate(int64(filesize))
98if err != nil {
99file.Close()
100level.Error(logger).Log("msg", "Error setting filesize.", "filesize", filesize, "err", err)
101return nil, nil, err
102}
103
104fileAsBytes, err := mmap.Map(file, mmap.RDWR, 0)
105if err != nil {
106file.Close()
107level.Error(logger).Log("msg", "Failed to mmap", "file", filename, "Attempted size", filesize, "err", err)
108return nil, nil, err
109}
110
111return fileAsBytes, file, err
112}
113
114func NewActiveQueryTracker(localStoragePath string, maxConcurrent int, logger log.Logger) *ActiveQueryTracker {
115err := os.MkdirAll(localStoragePath, 0o777)
116if err != nil {
117level.Error(logger).Log("msg", "Failed to create directory for logging active queries")
118}
119
120filename, filesize := filepath.Join(localStoragePath, "queries.active"), 1+maxConcurrent*entrySize
121logUnfinishedQueries(filename, filesize, logger)
122
123fileAsBytes, closer, err := getMMapedFile(filename, filesize, logger)
124if err != nil {
125panic("Unable to create mmap-ed active query log")
126}
127
128copy(fileAsBytes, "[")
129activeQueryTracker := ActiveQueryTracker{
130mmapedFile: fileAsBytes,
131closer: closer,
132getNextIndex: make(chan int, maxConcurrent),
133logger: logger,
134maxConcurrent: maxConcurrent,
135}
136
137activeQueryTracker.generateIndices(maxConcurrent)
138
139return &activeQueryTracker
140}
141
// trimStringByBytes returns the longest prefix of str that fits within
// size bytes without splitting a multi-byte UTF-8 rune.
func trimStringByBytes(str string, size int) string {
	raw := []byte(str)
	if size >= len(raw) {
		// Already within budget — return the whole string.
		return string(raw)
	}

	// Back up until we land on the first byte of a rune, so the cut
	// never produces invalid UTF-8.
	cut := size
	for !utf8.RuneStart(raw[cut]) {
		cut--
	}
	return string(raw[:cut])
}
155
156func _newJSONEntry(query string, timestamp int64, logger log.Logger) []byte {
157entry := Entry{query, timestamp}
158jsonEntry, err := json.Marshal(entry)
159if err != nil {
160level.Error(logger).Log("msg", "Cannot create json of query", "query", query)
161return []byte{}
162}
163
164return jsonEntry
165}
166
167func newJSONEntry(query string, logger log.Logger) []byte {
168timestamp := time.Now().Unix()
169minEntryJSON := _newJSONEntry("", timestamp, logger)
170
171query = trimStringByBytes(query, entrySize-(len(minEntryJSON)+1))
172jsonEntry := _newJSONEntry(query, timestamp, logger)
173
174return jsonEntry
175}
176
177func (tracker ActiveQueryTracker) generateIndices(maxConcurrent int) {
178for i := 0; i < maxConcurrent; i++ {
179tracker.getNextIndex <- 1 + (i * entrySize)
180}
181}
182
// GetMaxConcurrent returns the maximum number of concurrent queries the
// tracker was configured with (the number of slots in the log file).
func (tracker ActiveQueryTracker) GetMaxConcurrent() int {
	return tracker.maxConcurrent
}
186
187func (tracker ActiveQueryTracker) Delete(insertIndex int) {
188copy(tracker.mmapedFile[insertIndex:], strings.Repeat("\x00", entrySize))
189tracker.getNextIndex <- insertIndex
190}
191
192func (tracker ActiveQueryTracker) Insert(ctx context.Context, query string) (int, error) {
193select {
194case i := <-tracker.getNextIndex:
195fileBytes := tracker.mmapedFile
196entry := newJSONEntry(query, tracker.logger)
197start, end := i, i+entrySize
198
199copy(fileBytes[start:], entry)
200copy(fileBytes[end-1:], ",")
201return i, nil
202case <-ctx.Done():
203return 0, ctx.Err()
204}
205}
206
// Close releases the mmaped query log by closing the underlying file.
// It is safe to call on a nil tracker or one without an open file.
func (tracker *ActiveQueryTracker) Close() {
	if tracker == nil || tracker.closer == nil {
		return
	}
	tracker.closer.Close()
}
213