Chris Lu 2020-09-12 04:08:03 -07:00
parent ea26a98753
commit 446e476a11
8 changed files with 12 additions and 15 deletions

View file

@@ -36,7 +36,7 @@ type SyncOptions struct {
 }

 var (
-    syncOptions SyncOptions
+    syncOptions    SyncOptions
     syncCpuProfile *string
     syncMemProfile *string
 )

View file

@@ -87,7 +87,7 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, peer string
         if err := ma.updateOffset(f, peer, peerSignature, event.TsNs); err == nil {
             if event.TsNs < time.Now().Add(-2*time.Minute).UnixNano() {
                 glog.V(0).Infof("sync with %s progressed to: %v %0.2f/sec", peer, time.Unix(0, event.TsNs), float64(counter)/60.0)
-            } else if !synced{
+            } else if !synced {
                 synced = true
                 glog.V(0).Infof("synced with %s", peer)
             }
@@ -162,13 +162,13 @@ func (ma *MetaAggregator) readFilerStoreSignature(peer string) (sig int32, err e
     return
 }

-const(
+const (
     MetaOffsetPrefix = "Meta"
 )

 func (ma *MetaAggregator) readOffset(f *Filer, peer string, peerSignature int32) (lastTsNs int64, err error) {

-    key := []byte(MetaOffsetPrefix+"xxxx")
+    key := []byte(MetaOffsetPrefix + "xxxx")
     util.Uint32toBytes(key[len(MetaOffsetPrefix):], uint32(peerSignature))

     value, err := f.Store.KvGet(context.Background(), key)
@@ -191,7 +191,7 @@ func (ma *MetaAggregator) readOffset(f *Filer, peer string, peerSignature int32)

 func (ma *MetaAggregator) updateOffset(f *Filer, peer string, peerSignature int32, lastTsNs int64) (err error) {

-    key := []byte(MetaOffsetPrefix+"xxxx")
+    key := []byte(MetaOffsetPrefix + "xxxx")
     util.Uint32toBytes(key[len(MetaOffsetPrefix):], uint32(peerSignature))

     value := make([]byte, 8)
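
Both hunks above build the same key: the literal MetaOffsetPrefix followed by the peer's filer-store signature encoded into four bytes, which is then used with the filer store's KvGet/KvPut. A minimal, self-contained sketch of that key layout, using encoding/binary in place of util.Uint32toBytes (big-endian byte order is an assumption here, as is the helper name offsetKey):

package main

import (
    "encoding/binary"
    "fmt"
)

// metaOffsetPrefix mirrors the MetaOffsetPrefix constant in the hunk above.
const metaOffsetPrefix = "Meta"

// offsetKey builds the per-peer KV key: the prefix followed by the peer's
// 4-byte store signature. Big-endian is an assumption; the original code
// delegates the encoding to util.Uint32toBytes.
func offsetKey(peerSignature int32) []byte {
    key := make([]byte, len(metaOffsetPrefix)+4)
    copy(key, metaOffsetPrefix)
    binary.BigEndian.PutUint32(key[len(metaOffsetPrefix):], uint32(peerSignature))
    return key
}

func main() {
    fmt.Printf("% x\n", offsetKey(42)) // 4d 65 74 61 00 00 00 2a
}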

View file

@@ -84,4 +84,3 @@ func ReadFilerSignature(grpcDialOption grpc.DialOption, filer string) (filerSign
     }
     return filerSignature, nil
 }
-

View file

@@ -174,7 +174,7 @@ func (n *Node) localVolumeRatio() float64 {
 }

 func (n *Node) localVolumeNextRatio() float64 {
-    return divide(len(n.selectedVolumes) + 1, int(n.info.MaxVolumeCount))
+    return divide(len(n.selectedVolumes)+1, int(n.info.MaxVolumeCount))
 }

 func (n *Node) selectVolumes(fn func(v *master_pb.VolumeInformationMessage) bool) {

View file

@@ -20,7 +20,6 @@ func TestIsGoodMove(t *testing.T) {
     var tests = []testMoveCase{
-
         {
             name: "test 100 move to spread into proper data centers",
             replication: "100",
@@ -132,7 +131,6 @@ func TestIsGoodMove(t *testing.T) {
             targetLocation: location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn3"}},
             expected: true,
         },
-
     }

     for _, tt := range tests {

View file

@@ -65,7 +65,7 @@ func (nm *NeedleMap) Get(key NeedleId) (existingValue *needle_map.NeedleValue, o
 }
 func (nm *NeedleMap) Delete(key NeedleId) error {
     existingValue, ok := nm.m.Get(NeedleId(key))
-    if !ok || existingValue.Size.IsDeleted(){
+    if !ok || existingValue.Size.IsDeleted() {
         return nil
     }
     deletedBytes := nm.m.Delete(NeedleId(key))

View file

@@ -17,7 +17,7 @@ func TestFastLoadingNeedleMapMetrics(t *testing.T) {
     for i := 0; i < 10000; i++ {
         nm.Put(Uint64ToNeedleId(uint64(i+1)), Uint32ToOffset(uint32(0)), Size(1))
         if rand.Float32() < 0.2 {
-            nm.Delete(Uint64ToNeedleId(uint64(rand.Int63n(int64(i))+1)))
+            nm.Delete(Uint64ToNeedleId(uint64(rand.Int63n(int64(i)) + 1)))
         }
     }

View file

@@ -12,11 +12,11 @@ import (
     "github.com/klauspost/compress/zstd"
 )

-var(
+var (
     UnsupportedCompression = fmt.Errorf("unsupported compression")
 )

-func MaybeGzipData(input []byte) ([]byte) {
+func MaybeGzipData(input []byte) []byte {
     if IsGzippedContent(input) {
         return input
     }
@@ -24,13 +24,13 @@ func MaybeGzipData(input []byte) ([]byte) {
     if err != nil {
         return input
     }
-    if len(gzipped) * 10 > len(input) * 9 {
+    if len(gzipped)*10 > len(input)*9 {
         return input
     }
     return gzipped
 }

-func MaybeDecompressData(input []byte) ([]byte) {
+func MaybeDecompressData(input []byte) []byte {
     uncompressed, err := DecompressData(input)
     if err != nil {
         if err != UnsupportedCompression {
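
The reformatted condition above, len(gzipped)*10 > len(input)*9, encodes a size heuristic: MaybeGzipData keeps the compressed form only when it is at most 90% of the input, i.e. when gzip saves at least 10%. A minimal sketch of that check in isolation; the gzip helper below uses the standard library and stands in for the package's own compression helper, which this diff does not show:

package main

import (
    "bytes"
    "compress/gzip"
    "fmt"
)

// gzipBytes compresses input with the standard library; a stand-in for the
// package's own gzip helper, assumed here for illustration only.
func gzipBytes(input []byte) ([]byte, error) {
    var buf bytes.Buffer
    w := gzip.NewWriter(&buf)
    if _, err := w.Write(input); err != nil {
        return nil, err
    }
    if err := w.Close(); err != nil {
        return nil, err
    }
    return buf.Bytes(), nil
}

// maybeGzip mirrors the size heuristic from the hunk above: return the
// compressed form only if it is at most 90% of the original size.
func maybeGzip(input []byte) []byte {
    gzipped, err := gzipBytes(input)
    if err != nil {
        return input
    }
    if len(gzipped)*10 > len(input)*9 {
        return input // compression saved less than 10%; keep the original
    }
    return gzipped
}

func main() {
    data := bytes.Repeat([]byte("seaweedfs "), 100)
    fmt.Println(len(data), len(maybeGzip(data)))
}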