Merge pull request #6 from chrislusf/master

sync
This commit is contained in:
hilimd 2020-07-30 09:46:44 +08:00 committed by GitHub
commit 4a88cca543
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 35 additions and 20 deletions

View file

@ -40,7 +40,7 @@ linux: deps
mkdir -p linux mkdir -p linux
GOOS=linux GOARCH=amd64 go build $(GO_FLAGS) -ldflags "$(LDFLAGS)" -o linux/$(BINARY) $(SOURCE_DIR) GOOS=linux GOARCH=amd64 go build $(GO_FLAGS) -ldflags "$(LDFLAGS)" -o linux/$(BINARY) $(SOURCE_DIR)
release: deps windows_build darwin_build linux_build bsd_build 5_byte_linux_build 5_byte_darwin_build 5_byte_windows_build release: deps windows_build darwin_build linux_build bsd_build 5_byte_linux_build 5_byte_arm64_build 5_byte_darwin_build 5_byte_windows_build
##### LINUX BUILDS ##### ##### LINUX BUILDS #####
5_byte_linux_build: 5_byte_linux_build:
@ -55,6 +55,14 @@ release: deps windows_build darwin_build linux_build bsd_build 5_byte_linux_buil
$(call build_large,windows,amd64,.exe) $(call build_large,windows,amd64,.exe)
$(call zip_large,windows,amd64,.exe) $(call zip_large,windows,amd64,.exe)
5_byte_arm_build: $(sources)
$(call build_large,linux,arm,)
$(call tar_large,linux,arm)
5_byte_arm64_build: $(sources)
$(call build_large,linux,arm64,)
$(call tar_large,linux,arm64)
linux_build: build/linux_arm.tar.gz build/linux_arm64.tar.gz build/linux_386.tar.gz build/linux_amd64.tar.gz linux_build: build/linux_arm.tar.gz build/linux_arm64.tar.gz build/linux_386.tar.gz build/linux_amd64.tar.gz
build/linux_386.tar.gz: $(sources) build/linux_386.tar.gz: $(sources)

View file

@ -6,7 +6,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map;
public class FileChunkManifest { public class FileChunkManifest {
@ -51,13 +50,17 @@ public class FileChunkManifest {
private static byte[] fetchChunk(final FilerGrpcClient filerGrpcClient, FilerProto.FileChunk chunk) throws IOException { private static byte[] fetchChunk(final FilerGrpcClient filerGrpcClient, FilerProto.FileChunk chunk) throws IOException {
FilerProto.LookupVolumeRequest.Builder lookupRequest = FilerProto.LookupVolumeRequest.newBuilder();
String vid = "" + chunk.getFid().getVolumeId(); String vid = "" + chunk.getFid().getVolumeId();
FilerProto.Locations locations = filerGrpcClient.vidLocations.get(vid);
if (locations == null) {
FilerProto.LookupVolumeRequest.Builder lookupRequest = FilerProto.LookupVolumeRequest.newBuilder();
lookupRequest.addVolumeIds(vid); lookupRequest.addVolumeIds(vid);
FilerProto.LookupVolumeResponse lookupResponse = filerGrpcClient FilerProto.LookupVolumeResponse lookupResponse = filerGrpcClient
.getBlockingStub().lookupVolume(lookupRequest.build()); .getBlockingStub().lookupVolume(lookupRequest.build());
Map<String, FilerProto.Locations> vid2Locations = lookupResponse.getLocationsMapMap(); locations = lookupResponse.getLocationsMapMap().get(vid);
FilerProto.Locations locations = vid2Locations.get(vid); filerGrpcClient.vidLocations.put(vid, locations);
LOG.debug("fetchChunk vid:{} locations:{}", vid, locations);
}
SeaweedRead.ChunkView chunkView = new SeaweedRead.ChunkView( SeaweedRead.ChunkView chunkView = new SeaweedRead.ChunkView(
FilerClient.toFileId(chunk.getFid()), // avoid deprecated chunk.getFileId() FilerClient.toFileId(chunk.getFid()), // avoid deprecated chunk.getFileId()

View file

@ -9,6 +9,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLException; import javax.net.ssl.SSLException;
import java.util.Map;
import java.util.HashMap;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
public class FilerGrpcClient { public class FilerGrpcClient {
@ -24,6 +26,7 @@ public class FilerGrpcClient {
} }
} }
public final Map<String, FilerProto.Locations> vidLocations = new HashMap<>();
private final ManagedChannel channel; private final ManagedChannel channel;
private final SeaweedFilerGrpc.SeaweedFilerBlockingStub blockingStub; private final SeaweedFilerGrpc.SeaweedFilerBlockingStub blockingStub;
private final SeaweedFilerGrpc.SeaweedFilerStub asyncStub; private final SeaweedFilerGrpc.SeaweedFilerStub asyncStub;

View file

@ -15,7 +15,7 @@ public class SeaweedRead {
private static final Logger LOG = LoggerFactory.getLogger(SeaweedRead.class); private static final Logger LOG = LoggerFactory.getLogger(SeaweedRead.class);
static ChunkCache chunkCache = new ChunkCache(0); static ChunkCache chunkCache = new ChunkCache(4);
// returns bytesRead // returns bytesRead
public static long read(FilerGrpcClient filerGrpcClient, List<VisibleInterval> visibleIntervals, public static long read(FilerGrpcClient filerGrpcClient, List<VisibleInterval> visibleIntervals,
@ -62,6 +62,7 @@ public class SeaweedRead {
if (chunkData == null) { if (chunkData == null) {
chunkData = doFetchFullChunkData(chunkView, locations); chunkData = doFetchFullChunkData(chunkView, locations);
chunkCache.setChunk(chunkView.fileId, chunkData);
} }
int len = (int) chunkView.size; int len = (int) chunkView.size;
@ -69,8 +70,6 @@ public class SeaweedRead {
chunkView.fileId, chunkData.length, chunkView.offset, buffer.length, startOffset, len); chunkView.fileId, chunkData.length, chunkView.offset, buffer.length, startOffset, len);
System.arraycopy(chunkData, (int) chunkView.offset, buffer, startOffset, len); System.arraycopy(chunkData, (int) chunkView.offset, buffer, startOffset, len);
chunkCache.setChunk(chunkView.fileId, chunkData);
return len; return len;
} }
@ -109,6 +108,8 @@ public class SeaweedRead {
} }
} }
LOG.debug("doFetchFullChunkData fid:{} chunkData.length:{}", chunkView.fileId, data.length);
return data; return data;
} }

View file

@ -61,9 +61,6 @@ public class SeaweedWrite {
String etag = multipartUpload(targetUrl, auth, bytes, bytesOffset, bytesLength, cipherKey); String etag = multipartUpload(targetUrl, auth, bytes, bytesOffset, bytesLength, cipherKey);
// cache fileId ~ bytes
SeaweedRead.chunkCache.setChunk(fileId, bytes);
LOG.debug("write file chunk {} size {}", targetUrl, bytesLength); LOG.debug("write file chunk {} size {}", targetUrl, bytesLength);
return FilerProto.FileChunk.newBuilder() return FilerProto.FileChunk.newBuilder()

View file

@ -9,7 +9,6 @@ type MountOptions struct {
filerMountRootPath *string filerMountRootPath *string
dir *string dir *string
dirAutoCreate *bool dirAutoCreate *bool
dirListCacheLimit *int64
collection *string collection *string
replication *string replication *string
ttlSec *int ttlSec *int
@ -35,7 +34,6 @@ func init() {
mountOptions.filerMountRootPath = cmdMount.Flag.String("filer.path", "/", "mount this remote path from filer server") mountOptions.filerMountRootPath = cmdMount.Flag.String("filer.path", "/", "mount this remote path from filer server")
mountOptions.dir = cmdMount.Flag.String("dir", ".", "mount weed filer to this directory") mountOptions.dir = cmdMount.Flag.String("dir", ".", "mount weed filer to this directory")
mountOptions.dirAutoCreate = cmdMount.Flag.Bool("dirAutoCreate", false, "auto create the directory to mount to") mountOptions.dirAutoCreate = cmdMount.Flag.Bool("dirAutoCreate", false, "auto create the directory to mount to")
mountOptions.dirListCacheLimit = cmdMount.Flag.Int64("dirListCacheLimit", 1000000, "limit cache size to speed up directory long format listing")
mountOptions.collection = cmdMount.Flag.String("collection", "", "collection to create the files") mountOptions.collection = cmdMount.Flag.String("collection", "", "collection to create the files")
mountOptions.replication = cmdMount.Flag.String("replication", "", "replication(e.g. 000, 001) to create to files. If empty, let filer decide.") mountOptions.replication = cmdMount.Flag.String("replication", "", "replication(e.g. 000, 001) to create to files. If empty, let filer decide.")
mountOptions.ttlSec = cmdMount.Flag.Int("ttl", 0, "file ttl in seconds") mountOptions.ttlSec = cmdMount.Flag.Int("ttl", 0, "file ttl in seconds")

View file

@ -165,7 +165,6 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
CacheDir: *option.cacheDir, CacheDir: *option.cacheDir,
CacheSizeMB: *option.cacheSizeMB, CacheSizeMB: *option.cacheSizeMB,
DataCenter: *option.dataCenter, DataCenter: *option.dataCenter,
DirListCacheLimit: *option.dirListCacheLimit,
EntryCacheTtl: 3 * time.Second, EntryCacheTtl: 3 * time.Second,
MountUid: uid, MountUid: uid,
MountGid: gid, MountGid: gid,

View file

@ -151,6 +151,7 @@ func (s3opt *S3Options) startS3Server() bool {
_, s3ApiServer_err := s3api.NewS3ApiServer(router, &s3api.S3ApiServerOption{ _, s3ApiServer_err := s3api.NewS3ApiServer(router, &s3api.S3ApiServerOption{
Filer: *s3opt.filer, Filer: *s3opt.filer,
Port: *s3opt.port,
FilerGrpcAddress: filerGrpcAddress, FilerGrpcAddress: filerGrpcAddress,
Config: *s3opt.config, Config: *s3opt.config,
DomainName: *s3opt.domainName, DomainName: *s3opt.domainName,

View file

@ -3,6 +3,7 @@ package filesys
import ( import (
"bytes" "bytes"
"context" "context"
"math"
"os" "os"
"strings" "strings"
"time" "time"
@ -277,7 +278,7 @@ func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) {
dirPath := util.FullPath(dir.FullPath()) dirPath := util.FullPath(dir.FullPath())
meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, dirPath) meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, dirPath)
listedEntries, listErr := dir.wfs.metaCache.ListDirectoryEntries(context.Background(), util.FullPath(dir.FullPath()), "", false, int(dir.wfs.option.DirListCacheLimit)) listedEntries, listErr := dir.wfs.metaCache.ListDirectoryEntries(context.Background(), util.FullPath(dir.FullPath()), "", false, int(math.MaxInt32))
if listErr != nil { if listErr != nil {
glog.Errorf("list meta cache: %v", listErr) glog.Errorf("list meta cache: %v", listErr)
return nil, fuse.EIO return nil, fuse.EIO

View file

@ -34,7 +34,6 @@ type Option struct {
CacheDir string CacheDir string
CacheSizeMB int64 CacheSizeMB int64
DataCenter string DataCenter string
DirListCacheLimit int64
EntryCacheTtl time.Duration EntryCacheTtl time.Duration
Umask os.FileMode Umask os.FileMode

View file

@ -1,6 +1,7 @@
package s3api package s3api
import ( import (
"fmt"
"net/http" "net/http"
"github.com/gorilla/mux" "github.com/gorilla/mux"
@ -9,6 +10,7 @@ import (
type S3ApiServerOption struct { type S3ApiServerOption struct {
Filer string Filer string
Port int
FilerGrpcAddress string FilerGrpcAddress string
Config string Config string
DomainName string DomainName string
@ -37,7 +39,10 @@ func (s3a *S3ApiServer) registerRouter(router *mux.Router) {
apiRouter := router.PathPrefix("/").Subrouter() apiRouter := router.PathPrefix("/").Subrouter()
var routers []*mux.Router var routers []*mux.Router
if s3a.option.DomainName != "" { if s3a.option.DomainName != "" {
routers = append(routers, apiRouter.Host("{bucket:.+}."+s3a.option.DomainName).Subrouter()) routers = append(routers, apiRouter.Host(
fmt.Sprintf("%s.%s:%d", "{bucket:.+}", s3a.option.DomainName, s3a.option.Port)).Subrouter())
routers = append(routers, apiRouter.Host(
fmt.Sprintf("%s.%s", "{bucket:.+}", s3a.option.DomainName)).Subrouter())
} }
routers = append(routers, apiRouter.PathPrefix("/{bucket}").Subrouter()) routers = append(routers, apiRouter.PathPrefix("/{bucket}").Subrouter())