diff --git a/README.md b/README.md index 5e18ac1e8..72492bb47 100644 --- a/README.md +++ b/README.md @@ -112,15 +112,15 @@ On top of the object store, optional [Filer] can support directories and POSIX a [Back to TOC](#table-of-contents) ## Filer Features ## -* [Filer server][Filer] provide "normal" directories and files via http. +* [Filer server][Filer] provides "normal" directories and files via http. * [Super Large Files][SuperLargeFiles] stores large or super large files in tens of TB. -* [Mount filer][Mount] to read and write files directly as a local directory via FUSE. -* [Amazon S3 compatible API][AmazonS3API] to access files with S3 tooling. -* [Hadoop Compatible File System][Hadoop] to access files from Hadoop/Spark/Flink/etc jobs. +* [Mount filer][Mount] reads and writes files directly as a local directory via FUSE. +* [Amazon S3 compatible API][AmazonS3API] accesses files with S3 tooling. +* [Hadoop Compatible File System][Hadoop] accesses files from Hadoop/Spark/Flink/etc jobs. * [Async Backup To Cloud][BackupToCloud] has extremely fast local access and backups to Amazon S3, Google Cloud Storage, Azure, BackBlaze. -* [WebDAV] access as a mapped drive on Mac and Windows, or from mobile devices. +* [WebDAV] accesses as a mapped drive on Mac and Windows, or from mobile devices. * [AES256-GCM Encrypted Storage][FilerDataEncryption] safely stores the encrypted data. -* [File TTL][FilerTTL] automatically purge file metadata and actual file data. +* [File TTL][FilerTTL] automatically purges file metadata and actual file data. * [Kubernetes CSI Driver][SeaweedFsCsiDriver] A Container Storage Interface (CSI) Driver. [![Docker Pulls](https://img.shields.io/docker/pulls/chrislusf/seaweedfs-csi-driver.svg?maxAge=4800)](https://hub.docker.com/r/chrislusf/seaweedfs-csi-driver/) [Filer]: https://github.com/chrislusf/seaweedfs/wiki/Directories-and-Files diff --git a/go.mod b/go.mod index e5fc3bbfd..cdb951f9c 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,6 @@ require ( cloud.google.com/go v0.44.3 github.com/Azure/azure-pipeline-go v0.2.2 // indirect github.com/Azure/azure-storage-blob-go v0.8.0 - github.com/DataDog/zstd v1.4.1 // indirect github.com/OneOfOne/xxhash v1.2.2 github.com/Shopify/sarama v1.23.1 github.com/aws/aws-sdk-go v1.23.13 diff --git a/go.sum b/go.sum index 28461324e..bc37db039 100644 --- a/go.sum +++ b/go.sum @@ -32,8 +32,6 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/DataDog/zstd v1.3.6-0.20190409195224-796139022798 h1:2T/jmrHeTezcCM58lvEQXs0UpQJCo5SoGAcg+mbSTIg= github.com/DataDog/zstd v1.3.6-0.20190409195224-796139022798/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo= -github.com/DataDog/zstd v1.4.1 h1:3oxKN3wbHibqx897utPC2LTQU4J+IHWWJO+glkAkpFM= -github.com/DataDog/zstd v1.4.1/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo= github.com/GoogleCloudPlatform/cloudsql-proxy v0.0.0-20190605020000-c4ba1fdf4d36/go.mod h1:aJ4qN3TfrelA6NZ6AXsXRfmEVaYin3EDbSPJrKS8OXo= github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= @@ -234,8 +232,6 @@ github.com/googleapis/gax-go v2.0.2+incompatible/go.mod h1:SFVmujtThgffbyetf+mdk github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5 
h1:sjZBwGj9Jlw33ImPtvFviGYvseOtDM7hkSKB7+Tv3SM= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= -github.com/gorilla/mux v1.7.3 h1:gnP5JzjVOuiZD07fKKToCAOjS0yOpj/qPETTXCCS6hw= -github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= github.com/gorilla/mux v1.7.4 h1:VuZ8uybHlWmqV03+zRzdwKL4tUnIp1MAQtp1mIFE1bc= github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= diff --git a/k8s/seaweedfs/Chart.yaml b/k8s/seaweedfs/Chart.yaml index 8664e9394..73d9a67e4 100644 --- a/k8s/seaweedfs/Chart.yaml +++ b/k8s/seaweedfs/Chart.yaml @@ -1,4 +1,4 @@ apiVersion: v1 description: SeaweedFS name: seaweedfs -version: 1.87 \ No newline at end of file +version: 1.88 \ No newline at end of file diff --git a/k8s/seaweedfs/values.yaml b/k8s/seaweedfs/values.yaml index b2fbc17e5..6fb25e0a3 100644 --- a/k8s/seaweedfs/values.yaml +++ b/k8s/seaweedfs/values.yaml @@ -4,7 +4,7 @@ global: registry: "" repository: "" imageName: chrislusf/seaweedfs - imageTag: "1.87" + imageTag: "1.88" imagePullPolicy: IfNotPresent imagePullSecrets: imagepullsecret restartPolicy: Always diff --git a/other/java/client/pom.xml b/other/java/client/pom.xml index 4d8f93bff..6727f749f 100644 --- a/other/java/client/pom.xml +++ b/other/java/client/pom.xml @@ -5,7 +5,7 @@ com.github.chrislusf seaweedfs-client - 1.4.4 + 1.4.5 org.sonatype.oss diff --git a/other/java/client/pom.xml.deploy b/other/java/client/pom.xml.deploy index 4d8f93bff..6727f749f 100644 --- a/other/java/client/pom.xml.deploy +++ b/other/java/client/pom.xml.deploy @@ -5,7 +5,7 @@ com.github.chrislusf seaweedfs-client - 1.4.4 + 1.4.5 org.sonatype.oss diff --git a/other/java/client/pom_debug.xml b/other/java/client/pom_debug.xml index bb2ba5e74..ed3f07298 100644 --- a/other/java/client/pom_debug.xml +++ b/other/java/client/pom_debug.xml @@ -5,7 +5,7 @@ com.github.chrislusf seaweedfs-client - 1.4.4 + 1.4.5 org.sonatype.oss diff --git a/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java b/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java index 1248ff13f..79e8d9bc4 100644 --- a/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java +++ b/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java @@ -76,8 +76,7 @@ public class FileChunkManifest { LOG.debug("doFetchFullChunkData:{}", chunkView); chunkData = SeaweedRead.doFetchFullChunkData(chunkView, locations); } - if(chunk.getIsChunkManifest()){ - // only cache manifest chunks + if (chunk.getIsChunkManifest()){ LOG.debug("chunk {} size {}", chunkView.fileId, chunkData.length); SeaweedRead.chunkCache.setChunk(chunkView.fileId, chunkData); } diff --git a/other/java/hdfs2/dependency-reduced-pom.xml b/other/java/hdfs2/dependency-reduced-pom.xml index d00291c98..c54f8d2a7 100644 --- a/other/java/hdfs2/dependency-reduced-pom.xml +++ b/other/java/hdfs2/dependency-reduced-pom.xml @@ -120,6 +120,180 @@ + + + org.apache.hadoop + hadoop-client + 2.9.2 + provided + + + hadoop-hdfs-client + org.apache.hadoop + + + hadoop-mapreduce-client-app + org.apache.hadoop + + + hadoop-yarn-api + org.apache.hadoop + + + hadoop-mapreduce-client-core + org.apache.hadoop + + + hadoop-mapreduce-client-jobclient + org.apache.hadoop + + + hadoop-annotations + org.apache.hadoop + + + + + org.apache.hadoop + hadoop-common + 2.9.2 + provided + + + commons-cli + commons-cli + + 
+ commons-math3 + org.apache.commons + + + xmlenc + xmlenc + + + commons-io + commons-io + + + commons-net + commons-net + + + commons-collections + commons-collections + + + servlet-api + javax.servlet + + + jetty + org.mortbay.jetty + + + jetty-util + org.mortbay.jetty + + + jetty-sslengine + org.mortbay.jetty + + + jsp-api + javax.servlet.jsp + + + jersey-core + com.sun.jersey + + + jersey-json + com.sun.jersey + + + jersey-server + com.sun.jersey + + + log4j + log4j + + + jets3t + net.java.dev.jets3t + + + commons-lang + commons-lang + + + commons-configuration + commons-configuration + + + commons-lang3 + org.apache.commons + + + slf4j-log4j12 + org.slf4j + + + jackson-core-asl + org.codehaus.jackson + + + jackson-mapper-asl + org.codehaus.jackson + + + avro + org.apache.avro + + + hadoop-auth + org.apache.hadoop + + + jsch + com.jcraft + + + curator-client + org.apache.curator + + + curator-recipes + org.apache.curator + + + htrace-core4 + org.apache.htrace + + + zookeeper + org.apache.zookeeper + + + commons-compress + org.apache.commons + + + stax2-api + org.codehaus.woodstox + + + woodstox-core + com.fasterxml.woodstox + + + hadoop-annotations + org.apache.hadoop + + + + ossrh @@ -127,7 +301,7 @@ - 1.4.4 + 1.4.5 2.9.2 diff --git a/other/java/hdfs2/pom.xml b/other/java/hdfs2/pom.xml index 6d9191727..2c8d4ce32 100644 --- a/other/java/hdfs2/pom.xml +++ b/other/java/hdfs2/pom.xml @@ -5,7 +5,7 @@ 4.0.0 - 1.4.4 + 1.4.5 2.9.2 @@ -147,6 +147,7 @@ org.apache.hadoop hadoop-client ${hadoop.version} + provided com.github.chrislusf @@ -157,6 +158,7 @@ org.apache.hadoop hadoop-common ${hadoop.version} + provided diff --git a/other/java/hdfs3/dependency-reduced-pom.xml b/other/java/hdfs3/dependency-reduced-pom.xml index 0dcc49b3f..5f1e278f8 100644 --- a/other/java/hdfs3/dependency-reduced-pom.xml +++ b/other/java/hdfs3/dependency-reduced-pom.xml @@ -127,7 +127,7 @@ - 1.4.4 + 1.4.5 3.1.1 diff --git a/other/java/hdfs3/pom.xml b/other/java/hdfs3/pom.xml index 05a613759..b1bd27f74 100644 --- a/other/java/hdfs3/pom.xml +++ b/other/java/hdfs3/pom.xml @@ -5,7 +5,7 @@ 4.0.0 - 1.4.4 + 1.4.5 3.1.1 @@ -147,6 +147,7 @@ org.apache.hadoop hadoop-client ${hadoop.version} + provided com.github.chrislusf @@ -157,6 +158,7 @@ org.apache.hadoop hadoop-common ${hadoop.version} + provided diff --git a/unmaintained/s3/presigned_put/presigned_put.go b/unmaintained/s3/presigned_put/presigned_put.go new file mode 100644 index 000000000..e8368d124 --- /dev/null +++ b/unmaintained/s3/presigned_put/presigned_put.go @@ -0,0 +1,73 @@ +package main + +import ( + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/s3" + "encoding/base64" + "fmt" + "crypto/md5" + "strings" + "time" + "net/http" +) + +// Downloads an item from an S3 Bucket in the region configured in the shared config +// or AWS_REGION environment variable. +// +// Usage: +// go run presigned_put.go +// For this exampl to work, the domainName is needd +// weed s3 -domainName=localhost +func main() { + h := md5.New() + content := strings.NewReader(stringContent) + content.WriteTo(h) + + // Initialize a session in us-west-2 that the SDK will use to load + // credentials from the shared credentials file ~/.aws/credentials. 
+ sess, err := session.NewSession(&aws.Config{ + Region: aws.String("us-east-1"), + Endpoint: aws.String("http://localhost:8333"), + }) + + // Create S3 service client + svc := s3.New(sess) + + putRequest, output := svc.PutObjectRequest(&s3.PutObjectInput{ + Bucket: aws.String("dev"), + Key: aws.String("testKey"), + }) + fmt.Printf("output: %+v\n", output) + + md5s := base64.StdEncoding.EncodeToString(h.Sum(nil)) + putRequest.HTTPRequest.Header.Set("Content-MD5", md5s) + + url, err := putRequest.Presign(15 * time.Minute) + if err != nil { + fmt.Println("error presigning request", err) + return + } + + fmt.Println(url) + + req, err := http.NewRequest("PUT", url, strings.NewReader(stringContent)) + req.Header.Set("Content-MD5", md5s) + if err != nil { + fmt.Println("error creating request", url) + return + } + + resp, err := http.DefaultClient.Do(req) + if err != nil { + fmt.Printf("error put request: %v\n", err) + return + } + fmt.Printf("response: %+v\n", resp) +} + +var stringContent = `Generate a Pre-Signed URL for an Amazon S3 PUT Operation with a Specific Payload +You can generate a pre-signed URL for a PUT operation that checks whether users upload the correct content. When the SDK pre-signs a request, it computes the checksum of the request body and generates an MD5 checksum that is included in the pre-signed URL. Users must upload the same content that produces the same MD5 checksum generated by the SDK; otherwise, the operation fails. This is not the Content-MD5, but the signature. To enforce Content-MD5, simply add the header to the request. + +The following example adds a Body field to generate a pre-signed PUT operation that requires a specific payload to be uploaded by users. +` \ No newline at end of file diff --git a/weed/filer2/stream.go b/weed/filer2/stream.go index c7df007ec..e9707d3ae 100644 --- a/weed/filer2/stream.go +++ b/weed/filer2/stream.go @@ -2,7 +2,6 @@ package filer2 import ( "bytes" - "fmt" "io" "math" "strings" @@ -15,7 +14,7 @@ import ( func StreamContent(masterClient *wdclient.MasterClient, w io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error { - fmt.Printf("start to stream content for chunks: %+v\n", chunks) + // fmt.Printf("start to stream content for chunks: %+v\n", chunks) chunkViews := ViewFromChunks(masterClient.LookupFileId, chunks, offset, size) fileId2Url := make(map[string]string) diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go index 2214b1ac7..08332d967 100644 --- a/weed/filesys/dir.go +++ b/weed/filesys/dir.go @@ -101,18 +101,22 @@ func (dir *Dir) Fsync(ctx context.Context, req *fuse.FsyncRequest) error { } func (dir *Dir) newFile(name string, entry *filer_pb.Entry) fs.Node { - return &File{ - Name: name, - dir: dir, - wfs: dir.wfs, - entry: entry, - entryViewCache: nil, - } + return dir.wfs.fsNodeCache.EnsureFsNode(util.NewFullPath(dir.FullPath(), name), func() fs.Node { + return &File{ + Name: name, + dir: dir, + wfs: dir.wfs, + entry: entry, + entryViewCache: nil, + } + }) } func (dir *Dir) newDirectory(fullpath util.FullPath, entry *filer_pb.Entry) fs.Node { - return &Dir{name: entry.Name, wfs: dir.wfs, entry: entry, parent: dir} + return dir.wfs.fsNodeCache.EnsureFsNode(fullpath, func() fs.Node { + return &Dir{name: entry.Name, wfs: dir.wfs, entry: entry, parent: dir} + }) } @@ -312,6 +316,8 @@ func (dir *Dir) removeOneFile(req *fuse.RemoveRequest) error { dir.wfs.deleteFileChunks(entry.Chunks) + dir.wfs.fsNodeCache.DeleteFsNode(filePath) + dir.wfs.metaCache.DeleteEntry(context.Background(), filePath) 
glog.V(3).Infof("remove file: %v", req) @@ -328,13 +334,17 @@ func (dir *Dir) removeOneFile(req *fuse.RemoveRequest) error { func (dir *Dir) removeFolder(req *fuse.RemoveRequest) error { t := util.NewFullPath(dir.FullPath(), req.Name) + dir.wfs.fsNodeCache.DeleteFsNode(t) dir.wfs.metaCache.DeleteEntry(context.Background(), t) glog.V(3).Infof("remove directory entry: %v", req) err := filer_pb.Remove(dir.wfs, dir.FullPath(), req.Name, true, false, false, false) if err != nil { - glog.V(3).Infof("not found remove %s/%s: %v", dir.FullPath(), req.Name, err) + glog.V(3).Infof("remove %s/%s: %v", dir.FullPath(), req.Name, err) + if strings.Contains(err.Error(), "non-empty"){ + return fuse.EEXIST + } return fuse.ENOENT } @@ -420,6 +430,8 @@ func (dir *Dir) Listxattr(ctx context.Context, req *fuse.ListxattrRequest, resp func (dir *Dir) Forget() { glog.V(3).Infof("Forget dir %s", dir.FullPath()) + + dir.wfs.fsNodeCache.DeleteFsNode(util.FullPath(dir.FullPath())) } func (dir *Dir) maybeLoadEntry() error { diff --git a/weed/filesys/dir_rename.go b/weed/filesys/dir_rename.go index 120dffd1d..da4f1b232 100644 --- a/weed/filesys/dir_rename.go +++ b/weed/filesys/dir_rename.go @@ -62,6 +62,7 @@ func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirector } // fmt.Printf("rename path: %v => %v\n", oldPath, newPath) + dir.wfs.fsNodeCache.Move(oldPath, newPath) delete(dir.wfs.handles, oldPath.AsInode()) return err diff --git a/weed/filesys/file.go b/weed/filesys/file.go index f96cd8118..dcda93522 100644 --- a/weed/filesys/file.go +++ b/weed/filesys/file.go @@ -213,6 +213,7 @@ func (file *File) Fsync(ctx context.Context, req *fuse.FsyncRequest) error { func (file *File) Forget() { t := util.NewFullPath(file.dir.FullPath(), file.Name) glog.V(3).Infof("Forget file %s", t) + file.wfs.fsNodeCache.DeleteFsNode(t) } func (file *File) maybeLoadEntry(ctx context.Context) error { diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go index ca35bfd02..b9d224fb2 100644 --- a/weed/filesys/filehandle.go +++ b/weed/filesys/filehandle.go @@ -9,11 +9,12 @@ import ( "os" "time" + "github.com/seaweedfs/fuse" + "github.com/seaweedfs/fuse/fs" + "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/seaweedfs/fuse" - "github.com/seaweedfs/fuse/fs" ) type FileHandle struct { @@ -225,6 +226,10 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error { fh.f.entry.Chunks = chunks // fh.f.entryViewCache = nil + // special handling of one chunk md5 + if len(chunks) == 1 { + } + if err := filer_pb.CreateEntry(client, request); err != nil { glog.Errorf("fh flush create %s: %v", fh.f.fullpath(), err) return fmt.Errorf("fh flush create %s: %v", fh.f.fullpath(), err) diff --git a/weed/filesys/fscache.go b/weed/filesys/fscache.go new file mode 100644 index 000000000..b146f0615 --- /dev/null +++ b/weed/filesys/fscache.go @@ -0,0 +1,207 @@ +package filesys + +import ( + "sync" + + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/seaweedfs/fuse/fs" +) + +type FsCache struct { + root *FsNode + sync.RWMutex +} +type FsNode struct { + parent *FsNode + node fs.Node + name string + childrenLock sync.RWMutex + children map[string]*FsNode +} + +func newFsCache(root fs.Node) *FsCache { + return &FsCache{ + root: &FsNode{ + node: root, + }, + } +} + +func (c *FsCache) GetFsNode(path util.FullPath) fs.Node { + + c.RLock() + defer c.RUnlock() + + return c.doGetFsNode(path) +} + +func (c 
*FsCache) doGetFsNode(path util.FullPath) fs.Node { + t := c.root + for _, p := range path.Split() { + t = t.findChild(p) + if t == nil { + return nil + } + } + return t.node +} + +func (c *FsCache) SetFsNode(path util.FullPath, node fs.Node) { + + c.Lock() + defer c.Unlock() + + c.doSetFsNode(path, node) +} + +func (c *FsCache) doSetFsNode(path util.FullPath, node fs.Node) { + t := c.root + for _, p := range path.Split() { + t = t.ensureChild(p) + } + t.node = node +} + +func (c *FsCache) EnsureFsNode(path util.FullPath, genNodeFn func() fs.Node) fs.Node { + + c.Lock() + defer c.Unlock() + + t := c.doGetFsNode(path) + if t != nil { + return t + } + t = genNodeFn() + c.doSetFsNode(path, t) + return t +} + +func (c *FsCache) DeleteFsNode(path util.FullPath) { + + c.Lock() + defer c.Unlock() + + t := c.root + for _, p := range path.Split() { + t = t.findChild(p) + if t == nil { + return + } + } + if t.parent != nil { + t.parent.disconnectChild(t) + } + t.deleteSelf() +} + +// oldPath and newPath are full path including the new name +func (c *FsCache) Move(oldPath util.FullPath, newPath util.FullPath) *FsNode { + + c.Lock() + defer c.Unlock() + + // find old node + src := c.root + for _, p := range oldPath.Split() { + src = src.findChild(p) + if src == nil { + return src + } + } + if src.parent != nil { + src.parent.disconnectChild(src) + } + + // find new node + target := c.root + for _, p := range newPath.Split() { + target = target.ensureChild(p) + } + parent := target.parent + src.name = target.name + if dir, ok := src.node.(*Dir); ok { + dir.name = target.name // target is not Dir, but a shortcut + } + if f, ok := src.node.(*File); ok { + f.Name = target.name + if f.entry != nil { + f.entry.Name = f.Name + } + } + parent.disconnectChild(target) + + target.deleteSelf() + + src.connectToParent(parent) + + return src +} + +func (n *FsNode) connectToParent(parent *FsNode) { + n.parent = parent + oldNode := parent.findChild(n.name) + if oldNode != nil { + oldNode.deleteSelf() + } + if dir, ok := n.node.(*Dir); ok { + dir.parent = parent.node.(*Dir) + } + if f, ok := n.node.(*File); ok { + f.dir = parent.node.(*Dir) + } + n.childrenLock.Lock() + parent.children[n.name] = n + n.childrenLock.Unlock() +} + +func (n *FsNode) findChild(name string) *FsNode { + n.childrenLock.RLock() + defer n.childrenLock.RUnlock() + + child, found := n.children[name] + if found { + return child + } + return nil +} + +func (n *FsNode) ensureChild(name string) *FsNode { + n.childrenLock.Lock() + defer n.childrenLock.Unlock() + + if n.children == nil { + n.children = make(map[string]*FsNode) + } + child, found := n.children[name] + if found { + return child + } + t := &FsNode{ + parent: n, + node: nil, + name: name, + children: nil, + } + n.children[name] = t + return t +} + +func (n *FsNode) disconnectChild(child *FsNode) { + n.childrenLock.Lock() + delete(n.children, child.name) + n.childrenLock.Unlock() + child.parent = nil +} + +func (n *FsNode) deleteSelf() { + n.childrenLock.Lock() + for _, child := range n.children { + child.deleteSelf() + } + n.children = nil + n.childrenLock.Unlock() + + n.node = nil + n.parent = nil + +} diff --git a/weed/filesys/fscache_test.go b/weed/filesys/fscache_test.go new file mode 100644 index 000000000..67f9aacc8 --- /dev/null +++ b/weed/filesys/fscache_test.go @@ -0,0 +1,96 @@ +package filesys + +import ( + "testing" + + "github.com/chrislusf/seaweedfs/weed/util" +) + +func TestPathSplit(t *testing.T) { + parts := util.FullPath("/").Split() + if len(parts) != 0 { + 
t.Errorf("expecting an empty list, but getting %d", len(parts)) + } + + parts = util.FullPath("/readme.md").Split() + if len(parts) != 1 { + t.Errorf("expecting an empty list, but getting %d", len(parts)) + } + +} + +func TestFsCache(t *testing.T) { + + cache := newFsCache(nil) + + x := cache.GetFsNode(util.FullPath("/y/x")) + if x != nil { + t.Errorf("wrong node!") + } + + p := util.FullPath("/a/b/c") + cache.SetFsNode(p, &File{Name: "cc"}) + tNode := cache.GetFsNode(p) + tFile := tNode.(*File) + if tFile.Name != "cc" { + t.Errorf("expecting a FsNode") + } + + cache.SetFsNode(util.FullPath("/a/b/d"), &File{Name: "dd"}) + cache.SetFsNode(util.FullPath("/a/b/e"), &File{Name: "ee"}) + cache.SetFsNode(util.FullPath("/a/b/f"), &File{Name: "ff"}) + cache.SetFsNode(util.FullPath("/z"), &File{Name: "zz"}) + cache.SetFsNode(util.FullPath("/a"), &File{Name: "aa"}) + + b := cache.GetFsNode(util.FullPath("/a/b")) + if b != nil { + t.Errorf("unexpected node!") + } + + a := cache.GetFsNode(util.FullPath("/a")) + if a == nil { + t.Errorf("missing node!") + } + + cache.DeleteFsNode(util.FullPath("/a")) + if b != nil { + t.Errorf("unexpected node!") + } + + a = cache.GetFsNode(util.FullPath("/a")) + if a != nil { + t.Errorf("wrong DeleteFsNode!") + } + + z := cache.GetFsNode(util.FullPath("/z")) + if z == nil { + t.Errorf("missing node!") + } + + y := cache.GetFsNode(util.FullPath("/x/y")) + if y != nil { + t.Errorf("wrong node!") + } + +} + +func TestFsCacheMove(t *testing.T) { + + cache := newFsCache(nil) + + cache.SetFsNode(util.FullPath("/a/b/d"), &File{Name: "dd"}) + cache.SetFsNode(util.FullPath("/a/b/e"), &File{Name: "ee"}) + cache.SetFsNode(util.FullPath("/z"), &File{Name: "zz"}) + cache.SetFsNode(util.FullPath("/a"), &File{Name: "aa"}) + + cache.Move(util.FullPath("/a/b"), util.FullPath("/z/x")) + + d := cache.GetFsNode(util.FullPath("/z/x/d")) + if d == nil { + t.Errorf("unexpected nil node!") + } + if d.(*File).Name != "dd" { + t.Errorf("unexpected non dd node!") + } + +} diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go index 68ad987be..22f0b655a 100644 --- a/weed/filesys/wfs.go +++ b/weed/filesys/wfs.go @@ -63,6 +63,7 @@ type WFS struct { stats statsCache root fs.Node + fsNodeCache *FsCache chunkCache *chunk_cache.ChunkCache metaCache *meta_cache.MetaCache @@ -82,7 +83,7 @@ func NewSeaweedFileSystem(option *Option) *WFS { }, }, } - cacheUniqueId := util.Md5([]byte(option.FilerGrpcAddress + option.FilerMountRootPath + util.Version()))[0:4] + cacheUniqueId := util.Md5String([]byte(option.FilerGrpcAddress + option.FilerMountRootPath + util.Version()))[0:4] cacheDir := path.Join(option.CacheDir, cacheUniqueId) if option.CacheSizeMB > 0 { os.MkdirAll(cacheDir, 0755) @@ -100,6 +101,7 @@ func NewSeaweedFileSystem(option *Option) *WFS { }) wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs} + wfs.fsNodeCache = newFsCache(wfs.root) return wfs } diff --git a/weed/operation/needle_parse_test.go b/weed/operation/needle_parse_test.go index 74d58d1b5..177c620f4 100644 --- a/weed/operation/needle_parse_test.go +++ b/weed/operation/needle_parse_test.go @@ -18,7 +18,7 @@ type MockClient struct { } func (m *MockClient) Do(req *http.Request) (*http.Response, error) { - n, originalSize, err := needle.CreateNeedleFromRequest(req, false, 1024*1024) + n, originalSize, _, err := needle.CreateNeedleFromRequest(req, false, 1024*1024) if m.needleHandling != nil { m.needleHandling(n, originalSize, err) } diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go index 
cb129daa2..6fd8a60d1 100644 --- a/weed/operation/upload_content.go +++ b/weed/operation/upload_content.go @@ -2,7 +2,6 @@ package operation import ( "bytes" - "crypto/md5" "encoding/json" "errors" "fmt" @@ -23,14 +22,14 @@ import ( ) type UploadResult struct { - Name string `json:"name,omitempty"` - Size uint32 `json:"size,omitempty"` - Error string `json:"error,omitempty"` - ETag string `json:"eTag,omitempty"` - CipherKey []byte `json:"cipherKey,omitempty"` - Mime string `json:"mime,omitempty"` - Gzip uint32 `json:"gzip,omitempty"` - Md5 string `json:"md5,omitempty"` + Name string `json:"name,omitempty"` + Size uint32 `json:"size,omitempty"` + Error string `json:"error,omitempty"` + ETag string `json:"eTag,omitempty"` + CipherKey []byte `json:"cipherKey,omitempty"` + Mime string `json:"mime,omitempty"` + Gzip uint32 `json:"gzip,omitempty"` + ContentMd5 string `json:"contentMd5,omitempty"` } func (uploadResult *UploadResult) ToPbFileChunk(fileId string, offset int64) *filer_pb.FileChunk { @@ -65,20 +64,12 @@ var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"") // Upload sends a POST request to a volume server to upload the content with adjustable compression level func UploadData(uploadUrl string, filename string, cipher bool, data []byte, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) { uploadResult, err = doUploadData(uploadUrl, filename, cipher, data, isInputCompressed, mtype, pairMap, jwt) - if uploadResult != nil { - uploadResult.Md5 = util.Md5(data) - } return } // Upload sends a POST request to a volume server to upload the content with fast compression func Upload(uploadUrl string, filename string, cipher bool, reader io.Reader, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error, data []byte) { - hash := md5.New() - reader = io.TeeReader(reader, hash) uploadResult, err, data = doUpload(uploadUrl, filename, cipher, reader, isInputCompressed, mtype, pairMap, jwt) - if uploadResult != nil { - uploadResult.Md5 = fmt.Sprintf("%x", hash.Sum(nil)) - } return } @@ -241,6 +232,7 @@ func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error return nil, errors.New(ret.Error) } ret.ETag = etag + ret.ContentMd5 = resp.Header.Get("Content-MD5") return &ret, nil } diff --git a/weed/s3api/auth_credentials.go b/weed/s3api/auth_credentials.go index db5f4c8a3..851f6d4a3 100644 --- a/weed/s3api/auth_credentials.go +++ b/weed/s3api/auth_credentials.go @@ -107,6 +107,16 @@ func (iam *IdentityAccessManagement) lookupByAccessKey(accessKey string) (identi return nil, nil, false } +func (iam *IdentityAccessManagement) lookupAnonymous() (identity *Identity, found bool) { + + for _, ident := range iam.identities { + if ident.Name == "anonymous" { + return ident, true + } + } + return nil, false +} + func (iam *IdentityAccessManagement) Auth(f http.HandlerFunc, action Action) http.HandlerFunc { if !iam.isEnabled() { @@ -127,6 +137,7 @@ func (iam *IdentityAccessManagement) Auth(f http.HandlerFunc, action Action) htt func (iam *IdentityAccessManagement) authRequest(r *http.Request, action Action) ErrorCode { var identity *Identity var s3Err ErrorCode + var found bool switch getRequestAuthType(r) { case authTypeStreamingSigned: return ErrNone @@ -146,7 +157,10 @@ func (iam *IdentityAccessManagement) authRequest(r *http.Request, action Action) glog.V(3).Infof("jwt auth type") return ErrNotImplemented case 
authTypeAnonymous: - return ErrAccessDenied + identity, found = iam.lookupAnonymous() + if !found { + return ErrAccessDenied + } default: return ErrNotImplemented } diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go index 357ac9ce0..84d685fa8 100644 --- a/weed/s3api/s3api_object_handlers.go +++ b/weed/s3api/s3api_object_handlers.go @@ -40,20 +40,22 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) return } - rAuthType := getRequestAuthType(r) dataReader := r.Body - var s3ErrCode ErrorCode - switch rAuthType { - case authTypeStreamingSigned: - dataReader, s3ErrCode = s3a.iam.newSignV4ChunkedReader(r) - case authTypeSignedV2, authTypePresignedV2: - _, s3ErrCode = s3a.iam.isReqAuthenticatedV2(r) - case authTypePresigned, authTypeSigned: - _, s3ErrCode = s3a.iam.reqSignatureV4Verify(r) - } - if s3ErrCode != ErrNone { - writeErrorResponse(w, s3ErrCode, r.URL) - return + if s3a.iam.isEnabled() { + rAuthType := getRequestAuthType(r) + var s3ErrCode ErrorCode + switch rAuthType { + case authTypeStreamingSigned: + dataReader, s3ErrCode = s3a.iam.newSignV4ChunkedReader(r) + case authTypeSignedV2, authTypePresignedV2: + _, s3ErrCode = s3a.iam.isReqAuthenticatedV2(r) + case authTypePresigned, authTypeSigned: + _, s3ErrCode = s3a.iam.reqSignatureV4Verify(r) + } + if s3ErrCode != ErrNone { + writeErrorResponse(w, s3ErrCode, r.URL) + return + } } defer dataReader.Close() diff --git a/weed/s3api/s3api_object_multipart_handlers.go b/weed/s3api/s3api_object_multipart_handlers.go index 0ed96afa2..7611b1e7e 100644 --- a/weed/s3api/s3api_object_multipart_handlers.go +++ b/weed/s3api/s3api_object_multipart_handlers.go @@ -179,20 +179,22 @@ func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Requ return } - rAuthType := getRequestAuthType(r) dataReader := r.Body - var s3ErrCode ErrorCode - switch rAuthType { - case authTypeStreamingSigned: - dataReader, s3ErrCode = s3a.iam.newSignV4ChunkedReader(r) - case authTypeSignedV2, authTypePresignedV2: - _, s3ErrCode = s3a.iam.isReqAuthenticatedV2(r) - case authTypePresigned, authTypeSigned: - _, s3ErrCode = s3a.iam.reqSignatureV4Verify(r) - } - if s3ErrCode != ErrNone { - writeErrorResponse(w, s3ErrCode, r.URL) - return + if s3a.iam.isEnabled() { + rAuthType := getRequestAuthType(r) + var s3ErrCode ErrorCode + switch rAuthType { + case authTypeStreamingSigned: + dataReader, s3ErrCode = s3a.iam.newSignV4ChunkedReader(r) + case authTypeSignedV2, authTypePresignedV2: + _, s3ErrCode = s3a.iam.isReqAuthenticatedV2(r) + case authTypePresigned, authTypeSigned: + _, s3ErrCode = s3a.iam.reqSignatureV4Verify(r) + } + if s3ErrCode != ErrNone { + writeErrorResponse(w, s3ErrCode, r.URL) + return + } } defer dataReader.Close() diff --git a/weed/s3api/s3api_objects_list_handlers.go b/weed/s3api/s3api_objects_list_handlers.go index 9203c56f3..311442551 100644 --- a/weed/s3api/s3api_objects_list_handlers.go +++ b/weed/s3api/s3api_objects_list_handlers.go @@ -2,6 +2,7 @@ package s3api import ( "context" + "encoding/xml" "fmt" "io" "net/http" @@ -12,10 +13,24 @@ import ( "time" "github.com/chrislusf/seaweedfs/weed/filer2" - "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" ) +type ListBucketResultV2 struct { + XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListBucketResult"` + Name string `xml:"Name"` + Prefix string `xml:"Prefix"` + MaxKeys int `xml:"MaxKeys"` + Delimiter string `xml:"Delimiter,omitempty"` + IsTruncated bool 
`xml:"IsTruncated"` + Contents []ListEntry `xml:"Contents,omitempty"` + CommonPrefixes []PrefixEntry `xml:"CommonPrefixes,omitempty"` + ContinuationToken string `xml:"ContinuationToken,omitempty"` + NextContinuationToken string `xml:"NextContinuationToken,omitempty"` + KeyCount int `xml:"KeyCount"` + StartAfter string `xml:"StartAfter,omitempty"` +} + func (s3a *S3ApiServer) ListObjectsV2Handler(w http.ResponseWriter, r *http.Request) { // https://docs.aws.amazon.com/AmazonS3/latest/API/v2-RESTBucketGET.html @@ -23,7 +38,7 @@ func (s3a *S3ApiServer) ListObjectsV2Handler(w http.ResponseWriter, r *http.Requ // collect parameters bucket, _ := getBucketAndObject(r) - originalPrefix, marker, startAfter, delimiter, _, maxKeys := getListObjectsV2Args(r.URL.Query()) + originalPrefix, continuationToken, startAfter, delimiter, _, maxKeys := getListObjectsV2Args(r.URL.Query()) if maxKeys < 0 { writeErrorResponse(w, ErrInvalidMaxKeys, r.URL) @@ -34,7 +49,8 @@ func (s3a *S3ApiServer) ListObjectsV2Handler(w http.ResponseWriter, r *http.Requ return } - if marker == "" { + marker := continuationToken + if continuationToken == "" { marker = startAfter } @@ -44,8 +60,22 @@ func (s3a *S3ApiServer) ListObjectsV2Handler(w http.ResponseWriter, r *http.Requ writeErrorResponse(w, ErrInternalError, r.URL) return } + responseV2 := &ListBucketResultV2{ + XMLName: response.XMLName, + Name: response.Name, + CommonPrefixes: response.CommonPrefixes, + Contents: response.Contents, + ContinuationToken: continuationToken, + Delimiter: response.Delimiter, + IsTruncated: response.IsTruncated, + KeyCount: len(response.Contents), + MaxKeys: response.MaxKeys, + NextContinuationToken: response.NextMarker, + Prefix: response.Prefix, + StartAfter: startAfter, + } - writeSuccessResponseXML(w, encodeResponse(response)) + writeSuccessResponseXML(w, encodeResponse(responseV2)) } func (s3a *S3ApiServer) ListObjectsV1Handler(w http.ResponseWriter, r *http.Request) { @@ -76,70 +106,39 @@ func (s3a *S3ApiServer) ListObjectsV1Handler(w http.ResponseWriter, r *http.Requ writeSuccessResponseXML(w, encodeResponse(response)) } -func (s3a *S3ApiServer) listFilerEntries(bucket, originalPrefix string, maxKeys int, marker string, delimiter string) (response ListBucketResult, err error) { +func (s3a *S3ApiServer) listFilerEntries(bucket string, originalPrefix string, maxKeys int, marker string, delimiter string) (response ListBucketResult, err error) { // convert full path prefix into directory name and prefix for entry name - dir, prefix := filepath.Split(originalPrefix) - if strings.HasPrefix(dir, "/") { - dir = dir[1:] + reqDir, prefix := filepath.Split(originalPrefix) + if strings.HasPrefix(reqDir, "/") { + reqDir = reqDir[1:] } + if strings.HasSuffix(reqDir, "/") { + // remove trailing "/" + reqDir = reqDir[:len(reqDir)-1] + } + bucketPrefix := fmt.Sprintf("%s/%s/", s3a.option.BucketsPath, bucket) + reqDir = fmt.Sprintf("%s%s", bucketPrefix, reqDir) + + var contents []ListEntry + var commonPrefixes []PrefixEntry + var isTruncated bool + var doErr error + var nextMarker string // check filer err = s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { - request := &filer_pb.ListEntriesRequest{ - Directory: fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, bucket, dir), - Prefix: prefix, - Limit: uint32(maxKeys + 1), - StartFromFileName: marker, - InclusiveStartFrom: false, - } - - stream, err := client.ListEntries(context.Background(), request) - if err != nil { - return fmt.Errorf("list buckets: %v", err) - } - - var contents 
[]ListEntry - var commonPrefixes []PrefixEntry - var counter int - var lastEntryName string - var isTruncated bool - - for { - resp, recvErr := stream.Recv() - if recvErr != nil { - if recvErr == io.EOF { - break - } else { - return recvErr - } - } - - entry := resp.Entry - counter++ - if counter > maxKeys { - isTruncated = true - break - } - lastEntryName = entry.Name + _, isTruncated, nextMarker, doErr = s3a.doListFilerEntries(client, reqDir, prefix, maxKeys, marker, delimiter, func(dir string, entry *filer_pb.Entry) { if entry.IsDirectory { - if entry.Name != ".uploads" { + if delimiter == "/" { prefix = fmt.Sprintf("%s%s/", dir, entry.Name) - commonPrefixes = append(commonPrefixes, PrefixEntry{ - Prefix: prefix, + Prefix: prefix[len(bucketPrefix):], }) - - if delimiter != "/" { - response, _ := s3a.listFilerEntries(bucket, prefix, maxKeys, marker, delimiter) - for _, content := range response.Contents { - contents = append(contents, content) - } - } } } else { contents = append(contents, ListEntry{ - Key: fmt.Sprintf("%s%s", dir, entry.Name), + Key: fmt.Sprintf("%s%s", dir[len(bucketPrefix):], entry.Name), LastModified: time.Unix(entry.Attributes.Mtime, 0).UTC(), ETag: "\"" + filer2.ETag(entry) + "\"", Size: int64(filer2.TotalSize(entry.Chunks)), @@ -150,29 +149,123 @@ func (s3a *S3ApiServer) listFilerEntries(bucket, originalPrefix string, maxKeys StorageClass: "STANDARD", }) } + }) + if doErr != nil { + return doErr + } + if !isTruncated { + nextMarker = "" } response = ListBucketResult{ Name: bucket, Prefix: originalPrefix, Marker: marker, - NextMarker: lastEntryName, + NextMarker: nextMarker, MaxKeys: maxKeys, - Delimiter: "/", + Delimiter: delimiter, IsTruncated: isTruncated, Contents: contents, CommonPrefixes: commonPrefixes, } - glog.V(4).Infof("read directory: %v, found: %v, %+v", request, counter, response) - return nil }) return } +func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, dir, prefix string, maxKeys int, marker, delimiter string, eachEntryFn func(dir string, entry *filer_pb.Entry)) (counter int, isTruncated bool, nextMarker string, err error) { + // invariants + // prefix and marker should be under dir, marker may contain "/" + // maxKeys should be updated for each recursion + + if prefix == "/" && delimiter == "/" { + return + } + if maxKeys <= 0 { + return + } + + if strings.Contains(marker, "/") { + sepIndex := strings.Index(marker, "/") + subDir, subMarker := marker[0:sepIndex], marker[sepIndex+1:] + // println("doListFilerEntries dir", dir+"/"+subDir, "subMarker", subMarker, "maxKeys", maxKeys) + subCounter, _, subNextMarker, subErr := s3a.doListFilerEntries(client, dir+"/"+subDir, "", maxKeys, subMarker, delimiter, eachEntryFn) + if subErr != nil { + err = subErr + return + } + maxKeys -= subCounter + nextMarker = subDir + "/" + subNextMarker + counter += subCounter + // finished processing this sub directory + marker = subDir + } + + // now marker is also a direct child of dir + request := &filer_pb.ListEntriesRequest{ + Directory: dir, + Prefix: prefix, + Limit: uint32(maxKeys + 1), + StartFromFileName: marker, + InclusiveStartFrom: false, + } + + stream, listErr := client.ListEntries(context.Background(), request) + if listErr != nil { + err = fmt.Errorf("list entires %+v: %v", request, listErr) + return + } + + for { + resp, recvErr := stream.Recv() + if recvErr != nil { + if recvErr == io.EOF { + break + } else { + err = fmt.Errorf("iterating entires %+v: %v", request, recvErr) + return + } + } + if counter >= maxKeys { + 
isTruncated = true + return + } + entry := resp.Entry + nextMarker = entry.Name + if entry.IsDirectory { + // println("ListEntries", dir, "dir:", entry.Name) + if entry.Name != ".uploads" { // FIXME no need to apply to all directories. this extra also affects maxKeys + eachEntryFn(dir, entry) + if delimiter != "/" { + // println("doListFilerEntries2 dir", dir+"/"+entry.Name, "maxKeys", maxKeys-counter) + subCounter, subIsTruncated, subNextMarker, subErr := s3a.doListFilerEntries(client, dir+"/"+entry.Name, "", maxKeys-counter, "", delimiter, eachEntryFn) + if subErr != nil { + err = fmt.Errorf("doListFilerEntries2: %v", subErr) + return + } + // println("doListFilerEntries2 dir", dir+"/"+entry.Name, "maxKeys", maxKeys-counter, "subCounter", subCounter, "subNextMarker", subNextMarker, "subIsTruncated", subIsTruncated) + counter += subCounter + nextMarker = entry.Name + "/" + subNextMarker + if subIsTruncated { + isTruncated = true + return + } + } else { + counter++ + } + } + } else { + // println("ListEntries", dir, "file:", entry.Name) + eachEntryFn(dir, entry) + counter++ + } + } + return +} + func getListObjectsV2Args(values url.Values) (prefix, token, startAfter, delimiter string, fetchOwner bool, maxkeys int) { prefix = values.Get("prefix") token = values.Get("continuation-token") diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go index da66178ce..d22376a45 100644 --- a/weed/server/filer_server_handlers_write.go +++ b/weed/server/filer_server_handlers_write.go @@ -2,22 +2,11 @@ package weed_server import ( "context" - "crypto/md5" - "encoding/json" - "errors" - "fmt" - "io" - "io/ioutil" - "mime" "net/http" - "net/url" "os" - filenamePath "path" - "strconv" "strings" "time" - "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" @@ -98,206 +87,8 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { ttlSeconds = int32(ttl.Minutes()) * 60 } - if autoChunked := fs.autoChunk(ctx, w, r, replication, collection, dataCenter, ttlSeconds, ttlString, fsync); autoChunked { - return - } + fs.autoChunk(ctx, w, r, replication, collection, dataCenter, ttlSeconds, ttlString, fsync) - if fs.option.Cipher { - reply, err := fs.encrypt(ctx, w, r, replication, collection, dataCenter, ttlSeconds, ttlString, fsync) - if err != nil { - writeJsonError(w, r, http.StatusInternalServerError, err) - } else if reply != nil { - writeJsonQuiet(w, r, http.StatusCreated, reply) - } - - return - } - - fileId, urlLocation, auth, err := fs.assignNewFileInfo(replication, collection, dataCenter, ttlString, fsync) - - if err != nil || fileId == "" || urlLocation == "" { - glog.V(0).Infof("fail to allocate volume for %s, collection:%s, datacenter:%s", r.URL.Path, collection, dataCenter) - writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("fail to allocate volume for %s, collection:%s, datacenter:%s", r.URL.Path, collection, dataCenter)) - return - } - - glog.V(4).Infof("write %s to %v", r.URL.Path, urlLocation) - - u, _ := url.Parse(urlLocation) - ret, md5value, err := fs.uploadToVolumeServer(r, u, auth, w, fileId) - if err != nil { - return - } - - if err = fs.updateFilerStore(ctx, r, w, replication, collection, ret, md5value, fileId, ttlSeconds); err != nil { - return - } - - // send back post result - reply := FilerPostResult{ - Name: ret.Name, - Size: int64(ret.Size), - Error: ret.Error, - Fid: fileId, 
- Url: urlLocation, - } - setEtag(w, ret.ETag) - writeJsonQuiet(w, r, http.StatusCreated, reply) -} - -// update metadata in filer store -func (fs *FilerServer) updateFilerStore(ctx context.Context, r *http.Request, w http.ResponseWriter, replication string, - collection string, ret *operation.UploadResult, md5value []byte, fileId string, ttlSeconds int32) (err error) { - - stats.FilerRequestCounter.WithLabelValues("postStoreWrite").Inc() - start := time.Now() - defer func() { - stats.FilerRequestHistogram.WithLabelValues("postStoreWrite").Observe(time.Since(start).Seconds()) - }() - - modeStr := r.URL.Query().Get("mode") - if modeStr == "" { - modeStr = "0660" - } - mode, err := strconv.ParseUint(modeStr, 8, 32) - if err != nil { - glog.Errorf("Invalid mode format: %s, use 0660 by default", modeStr) - mode = 0660 - } - - path := r.URL.Path - if strings.HasSuffix(path, "/") { - if ret.Name != "" { - path += ret.Name - } - } - existingEntry, err := fs.filer.FindEntry(ctx, util.FullPath(path)) - crTime := time.Now() - if err == nil && existingEntry != nil { - crTime = existingEntry.Crtime - } - entry := &filer2.Entry{ - FullPath: util.FullPath(path), - Attr: filer2.Attr{ - Mtime: time.Now(), - Crtime: crTime, - Mode: os.FileMode(mode), - Uid: OS_UID, - Gid: OS_GID, - Replication: replication, - Collection: collection, - TtlSec: ttlSeconds, - Mime: ret.Mime, - Md5: md5value, - }, - Chunks: []*filer_pb.FileChunk{{ - FileId: fileId, - Size: uint64(ret.Size), - Mtime: time.Now().UnixNano(), - ETag: ret.ETag, - }}, - } - if entry.Attr.Mime == "" { - if ext := filenamePath.Ext(path); ext != "" { - entry.Attr.Mime = mime.TypeByExtension(ext) - } - } - // glog.V(4).Infof("saving %s => %+v", path, entry) - if dbErr := fs.filer.CreateEntry(ctx, entry, false, false); dbErr != nil { - fs.filer.DeleteChunks(entry.Chunks) - glog.V(0).Infof("failing to write %s to filer server : %v", path, dbErr) - writeJsonError(w, r, http.StatusInternalServerError, dbErr) - err = dbErr - return - } - - return nil -} - -// send request to volume server -func (fs *FilerServer) uploadToVolumeServer(r *http.Request, u *url.URL, auth security.EncodedJwt, w http.ResponseWriter, fileId string) (ret *operation.UploadResult, md5value []byte, err error) { - - stats.FilerRequestCounter.WithLabelValues("postUpload").Inc() - start := time.Now() - defer func() { stats.FilerRequestHistogram.WithLabelValues("postUpload").Observe(time.Since(start).Seconds()) }() - - ret = &operation.UploadResult{} - - md5Hash := md5.New() - body := r.Body - if r.Method == "PUT" { - // only PUT or large chunked files has Md5 in attributes - body = ioutil.NopCloser(io.TeeReader(r.Body, md5Hash)) - } - - request := &http.Request{ - Method: r.Method, - URL: u, - Proto: r.Proto, - ProtoMajor: r.ProtoMajor, - ProtoMinor: r.ProtoMinor, - Header: r.Header, - Body: body, - Host: r.Host, - ContentLength: r.ContentLength, - } - - if auth != "" { - request.Header.Set("Authorization", "BEARER "+string(auth)) - } - resp, doErr := util.Do(request) - if doErr != nil { - glog.Errorf("failing to connect to volume server %s: %v, %+v", r.RequestURI, doErr, r.Method) - writeJsonError(w, r, http.StatusInternalServerError, doErr) - err = doErr - return - } - defer func() { - io.Copy(ioutil.Discard, resp.Body) - resp.Body.Close() - }() - - respBody, raErr := ioutil.ReadAll(resp.Body) - if raErr != nil { - glog.V(0).Infoln("failing to upload to volume server", r.RequestURI, raErr.Error()) - writeJsonError(w, r, http.StatusInternalServerError, raErr) - err = raErr - return - } - 
- glog.V(4).Infoln("post result", string(respBody)) - unmarshalErr := json.Unmarshal(respBody, &ret) - if unmarshalErr != nil { - glog.V(0).Infoln("failing to read upload resonse", r.RequestURI, string(respBody)) - writeJsonError(w, r, http.StatusInternalServerError, unmarshalErr) - err = unmarshalErr - return - } - if ret.Error != "" { - err = errors.New(ret.Error) - glog.V(0).Infoln("failing to post to volume server", r.RequestURI, ret.Error) - writeJsonError(w, r, http.StatusInternalServerError, err) - return - } - // find correct final path - path := r.URL.Path - if strings.HasSuffix(path, "/") { - if ret.Name != "" { - path += ret.Name - } else { - err = fmt.Errorf("can not to write to folder %s without a file name", path) - fs.filer.DeleteFileByFileId(fileId) - glog.V(0).Infoln("Can not to write to folder", path, "without a file name!") - writeJsonError(w, r, http.StatusInternalServerError, err) - return - } - } - // use filer calculated md5 ETag, instead of the volume server crc ETag - if r.Method == "PUT" { - md5value = md5Hash.Sum(nil) - } - ret.ETag = getEtag(resp) - return } // curl -X DELETE http://localhost:8888/path/to diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go index be0438efb..0365ea3ab 100644 --- a/weed/server/filer_server_handlers_write_autochunk.go +++ b/weed/server/filer_server_handlers_write_autochunk.go @@ -3,9 +3,11 @@ package weed_server import ( "context" "crypto/md5" + "hash" "io" "io/ioutil" "net/http" + "os" "path" "strconv" "strings" @@ -21,11 +23,7 @@ import ( ) func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, - replication string, collection string, dataCenter string, ttlSec int32, ttlString string, fsync bool) bool { - if r.Method != "POST" { - glog.V(4).Infoln("AutoChunking not supported for method", r.Method) - return false - } + replication string, collection string, dataCenter string, ttlSec int32, ttlString string, fsync bool) { // autoChunking can be set at the command-line level or as a query param. 
Query param overrides command-line query := r.URL.Query() @@ -35,54 +33,43 @@ func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r * if maxMB <= 0 && fs.option.MaxMB > 0 { maxMB = int32(fs.option.MaxMB) } - if maxMB <= 0 { - glog.V(4).Infoln("AutoChunking not enabled") - return false - } - glog.V(4).Infoln("AutoChunking level set to", maxMB, "(MB)") chunkSize := 1024 * 1024 * maxMB - contentLength := int64(0) - if contentLengthHeader := r.Header["Content-Length"]; len(contentLengthHeader) == 1 { - contentLength, _ = strconv.ParseInt(contentLengthHeader[0], 10, 64) - if contentLength <= int64(chunkSize) { - glog.V(4).Infoln("Content-Length of", contentLength, "is less than the chunk size of", chunkSize, "so autoChunking will be skipped.") - return false - } - } - - if contentLength <= 0 { - glog.V(4).Infoln("Content-Length value is missing or unexpected so autoChunking will be skipped.") - return false - } - - reply, err := fs.doAutoChunk(ctx, w, r, contentLength, chunkSize, replication, collection, dataCenter, ttlSec, ttlString, fsync) - if err != nil { - writeJsonError(w, r, http.StatusInternalServerError, err) - } else if reply != nil { - writeJsonQuiet(w, r, http.StatusCreated, reply) - } - return true -} - -func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, - contentLength int64, chunkSize int32, replication string, collection string, dataCenter string, ttlSec int32, ttlString string, fsync bool) (filerResult *FilerPostResult, replyerr error) { - stats.FilerRequestCounter.WithLabelValues("postAutoChunk").Inc() start := time.Now() defer func() { stats.FilerRequestHistogram.WithLabelValues("postAutoChunk").Observe(time.Since(start).Seconds()) }() + var reply *FilerPostResult + var err error + var md5bytes []byte + if r.Method == "POST" { + reply, md5bytes, err = fs.doPostAutoChunk(ctx, w, r, chunkSize, replication, collection, dataCenter, ttlSec, ttlString, fsync) + } else { + reply, md5bytes, err = fs.doPutAutoChunk(ctx, w, r, chunkSize, replication, collection, dataCenter, ttlSec, ttlString, fsync) + } + if err != nil { + writeJsonError(w, r, http.StatusInternalServerError, err) + } else if reply != nil { + if len(md5bytes) > 0 { + w.Header().Set("Content-MD5", util.Base64Encode(md5bytes)) + } + writeJsonQuiet(w, r, http.StatusCreated, reply) + } +} + +func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, chunkSize int32, replication string, collection string, dataCenter string, ttlSec int32, ttlString string, fsync bool) (filerResult *FilerPostResult, md5bytes []byte, replyerr error) { + multipartReader, multipartReaderErr := r.MultipartReader() if multipartReaderErr != nil { - return nil, multipartReaderErr + return nil, nil, multipartReaderErr } part1, part1Err := multipartReader.NextPart() if part1Err != nil { - return nil, part1Err + return nil, nil, part1Err } fileName := part1.FileName() @@ -90,46 +77,13 @@ func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r fileName = path.Base(fileName) } contentType := part1.Header.Get("Content-Type") + if contentType == "application/octet-stream" { + contentType = "" + } - var fileChunks []*filer_pb.FileChunk - - md5Hash := md5.New() - var partReader = ioutil.NopCloser(io.TeeReader(part1, md5Hash)) - - chunkOffset := int64(0) - - for chunkOffset < contentLength { - limitedReader := io.LimitReader(partReader, int64(chunkSize)) - - // assign one file id for one chunk - fileId, urlLocation, auth, 
assignErr := fs.assignNewFileInfo(replication, collection, dataCenter, ttlString, fsync) - if assignErr != nil { - return nil, assignErr - } - - // upload the chunk to the volume server - uploadResult, uploadErr := fs.doUpload(urlLocation, w, r, limitedReader, fileName, contentType, nil, auth) - if uploadErr != nil { - return nil, uploadErr - } - - // if last chunk exhausted the reader exactly at the border - if uploadResult.Size == 0 { - break - } - - // Save to chunk manifest structure - fileChunks = append(fileChunks, uploadResult.ToPbFileChunk(fileId, chunkOffset)) - - glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d) of %d", fileName, len(fileChunks), fileId, chunkOffset, chunkOffset+int64(uploadResult.Size), contentLength) - - // reset variables for the next chunk - chunkOffset = chunkOffset + int64(uploadResult.Size) - - // if last chunk was not at full chunk size, but already exhausted the reader - if int64(uploadResult.Size) < int64(chunkSize) { - break - } + fileChunks, md5Hash, chunkOffset, err := fs.uploadReaderToChunks(w, r, part1, chunkSize, replication, collection, dataCenter, ttlString, fileName, contentType, fsync) + if err != nil { + return nil, nil, err } fileChunks, replyerr = filer2.MaybeManifestize(fs.saveAsChunk(replication, collection, dataCenter, ttlString, fsync), fileChunks) @@ -138,6 +92,48 @@ func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r return } + md5bytes = md5Hash.Sum(nil) + filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, replication, collection, ttlSec, contentType, md5bytes, fileChunks, chunkOffset) + + return +} + +func (fs *FilerServer) doPutAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, chunkSize int32, replication string, collection string, dataCenter string, ttlSec int32, ttlString string, fsync bool) (filerResult *FilerPostResult, md5bytes []byte, replyerr error) { + + fileName := "" + contentType := "" + + fileChunks, md5Hash, chunkOffset, err := fs.uploadReaderToChunks(w, r, r.Body, chunkSize, replication, collection, dataCenter, ttlString, fileName, contentType, fsync) + if err != nil { + return nil, nil, err + } + + fileChunks, replyerr = filer2.MaybeManifestize(fs.saveAsChunk(replication, collection, dataCenter, ttlString, fsync), fileChunks) + if replyerr != nil { + glog.V(0).Infof("manifestize %s: %v", r.RequestURI, replyerr) + return + } + + md5bytes = md5Hash.Sum(nil) + filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, replication, collection, ttlSec, contentType, md5bytes, fileChunks, chunkOffset) + + return +} + +func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileName string, replication string, collection string, ttlSec int32, contentType string, md5bytes []byte, fileChunks []*filer_pb.FileChunk, chunkOffset int64) (filerResult *FilerPostResult, replyerr error) { + + // detect file mode + modeStr := r.URL.Query().Get("mode") + if modeStr == "" { + modeStr = "0660" + } + mode, err := strconv.ParseUint(modeStr, 8, 32) + if err != nil { + glog.Errorf("Invalid mode format: %s, use 0660 by default", modeStr) + mode = 0660 + } + + // fix the path path := r.URL.Path if strings.HasSuffix(path, "/") { if fileName != "" { @@ -145,20 +141,28 @@ func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r } } + // fix the crTime + existingEntry, err := fs.filer.FindEntry(ctx, util.FullPath(path)) + crTime := time.Now() + if err == nil && existingEntry != nil { + crTime = existingEntry.Crtime + } + + glog.V(4).Infoln("saving", 
path)
 	entry := &filer2.Entry{
 		FullPath: util.FullPath(path),
 		Attr: filer2.Attr{
 			Mtime:       time.Now(),
-			Crtime:      time.Now(),
-			Mode:        0660,
+			Crtime:      crTime,
+			Mode:        os.FileMode(mode),
 			Uid:         OS_UID,
 			Gid:         OS_GID,
 			Replication: replication,
 			Collection:  collection,
 			TtlSec:      ttlSec,
 			Mime:        contentType,
-			Md5:         md5Hash.Sum(nil),
+			Md5:         md5bytes,
 		},
 		Chunks: fileChunks,
 	}
@@ -173,10 +177,52 @@ func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r
 		replyerr = dbErr
 		filerResult.Error = dbErr.Error()
 		glog.V(0).Infof("failing to write %s to filer server : %v", path, dbErr)
-		return
 	}
+	return filerResult, replyerr
+}
 
-	return
+func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, replication string, collection string, dataCenter string, ttlString string, fileName string, contentType string, fsync bool) ([]*filer_pb.FileChunk, hash.Hash, int64, error) {
+	var fileChunks []*filer_pb.FileChunk
+
+	md5Hash := md5.New()
+	var partReader = ioutil.NopCloser(io.TeeReader(reader, md5Hash))
+
+	chunkOffset := int64(0)
+
+	for {
+		limitedReader := io.LimitReader(partReader, int64(chunkSize))
+
+		// assign one file id for one chunk
+		fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(replication, collection, dataCenter, ttlString, fsync)
+		if assignErr != nil {
+			return nil, nil, 0, assignErr
+		}
+
+		// upload the chunk to the volume server
+		uploadResult, uploadErr := fs.doUpload(urlLocation, w, r, limitedReader, fileName, contentType, nil, auth)
+		if uploadErr != nil {
+			return nil, nil, 0, uploadErr
+		}
+
+		// if last chunk exhausted the reader exactly at the border
+		if uploadResult.Size == 0 {
+			break
+		}
+
+		// Save to chunk manifest structure
+		fileChunks = append(fileChunks, uploadResult.ToPbFileChunk(fileId, chunkOffset))
+
+		glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d)", fileName, len(fileChunks), fileId, chunkOffset, chunkOffset+int64(uploadResult.Size))
+
+		// reset variables for the next chunk
+		chunkOffset = chunkOffset + int64(uploadResult.Size)
+
+		// if last chunk was not at full chunk size, but already exhausted the reader
+		if int64(uploadResult.Size) < int64(chunkSize) {
+			break
+		}
+	}
+	return fileChunks, md5Hash, chunkOffset, nil
 }
 
 func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *http.Request, limitedReader io.Reader, fileName string, contentType string, pairMap map[string]string, auth security.EncodedJwt) (*operation.UploadResult, error) {
@@ -209,4 +255,3 @@ func (fs *FilerServer) saveAsChunk(replication string, collection string, dataCe
 		return uploadResult.ToPbFileChunk(fileId, offset), collection, replication, nil
 	}
 }
-
diff --git a/weed/server/filer_server_handlers_write_cipher.go b/weed/server/filer_server_handlers_write_cipher.go
index 8413496b8..6ec06d3de 100644
--- a/weed/server/filer_server_handlers_write_cipher.go
+++ b/weed/server/filer_server_handlers_write_cipher.go
@@ -70,6 +70,7 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht
 			Collection:  collection,
 			TtlSec:      ttlSeconds,
 			Mime:        pu.MimeType,
+			Md5:         util.Base64Md5ToBytes(pu.ContentMd5),
 		},
 		Chunks: fileChunks,
 	}
diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go
index 1ee214deb..579b30a6a 100644
--- a/weed/server/master_grpc_server.go
+++ b/weed/server/master_grpc_server.go
@@ -19,14 +19,13 @@ import (
 func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServer) error {
 	var dn *topology.DataNode
-	t := ms.Topo
 
 	defer func() {
 		if dn != nil {
 			// if the volume server disconnects and reconnects quickly
 			// the unregister and register can race with each other
-			t.UnRegisterDataNode(dn)
+			ms.Topo.UnRegisterDataNode(dn)
 			glog.V(0).Infof("unregister disconnected volume server %s:%d", dn.Ip, dn.Port)
 
 			message := &master_pb.VolumeLocation{
@@ -62,11 +61,11 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
 			return err
 		}
 
-		t.Sequence.SetMax(heartbeat.MaxFileKey)
+		ms.Topo.Sequence.SetMax(heartbeat.MaxFileKey)
 
 		if dn == nil {
-			dcName, rackName := t.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack)
-			dc := t.GetOrCreateDataCenter(dcName)
+			dcName, rackName := ms.Topo.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack)
+			dc := ms.Topo.GetOrCreateDataCenter(dcName)
 			rack := dc.GetOrCreateRack(rackName)
 			dn = rack.GetOrCreateDataNode(heartbeat.Ip,
 				int(heartbeat.Port), heartbeat.PublicUrl,
@@ -88,7 +87,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
 			dn.UpAdjustMaxVolumeCountDelta(delta)
 		}
 
-		glog.V(4).Infof("master received heartbeat %s", heartbeat.String())
+		glog.V(5).Infof("master received heartbeat %s", heartbeat.String())
 		message := &master_pb.VolumeLocation{
 			Url:       dn.Url(),
 			PublicUrl: dn.PublicUrl,
@@ -102,12 +101,12 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
 				message.DeletedVids = append(message.DeletedVids, volInfo.Id)
 			}
 			// update master internal volume layouts
-			t.IncrementalSyncDataNodeRegistration(heartbeat.NewVolumes, heartbeat.DeletedVolumes, dn)
+			ms.Topo.IncrementalSyncDataNodeRegistration(heartbeat.NewVolumes, heartbeat.DeletedVolumes, dn)
 		}
 
 		if len(heartbeat.Volumes) > 0 || heartbeat.HasNoVolumes {
 			// process heartbeat.Volumes
-			newVolumes, deletedVolumes := t.SyncDataNodeRegistration(heartbeat.Volumes, dn)
+			newVolumes, deletedVolumes := ms.Topo.SyncDataNodeRegistration(heartbeat.Volumes, dn)
 
 			for _, v := range newVolumes {
 				glog.V(0).Infof("master see new volume %d from %s", uint32(v.Id), dn.Url())
@@ -122,7 +121,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
 		if len(heartbeat.NewEcShards) > 0 || len(heartbeat.DeletedEcShards) > 0 {
 			// update master internal volume layouts
-			t.IncrementalSyncDataNodeEcShards(heartbeat.NewEcShards, heartbeat.DeletedEcShards, dn)
+			ms.Topo.IncrementalSyncDataNodeEcShards(heartbeat.NewEcShards, heartbeat.DeletedEcShards, dn)
 
 			for _, s := range heartbeat.NewEcShards {
 				message.NewVids = append(message.NewVids, s.Id)
@@ -138,7 +137,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
 		if len(heartbeat.EcShards) > 0 || heartbeat.HasNoEcShards {
 			glog.V(1).Infof("master recieved ec shards from %s: %+v", dn.Url(), heartbeat.EcShards)
-			newShards, deletedShards := t.SyncDataNodeEcShards(heartbeat.EcShards, dn)
+			newShards, deletedShards := ms.Topo.SyncDataNodeEcShards(heartbeat.EcShards, dn)
 
 			// broadcast the ec vid changes to master clients
 			for _, s := range newShards {
@@ -163,7 +162,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
 		}
 
 		// tell the volume servers about the leader
-		newLeader, err := t.Leader()
+		newLeader, err := ms.Topo.Leader()
 		if err != nil {
 			glog.Warningf("SendHeartbeat find leader: %v", err)
 			return err
diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go
index 7cb836344..c62a4a388 100644
--- a/weed/server/volume_grpc_client_to_master.go
+++ b/weed/server/volume_grpc_client_to_master.go
@@ -2,7 +2,6 @@ package weed_server
 
 import (
 	"fmt"
-	"net"
 	"time"
 
 	"google.golang.org/grpc"
@@ -87,12 +86,12 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi
 				vs.store.SetVolumeSizeLimit(in.GetVolumeSizeLimit())
 				if vs.store.MaybeAdjustVolumeMax() {
 					if err = stream.Send(vs.store.CollectHeartbeat()); err != nil {
-						glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err)
+						glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", vs.currentMaster, err)
 					}
 				}
 			}
-			if in.GetLeader() != "" && masterNode != in.GetLeader() && !isSameIP(in.GetLeader(), masterNode) {
-				glog.V(0).Infof("Volume Server found a new master newLeader: %v instead of %v", in.GetLeader(), masterNode)
+			if in.GetLeader() != "" && vs.currentMaster != in.GetLeader() {
+				glog.V(0).Infof("Volume Server found a new master newLeader: %v instead of %v", in.GetLeader(), vs.currentMaster)
 				newLeader = in.GetLeader()
 				doneChan <- nil
 				return
@@ -169,13 +168,13 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi
 				return "", err
 			}
 		case <-volumeTickChan:
-			glog.V(4).Infof("volume server %s:%d heartbeat", vs.store.Ip, vs.store.Port)
+			glog.V(5).Infof("volume server %s:%d heartbeat", vs.store.Ip, vs.store.Port)
 			if err = stream.Send(vs.store.CollectHeartbeat()); err != nil {
 				glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err)
 				return "", err
 			}
 		case <-ecShardTickChan:
-			glog.V(4).Infof("volume server %s:%d ec heartbeat", vs.store.Ip, vs.store.Port)
+			glog.V(5).Infof("volume server %s:%d ec heartbeat", vs.store.Ip, vs.store.Port)
 			if err = stream.Send(vs.store.CollectErasureCodingHeartbeat()); err != nil {
 				glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err)
 				return "", err
@@ -185,16 +184,3 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi
 		}
 	}
 }
-
-func isSameIP(ip string, host string) bool {
-	ips, err := net.LookupIP(host)
-	if err != nil {
-		return false
-	}
-	for _, t := range ips {
-		if ip == t.String() {
-			return true
-		}
-	}
-	return false
-}
diff --git a/weed/server/volume_server_handlers_write.go b/weed/server/volume_server_handlers_write.go
index 74dad28de..b4f8a90b2 100644
--- a/weed/server/volume_server_handlers_write.go
+++ b/weed/server/volume_server_handlers_write.go
@@ -42,7 +42,7 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 
-	reqNeedle, originalSize, ne := needle.CreateNeedleFromRequest(r, vs.FixJpgOrientation, vs.fileSizeLimitBytes)
+	reqNeedle, originalSize, contentMd5, ne := needle.CreateNeedleFromRequest(r, vs.FixJpgOrientation, vs.fileSizeLimitBytes)
 	if ne != nil {
 		writeJsonError(w, r, http.StatusBadRequest, ne)
 		return
@@ -70,6 +70,7 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
 	ret.ETag = reqNeedle.Etag()
 	ret.Mime = string(reqNeedle.Mime)
 	setEtag(w, ret.ETag)
+	w.Header().Set("Content-MD5", contentMd5)
 	writeJsonQuiet(w, r, httpStatus, ret)
 }
diff --git a/weed/storage/needle/needle.go b/weed/storage/needle/needle.go
index 150d6ee4b..7c7aa3feb 100644
--- a/weed/storage/needle/needle.go
+++ b/weed/storage/needle/needle.go
@@ -48,7 +48,7 @@ func (n *Needle) String() (str string) {
 	return
 }
 
-func CreateNeedleFromRequest(r *http.Request, fixJpgOrientation bool, sizeLimit int64) (n *Needle, originalSize int, e error) {
+func CreateNeedleFromRequest(r *http.Request, fixJpgOrientation bool, sizeLimit int64) (n *Needle, originalSize int, contentMd5 string, e error) {
 	n = new(Needle)
 	pu, e := ParseUpload(r, sizeLimit)
 	if e != nil {
@@ -58,6 +58,7 @@ func CreateNeedleFromRequest(r *http.Request, fixJpgOrientation bool, sizeLimit
 	originalSize = pu.OriginalDataSize
 	n.LastModified = pu.ModifiedTime
 	n.Ttl = pu.Ttl
+	contentMd5 = pu.ContentMd5
 
 	if len(pu.FileName) < 256 {
 		n.Name = []byte(pu.FileName)
diff --git a/weed/storage/needle/needle_parse_upload.go b/weed/storage/needle/needle_parse_upload.go
index dd678f87f..4d244046e 100644
--- a/weed/storage/needle/needle_parse_upload.go
+++ b/weed/storage/needle/needle_parse_upload.go
@@ -29,6 +29,7 @@ type ParsedUpload struct {
 	Ttl              *TTL
 	IsChunkedFile    bool
 	UncompressedData []byte
+	ContentMd5       string
 }
 
 func ParseUpload(r *http.Request, sizeLimit int64) (pu *ParsedUpload, e error) {
@@ -83,11 +84,13 @@ func ParseUpload(r *http.Request, sizeLimit int64) (pu *ParsedUpload, e error) {
 		}
 	}
 
+	// md5
+	h := md5.New()
+	h.Write(pu.UncompressedData)
+	pu.ContentMd5 = base64.StdEncoding.EncodeToString(h.Sum(nil))
 	if expectedChecksum := r.Header.Get("Content-MD5"); expectedChecksum != "" {
-		h := md5.New()
-		h.Write(pu.UncompressedData)
-		if receivedChecksum := base64.StdEncoding.EncodeToString(h.Sum(nil)); expectedChecksum != receivedChecksum {
-			e = fmt.Errorf("Content-MD5 did not match md5 of file data [%s] != [%s]", expectedChecksum, receivedChecksum)
+		if expectedChecksum != pu.ContentMd5 {
+			e = fmt.Errorf("Content-MD5 did not match md5 of file data expected [%s] received [%s] size %d", expectedChecksum, pu.ContentMd5, len(pu.UncompressedData))
 			return
 		}
 	}
diff --git a/weed/storage/needle/needle_read_write.go b/weed/storage/needle/needle_read_write.go
index 9702cf939..575a72e40 100644
--- a/weed/storage/needle/needle_read_write.go
+++ b/weed/storage/needle/needle_read_write.go
@@ -168,7 +168,7 @@ func ReadNeedleBlob(r backend.BackendStorageFile, offset int64, size uint32, ver
 func (n *Needle) ReadBytes(bytes []byte, offset int64, size uint32, version Version) (err error) {
 	n.ParseNeedleHeader(bytes)
 	if n.Size != size {
-		return fmt.Errorf("entry not found: offset %d found id %d size %d, expected size %d", offset, n.Id, n.Size, size)
+		return fmt.Errorf("entry not found: offset %d found id %x size %d, expected size %d", offset, n.Id, n.Size, size)
 	}
 	switch version {
 	case Version1:
diff --git a/weed/util/bytes.go b/weed/util/bytes.go
index 0650919c0..890d50586 100644
--- a/weed/util/bytes.go
+++ b/weed/util/bytes.go
@@ -2,6 +2,7 @@ package util
 
 import (
 	"crypto/md5"
+	"encoding/base64"
 	"fmt"
 	"io"
 )
@@ -109,8 +110,28 @@ func HashToInt32(data []byte) (v int32) {
 	return
 }
 
-func Md5(data []byte) string {
+func Base64Encode(data []byte) string {
+	return base64.StdEncoding.EncodeToString(data)
+}
+
+func Base64Md5(data []byte) string {
+	return Base64Encode(Md5(data))
+}
+
+func Md5(data []byte) []byte {
 	hash := md5.New()
 	hash.Write(data)
-	return fmt.Sprintf("%x", hash.Sum(nil))
+	return hash.Sum(nil)
+}
+
+func Md5String(data []byte) string {
+	return fmt.Sprintf("%x", Md5(data))
+}
+
+func Base64Md5ToBytes(contentMd5 string) []byte {
+	data, err := base64.StdEncoding.DecodeString(contentMd5)
+	if err != nil {
+		return nil
+	}
+	return data
 }
diff --git a/weed/util/constants.go b/weed/util/constants.go
index 9f0e00506..10955acde 100644
--- a/weed/util/constants.go
+++ b/weed/util/constants.go
@@ -5,7 +5,7 @@ import (
 )
 
 var (
-	VERSION = fmt.Sprintf("%s %d.%d", sizeLimit, 1, 87)
+	VERSION = fmt.Sprintf("%s %d.%d", sizeLimit, 1, 88)
 	COMMIT  = ""
 )
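
// ---------------------------------------------------------------------------
// Illustrative sketch, not part of the patch above: the new
// uploadReaderToChunks helper in filer_server_handlers_write_autochunk.go tees
// the request body through an MD5 hash while cutting it into fixed-size
// chunks, so the whole-stream digest is ready as soon as the last chunk has
// been read. The standalone program below shows the same io.TeeReader +
// io.LimitReader pattern against an in-memory chunk list; splitWithMd5 is a
// hypothetical name used only for this sketch, and collecting byte slices
// stands in for uploading each chunk to a volume server.
package main

import (
	"bytes"
	"crypto/md5"
	"encoding/base64"
	"fmt"
	"io"
)

// splitWithMd5 consumes reader in chunkSize pieces while feeding every byte
// through an MD5 hash via io.TeeReader, mirroring the structure of
// uploadReaderToChunks.
func splitWithMd5(reader io.Reader, chunkSize int64) (chunks [][]byte, md5Base64 string, total int64, err error) {
	md5Hash := md5.New()
	partReader := io.TeeReader(reader, md5Hash)

	for {
		buf := new(bytes.Buffer)
		n, copyErr := io.Copy(buf, io.LimitReader(partReader, chunkSize))
		if copyErr != nil {
			return nil, "", 0, copyErr
		}
		if n == 0 {
			break // the previous chunk ended exactly on the chunk boundary
		}
		chunks = append(chunks, buf.Bytes())
		total += n
		if n < chunkSize {
			break // short chunk: the stream is exhausted
		}
	}
	// Base64 of the MD5 digest is the form used in the Content-MD5 header.
	return chunks, base64.StdEncoding.EncodeToString(md5Hash.Sum(nil)), total, nil
}

func main() {
	data := bytes.Repeat([]byte("seaweedfs"), 1000) // 9000 bytes of sample data
	chunks, sum, n, err := splitWithMd5(bytes.NewReader(data), 4096)
	fmt.Println(len(chunks), n, sum, err) // expect 3 chunks, 9000 bytes
}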
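
// ---------------------------------------------------------------------------
// Illustrative sketch, not part of the patch above: ParseUpload now always
// computes a base64 MD5 digest (ParsedUpload.ContentMd5), verifies it against
// a client-supplied Content-MD5 header, and that same base64 form is what
// util.Base64Md5ToBytes decodes before the digest is stored on the filer
// entry. The helpers below (base64Md5, checkContentMd5) are hypothetical
// names that only demonstrate the digest encoding and the verification rule.
package main

import (
	"crypto/md5"
	"encoding/base64"
	"fmt"
)

// base64Md5 returns the standard-base64 MD5 of data, the same encoding used
// for the Content-MD5 request and response headers.
func base64Md5(data []byte) string {
	sum := md5.Sum(data)
	return base64.StdEncoding.EncodeToString(sum[:])
}

// checkContentMd5 mirrors the check in ParseUpload: an empty header means
// nothing to verify; otherwise the received bytes must hash to the same value.
func checkContentMd5(expected string, data []byte) error {
	if expected == "" {
		return nil
	}
	if got := base64Md5(data); got != expected {
		return fmt.Errorf("Content-MD5 did not match md5 of file data expected [%s] received [%s] size %d", expected, got, len(data))
	}
	return nil
}

func main() {
	payload := []byte("hello seaweedfs")
	header := base64Md5(payload)

	fmt.Println(checkContentMd5(header, payload))           // <nil>: digests match
	fmt.Println(checkContentMd5(header, []byte("corrupt"))) // mismatch error

	// Decoding the base64 header back to the raw digest bytes, as
	// util.Base64Md5ToBytes does before storing Md5 on the entry.
	raw, _ := base64.StdEncoding.DecodeString(header)
	fmt.Printf("%x\n", raw) // hex form of the same 16-byte digest
}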