cubefs

Форк
0
286 строк · 7.2 Кб
1
// Copyright 2022 The CubeFS Authors.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
14

15
package auditlog
16

17
import (
18
	"bytes"
19
	"encoding/json"
20
	"io"
21
	"net/http"
22
	"runtime/debug"
23
	"strconv"
24
	"sync"
25
	"time"
26

27
	"github.com/cubefs/cubefs/blobstore/common/rpc"
28
	"github.com/cubefs/cubefs/blobstore/common/trace"
29
	"github.com/cubefs/cubefs/blobstore/util/errors"
30
	"github.com/cubefs/cubefs/blobstore/util/largefile"
31
)
32

33
const (
	// defaultReadBodyBuffLength is the BodyLimit that Open applies when the
	// configured value is zero.
	defaultReadBodyBuffLength = 512

	// maxReadBodyBuffLength is the upper bound Open clamps BodyLimit to.
	maxReadBodyBuffLength = 2048

	// defaultFileChunkBits is the ChunkBits that Open applies when the
	// configured value is zero; Open forwards it to the large-file log as
	// FileChunkSizeBits.
	defaultFileChunkBits = 29
)
38

39
// reqBodyReadCloser wraps an io.ReadCloser and keeps a running count of the
// bytes that have been read through it.
type reqBodyReadCloser struct {
	// ReadCloser is the wrapped body; its Close method is promoted unchanged.
	io.ReadCloser

	// bodyRead is the total number of bytes returned by Read so far.
	bodyRead int64
}

// Read forwards to the wrapped reader and adds the number of bytes it
// returned to bodyRead. Bytes delivered together with an error are counted
// too, because the count is updated before the error is looked at.
func (reqBody *reqBodyReadCloser) Read(p []byte) (int, error) {
	count, readErr := reqBody.ReadCloser.Read(p)
	reqBody.bodyRead += int64(count)
	return count, readErr
}
50

51
type jsonAuditlog struct {
52
	module       string
53
	decoder      Decoder
54
	metricSender MetricSender
55
	logFile      LogCloser
56
	logFilter    LogFilter
57

58
	logPool  sync.Pool
59
	bodyPool sync.Pool
60

61
	cfg *Config
62
}
63

64
// AuditLog Define a struct to represent the structured log data
65
type AuditLog struct {
66
	ReqType    string `json:"req_type"`
67
	Module     string `json:"module"`
68
	StartTime  int64  `json:"start_time"`
69
	Method     string `json:"method"`
70
	Path       string `json:"path"`
71
	ReqHeader  M      `json:"req_header"`
72
	ReqParams  string `json:"req_params"`
73
	StatusCode int    `json:"status_code"`
74
	RespHeader M      `json:"resp_header"`
75
	RespBody   string `json:"resp_body"`
76
	RespLength int64  `json:"resp_length"`
77
	Duration   int64  `json:"duration"`
78
}
79

80
// ToBytesWithTab appends the record to buf as a single line: the twelve
// fields in declaration order, separated by tabs and terminated by a newline.
// Integer fields are written in base 10 and the two header fields are written
// using their Encode output.
//
// buf is not reset first, so any bytes already in it precede the record. The
// returned slice is buf.Bytes() and therefore shares storage with buf.
func (a *AuditLog) ToBytesWithTab(buf *bytes.Buffer) (b []byte) {
	// text writes a string column followed by the column separator.
	text := func(s string) {
		buf.WriteString(s)
		buf.WriteByte('\t')
	}
	// raw writes an already-encoded column followed by the column separator.
	raw := func(p []byte) {
		buf.Write(p)
		buf.WriteByte('\t')
	}

	text(a.ReqType)
	text(a.Module)
	text(strconv.FormatInt(a.StartTime, 10))
	text(a.Method)
	text(a.Path)
	raw(a.ReqHeader.Encode())
	text(a.ReqParams)
	text(strconv.Itoa(a.StatusCode))
	raw(a.RespHeader.Encode())
	text(a.RespBody)
	text(strconv.FormatInt(a.RespLength, 10))

	// The last column ends the line instead of being followed by a tab.
	buf.WriteString(strconv.FormatInt(a.Duration, 10))
	buf.WriteByte('\n')

	return buf.Bytes()
}
107

108
// ToJson returns the record encoded with json.Marshal.
//
// The marshalling error is discarded because the signature has no way to
// report it; if encoding fails the result is whatever json.Marshal returned
// alongside the error.
func (a *AuditLog) ToJson() (b []byte) {
	encoded, _ := json.Marshal(a)
	return encoded
}
112

113
func Open(module string, cfg *Config) (ph rpc.ProgressHandler, logFile LogCloser, err error) {
114
	if cfg.BodyLimit < 0 {
115
		cfg.BodyLimit = 0
116
	} else if cfg.BodyLimit == 0 {
117
		cfg.BodyLimit = defaultReadBodyBuffLength
118
	} else if cfg.BodyLimit > maxReadBodyBuffLength {
119
		cfg.BodyLimit = maxReadBodyBuffLength
120
	}
121

122
	if cfg.ChunkBits == 0 {
123
		cfg.ChunkBits = defaultFileChunkBits
124
	}
125

126
	largeLogConfig := largefile.Config{
127
		Path:              cfg.LogDir,
128
		FileChunkSizeBits: cfg.ChunkBits,
129
		Suffix:            cfg.LogFileSuffix,
130
		Backup:            cfg.Backup,
131
	}
132

133
	logFile = noopLogCloser{}
134
	if cfg.LogDir != "" {
135
		logFile, err = largefile.OpenLargeFileLog(largeLogConfig, cfg.RotateNew)
136
		if err != nil {
137
			return nil, nil, errors.Info(err, "auditlog.Open: large file log open failed").Detail(err)
138
		}
139
	}
140

141
	logFilter, err := newLogFilter(cfg.Filters)
142
	if err != nil {
143
		return nil, nil, errors.Info(err, "new log filter").Detail(err)
144
	}
145

146
	return &jsonAuditlog{
147
		module:       module,
148
		decoder:      &defaultDecoder{},
149
		metricSender: NewPrometheusSender(cfg.MetricConfig),
150
		logFile:      logFile,
151
		logFilter:    logFilter,
152

153
		logPool: sync.Pool{
154
			New: func() interface{} {
155
				return new(bytes.Buffer)
156
			},
157
		},
158
		bodyPool: sync.Pool{
159
			New: func() interface{} {
160
				return make([]byte, cfg.BodyLimit)
161
			},
162
		},
163
		cfg: cfg,
164
	}, logFile, nil
165
}
166

167
func (j *jsonAuditlog) Handler(w http.ResponseWriter, req *http.Request, f func(http.ResponseWriter, *http.Request)) {
168
	var (
169
		logBytes []byte
170
		err      error
171
	)
172
	startTime := time.Now().UnixNano()
173

174
	ctx := req.Context()
175
	span := trace.SpanFromContext(ctx)
176
	if span == nil {
177
		span, ctx = trace.StartSpanFromHTTPHeaderSafe(req, "")
178
		defer span.Finish()
179
		req = req.WithContext(ctx)
180
	}
181

182
	_w := &responseWriter{
183
		module:         j.module,
184
		body:           j.bodyPool.Get().([]byte),
185
		bodyLimit:      j.cfg.BodyLimit,
186
		no2xxBody:      j.cfg.No2xxBody,
187
		span:           span,
188
		startTime:      time.Now(),
189
		statusCode:     http.StatusOK,
190
		ResponseWriter: w,
191
	}
192

193
	// parse request to decodeRep
194
	decodeReq := j.decoder.DecodeReq(req)
195

196
	// handle panic recover, return 597 status code
197
	defer func() {
198
		j.bodyPool.Put(_w.body) // nolint: staticcheck
199

200
		p := recover()
201
		if p != nil {
202
			span.Printf("WARN: panic fired in %v.panic - %v\n", f, p)
203
			span.Println(string(debug.Stack()))
204
			w.WriteHeader(597)
205
		}
206
	}()
207

208
	rc := &reqBodyReadCloser{ReadCloser: req.Body}
209
	req.Body = rc
210
	f(_w, req)
211
	bodySize := rc.bodyRead
212
	decodeReq.Header["BodySize"] = bodySize
213

214
	endTime := time.Now().UnixNano() / 1000
215
	b := j.logPool.Get().(*bytes.Buffer)
216
	defer j.logPool.Put(b)
217
	b.Reset()
218

219
	auditLog := &AuditLog{
220
		ReqType:   "REQ",
221
		Module:    j.module,
222
		StartTime: startTime / 100,
223
		Method:    req.Method,
224
		Path:      decodeReq.Path,
225
		ReqHeader: decodeReq.Header,
226
	}
227

228
	if len(decodeReq.Params) <= maxSeekableBodyLength && len(decodeReq.Params) > 0 {
229
		auditLog.ReqParams = string(decodeReq.Params)
230
	}
231

232
	// record response info
233
	respContentType := _w.Header().Get("Content-Type")
234
	auditLog.StatusCode = _w.getStatusCode()
235

236
	// Check if track-log and tags changed or not,
237
	// if changed, we should set into response header again.
238
	// But the additional headers DO NOT write to client if
239
	// they set after response WriteHeader, just logging.
240
	wHeader := _w.Header()
241
	traceLogs := span.TrackLog()
242
	if len(wHeader[rpc.HeaderTraceLog]) < len(traceLogs) {
243
		wHeader[rpc.HeaderTraceLog] = traceLogs
244
	}
245
	tags := span.Tags().ToSlice()
246
	if len(wHeader[rpc.HeaderTraceTags]) < len(tags) {
247
		wHeader[rpc.HeaderTraceTags] = tags
248
	}
249
	auditLog.RespHeader = _w.getHeader()
250

251
	if (respContentType == rpc.MIMEJSON || respContentType == rpc.MIMEXML) &&
252
		_w.Header().Get("Content-Encoding") != rpc.GzipEncodingType {
253
		auditLog.RespBody = string(_w.getBody())
254
	}
255

256
	auditLog.RespLength = _w.getBodyWritten()
257
	auditLog.Duration = endTime - startTime/1000
258

259
	j.metricSender.Send(auditLog.ToBytesWithTab(b))
260

261
	if j.logFile == nil || j.logFilter.Filter(auditLog) {
262
		return
263
	}
264

265
	switch j.cfg.LogFormat {
266
	case LogFormatJSON:
267
		logBytes = auditLog.ToJson()
268
	default:
269
		logBytes = b.Bytes() // *bytes.Buffer was filled with metricSender.Send
270
	}
271
	err = j.logFile.Log(logBytes)
272
	if err != nil {
273
		span.Errorf("jsonlog.Handler Log failed, err: %s", err.Error())
274
		return
275
	}
276
}
277

278
// ExtraHeader provides extra response header writes to the ResponseWriter.
279
func ExtraHeader(w http.ResponseWriter) http.Header {
280
	h := make(http.Header)
281
	if eh, ok := w.(ResponseExtraHeader); ok {
282
		h = eh.ExtraHeader()
283
	}
284

285
	return h
286
}
287

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.