From d829df4f5964e6c0397fcba30554f7b5e4875ea8 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 14 Aug 2019 01:08:01 -0700 Subject: [PATCH] volume: protect against nil needle map fix @mastak reported nil problem in https://github.com/chrislusf/seaweedfs/issues/1037 --- weed/storage/needle_map_metric.go | 30 +++++++++++++++++ weed/storage/store.go | 10 +++--- weed/storage/volume.go | 56 ++++++++++++++++++++++++++----- weed/storage/volume_backup.go | 5 ++- weed/storage/volume_read_write.go | 4 +++ weed/storage/volume_vacuum.go | 4 +-- 6 files changed, 92 insertions(+), 17 deletions(-) diff --git a/weed/storage/needle_map_metric.go b/weed/storage/needle_map_metric.go index 6448b053b..823a04108 100644 --- a/weed/storage/needle_map_metric.go +++ b/weed/storage/needle_map_metric.go @@ -19,10 +19,16 @@ type mapMetric struct { } func (mm *mapMetric) logDelete(deletedByteCount uint32) { + if mm == nil { + return + } mm.LogDeletionCounter(deletedByteCount) } func (mm *mapMetric) logPut(key NeedleId, oldSize uint32, newSize uint32) { + if mm == nil { + return + } mm.MaybeSetMaxFileKey(key) mm.LogFileCounter(newSize) if oldSize > 0 && oldSize != TombstoneFileSize { @@ -30,32 +36,56 @@ func (mm *mapMetric) logPut(key NeedleId, oldSize uint32, newSize uint32) { } } func (mm *mapMetric) LogFileCounter(newSize uint32) { + if mm == nil { + return + } atomic.AddUint32(&mm.FileCounter, 1) atomic.AddUint64(&mm.FileByteCounter, uint64(newSize)) } func (mm *mapMetric) LogDeletionCounter(oldSize uint32) { + if mm == nil { + return + } if oldSize > 0 { atomic.AddUint32(&mm.DeletionCounter, 1) atomic.AddUint64(&mm.DeletionByteCounter, uint64(oldSize)) } } func (mm *mapMetric) ContentSize() uint64 { + if mm == nil { + return 0 + } return atomic.LoadUint64(&mm.FileByteCounter) } func (mm *mapMetric) DeletedSize() uint64 { + if mm == nil { + return 0 + } return atomic.LoadUint64(&mm.DeletionByteCounter) } func (mm *mapMetric) FileCount() int { + if mm == nil { + return 0 + } return int(atomic.LoadUint32(&mm.FileCounter)) } func (mm *mapMetric) DeletedCount() int { + if mm == nil { + return 0 + } return int(atomic.LoadUint32(&mm.DeletionCounter)) } func (mm *mapMetric) MaxFileKey() NeedleId { + if mm == nil { + return 0 + } t := uint64(mm.MaximumFileKey) return Uint64ToNeedleId(t) } func (mm *mapMetric) MaybeSetMaxFileKey(key NeedleId) { + if mm == nil { + return + } if key > mm.MaxFileKey() { atomic.StoreUint64(&mm.MaximumFileKey, uint64(key)) } diff --git a/weed/storage/store.go b/weed/storage/store.go index ac13f6a28..f0dc90790 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -137,9 +137,9 @@ func (s *Store) Status() []*VolumeInfo { Collection: v.Collection, ReplicaPlacement: v.ReplicaPlacement, Version: v.Version(), - FileCount: v.nm.FileCount(), - DeleteCount: v.nm.DeletedCount(), - DeletedByteCount: v.nm.DeletedSize(), + FileCount: int(v.FileCount()), + DeleteCount: int(v.DeletedCount()), + DeletedByteCount: v.DeletedSize(), ReadOnly: v.readOnly, Ttl: v.Ttl, CompactRevision: uint32(v.CompactionRevision), @@ -168,8 +168,8 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat { maxVolumeCount = maxVolumeCount + location.MaxVolumeCount location.Lock() for _, v := range location.volumes { - if maxFileKey < v.nm.MaxFileKey() { - maxFileKey = v.nm.MaxFileKey() + if maxFileKey < v.MaxFileKey() { + maxFileKey = v.MaxFileKey() } if !v.expired(s.GetVolumeSizeLimit()) { volumeMessages = append(volumeMessages, v.ToVolumeInformationMessage()) diff --git a/weed/storage/volume.go b/weed/storage/volume.go index a5e923547..a2e34bd04 100644 --- a/weed/storage/volume.go +++ b/weed/storage/volume.go @@ -6,6 +6,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/stats" "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/types" "os" "path" @@ -85,14 +86,54 @@ func (v *Volume) FileStat() (datSize uint64, idxSize uint64, modTime time.Time) return // -1 causes integer overflow and the volume to become unwritable. } -func (v *Volume) IndexFileSize() uint64 { - return v.nm.IndexFileSize() +func (v *Volume) ContentSize() uint64 { + v.dataFileAccessLock.Lock() + defer v.dataFileAccessLock.Unlock() + return v.nm.ContentSize() +} + +func (v *Volume) DeletedSize() uint64 { + v.dataFileAccessLock.Lock() + defer v.dataFileAccessLock.Unlock() + return v.nm.DeletedSize() } func (v *Volume) FileCount() uint64 { + v.dataFileAccessLock.Lock() + defer v.dataFileAccessLock.Unlock() return uint64(v.nm.FileCount()) } +func (v *Volume) DeletedCount() uint64 { + v.dataFileAccessLock.Lock() + defer v.dataFileAccessLock.Unlock() + return uint64(v.nm.DeletedCount()) +} + +func (v *Volume) MaxFileKey() types.NeedleId { + v.dataFileAccessLock.Lock() + defer v.dataFileAccessLock.Unlock() + return v.nm.MaxFileKey() +} + +func (v *Volume) IndexFileSize() uint64 { + v.dataFileAccessLock.Lock() + defer v.dataFileAccessLock.Unlock() + return v.nm.IndexFileSize() +} + +func (v *Volume) IndexFileContent() ([]byte, error) { + v.dataFileAccessLock.Lock() + defer v.dataFileAccessLock.Unlock() + return v.nm.IndexFileContent() +} + +func (v *Volume) IndexFileName() string { + v.dataFileAccessLock.Lock() + defer v.dataFileAccessLock.Unlock() + return v.nm.IndexFileName() +} + // Close cleanly shuts down this volume func (v *Volume) Close() { v.dataFileAccessLock.Lock() @@ -112,10 +153,6 @@ func (v *Volume) NeedToReplicate() bool { return v.ReplicaPlacement.GetCopyCount() > 1 } -func (v *Volume) ContentSize() uint64 { - return v.nm.ContentSize() -} - // volume is expired if modified time + volume ttl < now // except when volume is empty // or when the volume does not have a ttl @@ -158,13 +195,14 @@ func (v *Volume) expiredLongEnough(maxDelayMinutes uint32) bool { func (v *Volume) ToVolumeInformationMessage() *master_pb.VolumeInformationMessage { size, _, modTime := v.FileStat() + return &master_pb.VolumeInformationMessage{ Id: uint32(v.Id), Size: size, Collection: v.Collection, - FileCount: uint64(v.nm.FileCount()), - DeleteCount: uint64(v.nm.DeletedCount()), - DeletedByteCount: v.nm.DeletedSize(), + FileCount: uint64(v.FileCount()), + DeleteCount: uint64(v.DeletedCount()), + DeletedByteCount: v.DeletedSize(), ReadOnly: v.readOnly, ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()), Version: uint32(v.Version()), diff --git a/weed/storage/volume_backup.go b/weed/storage/volume_backup.go index f56c40019..e10990a9b 100644 --- a/weed/storage/volume_backup.go +++ b/weed/storage/volume_backup.go @@ -15,12 +15,15 @@ import ( ) func (v *Volume) GetVolumeSyncStatus() *volume_server_pb.VolumeSyncStatusResponse { + v.dataFileAccessLock.Lock() + defer v.dataFileAccessLock.Unlock() + var syncStatus = &volume_server_pb.VolumeSyncStatusResponse{} if stat, err := v.dataFile.Stat(); err == nil { syncStatus.TailOffset = uint64(stat.Size()) } syncStatus.Collection = v.Collection - syncStatus.IdxFileSize = v.nm.IndexFileSize() + syncStatus.IdxFileSize = v.IndexFileSize() syncStatus.CompactRevision = uint32(v.SuperBlock.CompactionRevision) syncStatus.Ttl = v.SuperBlock.Ttl.String() syncStatus.Replication = v.SuperBlock.ReplicaPlacement.String() diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_read_write.go index 2c67b2dc4..ae05331a4 100644 --- a/weed/storage/volume_read_write.go +++ b/weed/storage/volume_read_write.go @@ -21,6 +21,7 @@ func (v *Volume) isFileUnchanged(n *needle.Needle) bool { if v.Ttl.String() != "" { return false } + nv, ok := v.nm.Get(n.Id) if ok && !nv.Offset.IsZero() && nv.Size != TombstoneFileSize { oldNeedle := new(needle.Needle) @@ -138,6 +139,9 @@ func (v *Volume) deleteNeedle(n *needle.Needle) (uint32, error) { // read fills in Needle content by looking up n.Id from NeedleMapper func (v *Volume) readNeedle(n *needle.Needle) (int, error) { + v.dataFileAccessLock.Lock() + defer v.dataFileAccessLock.Unlock() + nv, ok := v.nm.Get(n.Id) if !ok || nv.Offset.IsZero() { return -1, ErrorNotFound diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go index ff09df42d..c021c4c18 100644 --- a/weed/storage/volume_vacuum.go +++ b/weed/storage/volume_vacuum.go @@ -18,7 +18,7 @@ func (v *Volume) garbageLevel() float64 { if v.ContentSize() == 0 { return 0 } - return float64(v.nm.DeletedSize()) / float64(v.ContentSize()) + return float64(v.DeletedSize()) / float64(v.ContentSize()) } func (v *Volume) Compact(preallocate int64, compactionBytePerSecond int64) error { @@ -33,7 +33,7 @@ func (v *Volume) Compact(preallocate int64, compactionBytePerSecond int64) error }() filePath := v.FileName() - v.lastCompactIndexOffset = v.nm.IndexFileSize() + v.lastCompactIndexOffset = v.IndexFileSize() v.lastCompactRevision = v.SuperBlock.CompactionRevision glog.V(3).Infof("creating copies for volume %d ,last offset %d...", v.Id, v.lastCompactIndexOffset) return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx", preallocate, compactionBytePerSecond)