mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
refactoring
This commit is contained in:
parent
7192a378cc
commit
df8d976bb0
|
@ -5,6 +5,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
||||||
|
"github.com/chrislusf/seaweedfs/weed/util"
|
||||||
"github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
|
"github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
|
||||||
"github.com/chrislusf/seaweedfs/weed/wdclient"
|
"github.com/chrislusf/seaweedfs/weed/wdclient"
|
||||||
"github.com/golang/groupcache/singleflight"
|
"github.com/golang/groupcache/singleflight"
|
||||||
|
@ -45,9 +46,8 @@ func LookupFn(filerClient filer_pb.FilerClient) LookupFileIdFunctionType {
|
||||||
locations, found := vidCache[vid]
|
locations, found := vidCache[vid]
|
||||||
vicCacheLock.RUnlock()
|
vicCacheLock.RUnlock()
|
||||||
|
|
||||||
waitTime := time.Second
|
if !found {
|
||||||
for !found && waitTime < ReadWaitTime {
|
util.Retry("lookup volume "+vid, ReadWaitTime, func() error {
|
||||||
// println("looking up volume", vid)
|
|
||||||
err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
|
err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
|
||||||
resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
|
resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
|
||||||
VolumeIds: []string{vid},
|
VolumeIds: []string{vid},
|
||||||
|
@ -67,12 +67,8 @@ func LookupFn(filerClient filer_pb.FilerClient) LookupFileIdFunctionType {
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
if err == nil {
|
return err
|
||||||
break
|
})
|
||||||
}
|
|
||||||
glog.V(1).Infof("wait for volume %s", vid)
|
|
||||||
time.Sleep(waitTime)
|
|
||||||
waitTime += waitTime / 2
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -3,8 +3,6 @@ package meta_cache
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/chrislusf/seaweedfs/weed/filer"
|
"github.com/chrislusf/seaweedfs/weed/filer"
|
||||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||||
|
@ -18,7 +16,7 @@ func EnsureVisited(mc *MetaCache, client filer_pb.FilerClient, dirPath util.Full
|
||||||
|
|
||||||
glog.V(4).Infof("ReadDirAllEntries %s ...", path)
|
glog.V(4).Infof("ReadDirAllEntries %s ...", path)
|
||||||
|
|
||||||
for waitTime := time.Second; waitTime < filer.ReadWaitTime; waitTime += waitTime / 2 {
|
util.Retry("ReadDirAllEntries", filer.ReadWaitTime, func() error {
|
||||||
err = filer_pb.ReadDirAllEntries(client, dirPath, "", func(pbEntry *filer_pb.Entry, isLast bool) error {
|
err = filer_pb.ReadDirAllEntries(client, dirPath, "", func(pbEntry *filer_pb.Entry, isLast bool) error {
|
||||||
entry := filer.FromPbEntry(string(dirPath), pbEntry)
|
entry := filer.FromPbEntry(string(dirPath), pbEntry)
|
||||||
if err := mc.doInsertEntry(context.Background(), entry); err != nil {
|
if err := mc.doInsertEntry(context.Background(), entry); err != nil {
|
||||||
|
@ -30,17 +28,13 @@ func EnsureVisited(mc *MetaCache, client filer_pb.FilerClient, dirPath util.Full
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
if err == nil {
|
return err
|
||||||
break
|
})
|
||||||
}
|
|
||||||
if strings.Contains(err.Error(), "transport: ") {
|
if err != nil {
|
||||||
glog.V(0).Infof("ReadDirAllEntries %s: %v. Retry in %v", path, err, waitTime)
|
|
||||||
time.Sleep(waitTime)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
err = fmt.Errorf("list %s: %v", dirPath, err)
|
err = fmt.Errorf("list %s: %v", dirPath, err)
|
||||||
break
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return
|
return
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue