diff --git a/other/java/client/pom.xml b/other/java/client/pom.xml index 57233e86d..4d8f93bff 100644 --- a/other/java/client/pom.xml +++ b/other/java/client/pom.xml @@ -5,7 +5,7 @@ com.github.chrislusf seaweedfs-client - 1.4.3 + 1.4.4 org.sonatype.oss diff --git a/other/java/client/pom.xml.deploy b/other/java/client/pom.xml.deploy index 57233e86d..4d8f93bff 100644 --- a/other/java/client/pom.xml.deploy +++ b/other/java/client/pom.xml.deploy @@ -5,7 +5,7 @@ com.github.chrislusf seaweedfs-client - 1.4.3 + 1.4.4 org.sonatype.oss diff --git a/other/java/client/pom_debug.xml b/other/java/client/pom_debug.xml index 67bab76ea..bb2ba5e74 100644 --- a/other/java/client/pom_debug.xml +++ b/other/java/client/pom_debug.xml @@ -5,7 +5,7 @@ com.github.chrislusf seaweedfs-client - 1.4.3 + 1.4.4 org.sonatype.oss diff --git a/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java b/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java index 58870d742..7afa2dca0 100644 --- a/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java +++ b/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java @@ -15,6 +15,7 @@ public class ChunkCache { } this.cache = CacheBuilder.newBuilder() .maximumSize(maxEntries) + .weakValues() .expireAfterAccess(1, TimeUnit.HOURS) .build(); } diff --git a/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java b/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java index 28c2f47fc..1248ff13f 100644 --- a/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java +++ b/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java @@ -76,8 +76,11 @@ public class FileChunkManifest { LOG.debug("doFetchFullChunkData:{}", chunkView); chunkData = SeaweedRead.doFetchFullChunkData(chunkView, locations); } - LOG.debug("chunk {} size {}", chunkView.fileId, chunkData.length); - SeaweedRead.chunkCache.setChunk(chunkView.fileId, chunkData); + if(chunk.getIsChunkManifest()){ + // only cache manifest chunks + LOG.debug("chunk {} size {}", chunkView.fileId, chunkData.length); + SeaweedRead.chunkCache.setChunk(chunkView.fileId, chunkData); + } return chunkData; diff --git a/other/java/client/src/main/java/seaweedfs/client/Gzip.java b/other/java/client/src/main/java/seaweedfs/client/Gzip.java index 248285dd3..4909094f5 100644 --- a/other/java/client/src/main/java/seaweedfs/client/Gzip.java +++ b/other/java/client/src/main/java/seaweedfs/client/Gzip.java @@ -18,14 +18,18 @@ public class Gzip { return compressed; } - public static byte[] decompress(byte[] compressed) throws IOException { - ByteArrayInputStream bis = new ByteArrayInputStream(compressed); - GZIPInputStream gis = new GZIPInputStream(bis); - return readAll(gis); + public static byte[] decompress(byte[] compressed) { + try { + ByteArrayInputStream bis = new ByteArrayInputStream(compressed); + GZIPInputStream gis = new GZIPInputStream(bis); + return readAll(gis); + } catch (Exception e) { + return compressed; + } } private static byte[] readAll(InputStream input) throws IOException { - try( ByteArrayOutputStream output = new ByteArrayOutputStream()){ + try (ByteArrayOutputStream output = new ByteArrayOutputStream()) { byte[] buffer = new byte[4096]; int n; while (-1 != (n = input.read(buffer))) { diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java index 05457ed48..cd2f55678 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java @@ -1,7 +1,10 @@ package seaweedfs.client; +import org.apache.http.Header; +import org.apache.http.HeaderElement; import org.apache.http.HttpEntity; import org.apache.http.HttpHeaders; +import org.apache.http.client.entity.GzipDecompressingEntity; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpGet; import org.apache.http.util.EntityUtils; @@ -78,7 +81,7 @@ public class SeaweedRead { HttpGet request = new HttpGet( String.format("http://%s/%s", locations.getLocations(0).getUrl(), chunkView.fileId)); - request.setHeader(HttpHeaders.ACCEPT_ENCODING, ""); + request.setHeader(HttpHeaders.ACCEPT_ENCODING, "gzip"); byte[] data = null; @@ -87,6 +90,18 @@ public class SeaweedRead { try { HttpEntity entity = response.getEntity(); + Header contentEncodingHeader = entity.getContentEncoding(); + + if (contentEncodingHeader != null) { + HeaderElement[] encodings =contentEncodingHeader.getElements(); + for (int i = 0; i < encodings.length; i++) { + if (encodings[i].getName().equalsIgnoreCase("gzip")) { + entity = new GzipDecompressingEntity(entity); + break; + } + } + } + data = EntityUtils.toByteArray(entity); EntityUtils.consume(entity); @@ -96,10 +111,6 @@ public class SeaweedRead { request.releaseConnection(); } - if (chunkView.isCompressed) { - data = Gzip.decompress(data); - } - if (chunkView.cipherKey != null && chunkView.cipherKey.length != 0) { try { data = SeaweedCipher.decrypt(data, chunkView.cipherKey); @@ -108,6 +119,10 @@ public class SeaweedRead { } } + if (chunkView.isCompressed) { + data = Gzip.decompress(data); + } + LOG.debug("doFetchFullChunkData fid:{} chunkData.length:{}", chunkView.fileId, data.length); return data; diff --git a/other/java/hdfs2/dependency-reduced-pom.xml b/other/java/hdfs2/dependency-reduced-pom.xml index 8df1a6356..d00291c98 100644 --- a/other/java/hdfs2/dependency-reduced-pom.xml +++ b/other/java/hdfs2/dependency-reduced-pom.xml @@ -127,7 +127,7 @@ - 1.4.3 + 1.4.4 2.9.2 diff --git a/other/java/hdfs2/pom.xml b/other/java/hdfs2/pom.xml index 3406e12cb..6d9191727 100644 --- a/other/java/hdfs2/pom.xml +++ b/other/java/hdfs2/pom.xml @@ -5,7 +5,7 @@ 4.0.0 - 1.4.3 + 1.4.4 2.9.2 diff --git a/other/java/hdfs3/dependency-reduced-pom.xml b/other/java/hdfs3/dependency-reduced-pom.xml index 5b13f7d02..0dcc49b3f 100644 --- a/other/java/hdfs3/dependency-reduced-pom.xml +++ b/other/java/hdfs3/dependency-reduced-pom.xml @@ -127,7 +127,7 @@ - 1.4.3 + 1.4.4 3.1.1 diff --git a/other/java/hdfs3/pom.xml b/other/java/hdfs3/pom.xml index e5209105d..05a613759 100644 --- a/other/java/hdfs3/pom.xml +++ b/other/java/hdfs3/pom.xml @@ -5,7 +5,7 @@ 4.0.0 - 1.4.3 + 1.4.4 3.1.1 diff --git a/weed/operation/chunked_file.go b/weed/operation/chunked_file.go index 653b7bf13..1bac028ff 100644 --- a/weed/operation/chunked_file.go +++ b/weed/operation/chunked_file.go @@ -57,7 +57,7 @@ func LoadChunkManifest(buffer []byte, isCompressed bool) (*ChunkManifest, error) if isCompressed { var err error if buffer, err = util.DecompressData(buffer); err != nil { - return nil, err + glog.V(0).Infof("fail to decompress chunk manifest: %v", err) } } cm := ChunkManifest{} diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go index 89b7445e9..d730600e4 100644 --- a/weed/server/volume_server_handlers_read.go +++ b/weed/server/volume_server_handlers_read.go @@ -26,6 +26,8 @@ var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"") func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) { + // println(r.Method + " " + r.URL.Path) + stats.VolumeServerRequestCounter.WithLabelValues("get").Inc() start := time.Now() defer func() { stats.VolumeServerRequestHistogram.WithLabelValues("get").Observe(time.Since(start).Seconds()) }() @@ -142,20 +144,18 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) } } - if ext != ".gz" && ext != ".zst" { - if n.IsCompressed() { - if _, _, _, shouldResize := shouldResizeImages(ext, r); shouldResize { - if n.Data, err = util.DecompressData(n.Data); err != nil { - glog.V(0).Infoln("ungzip error:", err, r.URL.Path) - } - } else if strings.Contains(r.Header.Get("Accept-Encoding"), "zstd") && util.IsZstdContent(n.Data) { - w.Header().Set("Content-Encoding", "zstd") - } else if strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") && util.IsGzippedContent(n.Data) { - w.Header().Set("Content-Encoding", "gzip") - } else { - if n.Data, err = util.DecompressData(n.Data); err != nil { - glog.V(0).Infoln("uncompress error:", err, r.URL.Path) - } + if n.IsCompressed() { + if _, _, _, shouldResize := shouldResizeImages(ext, r); shouldResize { + if n.Data, err = util.DecompressData(n.Data); err != nil { + glog.V(0).Infoln("ungzip error:", err, r.URL.Path) + } + } else if strings.Contains(r.Header.Get("Accept-Encoding"), "zstd") && util.IsZstdContent(n.Data) { + w.Header().Set("Content-Encoding", "zstd") + } else if strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") && util.IsGzippedContent(n.Data) { + w.Header().Set("Content-Encoding", "gzip") + } else { + if n.Data, err = util.DecompressData(n.Data); err != nil { + glog.V(0).Infoln("uncompress error:", err, r.URL.Path) } } } diff --git a/weed/stats/disk.go b/weed/stats/disk.go index 8af7240a8..a8f906213 100644 --- a/weed/stats/disk.go +++ b/weed/stats/disk.go @@ -8,6 +8,8 @@ import ( func NewDiskStatus(path string) (disk *volume_server_pb.DiskStatus) { disk = &volume_server_pb.DiskStatus{Dir: path} fillInDiskStatus(disk) - glog.V(2).Infof("read disk size: %v", disk) + if disk.PercentUsed > 95 { + glog.V(0).Infof("disk status: %v", disk) + } return } diff --git a/weed/storage/needle/needle.go b/weed/storage/needle/needle.go index 7d02758d6..150d6ee4b 100644 --- a/weed/storage/needle/needle.go +++ b/weed/storage/needle/needle.go @@ -44,7 +44,7 @@ type Needle struct { } func (n *Needle) String() (str string) { - str = fmt.Sprintf("%s Size:%d, DataSize:%d, Name:%s, Mime:%s", formatNeedleIdCookie(n.Id, n.Cookie), n.Size, n.DataSize, n.Name, n.Mime) + str = fmt.Sprintf("%s Size:%d, DataSize:%d, Name:%s, Mime:%s Compressed:%v", formatNeedleIdCookie(n.Id, n.Cookie), n.Size, n.DataSize, n.Name, n.Mime, n.IsCompressed()) return } @@ -81,6 +81,7 @@ func CreateNeedleFromRequest(r *http.Request, fixJpgOrientation bool, sizeLimit } } if pu.IsGzipped { + // println(r.URL.Path, "is set to compressed", pu.FileName, pu.IsGzipped, "dataSize", pu.OriginalDataSize) n.SetIsCompressed() } if n.LastModified == 0 { diff --git a/weed/storage/needle/needle_parse_upload.go b/weed/storage/needle/needle_parse_upload.go index 785217761..dd678f87f 100644 --- a/weed/storage/needle/needle_parse_upload.go +++ b/weed/storage/needle/needle_parse_upload.go @@ -54,7 +54,7 @@ func ParseUpload(r *http.Request, sizeLimit int64) (pu *ParsedUpload, e error) { pu.OriginalDataSize = len(pu.Data) pu.UncompressedData = pu.Data - // println("received data", len(pu.Data), "isGzipped", pu.IsCompressed, "mime", pu.MimeType, "name", pu.FileName) + // println("received data", len(pu.Data), "isGzipped", pu.IsGzipped, "mime", pu.MimeType, "name", pu.FileName) if pu.IsGzipped { if unzipped, e := util.DecompressData(pu.Data); e == nil { pu.OriginalDataSize = len(unzipped) @@ -72,7 +72,7 @@ func ParseUpload(r *http.Request, sizeLimit int64) (pu *ParsedUpload, e error) { mimeType = "" } if shouldBeCompressed, iAmSure := util.IsCompressableFileType(ext, mimeType); mimeType == "" && !iAmSure || shouldBeCompressed && iAmSure { - // println("ext", ext, "iAmSure", iAmSure, "shouldGzip", shouldGzip, "mimeType", pu.MimeType) + // println("ext", ext, "iAmSure", iAmSure, "shouldBeCompressed", shouldBeCompressed, "mimeType", pu.MimeType) if compressedData, err := util.GzipData(pu.Data); err == nil { if len(compressedData)*10 < len(pu.Data)*9 { pu.Data = compressedData diff --git a/weed/util/compression.go b/weed/util/compression.go index 4488e019e..2881a7bfd 100644 --- a/weed/util/compression.go +++ b/weed/util/compression.go @@ -39,7 +39,7 @@ func DecompressData(input []byte) ([]byte, error) { if IsZstdContent(input) { return unzstdData(input) } - return nil, fmt.Errorf("unsupported compression") + return input, fmt.Errorf("unsupported compression") } func ungzipData(input []byte) ([]byte, error) { diff --git a/weed/util/http_util.go b/weed/util/http_util.go index c67eb3276..5159fcd17 100644 --- a/weed/util/http_util.go +++ b/weed/util/http_util.go @@ -68,14 +68,28 @@ func Post(url string, values url.Values) ([]byte, error) { // github.com/chrislusf/seaweedfs/unmaintained/repeated_vacuum/repeated_vacuum.go // may need increasing http.Client.Timeout func Get(url string) ([]byte, error) { - r, err := client.Get(url) + + request, err := http.NewRequest("GET", url, nil) + request.Header.Add("Accept-Encoding", "gzip") + + response, err := client.Do(request) if err != nil { return nil, err } - defer r.Body.Close() - b, err := ioutil.ReadAll(r.Body) - if r.StatusCode >= 400 { - return nil, fmt.Errorf("%s: %s", url, r.Status) + defer response.Body.Close() + + var reader io.ReadCloser + switch response.Header.Get("Content-Encoding") { + case "gzip": + reader, err = gzip.NewReader(response.Body) + defer reader.Close() + default: + reader = response.Body + } + + b, err := ioutil.ReadAll(reader) + if response.StatusCode >= 400 { + return nil, fmt.Errorf("%s: %s", url, response.Status) } if err != nil { return nil, err @@ -269,7 +283,9 @@ func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, is return err } - if !isFullChunk { + if isFullChunk { + req.Header.Add("Accept-Encoding", "gzip") + } else { req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)-1)) } @@ -282,13 +298,23 @@ func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, is return fmt.Errorf("%s: %s", fileUrl, r.Status) } + var reader io.ReadCloser + contentEncoding := r.Header.Get("Content-Encoding") + switch contentEncoding { + case "gzip": + reader, err = gzip.NewReader(r.Body) + defer reader.Close() + default: + reader = r.Body + } + var ( m int ) buf := make([]byte, 64*1024) for { - m, err = r.Body.Read(buf) + m, err = reader.Read(buf) fn(buf[:m]) if err == io.EOF { return nil @@ -312,7 +338,7 @@ func readEncryptedUrl(fileUrl string, cipherKey []byte, isContentCompressed bool if isContentCompressed { decryptedData, err = DecompressData(decryptedData) if err != nil { - return fmt.Errorf("unzip decrypt %s: %v", fileUrl, err) + glog.V(0).Infof("unzip decrypt %s: %v", fileUrl, err) } } if len(decryptedData) < int(offset)+size { @@ -334,6 +360,8 @@ func ReadUrlAsReaderCloser(fileUrl string, rangeHeader string) (io.ReadCloser, e } if rangeHeader != "" { req.Header.Add("Range", rangeHeader) + } else { + req.Header.Add("Accept-Encoding", "gzip") } r, err := client.Do(req) @@ -344,7 +372,17 @@ func ReadUrlAsReaderCloser(fileUrl string, rangeHeader string) (io.ReadCloser, e return nil, fmt.Errorf("%s: %s", fileUrl, r.Status) } - return r.Body, nil + var reader io.ReadCloser + contentEncoding := r.Header.Get("Content-Encoding") + switch contentEncoding { + case "gzip": + reader, err = gzip.NewReader(r.Body) + defer reader.Close() + default: + reader = r.Body + } + + return reader, nil } func CloseResponse(resp *http.Response) {