From 87fee21ef597a8b1bac5352d1327c13f87eeb000 Mon Sep 17 00:00:00 2001 From: Mike Tolman Date: Fri, 5 Aug 2016 15:14:24 -0600 Subject: [PATCH 01/10] Changing needle_byte_cache so that it doesn't grow so big when larger files are added. --- weed/storage/needle_byte_cache.go | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/weed/storage/needle_byte_cache.go b/weed/storage/needle_byte_cache.go index ae35a48ba..930ead81d 100644 --- a/weed/storage/needle_byte_cache.go +++ b/weed/storage/needle_byte_cache.go @@ -8,6 +8,7 @@ import ( "github.com/hashicorp/golang-lru" "github.com/chrislusf/seaweedfs/weed/util" + "github.com/chrislusf/seaweedfs/weed/glog" ) var ( @@ -24,7 +25,7 @@ In caching, the string~[]byte mapping is cached */ func init() { bytesPool = util.NewBytesPool() - bytesCache, _ = lru.NewWithEvict(512, func(key interface{}, value interface{}) { + bytesCache, _ = lru.NewWithEvict(50, func(key interface{}, value interface{}) { value.(*Block).decreaseReference() }) } @@ -46,22 +47,37 @@ func (block *Block) increaseReference() { // get bytes from the LRU cache of []byte first, then from the bytes pool // when []byte in LRU cache is evicted, it will be put back to the bytes pool func getBytesForFileBlock(r *os.File, offset int64, readSize int) (dataSlice []byte, block *Block, err error) { + //Skip the cache if we are looking for a block that is too big to fit in the cache (defaulting to 10MB) + cacheable := readSize <= (1024*1024*10) + if !cacheable { + glog.V(4).Infoln("Block too big to keep in cache. Size:", readSize) + } + cacheKey := string("") + if cacheable { // check cache, return if found - cacheKey := fmt.Sprintf("%d:%d:%d", r.Fd(), offset>>3, readSize) + cacheKey = fmt.Sprintf("%d:%d:%d", r.Fd(), offset >> 3, readSize) if obj, found := bytesCache.Get(cacheKey); found { + glog.V(4).Infoln("Found block in cache. Size:", readSize) block = obj.(*Block) block.increaseReference() dataSlice = block.Bytes[0:readSize] return dataSlice, block, nil + } } // get the []byte from pool b := bytesPool.Get(readSize) // refCount = 2, one by the bytesCache, one by the actual needle object - block = &Block{Bytes: b, refCount: 2} + refCount := int32(1) + if cacheable { + refCount = 2 + } + block = &Block{Bytes: b, refCount: refCount} dataSlice = block.Bytes[0:readSize] _, err = r.ReadAt(dataSlice, offset) + if cacheable { bytesCache.Add(cacheKey, block) + } return dataSlice, block, err } From b6ce40e87f11c1f85f6285345edd701a4508ca4b Mon Sep 17 00:00:00 2001 From: Mike Tolman Date: Fri, 5 Aug 2016 15:16:17 -0600 Subject: [PATCH 02/10] Add AutoChunking to the Filer API, so that you can upload really large files through the filer API. 
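An illustrative client for the new behaviour, assuming a filer listening on localhost:8888 and a local file big.dat (both example values, not part of this change). Auto-chunking only kicks in when the request carries a Content-Length larger than the chunk size, so the body is buffered here; the maxMB query parameter overrides the filer's command-line setting.

package main

import (
	"bytes"
	"io"
	"mime/multipart"
	"net/http"
	"os"
)

func main() {
	f, err := os.Open("big.dat")
	if err != nil {
		panic(err)
	}
	defer f.Close()

	// Build a multipart body; the filer reads the first part of the form.
	var body bytes.Buffer
	mw := multipart.NewWriter(&body)
	part, _ := mw.CreateFormFile("file", "big.dat")
	io.Copy(part, f)
	mw.Close()

	// maxMB=64 requests 64MB chunks for this upload; http.NewRequest sets
	// Content-Length automatically for a *bytes.Buffer body.
	req, _ := http.NewRequest("POST",
		"http://localhost:8888/path/to/big.dat?maxMB=64", &body)
	req.Header.Set("Content-Type", mw.FormDataContentType())
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	io.Copy(os.Stdout, resp.Body) // FilerPostResult JSON, including the manifest fid
}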
--- weed/server/filer_server_handlers_write.go | 208 +++++++++++++++++++++ 1 file changed, 208 insertions(+) diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go index e2d40f532..872d8c4b9 100644 --- a/weed/server/filer_server_handlers_write.go +++ b/weed/server/filer_server_handlers_write.go @@ -20,6 +20,8 @@ import ( "github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/util" "github.com/syndtr/goleveldb/leveldb" + "path" + "strconv" ) type FilerPostResult struct { @@ -217,6 +219,7 @@ func (fs *FilerServer) monolithicUploadAnalyzer(w http.ResponseWriter, r *http.R } func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { + query := r.URL.Query() replication := query.Get("replication") if replication == "" { @@ -227,6 +230,10 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { collection = fs.collection } + if autoChunked := fs.autoChunk(w, r, replication, collection); autoChunked { + return + } + var fileId, urlLocation string var err error @@ -243,7 +250,17 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { } u, _ := url.Parse(urlLocation) + + // This allows a client to generate a chunk manifest and submit it to the filer -- it is a little off + // because they need to provide FIDs instead of file paths... + cm, _ := strconv.ParseBool(query.Get("cm")) + if cm { + q := u.Query() + q.Set("cm", "true") + u.RawQuery = q.Encode() + } glog.V(4).Infoln("post to", u) + request := &http.Request{ Method: r.Method, URL: u, @@ -319,6 +336,197 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { writeJsonQuiet(w, r, http.StatusCreated, reply) } +func (fs *FilerServer) autoChunk(w http.ResponseWriter, r *http.Request, replication string, collection string) bool { + if r.Method != "POST" { + glog.V(4).Infoln("AutoChunking not supported for method", r.Method) + return false + } + + // autoChunking can be set at the command-line level or as a query param. 
Query param overrides command-line + query := r.URL.Query() + + parsedMaxMB, _ := strconv.ParseInt(query.Get("maxMB"), 10, 32) + maxMB := int32(parsedMaxMB) + if maxMB <= 0 && fs.maxMB > 0 { + maxMB = int32(fs.maxMB) + } + if maxMB <= 0 { + glog.V(4).Infoln("AutoChunking not enabled") + return false + } + glog.V(4).Infoln("AutoChunking level set to", maxMB, "(MB)") + + chunkSize := 1024 * 1024 * maxMB + + contentLength := int64(0) + if contentLengthHeader := r.Header["Content-Length"]; len(contentLengthHeader) == 1 { + contentLength, _ = strconv.ParseInt(contentLengthHeader[0], 10, 64) + if contentLength <= int64(chunkSize) { + glog.V(4).Infoln("Content-Length of", contentLength, "is less than the chunk size of", chunkSize, "so autoChunking will be skipped.") + return false + } + } + + if contentLength <= 0 { + glog.V(4).Infoln("Content-Length value is missing or unexpected so autoChunking will be skipped.") + return false + } + + reply, err := fs.doAutoChunk(w, r, contentLength, chunkSize, replication, collection) + if err != nil { + writeJsonError(w, r, http.StatusInternalServerError, err) + } else if reply != nil { + writeJsonQuiet(w, r, http.StatusCreated, reply) + } + return true +} + +func (fs *FilerServer) doAutoChunk(w http.ResponseWriter, r *http.Request, contentLength int64, chunkSize int32, replication string, collection string) (filerResult *FilerPostResult, replyerr error) { + + multipartReader, multipartReaderErr := r.MultipartReader() + if multipartReaderErr != nil { + return nil, multipartReaderErr + } + + part1, part1Err := multipartReader.NextPart() + if part1Err != nil { + return nil, part1Err + } + + fileName := part1.FileName() + if fileName != "" { + fileName = path.Base(fileName) + } + + chunks := (int64(contentLength) / int64(chunkSize)) + 1 + cm := operation.ChunkManifest{ + Name: fileName, + Size: 0, // don't know yet + Mime: "application/octet-stream", + Chunks: make([]*operation.ChunkInfo, 0, chunks), + } + + totalBytesRead := int64(0) + tmpBufferSize := int32(1024 * 1024) + tmpBuffer := bytes.NewBuffer(make([]byte, 0, tmpBufferSize)) + chunkBuf := make([]byte, chunkSize+tmpBufferSize, chunkSize+tmpBufferSize) // chunk size plus a little overflow + chunkBufOffset := int32(0) + chunkOffset := int64(0) + writtenChunks := 0 + + filerResult = &FilerPostResult{ + Name: fileName, + } + + for totalBytesRead < contentLength { + tmpBuffer.Reset() + bytesRead, readErr := io.CopyN(tmpBuffer, part1, int64(tmpBufferSize)) + readFully := readErr != nil && readErr == io.EOF + tmpBuf := tmpBuffer.Bytes() + bytesToCopy := tmpBuf[0:int(bytesRead)] + + copy(chunkBuf[chunkBufOffset:chunkBufOffset+int32(bytesRead)], bytesToCopy) + chunkBufOffset = chunkBufOffset + int32(bytesRead) + + if chunkBufOffset >= chunkSize || readFully || (chunkBufOffset > 0 && bytesRead == 0) { + writtenChunks = writtenChunks + 1 + fileId, urlLocation, assignErr := fs.assignNewFileInfo(w, r, replication, collection) + if assignErr != nil { + return nil, assignErr + } + + // upload the chunk to the volume server + chunkName := fileName + "_chunk_" + strconv.FormatInt(int64(cm.Chunks.Len()+1), 10) + uploadErr := fs.doUpload(urlLocation, w, r, chunkBuf[0:chunkBufOffset], chunkName, "application/octet-stream", fileId) + if uploadErr != nil { + return nil, uploadErr + } + + // Save to chunk manifest structure + cm.Chunks = append(cm.Chunks, + &operation.ChunkInfo{ + Offset: chunkOffset, + Size: int64(chunkBufOffset), + Fid: fileId, + }, + ) + + // reset variables for the next chunk + chunkBufOffset = 0 + 
chunkOffset = totalBytesRead + int64(bytesRead) + } + + totalBytesRead = totalBytesRead + int64(bytesRead) + + if bytesRead == 0 || readFully { + break + } + + if readErr != nil { + return nil, readErr + } + } + + cm.Size = totalBytesRead + manifestBuf, marshalErr := cm.Marshal() + if marshalErr != nil { + return nil, marshalErr + } + + manifestStr := string(manifestBuf) + glog.V(4).Infoln("Generated chunk manifest: ", manifestStr) + + manifestFileId, manifestUrlLocation, manifestAssignmentErr := fs.assignNewFileInfo(w, r, replication, collection) + if manifestAssignmentErr != nil { + return nil, manifestAssignmentErr + } + glog.V(4).Infoln("Manifest uploaded to:", manifestUrlLocation, "Fid:", manifestFileId) + filerResult.Fid = manifestFileId + + u, _ := url.Parse(manifestUrlLocation) + q := u.Query() + q.Set("cm", "true") + u.RawQuery = q.Encode() + + manifestUploadErr := fs.doUpload(u.String(), w, r, manifestBuf, fileName+"_manifest", "application/json", manifestFileId) + if manifestUploadErr != nil { + return nil, manifestUploadErr + } + + path := r.URL.Path + // also delete the old fid unless PUT operation + if r.Method != "PUT" { + if oldFid, err := fs.filer.FindFile(path); err == nil { + operation.DeleteFile(fs.getMasterNode(), oldFid, fs.jwt(oldFid)) + } + } + + glog.V(4).Infoln("saving", path, "=>", manifestFileId) + if db_err := fs.filer.CreateFile(path, manifestFileId); db_err != nil { + replyerr = db_err + filerResult.Error = db_err.Error() + operation.DeleteFile(fs.getMasterNode(), manifestFileId, fs.jwt(manifestFileId)) //clean up + glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err) + return + } + + return +} + +func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *http.Request, chunkBuf []byte, fileName string, contentType string, fileId string) (err error) { + err = nil + + ioReader := ioutil.NopCloser(bytes.NewBuffer(chunkBuf)) + uploadResult, uploadError := operation.Upload(urlLocation, fileName, ioReader, false, contentType, fs.jwt(fileId)) + if uploadResult != nil { + glog.V(0).Infoln("Chunk upload result. Name:", uploadResult.Name, "Fid:", fileId, "Size:", uploadResult.Size) + } + if uploadError != nil { + err = uploadError + } + return +} + // curl -X DELETE http://localhost:8888/path/to // curl -X DELETE http://localhost:8888/path/to?recursive=true func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { From 09059bfdccdeff1a588ee1326318075adb068b0f Mon Sep 17 00:00:00 2001 From: Mike Tolman Date: Fri, 5 Aug 2016 15:19:10 -0600 Subject: [PATCH 03/10] Add AutoChunking to the Filer API, so that you can upload really large files through the filer API. 
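For reference, a sketch of the manifest this handler assembles, using the operation types referenced below; the fids and sizes are made-up example values. A client that has already uploaded its own chunks can build the same structure and POST it to the filer with ?cm=true, as noted in the handler.

package main

import (
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/operation"
)

func main() {
	// Three 64MB chunks previously written to volume servers; the fids here
	// are placeholders.
	cm := operation.ChunkManifest{
		Name: "big.dat",
		Mime: "application/octet-stream",
		Size: 3 * 64 * 1024 * 1024,
		Chunks: []*operation.ChunkInfo{
			{Offset: 0, Size: 64 * 1024 * 1024, Fid: "3,01637037d6"},
			{Offset: 64 * 1024 * 1024, Size: 64 * 1024 * 1024, Fid: "4,02a7360412"},
			{Offset: 128 * 1024 * 1024, Size: 64 * 1024 * 1024, Fid: "5,03f2a61bc8"},
		},
	}
	buf, err := cm.Marshal()
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s\n", buf) // JSON manifest, ready to POST with ?cm=true
}

The new -maxMB flag (and filer.maxMB under weed server) defaults to 0, so auto-chunking stays off unless the flag is set or a request supplies maxMB explicitly.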
--- weed/command/filer.go | 3 + weed/command/server.go | 2 + weed/server/filer_server.go | 2 + weed/server/filer_server_handlers_write.go | 208 +++++++++++++++++++++ 4 files changed, 215 insertions(+) diff --git a/weed/command/filer.go b/weed/command/filer.go index 582d4e9c8..0bd508e0b 100644 --- a/weed/command/filer.go +++ b/weed/command/filer.go @@ -24,6 +24,7 @@ type FilerOptions struct { dir *string redirectOnRead *bool disableDirListing *bool + maxMB *int secretKey *string cassandra_server *string cassandra_keyspace *string @@ -42,6 +43,7 @@ func init() { f.defaultReplicaPlacement = cmdFiler.Flag.String("defaultReplicaPlacement", "000", "default replication type if not specified") f.redirectOnRead = cmdFiler.Flag.Bool("redirectOnRead", false, "whether proxy or redirect to volume server during file GET request") f.disableDirListing = cmdFiler.Flag.Bool("disableDirListing", false, "turn off directory listing") + f.maxMB = cmdFiler.Flag.Int("maxMB", 0, "split files larger than the limit") f.cassandra_server = cmdFiler.Flag.String("cassandra.server", "", "host[:port] of the cassandra server") f.cassandra_keyspace = cmdFiler.Flag.String("cassandra.keyspace", "seaweed", "keyspace of the cassandra server") f.redis_server = cmdFiler.Flag.String("redis.server", "", "host:port of the redis server, e.g., 127.0.0.1:6379") @@ -82,6 +84,7 @@ func runFiler(cmd *Command, args []string) bool { r := http.NewServeMux() _, nfs_err := weed_server.NewFilerServer(r, *f.ip, *f.port, *f.master, *f.dir, *f.collection, *f.defaultReplicaPlacement, *f.redirectOnRead, *f.disableDirListing, + *f.maxMB, *f.secretKey, *f.cassandra_server, *f.cassandra_keyspace, *f.redis_server, *f.redis_password, *f.redis_database, diff --git a/weed/command/server.go b/weed/command/server.go index 1211c7137..7a6677a65 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -86,6 +86,7 @@ func init() { filerOptions.defaultReplicaPlacement = cmdServer.Flag.String("filer.defaultReplicaPlacement", "", "Default replication type if not specified during runtime.") filerOptions.redirectOnRead = cmdServer.Flag.Bool("filer.redirectOnRead", false, "whether proxy or redirect to volume server during file GET request") filerOptions.disableDirListing = cmdServer.Flag.Bool("filer.disableDirListing", false, "turn off directory listing") + filerOptions.maxMB = cmdServer.Flag.Int("filer.maxMB", 0, "split files larger than the limit") filerOptions.cassandra_server = cmdServer.Flag.String("filer.cassandra.server", "", "host[:port] of the cassandra server") filerOptions.cassandra_keyspace = cmdServer.Flag.String("filer.cassandra.keyspace", "seaweed", "keyspace of the cassandra server") filerOptions.redis_server = cmdServer.Flag.String("filer.redis.server", "", "host:port of the redis server, e.g., 127.0.0.1:6379") @@ -169,6 +170,7 @@ func runServer(cmd *Command, args []string) bool { _, nfs_err := weed_server.NewFilerServer(r, *serverBindIp, *filerOptions.port, *filerOptions.master, *filerOptions.dir, *filerOptions.collection, *filerOptions.defaultReplicaPlacement, *filerOptions.redirectOnRead, *filerOptions.disableDirListing, + *filerOptions.maxMB, *filerOptions.secretKey, *filerOptions.cassandra_server, *filerOptions.cassandra_keyspace, *filerOptions.redis_server, *filerOptions.redis_password, *filerOptions.redis_database, diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index b99bbd7c9..c9bc0e021 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -28,6 +28,7 @@ type FilerServer struct { 
disableDirListing bool secret security.Secret filer filer.Filer + maxMB int masterNodes *storage.MasterNodes } @@ -43,6 +44,7 @@ func NewFilerServer(r *http.ServeMux, ip string, port int, master string, dir st defaultReplication: replication, redirectOnRead: redirectOnRead, disableDirListing: disableDirListing, + maxMB: maxMB, port: ip + ":" + strconv.Itoa(port), } diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go index e2d40f532..872d8c4b9 100644 --- a/weed/server/filer_server_handlers_write.go +++ b/weed/server/filer_server_handlers_write.go @@ -20,6 +20,8 @@ import ( "github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/util" "github.com/syndtr/goleveldb/leveldb" + "path" + "strconv" ) type FilerPostResult struct { @@ -217,6 +219,7 @@ func (fs *FilerServer) monolithicUploadAnalyzer(w http.ResponseWriter, r *http.R } func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { + query := r.URL.Query() replication := query.Get("replication") if replication == "" { @@ -227,6 +230,10 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { collection = fs.collection } + if autoChunked := fs.autoChunk(w, r, replication, collection); autoChunked { + return + } + var fileId, urlLocation string var err error @@ -243,7 +250,17 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { } u, _ := url.Parse(urlLocation) + + // This allows a client to generate a chunk manifest and submit it to the filer -- it is a little off + // because they need to provide FIDs instead of file paths... + cm, _ := strconv.ParseBool(query.Get("cm")) + if cm { + q := u.Query() + q.Set("cm", "true") + u.RawQuery = q.Encode() + } glog.V(4).Infoln("post to", u) + request := &http.Request{ Method: r.Method, URL: u, @@ -319,6 +336,197 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { writeJsonQuiet(w, r, http.StatusCreated, reply) } +func (fs *FilerServer) autoChunk(w http.ResponseWriter, r *http.Request, replication string, collection string) bool { + if r.Method != "POST" { + glog.V(4).Infoln("AutoChunking not supported for method", r.Method) + return false + } + + // autoChunking can be set at the command-line level or as a query param. 
Query param overrides command-line + query := r.URL.Query() + + parsedMaxMB, _ := strconv.ParseInt(query.Get("maxMB"), 10, 32) + maxMB := int32(parsedMaxMB) + if maxMB <= 0 && fs.maxMB > 0 { + maxMB = int32(fs.maxMB) + } + if maxMB <= 0 { + glog.V(4).Infoln("AutoChunking not enabled") + return false + } + glog.V(4).Infoln("AutoChunking level set to", maxMB, "(MB)") + + chunkSize := 1024 * 1024 * maxMB + + contentLength := int64(0) + if contentLengthHeader := r.Header["Content-Length"]; len(contentLengthHeader) == 1 { + contentLength, _ = strconv.ParseInt(contentLengthHeader[0], 10, 64) + if contentLength <= int64(chunkSize) { + glog.V(4).Infoln("Content-Length of", contentLength, "is less than the chunk size of", chunkSize, "so autoChunking will be skipped.") + return false + } + } + + if contentLength <= 0 { + glog.V(4).Infoln("Content-Length value is missing or unexpected so autoChunking will be skipped.") + return false + } + + reply, err := fs.doAutoChunk(w, r, contentLength, chunkSize, replication, collection) + if err != nil { + writeJsonError(w, r, http.StatusInternalServerError, err) + } else if reply != nil { + writeJsonQuiet(w, r, http.StatusCreated, reply) + } + return true +} + +func (fs *FilerServer) doAutoChunk(w http.ResponseWriter, r *http.Request, contentLength int64, chunkSize int32, replication string, collection string) (filerResult *FilerPostResult, replyerr error) { + + multipartReader, multipartReaderErr := r.MultipartReader() + if multipartReaderErr != nil { + return nil, multipartReaderErr + } + + part1, part1Err := multipartReader.NextPart() + if part1Err != nil { + return nil, part1Err + } + + fileName := part1.FileName() + if fileName != "" { + fileName = path.Base(fileName) + } + + chunks := (int64(contentLength) / int64(chunkSize)) + 1 + cm := operation.ChunkManifest{ + Name: fileName, + Size: 0, // don't know yet + Mime: "application/octet-stream", + Chunks: make([]*operation.ChunkInfo, 0, chunks), + } + + totalBytesRead := int64(0) + tmpBufferSize := int32(1024 * 1024) + tmpBuffer := bytes.NewBuffer(make([]byte, 0, tmpBufferSize)) + chunkBuf := make([]byte, chunkSize+tmpBufferSize, chunkSize+tmpBufferSize) // chunk size plus a little overflow + chunkBufOffset := int32(0) + chunkOffset := int64(0) + writtenChunks := 0 + + filerResult = &FilerPostResult{ + Name: fileName, + } + + for totalBytesRead < contentLength { + tmpBuffer.Reset() + bytesRead, readErr := io.CopyN(tmpBuffer, part1, int64(tmpBufferSize)) + readFully := readErr != nil && readErr == io.EOF + tmpBuf := tmpBuffer.Bytes() + bytesToCopy := tmpBuf[0:int(bytesRead)] + + copy(chunkBuf[chunkBufOffset:chunkBufOffset+int32(bytesRead)], bytesToCopy) + chunkBufOffset = chunkBufOffset + int32(bytesRead) + + if chunkBufOffset >= chunkSize || readFully || (chunkBufOffset > 0 && bytesRead == 0) { + writtenChunks = writtenChunks + 1 + fileId, urlLocation, assignErr := fs.assignNewFileInfo(w, r, replication, collection) + if assignErr != nil { + return nil, assignErr + } + + // upload the chunk to the volume server + chunkName := fileName + "_chunk_" + strconv.FormatInt(int64(cm.Chunks.Len()+1), 10) + uploadErr := fs.doUpload(urlLocation, w, r, chunkBuf[0:chunkBufOffset], chunkName, "application/octet-stream", fileId) + if uploadErr != nil { + return nil, uploadErr + } + + // Save to chunk manifest structure + cm.Chunks = append(cm.Chunks, + &operation.ChunkInfo{ + Offset: chunkOffset, + Size: int64(chunkBufOffset), + Fid: fileId, + }, + ) + + // reset variables for the next chunk + chunkBufOffset = 0 + 
chunkOffset = totalBytesRead + int64(bytesRead) + } + + totalBytesRead = totalBytesRead + int64(bytesRead) + + if bytesRead == 0 || readFully { + break + } + + if readErr != nil { + return nil, readErr + } + } + + cm.Size = totalBytesRead + manifestBuf, marshalErr := cm.Marshal() + if marshalErr != nil { + return nil, marshalErr + } + + manifestStr := string(manifestBuf) + glog.V(4).Infoln("Generated chunk manifest: ", manifestStr) + + manifestFileId, manifestUrlLocation, manifestAssignmentErr := fs.assignNewFileInfo(w, r, replication, collection) + if manifestAssignmentErr != nil { + return nil, manifestAssignmentErr + } + glog.V(4).Infoln("Manifest uploaded to:", manifestUrlLocation, "Fid:", manifestFileId) + filerResult.Fid = manifestFileId + + u, _ := url.Parse(manifestUrlLocation) + q := u.Query() + q.Set("cm", "true") + u.RawQuery = q.Encode() + + manifestUploadErr := fs.doUpload(u.String(), w, r, manifestBuf, fileName+"_manifest", "application/json", manifestFileId) + if manifestUploadErr != nil { + return nil, manifestUploadErr + } + + path := r.URL.Path + // also delete the old fid unless PUT operation + if r.Method != "PUT" { + if oldFid, err := fs.filer.FindFile(path); err == nil { + operation.DeleteFile(fs.getMasterNode(), oldFid, fs.jwt(oldFid)) + } + } + + glog.V(4).Infoln("saving", path, "=>", manifestFileId) + if db_err := fs.filer.CreateFile(path, manifestFileId); db_err != nil { + replyerr = db_err + filerResult.Error = db_err.Error() + operation.DeleteFile(fs.getMasterNode(), manifestFileId, fs.jwt(manifestFileId)) //clean up + glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err) + return + } + + return +} + +func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *http.Request, chunkBuf []byte, fileName string, contentType string, fileId string) (err error) { + err = nil + + ioReader := ioutil.NopCloser(bytes.NewBuffer(chunkBuf)) + uploadResult, uploadError := operation.Upload(urlLocation, fileName, ioReader, false, contentType, fs.jwt(fileId)) + if uploadResult != nil { + glog.V(0).Infoln("Chunk upload result. Name:", uploadResult.Name, "Fid:", fileId, "Size:", uploadResult.Size) + } + if uploadError != nil { + err = uploadError + } + return +} + // curl -X DELETE http://localhost:8888/path/to // curl -X DELETE http://localhost:8888/path/to?recursive=true func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { From 14d4252904ed0fad8a7d6d6156a70fcbc3eda12c Mon Sep 17 00:00:00 2001 From: Mike Tolman Date: Fri, 5 Aug 2016 15:22:46 -0600 Subject: [PATCH 04/10] Ooops. Missed a line. --- weed/server/filer_server.go | 1 + 1 file changed, 1 insertion(+) diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index c9bc0e021..3c7c1fd9e 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -34,6 +34,7 @@ type FilerServer struct { func NewFilerServer(r *http.ServeMux, ip string, port int, master string, dir string, collection string, replication string, redirectOnRead bool, disableDirListing bool, + maxMB int, secret string, cassandra_server string, cassandra_keyspace string, redis_server string, redis_password string, redis_database int, From 34837afc7adb8ea6955d5cf962af10f8f30fb476 Mon Sep 17 00:00:00 2001 From: Mike Tolman Date: Fri, 5 Aug 2016 15:23:43 -0600 Subject: [PATCH 05/10] Adding HTTP verb whitelisting options. 
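A condensed sketch of the per-verb guard pattern this change introduces, wired into a bare net/http mux; the addresses, CIDR range, and root path are example values only.

package main

import (
	"net/http"

	"github.com/chrislusf/seaweedfs/weed/security"
)

func main() {
	// Reads allowed from the local network (CIDR entries are matched with
	// net.ParseCIDR); writes only from localhost, or from any client when the
	// request path starts with /public (the root whitelist exception).
	getGuard := security.NewGuard([]string{"192.168.0.0/16"}, nil, "")
	postGuard := security.NewGuard([]string{"127.0.0.1"}, []string{"public"}, "")

	handler := func(w http.ResponseWriter, r *http.Request) {
		w.Write([]byte("ok\n"))
	}

	http.HandleFunc("/read", getGuard.WhiteList(handler))
	http.HandleFunc("/public/write", postGuard.WhiteList(handler))
	http.ListenAndServe(":8080", nil)
}

On the command line the same lists come from the new per-verb flags, e.g. -whitelist.ip.get and -whitelist.root.post on the filer, or their filer.-prefixed equivalents under weed server.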
--- weed/command/filer.go | 74 ++++++++++++++++++++ weed/command/master.go | 15 ++-- weed/command/server.go | 77 +++++++++++++++++++-- weed/command/volume.go | 16 +++-- weed/security/guard.go | 48 +++++++++++-- weed/server/filer_server.go | 13 ++++ weed/server/filer_server_handlers.go | 10 +-- weed/server/master_server.go | 40 ++++++----- weed/server/master_server_handlers_admin.go | 2 +- weed/server/volume_server.go | 57 +++++++++------ weed/server/volume_server_handlers.go | 10 +-- 11 files changed, 290 insertions(+), 72 deletions(-) diff --git a/weed/command/filer.go b/weed/command/filer.go index 0bd508e0b..f58e38403 100644 --- a/weed/command/filer.go +++ b/weed/command/filer.go @@ -9,6 +9,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/server" "github.com/chrislusf/seaweedfs/weed/util" + "strings" ) var ( @@ -31,6 +32,31 @@ type FilerOptions struct { redis_server *string redis_password *string redis_database *int + get_ip_whitelist_option *string + get_root_whitelist_option *string + head_ip_whitelist_option *string + head_root_whitelist_option *string + delete_ip_whitelist_option *string + delete_root_whitelist_option *string + put_ip_whitelist_option *string + put_root_whitelist_option *string + post_ip_whitelist_option *string + post_root_whitelist_option *string + get_secure_key *string + head_secure_key *string + delete_secure_key *string + put_secure_key *string + post_secure_key *string + get_ip_whitelist []string + get_root_whitelist []string + head_ip_whitelist []string + head_root_whitelist []string + delete_ip_whitelist []string + delete_root_whitelist []string + put_ip_whitelist []string + put_root_whitelist []string + post_ip_whitelist []string + post_root_whitelist []string } func init() { @@ -50,6 +76,21 @@ func init() { f.redis_password = cmdFiler.Flag.String("redis.password", "", "password in clear text") f.redis_database = cmdFiler.Flag.Int("redis.database", 0, "the database on the redis server") f.secretKey = cmdFiler.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)") + f.get_ip_whitelist_option = cmdFiler.Flag.String("whitelist.ip.get", "", "comma separated Ip addresses having get permission. No limit if empty.") + f.get_root_whitelist_option = cmdFiler.Flag.String("whitelist.root.get", "", "comma separated root paths having get permission. No limit if empty.") + f.head_ip_whitelist_option = cmdFiler.Flag.String("whitelist.ip.head", "", "comma separated Ip addresses having head permission. No limit if empty.") + f.head_root_whitelist_option = cmdFiler.Flag.String("whitelist.root.head", "", "comma separated root paths having head permission. No limit if empty.") + f.delete_ip_whitelist_option = cmdFiler.Flag.String("whitelist.ip.delete", "", "comma separated Ip addresses having delete permission. No limit if empty.") + f.delete_root_whitelist_option = cmdFiler.Flag.String("whitelist.root.delete", "", "comma separated root paths having delete permission. No limit if empty.") + f.put_ip_whitelist_option = cmdFiler.Flag.String("whitelist.ip.put", "", "comma separated Ip addresses having put permission. No limit if empty.") + f.put_root_whitelist_option = cmdFiler.Flag.String("whitelist.root.put", "", "comma separated root paths having put permission. No limit if empty.") + f.post_ip_whitelist_option = cmdFiler.Flag.String("whitelist.ip.post", "", "comma separated Ip addresses having post permission. 
No limit if empty.") + f.post_root_whitelist_option = cmdFiler.Flag.String("whitelist.root.post", "", "comma separated root paths having post permission. No limit if empty.") + f.get_secure_key = cmdFiler.Flag.String("secure.secret.get", "", "secret to encrypt Json Web Token(JWT)") + f.head_secure_key = cmdFiler.Flag.String("secure.secret.head", "", "secret to encrypt Json Web Token(JWT)") + f.delete_secure_key = cmdFiler.Flag.String("secure.secret.delete", "", "secret to encrypt Json Web Token(JWT)") + f.put_secure_key = cmdFiler.Flag.String("secure.secret.put", "", "secret to encrypt Json Web Token(JWT)") + f.post_secure_key = cmdFiler.Flag.String("secure.secret.post", "", "secret to encrypt Json Web Token(JWT)") } @@ -81,6 +122,36 @@ func runFiler(cmd *Command, args []string) bool { glog.Fatalf("Check Meta Folder (-dir) Writable %s : %s", *f.dir, err) } + if *f.get_ip_whitelist_option != "" { + f.get_ip_whitelist = strings.Split(*f.get_ip_whitelist_option, ",") + } + if *f.get_root_whitelist_option != "" { + f.get_root_whitelist = strings.Split(*f.get_root_whitelist_option, ",") + } + if *f.head_ip_whitelist_option != "" { + f.head_ip_whitelist = strings.Split(*f.head_ip_whitelist_option, ",") + } + if *f.head_root_whitelist_option != "" { + f.head_root_whitelist = strings.Split(*f.head_root_whitelist_option, ",") + } + if *f.delete_ip_whitelist_option != "" { + f.delete_ip_whitelist = strings.Split(*f.delete_ip_whitelist_option, ",") + } + if *f.delete_root_whitelist_option != "" { + f.delete_root_whitelist = strings.Split(*f.delete_root_whitelist_option, ",") + } + if *f.put_ip_whitelist_option != "" { + f.put_ip_whitelist = strings.Split(*f.put_ip_whitelist_option, ",") + } + if *f.put_root_whitelist_option != "" { + f.put_root_whitelist = strings.Split(*f.put_root_whitelist_option, ",") + } + if *f.post_ip_whitelist_option != "" { + f.post_ip_whitelist = strings.Split(*f.post_ip_whitelist_option, ",") + } + if *f.post_root_whitelist_option != "" { + f.post_root_whitelist = strings.Split(*f.post_root_whitelist_option, ",") + } r := http.NewServeMux() _, nfs_err := weed_server.NewFilerServer(r, *f.ip, *f.port, *f.master, *f.dir, *f.collection, *f.defaultReplicaPlacement, *f.redirectOnRead, *f.disableDirListing, @@ -88,6 +159,9 @@ func runFiler(cmd *Command, args []string) bool { *f.secretKey, *f.cassandra_server, *f.cassandra_keyspace, *f.redis_server, *f.redis_password, *f.redis_database, + f.get_ip_whitelist, f.head_ip_whitelist, f.delete_ip_whitelist, f.put_ip_whitelist, f.post_ip_whitelist, + f.get_root_whitelist, f.head_root_whitelist, f.delete_root_whitelist, f.put_root_whitelist, f.post_root_whitelist, + *f.get_secure_key, *f.head_secure_key, *f.delete_secure_key, *f.put_secure_key, *f.post_secure_key, ) if nfs_err != nil { glog.Fatalf("Filer startup error: %v", nfs_err) diff --git a/weed/command/master.go b/weed/command/master.go index cd15defce..f140750ea 100644 --- a/weed/command/master.go +++ b/weed/command/master.go @@ -41,11 +41,13 @@ var ( mTimeout = cmdMaster.Flag.Int("idleTimeout", 10, "connection idle seconds") mMaxCpu = cmdMaster.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs") garbageThreshold = cmdMaster.Flag.String("garbageThreshold", "0.3", "threshold to vacuum and reclaim spaces") - masterWhiteListOption = cmdMaster.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. 
No limit if empty.") + masterReadWhiteListOption = cmdMaster.Flag.String("readWhiteList", "", "comma separated Ip addresses having read permission. No limit if empty.") + masterWriteWhiteListOption = cmdMaster.Flag.String("writeWhiteList", "", "comma separated Ip addresses having write permission. No limit if empty.") masterSecureKey = cmdMaster.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)") masterCpuProfile = cmdMaster.Flag.String("cpuprofile", "", "cpu profile output file") - masterWhiteList []string + masterReadWhiteList []string + masterWriteWhiteList []string ) func runMaster(cmd *Command, args []string) bool { @@ -67,14 +69,17 @@ func runMaster(cmd *Command, args []string) bool { if err := util.TestFolderWritable(*metaFolder); err != nil { glog.Fatalf("Check Meta Folder (-mdir) Writable %s : %s", *metaFolder, err) } - if *masterWhiteListOption != "" { - masterWhiteList = strings.Split(*masterWhiteListOption, ",") + if *masterReadWhiteListOption != "" { + masterReadWhiteList = strings.Split(*masterReadWhiteListOption, ",") + } + if *masterWriteWhiteListOption != "" { + masterWriteWhiteList = strings.Split(*masterWriteWhiteListOption, ",") } r := mux.NewRouter() ms := weed_server.NewMasterServer(r, *mport, *metaFolder, *volumeSizeLimitMB, *mpulse, *confFile, *defaultReplicaPlacement, *garbageThreshold, - masterWhiteList, *masterSecureKey, + masterReadWhiteList, masterWriteWhiteList, nil, *masterSecureKey, ) listeningAddress := *masterBindIp + ":" + strconv.Itoa(*mport) diff --git a/weed/command/server.go b/weed/command/server.go index 7a6677a65..9a19ef2af 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -54,7 +54,8 @@ var ( serverTimeout = cmdServer.Flag.Int("idleTimeout", 10, "connection idle seconds") serverDataCenter = cmdServer.Flag.String("dataCenter", "", "current volume server's data center name") serverRack = cmdServer.Flag.String("rack", "", "current volume server's rack name") - serverWhiteListOption = cmdServer.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.") + serverReadWhiteListOption = cmdServer.Flag.String("read.whitelist", "", "comma separated Ip addresses having read permission. No limit if empty.") + serverWriteWhiteListOption = cmdServer.Flag.String("write.whitelist", "", "comma separated Ip addresses having write permission. 
No limit if empty.") serverPeers = cmdServer.Flag.String("master.peers", "", "other master nodes in comma separated ip:masterPort list") serverSecureKey = cmdServer.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)") serverGarbageThreshold = cmdServer.Flag.String("garbageThreshold", "0.3", "threshold to vacuum and reclaim spaces") @@ -74,7 +75,8 @@ var ( volumeServerPublicUrl = cmdServer.Flag.String("volume.publicUrl", "", "publicly accessible address") isStartingFiler = cmdServer.Flag.Bool("filer", false, "whether to start filer") - serverWhiteList []string + serverReadWhiteList []string + serverWriteWhiteList []string ) func init() { @@ -82,7 +84,7 @@ func init() { filerOptions.master = cmdServer.Flag.String("filer.master", "", "default to current master server") filerOptions.collection = cmdServer.Flag.String("filer.collection", "", "all data will be stored in this collection") filerOptions.port = cmdServer.Flag.Int("filer.port", 8888, "filer server http listen port") - filerOptions.dir = cmdServer.Flag.String("filer.dir", "", "directory to store meta data, default to a 'filer' sub directory of what -dir is specified") + filerOptions.dir = cmdServer.Flag.String("filer.dir", "", "directory to store meta data, default to a 'filer' sub directory of what -mdir is specified") filerOptions.defaultReplicaPlacement = cmdServer.Flag.String("filer.defaultReplicaPlacement", "", "Default replication type if not specified during runtime.") filerOptions.redirectOnRead = cmdServer.Flag.Bool("filer.redirectOnRead", false, "whether proxy or redirect to volume server during file GET request") filerOptions.disableDirListing = cmdServer.Flag.Bool("filer.disableDirListing", false, "turn off directory listing") @@ -92,6 +94,21 @@ func init() { filerOptions.redis_server = cmdServer.Flag.String("filer.redis.server", "", "host:port of the redis server, e.g., 127.0.0.1:6379") filerOptions.redis_password = cmdServer.Flag.String("filer.redis.password", "", "redis password in clear text") filerOptions.redis_database = cmdServer.Flag.Int("filer.redis.database", 0, "the database on the redis server") + filerOptions.get_ip_whitelist_option = cmdServer.Flag.String("filer.whitelist.ip.get", "", "comma separated Ip addresses having filer GET permission. No limit if empty.") + filerOptions.get_root_whitelist_option = cmdServer.Flag.String("filer.whitelist.root.get", "", "comma separated root paths having filer GET permission. No limit if empty.") + filerOptions.head_ip_whitelist_option = cmdServer.Flag.String("filer.whitelist.ip.head", "", "comma separated Ip addresses having filer HEAD permission. No limit if empty.") + filerOptions.head_root_whitelist_option = cmdServer.Flag.String("filer.whitelist.root.head", "", "comma separated root paths having filer HEAD permission. No limit if empty.") + filerOptions.delete_ip_whitelist_option = cmdServer.Flag.String("filer.whitelist.ip.delete", "", "comma separated Ip addresses having filer DELETE permission. No limit if empty.") + filerOptions.delete_root_whitelist_option = cmdServer.Flag.String("filer.whitelist.root.delete", "", "comma separated root paths having filer DELETE permission. No limit if empty.") + filerOptions.put_ip_whitelist_option = cmdServer.Flag.String("filer.whitelist.ip.put", "", "comma separated Ip addresses having filer PUT permission. No limit if empty.") + filerOptions.put_root_whitelist_option = cmdServer.Flag.String("filer.whitelist.root.put", "", "comma separated root paths having filer PUT permission. 
No limit if empty.") + filerOptions.post_ip_whitelist_option = cmdServer.Flag.String("filer.whitelist.ip.post", "", "comma separated Ip addresses having filer POST permission. No limit if empty.") + filerOptions.post_root_whitelist_option = cmdServer.Flag.String("filer.whitelist.root.post", "", "comma separated root paths having filer POST permission. No limit if empty.") + filerOptions.get_secure_key = cmdServer.Flag.String("filer.secure.secret.get", "", "secret to encrypt Json Web Token(JWT)") + filerOptions.head_secure_key = cmdServer.Flag.String("filer.secure.secret.head", "", "secret to encrypt Json Web Token(JWT)") + filerOptions.delete_secure_key = cmdServer.Flag.String("filer.secure.secret.delete", "", "secret to encrypt Json Web Token(JWT)") + filerOptions.put_secure_key = cmdServer.Flag.String("filer.secure.secret.put", "", "secret to encrypt Json Web Token(JWT)") + filerOptions.post_secure_key = cmdServer.Flag.String("filer.secure.secret.post", "", "secret to encrypt Json Web Token(JWT)") } func runServer(cmd *Command, args []string) bool { @@ -154,13 +171,56 @@ func runServer(cmd *Command, args []string) bool { if err := util.TestFolderWritable(*filerOptions.dir); err != nil { glog.Fatalf("Check Mapping Meta Folder (-filer.dir=\"%s\") Writable: %s", *filerOptions.dir, err) } + if *filerOptions.get_ip_whitelist_option != "" { + glog.V(0).Infof("Filer GET IP whitelist: %s", *filerOptions.get_ip_whitelist_option) + filerOptions.get_ip_whitelist = strings.Split(*filerOptions.get_ip_whitelist_option, ",") + } + if *filerOptions.get_root_whitelist_option != "" { + glog.V(0).Infof("Filer GET root whitelist: %s", *filerOptions.get_root_whitelist_option) + filerOptions.get_root_whitelist = strings.Split(*filerOptions.get_root_whitelist_option, ",") + } + if *filerOptions.head_ip_whitelist_option != "" { + glog.V(0).Infof("Filer HEAD IP whitelist: %s", *filerOptions.head_ip_whitelist_option) + filerOptions.head_ip_whitelist = strings.Split(*filerOptions.head_ip_whitelist_option, ",") + } + if *filerOptions.head_root_whitelist_option != "" { + glog.V(0).Infof("Filer HEAD root whitelist: %s", *filerOptions.head_root_whitelist_option) + filerOptions.head_root_whitelist = strings.Split(*filerOptions.head_root_whitelist_option, ",") + } + if *filerOptions.delete_ip_whitelist_option != "" { + glog.V(0).Infof("Filer DELETE IP whitelist: %s", *filerOptions.delete_ip_whitelist_option) + filerOptions.delete_ip_whitelist = strings.Split(*filerOptions.delete_ip_whitelist_option, ",") + } + if *filerOptions.delete_root_whitelist_option != "" { + glog.V(0).Infof("Filer DELETE root whitelist: %s", *filerOptions.delete_root_whitelist_option) + filerOptions.delete_root_whitelist = strings.Split(*filerOptions.delete_root_whitelist_option, ",") + } + if *filerOptions.put_ip_whitelist_option != "" { + glog.V(0).Infof("Filer PUT IP whitelist: %s", *filerOptions.put_ip_whitelist_option) + filerOptions.put_ip_whitelist = strings.Split(*filerOptions.put_ip_whitelist_option, ",") + } + if *filerOptions.put_root_whitelist_option != "" { + glog.V(0).Infof("Filer PUT root whitelist: %s", *filerOptions.put_root_whitelist_option) + filerOptions.put_root_whitelist = strings.Split(*filerOptions.put_root_whitelist_option, ",") + } + if *filerOptions.post_ip_whitelist_option != "" { + glog.V(0).Infof("Filer POST IP whitelist: %s", *filerOptions.post_ip_whitelist_option) + filerOptions.post_ip_whitelist = strings.Split(*filerOptions.post_ip_whitelist_option, ",") + } + if *filerOptions.post_root_whitelist_option != "" { + 
glog.V(0).Infof("Filer POST root whitelist: %s", *filerOptions.post_root_whitelist_option) + filerOptions.post_root_whitelist = strings.Split(*filerOptions.post_root_whitelist_option, ",") + } } if err := util.TestFolderWritable(*masterMetaFolder); err != nil { glog.Fatalf("Check Meta Folder (-mdir=\"%s\") Writable: %s", *masterMetaFolder, err) } - if *serverWhiteListOption != "" { - serverWhiteList = strings.Split(*serverWhiteListOption, ",") + if *serverReadWhiteListOption != "" { + serverReadWhiteList = strings.Split(*serverReadWhiteListOption, ",") + } + if *serverWriteWhiteListOption != "" { + serverWriteWhiteList = strings.Split(*serverWriteWhiteListOption, ",") } if *isStartingFiler { @@ -174,6 +234,9 @@ func runServer(cmd *Command, args []string) bool { *filerOptions.secretKey, *filerOptions.cassandra_server, *filerOptions.cassandra_keyspace, *filerOptions.redis_server, *filerOptions.redis_password, *filerOptions.redis_database, + filerOptions.get_ip_whitelist, filerOptions.head_ip_whitelist, filerOptions.delete_ip_whitelist, filerOptions.put_ip_whitelist, filerOptions.post_ip_whitelist, + filerOptions.get_root_whitelist, filerOptions.head_root_whitelist, filerOptions.delete_root_whitelist, filerOptions.put_root_whitelist, filerOptions.post_root_whitelist, + *f.get_secure_key, *f.head_secure_key, *f.delete_secure_key, *f.put_secure_key, *f.post_secure_key, ) if nfs_err != nil { glog.Fatalf("Filer startup error: %v", nfs_err) @@ -202,7 +265,7 @@ func runServer(cmd *Command, args []string) bool { r := mux.NewRouter() ms := weed_server.NewMasterServer(r, *masterPort, *masterMetaFolder, *masterVolumeSizeLimitMB, *volumePulse, *masterConfFile, *masterDefaultReplicaPlacement, *serverGarbageThreshold, - serverWhiteList, *serverSecureKey, + serverReadWhiteList, serverWriteWhiteList, nil, *serverSecureKey, ) glog.V(0).Infoln("Start Seaweed Master", util.VERSION, "at", *serverIp+":"+strconv.Itoa(*masterPort)) @@ -256,7 +319,7 @@ func runServer(cmd *Command, args []string) bool { folders, maxCounts, volumeNeedleMapKind, *serverIp+":"+strconv.Itoa(*masterPort), *volumePulse, *serverDataCenter, *serverRack, - serverWhiteList, *volumeFixJpgOrientation, *volumeReadRedirect, + serverReadWhiteList, serverWriteWhiteList, nil, *volumeFixJpgOrientation, *volumeReadRedirect, ) glog.V(0).Infoln("Start Seaweed volume server", util.VERSION, "at", *serverIp+":"+strconv.Itoa(*volumePort)) diff --git a/weed/command/volume.go b/weed/command/volume.go index 21369cbe9..68f5edd9e 100644 --- a/weed/command/volume.go +++ b/weed/command/volume.go @@ -2,6 +2,7 @@ package command import ( "net/http" + _ "net/http/pprof" "os" "runtime" "strconv" @@ -32,7 +33,8 @@ type VolumeServerOptions struct { maxCpu *int dataCenter *string rack *string - whiteList []string + readWhitelist []string + writeWhitelist []string indexType *string fixJpgOrientation *bool readRedirect *bool @@ -67,7 +69,8 @@ var cmdVolume = &Command{ var ( volumeFolders = cmdVolume.Flag.String("dir", os.TempDir(), "directories to store data files. dir[,dir]...") maxVolumeCounts = cmdVolume.Flag.String("max", "7", "maximum numbers of volumes, count[,count]...") - volumeWhiteListOption = cmdVolume.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.") + volumeReadWhiteListOption = cmdVolume.Flag.String("read.whitelist", "", "comma separated Ip addresses having read permission. 
No limit if empty.") + volumeWriteWhiteListOption = cmdVolume.Flag.String("write.whitelist", "", "comma separated Ip addresses having write permission. No limit if empty.") ) func runVolume(cmd *Command, args []string) bool { @@ -96,8 +99,11 @@ func runVolume(cmd *Command, args []string) bool { } //security related white list configuration - if *volumeWhiteListOption != "" { - v.whiteList = strings.Split(*volumeWhiteListOption, ",") + if *volumeReadWhiteListOption != "" { + v.readWhitelist = strings.Split(*volumeReadWhiteListOption, ",") + } + if *volumeWriteWhiteListOption != "" { + v.writeWhitelist = strings.Split(*volumeWriteWhiteListOption, ",") } if *v.ip == "" { @@ -130,7 +136,7 @@ func runVolume(cmd *Command, args []string) bool { v.folders, v.folderMaxLimits, volumeNeedleMapKind, *v.master, *v.pulseSeconds, *v.dataCenter, *v.rack, - v.whiteList, + v.readWhitelist, v.writeWhitelist, nil, *v.fixJpgOrientation, *v.readRedirect, ) diff --git a/weed/security/guard.go b/weed/security/guard.go index dea3b12f2..6292c67c9 100644 --- a/weed/security/guard.go +++ b/weed/security/guard.go @@ -41,17 +41,31 @@ https://github.com/pkieltyka/jwtauth/blob/master/jwtauth.go */ type Guard struct { - whiteList []string + ipWhiteList []string + rootWhiteList []string SecretKey Secret isActive bool } -func NewGuard(whiteList []string, secretKey string) *Guard { - g := &Guard{whiteList: whiteList, SecretKey: Secret(secretKey)} - g.isActive = len(g.whiteList) != 0 || len(g.SecretKey) != 0 +func NewGuard(ipWhiteList []string, rootWhiteList []string, secretKey string) *Guard { + g := &Guard{ipWhiteList: ipWhiteList, rootWhiteList: rootWhiteList, SecretKey: Secret(secretKey)} + g.isActive = len(g.ipWhiteList) != 0 || len(g.SecretKey) != 0 return g } +func (g *Guard) WhiteList2(f func(w http.ResponseWriter, r *http.Request, b bool)) func(w http.ResponseWriter, r *http.Request, b bool) { + if !g.isActive { + //if no security needed, just skip all checkings + return f + } + return func(w http.ResponseWriter, r *http.Request, b bool) { + if err := g.checkWhiteList(w, r); err != nil { + w.WriteHeader(http.StatusUnauthorized) + return + } + f(w, r, b) + } +} func (g *Guard) WhiteList(f func(w http.ResponseWriter, r *http.Request)) func(w http.ResponseWriter, r *http.Request) { if !g.isActive { @@ -96,13 +110,14 @@ func GetActualRemoteHost(r *http.Request) (host string, err error) { } func (g *Guard) checkWhiteList(w http.ResponseWriter, r *http.Request) error { - if len(g.whiteList) == 0 { + if len(g.ipWhiteList) == 0 { + glog.V(0).Info("No whitelist specified for operation") return nil } host, err := GetActualRemoteHost(r) if err == nil { - for _, ip := range g.whiteList { + for _, ip := range g.ipWhiteList { // If the whitelist entry contains a "/" it // is a CIDR range, and we should check the @@ -114,6 +129,7 @@ func (g *Guard) checkWhiteList(w http.ResponseWriter, r *http.Request) error { } remote := net.ParseIP(host) if cidrnet.Contains(remote) { + glog.V(0).Infof("Found %s in CIDR whitelist.", r.RemoteAddr) return nil } } @@ -122,8 +138,28 @@ func (g *Guard) checkWhiteList(w http.ResponseWriter, r *http.Request) error { // Otherwise we're looking for a literal match. 
// if ip == host { + glog.V(0).Infof("Found %s in whitelist.", r.RemoteAddr) return nil } + // ::1 is the same as 127.0.0.1 and localhost + if host == "::1" && (ip == "127.0.0.1" || ip == "localhost") { + glog.V(0).Infof("Found %s (localhost) in whitelist.", r.RemoteAddr) + return nil + } + } + } + // The root whitelist allows exceptions to the IP whitelist, but only by certain root paths in the request. + if len(g.rootWhiteList) > 0 { + pathParts := strings.Split(r.RequestURI, "/") + if len(pathParts) > 0 { + requestedRoot := pathParts[1] + for _, root := range g.rootWhiteList { + if root == requestedRoot { + glog.V(0).Infof("Found %s in root whitelist.", requestedRoot) + return nil + } + } + glog.V(0).Infof("Not in root whitelist: %s", requestedRoot) } } diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 3c7c1fd9e..2a432af65 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -30,6 +30,11 @@ type FilerServer struct { filer filer.Filer maxMB int masterNodes *storage.MasterNodes + get_guard *security.Guard + head_guard *security.Guard + delete_guard *security.Guard + put_guard *security.Guard + post_guard *security.Guard } func NewFilerServer(r *http.ServeMux, ip string, port int, master string, dir string, collection string, @@ -38,6 +43,9 @@ func NewFilerServer(r *http.ServeMux, ip string, port int, master string, dir st secret string, cassandra_server string, cassandra_keyspace string, redis_server string, redis_password string, redis_database int, + get_ip_whitelist []string, head_ip_whitelist []string, delete_ip_whitelist []string, put_ip_whitelist []string, post_ip_whitelist []string, + get_root_whitelist []string, head_root_whitelist []string, delete_root_whitelist []string, put_root_whitelist []string, post_root_whitelist []string, + get_secure_key string, head_secure_key string, delete_secure_key string, put_secure_key string, post_secure_key string, ) (fs *FilerServer, err error) { fs = &FilerServer{ master: master, @@ -46,6 +54,11 @@ func NewFilerServer(r *http.ServeMux, ip string, port int, master string, dir st redirectOnRead: redirectOnRead, disableDirListing: disableDirListing, maxMB: maxMB, + get_guard: security.NewGuard(get_ip_whitelist, get_root_whitelist, get_secure_key), + head_guard: security.NewGuard(head_ip_whitelist, head_root_whitelist, head_secure_key), + delete_guard: security.NewGuard(delete_ip_whitelist, delete_root_whitelist, delete_secure_key), + put_guard: security.NewGuard(put_ip_whitelist, put_root_whitelist, put_secure_key), + post_guard: security.NewGuard(post_ip_whitelist, post_root_whitelist, post_secure_key), port: ip + ":" + strconv.Itoa(port), } diff --git a/weed/server/filer_server_handlers.go b/weed/server/filer_server_handlers.go index 4e39258af..4db170590 100644 --- a/weed/server/filer_server_handlers.go +++ b/weed/server/filer_server_handlers.go @@ -7,14 +7,14 @@ import ( func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) { switch r.Method { case "GET": - fs.GetOrHeadHandler(w, r, true) + fs.get_guard.WhiteList2(fs.GetOrHeadHandler)(w, r, true) case "HEAD": - fs.GetOrHeadHandler(w, r, false) + fs.head_guard.WhiteList2(fs.GetOrHeadHandler)(w, r, false) case "DELETE": - fs.DeleteHandler(w, r) + fs.delete_guard.WhiteList(fs.DeleteHandler)(w, r) case "PUT": - fs.PostHandler(w, r) + fs.put_guard.WhiteList(fs.PostHandler)(w, r) case "POST": - fs.PostHandler(w, r) + fs.post_guard.WhiteList(fs.PostHandler)(w, r) } } diff --git a/weed/server/master_server.go 
b/weed/server/master_server.go index 61bda6988..3bd77c819 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -23,7 +23,8 @@ type MasterServer struct { pulseSeconds int defaultReplicaPlacement string garbageThreshold string - guard *security.Guard + read_guard *security.Guard + write_guard *security.Guard Topo *topology.Topology vg *topology.VolumeGrowth @@ -38,7 +39,9 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string, confFile string, defaultReplicaPlacement string, garbageThreshold string, - whiteList []string, + ipReadWhiteList []string, + ipWriteWhiteList []string, + rootWhiteList []string, secureKey string, ) *MasterServer { ms := &MasterServer{ @@ -58,24 +61,25 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string, ms.vg = topology.NewDefaultVolumeGrowth() glog.V(0).Infoln("Volume Size Limit is", volumeSizeLimitMB, "MB") - ms.guard = security.NewGuard(whiteList, secureKey) + ms.read_guard = security.NewGuard(ipReadWhiteList, rootWhiteList, secureKey) + ms.write_guard = security.NewGuard(ipWriteWhiteList, rootWhiteList, secureKey) r.HandleFunc("/", ms.uiStatusHandler) - r.HandleFunc("/ui/index.html", ms.uiStatusHandler) - r.HandleFunc("/dir/assign", ms.proxyToLeader(ms.guard.WhiteList(ms.dirAssignHandler))) - r.HandleFunc("/dir/lookup", ms.proxyToLeader(ms.guard.WhiteList(ms.dirLookupHandler))) - r.HandleFunc("/dir/join", ms.proxyToLeader(ms.guard.WhiteList(ms.dirJoinHandler))) - r.HandleFunc("/dir/status", ms.proxyToLeader(ms.guard.WhiteList(ms.dirStatusHandler))) - r.HandleFunc("/col/delete", ms.proxyToLeader(ms.guard.WhiteList(ms.collectionDeleteHandler))) - r.HandleFunc("/vol/lookup", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeLookupHandler))) - r.HandleFunc("/vol/grow", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeGrowHandler))) - r.HandleFunc("/vol/status", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeStatusHandler))) - r.HandleFunc("/vol/vacuum", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeVacuumHandler))) - r.HandleFunc("/submit", ms.guard.WhiteList(ms.submitFromMasterServerHandler)) - r.HandleFunc("/delete", ms.guard.WhiteList(ms.deleteFromMasterServerHandler)) - r.HandleFunc("/{fileId}", ms.proxyToLeader(ms.redirectHandler)) - r.HandleFunc("/stats/counter", ms.guard.WhiteList(statsCounterHandler)) - r.HandleFunc("/stats/memory", ms.guard.WhiteList(statsMemoryHandler)) + r.HandleFunc("/ui/index.html", ms.read_guard.WhiteList(ms.uiStatusHandler)) + r.HandleFunc("/dir/assign", ms.proxyToLeader(ms.write_guard.WhiteList(ms.dirAssignHandler))) + r.HandleFunc("/dir/lookup", ms.proxyToLeader(ms.read_guard.WhiteList(ms.dirLookupHandler))) + r.HandleFunc("/dir/join", ms.proxyToLeader(ms.write_guard.WhiteList(ms.dirJoinHandler))) + r.HandleFunc("/dir/status", ms.proxyToLeader(ms.read_guard.WhiteList(ms.dirStatusHandler))) + r.HandleFunc("/col/delete", ms.proxyToLeader(ms.write_guard.WhiteList(ms.collectionDeleteHandler))) + r.HandleFunc("/vol/lookup", ms.proxyToLeader(ms.read_guard.WhiteList(ms.volumeLookupHandler))) + r.HandleFunc("/vol/grow", ms.proxyToLeader(ms.write_guard.WhiteList(ms.volumeGrowHandler))) + r.HandleFunc("/vol/status", ms.proxyToLeader(ms.read_guard.WhiteList(ms.volumeStatusHandler))) + r.HandleFunc("/vol/vacuum", ms.proxyToLeader(ms.write_guard.WhiteList(ms.volumeVacuumHandler))) + r.HandleFunc("/submit", ms.write_guard.WhiteList(ms.submitFromMasterServerHandler)) + r.HandleFunc("/delete", ms.write_guard.WhiteList(ms.deleteFromMasterServerHandler)) + r.HandleFunc("/{fileId}", 
ms.proxyToLeader(ms.read_guard.WhiteList(ms.redirectHandler))) + r.HandleFunc("/stats/counter", ms.read_guard.WhiteList(statsCounterHandler)) + r.HandleFunc("/stats/memory", ms.read_guard.WhiteList(statsMemoryHandler)) ms.Topo.StartRefreshWritableVolumes(garbageThreshold) diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go index a762bf416..385290079 100644 --- a/weed/server/master_server_handlers_admin.go +++ b/weed/server/master_server_handlers_admin.go @@ -61,7 +61,7 @@ func (ms *MasterServer) dirJoinHandler(w http.ResponseWriter, r *http.Request) { ms.Topo.ProcessJoinMessage(joinMessage) writeJsonQuiet(w, r, http.StatusOK, operation.JoinResult{ VolumeSizeLimit: uint64(ms.volumeSizeLimitMB) * 1024 * 1024, - SecretKey: string(ms.guard.SecretKey), + SecretKey: string(ms.write_guard.SecretKey), }) } diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go index 79a4276b1..a40eeb9c0 100644 --- a/weed/server/volume_server.go +++ b/weed/server/volume_server.go @@ -9,6 +9,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/storage" + //"net/http/pprof" ) type VolumeServer struct { @@ -18,7 +19,8 @@ type VolumeServer struct { dataCenter string rack string store *storage.Store - guard *security.Guard + read_guard *security.Guard + write_guard *security.Guard needleMapKind storage.NeedleMapType FixJpgOrientation bool @@ -31,7 +33,9 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, needleMapKind storage.NeedleMapType, masterNode string, pulseSeconds int, dataCenter string, rack string, - whiteList []string, + ipReadWhiteList []string, + ipWriteWhiteList []string, + rootWhiteList []string, fixJpgOrientation bool, readRedirect bool) *VolumeServer { vs := &VolumeServer{ @@ -45,28 +49,40 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, vs.SetMasterNode(masterNode) vs.store = storage.NewStore(port, ip, publicUrl, folders, maxCounts, vs.needleMapKind) - vs.guard = security.NewGuard(whiteList, "") + vs.read_guard = security.NewGuard(ipReadWhiteList, rootWhiteList, "") + vs.write_guard = security.NewGuard(ipWriteWhiteList, rootWhiteList, "") - adminMux.HandleFunc("/ui/index.html", vs.uiStatusHandler) - adminMux.HandleFunc("/status", vs.guard.WhiteList(vs.statusHandler)) - adminMux.HandleFunc("/admin/assign_volume", vs.guard.WhiteList(vs.assignVolumeHandler)) - adminMux.HandleFunc("/admin/vacuum/check", vs.guard.WhiteList(vs.vacuumVolumeCheckHandler)) - adminMux.HandleFunc("/admin/vacuum/compact", vs.guard.WhiteList(vs.vacuumVolumeCompactHandler)) - adminMux.HandleFunc("/admin/vacuum/commit", vs.guard.WhiteList(vs.vacuumVolumeCommitHandler)) - adminMux.HandleFunc("/admin/delete_collection", vs.guard.WhiteList(vs.deleteCollectionHandler)) - adminMux.HandleFunc("/admin/sync/status", vs.guard.WhiteList(vs.getVolumeSyncStatusHandler)) - adminMux.HandleFunc("/admin/sync/index", vs.guard.WhiteList(vs.getVolumeIndexContentHandler)) - adminMux.HandleFunc("/admin/sync/data", vs.guard.WhiteList(vs.getVolumeDataContentHandler)) - adminMux.HandleFunc("/stats/counter", vs.guard.WhiteList(statsCounterHandler)) - adminMux.HandleFunc("/stats/memory", vs.guard.WhiteList(statsMemoryHandler)) - adminMux.HandleFunc("/stats/disk", vs.guard.WhiteList(vs.statsDiskHandler)) - adminMux.HandleFunc("/delete", vs.guard.WhiteList(vs.batchDeleteHandler)) - adminMux.HandleFunc("/", vs.privateStoreHandler) + 
adminMux.HandleFunc("/ui/index.html", vs.read_guard.WhiteList(vs.uiStatusHandler)) + adminMux.HandleFunc("/status", vs.read_guard.WhiteList(vs.statusHandler)) + adminMux.HandleFunc("/admin/assign_volume", vs.write_guard.WhiteList(vs.assignVolumeHandler)) + adminMux.HandleFunc("/admin/vacuum/check", vs.write_guard.WhiteList(vs.vacuumVolumeCheckHandler)) + adminMux.HandleFunc("/admin/vacuum/compact", vs.write_guard.WhiteList(vs.vacuumVolumeCompactHandler)) + adminMux.HandleFunc("/admin/vacuum/commit", vs.write_guard.WhiteList(vs.vacuumVolumeCommitHandler)) + adminMux.HandleFunc("/admin/delete_collection", vs.write_guard.WhiteList(vs.deleteCollectionHandler)) + adminMux.HandleFunc("/admin/sync/status", vs.read_guard.WhiteList(vs.getVolumeSyncStatusHandler)) + adminMux.HandleFunc("/admin/sync/index", vs.write_guard.WhiteList(vs.getVolumeIndexContentHandler)) + adminMux.HandleFunc("/admin/sync/data", vs.write_guard.WhiteList(vs.getVolumeDataContentHandler)) + adminMux.HandleFunc("/stats/counter", vs.read_guard.WhiteList(statsCounterHandler)) + adminMux.HandleFunc("/stats/memory", vs.read_guard.WhiteList(statsMemoryHandler)) + adminMux.HandleFunc("/stats/disk", vs.read_guard.WhiteList(vs.statsDiskHandler)) + adminMux.HandleFunc("/delete", vs.write_guard.WhiteList(vs.batchDeleteHandler)) + adminMux.HandleFunc("/", vs.read_guard.WhiteList(vs.privateStoreHandler)) if publicMux != adminMux { // separated admin and public port publicMux.HandleFunc("/favicon.ico", vs.faviconHandler) publicMux.HandleFunc("/", vs.publicReadOnlyHandler) } + /* + // add in profiling support + adminMux.HandleFunc("/debug/pprof/", pprof.Index) + adminMux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) + adminMux.HandleFunc("/debug/pprof/profile", pprof.Profile) + adminMux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) + adminMux.Handle("/debug/pprof/goroutine", pprof.Handler("goroutine")) + adminMux.Handle("/debug/pprof/heap", pprof.Handler("heap")) + adminMux.Handle("/debug/pprof/threadcreate", pprof.Handler("threadcreate")) + adminMux.Handle("/debug/pprof/block", pprof.Handler("block")) + */ go func() { connected := true @@ -82,7 +98,8 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, if !connected { connected = true vs.SetMasterNode(master) - vs.guard.SecretKey = secretKey + vs.read_guard.SecretKey = secretKey + vs.write_guard.SecretKey = secretKey glog.V(0).Infoln("Volume Server Connected with master at", master) } } else { @@ -121,5 +138,5 @@ func (vs *VolumeServer) Shutdown() { } func (vs *VolumeServer) jwt(fileId string) security.EncodedJwt { - return security.GenJwt(vs.guard.SecretKey, fileId) + return security.GenJwt(vs.read_guard.SecretKey, fileId) } diff --git a/weed/server/volume_server_handlers.go b/weed/server/volume_server_handlers.go index 2d6fe7849..f223af6c0 100644 --- a/weed/server/volume_server_handlers.go +++ b/weed/server/volume_server_handlers.go @@ -25,19 +25,19 @@ func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Reque switch r.Method { case "GET": stats.ReadRequest() - vs.GetOrHeadHandler(w, r) + vs.write_guard.WhiteList(vs.GetOrHeadHandler)(w, r) case "HEAD": stats.ReadRequest() - vs.GetOrHeadHandler(w, r) + vs.write_guard.WhiteList(vs.GetOrHeadHandler)(w, r) case "DELETE": stats.DeleteRequest() - vs.guard.WhiteList(vs.DeleteHandler)(w, r) + vs.write_guard.WhiteList(vs.DeleteHandler)(w, r) case "PUT": stats.WriteRequest() - vs.guard.WhiteList(vs.PostHandler)(w, r) + vs.write_guard.WhiteList(vs.PostHandler)(w, r) case "POST": stats.WriteRequest() - 
vs.guard.WhiteList(vs.PostHandler)(w, r) + vs.write_guard.WhiteList(vs.PostHandler)(w, r) } } From ce99bb927d163707e83de6a265ce9b77dd4f9d44 Mon Sep 17 00:00:00 2001 From: Mike Tolman Date: Fri, 5 Aug 2016 15:45:48 -0600 Subject: [PATCH 06/10] Revert "Adding HTTP verb whitelisting options." This reverts commit 34837afc7adb8ea6955d5cf962af10f8f30fb476. --- weed/command/filer.go | 74 -------------------- weed/command/master.go | 15 ++-- weed/command/server.go | 77 ++------------------- weed/command/volume.go | 16 ++--- weed/security/guard.go | 48 ++----------- weed/server/filer_server.go | 13 ---- weed/server/filer_server_handlers.go | 10 +-- weed/server/master_server.go | 40 +++++------ weed/server/master_server_handlers_admin.go | 2 +- weed/server/volume_server.go | 57 ++++++--------- weed/server/volume_server_handlers.go | 10 +-- 11 files changed, 72 insertions(+), 290 deletions(-) diff --git a/weed/command/filer.go b/weed/command/filer.go index f58e38403..0bd508e0b 100644 --- a/weed/command/filer.go +++ b/weed/command/filer.go @@ -9,7 +9,6 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/server" "github.com/chrislusf/seaweedfs/weed/util" - "strings" ) var ( @@ -32,31 +31,6 @@ type FilerOptions struct { redis_server *string redis_password *string redis_database *int - get_ip_whitelist_option *string - get_root_whitelist_option *string - head_ip_whitelist_option *string - head_root_whitelist_option *string - delete_ip_whitelist_option *string - delete_root_whitelist_option *string - put_ip_whitelist_option *string - put_root_whitelist_option *string - post_ip_whitelist_option *string - post_root_whitelist_option *string - get_secure_key *string - head_secure_key *string - delete_secure_key *string - put_secure_key *string - post_secure_key *string - get_ip_whitelist []string - get_root_whitelist []string - head_ip_whitelist []string - head_root_whitelist []string - delete_ip_whitelist []string - delete_root_whitelist []string - put_ip_whitelist []string - put_root_whitelist []string - post_ip_whitelist []string - post_root_whitelist []string } func init() { @@ -76,21 +50,6 @@ func init() { f.redis_password = cmdFiler.Flag.String("redis.password", "", "password in clear text") f.redis_database = cmdFiler.Flag.Int("redis.database", 0, "the database on the redis server") f.secretKey = cmdFiler.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)") - f.get_ip_whitelist_option = cmdFiler.Flag.String("whitelist.ip.get", "", "comma separated Ip addresses having get permission. No limit if empty.") - f.get_root_whitelist_option = cmdFiler.Flag.String("whitelist.root.get", "", "comma separated root paths having get permission. No limit if empty.") - f.head_ip_whitelist_option = cmdFiler.Flag.String("whitelist.ip.head", "", "comma separated Ip addresses having head permission. No limit if empty.") - f.head_root_whitelist_option = cmdFiler.Flag.String("whitelist.root.head", "", "comma separated root paths having head permission. No limit if empty.") - f.delete_ip_whitelist_option = cmdFiler.Flag.String("whitelist.ip.delete", "", "comma separated Ip addresses having delete permission. No limit if empty.") - f.delete_root_whitelist_option = cmdFiler.Flag.String("whitelist.root.delete", "", "comma separated root paths having delete permission. No limit if empty.") - f.put_ip_whitelist_option = cmdFiler.Flag.String("whitelist.ip.put", "", "comma separated Ip addresses having put permission. 
No limit if empty.") - f.put_root_whitelist_option = cmdFiler.Flag.String("whitelist.root.put", "", "comma separated root paths having put permission. No limit if empty.") - f.post_ip_whitelist_option = cmdFiler.Flag.String("whitelist.ip.post", "", "comma separated Ip addresses having post permission. No limit if empty.") - f.post_root_whitelist_option = cmdFiler.Flag.String("whitelist.root.post", "", "comma separated root paths having post permission. No limit if empty.") - f.get_secure_key = cmdFiler.Flag.String("secure.secret.get", "", "secret to encrypt Json Web Token(JWT)") - f.head_secure_key = cmdFiler.Flag.String("secure.secret.head", "", "secret to encrypt Json Web Token(JWT)") - f.delete_secure_key = cmdFiler.Flag.String("secure.secret.delete", "", "secret to encrypt Json Web Token(JWT)") - f.put_secure_key = cmdFiler.Flag.String("secure.secret.put", "", "secret to encrypt Json Web Token(JWT)") - f.post_secure_key = cmdFiler.Flag.String("secure.secret.post", "", "secret to encrypt Json Web Token(JWT)") } @@ -122,36 +81,6 @@ func runFiler(cmd *Command, args []string) bool { glog.Fatalf("Check Meta Folder (-dir) Writable %s : %s", *f.dir, err) } - if *f.get_ip_whitelist_option != "" { - f.get_ip_whitelist = strings.Split(*f.get_ip_whitelist_option, ",") - } - if *f.get_root_whitelist_option != "" { - f.get_root_whitelist = strings.Split(*f.get_root_whitelist_option, ",") - } - if *f.head_ip_whitelist_option != "" { - f.head_ip_whitelist = strings.Split(*f.head_ip_whitelist_option, ",") - } - if *f.head_root_whitelist_option != "" { - f.head_root_whitelist = strings.Split(*f.head_root_whitelist_option, ",") - } - if *f.delete_ip_whitelist_option != "" { - f.delete_ip_whitelist = strings.Split(*f.delete_ip_whitelist_option, ",") - } - if *f.delete_root_whitelist_option != "" { - f.delete_root_whitelist = strings.Split(*f.delete_root_whitelist_option, ",") - } - if *f.put_ip_whitelist_option != "" { - f.put_ip_whitelist = strings.Split(*f.put_ip_whitelist_option, ",") - } - if *f.put_root_whitelist_option != "" { - f.put_root_whitelist = strings.Split(*f.put_root_whitelist_option, ",") - } - if *f.post_ip_whitelist_option != "" { - f.post_ip_whitelist = strings.Split(*f.post_ip_whitelist_option, ",") - } - if *f.post_root_whitelist_option != "" { - f.post_root_whitelist = strings.Split(*f.post_root_whitelist_option, ",") - } r := http.NewServeMux() _, nfs_err := weed_server.NewFilerServer(r, *f.ip, *f.port, *f.master, *f.dir, *f.collection, *f.defaultReplicaPlacement, *f.redirectOnRead, *f.disableDirListing, @@ -159,9 +88,6 @@ func runFiler(cmd *Command, args []string) bool { *f.secretKey, *f.cassandra_server, *f.cassandra_keyspace, *f.redis_server, *f.redis_password, *f.redis_database, - f.get_ip_whitelist, f.head_ip_whitelist, f.delete_ip_whitelist, f.put_ip_whitelist, f.post_ip_whitelist, - f.get_root_whitelist, f.head_root_whitelist, f.delete_root_whitelist, f.put_root_whitelist, f.post_root_whitelist, - *f.get_secure_key, *f.head_secure_key, *f.delete_secure_key, *f.put_secure_key, *f.post_secure_key, ) if nfs_err != nil { glog.Fatalf("Filer startup error: %v", nfs_err) diff --git a/weed/command/master.go b/weed/command/master.go index f140750ea..cd15defce 100644 --- a/weed/command/master.go +++ b/weed/command/master.go @@ -41,13 +41,11 @@ var ( mTimeout = cmdMaster.Flag.Int("idleTimeout", 10, "connection idle seconds") mMaxCpu = cmdMaster.Flag.Int("maxCpu", 0, "maximum number of CPUs. 
0 means all available CPUs") garbageThreshold = cmdMaster.Flag.String("garbageThreshold", "0.3", "threshold to vacuum and reclaim spaces") - masterReadWhiteListOption = cmdMaster.Flag.String("readWhiteList", "", "comma separated Ip addresses having read permission. No limit if empty.") - masterWriteWhiteListOption = cmdMaster.Flag.String("writeWhiteList", "", "comma separated Ip addresses having write permission. No limit if empty.") + masterWhiteListOption = cmdMaster.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.") masterSecureKey = cmdMaster.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)") masterCpuProfile = cmdMaster.Flag.String("cpuprofile", "", "cpu profile output file") - masterReadWhiteList []string - masterWriteWhiteList []string + masterWhiteList []string ) func runMaster(cmd *Command, args []string) bool { @@ -69,17 +67,14 @@ func runMaster(cmd *Command, args []string) bool { if err := util.TestFolderWritable(*metaFolder); err != nil { glog.Fatalf("Check Meta Folder (-mdir) Writable %s : %s", *metaFolder, err) } - if *masterReadWhiteListOption != "" { - masterReadWhiteList = strings.Split(*masterReadWhiteListOption, ",") - } - if *masterWriteWhiteListOption != "" { - masterWriteWhiteList = strings.Split(*masterWriteWhiteListOption, ",") + if *masterWhiteListOption != "" { + masterWhiteList = strings.Split(*masterWhiteListOption, ",") } r := mux.NewRouter() ms := weed_server.NewMasterServer(r, *mport, *metaFolder, *volumeSizeLimitMB, *mpulse, *confFile, *defaultReplicaPlacement, *garbageThreshold, - masterReadWhiteList, masterWriteWhiteList, nil, *masterSecureKey, + masterWhiteList, *masterSecureKey, ) listeningAddress := *masterBindIp + ":" + strconv.Itoa(*mport) diff --git a/weed/command/server.go b/weed/command/server.go index 9a19ef2af..7a6677a65 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -54,8 +54,7 @@ var ( serverTimeout = cmdServer.Flag.Int("idleTimeout", 10, "connection idle seconds") serverDataCenter = cmdServer.Flag.String("dataCenter", "", "current volume server's data center name") serverRack = cmdServer.Flag.String("rack", "", "current volume server's rack name") - serverReadWhiteListOption = cmdServer.Flag.String("read.whitelist", "", "comma separated Ip addresses having read permission. No limit if empty.") - serverWriteWhiteListOption = cmdServer.Flag.String("write.whitelist", "", "comma separated Ip addresses having write permission. No limit if empty.") + serverWhiteListOption = cmdServer.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. 
No limit if empty.") serverPeers = cmdServer.Flag.String("master.peers", "", "other master nodes in comma separated ip:masterPort list") serverSecureKey = cmdServer.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)") serverGarbageThreshold = cmdServer.Flag.String("garbageThreshold", "0.3", "threshold to vacuum and reclaim spaces") @@ -75,8 +74,7 @@ var ( volumeServerPublicUrl = cmdServer.Flag.String("volume.publicUrl", "", "publicly accessible address") isStartingFiler = cmdServer.Flag.Bool("filer", false, "whether to start filer") - serverReadWhiteList []string - serverWriteWhiteList []string + serverWhiteList []string ) func init() { @@ -84,7 +82,7 @@ func init() { filerOptions.master = cmdServer.Flag.String("filer.master", "", "default to current master server") filerOptions.collection = cmdServer.Flag.String("filer.collection", "", "all data will be stored in this collection") filerOptions.port = cmdServer.Flag.Int("filer.port", 8888, "filer server http listen port") - filerOptions.dir = cmdServer.Flag.String("filer.dir", "", "directory to store meta data, default to a 'filer' sub directory of what -mdir is specified") + filerOptions.dir = cmdServer.Flag.String("filer.dir", "", "directory to store meta data, default to a 'filer' sub directory of what -dir is specified") filerOptions.defaultReplicaPlacement = cmdServer.Flag.String("filer.defaultReplicaPlacement", "", "Default replication type if not specified during runtime.") filerOptions.redirectOnRead = cmdServer.Flag.Bool("filer.redirectOnRead", false, "whether proxy or redirect to volume server during file GET request") filerOptions.disableDirListing = cmdServer.Flag.Bool("filer.disableDirListing", false, "turn off directory listing") @@ -94,21 +92,6 @@ func init() { filerOptions.redis_server = cmdServer.Flag.String("filer.redis.server", "", "host:port of the redis server, e.g., 127.0.0.1:6379") filerOptions.redis_password = cmdServer.Flag.String("filer.redis.password", "", "redis password in clear text") filerOptions.redis_database = cmdServer.Flag.Int("filer.redis.database", 0, "the database on the redis server") - filerOptions.get_ip_whitelist_option = cmdServer.Flag.String("filer.whitelist.ip.get", "", "comma separated Ip addresses having filer GET permission. No limit if empty.") - filerOptions.get_root_whitelist_option = cmdServer.Flag.String("filer.whitelist.root.get", "", "comma separated root paths having filer GET permission. No limit if empty.") - filerOptions.head_ip_whitelist_option = cmdServer.Flag.String("filer.whitelist.ip.head", "", "comma separated Ip addresses having filer HEAD permission. No limit if empty.") - filerOptions.head_root_whitelist_option = cmdServer.Flag.String("filer.whitelist.root.head", "", "comma separated root paths having filer HEAD permission. No limit if empty.") - filerOptions.delete_ip_whitelist_option = cmdServer.Flag.String("filer.whitelist.ip.delete", "", "comma separated Ip addresses having filer DELETE permission. No limit if empty.") - filerOptions.delete_root_whitelist_option = cmdServer.Flag.String("filer.whitelist.root.delete", "", "comma separated root paths having filer DELETE permission. No limit if empty.") - filerOptions.put_ip_whitelist_option = cmdServer.Flag.String("filer.whitelist.ip.put", "", "comma separated Ip addresses having filer PUT permission. No limit if empty.") - filerOptions.put_root_whitelist_option = cmdServer.Flag.String("filer.whitelist.root.put", "", "comma separated root paths having filer PUT permission. 
No limit if empty.") - filerOptions.post_ip_whitelist_option = cmdServer.Flag.String("filer.whitelist.ip.post", "", "comma separated Ip addresses having filer POST permission. No limit if empty.") - filerOptions.post_root_whitelist_option = cmdServer.Flag.String("filer.whitelist.root.post", "", "comma separated root paths having filer POST permission. No limit if empty.") - filerOptions.get_secure_key = cmdServer.Flag.String("filer.secure.secret.get", "", "secret to encrypt Json Web Token(JWT)") - filerOptions.head_secure_key = cmdServer.Flag.String("filer.secure.secret.head", "", "secret to encrypt Json Web Token(JWT)") - filerOptions.delete_secure_key = cmdServer.Flag.String("filer.secure.secret.delete", "", "secret to encrypt Json Web Token(JWT)") - filerOptions.put_secure_key = cmdServer.Flag.String("filer.secure.secret.put", "", "secret to encrypt Json Web Token(JWT)") - filerOptions.post_secure_key = cmdServer.Flag.String("filer.secure.secret.post", "", "secret to encrypt Json Web Token(JWT)") } func runServer(cmd *Command, args []string) bool { @@ -171,56 +154,13 @@ func runServer(cmd *Command, args []string) bool { if err := util.TestFolderWritable(*filerOptions.dir); err != nil { glog.Fatalf("Check Mapping Meta Folder (-filer.dir=\"%s\") Writable: %s", *filerOptions.dir, err) } - if *filerOptions.get_ip_whitelist_option != "" { - glog.V(0).Infof("Filer GET IP whitelist: %s", *filerOptions.get_ip_whitelist_option) - filerOptions.get_ip_whitelist = strings.Split(*filerOptions.get_ip_whitelist_option, ",") - } - if *filerOptions.get_root_whitelist_option != "" { - glog.V(0).Infof("Filer GET root whitelist: %s", *filerOptions.get_root_whitelist_option) - filerOptions.get_root_whitelist = strings.Split(*filerOptions.get_root_whitelist_option, ",") - } - if *filerOptions.head_ip_whitelist_option != "" { - glog.V(0).Infof("Filer HEAD IP whitelist: %s", *filerOptions.head_ip_whitelist_option) - filerOptions.head_ip_whitelist = strings.Split(*filerOptions.head_ip_whitelist_option, ",") - } - if *filerOptions.head_root_whitelist_option != "" { - glog.V(0).Infof("Filer HEAD root whitelist: %s", *filerOptions.head_root_whitelist_option) - filerOptions.head_root_whitelist = strings.Split(*filerOptions.head_root_whitelist_option, ",") - } - if *filerOptions.delete_ip_whitelist_option != "" { - glog.V(0).Infof("Filer DELETE IP whitelist: %s", *filerOptions.delete_ip_whitelist_option) - filerOptions.delete_ip_whitelist = strings.Split(*filerOptions.delete_ip_whitelist_option, ",") - } - if *filerOptions.delete_root_whitelist_option != "" { - glog.V(0).Infof("Filer DELETE root whitelist: %s", *filerOptions.delete_root_whitelist_option) - filerOptions.delete_root_whitelist = strings.Split(*filerOptions.delete_root_whitelist_option, ",") - } - if *filerOptions.put_ip_whitelist_option != "" { - glog.V(0).Infof("Filer PUT IP whitelist: %s", *filerOptions.put_ip_whitelist_option) - filerOptions.put_ip_whitelist = strings.Split(*filerOptions.put_ip_whitelist_option, ",") - } - if *filerOptions.put_root_whitelist_option != "" { - glog.V(0).Infof("Filer PUT root whitelist: %s", *filerOptions.put_root_whitelist_option) - filerOptions.put_root_whitelist = strings.Split(*filerOptions.put_root_whitelist_option, ",") - } - if *filerOptions.post_ip_whitelist_option != "" { - glog.V(0).Infof("Filer POST IP whitelist: %s", *filerOptions.post_ip_whitelist_option) - filerOptions.post_ip_whitelist = strings.Split(*filerOptions.post_ip_whitelist_option, ",") - } - if *filerOptions.post_root_whitelist_option != "" { - 
glog.V(0).Infof("Filer POST root whitelist: %s", *filerOptions.post_root_whitelist_option) - filerOptions.post_root_whitelist = strings.Split(*filerOptions.post_root_whitelist_option, ",") - } } if err := util.TestFolderWritable(*masterMetaFolder); err != nil { glog.Fatalf("Check Meta Folder (-mdir=\"%s\") Writable: %s", *masterMetaFolder, err) } - if *serverReadWhiteListOption != "" { - serverReadWhiteList = strings.Split(*serverReadWhiteListOption, ",") - } - if *serverWriteWhiteListOption != "" { - serverWriteWhiteList = strings.Split(*serverWriteWhiteListOption, ",") + if *serverWhiteListOption != "" { + serverWhiteList = strings.Split(*serverWhiteListOption, ",") } if *isStartingFiler { @@ -234,9 +174,6 @@ func runServer(cmd *Command, args []string) bool { *filerOptions.secretKey, *filerOptions.cassandra_server, *filerOptions.cassandra_keyspace, *filerOptions.redis_server, *filerOptions.redis_password, *filerOptions.redis_database, - filerOptions.get_ip_whitelist, filerOptions.head_ip_whitelist, filerOptions.delete_ip_whitelist, filerOptions.put_ip_whitelist, filerOptions.post_ip_whitelist, - filerOptions.get_root_whitelist, filerOptions.head_root_whitelist, filerOptions.delete_root_whitelist, filerOptions.put_root_whitelist, filerOptions.post_root_whitelist, - *f.get_secure_key, *f.head_secure_key, *f.delete_secure_key, *f.put_secure_key, *f.post_secure_key, ) if nfs_err != nil { glog.Fatalf("Filer startup error: %v", nfs_err) @@ -265,7 +202,7 @@ func runServer(cmd *Command, args []string) bool { r := mux.NewRouter() ms := weed_server.NewMasterServer(r, *masterPort, *masterMetaFolder, *masterVolumeSizeLimitMB, *volumePulse, *masterConfFile, *masterDefaultReplicaPlacement, *serverGarbageThreshold, - serverReadWhiteList, serverWriteWhiteList, nil, *serverSecureKey, + serverWhiteList, *serverSecureKey, ) glog.V(0).Infoln("Start Seaweed Master", util.VERSION, "at", *serverIp+":"+strconv.Itoa(*masterPort)) @@ -319,7 +256,7 @@ func runServer(cmd *Command, args []string) bool { folders, maxCounts, volumeNeedleMapKind, *serverIp+":"+strconv.Itoa(*masterPort), *volumePulse, *serverDataCenter, *serverRack, - serverReadWhiteList, serverWriteWhiteList, nil, *volumeFixJpgOrientation, *volumeReadRedirect, + serverWhiteList, *volumeFixJpgOrientation, *volumeReadRedirect, ) glog.V(0).Infoln("Start Seaweed volume server", util.VERSION, "at", *serverIp+":"+strconv.Itoa(*volumePort)) diff --git a/weed/command/volume.go b/weed/command/volume.go index 68f5edd9e..21369cbe9 100644 --- a/weed/command/volume.go +++ b/weed/command/volume.go @@ -2,7 +2,6 @@ package command import ( "net/http" - _ "net/http/pprof" "os" "runtime" "strconv" @@ -33,8 +32,7 @@ type VolumeServerOptions struct { maxCpu *int dataCenter *string rack *string - readWhitelist []string - writeWhitelist []string + whiteList []string indexType *string fixJpgOrientation *bool readRedirect *bool @@ -69,8 +67,7 @@ var cmdVolume = &Command{ var ( volumeFolders = cmdVolume.Flag.String("dir", os.TempDir(), "directories to store data files. dir[,dir]...") maxVolumeCounts = cmdVolume.Flag.String("max", "7", "maximum numbers of volumes, count[,count]...") - volumeReadWhiteListOption = cmdVolume.Flag.String("read.whitelist", "", "comma separated Ip addresses having read permission. No limit if empty.") - volumeWriteWhiteListOption = cmdVolume.Flag.String("write.whitelist", "", "comma separated Ip addresses having write permission. 
No limit if empty.") + volumeWhiteListOption = cmdVolume.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.") ) func runVolume(cmd *Command, args []string) bool { @@ -99,11 +96,8 @@ func runVolume(cmd *Command, args []string) bool { } //security related white list configuration - if *volumeReadWhiteListOption != "" { - v.readWhitelist = strings.Split(*volumeReadWhiteListOption, ",") - } - if *volumeWriteWhiteListOption != "" { - v.writeWhitelist = strings.Split(*volumeWriteWhiteListOption, ",") + if *volumeWhiteListOption != "" { + v.whiteList = strings.Split(*volumeWhiteListOption, ",") } if *v.ip == "" { @@ -136,7 +130,7 @@ func runVolume(cmd *Command, args []string) bool { v.folders, v.folderMaxLimits, volumeNeedleMapKind, *v.master, *v.pulseSeconds, *v.dataCenter, *v.rack, - v.readWhitelist, v.writeWhitelist, nil, + v.whiteList, *v.fixJpgOrientation, *v.readRedirect, ) diff --git a/weed/security/guard.go b/weed/security/guard.go index 6292c67c9..dea3b12f2 100644 --- a/weed/security/guard.go +++ b/weed/security/guard.go @@ -41,31 +41,17 @@ https://github.com/pkieltyka/jwtauth/blob/master/jwtauth.go */ type Guard struct { - ipWhiteList []string - rootWhiteList []string + whiteList []string SecretKey Secret isActive bool } -func NewGuard(ipWhiteList []string, rootWhiteList []string, secretKey string) *Guard { - g := &Guard{ipWhiteList: ipWhiteList, rootWhiteList: rootWhiteList, SecretKey: Secret(secretKey)} - g.isActive = len(g.ipWhiteList) != 0 || len(g.SecretKey) != 0 +func NewGuard(whiteList []string, secretKey string) *Guard { + g := &Guard{whiteList: whiteList, SecretKey: Secret(secretKey)} + g.isActive = len(g.whiteList) != 0 || len(g.SecretKey) != 0 return g } -func (g *Guard) WhiteList2(f func(w http.ResponseWriter, r *http.Request, b bool)) func(w http.ResponseWriter, r *http.Request, b bool) { - if !g.isActive { - //if no security needed, just skip all checkings - return f - } - return func(w http.ResponseWriter, r *http.Request, b bool) { - if err := g.checkWhiteList(w, r); err != nil { - w.WriteHeader(http.StatusUnauthorized) - return - } - f(w, r, b) - } -} func (g *Guard) WhiteList(f func(w http.ResponseWriter, r *http.Request)) func(w http.ResponseWriter, r *http.Request) { if !g.isActive { @@ -110,14 +96,13 @@ func GetActualRemoteHost(r *http.Request) (host string, err error) { } func (g *Guard) checkWhiteList(w http.ResponseWriter, r *http.Request) error { - if len(g.ipWhiteList) == 0 { - glog.V(0).Info("No whitelist specified for operation") + if len(g.whiteList) == 0 { return nil } host, err := GetActualRemoteHost(r) if err == nil { - for _, ip := range g.ipWhiteList { + for _, ip := range g.whiteList { // If the whitelist entry contains a "/" it // is a CIDR range, and we should check the @@ -129,7 +114,6 @@ func (g *Guard) checkWhiteList(w http.ResponseWriter, r *http.Request) error { } remote := net.ParseIP(host) if cidrnet.Contains(remote) { - glog.V(0).Infof("Found %s in CIDR whitelist.", r.RemoteAddr) return nil } } @@ -138,28 +122,8 @@ func (g *Guard) checkWhiteList(w http.ResponseWriter, r *http.Request) error { // Otherwise we're looking for a literal match. 
// if ip == host { - glog.V(0).Infof("Found %s in whitelist.", r.RemoteAddr) return nil } - // ::1 is the same as 127.0.0.1 and localhost - if host == "::1" && (ip == "127.0.0.1" || ip == "localhost") { - glog.V(0).Infof("Found %s (localhost) in whitelist.", r.RemoteAddr) - return nil - } - } - } - // The root whitelist allows exceptions to the IP whitelist, but only by certain root paths in the request. - if len(g.rootWhiteList) > 0 { - pathParts := strings.Split(r.RequestURI, "/") - if len(pathParts) > 0 { - requestedRoot := pathParts[1] - for _, root := range g.rootWhiteList { - if root == requestedRoot { - glog.V(0).Infof("Found %s in root whitelist.", requestedRoot) - return nil - } - } - glog.V(0).Infof("Not in root whitelist: %s", requestedRoot) } } diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 2a432af65..3c7c1fd9e 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -30,11 +30,6 @@ type FilerServer struct { filer filer.Filer maxMB int masterNodes *storage.MasterNodes - get_guard *security.Guard - head_guard *security.Guard - delete_guard *security.Guard - put_guard *security.Guard - post_guard *security.Guard } func NewFilerServer(r *http.ServeMux, ip string, port int, master string, dir string, collection string, @@ -43,9 +38,6 @@ func NewFilerServer(r *http.ServeMux, ip string, port int, master string, dir st secret string, cassandra_server string, cassandra_keyspace string, redis_server string, redis_password string, redis_database int, - get_ip_whitelist []string, head_ip_whitelist []string, delete_ip_whitelist []string, put_ip_whitelist []string, post_ip_whitelist []string, - get_root_whitelist []string, head_root_whitelist []string, delete_root_whitelist []string, put_root_whitelist []string, post_root_whitelist []string, - get_secure_key string, head_secure_key string, delete_secure_key string, put_secure_key string, post_secure_key string, ) (fs *FilerServer, err error) { fs = &FilerServer{ master: master, @@ -54,11 +46,6 @@ func NewFilerServer(r *http.ServeMux, ip string, port int, master string, dir st redirectOnRead: redirectOnRead, disableDirListing: disableDirListing, maxMB: maxMB, - get_guard: security.NewGuard(get_ip_whitelist, get_root_whitelist, get_secure_key), - head_guard: security.NewGuard(head_ip_whitelist, head_root_whitelist, head_secure_key), - delete_guard: security.NewGuard(delete_ip_whitelist, delete_root_whitelist, delete_secure_key), - put_guard: security.NewGuard(put_ip_whitelist, put_root_whitelist, put_secure_key), - post_guard: security.NewGuard(post_ip_whitelist, post_root_whitelist, post_secure_key), port: ip + ":" + strconv.Itoa(port), } diff --git a/weed/server/filer_server_handlers.go b/weed/server/filer_server_handlers.go index 4db170590..4e39258af 100644 --- a/weed/server/filer_server_handlers.go +++ b/weed/server/filer_server_handlers.go @@ -7,14 +7,14 @@ import ( func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) { switch r.Method { case "GET": - fs.get_guard.WhiteList2(fs.GetOrHeadHandler)(w, r, true) + fs.GetOrHeadHandler(w, r, true) case "HEAD": - fs.head_guard.WhiteList2(fs.GetOrHeadHandler)(w, r, false) + fs.GetOrHeadHandler(w, r, false) case "DELETE": - fs.delete_guard.WhiteList(fs.DeleteHandler)(w, r) + fs.DeleteHandler(w, r) case "PUT": - fs.put_guard.WhiteList(fs.PostHandler)(w, r) + fs.PostHandler(w, r) case "POST": - fs.post_guard.WhiteList(fs.PostHandler)(w, r) + fs.PostHandler(w, r) } } diff --git a/weed/server/master_server.go 
b/weed/server/master_server.go index 3bd77c819..61bda6988 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -23,8 +23,7 @@ type MasterServer struct { pulseSeconds int defaultReplicaPlacement string garbageThreshold string - read_guard *security.Guard - write_guard *security.Guard + guard *security.Guard Topo *topology.Topology vg *topology.VolumeGrowth @@ -39,9 +38,7 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string, confFile string, defaultReplicaPlacement string, garbageThreshold string, - ipReadWhiteList []string, - ipWriteWhiteList []string, - rootWhiteList []string, + whiteList []string, secureKey string, ) *MasterServer { ms := &MasterServer{ @@ -61,25 +58,24 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string, ms.vg = topology.NewDefaultVolumeGrowth() glog.V(0).Infoln("Volume Size Limit is", volumeSizeLimitMB, "MB") - ms.read_guard = security.NewGuard(ipReadWhiteList, rootWhiteList, secureKey) - ms.write_guard = security.NewGuard(ipWriteWhiteList, rootWhiteList, secureKey) + ms.guard = security.NewGuard(whiteList, secureKey) r.HandleFunc("/", ms.uiStatusHandler) - r.HandleFunc("/ui/index.html", ms.read_guard.WhiteList(ms.uiStatusHandler)) - r.HandleFunc("/dir/assign", ms.proxyToLeader(ms.write_guard.WhiteList(ms.dirAssignHandler))) - r.HandleFunc("/dir/lookup", ms.proxyToLeader(ms.read_guard.WhiteList(ms.dirLookupHandler))) - r.HandleFunc("/dir/join", ms.proxyToLeader(ms.write_guard.WhiteList(ms.dirJoinHandler))) - r.HandleFunc("/dir/status", ms.proxyToLeader(ms.read_guard.WhiteList(ms.dirStatusHandler))) - r.HandleFunc("/col/delete", ms.proxyToLeader(ms.write_guard.WhiteList(ms.collectionDeleteHandler))) - r.HandleFunc("/vol/lookup", ms.proxyToLeader(ms.read_guard.WhiteList(ms.volumeLookupHandler))) - r.HandleFunc("/vol/grow", ms.proxyToLeader(ms.write_guard.WhiteList(ms.volumeGrowHandler))) - r.HandleFunc("/vol/status", ms.proxyToLeader(ms.read_guard.WhiteList(ms.volumeStatusHandler))) - r.HandleFunc("/vol/vacuum", ms.proxyToLeader(ms.write_guard.WhiteList(ms.volumeVacuumHandler))) - r.HandleFunc("/submit", ms.write_guard.WhiteList(ms.submitFromMasterServerHandler)) - r.HandleFunc("/delete", ms.write_guard.WhiteList(ms.deleteFromMasterServerHandler)) - r.HandleFunc("/{fileId}", ms.proxyToLeader(ms.read_guard.WhiteList(ms.redirectHandler))) - r.HandleFunc("/stats/counter", ms.read_guard.WhiteList(statsCounterHandler)) - r.HandleFunc("/stats/memory", ms.read_guard.WhiteList(statsMemoryHandler)) + r.HandleFunc("/ui/index.html", ms.uiStatusHandler) + r.HandleFunc("/dir/assign", ms.proxyToLeader(ms.guard.WhiteList(ms.dirAssignHandler))) + r.HandleFunc("/dir/lookup", ms.proxyToLeader(ms.guard.WhiteList(ms.dirLookupHandler))) + r.HandleFunc("/dir/join", ms.proxyToLeader(ms.guard.WhiteList(ms.dirJoinHandler))) + r.HandleFunc("/dir/status", ms.proxyToLeader(ms.guard.WhiteList(ms.dirStatusHandler))) + r.HandleFunc("/col/delete", ms.proxyToLeader(ms.guard.WhiteList(ms.collectionDeleteHandler))) + r.HandleFunc("/vol/lookup", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeLookupHandler))) + r.HandleFunc("/vol/grow", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeGrowHandler))) + r.HandleFunc("/vol/status", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeStatusHandler))) + r.HandleFunc("/vol/vacuum", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeVacuumHandler))) + r.HandleFunc("/submit", ms.guard.WhiteList(ms.submitFromMasterServerHandler)) + r.HandleFunc("/delete", ms.guard.WhiteList(ms.deleteFromMasterServerHandler)) + 
r.HandleFunc("/{fileId}", ms.proxyToLeader(ms.redirectHandler)) + r.HandleFunc("/stats/counter", ms.guard.WhiteList(statsCounterHandler)) + r.HandleFunc("/stats/memory", ms.guard.WhiteList(statsMemoryHandler)) ms.Topo.StartRefreshWritableVolumes(garbageThreshold) diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go index 385290079..a762bf416 100644 --- a/weed/server/master_server_handlers_admin.go +++ b/weed/server/master_server_handlers_admin.go @@ -61,7 +61,7 @@ func (ms *MasterServer) dirJoinHandler(w http.ResponseWriter, r *http.Request) { ms.Topo.ProcessJoinMessage(joinMessage) writeJsonQuiet(w, r, http.StatusOK, operation.JoinResult{ VolumeSizeLimit: uint64(ms.volumeSizeLimitMB) * 1024 * 1024, - SecretKey: string(ms.write_guard.SecretKey), + SecretKey: string(ms.guard.SecretKey), }) } diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go index a40eeb9c0..79a4276b1 100644 --- a/weed/server/volume_server.go +++ b/weed/server/volume_server.go @@ -9,7 +9,6 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/storage" - //"net/http/pprof" ) type VolumeServer struct { @@ -19,8 +18,7 @@ type VolumeServer struct { dataCenter string rack string store *storage.Store - read_guard *security.Guard - write_guard *security.Guard + guard *security.Guard needleMapKind storage.NeedleMapType FixJpgOrientation bool @@ -33,9 +31,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, needleMapKind storage.NeedleMapType, masterNode string, pulseSeconds int, dataCenter string, rack string, - ipReadWhiteList []string, - ipWriteWhiteList []string, - rootWhiteList []string, + whiteList []string, fixJpgOrientation bool, readRedirect bool) *VolumeServer { vs := &VolumeServer{ @@ -49,40 +45,28 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, vs.SetMasterNode(masterNode) vs.store = storage.NewStore(port, ip, publicUrl, folders, maxCounts, vs.needleMapKind) - vs.read_guard = security.NewGuard(ipReadWhiteList, rootWhiteList, "") - vs.write_guard = security.NewGuard(ipWriteWhiteList, rootWhiteList, "") + vs.guard = security.NewGuard(whiteList, "") - adminMux.HandleFunc("/ui/index.html", vs.read_guard.WhiteList(vs.uiStatusHandler)) - adminMux.HandleFunc("/status", vs.read_guard.WhiteList(vs.statusHandler)) - adminMux.HandleFunc("/admin/assign_volume", vs.write_guard.WhiteList(vs.assignVolumeHandler)) - adminMux.HandleFunc("/admin/vacuum/check", vs.write_guard.WhiteList(vs.vacuumVolumeCheckHandler)) - adminMux.HandleFunc("/admin/vacuum/compact", vs.write_guard.WhiteList(vs.vacuumVolumeCompactHandler)) - adminMux.HandleFunc("/admin/vacuum/commit", vs.write_guard.WhiteList(vs.vacuumVolumeCommitHandler)) - adminMux.HandleFunc("/admin/delete_collection", vs.write_guard.WhiteList(vs.deleteCollectionHandler)) - adminMux.HandleFunc("/admin/sync/status", vs.read_guard.WhiteList(vs.getVolumeSyncStatusHandler)) - adminMux.HandleFunc("/admin/sync/index", vs.write_guard.WhiteList(vs.getVolumeIndexContentHandler)) - adminMux.HandleFunc("/admin/sync/data", vs.write_guard.WhiteList(vs.getVolumeDataContentHandler)) - adminMux.HandleFunc("/stats/counter", vs.read_guard.WhiteList(statsCounterHandler)) - adminMux.HandleFunc("/stats/memory", vs.read_guard.WhiteList(statsMemoryHandler)) - adminMux.HandleFunc("/stats/disk", vs.read_guard.WhiteList(vs.statsDiskHandler)) - adminMux.HandleFunc("/delete", vs.write_guard.WhiteList(vs.batchDeleteHandler)) 
- adminMux.HandleFunc("/", vs.read_guard.WhiteList(vs.privateStoreHandler)) + adminMux.HandleFunc("/ui/index.html", vs.uiStatusHandler) + adminMux.HandleFunc("/status", vs.guard.WhiteList(vs.statusHandler)) + adminMux.HandleFunc("/admin/assign_volume", vs.guard.WhiteList(vs.assignVolumeHandler)) + adminMux.HandleFunc("/admin/vacuum/check", vs.guard.WhiteList(vs.vacuumVolumeCheckHandler)) + adminMux.HandleFunc("/admin/vacuum/compact", vs.guard.WhiteList(vs.vacuumVolumeCompactHandler)) + adminMux.HandleFunc("/admin/vacuum/commit", vs.guard.WhiteList(vs.vacuumVolumeCommitHandler)) + adminMux.HandleFunc("/admin/delete_collection", vs.guard.WhiteList(vs.deleteCollectionHandler)) + adminMux.HandleFunc("/admin/sync/status", vs.guard.WhiteList(vs.getVolumeSyncStatusHandler)) + adminMux.HandleFunc("/admin/sync/index", vs.guard.WhiteList(vs.getVolumeIndexContentHandler)) + adminMux.HandleFunc("/admin/sync/data", vs.guard.WhiteList(vs.getVolumeDataContentHandler)) + adminMux.HandleFunc("/stats/counter", vs.guard.WhiteList(statsCounterHandler)) + adminMux.HandleFunc("/stats/memory", vs.guard.WhiteList(statsMemoryHandler)) + adminMux.HandleFunc("/stats/disk", vs.guard.WhiteList(vs.statsDiskHandler)) + adminMux.HandleFunc("/delete", vs.guard.WhiteList(vs.batchDeleteHandler)) + adminMux.HandleFunc("/", vs.privateStoreHandler) if publicMux != adminMux { // separated admin and public port publicMux.HandleFunc("/favicon.ico", vs.faviconHandler) publicMux.HandleFunc("/", vs.publicReadOnlyHandler) } - /* - // add in profiling support - adminMux.HandleFunc("/debug/pprof/", pprof.Index) - adminMux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) - adminMux.HandleFunc("/debug/pprof/profile", pprof.Profile) - adminMux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) - adminMux.Handle("/debug/pprof/goroutine", pprof.Handler("goroutine")) - adminMux.Handle("/debug/pprof/heap", pprof.Handler("heap")) - adminMux.Handle("/debug/pprof/threadcreate", pprof.Handler("threadcreate")) - adminMux.Handle("/debug/pprof/block", pprof.Handler("block")) - */ go func() { connected := true @@ -98,8 +82,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, if !connected { connected = true vs.SetMasterNode(master) - vs.read_guard.SecretKey = secretKey - vs.write_guard.SecretKey = secretKey + vs.guard.SecretKey = secretKey glog.V(0).Infoln("Volume Server Connected with master at", master) } } else { @@ -138,5 +121,5 @@ func (vs *VolumeServer) Shutdown() { } func (vs *VolumeServer) jwt(fileId string) security.EncodedJwt { - return security.GenJwt(vs.read_guard.SecretKey, fileId) + return security.GenJwt(vs.guard.SecretKey, fileId) } diff --git a/weed/server/volume_server_handlers.go b/weed/server/volume_server_handlers.go index f223af6c0..2d6fe7849 100644 --- a/weed/server/volume_server_handlers.go +++ b/weed/server/volume_server_handlers.go @@ -25,19 +25,19 @@ func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Reque switch r.Method { case "GET": stats.ReadRequest() - vs.write_guard.WhiteList(vs.GetOrHeadHandler)(w, r) + vs.GetOrHeadHandler(w, r) case "HEAD": stats.ReadRequest() - vs.write_guard.WhiteList(vs.GetOrHeadHandler)(w, r) + vs.GetOrHeadHandler(w, r) case "DELETE": stats.DeleteRequest() - vs.write_guard.WhiteList(vs.DeleteHandler)(w, r) + vs.guard.WhiteList(vs.DeleteHandler)(w, r) case "PUT": stats.WriteRequest() - vs.write_guard.WhiteList(vs.PostHandler)(w, r) + vs.guard.WhiteList(vs.PostHandler)(w, r) case "POST": stats.WriteRequest() - vs.write_guard.WhiteList(vs.PostHandler)(w, r) 
+ vs.guard.WhiteList(vs.PostHandler)(w, r) } } From 0f4c7dd8fdb2cc9278b8f1342a74b0ac1afa61e1 Mon Sep 17 00:00:00 2001 From: Mike Tolman Date: Fri, 5 Aug 2016 15:46:08 -0600 Subject: [PATCH 07/10] Revert "Ooops. Missed a line." This reverts commit 14d4252904ed0fad8a7d6d6156a70fcbc3eda12c. --- weed/server/filer_server.go | 1 - 1 file changed, 1 deletion(-) diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 3c7c1fd9e..c9bc0e021 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -34,7 +34,6 @@ type FilerServer struct { func NewFilerServer(r *http.ServeMux, ip string, port int, master string, dir string, collection string, replication string, redirectOnRead bool, disableDirListing bool, - maxMB int, secret string, cassandra_server string, cassandra_keyspace string, redis_server string, redis_password string, redis_database int, From 0d331c1e3ae0d038ae972279a63d2ff9a70e25f4 Mon Sep 17 00:00:00 2001 From: Mike Tolman Date: Fri, 5 Aug 2016 15:46:45 -0600 Subject: [PATCH 08/10] Revert "Changing needle_byte_cache so that it doesn't grow so big when larger files are added." This reverts commit 87fee21ef597a8b1bac5352d1327c13f87eeb000. --- weed/storage/needle_byte_cache.go | 22 +++------------------- 1 file changed, 3 insertions(+), 19 deletions(-) diff --git a/weed/storage/needle_byte_cache.go b/weed/storage/needle_byte_cache.go index 930ead81d..ae35a48ba 100644 --- a/weed/storage/needle_byte_cache.go +++ b/weed/storage/needle_byte_cache.go @@ -8,7 +8,6 @@ import ( "github.com/hashicorp/golang-lru" "github.com/chrislusf/seaweedfs/weed/util" - "github.com/chrislusf/seaweedfs/weed/glog" ) var ( @@ -25,7 +24,7 @@ In caching, the string~[]byte mapping is cached */ func init() { bytesPool = util.NewBytesPool() - bytesCache, _ = lru.NewWithEvict(50, func(key interface{}, value interface{}) { + bytesCache, _ = lru.NewWithEvict(512, func(key interface{}, value interface{}) { value.(*Block).decreaseReference() }) } @@ -47,37 +46,22 @@ func (block *Block) increaseReference() { // get bytes from the LRU cache of []byte first, then from the bytes pool // when []byte in LRU cache is evicted, it will be put back to the bytes pool func getBytesForFileBlock(r *os.File, offset int64, readSize int) (dataSlice []byte, block *Block, err error) { - //Skip the cache if we are looking for a block that is too big to fit in the cache (defaulting to 10MB) - cacheable := readSize <= (1024*1024*10) - if !cacheable { - glog.V(4).Infoln("Block too big to keep in cache. Size:", readSize) - } - cacheKey := string("") - if cacheable { // check cache, return if found - cacheKey = fmt.Sprintf("%d:%d:%d", r.Fd(), offset >> 3, readSize) + cacheKey := fmt.Sprintf("%d:%d:%d", r.Fd(), offset>>3, readSize) if obj, found := bytesCache.Get(cacheKey); found { - glog.V(4).Infoln("Found block in cache. 
Size:", readSize) block = obj.(*Block) block.increaseReference() dataSlice = block.Bytes[0:readSize] return dataSlice, block, nil - } } // get the []byte from pool b := bytesPool.Get(readSize) // refCount = 2, one by the bytesCache, one by the actual needle object - refCount := int32(1) - if cacheable { - refCount = 2 - } - block = &Block{Bytes: b, refCount: refCount} + block = &Block{Bytes: b, refCount: 2} dataSlice = block.Bytes[0:readSize] _, err = r.ReadAt(dataSlice, offset) - if cacheable { bytesCache.Add(cacheKey, block) - } return dataSlice, block, err } From a89a3c86d0bfa20ead98fec1d286cdc6018c3bde Mon Sep 17 00:00:00 2001 From: Mike Tolman Date: Fri, 5 Aug 2016 15:47:46 -0600 Subject: [PATCH 09/10] Revert "Add AutoChunking to the Filer API, so that you can upload really large files through the filer API." This reverts commit 09059bfdccdeff1a588ee1326318075adb068b0f. --- weed/command/filer.go | 3 - weed/command/server.go | 2 - weed/server/filer_server.go | 2 - weed/server/filer_server_handlers_write.go | 208 --------------------- 4 files changed, 215 deletions(-) diff --git a/weed/command/filer.go b/weed/command/filer.go index 0bd508e0b..582d4e9c8 100644 --- a/weed/command/filer.go +++ b/weed/command/filer.go @@ -24,7 +24,6 @@ type FilerOptions struct { dir *string redirectOnRead *bool disableDirListing *bool - maxMB *int secretKey *string cassandra_server *string cassandra_keyspace *string @@ -43,7 +42,6 @@ func init() { f.defaultReplicaPlacement = cmdFiler.Flag.String("defaultReplicaPlacement", "000", "default replication type if not specified") f.redirectOnRead = cmdFiler.Flag.Bool("redirectOnRead", false, "whether proxy or redirect to volume server during file GET request") f.disableDirListing = cmdFiler.Flag.Bool("disableDirListing", false, "turn off directory listing") - f.maxMB = cmdFiler.Flag.Int("maxMB", 0, "split files larger than the limit") f.cassandra_server = cmdFiler.Flag.String("cassandra.server", "", "host[:port] of the cassandra server") f.cassandra_keyspace = cmdFiler.Flag.String("cassandra.keyspace", "seaweed", "keyspace of the cassandra server") f.redis_server = cmdFiler.Flag.String("redis.server", "", "host:port of the redis server, e.g., 127.0.0.1:6379") @@ -84,7 +82,6 @@ func runFiler(cmd *Command, args []string) bool { r := http.NewServeMux() _, nfs_err := weed_server.NewFilerServer(r, *f.ip, *f.port, *f.master, *f.dir, *f.collection, *f.defaultReplicaPlacement, *f.redirectOnRead, *f.disableDirListing, - *f.maxMB, *f.secretKey, *f.cassandra_server, *f.cassandra_keyspace, *f.redis_server, *f.redis_password, *f.redis_database, diff --git a/weed/command/server.go b/weed/command/server.go index 7a6677a65..1211c7137 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -86,7 +86,6 @@ func init() { filerOptions.defaultReplicaPlacement = cmdServer.Flag.String("filer.defaultReplicaPlacement", "", "Default replication type if not specified during runtime.") filerOptions.redirectOnRead = cmdServer.Flag.Bool("filer.redirectOnRead", false, "whether proxy or redirect to volume server during file GET request") filerOptions.disableDirListing = cmdServer.Flag.Bool("filer.disableDirListing", false, "turn off directory listing") - filerOptions.maxMB = cmdServer.Flag.Int("filer.maxMB", 0, "split files larger than the limit") filerOptions.cassandra_server = cmdServer.Flag.String("filer.cassandra.server", "", "host[:port] of the cassandra server") filerOptions.cassandra_keyspace = cmdServer.Flag.String("filer.cassandra.keyspace", "seaweed", "keyspace of the 
cassandra server") filerOptions.redis_server = cmdServer.Flag.String("filer.redis.server", "", "host:port of the redis server, e.g., 127.0.0.1:6379") @@ -170,7 +169,6 @@ func runServer(cmd *Command, args []string) bool { _, nfs_err := weed_server.NewFilerServer(r, *serverBindIp, *filerOptions.port, *filerOptions.master, *filerOptions.dir, *filerOptions.collection, *filerOptions.defaultReplicaPlacement, *filerOptions.redirectOnRead, *filerOptions.disableDirListing, - *filerOptions.maxMB, *filerOptions.secretKey, *filerOptions.cassandra_server, *filerOptions.cassandra_keyspace, *filerOptions.redis_server, *filerOptions.redis_password, *filerOptions.redis_database, diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index c9bc0e021..b99bbd7c9 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -28,7 +28,6 @@ type FilerServer struct { disableDirListing bool secret security.Secret filer filer.Filer - maxMB int masterNodes *storage.MasterNodes } @@ -44,7 +43,6 @@ func NewFilerServer(r *http.ServeMux, ip string, port int, master string, dir st defaultReplication: replication, redirectOnRead: redirectOnRead, disableDirListing: disableDirListing, - maxMB: maxMB, port: ip + ":" + strconv.Itoa(port), } diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go index 872d8c4b9..e2d40f532 100644 --- a/weed/server/filer_server_handlers_write.go +++ b/weed/server/filer_server_handlers_write.go @@ -20,8 +20,6 @@ import ( "github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/util" "github.com/syndtr/goleveldb/leveldb" - "path" - "strconv" ) type FilerPostResult struct { @@ -219,7 +217,6 @@ func (fs *FilerServer) monolithicUploadAnalyzer(w http.ResponseWriter, r *http.R } func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { - query := r.URL.Query() replication := query.Get("replication") if replication == "" { @@ -230,10 +227,6 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { collection = fs.collection } - if autoChunked := fs.autoChunk(w, r, replication, collection); autoChunked { - return - } - var fileId, urlLocation string var err error @@ -250,17 +243,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { } u, _ := url.Parse(urlLocation) - - // This allows a client to generate a chunk manifest and submit it to the filer -- it is a little off - // because they need to provide FIDs instead of file paths... - cm, _ := strconv.ParseBool(query.Get("cm")) - if cm { - q := u.Query() - q.Set("cm", "true") - u.RawQuery = q.Encode() - } glog.V(4).Infoln("post to", u) - request := &http.Request{ Method: r.Method, URL: u, @@ -336,197 +319,6 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { writeJsonQuiet(w, r, http.StatusCreated, reply) } -func (fs *FilerServer) autoChunk(w http.ResponseWriter, r *http.Request, replication string, collection string) bool { - if r.Method != "POST" { - glog.V(4).Infoln("AutoChunking not supported for method", r.Method) - return false - } - - // autoChunking can be set at the command-line level or as a query param. 
Query param overrides command-line - query := r.URL.Query() - - parsedMaxMB, _ := strconv.ParseInt(query.Get("maxMB"), 10, 32) - maxMB := int32(parsedMaxMB) - if maxMB <= 0 && fs.maxMB > 0 { - maxMB = int32(fs.maxMB) - } - if maxMB <= 0 { - glog.V(4).Infoln("AutoChunking not enabled") - return false - } - glog.V(4).Infoln("AutoChunking level set to", maxMB, "(MB)") - - chunkSize := 1024 * 1024 * maxMB - - contentLength := int64(0) - if contentLengthHeader := r.Header["Content-Length"]; len(contentLengthHeader) == 1 { - contentLength, _ = strconv.ParseInt(contentLengthHeader[0], 10, 64) - if contentLength <= int64(chunkSize) { - glog.V(4).Infoln("Content-Length of", contentLength, "is less than the chunk size of", chunkSize, "so autoChunking will be skipped.") - return false - } - } - - if contentLength <= 0 { - glog.V(4).Infoln("Content-Length value is missing or unexpected so autoChunking will be skipped.") - return false - } - - reply, err := fs.doAutoChunk(w, r, contentLength, chunkSize, replication, collection) - if err != nil { - writeJsonError(w, r, http.StatusInternalServerError, err) - } else if reply != nil { - writeJsonQuiet(w, r, http.StatusCreated, reply) - } - return true -} - -func (fs *FilerServer) doAutoChunk(w http.ResponseWriter, r *http.Request, contentLength int64, chunkSize int32, replication string, collection string) (filerResult *FilerPostResult, replyerr error) { - - multipartReader, multipartReaderErr := r.MultipartReader() - if multipartReaderErr != nil { - return nil, multipartReaderErr - } - - part1, part1Err := multipartReader.NextPart() - if part1Err != nil { - return nil, part1Err - } - - fileName := part1.FileName() - if fileName != "" { - fileName = path.Base(fileName) - } - - chunks := (int64(contentLength) / int64(chunkSize)) + 1 - cm := operation.ChunkManifest{ - Name: fileName, - Size: 0, // don't know yet - Mime: "application/octet-stream", - Chunks: make([]*operation.ChunkInfo, 0, chunks), - } - - totalBytesRead := int64(0) - tmpBufferSize := int32(1024 * 1024) - tmpBuffer := bytes.NewBuffer(make([]byte, 0, tmpBufferSize)) - chunkBuf := make([]byte, chunkSize+tmpBufferSize, chunkSize+tmpBufferSize) // chunk size plus a little overflow - chunkBufOffset := int32(0) - chunkOffset := int64(0) - writtenChunks := 0 - - filerResult = &FilerPostResult{ - Name: fileName, - } - - for totalBytesRead < contentLength { - tmpBuffer.Reset() - bytesRead, readErr := io.CopyN(tmpBuffer, part1, int64(tmpBufferSize)) - readFully := readErr != nil && readErr == io.EOF - tmpBuf := tmpBuffer.Bytes() - bytesToCopy := tmpBuf[0:int(bytesRead)] - - copy(chunkBuf[chunkBufOffset:chunkBufOffset+int32(bytesRead)], bytesToCopy) - chunkBufOffset = chunkBufOffset + int32(bytesRead) - - if chunkBufOffset >= chunkSize || readFully || (chunkBufOffset > 0 && bytesRead == 0) { - writtenChunks = writtenChunks + 1 - fileId, urlLocation, assignErr := fs.assignNewFileInfo(w, r, replication, collection) - if assignErr != nil { - return nil, assignErr - } - - // upload the chunk to the volume server - chunkName := fileName + "_chunk_" + strconv.FormatInt(int64(cm.Chunks.Len()+1), 10) - uploadErr := fs.doUpload(urlLocation, w, r, chunkBuf[0:chunkBufOffset], chunkName, "application/octet-stream", fileId) - if uploadErr != nil { - return nil, uploadErr - } - - // Save to chunk manifest structure - cm.Chunks = append(cm.Chunks, - &operation.ChunkInfo{ - Offset: chunkOffset, - Size: int64(chunkBufOffset), - Fid: fileId, - }, - ) - - // reset variables for the next chunk - chunkBufOffset = 0 - 
chunkOffset = totalBytesRead + int64(bytesRead) - } - - totalBytesRead = totalBytesRead + int64(bytesRead) - - if bytesRead == 0 || readFully { - break - } - - if readErr != nil { - return nil, readErr - } - } - - cm.Size = totalBytesRead - manifestBuf, marshalErr := cm.Marshal() - if marshalErr != nil { - return nil, marshalErr - } - - manifestStr := string(manifestBuf) - glog.V(4).Infoln("Generated chunk manifest: ", manifestStr) - - manifestFileId, manifestUrlLocation, manifestAssignmentErr := fs.assignNewFileInfo(w, r, replication, collection) - if manifestAssignmentErr != nil { - return nil, manifestAssignmentErr - } - glog.V(4).Infoln("Manifest uploaded to:", manifestUrlLocation, "Fid:", manifestFileId) - filerResult.Fid = manifestFileId - - u, _ := url.Parse(manifestUrlLocation) - q := u.Query() - q.Set("cm", "true") - u.RawQuery = q.Encode() - - manifestUploadErr := fs.doUpload(u.String(), w, r, manifestBuf, fileName+"_manifest", "application/json", manifestFileId) - if manifestUploadErr != nil { - return nil, manifestUploadErr - } - - path := r.URL.Path - // also delete the old fid unless PUT operation - if r.Method != "PUT" { - if oldFid, err := fs.filer.FindFile(path); err == nil { - operation.DeleteFile(fs.getMasterNode(), oldFid, fs.jwt(oldFid)) - } - } - - glog.V(4).Infoln("saving", path, "=>", manifestFileId) - if db_err := fs.filer.CreateFile(path, manifestFileId); db_err != nil { - replyerr = db_err - filerResult.Error = db_err.Error() - operation.DeleteFile(fs.getMasterNode(), manifestFileId, fs.jwt(manifestFileId)) //clean up - glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err) - return - } - - return -} - -func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *http.Request, chunkBuf []byte, fileName string, contentType string, fileId string) (err error) { - err = nil - - ioReader := ioutil.NopCloser(bytes.NewBuffer(chunkBuf)) - uploadResult, uploadError := operation.Upload(urlLocation, fileName, ioReader, false, contentType, fs.jwt(fileId)) - if uploadResult != nil { - glog.V(0).Infoln("Chunk upload result. 
Name:", uploadResult.Name, "Fid:", fileId, "Size:", uploadResult.Size) - } - if uploadError != nil { - err = uploadError - } - return -} - // curl -X DELETE http://localhost:8888/path/to // curl -X DELETE http://localhost:8888/path/to?recursive=true func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { From 01d3f69c5230c31dd37c5b6f2476f155bab9c9a6 Mon Sep 17 00:00:00 2001 From: Mike Tolman Date: Fri, 5 Aug 2016 16:01:30 -0600 Subject: [PATCH 10/10] Adding AutoChunk/MaxMB Support to Filer API This is related to the following issue I added to chrislusf/seaweedfs: https://github.com/chrislusf/seaweedfs/issues/342 --- weed/command/filer.go | 3 + weed/command/server.go | 2 + weed/server/filer_server.go | 3 + weed/server/filer_server_handlers_write.go | 208 +++++++++++++++++++++ 4 files changed, 216 insertions(+) diff --git a/weed/command/filer.go b/weed/command/filer.go index 582d4e9c8..0bd508e0b 100644 --- a/weed/command/filer.go +++ b/weed/command/filer.go @@ -24,6 +24,7 @@ type FilerOptions struct { dir *string redirectOnRead *bool disableDirListing *bool + maxMB *int secretKey *string cassandra_server *string cassandra_keyspace *string @@ -42,6 +43,7 @@ func init() { f.defaultReplicaPlacement = cmdFiler.Flag.String("defaultReplicaPlacement", "000", "default replication type if not specified") f.redirectOnRead = cmdFiler.Flag.Bool("redirectOnRead", false, "whether proxy or redirect to volume server during file GET request") f.disableDirListing = cmdFiler.Flag.Bool("disableDirListing", false, "turn off directory listing") + f.maxMB = cmdFiler.Flag.Int("maxMB", 0, "split files larger than the limit") f.cassandra_server = cmdFiler.Flag.String("cassandra.server", "", "host[:port] of the cassandra server") f.cassandra_keyspace = cmdFiler.Flag.String("cassandra.keyspace", "seaweed", "keyspace of the cassandra server") f.redis_server = cmdFiler.Flag.String("redis.server", "", "host:port of the redis server, e.g., 127.0.0.1:6379") @@ -82,6 +84,7 @@ func runFiler(cmd *Command, args []string) bool { r := http.NewServeMux() _, nfs_err := weed_server.NewFilerServer(r, *f.ip, *f.port, *f.master, *f.dir, *f.collection, *f.defaultReplicaPlacement, *f.redirectOnRead, *f.disableDirListing, + *f.maxMB, *f.secretKey, *f.cassandra_server, *f.cassandra_keyspace, *f.redis_server, *f.redis_password, *f.redis_database, diff --git a/weed/command/server.go b/weed/command/server.go index 1211c7137..7a6677a65 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -86,6 +86,7 @@ func init() { filerOptions.defaultReplicaPlacement = cmdServer.Flag.String("filer.defaultReplicaPlacement", "", "Default replication type if not specified during runtime.") filerOptions.redirectOnRead = cmdServer.Flag.Bool("filer.redirectOnRead", false, "whether proxy or redirect to volume server during file GET request") filerOptions.disableDirListing = cmdServer.Flag.Bool("filer.disableDirListing", false, "turn off directory listing") + filerOptions.maxMB = cmdServer.Flag.Int("filer.maxMB", 0, "split files larger than the limit") filerOptions.cassandra_server = cmdServer.Flag.String("filer.cassandra.server", "", "host[:port] of the cassandra server") filerOptions.cassandra_keyspace = cmdServer.Flag.String("filer.cassandra.keyspace", "seaweed", "keyspace of the cassandra server") filerOptions.redis_server = cmdServer.Flag.String("filer.redis.server", "", "host:port of the redis server, e.g., 127.0.0.1:6379") @@ -169,6 +170,7 @@ func runServer(cmd *Command, args []string) bool { _, nfs_err := 
 			weed_server.NewFilerServer(r, *serverBindIp, *filerOptions.port, *filerOptions.master, *filerOptions.dir, *filerOptions.collection,
 			*filerOptions.defaultReplicaPlacement, *filerOptions.redirectOnRead, *filerOptions.disableDirListing,
+			*filerOptions.maxMB,
 			*filerOptions.secretKey,
 			*filerOptions.cassandra_server, *filerOptions.cassandra_keyspace,
 			*filerOptions.redis_server, *filerOptions.redis_password, *filerOptions.redis_database,
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index b99bbd7c9..3c7c1fd9e 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -28,11 +28,13 @@ type FilerServer struct {
 	disableDirListing bool
 	secret            security.Secret
 	filer             filer.Filer
+	maxMB             int
 	masterNodes       *storage.MasterNodes
 }
 
 func NewFilerServer(r *http.ServeMux, ip string, port int, master string, dir string, collection string,
 	replication string, redirectOnRead bool, disableDirListing bool,
+	maxMB int,
 	secret string,
 	cassandra_server string, cassandra_keyspace string,
 	redis_server string, redis_password string, redis_database int,
@@ -43,6 +45,7 @@ func NewFilerServer(r *http.ServeMux, ip string, port int, master string, dir st
 		defaultReplication: replication,
 		redirectOnRead:     redirectOnRead,
 		disableDirListing:  disableDirListing,
+		maxMB:              maxMB,
 		port:               ip + ":" + strconv.Itoa(port),
 	}
 
diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go
index e2d40f532..872d8c4b9 100644
--- a/weed/server/filer_server_handlers_write.go
+++ b/weed/server/filer_server_handlers_write.go
@@ -20,6 +20,8 @@ import (
 	"github.com/chrislusf/seaweedfs/weed/storage"
 	"github.com/chrislusf/seaweedfs/weed/util"
 	"github.com/syndtr/goleveldb/leveldb"
+	"path"
+	"strconv"
 )
 
 type FilerPostResult struct {
@@ -217,6 +219,7 @@ func (fs *FilerServer) monolithicUploadAnalyzer(w http.ResponseWriter, r *http.R
 }
 
 func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
+
 	query := r.URL.Query()
 	replication := query.Get("replication")
 	if replication == "" {
@@ -227,6 +230,10 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
 		collection = fs.collection
 	}
 
+	if autoChunked := fs.autoChunk(w, r, replication, collection); autoChunked {
+		return
+	}
+
 	var fileId, urlLocation string
 	var err error
 
@@ -243,7 +250,17 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
 	}
 
 	u, _ := url.Parse(urlLocation)
+
+	// This allows a client to generate a chunk manifest and submit it to the filer -- it is a little off
+	// because they need to provide FIDs instead of file paths...
+	cm, _ := strconv.ParseBool(query.Get("cm"))
+	if cm {
+		q := u.Query()
+		q.Set("cm", "true")
+		u.RawQuery = q.Encode()
+	}
 	glog.V(4).Infoln("post to", u)
+
 	request := &http.Request{
 		Method: r.Method,
 		URL:    u,
@@ -319,6 +336,197 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
 	writeJsonQuiet(w, r, http.StatusCreated, reply)
 }
 
+func (fs *FilerServer) autoChunk(w http.ResponseWriter, r *http.Request, replication string, collection string) bool {
+	if r.Method != "POST" {
+		glog.V(4).Infoln("AutoChunking not supported for method", r.Method)
+		return false
+	}
+
+	// autoChunking can be set at the command-line level or as a query param. Query param overrides command-line
+	query := r.URL.Query()
+
+	parsedMaxMB, _ := strconv.ParseInt(query.Get("maxMB"), 10, 32)
+	maxMB := int32(parsedMaxMB)
+	if maxMB <= 0 && fs.maxMB > 0 {
+		maxMB = int32(fs.maxMB)
+	}
+	if maxMB <= 0 {
+		glog.V(4).Infoln("AutoChunking not enabled")
+		return false
+	}
+	glog.V(4).Infoln("AutoChunking level set to", maxMB, "(MB)")
+
+	chunkSize := 1024 * 1024 * maxMB
+
+	contentLength := int64(0)
+	if contentLengthHeader := r.Header["Content-Length"]; len(contentLengthHeader) == 1 {
+		contentLength, _ = strconv.ParseInt(contentLengthHeader[0], 10, 64)
+		if contentLength <= int64(chunkSize) {
+			glog.V(4).Infoln("Content-Length of", contentLength, "is less than the chunk size of", chunkSize, "so autoChunking will be skipped.")
+			return false
+		}
+	}
+
+	if contentLength <= 0 {
+		glog.V(4).Infoln("Content-Length value is missing or unexpected so autoChunking will be skipped.")
+		return false
+	}
+
+	reply, err := fs.doAutoChunk(w, r, contentLength, chunkSize, replication, collection)
+	if err != nil {
+		writeJsonError(w, r, http.StatusInternalServerError, err)
+	} else if reply != nil {
+		writeJsonQuiet(w, r, http.StatusCreated, reply)
+	}
+	return true
+}
+
+func (fs *FilerServer) doAutoChunk(w http.ResponseWriter, r *http.Request, contentLength int64, chunkSize int32, replication string, collection string) (filerResult *FilerPostResult, replyerr error) {
+
+	multipartReader, multipartReaderErr := r.MultipartReader()
+	if multipartReaderErr != nil {
+		return nil, multipartReaderErr
+	}
+
+	part1, part1Err := multipartReader.NextPart()
+	if part1Err != nil {
+		return nil, part1Err
+	}
+
+	fileName := part1.FileName()
+	if fileName != "" {
+		fileName = path.Base(fileName)
+	}
+
+	chunks := (int64(contentLength) / int64(chunkSize)) + 1
+	cm := operation.ChunkManifest{
+		Name:   fileName,
+		Size:   0, // don't know yet
+		Mime:   "application/octet-stream",
+		Chunks: make([]*operation.ChunkInfo, 0, chunks),
+	}
+
+	totalBytesRead := int64(0)
+	tmpBufferSize := int32(1024 * 1024)
+	tmpBuffer := bytes.NewBuffer(make([]byte, 0, tmpBufferSize))
+	chunkBuf := make([]byte, chunkSize+tmpBufferSize, chunkSize+tmpBufferSize) // chunk size plus a little overflow
+	chunkBufOffset := int32(0)
+	chunkOffset := int64(0)
+	writtenChunks := 0
+
+	filerResult = &FilerPostResult{
+		Name: fileName,
+	}
+
+	for totalBytesRead < contentLength {
+		tmpBuffer.Reset()
+		bytesRead, readErr := io.CopyN(tmpBuffer, part1, int64(tmpBufferSize))
+		readFully := readErr != nil && readErr == io.EOF
+		tmpBuf := tmpBuffer.Bytes()
+		bytesToCopy := tmpBuf[0:int(bytesRead)]
+
+		copy(chunkBuf[chunkBufOffset:chunkBufOffset+int32(bytesRead)], bytesToCopy)
+		chunkBufOffset = chunkBufOffset + int32(bytesRead)
+
+		if chunkBufOffset >= chunkSize || readFully || (chunkBufOffset > 0 && bytesRead == 0) {
+			writtenChunks = writtenChunks + 1
+			fileId, urlLocation, assignErr := fs.assignNewFileInfo(w, r, replication, collection)
+			if assignErr != nil {
+				return nil, assignErr
+			}
+
+			// upload the chunk to the volume server
+			chunkName := fileName + "_chunk_" + strconv.FormatInt(int64(cm.Chunks.Len()+1), 10)
+			uploadErr := fs.doUpload(urlLocation, w, r, chunkBuf[0:chunkBufOffset], chunkName, "application/octet-stream", fileId)
+			if uploadErr != nil {
+				return nil, uploadErr
+			}
+
+			// Save to chunk manifest structure
+			cm.Chunks = append(cm.Chunks,
+				&operation.ChunkInfo{
+					Offset: chunkOffset,
+					Size:   int64(chunkBufOffset),
+					Fid:    fileId,
+				},
+			)
+
+			// reset variables for the next chunk
+			chunkBufOffset = 0
+			chunkOffset = totalBytesRead + int64(bytesRead)
+		}
+
+		totalBytesRead = totalBytesRead + int64(bytesRead)
+
+		if bytesRead == 0 || readFully {
+			break
+		}
+
+		if readErr != nil {
+			return nil, readErr
+		}
+	}
+
+	cm.Size = totalBytesRead
+	manifestBuf, marshalErr := cm.Marshal()
+	if marshalErr != nil {
+		return nil, marshalErr
+	}
+
+	manifestStr := string(manifestBuf)
+	glog.V(4).Infoln("Generated chunk manifest: ", manifestStr)
+
+	manifestFileId, manifestUrlLocation, manifestAssignmentErr := fs.assignNewFileInfo(w, r, replication, collection)
+	if manifestAssignmentErr != nil {
+		return nil, manifestAssignmentErr
+	}
+	glog.V(4).Infoln("Manifest uploaded to:", manifestUrlLocation, "Fid:", manifestFileId)
+	filerResult.Fid = manifestFileId
+
+	u, _ := url.Parse(manifestUrlLocation)
+	q := u.Query()
+	q.Set("cm", "true")
+	u.RawQuery = q.Encode()
+
+	manifestUploadErr := fs.doUpload(u.String(), w, r, manifestBuf, fileName+"_manifest", "application/json", manifestFileId)
+	if manifestUploadErr != nil {
+		return nil, manifestUploadErr
+	}
+
+	path := r.URL.Path
+	// also delete the old fid unless PUT operation
+	if r.Method != "PUT" {
+		if oldFid, err := fs.filer.FindFile(path); err == nil {
+			operation.DeleteFile(fs.getMasterNode(), oldFid, fs.jwt(oldFid))
+		}
+	}
+
+	glog.V(4).Infoln("saving", path, "=>", manifestFileId)
+	if db_err := fs.filer.CreateFile(path, manifestFileId); db_err != nil {
+		replyerr = db_err
+		filerResult.Error = db_err.Error()
+		operation.DeleteFile(fs.getMasterNode(), manifestFileId, fs.jwt(manifestFileId)) //clean up
+		glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err)
+		return
+	}
+
+	return
+}
+
+func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *http.Request, chunkBuf []byte, fileName string, contentType string, fileId string) (err error) {
+	err = nil
+
+	ioReader := ioutil.NopCloser(bytes.NewBuffer(chunkBuf))
+	uploadResult, uploadError := operation.Upload(urlLocation, fileName, ioReader, false, contentType, fs.jwt(fileId))
+	if uploadResult != nil {
+		glog.V(0).Infoln("Chunk upload result. Name:", uploadResult.Name, "Fid:", fileId, "Size:", uploadResult.Size)
+	}
+	if uploadError != nil {
+		err = uploadError
+	}
+	return
+}
+
 // curl -X DELETE http://localhost:8888/path/to
 // curl -X DELETE http://localhost:8888/path/to?recursive=true
 func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
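
For reference, here is a minimal, hypothetical client in Go -- not part of the patches above -- sketching how the autoChunk path from PATCH 10/10 might be exercised end to end: a multipart POST to the filer with a maxMB query parameter (the -maxMB / -filer.maxMB flag sets the same default on the server side). The filer address http://localhost:8888, the target path /path/to/big.bin, and the local file name big.bin are placeholders chosen for illustration; the same request could also be made with curl -F against that URL.

// Hypothetical example client; the filer address, target path, and local
// file name are assumptions, not part of the patch series.
package main

import (
	"bytes"
	"fmt"
	"io"
	"io/ioutil"
	"mime/multipart"
	"net/http"
	"os"
)

func main() {
	// doAutoChunk reads the first part of a multipart body, so wrap the file
	// in multipart/form-data. For brevity the whole file is buffered in
	// memory; a streaming body would be preferable for genuinely large files.
	f, err := os.Open("big.bin")
	if err != nil {
		panic(err)
	}
	defer f.Close()

	var body bytes.Buffer
	mw := multipart.NewWriter(&body)
	part, err := mw.CreateFormFile("file", "big.bin")
	if err != nil {
		panic(err)
	}
	if _, err := io.Copy(part, f); err != nil {
		panic(err)
	}
	mw.Close()

	// maxMB=32 asks the filer to split this upload into 32MB chunks plus a
	// chunk manifest. autoChunk only kicks in when Content-Length is present
	// and larger than the chunk size; http.NewRequest sets Content-Length
	// automatically for a *bytes.Buffer body.
	req, err := http.NewRequest("POST", "http://localhost:8888/path/to/big.bin?maxMB=32", &body)
	if err != nil {
		panic(err)
	}
	req.Header.Set("Content-Type", mw.FormDataContentType())

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	reply, _ := ioutil.ReadAll(resp.Body)
	fmt.Println(resp.Status, string(reply))
}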