1
// Copyright 2022 The CubeFS Authors.
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
7
// http://www.apache.org/licenses/LICENSE-2.0
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
26
"github.com/cubefs/cubefs/blobstore/util/log"
37
TAG_RESPONSE_HEADER = 8
39
TAG_RESPONSE_SIZE = 10
40
TAG_RESPONSE_TIME = 11
44
LogTypeAudit = "audit"
47
type LogEntry interface {
53
RespTime() int64 // 100ns
61
// ReqParams returns params of request, including params in raw query and body
67
func ErrInValidFieldCnt(msg string) error {
68
return errors.New("invalid field count, " + msg)
71
var vre = regexp.MustCompile("^v[0-9]+$")
73
type ReqHeader struct {
74
ContentLength string `json:"Content-Length"`
75
BodySize int64 `json:"BodySize"` // body size
76
RawQuery string `json:"RawQuery"`
77
Host string `json:"Host"`
78
Token *Token `json:"Token"`
79
XRealIp string `json:"X-Real-Ip"`
80
XFromCdn string `json:"X-From-Cdn"`
81
XSrc string `json:"X-Src"`
83
UA string `json:"User-Agent"`
87
AppId uint64 `json:"appid"`
88
Uid uint32 `json:"uid"`
89
Utype uint32 `json:"utype"`
93
Uid uint32 `json:"uid"`
94
Bucket string `json:"tbl"`
98
Bucket string `json:"bucket"`
99
EntryURI string `json:"entryURI"`
102
type RespHeader struct {
103
ContentLength string `json:"Content-Length"`
104
Xlog []string `json:"X-Log"`
105
XWarn []string `json:"X-Warn"`
106
Bucket string `json:"Tbl"`
107
Token *Token `json:"Token"`
108
SToken *SToken `json:"SToken"`
109
FileType int `json:"FileType"`
110
BatchOps map[string]int64 `json:"batchOps"`
111
PreDelSize map[string]int64 `json:"preDelSize"`
112
PreDelArchiveSize map[string]int64 `json:"preDelArchiveSize"`
113
OUid uint32 `json:"ouid"` // owner uid
114
RsInfo *RsInfo `json:"rs-info"`
115
XRespCode string `json:"X-Resp-Code"` // return from dora
116
BillTag string `json:"billtag"` // must be same with definition in billtag.go
117
BatchDeletes map[uint32]int64 `json:"batchDelete"`
118
ApiName string `json:"api"` // api name of this auditlog
121
type RequestRow struct {
124
// the following fields are loaded lazily; use the getReqHeader/getRespHeader accessors to obtain them
126
respHeader *RespHeader
130
type AdRowParser struct{}
132
func (self *AdRowParser) Parse(line string) (*RequestRow, error) {
133
return ParseReqlogToAdrow(line)
136
func ParseReqlogToAdrow(line string) (*RequestRow, error) {
137
idx := strings.Index(line, "REQ")
139
return nil, errors.New("invalid reqlog")
141
line = strings.TrimSuffix(line[idx:], "\n")
143
a.data = strings.Split(line, "\t")
144
if len(a.data) != 12 && len(a.data) != 14 {
145
return nil, ErrInValidFieldCnt(fmt.Sprintf("expect 12 or 14 while get %v", len(a.data)))
147
if a.data[0] != "REQ" {
148
return nil, fmt.Errorf("invalid Head: %v", a.data[0])
153
func ParseReqlog(line string) (LogEntry, error) {
154
return ParseReqlogToAdrow(line)
157
func (a *RequestRow) LogType() string {
161
func (a *RequestRow) ApiName() string { // s3, apiName can get directly from auditlog
162
respHeader := a.getRespHeader()
163
if respHeader == nil {
166
return respHeader.ApiName
169
func (a *RequestRow) Uid() uint32 {
170
respSToken := a.RespSToken()
171
if respSToken != nil {
172
return respSToken.Uid
174
respToken := a.RespToken()
175
if respToken != nil {
178
reqToken := a.ReqToken()
185
func (a *RequestRow) RespToken() *Token {
186
respHeader := a.getRespHeader()
187
if respHeader == nil {
190
return respHeader.Token
193
func (a *RequestRow) RespSToken() *SToken {
194
respHeader := a.getRespHeader()
195
if respHeader == nil {
198
return respHeader.SToken
201
func (a *RequestRow) UA() string {
202
reqHeader := a.getReqHeader()
203
if reqHeader != nil && reqHeader.UA != "" {
209
func (a *RequestRow) Bucket() string {
210
respHeader := a.getRespHeader()
211
if respHeader != nil && respHeader.Bucket != "" {
212
return respHeader.Bucket
217
func (a *RequestRow) getReqHeader() *ReqHeader {
218
if a.reqHeader != nil {
221
err := json.Unmarshal([]byte(a.data[TAG_HEADER]), &a.reqHeader)
223
log.Warnf("%q, Unmarshal ReqHeader err %v", a.data[TAG_HEADER], err)
229
func (a *RequestRow) getRawquery() *url.Values {
230
reqHeader := a.getReqHeader()
231
if reqHeader == nil {
234
m, err := url.ParseQuery(reqHeader.RawQuery)
242
func (a *RequestRow) getRespHeader() *RespHeader {
243
if a.respHeader != nil {
246
err := json.Unmarshal([]byte(a.data[TAG_RESPONSE_HEADER]), &a.respHeader)
248
log.Warnf("%q, Unmarshal RespHeader err %v", a.data[TAG_RESPONSE_HEADER], err)
254
func (a *RequestRow) String() string {
255
return strings.Join(a.data, "\t")
258
func (a *RequestRow) Code() string {
259
if isType(Digital, a.data[TAG_CODE]) {
260
return a.data[TAG_CODE]
265
func (a *RequestRow) ReqToken() *Token {
266
reqHeader := a.getReqHeader()
267
if reqHeader == nil {
270
return reqHeader.Token
273
// ReqTime returns the request time as a unix timestamp in seconds.
274
func (a *RequestRow) ReqTime() int64 {
275
t, err := strconv.ParseInt(a.data[TAG_TIME], 10, 0)
277
log.Errorf("cannot parse time %v to int", a.data[TAG_TIME])
282
func (a *RequestRow) RemoteIp() string {
283
reqHeader := a.getReqHeader()
284
if reqHeader != nil {
285
if reqHeader.XRealIp != "" {
286
return strings.TrimSpace(reqHeader.XRealIp)
288
return strings.TrimSpace(reqHeader.IP)
294
func (a *RequestRow) ReqCdn() string {
295
reqHeader := a.getReqHeader()
296
if reqHeader == nil {
299
return reqHeader.XFromCdn
302
func (a *RequestRow) ReqSrc() string {
303
reqHeader := a.getReqHeader()
304
if reqHeader == nil {
307
return reqHeader.XSrc
310
func (a *RequestRow) Service() string {
311
return a.data[TAG_SERVICE]
314
func (a *RequestRow) Method() string {
315
return a.data[TAG_METHOD]
318
func (a *RequestRow) Path() string {
319
return a.data[TAG_PATH]
322
func (a *RequestRow) RawQuery() string {
323
reqHeader := a.getReqHeader()
324
if reqHeader == nil {
327
return a.getReqHeader().RawQuery
330
func (a *RequestRow) RespTime() (respTime int64) {
332
respTime, err = strconv.ParseInt(a.data[TAG_RESPONSE_TIME], 10, 64)
339
func (a *RequestRow) RespLength() (respLength int64) {
340
respLength, _ = strconv.ParseInt(a.data[TAG_RESPONSE_SIZE], 10, 64)
344
func (a *RequestRow) XRespCode() string {
345
respHeader := a.getRespHeader()
346
if respHeader == nil {
349
return respHeader.XRespCode
352
func (a *RequestRow) ReqLength() (reqLength int64) {
353
reqHeader := a.getReqHeader()
354
if reqHeader == nil {
357
return reqHeader.BodySize
360
func (a *RequestRow) ReqHost() string {
361
reqHeader := a.getReqHeader()
362
if reqHeader == nil {
365
return reqHeader.Host
368
func (a *RequestRow) ReqParams() string {
369
return a.data[TAG_PARAMS]
372
func (a *RequestRow) ReqFsize() (fsize int64) {
373
rawQuery := a.getRawquery()
377
fsize, _ = strconv.ParseInt(rawQuery.Get("fsize"), 10, 64)
381
func (a *RequestRow) RemoteAddr() string {
385
func (a *RequestRow) XWarns() []string {
386
respHeader := a.getRespHeader()
387
if respHeader == nil {
390
return respHeader.XWarn
393
func (a *RequestRow) BillTag() string {
394
respHeader := a.getRespHeader()
395
if respHeader == nil {
398
return respHeader.BillTag
401
func (a *RequestRow) BatchDelete() map[uint32]int64 {
402
respHeader := a.getRespHeader()
403
if respHeader == nil {
406
return respHeader.BatchDeletes
415
func (a *RequestRow) Xlogs() []string {
416
respHeader := a.getRespHeader()
417
if respHeader == nil {
420
return respHeader.Xlog
423
func (a *RequestRow) XlogSearch(name string) (rets []Xlog) {
425
for _, log := range xlogs {
426
items := strings.Split(log, ";")
427
for _, item := range items {
428
if !strings.HasPrefix(item, name) {
431
ret := parseXlog(item)
432
if ret.Name == name {
433
rets = append(rets, ret)
440
func parseXlog(s string) (ret Xlog) {
441
idx := strings.Index(s, "/")
446
idx = strings.LastIndex(s, ":")
448
ret.MsSpend, _ = strconv.ParseUint(s[idx+1:], 10, 64)
455
func (a *RequestRow) XlogTime(name string) (msSpeed uint64) {
456
rets := a.XlogSearch(name)
457
for _, ret := range rets {
458
if ret.MsSpend > msSpeed {
459
msSpeed = ret.MsSpend
465
func (a *RequestRow) XlogsTime(names []string) (msSpeedTotal uint64) {
466
for _, name := range names {
467
msSpeedTotal += a.XlogTime(name)
472
// apiWithParams returns the api information, appending path segments up to maxApiLevel (default 2).
473
func apiWithParams(service, method, path, host, params string, maxApiLevel int) (api string) {
474
const unknown = ".unknown"
475
if service == "" || method == "" {
476
return "unknown.unknown"
478
stype := strings.ToLower(service)
479
fields := strings.Split(strings.ToLower(path), "/")
480
if len(fields) <= 1 {
481
return stype + unknown
484
firstPath := fields[1]
487
if (vre.MatchString(firstPath) || firstPath == "admin") && len(fields) > 2 && fields[2] != "" {
488
firstPath = firstPath + "-" + fields[2]
492
// for defy api from apiserver
493
if firstPath == "v2-tune" {
494
return stype + ".v2-tune." + strings.Join(fields[firstPathIndex+1:], ".")
496
if !isValidApi(firstPath) {
497
return stype + unknown
503
index := firstPathIndex + 1
504
length := len(fields)
505
for level <= maxApiLevel && index < length {
506
api += "." + fields[index]
510
if !isValidMultiPathApi(api) {
511
return stype + unknown
515
return stype + "." + api
518
func isValidApi(api string) bool {
519
return isType(ALPHA|Digital|SubLetter|Underline, api)
522
func isValidMultiPathApi(api string) bool {
523
return isType(ALPHA|Digital|DotLetter|SubLetter|Underline, api)