// Source: seaweedfs/weed/server/filer_server_handlers_write.go (277 lines, 8 KiB, Go)

package weed_server
import (
2019-03-15 22:26:09 +00:00
"context"
"errors"
2022-02-20 15:30:42 +00:00
"fmt"
2016-06-03 03:05:34 +00:00
"net/http"
2019-02-15 08:09:19 +00:00
"os"
"strings"
2018-05-28 06:59:49 +00:00
"time"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/security"
"github.com/seaweedfs/seaweedfs/weed/stats"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/util"
)
var (
	// OS_UID is the user id of the filer process, captured once at package
	// initialization from os.Getuid and converted to uint32.
	OS_UID = uint32(os.Getuid())
	// OS_GID is the group id of the filer process, captured once at package
	// initialization from os.Getgid and converted to uint32.
	OS_GID = uint32(os.Getgid())

	// ErrReadOnly is the sentinel error returned by detectStorageOption when
	// the storage rule matching the request path is flagged read-only.
	ErrReadOnly = errors.New("read only")
)
// FilerPostResult carries a name, a size and an error message. Every field is
// tagged omitempty, so a field holding its zero value is left out of the JSON
// encoding.
// NOTE(review): presumably the JSON body returned for filer POST/upload
// requests — the code that fills it in is not visible in this file section.
type FilerPostResult struct {
	Name string `json:"name,omitempty"`
	Size int64 `json:"size,omitempty"`
	Error string `json:"error,omitempty"`
}
2020-11-16 00:58:48 +00:00
func (fs *FilerServer) assignNewFileInfo(so *operation.StorageOption) (fileId, urlLocation string, auth security.EncodedJwt, err error) {
2019-06-23 05:53:52 +00:00
stats.FilerRequestCounter.WithLabelValues(stats.ChunkAssign).Inc()
2019-06-23 05:53:52 +00:00
start := time.Now()
defer func() {
stats.FilerRequestHistogram.WithLabelValues(stats.ChunkAssign).Observe(time.Since(start).Seconds())
}()
2019-06-23 05:53:52 +00:00
2020-11-16 00:58:48 +00:00
ar, altRequest := so.ToAssignRequests(1)
assignResult, ae := operation.Assign(fs.filer.GetMaster, fs.grpcDialOption, ar, altRequest)
2016-06-08 07:46:14 +00:00
if ae != nil {
glog.Errorf("failing to assign a file id: %v", ae)
2016-06-08 07:46:14 +00:00
err = ae
return
}
fileId = assignResult.Fid
assignUrl := assignResult.Url
// Prefer same data center
if fs.option.DataCenter != "" {
for _, repl := range assignResult.Replicas {
if repl.DataCenter == fs.option.DataCenter {
assignUrl = repl.Url
break
}
}
}
urlLocation = "http://" + assignUrl + "/" + assignResult.Fid
2020-11-15 22:41:56 +00:00
if so.Fsync {
2020-04-12 06:37:10 +00:00
urlLocation += "?fsync=true"
}
2019-02-15 08:09:19 +00:00
auth = assignResult.Auth
2016-06-08 07:46:14 +00:00
return
}
2021-03-31 04:07:34 +00:00
func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request, contentLength int64) {
2019-03-15 22:55:34 +00:00
ctx := context.Background()
destination := r.RequestURI
if finalDestination := r.Header.Get(s3_constants.SeaweedStorageDestinationHeader); finalDestination != "" {
destination = finalDestination
}
2016-06-03 03:05:34 +00:00
query := r.URL.Query()
so, err := fs.detectStorageOption0(destination,
2020-11-16 00:58:48 +00:00
query.Get("collection"),
query.Get("replication"),
query.Get("ttl"),
2020-12-13 20:05:31 +00:00
query.Get("disk"),
query.Get("fsync"),
2020-11-16 00:58:48 +00:00
query.Get("dataCenter"),
query.Get("rack"),
query.Get("dataNode"),
query.Get("saveInside"),
2020-11-16 00:58:48 +00:00
)
if err != nil {
if err == ErrReadOnly {
w.WriteHeader(http.StatusInsufficientStorage)
} else {
glog.V(1).Infoln("post", r.RequestURI, ":", err.Error())
w.WriteHeader(http.StatusInternalServerError)
}
return
}
2020-11-15 22:41:56 +00:00
// When DiskType is empty,use filer's -disk
if so.DiskType == "" {
so.DiskType = fs.option.DiskType
}
if strings.HasPrefix(r.URL.Path, "/etc") {
so.SaveInside = true
}
2022-02-23 23:34:42 +00:00
if query.Has("mv.from") {
2022-02-20 15:30:42 +00:00
fs.move(ctx, w, r, so)
} else {
fs.autoChunk(ctx, w, r, contentLength, so)
}
2020-11-30 10:45:00 +00:00
util.CloseRequest(r)
2019-06-23 05:53:52 +00:00
2016-06-03 03:05:34 +00:00
}
2022-02-20 15:30:42 +00:00
func (fs *FilerServer) move(ctx context.Context, w http.ResponseWriter, r *http.Request, so *operation.StorageOption) {
2022-02-23 23:34:42 +00:00
src := r.URL.Query().Get("mv.from")
2022-02-20 15:30:42 +00:00
dst := r.URL.Path
glog.V(2).Infof("FilerServer.move %v to %v", src, dst)
var err error
if src, err = clearName(src); err != nil {
writeJsonError(w, r, http.StatusBadRequest, err)
return
}
if dst, err = clearName(dst); err != nil {
writeJsonError(w, r, http.StatusBadRequest, err)
return
}
src = strings.TrimRight(src, "/")
if src == "" {
err = fmt.Errorf("invalid source '/'")
writeJsonError(w, r, http.StatusBadRequest, err)
return
}
srcPath := util.FullPath(src)
dstPath := util.FullPath(dst)
srcEntry, err := fs.filer.FindEntry(ctx, srcPath)
if err != nil {
err = fmt.Errorf("failed to get src entry '%s', err: %s", src, err)
writeJsonError(w, r, http.StatusBadRequest, err)
return
}
oldDir, oldName := srcPath.DirAndName()
newDir, newName := dstPath.DirAndName()
newName = util.Nvl(newName, oldName)
dstEntry, err := fs.filer.FindEntry(ctx, util.FullPath(strings.TrimRight(dst, "/")))
if err != nil && err != filer_pb.ErrNotFound {
err = fmt.Errorf("failed to get dst entry '%s', err: %s", dst, err)
writeJsonError(w, r, http.StatusInternalServerError, err)
return
}
if err == nil && !dstEntry.IsDirectory() && srcEntry.IsDirectory() {
err = fmt.Errorf("move: cannot overwrite non-directory '%s' with directory '%s'", dst, src)
writeJsonError(w, r, http.StatusBadRequest, err)
return
}
_, err = fs.AtomicRenameEntry(ctx, &filer_pb.AtomicRenameEntryRequest{
OldDirectory: oldDir,
OldName: oldName,
NewDirectory: newDir,
NewName: newName,
})
if err != nil {
err = fmt.Errorf("failed to move entry from '%s' to '%s', err: %s", src, dst, err)
writeJsonError(w, r, http.StatusBadRequest, err)
return
}
w.WriteHeader(http.StatusNoContent)
}
2016-06-03 03:05:34 +00:00
// curl -X DELETE http://localhost:8888/path/to
2018-07-25 05:33:26 +00:00
// curl -X DELETE http://localhost:8888/path/to?recursive=true
// curl -X DELETE http://localhost:8888/path/to?recursive=true&ignoreRecursiveError=true
2019-12-11 22:58:22 +00:00
// curl -X DELETE http://localhost:8888/path/to?recursive=true&skipChunkDeletion=true
2016-06-03 03:05:34 +00:00
func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
2018-05-14 06:56:16 +00:00
2018-07-25 05:33:26 +00:00
isRecursive := r.FormValue("recursive") == "true"
if !isRecursive && fs.option.recursiveDelete {
if r.FormValue("recursive") != "false" {
isRecursive = true
}
}
ignoreRecursiveError := r.FormValue("ignoreRecursiveError") == "true"
2019-12-11 22:58:22 +00:00
skipChunkDeletion := r.FormValue("skipChunkDeletion") == "true"
2018-07-25 05:33:26 +00:00
objectPath := r.URL.Path
if len(r.URL.Path) > 1 && strings.HasSuffix(objectPath, "/") {
objectPath = objectPath[0 : len(objectPath)-1]
}
err := fs.filer.DeleteEntryMetaAndData(context.Background(), util.FullPath(objectPath), isRecursive, ignoreRecursiveError, !skipChunkDeletion, false, nil)
2018-05-14 06:56:16 +00:00
if err != nil {
glog.V(1).Infoln("deleting", objectPath, ":", err.Error())
httpStatus := http.StatusInternalServerError
2020-03-08 01:01:39 +00:00
if err == filer_pb.ErrNotFound {
httpStatus = http.StatusNoContent
writeJsonQuiet(w, r, httpStatus, nil)
return
}
writeJsonError(w, r, httpStatus, err)
2018-05-14 06:56:16 +00:00
return
}
2018-07-22 01:49:47 +00:00
w.WriteHeader(http.StatusNoContent)
2016-06-03 03:05:34 +00:00
}
func (fs *FilerServer) detectStorageOption(requestURI, qCollection, qReplication string, ttlSeconds int32, diskType, dataCenter, rack, dataNode string) (*operation.StorageOption, error) {
2020-11-16 00:58:48 +00:00
rule := fs.filer.FilerConf.MatchStorageRule(requestURI)
if rule.ReadOnly {
return nil, ErrReadOnly
}
// required by buckets folder
bucketDefaultCollection := ""
if strings.HasPrefix(requestURI, fs.filer.DirBucketsPath+"/") {
bucketDefaultCollection = fs.filer.DetectBucket(util.FullPath(requestURI))
}
2020-11-16 00:58:48 +00:00
if ttlSeconds == 0 {
ttl, err := needle.ReadTTL(rule.GetTtl())
if err != nil {
glog.Errorf("fail to parse %s ttl setting %s: %v", rule.LocationPrefix, rule.Ttl, err)
}
2020-11-16 00:59:28 +00:00
ttlSeconds = int32(ttl.Minutes()) * 60
2020-11-16 00:58:48 +00:00
}
return &operation.StorageOption{
Replication: util.Nvl(qReplication, rule.Replication, fs.option.DefaultReplication),
Collection: util.Nvl(qCollection, rule.Collection, bucketDefaultCollection, fs.option.Collection),
DataCenter: util.Nvl(dataCenter, rule.DataCenter, fs.option.DataCenter),
Rack: util.Nvl(rack, rule.Rack, fs.option.Rack),
DataNode: util.Nvl(dataNode, rule.DataNode, fs.option.DataNode),
TtlSeconds: ttlSeconds,
2020-12-16 17:10:14 +00:00
DiskType: util.Nvl(diskType, rule.DiskType),
Fsync: rule.Fsync,
VolumeGrowthCount: rule.VolumeGrowthCount,
}, nil
2020-11-16 00:58:48 +00:00
}
func (fs *FilerServer) detectStorageOption0(requestURI, qCollection, qReplication string, qTtl string, diskType string, fsync string, dataCenter, rack, dataNode, saveInside string) (*operation.StorageOption, error) {
2020-11-16 00:58:48 +00:00
ttl, err := needle.ReadTTL(qTtl)
if err != nil {
glog.Errorf("fail to parse ttl %s: %v", qTtl, err)
}
so, err := fs.detectStorageOption(requestURI, qCollection, qReplication, int32(ttl.Minutes())*60, diskType, dataCenter, rack, dataNode)
if so != nil {
2022-05-19 15:17:17 +00:00
if fsync == "false" {
so.Fsync = false
} else if fsync == "true" {
so.Fsync = true
}
if saveInside == "true" {
so.SaveInside = true
} else {
so.SaveInside = false
}
}
return so, err
}