From 09059bfdccdeff1a588ee1326318075adb068b0f Mon Sep 17 00:00:00 2001 From: Mike Tolman Date: Fri, 5 Aug 2016 15:19:10 -0600 Subject: [PATCH] Add AutoChunking to the Filer API, so that you can upload really large files through the filer API. --- weed/command/filer.go | 3 + weed/command/server.go | 2 + weed/server/filer_server.go | 2 + weed/server/filer_server_handlers_write.go | 208 +++++++++++++++++++++ 4 files changed, 215 insertions(+) diff --git a/weed/command/filer.go b/weed/command/filer.go index 582d4e9c8..0bd508e0b 100644 --- a/weed/command/filer.go +++ b/weed/command/filer.go @@ -24,6 +24,7 @@ type FilerOptions struct { dir *string redirectOnRead *bool disableDirListing *bool + maxMB *int secretKey *string cassandra_server *string cassandra_keyspace *string @@ -42,6 +43,7 @@ func init() { f.defaultReplicaPlacement = cmdFiler.Flag.String("defaultReplicaPlacement", "000", "default replication type if not specified") f.redirectOnRead = cmdFiler.Flag.Bool("redirectOnRead", false, "whether proxy or redirect to volume server during file GET request") f.disableDirListing = cmdFiler.Flag.Bool("disableDirListing", false, "turn off directory listing") + f.maxMB = cmdFiler.Flag.Int("maxMB", 0, "split files larger than the limit") f.cassandra_server = cmdFiler.Flag.String("cassandra.server", "", "host[:port] of the cassandra server") f.cassandra_keyspace = cmdFiler.Flag.String("cassandra.keyspace", "seaweed", "keyspace of the cassandra server") f.redis_server = cmdFiler.Flag.String("redis.server", "", "host:port of the redis server, e.g., 127.0.0.1:6379") @@ -82,6 +84,7 @@ func runFiler(cmd *Command, args []string) bool { r := http.NewServeMux() _, nfs_err := weed_server.NewFilerServer(r, *f.ip, *f.port, *f.master, *f.dir, *f.collection, *f.defaultReplicaPlacement, *f.redirectOnRead, *f.disableDirListing, + *f.maxMB, *f.secretKey, *f.cassandra_server, *f.cassandra_keyspace, *f.redis_server, *f.redis_password, *f.redis_database, diff --git a/weed/command/server.go b/weed/command/server.go index 1211c7137..7a6677a65 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -86,6 +86,7 @@ func init() { filerOptions.defaultReplicaPlacement = cmdServer.Flag.String("filer.defaultReplicaPlacement", "", "Default replication type if not specified during runtime.") filerOptions.redirectOnRead = cmdServer.Flag.Bool("filer.redirectOnRead", false, "whether proxy or redirect to volume server during file GET request") filerOptions.disableDirListing = cmdServer.Flag.Bool("filer.disableDirListing", false, "turn off directory listing") + filerOptions.maxMB = cmdServer.Flag.Int("filer.maxMB", 0, "split files larger than the limit") filerOptions.cassandra_server = cmdServer.Flag.String("filer.cassandra.server", "", "host[:port] of the cassandra server") filerOptions.cassandra_keyspace = cmdServer.Flag.String("filer.cassandra.keyspace", "seaweed", "keyspace of the cassandra server") filerOptions.redis_server = cmdServer.Flag.String("filer.redis.server", "", "host:port of the redis server, e.g., 127.0.0.1:6379") @@ -169,6 +170,7 @@ func runServer(cmd *Command, args []string) bool { _, nfs_err := weed_server.NewFilerServer(r, *serverBindIp, *filerOptions.port, *filerOptions.master, *filerOptions.dir, *filerOptions.collection, *filerOptions.defaultReplicaPlacement, *filerOptions.redirectOnRead, *filerOptions.disableDirListing, + *filerOptions.maxMB, *filerOptions.secretKey, *filerOptions.cassandra_server, *filerOptions.cassandra_keyspace, *filerOptions.redis_server, *filerOptions.redis_password, *filerOptions.redis_database, diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index b99bbd7c9..c9bc0e021 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -28,6 +28,7 @@ type FilerServer struct { disableDirListing bool secret security.Secret filer filer.Filer + maxMB int masterNodes *storage.MasterNodes } @@ -43,6 +44,7 @@ func NewFilerServer(r *http.ServeMux, ip string, port int, master string, dir st defaultReplication: replication, redirectOnRead: redirectOnRead, disableDirListing: disableDirListing, + maxMB: maxMB, port: ip + ":" + strconv.Itoa(port), } diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go index e2d40f532..872d8c4b9 100644 --- a/weed/server/filer_server_handlers_write.go +++ b/weed/server/filer_server_handlers_write.go @@ -20,6 +20,8 @@ import ( "github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/util" "github.com/syndtr/goleveldb/leveldb" + "path" + "strconv" ) type FilerPostResult struct { @@ -217,6 +219,7 @@ func (fs *FilerServer) monolithicUploadAnalyzer(w http.ResponseWriter, r *http.R } func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { + query := r.URL.Query() replication := query.Get("replication") if replication == "" { @@ -227,6 +230,10 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { collection = fs.collection } + if autoChunked := fs.autoChunk(w, r, replication, collection); autoChunked { + return + } + var fileId, urlLocation string var err error @@ -243,7 +250,17 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { } u, _ := url.Parse(urlLocation) + + // This allows a client to generate a chunk manifest and submit it to the filer -- it is a little off + // because they need to provide FIDs instead of file paths... + cm, _ := strconv.ParseBool(query.Get("cm")) + if cm { + q := u.Query() + q.Set("cm", "true") + u.RawQuery = q.Encode() + } glog.V(4).Infoln("post to", u) + request := &http.Request{ Method: r.Method, URL: u, @@ -319,6 +336,197 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { writeJsonQuiet(w, r, http.StatusCreated, reply) } +func (fs *FilerServer) autoChunk(w http.ResponseWriter, r *http.Request, replication string, collection string) bool { + if r.Method != "POST" { + glog.V(4).Infoln("AutoChunking not supported for method", r.Method) + return false + } + + // autoChunking can be set at the command-line level or as a query param. Query param overrides command-line + query := r.URL.Query() + + parsedMaxMB, _ := strconv.ParseInt(query.Get("maxMB"), 10, 32) + maxMB := int32(parsedMaxMB) + if maxMB <= 0 && fs.maxMB > 0 { + maxMB = int32(fs.maxMB) + } + if maxMB <= 0 { + glog.V(4).Infoln("AutoChunking not enabled") + return false + } + glog.V(4).Infoln("AutoChunking level set to", maxMB, "(MB)") + + chunkSize := 1024 * 1024 * maxMB + + contentLength := int64(0) + if contentLengthHeader := r.Header["Content-Length"]; len(contentLengthHeader) == 1 { + contentLength, _ = strconv.ParseInt(contentLengthHeader[0], 10, 64) + if contentLength <= int64(chunkSize) { + glog.V(4).Infoln("Content-Length of", contentLength, "is less than the chunk size of", chunkSize, "so autoChunking will be skipped.") + return false + } + } + + if contentLength <= 0 { + glog.V(4).Infoln("Content-Length value is missing or unexpected so autoChunking will be skipped.") + return false + } + + reply, err := fs.doAutoChunk(w, r, contentLength, chunkSize, replication, collection) + if err != nil { + writeJsonError(w, r, http.StatusInternalServerError, err) + } else if reply != nil { + writeJsonQuiet(w, r, http.StatusCreated, reply) + } + return true +} + +func (fs *FilerServer) doAutoChunk(w http.ResponseWriter, r *http.Request, contentLength int64, chunkSize int32, replication string, collection string) (filerResult *FilerPostResult, replyerr error) { + + multipartReader, multipartReaderErr := r.MultipartReader() + if multipartReaderErr != nil { + return nil, multipartReaderErr + } + + part1, part1Err := multipartReader.NextPart() + if part1Err != nil { + return nil, part1Err + } + + fileName := part1.FileName() + if fileName != "" { + fileName = path.Base(fileName) + } + + chunks := (int64(contentLength) / int64(chunkSize)) + 1 + cm := operation.ChunkManifest{ + Name: fileName, + Size: 0, // don't know yet + Mime: "application/octet-stream", + Chunks: make([]*operation.ChunkInfo, 0, chunks), + } + + totalBytesRead := int64(0) + tmpBufferSize := int32(1024 * 1024) + tmpBuffer := bytes.NewBuffer(make([]byte, 0, tmpBufferSize)) + chunkBuf := make([]byte, chunkSize+tmpBufferSize, chunkSize+tmpBufferSize) // chunk size plus a little overflow + chunkBufOffset := int32(0) + chunkOffset := int64(0) + writtenChunks := 0 + + filerResult = &FilerPostResult{ + Name: fileName, + } + + for totalBytesRead < contentLength { + tmpBuffer.Reset() + bytesRead, readErr := io.CopyN(tmpBuffer, part1, int64(tmpBufferSize)) + readFully := readErr != nil && readErr == io.EOF + tmpBuf := tmpBuffer.Bytes() + bytesToCopy := tmpBuf[0:int(bytesRead)] + + copy(chunkBuf[chunkBufOffset:chunkBufOffset+int32(bytesRead)], bytesToCopy) + chunkBufOffset = chunkBufOffset + int32(bytesRead) + + if chunkBufOffset >= chunkSize || readFully || (chunkBufOffset > 0 && bytesRead == 0) { + writtenChunks = writtenChunks + 1 + fileId, urlLocation, assignErr := fs.assignNewFileInfo(w, r, replication, collection) + if assignErr != nil { + return nil, assignErr + } + + // upload the chunk to the volume server + chunkName := fileName + "_chunk_" + strconv.FormatInt(int64(cm.Chunks.Len()+1), 10) + uploadErr := fs.doUpload(urlLocation, w, r, chunkBuf[0:chunkBufOffset], chunkName, "application/octet-stream", fileId) + if uploadErr != nil { + return nil, uploadErr + } + + // Save to chunk manifest structure + cm.Chunks = append(cm.Chunks, + &operation.ChunkInfo{ + Offset: chunkOffset, + Size: int64(chunkBufOffset), + Fid: fileId, + }, + ) + + // reset variables for the next chunk + chunkBufOffset = 0 + chunkOffset = totalBytesRead + int64(bytesRead) + } + + totalBytesRead = totalBytesRead + int64(bytesRead) + + if bytesRead == 0 || readFully { + break + } + + if readErr != nil { + return nil, readErr + } + } + + cm.Size = totalBytesRead + manifestBuf, marshalErr := cm.Marshal() + if marshalErr != nil { + return nil, marshalErr + } + + manifestStr := string(manifestBuf) + glog.V(4).Infoln("Generated chunk manifest: ", manifestStr) + + manifestFileId, manifestUrlLocation, manifestAssignmentErr := fs.assignNewFileInfo(w, r, replication, collection) + if manifestAssignmentErr != nil { + return nil, manifestAssignmentErr + } + glog.V(4).Infoln("Manifest uploaded to:", manifestUrlLocation, "Fid:", manifestFileId) + filerResult.Fid = manifestFileId + + u, _ := url.Parse(manifestUrlLocation) + q := u.Query() + q.Set("cm", "true") + u.RawQuery = q.Encode() + + manifestUploadErr := fs.doUpload(u.String(), w, r, manifestBuf, fileName+"_manifest", "application/json", manifestFileId) + if manifestUploadErr != nil { + return nil, manifestUploadErr + } + + path := r.URL.Path + // also delete the old fid unless PUT operation + if r.Method != "PUT" { + if oldFid, err := fs.filer.FindFile(path); err == nil { + operation.DeleteFile(fs.getMasterNode(), oldFid, fs.jwt(oldFid)) + } + } + + glog.V(4).Infoln("saving", path, "=>", manifestFileId) + if db_err := fs.filer.CreateFile(path, manifestFileId); db_err != nil { + replyerr = db_err + filerResult.Error = db_err.Error() + operation.DeleteFile(fs.getMasterNode(), manifestFileId, fs.jwt(manifestFileId)) //clean up + glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err) + return + } + + return +} + +func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *http.Request, chunkBuf []byte, fileName string, contentType string, fileId string) (err error) { + err = nil + + ioReader := ioutil.NopCloser(bytes.NewBuffer(chunkBuf)) + uploadResult, uploadError := operation.Upload(urlLocation, fileName, ioReader, false, contentType, fs.jwt(fileId)) + if uploadResult != nil { + glog.V(0).Infoln("Chunk upload result. Name:", uploadResult.Name, "Fid:", fileId, "Size:", uploadResult.Size) + } + if uploadError != nil { + err = uploadError + } + return +} + // curl -X DELETE http://localhost:8888/path/to // curl -X DELETE http://localhost:8888/path/to?recursive=true func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {