From bb31462b52d24bc7ebf54266b45801c198685f70 Mon Sep 17 00:00:00 2001 From: divinerapier Date: Fri, 13 Sep 2019 20:06:02 +0800 Subject: [PATCH 1/4] fix: thread unsafe Signed-off-by: divinerapier --- weed/wdclient/vid_map.go | 26 +++++++++--- weed/wdclient/vid_map_test.go | 77 +++++++++++++++++++++++++++++++++++ 2 files changed, 98 insertions(+), 5 deletions(-) create mode 100644 weed/wdclient/vid_map_test.go diff --git a/weed/wdclient/vid_map.go b/weed/wdclient/vid_map.go index 01d9cdaed..7a3f50aad 100644 --- a/weed/wdclient/vid_map.go +++ b/weed/wdclient/vid_map.go @@ -3,11 +3,11 @@ package wdclient import ( "errors" "fmt" - "math/rand" + "math" "strconv" "strings" "sync" - "time" + "sync/atomic" "github.com/chrislusf/seaweedfs/weed/glog" ) @@ -20,16 +20,27 @@ type Location struct { type vidMap struct { sync.RWMutex vid2Locations map[uint32][]Location - r *rand.Rand + + cursor int64 } func newVidMap() vidMap { return vidMap{ vid2Locations: make(map[uint32][]Location), - r: rand.New(rand.NewSource(time.Now().UnixNano())), + cursor: -1, } } +func (vc *vidMap) getLocationIndex(length int64) (int64, error) { + if length <= 0 { + return 0, fmt.Errorf("invalid length: %d", length) + } + if atomic.LoadInt64(&vc.cursor) == math.MaxInt64 { + atomic.CompareAndSwapInt64(&vc.cursor, math.MaxInt64, -1) + } + return atomic.AddInt64(&vc.cursor, 1) % length, nil +} + func (vc *vidMap) LookupVolumeServerUrl(vid string) (serverUrl string, err error) { id, err := strconv.Atoi(vid) if err != nil { @@ -94,7 +105,12 @@ func (vc *vidMap) GetRandomLocation(vid uint32) (serverUrl string, err error) { return "", fmt.Errorf("volume %d not found", vid) } - return locations[vc.r.Intn(len(locations))].Url, nil + index, err := vc.getLocationIndex(int64(len(locations))) + if err != nil { + return "", fmt.Errorf("volume %d. %v", vid, err) + } + + return locations[index].Url, nil } func (vc *vidMap) addLocation(vid uint32, location Location) { diff --git a/weed/wdclient/vid_map_test.go b/weed/wdclient/vid_map_test.go new file mode 100644 index 000000000..ae4680e7a --- /dev/null +++ b/weed/wdclient/vid_map_test.go @@ -0,0 +1,77 @@ +package wdclient + +import ( + "fmt" + "math" + "testing" +) + +func TestLocationIndex(t *testing.T) { + vm := vidMap{} + // test must be failed + mustFailed := func(length int64) { + _, err := vm.getLocationIndex(length) + if err == nil { + t.Errorf("length %d must be failed", length) + } + if err.Error() != fmt.Sprintf("invalid length: %d", length) { + t.Errorf("length %d must be failed. error: %v", length, err) + } + } + + mustFailed(-1) + mustFailed(0) + + mustOk := func(length, cursor, expect int64) { + if length <= 0 { + t.Fatal("please don't do this") + } + vm.cursor = cursor + got, err := vm.getLocationIndex(length) + if err != nil { + t.Errorf("length: %d, why? %v\n", length, err) + return + } + if got != expect { + t.Errorf("cursor: %d, length: %d, expect: %d, got: %d\n", cursor, length, expect, got) + return + } + } + + for i := int64(-1); i < 100; i++ { + mustOk(7, i, (i+1)%7) + } + + // when cursor reaches MaxInt64 + mustOk(7, math.MaxInt64, 0) + + // test with constructor + vm = newVidMap() + length := int64(7) + for i := int64(0); i < 100; i++ { + got, err := vm.getLocationIndex(length) + if err != nil { + t.Errorf("length: %d, why? %v\n", length, err) + return + } + if got != i%length { + t.Errorf("length: %d, i: %d, got: %d\n", length, i, got) + } + } +} + +func BenchmarkLocationIndex(b *testing.B) { + b.SetParallelism(8) + vm := vidMap{ + cursor: math.MaxInt64 - 10000, + } + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + _, err := vm.getLocationIndex(3) + if err != nil { + b.Error(err) + } + } + }) +} From ad3efbb19796b796b3eafe5af66efa5749059d02 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 14 Sep 2019 01:21:51 -0700 Subject: [PATCH 2/4] tweaking data types --- weed/wdclient/vid_map.go | 19 +++++++++++-------- weed/wdclient/vid_map_test.go | 17 ++++++++--------- 2 files changed, 19 insertions(+), 17 deletions(-) diff --git a/weed/wdclient/vid_map.go b/weed/wdclient/vid_map.go index 7a3f50aad..97df49cb6 100644 --- a/weed/wdclient/vid_map.go +++ b/weed/wdclient/vid_map.go @@ -3,7 +3,6 @@ package wdclient import ( "errors" "fmt" - "math" "strconv" "strings" "sync" @@ -12,6 +11,10 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" ) +const ( + maxCursorIndex = 4096 +) + type Location struct { Url string `json:"url,omitempty"` PublicUrl string `json:"publicUrl,omitempty"` @@ -21,7 +24,7 @@ type vidMap struct { sync.RWMutex vid2Locations map[uint32][]Location - cursor int64 + cursor int32 } func newVidMap() vidMap { @@ -31,14 +34,14 @@ func newVidMap() vidMap { } } -func (vc *vidMap) getLocationIndex(length int64) (int64, error) { +func (vc *vidMap) getLocationIndex(length int) (int, error) { if length <= 0 { return 0, fmt.Errorf("invalid length: %d", length) } - if atomic.LoadInt64(&vc.cursor) == math.MaxInt64 { - atomic.CompareAndSwapInt64(&vc.cursor, math.MaxInt64, -1) + if atomic.LoadInt32(&vc.cursor) == maxCursorIndex { + atomic.CompareAndSwapInt32(&vc.cursor, maxCursorIndex, -1) } - return atomic.AddInt64(&vc.cursor, 1) % length, nil + return int(atomic.AddInt32(&vc.cursor, 1)) % length, nil } func (vc *vidMap) LookupVolumeServerUrl(vid string) (serverUrl string, err error) { @@ -105,9 +108,9 @@ func (vc *vidMap) GetRandomLocation(vid uint32) (serverUrl string, err error) { return "", fmt.Errorf("volume %d not found", vid) } - index, err := vc.getLocationIndex(int64(len(locations))) + index, err := vc.getLocationIndex(len(locations)) if err != nil { - return "", fmt.Errorf("volume %d. %v", vid, err) + return "", fmt.Errorf("volume %d: %v", vid, err) } return locations[index].Url, nil diff --git a/weed/wdclient/vid_map_test.go b/weed/wdclient/vid_map_test.go index ae4680e7a..87be2fc25 100644 --- a/weed/wdclient/vid_map_test.go +++ b/weed/wdclient/vid_map_test.go @@ -2,14 +2,13 @@ package wdclient import ( "fmt" - "math" "testing" ) func TestLocationIndex(t *testing.T) { vm := vidMap{} // test must be failed - mustFailed := func(length int64) { + mustFailed := func(length int) { _, err := vm.getLocationIndex(length) if err == nil { t.Errorf("length %d must be failed", length) @@ -22,11 +21,11 @@ func TestLocationIndex(t *testing.T) { mustFailed(-1) mustFailed(0) - mustOk := func(length, cursor, expect int64) { + mustOk := func(length, cursor, expect int) { if length <= 0 { t.Fatal("please don't do this") } - vm.cursor = cursor + vm.cursor = int32(cursor) got, err := vm.getLocationIndex(length) if err != nil { t.Errorf("length: %d, why? %v\n", length, err) @@ -38,17 +37,17 @@ func TestLocationIndex(t *testing.T) { } } - for i := int64(-1); i < 100; i++ { + for i := -1; i < 100; i++ { mustOk(7, i, (i+1)%7) } // when cursor reaches MaxInt64 - mustOk(7, math.MaxInt64, 0) + mustOk(7, maxCursorIndex, 0) // test with constructor vm = newVidMap() - length := int64(7) - for i := int64(0); i < 100; i++ { + length := 7 + for i := 0; i < 100; i++ { got, err := vm.getLocationIndex(length) if err != nil { t.Errorf("length: %d, why? %v\n", length, err) @@ -63,7 +62,7 @@ func TestLocationIndex(t *testing.T) { func BenchmarkLocationIndex(b *testing.B) { b.SetParallelism(8) vm := vidMap{ - cursor: math.MaxInt64 - 10000, + cursor: maxCursorIndex - 4000, } b.ResetTimer() b.RunParallel(func(pb *testing.PB) { From 20ae55943758afea7b6d464e058c88d6886ab861 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 16 Sep 2019 08:19:10 -0700 Subject: [PATCH 3/4] add logs add logs for https://github.com/chrislusf/seaweedfs/issues/1064 --- weed/s3api/filer_util.go | 1 + 1 file changed, 1 insertion(+) diff --git a/weed/s3api/filer_util.go b/weed/s3api/filer_util.go index 1c3814fce..a9b8f833d 100644 --- a/weed/s3api/filer_util.go +++ b/weed/s3api/filer_util.go @@ -89,6 +89,7 @@ func (s3a *S3ApiServer) list(ctx context.Context, parentDirectoryPath, prefix, s glog.V(4).Infof("read directory: %v", request) resp, err := client.ListEntries(ctx, request) if err != nil { + glog.V(0).Infof("read directory %v: %v", request, err) return fmt.Errorf("list dir %v: %v", parentDirectoryPath, err) } From 972e881d485a1a5b4587f47ef55bbbf5182837f7 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 16 Sep 2019 09:48:30 -0700 Subject: [PATCH 4/4] add more logs add more logs --- weed/s3api/filer_util.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/weed/s3api/filer_util.go b/weed/s3api/filer_util.go index a9b8f833d..b93b603e2 100644 --- a/weed/s3api/filer_util.go +++ b/weed/s3api/filer_util.go @@ -37,6 +37,7 @@ func (s3a *S3ApiServer) mkdir(ctx context.Context, parentDirectoryPath string, d glog.V(1).Infof("mkdir: %v", request) if _, err := client.CreateEntry(ctx, request); err != nil { + glog.V(0).Infof("mkdir %v: %v", request, err) return fmt.Errorf("mkdir %s/%s: %v", parentDirectoryPath, dirName, err) } @@ -67,6 +68,7 @@ func (s3a *S3ApiServer) mkFile(ctx context.Context, parentDirectoryPath string, glog.V(1).Infof("create file: %s/%s", parentDirectoryPath, fileName) if _, err := client.CreateEntry(ctx, request); err != nil { + glog.V(0).Infof("create file %v:%v", request, err) return fmt.Errorf("create file %s/%s: %v", parentDirectoryPath, fileName, err) } @@ -115,6 +117,7 @@ func (s3a *S3ApiServer) rm(ctx context.Context, parentDirectoryPath string, entr glog.V(1).Infof("delete entry %v/%v: %v", parentDirectoryPath, entryName, request) if _, err := client.DeleteEntry(ctx, request); err != nil { + glog.V(0).Infof("delete entry %v: %v", request, err) return fmt.Errorf("delete entry %s/%s: %v", parentDirectoryPath, entryName, err) } @@ -135,6 +138,7 @@ func (s3a *S3ApiServer) exists(ctx context.Context, parentDirectoryPath string, glog.V(4).Infof("exists entry %v/%v: %v", parentDirectoryPath, entryName, request) resp, err := client.LookupDirectoryEntry(ctx, request) if err != nil { + glog.V(0).Infof("exists entry %v: %v", request, err) return fmt.Errorf("exists entry %s/%s: %v", parentDirectoryPath, entryName, err) }