mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
Merge pull request #3350 from shichanglin5/optimize_masterclient_vidmap
Solve the problem that `LookupFileId` lookup urls is empty due to lea…
This commit is contained in:
commit
0716092b39
|
@ -24,18 +24,20 @@ type MasterClient struct {
|
||||||
grpcDialOption grpc.DialOption
|
grpcDialOption grpc.DialOption
|
||||||
|
|
||||||
vidMap
|
vidMap
|
||||||
|
vidMapCacheSize int
|
||||||
|
|
||||||
OnPeerUpdate func(update *master_pb.ClusterNodeUpdate, startFrom time.Time)
|
OnPeerUpdate func(update *master_pb.ClusterNodeUpdate, startFrom time.Time)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewMasterClient(grpcDialOption grpc.DialOption, filerGroup string, clientType string, clientHost pb.ServerAddress, clientDataCenter string, masters map[string]pb.ServerAddress) *MasterClient {
|
func NewMasterClient(grpcDialOption grpc.DialOption, filerGroup string, clientType string, clientHost pb.ServerAddress, clientDataCenter string, masters map[string]pb.ServerAddress) *MasterClient {
|
||||||
return &MasterClient{
|
return &MasterClient{
|
||||||
FilerGroup: filerGroup,
|
FilerGroup: filerGroup,
|
||||||
clientType: clientType,
|
clientType: clientType,
|
||||||
clientHost: clientHost,
|
clientHost: clientHost,
|
||||||
masters: masters,
|
masters: masters,
|
||||||
grpcDialOption: grpcDialOption,
|
grpcDialOption: grpcDialOption,
|
||||||
vidMap: newVidMap(clientDataCenter),
|
vidMap: newVidMap(clientDataCenter),
|
||||||
|
vidMapCacheSize: 5,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -175,10 +177,12 @@ func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedL
|
||||||
stats.MasterClientConnectCounter.WithLabelValues(stats.RedirectedToleader).Inc()
|
stats.MasterClientConnectCounter.WithLabelValues(stats.RedirectedToleader).Inc()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
mc.vidMap = newVidMap("")
|
//mc.vidMap = newVidMap("")
|
||||||
|
mc.resetVidMap()
|
||||||
mc.updateVidMap(resp)
|
mc.updateVidMap(resp)
|
||||||
} else {
|
} else {
|
||||||
mc.vidMap = newVidMap("")
|
mc.resetVidMap()
|
||||||
|
//mc.vidMap = newVidMap("")
|
||||||
}
|
}
|
||||||
mc.currentMaster = master
|
mc.currentMaster = master
|
||||||
|
|
||||||
|
@ -263,3 +267,17 @@ func (mc *MasterClient) WithClient(streamingMode bool, fn func(client master_pb.
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (mc *MasterClient) resetVidMap() {
|
||||||
|
tail := &vidMap{vid2Locations: mc.vid2Locations, ecVid2Locations: mc.ecVid2Locations, cache: mc.cache}
|
||||||
|
mc.vidMap = newVidMap("")
|
||||||
|
mc.vidMap.cache = tail
|
||||||
|
|
||||||
|
for i := 0; i < mc.vidMapCacheSize && tail.cache != nil; i++ {
|
||||||
|
if i == mc.vidMapCacheSize-1 {
|
||||||
|
tail.cache = nil
|
||||||
|
} else {
|
||||||
|
tail = tail.cache
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -40,6 +40,7 @@ type vidMap struct {
|
||||||
ecVid2Locations map[uint32][]Location
|
ecVid2Locations map[uint32][]Location
|
||||||
DataCenter string
|
DataCenter string
|
||||||
cursor int32
|
cursor int32
|
||||||
|
cache *vidMap
|
||||||
}
|
}
|
||||||
|
|
||||||
func newVidMap(dataCenter string) vidMap {
|
func newVidMap(dataCenter string) vidMap {
|
||||||
|
@ -119,17 +120,29 @@ func (vc *vidMap) GetVidLocations(vid string) (locations []Location, err error)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (vc *vidMap) GetLocations(vid uint32) (locations []Location, found bool) {
|
func (vc *vidMap) GetLocations(vid uint32) (locations []Location, found bool) {
|
||||||
|
glog.V(4).Infof("~ lookup volume id %d: %+v ec:%+v", vid, vc.vid2Locations, vc.ecVid2Locations)
|
||||||
|
locations, found = vc.getLocations(vid)
|
||||||
|
if found && len(locations) > 0 {
|
||||||
|
return locations, found
|
||||||
|
}
|
||||||
|
|
||||||
|
if vc.cache != nil {
|
||||||
|
return vc.cache.GetLocations(vid)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (vc *vidMap) getLocations(vid uint32) (locations []Location, found bool) {
|
||||||
vc.RLock()
|
vc.RLock()
|
||||||
defer vc.RUnlock()
|
defer vc.RUnlock()
|
||||||
|
|
||||||
glog.V(4).Infof("~ lookup volume id %d: %+v ec:%+v", vid, vc.vid2Locations, vc.ecVid2Locations)
|
|
||||||
|
|
||||||
locations, found = vc.vid2Locations[vid]
|
locations, found = vc.vid2Locations[vid]
|
||||||
if found && len(locations) > 0 {
|
if found && len(locations) > 0 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
locations, found = vc.ecVid2Locations[vid]
|
locations, found = vc.ecVid2Locations[vid]
|
||||||
return locations, found && len(locations) > 0
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (vc *vidMap) addLocation(vid uint32, location Location) {
|
func (vc *vidMap) addLocation(vid uint32, location Location) {
|
||||||
|
@ -177,6 +190,10 @@ func (vc *vidMap) addEcLocation(vid uint32, location Location) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (vc *vidMap) deleteLocation(vid uint32, location Location) {
|
func (vc *vidMap) deleteLocation(vid uint32, location Location) {
|
||||||
|
if vc.cache != nil {
|
||||||
|
vc.cache.deleteLocation(vid, location)
|
||||||
|
}
|
||||||
|
|
||||||
vc.Lock()
|
vc.Lock()
|
||||||
defer vc.Unlock()
|
defer vc.Unlock()
|
||||||
|
|
||||||
|
@ -193,10 +210,13 @@ func (vc *vidMap) deleteLocation(vid uint32, location Location) {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (vc *vidMap) deleteEcLocation(vid uint32, location Location) {
|
func (vc *vidMap) deleteEcLocation(vid uint32, location Location) {
|
||||||
|
if vc.cache != nil {
|
||||||
|
vc.cache.deleteLocation(vid, location)
|
||||||
|
}
|
||||||
|
|
||||||
vc.Lock()
|
vc.Lock()
|
||||||
defer vc.Unlock()
|
defer vc.Unlock()
|
||||||
|
|
||||||
|
@ -213,5 +233,4 @@ func (vc *vidMap) deleteEcLocation(vid uint32, location Location) {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,6 +2,9 @@ package wdclient
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
"strconv"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -59,6 +62,76 @@ func TestLocationIndex(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestLookupFileId(t *testing.T) {
|
||||||
|
mc := NewMasterClient(grpc.EmptyDialOption{}, "", "", "", "", nil)
|
||||||
|
length := 5
|
||||||
|
|
||||||
|
//Construct a cache linked list of length 5
|
||||||
|
for i := 0; i < length; i++ {
|
||||||
|
mc.addLocation(uint32(i), Location{Url: strconv.FormatInt(int64(i), 10)})
|
||||||
|
mc.resetVidMap()
|
||||||
|
}
|
||||||
|
for i := 0; i < length; i++ {
|
||||||
|
locations, found := mc.GetLocations(uint32(i))
|
||||||
|
if !found || len(locations) != 1 || locations[0].Url != strconv.FormatInt(int64(i), 10) {
|
||||||
|
t.Fatalf("urls of vid=%d is not valid.", i)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//When continue to add nodes to the linked list, the previous node will be deleted, and the cache of the response will be gone.
|
||||||
|
for i := length; i < length+5; i++ {
|
||||||
|
mc.addLocation(uint32(i), Location{Url: strconv.FormatInt(int64(i), 10)})
|
||||||
|
mc.resetVidMap()
|
||||||
|
}
|
||||||
|
for i := 0; i < length; i++ {
|
||||||
|
locations, found := mc.GetLocations(uint32(i))
|
||||||
|
if found {
|
||||||
|
t.Fatalf("urls of vid[%d] should not exists, but found: %v", i, locations)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//The delete operation will be applied to all cache nodes
|
||||||
|
_, found := mc.GetLocations(uint32(length))
|
||||||
|
if !found {
|
||||||
|
t.Fatalf("urls of vid[%d] not found", length)
|
||||||
|
}
|
||||||
|
|
||||||
|
//If the locations of the current node exist, return directly
|
||||||
|
newUrl := "abc"
|
||||||
|
mc.addLocation(uint32(length), Location{Url: newUrl})
|
||||||
|
locations, found := mc.GetLocations(uint32(length))
|
||||||
|
if !found || locations[0].Url != newUrl {
|
||||||
|
t.Fatalf("urls of vid[%d] not found", length)
|
||||||
|
}
|
||||||
|
|
||||||
|
//After delete `abc`, cache nodes are searched
|
||||||
|
deleteLoc := Location{Url: newUrl}
|
||||||
|
mc.deleteLocation(uint32(length), deleteLoc)
|
||||||
|
locations, found = mc.GetLocations(uint32(length))
|
||||||
|
if found && locations[0].Url != strconv.FormatInt(int64(length), 10) {
|
||||||
|
t.Fatalf("urls of vid[%d] not expected", length)
|
||||||
|
}
|
||||||
|
|
||||||
|
//lock: concurrent test
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
for i := 0; i < 20; i++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
for i := 0; i < 100; i++ {
|
||||||
|
for i := 0; i < 20; i++ {
|
||||||
|
_, _ = mc.GetLocations(uint32(i))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 0; i < 100; i++ {
|
||||||
|
mc.addLocation(uint32(i), Location{})
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
func BenchmarkLocationIndex(b *testing.B) {
|
func BenchmarkLocationIndex(b *testing.B) {
|
||||||
b.SetParallelism(8)
|
b.SetParallelism(8)
|
||||||
vm := vidMap{
|
vm := vidMap{
|
||||||
|
|
Loading…
Reference in a new issue