diff --git a/go.mod b/go.mod index ee33926c0..bd07b3312 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,7 @@ require ( github.com/buraksezer/consistent v0.0.0-20191006190839-693edf70fd72 github.com/bwmarrin/snowflake v0.3.0 github.com/cespare/xxhash v1.1.0 - github.com/chrislusf/raft v1.0.5 + github.com/chrislusf/raft v1.0.6 github.com/coreos/go-semver v0.3.0 // indirect github.com/dgrijalva/jwt-go v3.2.0+incompatible github.com/disintegration/imaging v1.6.2 diff --git a/go.sum b/go.sum index 52166e601..4899bd7b8 100644 --- a/go.sum +++ b/go.sum @@ -157,6 +157,8 @@ github.com/chrislusf/raft v1.0.4 h1:THhbsVik2hxdE0/VXX834f64Wn9RzgVPp+E+XCWZdKM= github.com/chrislusf/raft v1.0.4/go.mod h1:Ep5DP+mJSosjfKiix1uU7Lc2Df/SX4oGJEpZlXH5l68= github.com/chrislusf/raft v1.0.5 h1:g8GxKCSStfm0/bGBDpNEbmEXL6MJkpXX+NI0ksbX5D4= github.com/chrislusf/raft v1.0.5/go.mod h1:Ep5DP+mJSosjfKiix1uU7Lc2Df/SX4oGJEpZlXH5l68= +github.com/chrislusf/raft v1.0.6 h1:wunb85WWhMKhNRn7EmdIw35D4Lmew0ZJv8oYDizR/+Y= +github.com/chrislusf/raft v1.0.6/go.mod h1:Ep5DP+mJSosjfKiix1uU7Lc2Df/SX4oGJEpZlXH5l68= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= diff --git a/k8s/seaweedfs/Chart.yaml b/k8s/seaweedfs/Chart.yaml index d8d069032..c296ce0c6 100644 --- a/k8s/seaweedfs/Chart.yaml +++ b/k8s/seaweedfs/Chart.yaml @@ -1,5 +1,5 @@ apiVersion: v1 description: SeaweedFS name: seaweedfs -appVersion: "2.38" -version: 2.38 +appVersion: "2.39" +version: 2.39 diff --git a/k8s/seaweedfs/values.yaml b/k8s/seaweedfs/values.yaml index 19c0c78a1..e615090c1 100644 --- a/k8s/seaweedfs/values.yaml +++ b/k8s/seaweedfs/values.yaml @@ -4,7 +4,7 @@ global: registry: "" repository: "" imageName: chrislusf/seaweedfs - # imageTag: "2.38" - started using {.Chart.appVersion} + # imageTag: "2.39" - started using {.Chart.appVersion} imagePullPolicy: IfNotPresent imagePullSecrets: imagepullsecret restartPolicy: Always diff --git a/weed/command/command.go b/weed/command/command.go index ce754702f..b6efcead2 100644 --- a/weed/command/command.go +++ b/weed/command/command.go @@ -22,6 +22,7 @@ var Commands = []*Command{ cmdFilerReplicate, cmdFilerSynchronize, cmdFix, + cmdGateway, cmdMaster, cmdMount, cmdS3, diff --git a/weed/command/gateway.go b/weed/command/gateway.go new file mode 100644 index 000000000..8a6f852a5 --- /dev/null +++ b/weed/command/gateway.go @@ -0,0 +1,93 @@ +package command + +import ( + "net/http" + "strconv" + "strings" + "time" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/server" + "github.com/chrislusf/seaweedfs/weed/util" +) + +var ( + gatewayOptions GatewayOptions +) + +type GatewayOptions struct { + masters *string + filers *string + bindIp *string + port *int + maxMB *int +} + +func init() { + cmdGateway.Run = runGateway // break init cycle + gatewayOptions.masters = cmdGateway.Flag.String("master", "localhost:9333", "comma-separated master servers") + gatewayOptions.filers = cmdGateway.Flag.String("filer", "localhost:8888", "comma-separated filer servers") + gatewayOptions.bindIp = cmdGateway.Flag.String("ip.bind", "localhost", "ip address to bind to") + gatewayOptions.port = cmdGateway.Flag.Int("port", 5647, "gateway http listen port") + gatewayOptions.maxMB = cmdGateway.Flag.Int("maxMB", 4, "split files larger than the limit") +} + +var cmdGateway = &Command{ + UsageLine: "gateway -port=8888 -master=[,]* -filer=[,]*", + Short: "start a gateway server that points to a list of master servers or a list of filers", + Long: `start a gateway server which accepts REST operation to write any blobs, files, or topic messages. + + POST /blobs/ + upload the blob and return a chunk id + DELETE /blobs/ + delete a chunk id + + /* + POST /files/path/to/a/file + save /path/to/a/file on filer + DELETE /files/path/to/a/file + delete /path/to/a/file on filer + + POST /topics/topicName + save on filer to /topics/topicName//ts.json + */ +`, +} + +func runGateway(cmd *Command, args []string) bool { + + util.LoadConfiguration("security", false) + + gatewayOptions.startGateway() + + return true +} + +func (gw *GatewayOptions) startGateway() { + + defaultMux := http.NewServeMux() + + _, gws_err := weed_server.NewGatewayServer(defaultMux, &weed_server.GatewayOption{ + Masters: strings.Split(*gw.masters, ","), + Filers: strings.Split(*gw.filers, ","), + MaxMB: *gw.maxMB, + }) + if gws_err != nil { + glog.Fatalf("Gateway startup error: %v", gws_err) + } + + glog.V(0).Infof("Start Seaweed Gateway %s at %s:%d", util.Version(), *gw.bindIp, *gw.port) + gatewayListener, e := util.NewListener( + *gw.bindIp+":"+strconv.Itoa(*gw.port), + time.Duration(10)*time.Second, + ) + if e != nil { + glog.Fatalf("Filer listener error: %v", e) + } + + httpS := &http.Server{Handler: defaultMux} + if err := httpS.Serve(gatewayListener); err != nil { + glog.Fatalf("Gateway Fail to serve: %v", e) + } + +} diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go index 46457f858..7b918e769 100644 --- a/weed/filesys/dir.go +++ b/weed/filesys/dir.go @@ -372,7 +372,6 @@ func (dir *Dir) Remove(ctx context.Context, req *fuse.RemoveRequest) error { return fuse.EPERM } - if !req.Dir { return dir.removeOneFile(req) } diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go index 4419888c4..f04952e96 100644 --- a/weed/filesys/filehandle.go +++ b/weed/filesys/filehandle.go @@ -200,6 +200,8 @@ func (fh *FileHandle) Release(ctx context.Context, req *fuse.ReleaseRequest) err fh.Lock() defer fh.Unlock() + fh.f.entryViewCache = nil + if fh.f.isOpen <= 0 { glog.V(0).Infof("Release reset %s open count %d => %d", fh.f.Name, fh.f.isOpen, 0) fh.f.isOpen = 0 diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go index 40e96fd8c..7a7f8aa0c 100644 --- a/weed/operation/upload_content.go +++ b/weed/operation/upload_content.go @@ -235,7 +235,7 @@ func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error // print("+") resp, post_err := HttpClient.Do(req) if post_err != nil { - if !strings.Contains(post_err.Error(), "connection reset by peer"){ + if !strings.Contains(post_err.Error(), "connection reset by peer") { glog.Errorf("upload %s %d bytes to %v: %v", filename, originalDataSize, uploadUrl, post_err) debug.PrintStack() } diff --git a/weed/server/filer_server_handlers_write_upload.go b/weed/server/filer_server_handlers_write_upload.go index 81b2ce1b0..3ab45453e 100644 --- a/weed/server/filer_server_handlers_write_upload.go +++ b/weed/server/filer_server_handlers_write_upload.go @@ -6,9 +6,7 @@ import ( "io" "io/ioutil" "net/http" - "runtime" "strings" - "sync" "time" "github.com/chrislusf/seaweedfs/weed/filer" @@ -20,143 +18,75 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" ) -var ( - limitedUploadProcessor = util.NewLimitedOutOfOrderProcessor(int32(runtime.NumCPU())) -) +func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) ([]*filer_pb.FileChunk, hash.Hash, int64, error, []byte) { + var fileChunks []*filer_pb.FileChunk -func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) (fileChunks []*filer_pb.FileChunk, md5Hash hash.Hash, dataSize int64, err error, smallContent []byte) { - - md5Hash = md5.New() + md5Hash := md5.New() var partReader = ioutil.NopCloser(io.TeeReader(reader, md5Hash)) - // save small content directly - if !isAppend(r) && ((0 < contentLength && contentLength < fs.option.SaveToFilerLimit) || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) && contentLength < 4*1024) { - smallContent, err = ioutil.ReadAll(partReader) - dataSize = int64(len(smallContent)) - return - } + chunkOffset := int64(0) + var smallContent []byte - resultsChan := make(chan *ChunkCreationResult, 2) + for { + limitedReader := io.LimitReader(partReader, int64(chunkSize)) - var waitForAllData sync.WaitGroup - waitForAllData.Add(1) - go func() { - // process upload results - defer waitForAllData.Done() - for result := range resultsChan { - if result.err != nil { - err = result.err + data, err := ioutil.ReadAll(limitedReader) + if err != nil { + return nil, nil, 0, err, nil + } + if chunkOffset == 0 && !isAppend(r) { + if len(data) < int(fs.option.SaveToFilerLimit) || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) && len(data) < 4*1024 { + smallContent = data + chunkOffset += int64(len(data)) + break + } + } + dataReader := util.NewBytesReader(data) + + // retry to assign a different file id + var fileId, urlLocation string + var auth security.EncodedJwt + var assignErr, uploadErr error + var uploadResult *operation.UploadResult + for i := 0; i < 3; i++ { + // assign one file id for one chunk + fileId, urlLocation, auth, assignErr = fs.assignNewFileInfo(so) + if assignErr != nil { + return nil, nil, 0, assignErr, nil + } + + // upload the chunk to the volume server + uploadResult, uploadErr, _ = fs.doUpload(urlLocation, w, r, dataReader, fileName, contentType, nil, auth) + if uploadErr != nil { + time.Sleep(251 * time.Millisecond) continue } - - // Save to chunk manifest structure - fileChunks = append(fileChunks, result.chunk) + break } - }() - - var lock sync.Mutex - readOffset := int64(0) - var wg sync.WaitGroup - - for err == nil { - - wg.Add(1) - request := func() { - defer wg.Done() - - var localOffset int64 - // read from the input - lock.Lock() - localOffset = readOffset - limitedReader := io.LimitReader(partReader, int64(chunkSize)) - data, readErr := ioutil.ReadAll(limitedReader) - readOffset += int64(len(data)) - lock.Unlock() - // handle read errors - if readErr != nil { - if err == nil { - err = readErr - } - if readErr != io.EOF { - resultsChan <- &ChunkCreationResult{ - err: readErr, - } - } - return - } - if len(data) == 0 { - readErr = io.EOF - if err == nil { - err = readErr - } - return - } - - // upload - dataReader := util.NewBytesReader(data) - fileId, uploadResult, uploadErr := fs.doCreateChunk(w, r, so, dataReader, fileName, contentType) - if uploadErr != nil { - if err == nil { - err = uploadErr - } - resultsChan <- &ChunkCreationResult{ - err: uploadErr, - } - return - } - - glog.V(4).Infof("uploaded %s to %s [%d,%d)", fileName, fileId, localOffset, localOffset+int64(uploadResult.Size)) - - // send back uploaded file chunk - resultsChan <- &ChunkCreationResult{ - chunk: uploadResult.ToPbFileChunk(fileId, localOffset), - } - - } - limitedUploadProcessor.Execute(request) - } - - go func() { - wg.Wait() - close(resultsChan) - }() - - waitForAllData.Wait() - - if err == io.EOF { - err = nil - } - - return fileChunks, md5Hash, readOffset, err, nil -} - -type ChunkCreationResult struct { - chunk *filer_pb.FileChunk - err error -} - -func (fs *FilerServer) doCreateChunk(w http.ResponseWriter, r *http.Request, so *operation.StorageOption, dataReader *util.BytesReader, fileName string, contentType string) (string, *operation.UploadResult, error) { - // retry to assign a different file id - var fileId, urlLocation string - var auth security.EncodedJwt - var assignErr, uploadErr error - var uploadResult *operation.UploadResult - for i := 0; i < 3; i++ { - // assign one file id for one chunk - fileId, urlLocation, auth, assignErr = fs.assignNewFileInfo(so) - if assignErr != nil { - return "", nil, assignErr - } - - // upload the chunk to the volume server - uploadResult, uploadErr, _ = fs.doUpload(urlLocation, w, r, dataReader, fileName, contentType, nil, auth) if uploadErr != nil { - time.Sleep(251 * time.Millisecond) - continue + return nil, nil, 0, uploadErr, nil + } + + // if last chunk exhausted the reader exactly at the border + if uploadResult.Size == 0 { + break + } + + // Save to chunk manifest structure + fileChunks = append(fileChunks, uploadResult.ToPbFileChunk(fileId, chunkOffset)) + + glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d)", fileName, len(fileChunks), fileId, chunkOffset, chunkOffset+int64(uploadResult.Size)) + + // reset variables for the next chunk + chunkOffset = chunkOffset + int64(uploadResult.Size) + + // if last chunk was not at full chunk size, but already exhausted the reader + if int64(uploadResult.Size) < int64(chunkSize) { + break } - break } - return fileId, uploadResult, uploadErr + + return fileChunks, md5Hash, chunkOffset, nil, smallContent } func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *http.Request, limitedReader io.Reader, fileName string, contentType string, pairMap map[string]string, auth security.EncodedJwt) (*operation.UploadResult, error, []byte) { diff --git a/weed/server/gateway_server.go b/weed/server/gateway_server.go new file mode 100644 index 000000000..608217ed7 --- /dev/null +++ b/weed/server/gateway_server.go @@ -0,0 +1,106 @@ +package weed_server + +import ( + "github.com/chrislusf/seaweedfs/weed/operation" + "google.golang.org/grpc" + "math/rand" + "net/http" + + "github.com/chrislusf/seaweedfs/weed/util" + + _ "github.com/chrislusf/seaweedfs/weed/filer/cassandra" + _ "github.com/chrislusf/seaweedfs/weed/filer/elastic/v7" + _ "github.com/chrislusf/seaweedfs/weed/filer/etcd" + _ "github.com/chrislusf/seaweedfs/weed/filer/hbase" + _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb" + _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb2" + _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb3" + _ "github.com/chrislusf/seaweedfs/weed/filer/mongodb" + _ "github.com/chrislusf/seaweedfs/weed/filer/mysql" + _ "github.com/chrislusf/seaweedfs/weed/filer/mysql2" + _ "github.com/chrislusf/seaweedfs/weed/filer/postgres" + _ "github.com/chrislusf/seaweedfs/weed/filer/postgres2" + _ "github.com/chrislusf/seaweedfs/weed/filer/redis" + _ "github.com/chrislusf/seaweedfs/weed/filer/redis2" + "github.com/chrislusf/seaweedfs/weed/glog" + _ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs" + _ "github.com/chrislusf/seaweedfs/weed/notification/gocdk_pub_sub" + _ "github.com/chrislusf/seaweedfs/weed/notification/google_pub_sub" + _ "github.com/chrislusf/seaweedfs/weed/notification/kafka" + _ "github.com/chrislusf/seaweedfs/weed/notification/log" + "github.com/chrislusf/seaweedfs/weed/security" +) + +type GatewayOption struct { + Masters []string + Filers []string + MaxMB int + IsSecure bool +} + +type GatewayServer struct { + option *GatewayOption + secret security.SigningKey + grpcDialOption grpc.DialOption +} + +func NewGatewayServer(defaultMux *http.ServeMux, option *GatewayOption) (fs *GatewayServer, err error) { + + fs = &GatewayServer{ + option: option, + grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.client"), + } + + if len(option.Masters) == 0 { + glog.Fatal("master list is required!") + } + + defaultMux.HandleFunc("/blobs/", fs.blobsHandler) + defaultMux.HandleFunc("/files/", fs.filesHandler) + defaultMux.HandleFunc("/topics/", fs.topicsHandler) + + return fs, nil +} + +func (fs *GatewayServer) getMaster() string { + randMaster := rand.Intn(len(fs.option.Masters)) + return fs.option.Masters[randMaster] +} + +func (fs *GatewayServer) blobsHandler(w http.ResponseWriter, r *http.Request) { + switch r.Method { + case "DELETE": + chunkId := r.URL.Path[len("/blobs/"):] + fullUrl, err := operation.LookupFileId(fs.getMaster, chunkId) + if err != nil { + writeJsonError(w, r, http.StatusNotFound, err) + return + } + var jwtAuthorization security.EncodedJwt + if fs.option.IsSecure { + jwtAuthorization = operation.LookupJwt(fs.getMaster(), chunkId) + } + body, statusCode, err := util.DeleteProxied(fullUrl, string(jwtAuthorization)) + if err != nil { + writeJsonError(w, r, http.StatusNotFound, err) + return + } + w.WriteHeader(statusCode) + w.Write(body) + case "POST": + submitForClientHandler(w, r, fs.getMaster, fs.grpcDialOption) + } +} + +func (fs *GatewayServer) filesHandler(w http.ResponseWriter, r *http.Request) { + switch r.Method { + case "DELETE": + case "POST": + } +} + +func (fs *GatewayServer) topicsHandler(w http.ResponseWriter, r *http.Request) { + switch r.Method { + case "POST": + } +} diff --git a/weed/util/constants.go b/weed/util/constants.go index 40f4deae2..fce35379d 100644 --- a/weed/util/constants.go +++ b/weed/util/constants.go @@ -5,7 +5,7 @@ import ( ) var ( - VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 38) + VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 39) COMMIT = "" ) diff --git a/weed/util/http_util.go b/weed/util/http_util.go index 135d10c45..1630760b1 100644 --- a/weed/util/http_util.go +++ b/weed/util/http_util.go @@ -124,6 +124,27 @@ func Delete(url string, jwt string) error { return errors.New(string(body)) } +func DeleteProxied(url string, jwt string) (body []byte, httpStatus int, err error) { + req, err := http.NewRequest("DELETE", url, nil) + if jwt != "" { + req.Header.Set("Authorization", "BEARER "+string(jwt)) + } + if err != nil { + return + } + resp, err := client.Do(req) + if err != nil { + return + } + defer resp.Body.Close() + body, err = ioutil.ReadAll(resp.Body) + if err != nil { + return + } + httpStatus = resp.StatusCode + return +} + func GetBufferStream(url string, values url.Values, allocatedBytes []byte, eachBuffer func([]byte)) error { r, err := client.PostForm(url, values) if err != nil {