1
// Copyright 2022 The CubeFS Authors.
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
7
// http://www.apache.org/licenses/LICENSE-2.0
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
27
"github.com/cubefs/cubefs/blobstore/common/rpc"
28
"github.com/cubefs/cubefs/blobstore/common/trace"
29
"github.com/cubefs/cubefs/blobstore/util/errors"
30
"github.com/cubefs/cubefs/blobstore/util/largefile"
34
// defaultReadBodyBuffLength is the body limit Open applies when
// cfg.BodyLimit is 0.
defaultReadBodyBuffLength = 512
35
// maxReadBodyBuffLength is the upper bound Open clamps cfg.BodyLimit to.
maxReadBodyBuffLength = 2048
36
// defaultFileChunkBits is the value Open applies when cfg.ChunkBits is 0.
defaultFileChunkBits = 29
39
// reqBodyReadCloser wraps a ReadCloser (Handler passes the request body) and
// keeps a running count, bodyRead, of the bytes returned by Read.
type reqBodyReadCloser struct {
45
// Read reads from the wrapped ReadCloser into p and adds the number of bytes
// read to bodyRead.
func (reqBody *reqBodyReadCloser) Read(p []byte) (n int, err error) {
46
n, err = reqBody.ReadCloser.Read(p)
47
// n is added unconditionally, so bytes delivered together with an error
// are still counted.
reqBody.bodyRead += int64(n)
51
// jsonAuditlog is the audit logger constructed by Open. Its Handler method
// builds an AuditLog entry per request, forwards it to the metric sender and
// writes it to the log file.
type jsonAuditlog struct {
54
// metricSender receives the ToBytesWithTab encoding of each entry.
metricSender MetricSender
64
// AuditLog is the structured record of one audited request. Handler fills it
// in, and it is serialized by ToBytesWithTab or ToJson.
66
ReqType string `json:"req_type"`
67
Module string `json:"module"`
68
// StartTime is the request start; Handler stores UnixNano()/100, i.e. a
// count of 100ns units.
StartTime int64 `json:"start_time"`
69
Method string `json:"method"`
70
// Path and ReqHeader are copied from the decoded request in Handler.
Path string `json:"path"`
71
ReqHeader M `json:"req_header"`
72
// ReqParams holds the decoded request params; Handler sets it only when
// they are non-empty and no longer than maxSeekableBodyLength.
ReqParams string `json:"req_params"`
73
// StatusCode is taken from the wrapping response writer.
StatusCode int `json:"status_code"`
74
RespHeader M `json:"resp_header"`
75
// RespBody is the captured response body; Handler sets it only for JSON or
// XML responses that are not gzip-encoded.
RespBody string `json:"resp_body"`
76
// RespLength is the number of body bytes reported written by the response
// writer.
RespLength int64 `json:"resp_length"`
77
// Duration is the elapsed time computed by Handler in microseconds.
Duration int64 `json:"duration"`
80
// ToBytesWithTab appends the fields of a to buf in a fixed order: ReqType,
// Module, StartTime, Method, Path, ReqHeader, ReqParams, StatusCode,
// RespHeader, RespBody, RespLength, Duration. Integers are written in base 10
// and the header maps via their Encode method.
//
// NOTE(review): the name implies the fields are tab-separated and that the
// result is buf's content — confirm against the separator writes and the
// return statement.
func (a *AuditLog) ToBytesWithTab(buf *bytes.Buffer) (b []byte) {
81
buf.WriteString(a.ReqType)
83
buf.WriteString(a.Module)
85
buf.WriteString(strconv.FormatInt(a.StartTime, 10))
87
buf.WriteString(a.Method)
89
buf.WriteString(a.Path)
91
buf.Write(a.ReqHeader.Encode())
93
buf.WriteString(a.ReqParams)
95
buf.WriteString(strconv.Itoa(a.StatusCode))
97
buf.Write(a.RespHeader.Encode())
99
buf.WriteString(a.RespBody)
101
buf.WriteString(strconv.FormatInt(a.RespLength, 10))
103
buf.WriteString(strconv.FormatInt(a.Duration, 10))
108
// ToJson encodes a with json.Marshal, using the struct's json tags as keys.
func (a *AuditLog) ToJson() (b []byte) {
109
// The marshal error is deliberately discarded; on failure b is whatever
// json.Marshal returned alongside the error.
b, _ = json.Marshal(a)
113
// Open builds the audit-log progress handler for module from cfg.
//
// It normalizes cfg in place (BodyLimit and ChunkBits are overwritten with
// defaults or clamped), opens a large-file log when cfg.LogDir is set, builds
// the log filter from cfg.Filters, and returns a *jsonAuditlog as ph together
// with the log file so the caller can close it. On failure it returns nil
// handler, nil log file and a wrapped error.
func Open(module string, cfg *Config) (ph rpc.ProgressHandler, logFile LogCloser, err error) {
114
// Normalize the body capture limit: 0 selects the default, values above
// the maximum are clamped.
if cfg.BodyLimit < 0 {
116
} else if cfg.BodyLimit == 0 {
117
cfg.BodyLimit = defaultReadBodyBuffLength
118
} else if cfg.BodyLimit > maxReadBodyBuffLength {
119
cfg.BodyLimit = maxReadBodyBuffLength
122
// An unset chunk size falls back to the default.
if cfg.ChunkBits == 0 {
123
cfg.ChunkBits = defaultFileChunkBits
126
largeLogConfig := largefile.Config{
128
FileChunkSizeBits: cfg.ChunkBits,
129
Suffix: cfg.LogFileSuffix,
133
// Start with a no-op log closer; it is replaced by a real large-file log
// only when a log directory is configured.
logFile = noopLogCloser{}
134
if cfg.LogDir != "" {
135
logFile, err = largefile.OpenLargeFileLog(largeLogConfig, cfg.RotateNew)
137
return nil, nil, errors.Info(err, "auditlog.Open: large file log open failed").Detail(err)
141
logFilter, err := newLogFilter(cfg.Filters)
143
return nil, nil, errors.Info(err, "new log filter").Detail(err)
146
return &jsonAuditlog{
148
decoder: &defaultDecoder{},
149
metricSender: NewPrometheusSender(cfg.MetricConfig),
151
logFilter: logFilter,
154
// Pool of *bytes.Buffer values.
New: func() interface{} {
155
return new(bytes.Buffer)
159
// Pool of byte slices sized to the (normalized) body limit.
New: func() interface{} {
160
return make([]byte, cfg.BodyLimit)
167
// Handler audits one HTTP request served by f. It records the start time,
// attaches a trace span to the request, wraps w in a responseWriter that
// captures status, headers and body, and afterwards assembles an AuditLog that
// is sent to the metric sender and written to the log file.
//
// Time units: startTime is in nanoseconds, AuditLog.StartTime is startTime/100
// (100ns units), and endTime and AuditLog.Duration are in microseconds.
func (j *jsonAuditlog) Handler(w http.ResponseWriter, req *http.Request, f func(http.ResponseWriter, *http.Request)) {
172
startTime := time.Now().UnixNano()
175
span := trace.SpanFromContext(ctx)
177
// Start a span from the request headers and carry its context on the
// request.
span, ctx = trace.StartSpanFromHTTPHeaderSafe(req, "")
179
req = req.WithContext(ctx)
182
// Wrapping writer: its body buffer comes from bodyPool, and the status
// code starts as 200.
_w := &responseWriter{
184
body: j.bodyPool.Get().([]byte),
185
bodyLimit: j.cfg.BodyLimit,
186
no2xxBody: j.cfg.No2xxBody,
188
startTime: time.Now(),
189
statusCode: http.StatusOK,
193
// parse request to decodeReq
194
decodeReq := j.decoder.DecodeReq(req)
196
// handle panic recover, return 597 status code
198
// Hand the captured-body buffer back to the pool.
j.bodyPool.Put(_w.body) // nolint: staticcheck
202
// Record the panic value and the stack trace on the span.
span.Printf("WARN: panic fired in %v.panic - %v\n", f, p)
203
span.Println(string(debug.Stack()))
208
// Wrap the request body so the number of bytes read from it can be counted.
rc := &reqBodyReadCloser{ReadCloser: req.Body}
211
// Expose the counted request-body size in the logged request header.
bodySize := rc.bodyRead
212
decodeReq.Header["BodySize"] = bodySize
214
// End time in microseconds.
endTime := time.Now().UnixNano() / 1000
215
// Borrow an encoding buffer for this entry; it goes back to the pool when
// Handler returns.
b := j.logPool.Get().(*bytes.Buffer)
216
defer j.logPool.Put(b)
219
auditLog := &AuditLog{
222
// Start time in 100ns units.
StartTime: startTime / 100,
224
Path: decodeReq.Path,
225
ReqHeader: decodeReq.Header,
228
// Record request params only when non-empty and within
// maxSeekableBodyLength.
if len(decodeReq.Params) <= maxSeekableBodyLength && len(decodeReq.Params) > 0 {
229
auditLog.ReqParams = string(decodeReq.Params)
232
// record response info
233
respContentType := _w.Header().Get("Content-Type")
234
auditLog.StatusCode = _w.getStatusCode()
236
// Check if track-log and tags changed or not,
237
// if changed, we should set into response header again.
238
// But the additional headers DO NOT write to client if
239
// they set after response WriteHeader, just logging.
240
wHeader := _w.Header()
241
traceLogs := span.TrackLog()
242
if len(wHeader[rpc.HeaderTraceLog]) < len(traceLogs) {
243
wHeader[rpc.HeaderTraceLog] = traceLogs
245
tags := span.Tags().ToSlice()
246
if len(wHeader[rpc.HeaderTraceTags]) < len(tags) {
247
wHeader[rpc.HeaderTraceTags] = tags
249
auditLog.RespHeader = _w.getHeader()
251
// Capture the response body only for JSON or XML content that is not
// gzip-encoded.
if (respContentType == rpc.MIMEJSON || respContentType == rpc.MIMEXML) &&
252
_w.Header().Get("Content-Encoding") != rpc.GzipEncodingType {
253
auditLog.RespBody = string(_w.getBody())
256
auditLog.RespLength = _w.getBodyWritten()
257
// Both operands are in microseconds.
auditLog.Duration = endTime - startTime/1000
259
// Encode the entry into b and hand the encoded bytes to the metric sender.
j.metricSender.Send(auditLog.ToBytesWithTab(b))
261
// Guard: no log file, or the filter matched this entry.
// NOTE(review): presumably the entry is not written in that case — confirm.
if j.logFile == nil || j.logFilter.Filter(auditLog) {
265
// Pick the on-disk encoding according to the configured log format.
switch j.cfg.LogFormat {
267
logBytes = auditLog.ToJson()
269
logBytes = b.Bytes() // b was already filled by ToBytesWithTab for metricSender.Send above
271
err = j.logFile.Log(logBytes)
273
// A failed log write is reported on the span.
span.Errorf("jsonlog.Handler Log failed, err: %s", err.Error())
278
// ExtraHeader provides extra response header writes to the ResponseWriter.
279
func ExtraHeader(w http.ResponseWriter) http.Header {
280
h := make(http.Header)
281
if eh, ok := w.(ResponseExtraHeader); ok {