seaweedfs/weed/s3api/s3err/audit_fluent.go

184 lines
5.7 KiB
Go
Raw Permalink Normal View History

2021-12-07 07:13:21 +00:00
package s3err
import (
2021-12-07 13:20:52 +00:00
"encoding/json"
2021-12-07 07:13:21 +00:00
"fmt"
"github.com/fluent/fluent-logger-golang/fluent"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
2021-12-07 07:13:21 +00:00
"net/http"
"os"
"time"
)
2021-12-07 07:15:48 +00:00
type AccessLogExtend struct {
2021-12-07 07:13:21 +00:00
AccessLog
AccessLogHTTP
}
2021-12-07 07:15:48 +00:00
// AccessLog is the core audit record posted to fluent for an S3 request.
//
// The `msg` tags name the fields for the fluent encoding and the `json` tags
// name them for JSON output. Trailing comments show example values.
type AccessLog struct {
	Bucket           string `msg:"bucket" json:"bucket"`                   // awsexamplebucket1
	Time             int64  `msg:"time" json:"time"`                       // Unix timestamp in seconds (GetAccessLog stores time.Now().Unix())
	RemoteIP         string `msg:"remote_ip" json:"remote_ip,omitempty"`   // 192.0.2.3
	Requester        string `msg:"requester" json:"requester,omitempty"`   // IAM user id
	RequestID        string `msg:"request_id" json:"request_id,omitempty"` // 3E57427F33A59F07
	Operation        string `msg:"operation" json:"operation,omitempty"`   // REST.HTTP_method.resource_type REST.PUT.OBJECT
	Key              string `msg:"key" json:"key,omitempty"`               // /photos/2019/08/puppy.jpg
	ErrorCode        string `msg:"error_code" json:"error_code,omitempty"`
	HostId           string `msg:"host_id" json:"host_id,omitempty"`
	HostHeader       string `msg:"host_header" json:"host_header,omitempty"` // s3.us-west-2.amazonaws.com
	UserAgent        string `msg:"user_agent" json:"user_agent,omitempty"`
	HTTPStatus       int    `msg:"status" json:"status,omitempty"`
	SignatureVersion string `msg:"signature_version" json:"signature_version,omitempty"`
}
// AccessLogHTTP holds the HTTP-level details of an access-log record.
//
// Every field is omitted from JSON output when it has its zero value. In this
// file only RequestURI and Referer are populated (by GetAccessHttpLog).
type AccessLogHTTP struct {
	RequestURI         string `json:"request_uri,omitempty"` // "GET /awsexamplebucket1/photos/2019/08/puppy.jpg?x-foo=bar HTTP/1.1"
	BytesSent          string `json:"bytes_sent,omitempty"`
	ObjectSize         string `json:"object_size,omitempty"`
	TotalTime          int    `json:"total_time,omitempty"`
	TurnAroundTime     int    `json:"turn_around_time,omitempty"`
	Referer            string `json:"Referer,omitempty"`
	VersionId          string `json:"version_id,omitempty"`
	CipherSuite        string `json:"cipher_suite,omitempty"`
	AuthenticationType string `json:"auth_type,omitempty"`
	TLSVersion         string `json:"TLS_version,omitempty"`
}
const tag = "s3.access"
var (
2021-12-10 14:40:32 +00:00
Logger *fluent.Fluent
hostname = os.Getenv("HOSTNAME")
environment = os.Getenv("ENVIRONMENT")
2021-12-07 07:13:21 +00:00
)
2021-12-07 13:20:52 +00:00
func InitAuditLog(config string) {
configContent, readErr := os.ReadFile(config)
if readErr != nil {
2021-12-09 14:47:16 +00:00
glog.Errorf("fail to read fluent config %s : %v", config, readErr)
return
}
2021-12-10 14:40:32 +00:00
fluentConfig := &fluent.Config{}
2021-12-09 14:47:16 +00:00
if err := json.Unmarshal(configContent, fluentConfig); err != nil {
2021-12-10 14:40:32 +00:00
glog.Errorf("fail to parse fluent config %s : %v", string(configContent), err)
2021-12-09 14:47:16 +00:00
return
2021-12-07 13:20:52 +00:00
}
2021-12-09 14:47:16 +00:00
if len(fluentConfig.TagPrefix) == 0 && len(environment) > 0 {
fluentConfig.TagPrefix = environment
2021-12-07 13:20:52 +00:00
}
fluentConfig.Async = true
fluentConfig.AsyncResultCallback = func(data []byte, err error) {
if err != nil {
glog.Warning("Error while posting log: ", err)
}
}
2021-12-07 07:13:21 +00:00
var err error
2021-12-09 14:47:16 +00:00
Logger, err = fluent.New(*fluentConfig)
2021-12-07 07:13:21 +00:00
if err != nil {
2021-12-09 14:47:16 +00:00
glog.Errorf("fail to load fluent config: %v", err)
2021-12-07 07:13:21 +00:00
}
}
// getREST builds an operation name of the form "REST.<method>.<resource>",
// for example "REST.PUT.OBJECT".
func getREST(httpMetod string, resourceType string) string {
	// All operands are strings, so Sprint joins them without inserting spaces.
	return fmt.Sprint("REST.", httpMetod, ".", resourceType)
}
2021-12-07 13:20:52 +00:00
func getResourceType(object string, query_key string, metod string) (string, bool) {
if object == "/" {
switch query_key {
2021-12-07 07:13:21 +00:00
case "delete":
return "BATCH.DELETE.OBJECT", true
case "tagging":
return getREST(metod, "OBJECTTAGGING"), true
case "lifecycle":
return getREST(metod, "LIFECYCLECONFIGURATION"), true
case "acl":
return getREST(metod, "ACCESSCONTROLPOLICY"), true
case "policy":
return getREST(metod, "BUCKETPOLICY"), true
default:
return getREST(metod, "BUCKET"), false
}
2021-12-07 13:20:52 +00:00
} else {
switch query_key {
case "tagging":
return getREST(metod, "OBJECTTAGGING"), true
default:
return getREST(metod, "OBJECT"), false
}
2021-12-07 07:13:21 +00:00
}
}
2021-12-07 07:15:48 +00:00
func getOperation(object string, r *http.Request) string {
2021-12-07 07:13:21 +00:00
queries := r.URL.Query()
var operation string
var queryFound bool
2021-12-07 13:20:52 +00:00
for key, _ := range queries {
operation, queryFound = getResourceType(object, key, r.Method)
if queryFound {
2021-12-07 07:13:21 +00:00
return operation
}
}
2021-12-07 13:20:52 +00:00
if len(queries) == 0 {
operation, _ = getResourceType(object, "", r.Method)
}
2021-12-07 07:13:21 +00:00
return operation
}
2021-12-07 13:20:52 +00:00
// GetAccessHttpLog returns the HTTP-level part of an access-log record for r.
//
// Only the raw request URI and the Referer header are filled in; all other
// fields keep their zero value. statusCode and s3errCode are accepted but not
// used.
func GetAccessHttpLog(r *http.Request, statusCode int, s3errCode ErrorCode) AccessLogHTTP {
	var httpLog AccessLogHTTP
	httpLog.RequestURI = r.RequestURI
	httpLog.Referer = r.Header.Get("Referer")
	return httpLog
}
func GetAccessLog(r *http.Request, HTTPStatusCode int, s3errCode ErrorCode) *AccessLog {
bucket, key := s3_constants.GetBucketAndObject(r)
2021-12-07 07:13:21 +00:00
var errorCode string
2021-12-07 07:15:48 +00:00
if s3errCode != ErrNone {
errorCode = GetAPIError(s3errCode).Code
2021-12-07 07:13:21 +00:00
}
2021-12-07 13:20:52 +00:00
remoteIP := r.Header.Get("X-Real-IP")
if len(remoteIP) == 0 {
remoteIP = r.RemoteAddr
}
2021-12-09 14:47:16 +00:00
hostHeader := r.Header.Get("X-Forwarded-Host")
2021-12-07 13:20:52 +00:00
if len(hostHeader) == 0 {
2021-12-09 14:47:16 +00:00
hostHeader = r.Host
2021-12-07 13:20:52 +00:00
}
return &AccessLog{
2021-12-10 14:40:32 +00:00
HostHeader: hostHeader,
RequestID: r.Header.Get("X-Request-ID"),
RemoteIP: remoteIP,
Requester: r.Header.Get(s3_constants.AmzIdentityId),
SignatureVersion: r.Header.Get(s3_constants.AmzAuthType),
2021-12-10 14:40:32 +00:00
UserAgent: r.Header.Get("user-agent"),
HostId: hostname,
Bucket: bucket,
HTTPStatus: HTTPStatusCode,
Time: time.Now().Unix(),
Key: key,
Operation: getOperation(key, r),
ErrorCode: errorCode,
2021-12-07 07:13:21 +00:00
}
}
2021-12-07 13:20:52 +00:00
func PostLog(r *http.Request, HTTPStatusCode int, errorCode ErrorCode) {
if Logger == nil {
2021-12-07 07:13:21 +00:00
return
}
if err := Logger.Post(tag, *GetAccessLog(r, HTTPStatusCode, errorCode)); err != nil {
glog.Warning("Error while posting log: ", err)
}
2021-12-07 13:20:52 +00:00
}
2021-12-09 14:47:16 +00:00
func PostAccessLog(log AccessLog) {
if Logger == nil || len(log.Key) == 0 {
2021-12-07 13:20:52 +00:00
return
}
2021-12-09 14:47:16 +00:00
if err := Logger.Post(tag, log); err != nil {
2021-12-07 13:20:52 +00:00
glog.Warning("Error while posting log: ", err)
2021-12-07 07:13:21 +00:00
}
2021-12-07 07:15:48 +00:00
}