audit log config

This commit is contained in:
Konstantin Lebedev 2021-12-07 18:20:52 +05:00
parent 4ec8715f20
commit 10678cde81
12 changed files with 145 additions and 65 deletions

View file

@ -0,0 +1,4 @@
{
"fluent_port": 24224,
"fluent_host": "fluent"
}

View file

@ -0,0 +1,20 @@
version: '2'
services:
server:
image: chrislusf/seaweedfs:local
ports:
- 8333:8333
- 9333:9333
- 19333:19333
- 8084:8080
- 18084:18080
- 8888:8888
- 18888:18888
command: "server -ip=server -filer -s3 -s3.auditLogConfig=/etc/seaweedfs/fluent.json -volume.max=0 -master.volumeSizeLimitMB=8 -volume.preStopSeconds=1"
volumes:
- ./fluent.json:/etc/seaweedfs/fluent.json
fluent:
image: fluent/fluentd:v1.14
ports:
- 24224:24224

View file

@ -84,6 +84,7 @@ func init() {
filerS3Options.tlsPrivateKey = cmdFiler.Flag.String("s3.key.file", "", "path to the TLS private key file")
filerS3Options.tlsCertificate = cmdFiler.Flag.String("s3.cert.file", "", "path to the TLS certificate file")
filerS3Options.config = cmdFiler.Flag.String("s3.config", "", "path to the config file")
filerS3Options.auditLogConfig = cmdFiler.Flag.String("s3.auditLogConfig", "", "path to the audit log config file")
filerS3Options.allowEmptyFolder = cmdFiler.Flag.Bool("s3.allowEmptyFolder", true, "allow empty folders")
// start webdav on filer

View file

@ -3,6 +3,7 @@ package command
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/s3api/s3err"
"net/http"
"time"
@ -31,6 +32,7 @@ type S3Options struct {
tlsCertificate *string
metricsHttpPort *int
allowEmptyFolder *bool
auditLogConfig *string
}
func init() {
@ -39,7 +41,7 @@ func init() {
s3StandaloneOptions.port = cmdS3.Flag.Int("port", 8333, "s3 server http listen port")
s3StandaloneOptions.domainName = cmdS3.Flag.String("domainName", "", "suffix of the host name in comma separated list, {bucket}.{domainName}")
s3StandaloneOptions.config = cmdS3.Flag.String("config", "", "path to the config file")
s3StandaloneOptions.config = cmdS3.Flag.String("config", "", "path to the config file")
s3StandaloneOptions.auditLogConfig = cmdS3.Flag.String("auditLogConfig", "", "path to the audit log config file")
s3StandaloneOptions.tlsPrivateKey = cmdS3.Flag.String("key.file", "", "path to the TLS private key file")
s3StandaloneOptions.tlsCertificate = cmdS3.Flag.String("cert.file", "", "path to the TLS certificate file")
s3StandaloneOptions.metricsHttpPort = cmdS3.Flag.Int("metricsPort", 0, "Prometheus metrics listen port")
@ -193,6 +195,10 @@ func (s3opt *S3Options) startS3Server() bool {
glog.Fatalf("S3 API Server listener on %s error: %v", listenAddress, err)
}
if len(*s3opt.auditLogConfig) > 0 {
s3err.InitAuditLog(*s3opt.auditLogConfig)
}
if *s3opt.tlsPrivateKey != "" {
glog.V(0).Infof("Start Seaweed S3 API Server %s at https port %d", util.Version(), *s3opt.port)
if err = httpS.ServeTLS(s3ApiListener, *s3opt.tlsCertificate, *s3opt.tlsPrivateKey); err != nil {

View file

@ -131,6 +131,7 @@ func init() {
s3Options.tlsPrivateKey = cmdServer.Flag.String("s3.key.file", "", "path to the TLS private key file")
s3Options.tlsCertificate = cmdServer.Flag.String("s3.cert.file", "", "path to the TLS certificate file")
s3Options.config = cmdServer.Flag.String("s3.config", "", "path to the config file")
s3Options.auditLogConfig = cmdServer.Flag.String("s3.auditLogConfig", "", "path to the audit log config file")
s3Options.allowEmptyFolder = cmdServer.Flag.Bool("s3.allowEmptyFolder", true, "allow empty folders")
webdavOptions.port = cmdServer.Flag.Int("webdav.port", 7333, "webdav server http listen port")

View file

@ -28,7 +28,7 @@ func (s3a *S3ApiServer) AdjustedUrl(location *filer_pb.Location) string {
func writeSuccessResponseXML(w http.ResponseWriter, r *http.Request, response interface{}) {
s3err.WriteXMLResponse(w, r, http.StatusOK, response)
s3err.PostLog(r, s3err.ErrNone)
s3err.PostLog(r, http.StatusOK, s3err.ErrNone)
}
func writeSuccessResponseEmpty(w http.ResponseWriter, r *http.Request) {

View file

@ -165,11 +165,13 @@ func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Reque
destUrl := fmt.Sprintf("http://%s%s/%s%s?recursive=true",
s3a.option.Filer.ToHttpAddress(), s3a.option.BucketsPath, bucket, urlPathEscape(object))
s3a.proxyToFiler(w, r, destUrl, func(proxyResponse *http.Response, w http.ResponseWriter) {
s3a.proxyToFiler(w, r, destUrl, func(proxyResponse *http.Response, w http.ResponseWriter) (statusCode int) {
statusCode = http.StatusNoContent
for k, v := range proxyResponse.Header {
w.Header()[k] = v
}
w.WriteHeader(http.StatusNoContent)
w.WriteHeader(statusCode)
return statusCode
})
}
@ -224,14 +226,17 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
var deletedObjects []ObjectIdentifier
var deleteErrors []DeleteError
var auditLog *s3err.AccessLog
directoriesWithDeletion := make(map[string]int)
if s3err.Logger != nil {
auditLog = s3err.GetAccessLog(r, http.StatusNoContent, s3err.ErrNone)
}
s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
// delete file entries
for _, object := range deleteObjects.Objects {
lastSeparator := strings.LastIndex(object.ObjectName, "/")
parentDirectoryPath, entryName, isDeleteData, isRecursive := "", object.ObjectName, true, false
if lastSeparator > 0 && lastSeparator+1 < len(object.ObjectName) {
@ -254,6 +259,10 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
Key: object.ObjectName,
})
}
if auditLog != nil {
auditLog.Key = entryName
s3err.PostAccessLog(auditLog)
}
}
// purge empty folders, only checking folders with deletions
@ -306,7 +315,7 @@ var passThroughHeaders = []string{
"response-expires",
}
func (s3a *S3ApiServer) proxyToFiler(w http.ResponseWriter, r *http.Request, destUrl string, responseFn func(proxyResponse *http.Response, w http.ResponseWriter)) {
func (s3a *S3ApiServer) proxyToFiler(w http.ResponseWriter, r *http.Request, destUrl string, responseFn func(proxyResponse *http.Response, w http.ResponseWriter) (statusCode int)) {
glog.V(3).Infof("s3 proxying %s to %s", r.Method, destUrl)
@ -360,20 +369,23 @@ func (s3a *S3ApiServer) proxyToFiler(w http.ResponseWriter, r *http.Request, des
}
}
responseFn(resp, w)
s3err.PostLog(r, s3err.ErrNone)
responseStatusCode := responseFn(resp, w)
s3err.PostLog(r, responseStatusCode, s3err.ErrNone)
}
func passThroughResponse(proxyResponse *http.Response, w http.ResponseWriter) {
func passThroughResponse(proxyResponse *http.Response, w http.ResponseWriter) (statusCode int) {
for k, v := range proxyResponse.Header {
w.Header()[k] = v
}
if proxyResponse.Header.Get("Content-Range") != "" && proxyResponse.StatusCode == 200 {
w.WriteHeader(http.StatusPartialContent)
statusCode = http.StatusPartialContent
} else {
w.WriteHeader(proxyResponse.StatusCode)
statusCode = proxyResponse.StatusCode
}
w.WriteHeader(statusCode)
io.Copy(w, proxyResponse.Body)
return statusCode
}
func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader io.Reader) (etag string, code s3err.ErrorCode) {

View file

@ -142,7 +142,7 @@ func (s3a *S3ApiServer) PostPolicyBucketHandler(w http.ResponseWriter, r *http.R
Location: w.Header().Get("Location"),
}
s3err.WriteXMLResponse(w, r, http.StatusCreated, resp)
s3err.PostLog(r, s3err.ErrNone)
s3err.PostLog(r, http.StatusCreated, s3err.ErrNone)
case "200":
s3err.WriteEmptyResponse(w, r, http.StatusOK)
default:

View file

@ -92,7 +92,7 @@ func (s3a *S3ApiServer) PutObjectTaggingHandler(w http.ResponseWriter, r *http.R
}
w.WriteHeader(http.StatusOK)
s3err.PostLog(r, s3err.ErrNone)
s3err.PostLog(r, http.StatusOK, s3err.ErrNone)
}
// DeleteObjectTaggingHandler Delete object tagging
@ -118,5 +118,5 @@ func (s3a *S3ApiServer) DeleteObjectTaggingHandler(w http.ResponseWriter, r *htt
}
w.WriteHeader(http.StatusNoContent)
s3err.PostLog(r, s3err.ErrNone)
s3err.PostLog(r, http.StatusNoContent, s3err.ErrNone)
}

View file

@ -38,7 +38,6 @@ func NewS3ApiServer(router *mux.Router, option *S3ApiServerOption) (s3ApiServer
s3ApiServer.registerRouter(router)
go s3ApiServer.subscribeMetaEvents("s3", filer.IamConfigDirecotry+"/"+filer.IamIdentityFile, time.Now().UnixNano())
return s3ApiServer, nil
}

View file

@ -1,6 +1,7 @@
package s3err
import (
"encoding/json"
"fmt"
"github.com/chrislusf/seaweedfs/weed/glog"
xhttp "github.com/chrislusf/seaweedfs/weed/s3api/http"
@ -16,45 +17,52 @@ type AccessLogExtend struct {
}
type AccessLog struct {
Bucket string `json:"bucket"` // awsexamplebucket1
Time time.Time `json:"time"` // [06/Feb/2019:00:00:38 +0000]
RemoteIP string `json:"remote_ip,omitempty"` // 192.0.2.3
Requester string `json:"requester,omitempty"` // IAM user id
RequestID string `json:"request_id,omitempty"` // 3E57427F33A59F07
Operation string `json:"operation,omitempty"` // REST.HTTP_method.resource_type REST.PUT.OBJECT
Key string `json:"Key,omitempty"` // /photos/2019/08/puppy.jpg
ErrorCode string `json:"error_code,omitempty"`
HostId string `json:"host_id,omitempty"`
HostHeader string `json:"host_header,omitempty"` // s3.us-west-2.amazonaws.com
SignatureVersion string `json:"signature_version,omitempty"`
Bucket string `msg:"bucket" json:"bucket"` // awsexamplebucket1
Time int64 `msg:"time" json:"time"` // [06/Feb/2019:00:00:38 +0000]
RemoteIP string `msg:"remote_ip" json:"remote_ip,omitempty"` // 192.0.2.3
Requester string `msg:"requester" json:"requester,omitempty"` // IAM user id
RequestID string `msg:"request_id" json:"request_id,omitempty"` // 3E57427F33A59F07
Operation string `msg:"operation" json:"operation,omitempty"` // REST.HTTP_method.resource_type REST.PUT.OBJECT
Key string `msg:"key" json:"key,omitempty"` // /photos/2019/08/puppy.jpg
ErrorCode string `msg:"error_code" json:"error_code,omitempty"`
HostId string `msg:"host_id" json:"host_id,omitempty"`
HostHeader string `msg:"host_header" json:"host_header,omitempty"` // s3.us-west-2.amazonaws.com
UserAgent string `msg:"user_agent" json:"user_agent,omitempty"`
HTTPStatus int `msg:"status" json:"status,omitempty"`
SignatureVersion string `msg:"signature_version" json:"signature_version,omitempty"`
}
type AccessLogHTTP struct {
RequestURI string `json:"request_uri,omitempty"` // "GET /awsexamplebucket1/photos/2019/08/puppy.jpg?x-foo=bar HTTP/1.1"
HTTPStatus int `json:"HTTP_status,omitempty"`
BytesSent string `json:"bytes_sent,omitempty"`
ObjectSize string `json:"object_size,omitempty"`
TotalTime time.Duration `json:"total_time,omitempty"`
TurnAroundTime time.Duration `json:"turn_around_time,omitempty"`
Referer string `json:"Referer,omitempty"`
UserAgent string `json:"user_agent,omitempty"`
VersionId string `json:"version_id,omitempty"`
CipherSuite string `json:"cipher_suite,omitempty"`
AuthenticationType string `json:"auth_type,omitempty"`
TLSVersion string `json:"TLS_version,omitempty"`
RequestURI string `json:"request_uri,omitempty"` // "GET /awsexamplebucket1/photos/2019/08/puppy.jpg?x-foo=bar HTTP/1.1"
BytesSent string `json:"bytes_sent,omitempty"`
ObjectSize string `json:"object_size,omitempty"`
TotalTime int `json:"total_time,omitempty"`
TurnAroundTime int `json:"turn_around_time,omitempty"`
Referer string `json:"Referer,omitempty"`
VersionId string `json:"version_id,omitempty"`
CipherSuite string `json:"cipher_suite,omitempty"`
AuthenticationType string `json:"auth_type,omitempty"`
TLSVersion string `json:"TLS_version,omitempty"`
}
const tag = "s3.access"
var (
logger *fluent.Fluent
Logger *fluent.Fluent
hostname = os.Getenv("HOSTNAME")
)
func init() {
func InitAuditLog(config string) {
configContent, readErr := os.ReadFile(config)
if readErr != nil {
glog.Fatalf("fail to read fluent config %s : %v", config, readErr)
}
var fluentConfig fluent.Config
if err := json.Unmarshal(configContent, &fluentConfig); err != nil {
glog.Fatalf("fail to parse fluent config %s : %v", config, err)
}
var err error
logger, err = fluent.New(fluent.Config{})
Logger, err = fluent.New(fluentConfig)
if err != nil {
glog.Fatalf("fail to load fluent config: %v", err)
}
@ -64,16 +72,9 @@ func getREST(httpMetod string, resourceType string) string {
return fmt.Sprintf("REST.%s.%s", httpMetod, resourceType)
}
func getResourceType(object string, query string, metod string) (string, bool) {
if len(object) > 0 {
switch query {
case "tagging":
return getREST(metod, "OBJECTTAGGING"), true
default:
return getREST(metod, "OBJECT"), false
}
} else {
switch query {
func getResourceType(object string, query_key string, metod string) (string, bool) {
if object == "/" {
switch query_key {
case "delete":
return "BATCH.DELETE.OBJECT", true
case "tagging":
@ -87,6 +88,13 @@ func getResourceType(object string, query string, metod string) (string, bool) {
default:
return getREST(metod, "BUCKET"), false
}
} else {
switch query_key {
case "tagging":
return getREST(metod, "OBJECTTAGGING"), true
default:
return getREST(metod, "OBJECT"), false
}
}
}
@ -94,40 +102,69 @@ func getOperation(object string, r *http.Request) string {
queries := r.URL.Query()
var operation string
var queryFound bool
for query, _ := range queries {
if operation, queryFound = getResourceType(object, query, r.Method); queryFound {
for key, _ := range queries {
operation, queryFound = getResourceType(object, key, r.Method)
if queryFound {
return operation
}
}
if len(queries) == 0 {
operation, _ = getResourceType(object, "", r.Method)
}
return operation
}
func GetAccessLog(r *http.Request, s3errCode ErrorCode) AccessLog {
func GetAccessHttpLog(r *http.Request, statusCode int, s3errCode ErrorCode) AccessLogHTTP {
return AccessLogHTTP{
RequestURI: r.RequestURI,
Referer: r.Header.Get("Referer"),
}
}
func GetAccessLog(r *http.Request, HTTPStatusCode int, s3errCode ErrorCode) *AccessLog {
bucket, key := xhttp.GetBucketAndObject(r)
var errorCode string
if s3errCode != ErrNone {
errorCode = GetAPIError(s3errCode).Code
}
return AccessLog{
HostHeader: r.Header.Get("Host"),
remoteIP := r.Header.Get("X-Real-IP")
if len(remoteIP) == 0 {
remoteIP = r.RemoteAddr
}
hostHeader := r.Header.Get("Host")
if len(hostHeader) == 0 {
hostHeader = r.URL.Hostname()
}
return &AccessLog{
HostHeader: hostHeader,
RequestID: r.Header.Get("X-Request-ID"),
RemoteIP: r.Header.Get("X-Real-IP"),
RemoteIP: remoteIP,
Requester: r.Header.Get(xhttp.AmzIdentityId),
UserAgent: r.Header.Get("UserAgent"),
HostId: hostname,
Bucket: bucket,
Time: time.Now(),
HTTPStatus: HTTPStatusCode,
Time: time.Now().Unix(),
Key: key,
Operation: getOperation(key, r),
ErrorCode: errorCode,
}
}
func PostLog(r *http.Request, errorCode ErrorCode) {
if logger == nil {
func PostLog(r *http.Request, HTTPStatusCode int, errorCode ErrorCode) {
if Logger == nil {
return
}
err := logger.Post(tag, GetAccessLog(r, errorCode))
if err != nil {
glog.Error("Error while posting log: ", err)
if err := Logger.Post(tag, *GetAccessLog(r, HTTPStatusCode, errorCode)); err != nil {
glog.Warning("Error while posting log: ", err)
}
}
func PostAccessLog(log *AccessLog) {
if Logger == nil || log == nil {
return
}
if err := Logger.Post(tag, *log); err != nil {
glog.Warning("Error while posting log: ", err)
}
}

View file

@ -25,7 +25,7 @@ func WriteXMLResponse(w http.ResponseWriter, r *http.Request, statusCode int, re
func WriteEmptyResponse(w http.ResponseWriter, r *http.Request, statusCode int) {
WriteResponse(w, r, statusCode, []byte{}, mimeNone)
PostLog(r, ErrNone)
PostLog(r, statusCode, ErrNone)
}
func WriteErrorResponse(w http.ResponseWriter, r *http.Request, errorCode ErrorCode) {
@ -40,7 +40,7 @@ func WriteErrorResponse(w http.ResponseWriter, r *http.Request, errorCode ErrorC
errorResponse := getRESTErrorResponse(apiError, r.URL.Path, bucket, object)
encodedErrorResponse := EncodeXMLResponse(errorResponse)
WriteResponse(w, r, apiError.HTTPStatusCode, encodedErrorResponse, MimeXML)
PostLog(r, errorCode)
PostLog(r, apiError.HTTPStatusCode, errorCode)
}
func getRESTErrorResponse(err APIError, resource string, bucket, object string) RESTErrorResponse {