diff --git a/unmaintained/diff_volume_servers/diff_volume_servers.go b/unmaintained/diff_volume_servers/diff_volume_servers.go index 137cb82cf..6107f3d48 100644 --- a/unmaintained/diff_volume_servers/diff_volume_servers.go +++ b/unmaintained/diff_volume_servers/diff_volume_servers.go @@ -158,9 +158,6 @@ func getVolumeFiles(v uint32, addr string) (map[types.NeedleId]needleState, int6 files := map[types.NeedleId]needleState{} err = idx.WalkIndexFile(idxFile, func(key types.NeedleId, offset types.Offset, size types.Size) error { if offset.IsZero() || size.IsDeleted() { - if size < 0 { - size = -size - } files[key] = needleState{ state: stateDeleted, size: size, diff --git a/weed/command/fix.go b/weed/command/fix.go index a3435804b..ae9a051b8 100644 --- a/weed/command/fix.go +++ b/weed/command/fix.go @@ -30,7 +30,6 @@ var ( fixVolumePath = cmdFix.Flag.String("dir", ".", "data directory to store files") fixVolumeCollection = cmdFix.Flag.String("collection", "", "the volume collection name") fixVolumeId = cmdFix.Flag.Int("volumeId", -1, "a volume id. The volume should already exist in the dir. The volume index file should not exist.") - fixIncludeDeleted = cmdFix.Flag.Bool("includeDeleted", false, "include deleted entries in the index file") ) type VolumeFileScanner4Fix struct { @@ -51,14 +50,9 @@ func (scanner *VolumeFileScanner4Fix) VisitNeedle(n *needle.Needle, offset int64 glog.V(2).Infof("key %d offset %d size %d disk_size %d compressed %v", n.Id, offset, n.Size, n.DiskSize(scanner.version), n.IsCompressed()) if n.Size.IsValid() { pe := scanner.nm.Set(n.Id, types.ToOffset(offset), n.Size) - glog.V(2).Infof("saved %s %d bytes with error %v", n.Id.String(), n.Size, pe) + glog.V(2).Infof("saved %d with error %v", n.Size, pe) } else { - if val, found := scanner.nm.Get(n.Id); *fixIncludeDeleted && found && val.Size > 0 { - pe := scanner.nm.Set(n.Id, val.Offset, -val.Size) - glog.V(2).Infof("update deleted %s %d bytes with error %v", n.Id.String(), -val.Size, pe) - return nil - } - glog.V(1).Infof("skipping deleted file %s size %d ...", n.Id.String(), n.Size) + glog.V(2).Infof("skipping deleted file ...") return scanner.nm.Delete(n.Id) } return nil @@ -89,7 +83,7 @@ func runFix(cmd *Command, args []string) bool { os.Remove(indexFileName) } - if err := nm.SaveToIdx(indexFileName, *fixIncludeDeleted); err != nil { + if err := nm.SaveToIdx(indexFileName); err != nil { glog.Fatalf("save to .idx File: %v", err) os.Remove(indexFileName) } diff --git a/weed/storage/needle_map.go b/weed/storage/needle_map.go index 1662a322e..e91856dfe 100644 --- a/weed/storage/needle_map.go +++ b/weed/storage/needle_map.go @@ -21,7 +21,7 @@ const ( type NeedleMapper interface { Put(key NeedleId, offset Offset, size Size) error Get(key NeedleId) (element *needle_map.NeedleValue, ok bool) - Delete(key NeedleId) error + Delete(key NeedleId, offset Offset) error Close() Destroy() error ContentSize() uint64 diff --git a/weed/storage/needle_map/memdb.go b/weed/storage/needle_map/memdb.go index eb9da7f18..b25b5e89a 100644 --- a/weed/storage/needle_map/memdb.go +++ b/weed/storage/needle_map/memdb.go @@ -80,7 +80,7 @@ func (cm *MemDb) AscendingVisit(visit func(NeedleValue) error) (ret error) { return } -func (cm *MemDb) SaveToIdx(idxName string, includeDeleted bool) (ret error) { +func (cm *MemDb) SaveToIdx(idxName string) (ret error) { idxFile, err := os.OpenFile(idxName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) if err != nil { return @@ -88,10 +88,7 @@ func (cm *MemDb) SaveToIdx(idxName string, includeDeleted bool) (ret error) { defer idxFile.Close() return cm.AscendingVisit(func(value NeedleValue) error { - if value.Offset.IsZero() { - return nil - } - if !includeDeleted && value.Size.IsDeleted() { + if value.Offset.IsZero() || value.Size.IsDeleted() { return nil } _, err := idxFile.Write(value.ToBytes()) diff --git a/weed/storage/needle_map_leveldb.go b/weed/storage/needle_map_leveldb.go index c8820bdb7..415cd14dd 100644 --- a/weed/storage/needle_map_leveldb.go +++ b/weed/storage/needle_map_leveldb.go @@ -74,7 +74,7 @@ func generateLevelDbFile(dbFileName string, indexFile *os.File) error { } defer db.Close() return idx.WalkIndexFile(indexFile, func(key NeedleId, offset Offset, size Size) error { - if !offset.IsZero() { + if !offset.IsZero() && size.IsValid() { levelDbWrite(db, key, offset, size) } else { levelDbDelete(db, key) @@ -123,7 +123,7 @@ func levelDbDelete(db *leveldb.DB, key NeedleId) error { return db.Delete(bytes, nil) } -func (m *LevelDbNeedleMap) Delete(key NeedleId) error { +func (m *LevelDbNeedleMap) Delete(key NeedleId, offset Offset) error { oldNeedle, found := m.Get(key) if !found || oldNeedle.Size.IsDeleted() { return nil @@ -131,7 +131,7 @@ func (m *LevelDbNeedleMap) Delete(key NeedleId) error { m.logDelete(oldNeedle.Size) // write to index file first - if err := m.appendToIndexFile(key, oldNeedle.Offset, -oldNeedle.Size); err != nil { + if err := m.appendToIndexFile(key, offset, TombstoneFileSize); err != nil { return err } diff --git a/weed/storage/needle_map_memory.go b/weed/storage/needle_map_memory.go index 23ee561d0..d0891dc98 100644 --- a/weed/storage/needle_map_memory.go +++ b/weed/storage/needle_map_memory.go @@ -30,18 +30,13 @@ func LoadCompactNeedleMap(file *os.File) (*NeedleMap, error) { func doLoading(file *os.File, nm *NeedleMap) (*NeedleMap, error) { e := idx.WalkIndexFile(file, func(key NeedleId, offset Offset, size Size) error { nm.MaybeSetMaxFileKey(key) - if !offset.IsZero() { + if !offset.IsZero() && size.IsValid() { nm.FileCounter++ nm.FileByteCounter = nm.FileByteCounter + uint64(size) - oldOffset, oldSize := nm.m.Set(NeedleId(key), offset, size) if !oldOffset.IsZero() && oldSize.IsValid() { nm.DeletionCounter++ nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize) - } else if size < 0 { - // deletion - nm.DeletionCounter++ - nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(-size) } } else { oldSize := nm.m.Delete(NeedleId(key)) @@ -59,18 +54,14 @@ func (nm *NeedleMap) Put(key NeedleId, offset Offset, size Size) error { nm.logPut(key, oldSize, size) return nm.appendToIndexFile(key, offset, size) } -func (nm *NeedleMap) Get(key NeedleId) (existingValue *needle_map.NeedleValue, ok bool) { - existingValue, ok = nm.m.Get(NeedleId(key)) +func (nm *NeedleMap) Get(key NeedleId) (element *needle_map.NeedleValue, ok bool) { + element, ok = nm.m.Get(NeedleId(key)) return } -func (nm *NeedleMap) Delete(key NeedleId) error { - existingValue, ok := nm.m.Get(NeedleId(key)) - if !ok || existingValue.Size.IsDeleted() { - return nil - } +func (nm *NeedleMap) Delete(key NeedleId, offset Offset) error { deletedBytes := nm.m.Delete(NeedleId(key)) nm.logDelete(deletedBytes) - return nm.appendToIndexFile(key, existingValue.Offset, -existingValue.Size) + return nm.appendToIndexFile(key, offset, TombstoneFileSize) } func (nm *NeedleMap) Close() { indexFileName := nm.indexFile.Name() diff --git a/weed/storage/needle_map_metric_test.go b/weed/storage/needle_map_metric_test.go index a460b3408..362659a11 100644 --- a/weed/storage/needle_map_metric_test.go +++ b/weed/storage/needle_map_metric_test.go @@ -17,7 +17,7 @@ func TestFastLoadingNeedleMapMetrics(t *testing.T) { for i := 0; i < 10000; i++ { nm.Put(Uint64ToNeedleId(uint64(i+1)), Uint32ToOffset(uint32(0)), Size(1)) if rand.Float32() < 0.2 { - nm.Delete(Uint64ToNeedleId(uint64(rand.Int63n(int64(i)) + 1))) + nm.Delete(Uint64ToNeedleId(uint64(rand.Int63n(int64(i))+1)), Uint32ToOffset(uint32(0))) } } diff --git a/weed/storage/needle_map_sorted_file.go b/weed/storage/needle_map_sorted_file.go index afb1e782f..1ca113ca9 100644 --- a/weed/storage/needle_map_sorted_file.go +++ b/weed/storage/needle_map_sorted_file.go @@ -69,9 +69,9 @@ func (m *SortedFileNeedleMap) Put(key NeedleId, offset Offset, size Size) error return os.ErrInvalid } -func (m *SortedFileNeedleMap) Delete(key NeedleId) error { +func (m *SortedFileNeedleMap) Delete(key NeedleId, offset Offset) error { - offset, size, err := erasure_coding.SearchNeedleFromSortedIndex(m.dbFile, m.dbFileSize, key, nil) + _, size, err := erasure_coding.SearchNeedleFromSortedIndex(m.dbFile, m.dbFileSize, key, nil) if err != nil { if err == erasure_coding.NotFoundError { @@ -85,7 +85,7 @@ func (m *SortedFileNeedleMap) Delete(key NeedleId) error { } // write to index file first - if err := m.appendToIndexFile(key, offset, -size); err != nil { + if err := m.appendToIndexFile(key, offset, TombstoneFileSize); err != nil { return err } _, _, err = erasure_coding.SearchNeedleFromSortedIndex(m.dbFile, m.dbFileSize, key, erasure_coding.MarkNeedleDeleted) diff --git a/weed/storage/volume_backup.go b/weed/storage/volume_backup.go index fdae1add4..595bd8a35 100644 --- a/weed/storage/volume_backup.go +++ b/weed/storage/volume_backup.go @@ -256,5 +256,5 @@ func (scanner *VolumeFileScanner4GenIdx) VisitNeedle(n *needle.Needle, offset in if n.Size > 0 && n.Size.IsValid() { return scanner.v.nm.Put(n.Id, ToOffset(offset), n.Size) } - return scanner.v.nm.Delete(n.Id) + return scanner.v.nm.Delete(n.Id, ToOffset(offset)) } diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_read_write.go index 08cbad57b..e77010dbd 100644 --- a/weed/storage/volume_read_write.go +++ b/weed/storage/volume_read_write.go @@ -200,12 +200,12 @@ func (v *Volume) syncDelete(n *needle.Needle) (Size, error) { size := nv.Size n.Data = nil n.AppendAtNs = uint64(time.Now().UnixNano()) - _, _, _, err := n.Append(v.DataBackend, v.Version()) + offset, _, _, err := n.Append(v.DataBackend, v.Version()) if err != nil { return size, err } v.lastAppendAtNs = n.AppendAtNs - if err = v.nm.Delete(n.Id); err != nil { + if err = v.nm.Delete(n.Id, ToOffset(int64(offset))); err != nil { return size, err } return size, err @@ -238,12 +238,12 @@ func (v *Volume) doDeleteRequest(n *needle.Needle) (Size, error) { size := nv.Size n.Data = nil n.AppendAtNs = uint64(time.Now().UnixNano()) - _, _, _, err := n.Append(v.DataBackend, v.Version()) + offset, _, _, err := n.Append(v.DataBackend, v.Version()) if err != nil { return size, err } v.lastAppendAtNs = n.AppendAtNs - if err = v.nm.Delete(n.Id); err != nil { + if err = v.nm.Delete(n.Id, ToOffset(int64(offset))); err != nil { return size, err } return size, err @@ -263,7 +263,7 @@ func (v *Volume) readNeedle(n *needle.Needle, readOption *ReadOption) (int, erro readSize := nv.Size if readSize.IsDeleted() { if readOption != nil && readOption.ReadDeleted && readSize != TombstoneFileSize { - glog.V(3).Infof("reading deleted %s size %d", n.String(), readSize) + glog.V(3).Infof("reading deleted %s", n.String()) readSize = -readSize } else { return -1, errors.New("already deleted") diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go index 100067693..a3e5800df 100644 --- a/weed/storage/volume_vacuum.go +++ b/weed/storage/volume_vacuum.go @@ -374,7 +374,7 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, prealloca return nil } - err = nm.SaveToIdx(idxName, false) + err = nm.SaveToIdx(idxName) return } @@ -441,7 +441,7 @@ func copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, datIdxName str return nil }) - newNm.SaveToIdx(datIdxName, false) + newNm.SaveToIdx(datIdxName) return }