diff --git a/weed/command/backup.go b/weed/command/backup.go index 207df770b..4c5a2d820 100644 --- a/weed/command/backup.go +++ b/weed/command/backup.go @@ -72,7 +72,7 @@ func runBackup(cmd *Command, args []string) bool { vid := needle.VolumeId(*s.volumeId) // find volume location, replication, ttl info - lookup, err := operation.Lookup(func() string { return *s.master }, vid.String()) + lookup, err := operation.LookupVolumeId(func() string { return *s.master }, grpcDialOption, vid.String()) if err != nil { fmt.Printf("Error looking up volume %d: %v\n", vid, err) return true diff --git a/weed/operation/lookup.go b/weed/operation/lookup.go index 0372e47b0..ea68f8763 100644 --- a/weed/operation/lookup.go +++ b/weed/operation/lookup.go @@ -79,16 +79,21 @@ func LookupFileId(masterFn GetMasterFn, fileId string) (fullUrl string, err erro return "http://" + lookup.Locations[rand.Intn(len(lookup.Locations))].Url + "/" + fileId, nil } +func LookupVolumeId(masterFn GetMasterFn, grpcDialOption grpc.DialOption, vid string) (*LookupResult, error) { + results, err := LookupVolumeIds(masterFn, grpcDialOption, []string{vid}) + return results[vid], err +} + // LookupVolumeIds find volume locations by cache and actual lookup -func LookupVolumeIds(masterFn GetMasterFn, grpcDialOption grpc.DialOption, vids []string) (map[string]LookupResult, error) { - ret := make(map[string]LookupResult) +func LookupVolumeIds(masterFn GetMasterFn, grpcDialOption grpc.DialOption, vids []string) (map[string]*LookupResult, error) { + ret := make(map[string]*LookupResult) var unknown_vids []string //check vid cache first for _, vid := range vids { - locations, cache_err := vc.Get(vid) - if cache_err == nil { - ret[vid] = LookupResult{VolumeId: vid, Locations: locations} + locations, cacheErr := vc.Get(vid) + if cacheErr == nil { + ret[vid] = &LookupResult{VolumeId: vid, Locations: locations} } else { unknown_vids = append(unknown_vids, vid) } @@ -122,7 +127,7 @@ func LookupVolumeIds(masterFn GetMasterFn, grpcDialOption grpc.DialOption, vids if vidLocations.Error != "" { vc.Set(vidLocations.VolumeId, locations, 10*time.Minute) } - ret[vidLocations.VolumeId] = LookupResult{ + ret[vidLocations.VolumeId] = &LookupResult{ VolumeId: vidLocations.VolumeId, Locations: locations, Error: vidLocations.Error, diff --git a/weed/operation/tail_volume.go b/weed/operation/tail_volume.go index 79a5b3812..e3f2c0664 100644 --- a/weed/operation/tail_volume.go +++ b/weed/operation/tail_volume.go @@ -13,7 +13,7 @@ import ( func TailVolume(masterFn GetMasterFn, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, timeoutSeconds int, fn func(n *needle.Needle) error) error { // find volume location, replication, ttl info - lookup, err := Lookup(masterFn, vid.String()) + lookup, err := LookupVolumeId(masterFn, grpcDialOption, vid.String()) if err != nil { return fmt.Errorf("look up volume %d: %v", vid, err) } diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go index 8602b0854..29428730e 100644 --- a/weed/server/volume_server_handlers_read.go +++ b/weed/server/volume_server_handlers_read.go @@ -65,7 +65,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) w.WriteHeader(http.StatusNotFound) return } - lookupResult, err := operation.Lookup(vs.GetMaster, volumeId.String()) + lookupResult, err := operation.LookupVolumeId(vs.GetMaster, vs.grpcDialOption, volumeId.String()) glog.V(2).Infoln("volume", volumeId, "found on", lookupResult, "error", err) if err != nil || len(lookupResult.Locations) <= 0 { glog.V(0).Infoln("lookup error:", err, r.URL.Path) diff --git a/weed/server/volume_server_handlers_write.go b/weed/server/volume_server_handlers_write.go index aeb7d6e65..bda8b986e 100644 --- a/weed/server/volume_server_handlers_write.go +++ b/weed/server/volume_server_handlers_write.go @@ -53,7 +53,7 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) { } ret := operation.UploadResult{} - isUnchanged, writeError := topology.ReplicatedWrite(vs.GetMaster, vs.store, volumeId, reqNeedle, r) + isUnchanged, writeError := topology.ReplicatedWrite(vs.GetMaster, vs.grpcDialOption, vs.store, volumeId, reqNeedle, r) // http 204 status code does not allow body if writeError == nil && isUnchanged { @@ -146,7 +146,7 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { } } - _, err := topology.ReplicatedDelete(vs.GetMaster, vs.store, volumeId, n, r) + _, err := topology.ReplicatedDelete(vs.GetMaster, vs.grpcDialOption, vs.store, volumeId, n, r) writeDeleteResult(err, count, w, r) diff --git a/weed/topology/store_replicate.go b/weed/topology/store_replicate.go index 6d68bb26f..061c5a12c 100644 --- a/weed/topology/store_replicate.go +++ b/weed/topology/store_replicate.go @@ -4,6 +4,7 @@ import ( "encoding/json" "errors" "fmt" + "google.golang.org/grpc" "net/http" "net/url" "strconv" @@ -18,7 +19,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" ) -func ReplicatedWrite(masterFn operation.GetMasterFn, s *storage.Store, volumeId needle.VolumeId, n *needle.Needle, r *http.Request) (isUnchanged bool, err error) { +func ReplicatedWrite(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOption, s *storage.Store, volumeId needle.VolumeId, n *needle.Needle, r *http.Request) (isUnchanged bool, err error) { //check JWT jwt := security.GetJwt(r) @@ -27,7 +28,7 @@ func ReplicatedWrite(masterFn operation.GetMasterFn, s *storage.Store, volumeId var remoteLocations []operation.Location if r.FormValue("type") != "replicate" { // this is the initial request - remoteLocations, err = getWritableRemoteReplications(s, volumeId, masterFn) + remoteLocations, err = getWritableRemoteReplications(s, grpcDialOption, volumeId, masterFn) if err != nil { glog.V(0).Infoln(err) return @@ -92,16 +93,14 @@ func ReplicatedWrite(masterFn operation.GetMasterFn, s *storage.Store, volumeId return } -func ReplicatedDelete(masterFn operation.GetMasterFn, store *storage.Store, - volumeId needle.VolumeId, n *needle.Needle, - r *http.Request) (size types.Size, err error) { +func ReplicatedDelete(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOption, store *storage.Store, volumeId needle.VolumeId, n *needle.Needle, r *http.Request) (size types.Size, err error) { //check JWT jwt := security.GetJwt(r) var remoteLocations []operation.Location if r.FormValue("type") != "replicate" { - remoteLocations, err = getWritableRemoteReplications(store, volumeId, masterFn) + remoteLocations, err = getWritableRemoteReplications(store, grpcDialOption, volumeId, masterFn) if err != nil { glog.V(0).Infoln(err) return @@ -161,8 +160,7 @@ func DistributedOperation(locations []operation.Location, op func(location opera return ret.Error() } -func getWritableRemoteReplications(s *storage.Store, volumeId needle.VolumeId, masterFn operation.GetMasterFn) ( - remoteLocations []operation.Location, err error) { +func getWritableRemoteReplications(s *storage.Store, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, masterFn operation.GetMasterFn) (remoteLocations []operation.Location, err error) { v := s.GetVolume(volumeId) if v != nil && v.ReplicaPlacement.GetCopyCount() == 1 { @@ -170,7 +168,7 @@ func getWritableRemoteReplications(s *storage.Store, volumeId needle.VolumeId, m } // not on local store, or has replications - lookupResult, lookupErr := operation.Lookup(masterFn, volumeId.String()) + lookupResult, lookupErr := operation.LookupVolumeId(masterFn, grpcDialOption, volumeId.String()) if lookupErr == nil { selfUrl := s.Ip + ":" + strconv.Itoa(s.Port) for _, location := range lookupResult.Locations {