From b6ce40e87f11c1f85f6285345edd701a4508ca4b Mon Sep 17 00:00:00 2001 From: Mike Tolman Date: Fri, 5 Aug 2016 15:16:17 -0600 Subject: [PATCH] Add AutoChunking to the Filer API, so that you can upload really large files through the filer API. --- weed/server/filer_server_handlers_write.go | 208 +++++++++++++++++++++ 1 file changed, 208 insertions(+) diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go index e2d40f532..872d8c4b9 100644 --- a/weed/server/filer_server_handlers_write.go +++ b/weed/server/filer_server_handlers_write.go @@ -20,6 +20,8 @@ import ( "github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/util" "github.com/syndtr/goleveldb/leveldb" + "path" + "strconv" ) type FilerPostResult struct { @@ -217,6 +219,7 @@ func (fs *FilerServer) monolithicUploadAnalyzer(w http.ResponseWriter, r *http.R } func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { + query := r.URL.Query() replication := query.Get("replication") if replication == "" { @@ -227,6 +230,10 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { collection = fs.collection } + if autoChunked := fs.autoChunk(w, r, replication, collection); autoChunked { + return + } + var fileId, urlLocation string var err error @@ -243,7 +250,17 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { } u, _ := url.Parse(urlLocation) + + // This allows a client to generate a chunk manifest and submit it to the filer -- it is a little off + // because they need to provide FIDs instead of file paths... + cm, _ := strconv.ParseBool(query.Get("cm")) + if cm { + q := u.Query() + q.Set("cm", "true") + u.RawQuery = q.Encode() + } glog.V(4).Infoln("post to", u) + request := &http.Request{ Method: r.Method, URL: u, @@ -319,6 +336,197 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { writeJsonQuiet(w, r, http.StatusCreated, reply) } +func (fs *FilerServer) autoChunk(w http.ResponseWriter, r *http.Request, replication string, collection string) bool { + if r.Method != "POST" { + glog.V(4).Infoln("AutoChunking not supported for method", r.Method) + return false + } + + // autoChunking can be set at the command-line level or as a query param. Query param overrides command-line + query := r.URL.Query() + + parsedMaxMB, _ := strconv.ParseInt(query.Get("maxMB"), 10, 32) + maxMB := int32(parsedMaxMB) + if maxMB <= 0 && fs.maxMB > 0 { + maxMB = int32(fs.maxMB) + } + if maxMB <= 0 { + glog.V(4).Infoln("AutoChunking not enabled") + return false + } + glog.V(4).Infoln("AutoChunking level set to", maxMB, "(MB)") + + chunkSize := 1024 * 1024 * maxMB + + contentLength := int64(0) + if contentLengthHeader := r.Header["Content-Length"]; len(contentLengthHeader) == 1 { + contentLength, _ = strconv.ParseInt(contentLengthHeader[0], 10, 64) + if contentLength <= int64(chunkSize) { + glog.V(4).Infoln("Content-Length of", contentLength, "is less than the chunk size of", chunkSize, "so autoChunking will be skipped.") + return false + } + } + + if contentLength <= 0 { + glog.V(4).Infoln("Content-Length value is missing or unexpected so autoChunking will be skipped.") + return false + } + + reply, err := fs.doAutoChunk(w, r, contentLength, chunkSize, replication, collection) + if err != nil { + writeJsonError(w, r, http.StatusInternalServerError, err) + } else if reply != nil { + writeJsonQuiet(w, r, http.StatusCreated, reply) + } + return true +} + +func (fs *FilerServer) doAutoChunk(w http.ResponseWriter, r *http.Request, contentLength int64, chunkSize int32, replication string, collection string) (filerResult *FilerPostResult, replyerr error) { + + multipartReader, multipartReaderErr := r.MultipartReader() + if multipartReaderErr != nil { + return nil, multipartReaderErr + } + + part1, part1Err := multipartReader.NextPart() + if part1Err != nil { + return nil, part1Err + } + + fileName := part1.FileName() + if fileName != "" { + fileName = path.Base(fileName) + } + + chunks := (int64(contentLength) / int64(chunkSize)) + 1 + cm := operation.ChunkManifest{ + Name: fileName, + Size: 0, // don't know yet + Mime: "application/octet-stream", + Chunks: make([]*operation.ChunkInfo, 0, chunks), + } + + totalBytesRead := int64(0) + tmpBufferSize := int32(1024 * 1024) + tmpBuffer := bytes.NewBuffer(make([]byte, 0, tmpBufferSize)) + chunkBuf := make([]byte, chunkSize+tmpBufferSize, chunkSize+tmpBufferSize) // chunk size plus a little overflow + chunkBufOffset := int32(0) + chunkOffset := int64(0) + writtenChunks := 0 + + filerResult = &FilerPostResult{ + Name: fileName, + } + + for totalBytesRead < contentLength { + tmpBuffer.Reset() + bytesRead, readErr := io.CopyN(tmpBuffer, part1, int64(tmpBufferSize)) + readFully := readErr != nil && readErr == io.EOF + tmpBuf := tmpBuffer.Bytes() + bytesToCopy := tmpBuf[0:int(bytesRead)] + + copy(chunkBuf[chunkBufOffset:chunkBufOffset+int32(bytesRead)], bytesToCopy) + chunkBufOffset = chunkBufOffset + int32(bytesRead) + + if chunkBufOffset >= chunkSize || readFully || (chunkBufOffset > 0 && bytesRead == 0) { + writtenChunks = writtenChunks + 1 + fileId, urlLocation, assignErr := fs.assignNewFileInfo(w, r, replication, collection) + if assignErr != nil { + return nil, assignErr + } + + // upload the chunk to the volume server + chunkName := fileName + "_chunk_" + strconv.FormatInt(int64(cm.Chunks.Len()+1), 10) + uploadErr := fs.doUpload(urlLocation, w, r, chunkBuf[0:chunkBufOffset], chunkName, "application/octet-stream", fileId) + if uploadErr != nil { + return nil, uploadErr + } + + // Save to chunk manifest structure + cm.Chunks = append(cm.Chunks, + &operation.ChunkInfo{ + Offset: chunkOffset, + Size: int64(chunkBufOffset), + Fid: fileId, + }, + ) + + // reset variables for the next chunk + chunkBufOffset = 0 + chunkOffset = totalBytesRead + int64(bytesRead) + } + + totalBytesRead = totalBytesRead + int64(bytesRead) + + if bytesRead == 0 || readFully { + break + } + + if readErr != nil { + return nil, readErr + } + } + + cm.Size = totalBytesRead + manifestBuf, marshalErr := cm.Marshal() + if marshalErr != nil { + return nil, marshalErr + } + + manifestStr := string(manifestBuf) + glog.V(4).Infoln("Generated chunk manifest: ", manifestStr) + + manifestFileId, manifestUrlLocation, manifestAssignmentErr := fs.assignNewFileInfo(w, r, replication, collection) + if manifestAssignmentErr != nil { + return nil, manifestAssignmentErr + } + glog.V(4).Infoln("Manifest uploaded to:", manifestUrlLocation, "Fid:", manifestFileId) + filerResult.Fid = manifestFileId + + u, _ := url.Parse(manifestUrlLocation) + q := u.Query() + q.Set("cm", "true") + u.RawQuery = q.Encode() + + manifestUploadErr := fs.doUpload(u.String(), w, r, manifestBuf, fileName+"_manifest", "application/json", manifestFileId) + if manifestUploadErr != nil { + return nil, manifestUploadErr + } + + path := r.URL.Path + // also delete the old fid unless PUT operation + if r.Method != "PUT" { + if oldFid, err := fs.filer.FindFile(path); err == nil { + operation.DeleteFile(fs.getMasterNode(), oldFid, fs.jwt(oldFid)) + } + } + + glog.V(4).Infoln("saving", path, "=>", manifestFileId) + if db_err := fs.filer.CreateFile(path, manifestFileId); db_err != nil { + replyerr = db_err + filerResult.Error = db_err.Error() + operation.DeleteFile(fs.getMasterNode(), manifestFileId, fs.jwt(manifestFileId)) //clean up + glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err) + return + } + + return +} + +func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *http.Request, chunkBuf []byte, fileName string, contentType string, fileId string) (err error) { + err = nil + + ioReader := ioutil.NopCloser(bytes.NewBuffer(chunkBuf)) + uploadResult, uploadError := operation.Upload(urlLocation, fileName, ioReader, false, contentType, fs.jwt(fileId)) + if uploadResult != nil { + glog.V(0).Infoln("Chunk upload result. Name:", uploadResult.Name, "Fid:", fileId, "Size:", uploadResult.Size) + } + if uploadError != nil { + err = uploadError + } + return +} + // curl -X DELETE http://localhost:8888/path/to // curl -X DELETE http://localhost:8888/path/to?recursive=true func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {