Hadoop: fix entry not found for HCFS

also fix cipher related changes.
This commit is contained in:
Chris Lu 2020-04-26 05:21:54 -07:00
parent 0c2248f83a
commit b52b8ec685
4 changed files with 36 additions and 12 deletions

View file

@ -14,7 +14,7 @@ public class FilerClient {
private static final Logger LOG = LoggerFactory.getLogger(FilerClient.class);
private FilerGrpcClient filerGrpcClient;
private final FilerGrpcClient filerGrpcClient;
public FilerClient(String host, int grpcPort) {
filerGrpcClient = new FilerGrpcClient(host, grpcPort);
@ -181,7 +181,7 @@ public class FilerClient {
.setLimit(limit)
.build());
List<FilerProto.Entry> entries = new ArrayList<>();
while (iter.hasNext()){
while (iter.hasNext()) {
FilerProto.ListEntriesResponse resp = iter.next();
entries.add(fixEntryAfterReading(resp.getEntry()));
}
@ -195,9 +195,12 @@ public class FilerClient {
.setDirectory(directory)
.setName(entryName)
.build()).getEntry();
if (entry == null) {
return null;
}
return fixEntryAfterReading(entry);
} catch (Exception e) {
if (e.getMessage().indexOf("filer: no entry is found in filer store")>0){
if (e.getMessage().indexOf("filer: no entry is found in filer store") > 0) {
return null;
}
LOG.warn("lookupEntry {}/{}: {}", directory, entryName, e);

View file

@ -99,11 +99,13 @@ public class SeaweedRead {
data = Gzip.decompress(data);
}
if (chunkView.cipherKey != null && chunkView.cipherKey.length != 0) {
try {
data = SeaweedCipher.decrypt(data, chunkView.cipherKey);
} catch (Exception e) {
throw new IOException("fail to decrypt", e);
}
}
return data;

View file

@ -36,7 +36,7 @@ public class SeaweedWrite {
String auth = response.getAuth();
String targetUrl = String.format("http://%s/%s", url, fileId);
ByteString cipherKeyString = null;
ByteString cipherKeyString = com.google.protobuf.ByteString.EMPTY;
byte[] cipherKey = null;
if (filerGrpcClient.isCipher()) {
cipherKey = genCipherKey();
@ -78,7 +78,7 @@ public class SeaweedWrite {
HttpClient client = new DefaultHttpClient();
InputStream inputStream = null;
if (cipherKey == null) {
if (cipherKey == null || cipherKey.length == 0) {
inputStream = new ByteArrayInputStream(bytes, (int) bytesOffset, (int) bytesLength);
} else {
try {

View file

@ -19,9 +19,11 @@ import (
func (fs *FilerServer) LookupDirectoryEntry(ctx context.Context, req *filer_pb.LookupDirectoryEntryRequest) (*filer_pb.LookupDirectoryEntryResponse, error) {
glog.V(4).Infof("LookupDirectoryEntry %s", filepath.Join(req.Directory, req.Name))
entry, err := fs.filer.FindEntry(ctx, util.JoinPath(req.Directory, req.Name))
if err == filer_pb.ErrNotFound {
return &filer_pb.LookupDirectoryEntryResponse{}, nil
return &filer_pb.LookupDirectoryEntryResponse{}, err
}
if err != nil {
glog.V(3).Infof("LookupDirectoryEntry %s: %+v, ", filepath.Join(req.Directory, req.Name), err)
@ -41,6 +43,8 @@ func (fs *FilerServer) LookupDirectoryEntry(ctx context.Context, req *filer_pb.L
func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream filer_pb.SeaweedFiler_ListEntriesServer) error {
glog.V(4).Infof("ListEntries %v", req)
limit := int(req.Limit)
if limit == 0 {
limit = fs.option.DirListingLimit
@ -135,6 +139,8 @@ func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVol
func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntryRequest) (resp *filer_pb.CreateEntryResponse, err error) {
glog.V(4).Infof("CreateEntry %v", req)
resp = &filer_pb.CreateEntryResponse{}
chunks, garbages := filer2.CompactFileChunks(req.Entry.Chunks)
@ -163,6 +169,8 @@ func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntr
func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntryRequest) (*filer_pb.UpdateEntryResponse, error) {
glog.V(4).Infof("UpdateEntry %v", req)
fullpath := util.Join(req.Directory, req.Entry.Name)
entry, err := fs.filer.FindEntry(ctx, util.FullPath(fullpath))
if err != nil {
@ -219,6 +227,8 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr
func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendToEntryRequest) (*filer_pb.AppendToEntryResponse, error) {
glog.V(4).Infof("AppendToEntry %v", req)
fullpath := util.NewFullPath(req.Directory, req.EntryName)
var offset int64 = 0
entry, err := fs.filer.FindEntry(ctx, util.FullPath(fullpath))
@ -250,6 +260,9 @@ func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendTo
}
func (fs *FilerServer) DeleteEntry(ctx context.Context, req *filer_pb.DeleteEntryRequest) (resp *filer_pb.DeleteEntryResponse, err error) {
glog.V(4).Infof("DeleteEntry %v", req)
err = fs.filer.DeleteEntryMetaAndData(ctx, util.JoinPath(req.Directory, req.Name), req.IsRecursive, req.IgnoreRecursiveError, req.IsDeleteData)
resp = &filer_pb.DeleteEntryResponse{}
if err != nil {
@ -312,6 +325,8 @@ func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVol
func (fs *FilerServer) DeleteCollection(ctx context.Context, req *filer_pb.DeleteCollectionRequest) (resp *filer_pb.DeleteCollectionResponse, err error) {
glog.V(4).Infof("DeleteCollection %v", req)
err = fs.filer.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
_, err := client.CollectionDelete(context.Background(), &master_pb.CollectionDeleteRequest{
Name: req.GetCollection(),
@ -353,12 +368,16 @@ func (fs *FilerServer) Statistics(ctx context.Context, req *filer_pb.StatisticsR
func (fs *FilerServer) GetFilerConfiguration(ctx context.Context, req *filer_pb.GetFilerConfigurationRequest) (resp *filer_pb.GetFilerConfigurationResponse, err error) {
return &filer_pb.GetFilerConfigurationResponse{
t := &filer_pb.GetFilerConfigurationResponse{
Masters: fs.option.Masters,
Collection: fs.option.Collection,
Replication: fs.option.DefaultReplication,
MaxMb: uint32(fs.option.MaxMB),
DirBuckets: fs.filer.DirBucketsPath,
Cipher: fs.filer.Cipher,
}, nil
}
glog.V(4).Infof("GetFilerConfiguration: %v", t)
return t, nil
}