Merge pull request #10 from chrislusf/master

sync
This commit is contained in:
hilimd 2020-08-03 18:49:24 +08:00 committed by GitHub
commit 6769d07604
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 112 additions and 48 deletions

View file

@ -5,7 +5,7 @@
<groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-client</artifactId>
<version>1.4.3</version>
<version>1.4.4</version>
<parent>
<groupId>org.sonatype.oss</groupId>

View file

@ -5,7 +5,7 @@
<groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-client</artifactId>
<version>1.4.3</version>
<version>1.4.4</version>
<parent>
<groupId>org.sonatype.oss</groupId>

View file

@ -5,7 +5,7 @@
<groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-client</artifactId>
<version>1.4.3</version>
<version>1.4.4</version>
<parent>
<groupId>org.sonatype.oss</groupId>

View file

@ -15,6 +15,7 @@ public class ChunkCache {
}
this.cache = CacheBuilder.newBuilder()
.maximumSize(maxEntries)
.weakValues()
.expireAfterAccess(1, TimeUnit.HOURS)
.build();
}

View file

@ -76,8 +76,11 @@ public class FileChunkManifest {
LOG.debug("doFetchFullChunkData:{}", chunkView);
chunkData = SeaweedRead.doFetchFullChunkData(chunkView, locations);
}
LOG.debug("chunk {} size {}", chunkView.fileId, chunkData.length);
SeaweedRead.chunkCache.setChunk(chunkView.fileId, chunkData);
ifchunk.getIsChunkManifest()){
// only cache manifest chunks
LOG.debug("chunk {} size {}", chunkView.fileId, chunkData.length);
SeaweedRead.chunkCache.setChunk(chunkView.fileId, chunkData);
}
return chunkData;

View file

@ -18,14 +18,18 @@ public class Gzip {
return compressed;
}
public static byte[] decompress(byte[] compressed) throws IOException {
ByteArrayInputStream bis = new ByteArrayInputStream(compressed);
GZIPInputStream gis = new GZIPInputStream(bis);
return readAll(gis);
public static byte[] decompress(byte[] compressed) {
try {
ByteArrayInputStream bis = new ByteArrayInputStream(compressed);
GZIPInputStream gis = new GZIPInputStream(bis);
return readAll(gis);
} catch (Exception e) {
return compressed;
}
}
private static byte[] readAll(InputStream input) throws IOException {
try( ByteArrayOutputStream output = new ByteArrayOutputStream()){
try (ByteArrayOutputStream output = new ByteArrayOutputStream()) {
byte[] buffer = new byte[4096];
int n;
while (-1 != (n = input.read(buffer))) {

View file

@ -1,7 +1,10 @@
package seaweedfs.client;
import org.apache.http.Header;
import org.apache.http.HeaderElement;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHeaders;
import org.apache.http.client.entity.GzipDecompressingEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.util.EntityUtils;
@ -78,7 +81,7 @@ public class SeaweedRead {
HttpGet request = new HttpGet(
String.format("http://%s/%s", locations.getLocations(0).getUrl(), chunkView.fileId));
request.setHeader(HttpHeaders.ACCEPT_ENCODING, "");
request.setHeader(HttpHeaders.ACCEPT_ENCODING, "gzip");
byte[] data = null;
@ -87,6 +90,18 @@ public class SeaweedRead {
try {
HttpEntity entity = response.getEntity();
Header contentEncodingHeader = entity.getContentEncoding();
if (contentEncodingHeader != null) {
HeaderElement[] encodings =contentEncodingHeader.getElements();
for (int i = 0; i < encodings.length; i++) {
if (encodings[i].getName().equalsIgnoreCase("gzip")) {
entity = new GzipDecompressingEntity(entity);
break;
}
}
}
data = EntityUtils.toByteArray(entity);
EntityUtils.consume(entity);
@ -96,10 +111,6 @@ public class SeaweedRead {
request.releaseConnection();
}
if (chunkView.isCompressed) {
data = Gzip.decompress(data);
}
if (chunkView.cipherKey != null && chunkView.cipherKey.length != 0) {
try {
data = SeaweedCipher.decrypt(data, chunkView.cipherKey);
@ -108,6 +119,10 @@ public class SeaweedRead {
}
}
if (chunkView.isCompressed) {
data = Gzip.decompress(data);
}
LOG.debug("doFetchFullChunkData fid:{} chunkData.length:{}", chunkView.fileId, data.length);
return data;

View file

@ -127,7 +127,7 @@
</snapshotRepository>
</distributionManagement>
<properties>
<seaweedfs.client.version>1.4.3</seaweedfs.client.version>
<seaweedfs.client.version>1.4.4</seaweedfs.client.version>
<hadoop.version>2.9.2</hadoop.version>
</properties>
</project>

View file

@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>
<properties>
<seaweedfs.client.version>1.4.3</seaweedfs.client.version>
<seaweedfs.client.version>1.4.4</seaweedfs.client.version>
<hadoop.version>2.9.2</hadoop.version>
</properties>

View file

@ -127,7 +127,7 @@
</snapshotRepository>
</distributionManagement>
<properties>
<seaweedfs.client.version>1.4.3</seaweedfs.client.version>
<seaweedfs.client.version>1.4.4</seaweedfs.client.version>
<hadoop.version>3.1.1</hadoop.version>
</properties>
</project>

View file

@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>
<properties>
<seaweedfs.client.version>1.4.3</seaweedfs.client.version>
<seaweedfs.client.version>1.4.4</seaweedfs.client.version>
<hadoop.version>3.1.1</hadoop.version>
</properties>

View file

@ -57,7 +57,7 @@ func LoadChunkManifest(buffer []byte, isCompressed bool) (*ChunkManifest, error)
if isCompressed {
var err error
if buffer, err = util.DecompressData(buffer); err != nil {
return nil, err
glog.V(0).Infof("fail to decompress chunk manifest: %v", err)
}
}
cm := ChunkManifest{}

View file

@ -26,6 +26,8 @@ var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"")
func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) {
// println(r.Method + " " + r.URL.Path)
stats.VolumeServerRequestCounter.WithLabelValues("get").Inc()
start := time.Now()
defer func() { stats.VolumeServerRequestHistogram.WithLabelValues("get").Observe(time.Since(start).Seconds()) }()
@ -142,20 +144,18 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
}
}
if ext != ".gz" && ext != ".zst" {
if n.IsCompressed() {
if _, _, _, shouldResize := shouldResizeImages(ext, r); shouldResize {
if n.Data, err = util.DecompressData(n.Data); err != nil {
glog.V(0).Infoln("ungzip error:", err, r.URL.Path)
}
} else if strings.Contains(r.Header.Get("Accept-Encoding"), "zstd") && util.IsZstdContent(n.Data) {
w.Header().Set("Content-Encoding", "zstd")
} else if strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") && util.IsGzippedContent(n.Data) {
w.Header().Set("Content-Encoding", "gzip")
} else {
if n.Data, err = util.DecompressData(n.Data); err != nil {
glog.V(0).Infoln("uncompress error:", err, r.URL.Path)
}
if n.IsCompressed() {
if _, _, _, shouldResize := shouldResizeImages(ext, r); shouldResize {
if n.Data, err = util.DecompressData(n.Data); err != nil {
glog.V(0).Infoln("ungzip error:", err, r.URL.Path)
}
} else if strings.Contains(r.Header.Get("Accept-Encoding"), "zstd") && util.IsZstdContent(n.Data) {
w.Header().Set("Content-Encoding", "zstd")
} else if strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") && util.IsGzippedContent(n.Data) {
w.Header().Set("Content-Encoding", "gzip")
} else {
if n.Data, err = util.DecompressData(n.Data); err != nil {
glog.V(0).Infoln("uncompress error:", err, r.URL.Path)
}
}
}

View file

@ -8,6 +8,8 @@ import (
func NewDiskStatus(path string) (disk *volume_server_pb.DiskStatus) {
disk = &volume_server_pb.DiskStatus{Dir: path}
fillInDiskStatus(disk)
glog.V(2).Infof("read disk size: %v", disk)
if disk.PercentUsed > 95 {
glog.V(0).Infof("disk status: %v", disk)
}
return
}

View file

@ -44,7 +44,7 @@ type Needle struct {
}
func (n *Needle) String() (str string) {
str = fmt.Sprintf("%s Size:%d, DataSize:%d, Name:%s, Mime:%s", formatNeedleIdCookie(n.Id, n.Cookie), n.Size, n.DataSize, n.Name, n.Mime)
str = fmt.Sprintf("%s Size:%d, DataSize:%d, Name:%s, Mime:%s Compressed:%v", formatNeedleIdCookie(n.Id, n.Cookie), n.Size, n.DataSize, n.Name, n.Mime, n.IsCompressed())
return
}
@ -81,6 +81,7 @@ func CreateNeedleFromRequest(r *http.Request, fixJpgOrientation bool, sizeLimit
}
}
if pu.IsGzipped {
// println(r.URL.Path, "is set to compressed", pu.FileName, pu.IsGzipped, "dataSize", pu.OriginalDataSize)
n.SetIsCompressed()
}
if n.LastModified == 0 {

View file

@ -54,7 +54,7 @@ func ParseUpload(r *http.Request, sizeLimit int64) (pu *ParsedUpload, e error) {
pu.OriginalDataSize = len(pu.Data)
pu.UncompressedData = pu.Data
// println("received data", len(pu.Data), "isGzipped", pu.IsCompressed, "mime", pu.MimeType, "name", pu.FileName)
// println("received data", len(pu.Data), "isGzipped", pu.IsGzipped, "mime", pu.MimeType, "name", pu.FileName)
if pu.IsGzipped {
if unzipped, e := util.DecompressData(pu.Data); e == nil {
pu.OriginalDataSize = len(unzipped)
@ -72,7 +72,7 @@ func ParseUpload(r *http.Request, sizeLimit int64) (pu *ParsedUpload, e error) {
mimeType = ""
}
if shouldBeCompressed, iAmSure := util.IsCompressableFileType(ext, mimeType); mimeType == "" && !iAmSure || shouldBeCompressed && iAmSure {
// println("ext", ext, "iAmSure", iAmSure, "shouldGzip", shouldGzip, "mimeType", pu.MimeType)
// println("ext", ext, "iAmSure", iAmSure, "shouldBeCompressed", shouldBeCompressed, "mimeType", pu.MimeType)
if compressedData, err := util.GzipData(pu.Data); err == nil {
if len(compressedData)*10 < len(pu.Data)*9 {
pu.Data = compressedData

View file

@ -39,7 +39,7 @@ func DecompressData(input []byte) ([]byte, error) {
if IsZstdContent(input) {
return unzstdData(input)
}
return nil, fmt.Errorf("unsupported compression")
return input, fmt.Errorf("unsupported compression")
}
func ungzipData(input []byte) ([]byte, error) {

View file

@ -68,14 +68,28 @@ func Post(url string, values url.Values) ([]byte, error) {
// github.com/chrislusf/seaweedfs/unmaintained/repeated_vacuum/repeated_vacuum.go
// may need increasing http.Client.Timeout
func Get(url string) ([]byte, error) {
r, err := client.Get(url)
request, err := http.NewRequest("GET", url, nil)
request.Header.Add("Accept-Encoding", "gzip")
response, err := client.Do(request)
if err != nil {
return nil, err
}
defer r.Body.Close()
b, err := ioutil.ReadAll(r.Body)
if r.StatusCode >= 400 {
return nil, fmt.Errorf("%s: %s", url, r.Status)
defer response.Body.Close()
var reader io.ReadCloser
switch response.Header.Get("Content-Encoding") {
case "gzip":
reader, err = gzip.NewReader(response.Body)
defer reader.Close()
default:
reader = response.Body
}
b, err := ioutil.ReadAll(reader)
if response.StatusCode >= 400 {
return nil, fmt.Errorf("%s: %s", url, response.Status)
}
if err != nil {
return nil, err
@ -269,7 +283,9 @@ func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, is
return err
}
if !isFullChunk {
if isFullChunk {
req.Header.Add("Accept-Encoding", "gzip")
} else {
req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)-1))
}
@ -282,13 +298,23 @@ func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, is
return fmt.Errorf("%s: %s", fileUrl, r.Status)
}
var reader io.ReadCloser
contentEncoding := r.Header.Get("Content-Encoding")
switch contentEncoding {
case "gzip":
reader, err = gzip.NewReader(r.Body)
defer reader.Close()
default:
reader = r.Body
}
var (
m int
)
buf := make([]byte, 64*1024)
for {
m, err = r.Body.Read(buf)
m, err = reader.Read(buf)
fn(buf[:m])
if err == io.EOF {
return nil
@ -312,7 +338,7 @@ func readEncryptedUrl(fileUrl string, cipherKey []byte, isContentCompressed bool
if isContentCompressed {
decryptedData, err = DecompressData(decryptedData)
if err != nil {
return fmt.Errorf("unzip decrypt %s: %v", fileUrl, err)
glog.V(0).Infof("unzip decrypt %s: %v", fileUrl, err)
}
}
if len(decryptedData) < int(offset)+size {
@ -334,6 +360,8 @@ func ReadUrlAsReaderCloser(fileUrl string, rangeHeader string) (io.ReadCloser, e
}
if rangeHeader != "" {
req.Header.Add("Range", rangeHeader)
} else {
req.Header.Add("Accept-Encoding", "gzip")
}
r, err := client.Do(req)
@ -344,7 +372,17 @@ func ReadUrlAsReaderCloser(fileUrl string, rangeHeader string) (io.ReadCloser, e
return nil, fmt.Errorf("%s: %s", fileUrl, r.Status)
}
return r.Body, nil
var reader io.ReadCloser
contentEncoding := r.Header.Get("Content-Encoding")
switch contentEncoding {
case "gzip":
reader, err = gzip.NewReader(r.Body)
defer reader.Close()
default:
reader = r.Body
}
return reader, nil
}
func CloseResponse(resp *http.Response) {