diff --git a/README.md b/README.md
index 69857b524..8e73f811c 100644
--- a/README.md
+++ b/README.md
@@ -121,15 +121,16 @@ On top of the object store, optional [Filer] can support directories and POSIX a
 ## Filer Features ##
 * [Filer server][Filer] provides "normal" directories and files via http.
-* [Super Large Files][SuperLargeFiles] stores large or super large files in tens of TB.
+* [File TTL][FilerTTL] automatically expires file metadata and actual file data.
 * [Mount filer][Mount] reads and writes files directly as a local directory via FUSE.
+* [Filer Store Replication][FilerStoreReplication] enables HA for filer metadata stores.
 * [Active-Active Replication][ActiveActiveAsyncReplication] enables asynchronous one-way or two-way cross cluster continuous replication.
 * [Amazon S3 compatible API][AmazonS3API] accesses files with S3 tooling.
 * [Hadoop Compatible File System][Hadoop] accesses files from Hadoop/Spark/Flink/etc or even runs HBase.
 * [Async Replication To Cloud][BackupToCloud] has extremely fast local access and backups to Amazon S3, Google Cloud Storage, Azure, BackBlaze.
 * [WebDAV] accesses as a mapped drive on Mac and Windows, or from mobile devices.
 * [AES256-GCM Encrypted Storage][FilerDataEncryption] safely stores the encrypted data.
-* [File TTL][FilerTTL] automatically purges file metadata and actual file data.
+* [Super Large Files][SuperLargeFiles] stores large or super large files in tens of TB.
 * [Kubernetes CSI Driver][SeaweedFsCsiDriver] A Container Storage Interface (CSI) Driver. [![Docker Pulls](https://img.shields.io/docker/pulls/chrislusf/seaweedfs-csi-driver.svg?maxAge=4800)](https://hub.docker.com/r/chrislusf/seaweedfs-csi-driver/)
 
 [Filer]: https://github.com/chrislusf/seaweedfs/wiki/Directories-and-Files
@@ -146,6 +147,7 @@ On top of the object store, optional [Filer] can support directories and POSIX a
 [VolumeServerTTL]: https://github.com/chrislusf/seaweedfs/wiki/Store-file-with-a-Time-To-Live
 [SeaweedFsCsiDriver]: https://github.com/seaweedfs/seaweedfs-csi-driver
 [ActiveActiveAsyncReplication]: https://github.com/chrislusf/seaweedfs/wiki/Filer-Active-Active-cross-cluster-continuous-synchronization
+[FilerStoreReplication]: https://github.com/chrislusf/seaweedfs/wiki/Filer-Store-Replication
 
 [Back to TOC](#table-of-contents)
 
diff --git a/go.mod b/go.mod
index 51c24bcf0..ab3da6f20 100644
--- a/go.mod
+++ b/go.mod
@@ -40,7 +40,7 @@ require (
 	github.com/json-iterator/go v1.1.10
 	github.com/karlseguin/ccache v2.0.3+incompatible
 	github.com/karlseguin/expect v1.0.1 // indirect
-	github.com/klauspost/compress v1.10.9
+	github.com/klauspost/compress v1.10.9 // indirect
 	github.com/klauspost/cpuid v1.2.1 // indirect
 	github.com/klauspost/crc32 v1.2.0
 	github.com/klauspost/reedsolomon v1.9.2
diff --git a/other/java/client/pom.xml b/other/java/client/pom.xml
index 5a4bbaead..4bfc5ab8f 100644
--- a/other/java/client/pom.xml
+++ b/other/java/client/pom.xml
@@ -5,7 +5,7 @@
     <groupId>com.github.chrislusf</groupId>
     <artifactId>seaweedfs-client</artifactId>
-    <version>1.5.4</version>
+    <version>1.5.6</version>
 
     <parent>
         <groupId>org.sonatype.oss</groupId>
@@ -68,6 +68,11 @@
             <version>4.13.1</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>javax.annotation</groupId>
+            <artifactId>javax.annotation-api</artifactId>
+            <version>1.3.2</version>
+        </dependency>
     </dependencies>
diff --git a/other/java/client/pom.xml.deploy b/other/java/client/pom.xml.deploy
index e24bcca9f..c3c960a28 100644
--- a/other/java/client/pom.xml.deploy
+++ b/other/java/client/pom.xml.deploy
@@ -5,7 +5,7 @@
     <groupId>com.github.chrislusf</groupId>
    <artifactId>seaweedfs-client</artifactId>
-    <version>1.5.4</version>
+    <version>1.5.6</version>
 
     <parent>
         <groupId>org.sonatype.oss</groupId>
diff --git a/other/java/client/pom_debug.xml b/other/java/client/pom_debug.xml
index 724c3c3b9..acdf621a5 100644
--- a/other/java/client/pom_debug.xml
+++ b/other/java/client/pom_debug.xml
@@ -5,7 +5,7 @@
     <groupId>com.github.chrislusf</groupId>
     <artifactId>seaweedfs-client</artifactId>
-    <version>1.5.4</version>
+    <version>1.5.6</version>
 
     <parent>
         <groupId>org.sonatype.oss</groupId>
diff --git a/other/java/examples/pom.xml b/other/java/examples/pom.xml
index 7cbb56ec3..f7c48d0ab 100644
--- a/other/java/examples/pom.xml
+++ b/other/java/examples/pom.xml
@@ -11,13 +11,13 @@
         <dependency>
             <groupId>com.github.chrislusf</groupId>
             <artifactId>seaweedfs-client</artifactId>
-            <version>1.5.4</version>
+            <version>1.5.6</version>
             <scope>compile</scope>
         </dependency>
         <dependency>
             <groupId>com.github.chrislusf</groupId>
             <artifactId>seaweedfs-hadoop2-client</artifactId>
-            <version>1.5.4</version>
+            <version>1.5.6</version>
             <scope>compile</scope>
         </dependency>
diff --git a/other/java/hdfs2/dependency-reduced-pom.xml b/other/java/hdfs2/dependency-reduced-pom.xml
index b598d8402..f7873a435 100644
--- a/other/java/hdfs2/dependency-reduced-pom.xml
+++ b/other/java/hdfs2/dependency-reduced-pom.xml
@@ -301,7 +301,7 @@
   <properties>
-    <seaweedfs.client.version>1.5.4</seaweedfs.client.version>
+    <seaweedfs.client.version>1.5.6</seaweedfs.client.version>
     <hadoop.version>2.9.2</hadoop.version>
   </properties>
diff --git a/other/java/hdfs2/pom.xml b/other/java/hdfs2/pom.xml
index de518a0dc..bda0eba56 100644
--- a/other/java/hdfs2/pom.xml
+++ b/other/java/hdfs2/pom.xml
@@ -5,7 +5,7 @@
     <modelVersion>4.0.0</modelVersion>
 
     <properties>
-        <seaweedfs.client.version>1.5.4</seaweedfs.client.version>
+        <seaweedfs.client.version>1.5.6</seaweedfs.client.version>
         <hadoop.version>2.9.2</hadoop.version>
     </properties>
diff --git a/other/java/hdfs3/dependency-reduced-pom.xml b/other/java/hdfs3/dependency-reduced-pom.xml
index 262c3ca80..20b52e20f 100644
--- a/other/java/hdfs3/dependency-reduced-pom.xml
+++ b/other/java/hdfs3/dependency-reduced-pom.xml
@@ -309,7 +309,7 @@
   <properties>
-    <seaweedfs.client.version>1.5.4</seaweedfs.client.version>
+    <seaweedfs.client.version>1.5.6</seaweedfs.client.version>
     <hadoop.version>3.1.1</hadoop.version>
   </properties>
diff --git a/other/java/hdfs3/pom.xml b/other/java/hdfs3/pom.xml
index 1952305e9..85d8db859 100644
--- a/other/java/hdfs3/pom.xml
+++ b/other/java/hdfs3/pom.xml
@@ -5,7 +5,7 @@
     <modelVersion>4.0.0</modelVersion>
 
     <properties>
-        <seaweedfs.client.version>1.5.4</seaweedfs.client.version>
+        <seaweedfs.client.version>1.5.6</seaweedfs.client.version>
         <hadoop.version>3.1.1</hadoop.version>
     </properties>
diff --git a/unmaintained/s3/benchmark/hsbench.sh b/unmaintained/s3/benchmark/hsbench.sh
new file mode 100755
index 000000000..285b51405
--- /dev/null
+++ b/unmaintained/s3/benchmark/hsbench.sh
@@ -0,0 +1,3 @@
+#!/bin/bash
+
+hsbench -a accesstoken -s secret -z 4K -d 10 -t 10 -b 10 -u http://localhost:8333 -m "cxipgdx" -bp "hsbench-"
diff --git a/weed/filer/filer_delete_entry.go b/weed/filer/filer_delete_entry.go
index 69219fbfa..4415d45d9 100644
--- a/weed/filer/filer_delete_entry.go
+++ b/weed/filer/filer_delete_entry.go
@@ -47,7 +47,7 @@ func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isR
 	}
 
 	if shouldDeleteChunks && !isCollection {
-		go f.DeleteChunks(chunks)
+		f.DirectDeleteChunks(chunks)
 	}
 	// A case not handled:
 	// what if the chunk is in a different collection?
diff --git a/weed/filer/filer_deletion.go b/weed/filer/filer_deletion.go
index 126d162ec..9eee38277 100644
--- a/weed/filer/filer_deletion.go
+++ b/weed/filer/filer_deletion.go
@@ -68,6 +68,50 @@ func (f *Filer) loopProcessingDeletion() {
 	}
 }
 
+func (f *Filer) doDeleteFileIds(fileIds []string) {
+
+	lookupFunc := LookupByMasterClientFn(f.MasterClient)
+	DeletionBatchSize := 100000 // roughly 20 bytes cost per file id
+
+	for len(fileIds) > 0 {
+		var toDeleteFileIds []string
+		if len(fileIds) > DeletionBatchSize {
+			toDeleteFileIds = fileIds[:DeletionBatchSize]
+			fileIds = fileIds[DeletionBatchSize:]
+		} else {
+			toDeleteFileIds = fileIds
+			fileIds = fileIds[:0]
+		}
+		deletionCount := len(toDeleteFileIds)
+		_, err := operation.DeleteFilesWithLookupVolumeId(f.GrpcDialOption, toDeleteFileIds, lookupFunc)
+		if err != nil {
+			if !strings.Contains(err.Error(), "already deleted") {
+				glog.V(0).Infof("deleting fileIds len=%d error: %v", deletionCount, err)
+			}
+		}
+	}
+}
+
+func (f *Filer) DirectDeleteChunks(chunks []*filer_pb.FileChunk) {
+	var fileIdsToDelete []string
+	for _, chunk := range chunks {
+		if !chunk.IsChunkManifest {
+			fileIdsToDelete = append(fileIdsToDelete, chunk.GetFileIdString())
+			continue
+		}
+		dataChunks, manifestResolveErr := ResolveOneChunkManifest(f.MasterClient.LookupFileId, chunk)
+		if manifestResolveErr != nil {
+			glog.V(0).Infof("failed to resolve manifest %s: %v", chunk.FileId, manifestResolveErr)
+		}
+		for _, dChunk := range dataChunks {
+			fileIdsToDelete = append(fileIdsToDelete, dChunk.GetFileIdString())
+		}
+		fileIdsToDelete = append(fileIdsToDelete, chunk.GetFileIdString())
+	}
+
+	f.doDeleteFileIds(fileIdsToDelete)
+}
+
 func (f *Filer) DeleteChunks(chunks []*filer_pb.FileChunk) {
 	for _, chunk := range chunks {
 		if !chunk.IsChunkManifest {
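Note on the filer change above: entry deletion previously handed chunks to the asynchronous path in a goroutine (go f.DeleteChunks), while the new DirectDeleteChunks deletes them inline, slicing the file id list into bounded batches so each call to the volume servers stays a manageable size. A minimal, self-contained sketch of that batching pattern; the names deleteInBatches and processBatch are illustrative, not SeaweedFS API:

    package main

    import "fmt"

    // deleteInBatches consumes a large id list in fixed-size windows,
    // mirroring the loop in doDeleteFileIds above. processBatch stands in
    // for the real operation.DeleteFilesWithLookupVolumeId call.
    func deleteInBatches(fileIds []string, batchSize int, processBatch func([]string)) {
    	for len(fileIds) > 0 {
    		n := len(fileIds)
    		if n > batchSize {
    			n = batchSize
    		}
    		processBatch(fileIds[:n])
    		fileIds = fileIds[n:]
    	}
    }

    func main() {
    	ids := []string{"3,01637037d6", "3,01637037d7", "3,01637037d8"}
    	deleteInBatches(ids, 2, func(batch []string) {
    		fmt.Println("deleting", len(batch), "file ids")
    	})
    }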
diff --git a/weed/operation/needle_parse_test.go b/weed/operation/needle_parse_test.go
index 177c620f4..aba89616e 100644
--- a/weed/operation/needle_parse_test.go
+++ b/weed/operation/needle_parse_test.go
@@ -75,6 +75,7 @@ func TestCreateNeedleFromRequest(t *testing.T) {
 		Upload("http://localhost:8080/389,0f084d17353afda0", "t.txt", false, bytes.NewReader(gzippedData), true, "text/plain", nil, "")
 	}
 
+	/*
 	{
 		mc.needleHandling = func(n *needle.Needle, originalSize int, err error) {
 			assert.Equal(t, nil, err, "upload: %v", err)
@@ -98,6 +99,7 @@ func TestCreateNeedleFromRequest(t *testing.T) {
 		zstdData, _ := util.ZstdData([]byte(textContent))
 		Upload("http://localhost:8080/389,0f084d17353afda0", "t.txt", false, bytes.NewReader(zstdData), false, "application/zstd", nil, "")
 	}
+	*/
 
 }
 
diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go
index 7ea49f2c6..401e2f96c 100644
--- a/weed/s3api/s3api_object_handlers.go
+++ b/weed/s3api/s3api_object_handlers.go
@@ -113,12 +113,6 @@ func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Reque
 
 	bucket, object := getBucketAndObject(r)
 
-	response, _ := s3a.listFilerEntries(bucket, object, 1, "", "/")
-	if len(response.Contents) != 0 && strings.HasSuffix(object, "/") {
-		w.WriteHeader(http.StatusNoContent)
-		return
-	}
-
 	destUrl := fmt.Sprintf("http://%s%s/%s%s?recursive=true",
 		s3a.option.Filer, s3a.option.BucketsPath, bucket, object)
 
@@ -266,11 +260,6 @@ func (s3a *S3ApiServer) proxyToFiler(w http.ResponseWriter, r *http.Request, des
 
 	resp, postErr := client.Do(proxyReq)
 
-	if (resp.ContentLength == -1 || resp.StatusCode == 404) && !strings.HasSuffix(destUrl, "/") {
-		writeErrorResponse(w, s3err.ErrNoSuchKey, r.URL)
-		return
-	}
-
 	if postErr != nil {
 		glog.Errorf("post to filer: %v", postErr)
 		writeErrorResponse(w, s3err.ErrInternalError, r.URL)
@@ -278,6 +267,11 @@ func (s3a *S3ApiServer) proxyToFiler(w http.ResponseWriter, r *http.Request, des
 	}
 	defer util.CloseResponse(resp)
 
+	if (resp.ContentLength == -1 || resp.StatusCode == 404) && !strings.HasSuffix(destUrl, "/") {
+		writeErrorResponse(w, s3err.ErrNoSuchKey, r.URL)
+		return
+	}
+
 	responseFn(resp, w)
 }
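The s3api reordering above matters for correctness, not just style: client.Do can return a nil response together with a non-nil error, so reading resp.ContentLength before checking postErr could dereference a nil pointer. The fix handles the error first and only then inspects the response. A minimal sketch of the safe ordering with net/http; the URL is a placeholder:

    package main

    import (
    	"log"
    	"net/http"
    )

    func fetch(url string) {
    	resp, err := http.Get(url)
    	if err != nil { // handle the error first; resp may be nil here
    		log.Printf("request failed: %v", err)
    		return
    	}
    	defer resp.Body.Close() // only safe after the nil check
    	log.Printf("status: %d, content length: %d", resp.StatusCode, resp.ContentLength)
    }

    func main() {
    	fetch("http://localhost:8888/")
    }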
diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go
index 15fd446e7..5a5495df6 100644
--- a/weed/server/volume_server_handlers_read.go
+++ b/weed/server/volume_server_handlers_read.go
@@ -159,8 +159,8 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
 			if n.Data, err = util.DecompressData(n.Data); err != nil {
 				glog.V(0).Infoln("ungzip error:", err, r.URL.Path)
 			}
-		} else if strings.Contains(r.Header.Get("Accept-Encoding"), "zstd") && util.IsZstdContent(n.Data) {
-			w.Header().Set("Content-Encoding", "zstd")
+		// } else if strings.Contains(r.Header.Get("Accept-Encoding"), "zstd") && util.IsZstdContent(n.Data) {
+		// 	w.Header().Set("Content-Encoding", "zstd")
 		} else if strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") && util.IsGzippedContent(n.Data) {
 			w.Header().Set("Content-Encoding", "gzip")
 		} else {
diff --git a/weed/shell/command_collection_delete.go b/weed/shell/command_collection_delete.go
index 28b9cebbd..5777d5780 100644
--- a/weed/shell/command_collection_delete.go
+++ b/weed/shell/command_collection_delete.go
@@ -22,7 +22,7 @@ func (c *commandCollectionDelete) Name() string {
 func (c *commandCollectionDelete) Help() string {
 	return `delete specified collection
 
-	collection.delete -collectin -force
+	collection.delete -collection -force
 
 `
 }
diff --git a/weed/shell/command_fs_meta_cat.go b/weed/shell/command_fs_meta_cat.go
index a097a3a4e..e0525defa 100644
--- a/weed/shell/command_fs_meta_cat.go
+++ b/weed/shell/command_fs_meta_cat.go
@@ -72,8 +72,9 @@ func (c *commandFsMetaCat) Do(args []string, commandEnv *CommandEnv, writer io.W
 
 		bytes, _ := proto.Marshal(respLookupEntry.Entry)
 		gzippedBytes, _ := util.GzipData(bytes)
-		zstdBytes, _ := util.ZstdData(bytes)
-		fmt.Fprintf(writer, "chunks %d meta size: %d gzip:%d zstd:%d\n", len(respLookupEntry.Entry.Chunks), len(bytes), len(gzippedBytes), len(zstdBytes))
+		// zstdBytes, _ := util.ZstdData(bytes)
+		// fmt.Fprintf(writer, "chunks %d meta size: %d gzip:%d zstd:%d\n", len(respLookupEntry.Entry.Chunks), len(bytes), len(gzippedBytes), len(zstdBytes))
+		fmt.Fprintf(writer, "chunks %d meta size: %d gzip:%d\n", len(respLookupEntry.Entry.Chunks), len(bytes), len(gzippedBytes))
 
 		return nil
 
diff --git a/weed/shell/command_volume_balance.go b/weed/shell/command_volume_balance.go
index 53222ca29..928dec02a 100644
--- a/weed/shell/command_volume_balance.go
+++ b/weed/shell/command_volume_balance.go
@@ -306,16 +306,16 @@ func isGoodMove(placement *super_block.ReplicaPlacement, existingReplicas []*Vol
 	dcs[targetNode.dc] = true
 	racks[fmt.Sprintf("%s %s", targetNode.dc, targetNode.rack)]++
 
-	if len(dcs) > placement.DiffDataCenterCount+1 {
+	if len(dcs) != placement.DiffDataCenterCount+1 {
 		return false
 	}
 
-	if len(racks) > placement.DiffRackCount+placement.DiffDataCenterCount+1 {
+	if len(racks) != placement.DiffRackCount+placement.DiffDataCenterCount+1 {
 		return false
 	}
 
 	for _, sameRackCount := range racks {
-		if sameRackCount > placement.SameRackCount+1 {
+		if sameRackCount != placement.SameRackCount+1 {
 			return false
 		}
 	}
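The balance check above is tightened from an upper bound (>) to an exact match (!=): a move must now leave the volume spread across exactly the number of data centers and racks that the replica placement prescribes, not merely no more than that. For replication "100" (DiffDataCenterCount=1, DiffRackCount=0, SameRackCount=0) this means exactly two data centers; the test case added below moves a replica so that both copies land in dc2, which the old predicate accepted and the new one rejects. A small worked example of the two predicates:

    package main

    import "fmt"

    // Worked example for replication "100": after the "wrong data centers"
    // move below, both replicas sit in dc2, so only one data center is in
    // use while the placement requires DiffDataCenterCount+1 = 2.
    func main() {
    	diffDataCenterCount := 1 // from replication "100"
    	dcsInUse := 1            // both replicas ended up in dc2

    	fmt.Println("old check rejects:", dcsInUse > diffDataCenterCount+1)  // false: bad move allowed
    	fmt.Println("new check rejects:", dcsInUse != diffDataCenterCount+1) // true: bad move rejected
    }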
diff --git a/weed/shell/command_volume_balance_test.go b/weed/shell/command_volume_balance_test.go
index 9e154dc00..696bc7fac 100644
--- a/weed/shell/command_volume_balance_test.go
+++ b/weed/shell/command_volume_balance_test.go
@@ -20,6 +20,22 @@ func TestIsGoodMove(t *testing.T) {
 
 	var tests = []testMoveCase{
 
+		{
+			name:        "test 100 move to wrong data centers",
+			replication: "100",
+			replicas: []*VolumeReplica{
+				{
+					location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+				},
+				{
+					location: &location{"dc2", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
+				},
+			},
+			sourceLocation: location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+			targetLocation: location{"dc2", "r3", &master_pb.DataNodeInfo{Id: "dn3"}},
+			expected:       false,
+		},
+
 		{
 			name:        "test 100 move to spread into proper data centers",
 			replication: "100",
diff --git a/weed/storage/needle/needle_parse_upload.go b/weed/storage/needle/needle_parse_upload.go
index 4d244046e..84e69c73e 100644
--- a/weed/storage/needle/needle_parse_upload.go
+++ b/weed/storage/needle/needle_parse_upload.go
@@ -23,7 +23,7 @@ type ParsedUpload struct {
 	MimeType         string
 	PairMap          map[string]string
 	IsGzipped        bool
-	IsZstd           bool
+	// IsZstd           bool
 	OriginalDataSize int
 	ModifiedTime     uint64
 	Ttl              *TTL
@@ -100,7 +100,7 @@ func ParseUpload(r *http.Request, sizeLimit int64) (pu *ParsedUpload, e error) {
 
 func parsePut(r *http.Request, sizeLimit int64, pu *ParsedUpload) (e error) {
 	pu.IsGzipped = r.Header.Get("Content-Encoding") == "gzip"
-	pu.IsZstd = r.Header.Get("Content-Encoding") == "zstd"
+	// pu.IsZstd = r.Header.Get("Content-Encoding") == "zstd"
 	pu.MimeType = r.Header.Get("Content-Type")
 	pu.FileName = ""
 	pu.Data, e = ioutil.ReadAll(io.LimitReader(r.Body, sizeLimit+1))
@@ -194,7 +194,7 @@ func parseMultipart(r *http.Request, sizeLimit int64, pu *ParsedUpload) (e error
 	}
 
 	pu.IsGzipped = part.Header.Get("Content-Encoding") == "gzip"
-	pu.IsZstd = part.Header.Get("Content-Encoding") == "zstd"
+	// pu.IsZstd = part.Header.Get("Content-Encoding") == "zstd"
 
 	return
diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go
index ffe36e95b..4c9c84459 100644
--- a/weed/topology/volume_layout.go
+++ b/weed/topology/volume_layout.go
@@ -132,6 +132,8 @@ func NewVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL, volumeSi
 }
 
 func (vl *VolumeLayout) String() string {
+	vl.accessLock.RLock()
+	defer vl.accessLock.RUnlock()
 	return fmt.Sprintf("rp:%v, ttl:%v, vid2location:%v, writables:%v, volumeSizeLimit:%v", vl.rp, vl.ttl, vl.vid2location, vl.writables, vl.volumeSizeLimit)
 }
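The volume_layout.go hunk above makes String() take the layout's read lock before formatting, since fields such as vid2location and writables can be mutated concurrently while another goroutine stringifies the layout. A self-contained sketch of the same sync.RWMutex pattern; the layout type and its fields here are illustrative, not the SeaweedFS structs:

    package main

    import (
    	"fmt"
    	"sync"
    )

    type layout struct {
    	mu        sync.RWMutex
    	writables []int
    }

    // String takes the read lock: readers may share it, writers exclude all.
    func (l *layout) String() string {
    	l.mu.RLock()
    	defer l.mu.RUnlock()
    	return fmt.Sprintf("writables:%v", l.writables)
    }

    // add takes the write lock before mutating the slice.
    func (l *layout) add(v int) {
    	l.mu.Lock()
    	defer l.mu.Unlock()
    	l.writables = append(l.writables, v)
    }

    func main() {
    	l := &layout{}
    	l.add(1)
    	fmt.Println(l) // uses the locked String method
    }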
diff --git a/weed/util/compression.go b/weed/util/compression.go
index cf3ac7c57..33506834b 100644
--- a/weed/util/compression.go
+++ b/weed/util/compression.go
@@ -9,7 +9,7 @@ import (
 	"strings"
 
 	"github.com/chrislusf/seaweedfs/weed/glog"
-	"github.com/klauspost/compress/zstd"
+	// "github.com/klauspost/compress/zstd"
 )
 
 var (
@@ -55,19 +55,16 @@ func GzipData(input []byte) ([]byte, error) {
 	return buf.Bytes(), nil
 }
 
-var zstdEncoder, _ = zstd.NewWriter(nil)
-
-func ZstdData(input []byte) ([]byte, error) {
-	return zstdEncoder.EncodeAll(input, nil), nil
-}
 
 func DecompressData(input []byte) ([]byte, error) {
 	if IsGzippedContent(input) {
 		return ungzipData(input)
 	}
+	/*
 	if IsZstdContent(input) {
 		return unzstdData(input)
 	}
+	*/
 	return input, UnsupportedCompression
 }
 
@@ -82,12 +79,6 @@ func ungzipData(input []byte) ([]byte, error) {
 	return output, err
 }
 
-var decoder, _ = zstd.NewReader(nil)
-
-func unzstdData(input []byte) ([]byte, error) {
-	return decoder.DecodeAll(input, nil)
-}
-
 func IsGzippedContent(data []byte) bool {
 	if len(data) < 2 {
 		return false
@@ -95,12 +86,26 @@ func IsGzippedContent(data []byte) bool {
 	return data[0] == 31 && data[1] == 139
 }
 
+/*
+var zstdEncoder, _ = zstd.NewWriter(nil)
+
+func ZstdData(input []byte) ([]byte, error) {
+	return zstdEncoder.EncodeAll(input, nil), nil
+}
+
+var decoder, _ = zstd.NewReader(nil)
+
+func unzstdData(input []byte) ([]byte, error) {
+	return decoder.DecodeAll(input, nil)
+}
+
 func IsZstdContent(data []byte) bool {
 	if len(data) < 4 {
 		return false
 	}
 	return data[3] == 0xFD && data[2] == 0x2F && data[1] == 0xB5 && data[0] == 0x28
 }
+*/
 
 /*
  * Default not to compressed since compression can be done on client side.
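For reference, the content sniffing in this file keys off magic numbers: gzip streams begin with the bytes 0x1f 0x8b, and zstd frames carry the little-endian magic 0xFD2FB528, which is why IsZstdContent (now inside the commented-out block) compares data[0] through data[3] in that order while gzip detection stays active. A standalone sketch of both checks:

    package main

    import "fmt"

    // isGzipped checks the two-byte gzip magic number (RFC 1952).
    func isGzipped(data []byte) bool {
    	return len(data) >= 2 && data[0] == 0x1f && data[1] == 0x8b
    }

    // isZstd checks the zstd frame magic 0xFD2FB528, stored little-endian,
    // so the lowest byte 0x28 comes first.
    func isZstd(data []byte) bool {
    	return len(data) >= 4 &&
    		data[0] == 0x28 && data[1] == 0xB5 && data[2] == 0x2F && data[3] == 0xFD
    }

    func main() {
    	fmt.Println(isGzipped([]byte{0x1f, 0x8b, 0x08, 0x00})) // true
    	fmt.Println(isZstd([]byte{0x28, 0xB5, 0x2F, 0xFD}))    // true
    }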