mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
vacuum metrics and force sync dst files (#3832)
This commit is contained in:
parent
f5d4952d73
commit
1f7e52c63e
|
@ -2,6 +2,9 @@ package weed_server
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"github.com/seaweedfs/seaweedfs/weed/stats"
|
||||||
|
"strconv"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/prometheus/procfs"
|
"github.com/prometheus/procfs"
|
||||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||||
|
@ -29,6 +32,10 @@ func (vs *VolumeServer) VacuumVolumeCheck(ctx context.Context, req *volume_serve
|
||||||
}
|
}
|
||||||
|
|
||||||
func (vs *VolumeServer) VacuumVolumeCompact(req *volume_server_pb.VacuumVolumeCompactRequest, stream volume_server_pb.VolumeServer_VacuumVolumeCompactServer) error {
|
func (vs *VolumeServer) VacuumVolumeCompact(req *volume_server_pb.VacuumVolumeCompactRequest, stream volume_server_pb.VolumeServer_VacuumVolumeCompactServer) error {
|
||||||
|
start := time.Now()
|
||||||
|
defer func(start time.Time) {
|
||||||
|
stats.VolumeServerVacuumingHistogram.WithLabelValues("compact").Observe(time.Since(start).Seconds())
|
||||||
|
}(start)
|
||||||
|
|
||||||
resp := &volume_server_pb.VacuumVolumeCompactResponse{}
|
resp := &volume_server_pb.VacuumVolumeCompactResponse{}
|
||||||
reportInterval := int64(1024 * 1024 * 128)
|
reportInterval := int64(1024 * 1024 * 128)
|
||||||
|
@ -51,12 +58,13 @@ func (vs *VolumeServer) VacuumVolumeCompact(req *volume_server_pb.VacuumVolumeCo
|
||||||
return true
|
return true
|
||||||
})
|
})
|
||||||
|
|
||||||
|
stats.VolumeServerVacuumingCompactCounter.WithLabelValues(strconv.FormatBool(err == nil && sendErr == nil)).Inc()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("compact volume %d: %v", req.VolumeId, err)
|
glog.Errorf("failed compact volume %d: %v", req.VolumeId, err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if sendErr != nil {
|
if sendErr != nil {
|
||||||
glog.Errorf("compact volume %d report progress: %v", req.VolumeId, sendErr)
|
glog.Errorf("failed compact volume %d report progress: %v", req.VolumeId, sendErr)
|
||||||
return sendErr
|
return sendErr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -66,16 +74,21 @@ func (vs *VolumeServer) VacuumVolumeCompact(req *volume_server_pb.VacuumVolumeCo
|
||||||
}
|
}
|
||||||
|
|
||||||
func (vs *VolumeServer) VacuumVolumeCommit(ctx context.Context, req *volume_server_pb.VacuumVolumeCommitRequest) (*volume_server_pb.VacuumVolumeCommitResponse, error) {
|
func (vs *VolumeServer) VacuumVolumeCommit(ctx context.Context, req *volume_server_pb.VacuumVolumeCommitRequest) (*volume_server_pb.VacuumVolumeCommitResponse, error) {
|
||||||
|
start := time.Now()
|
||||||
|
defer func(start time.Time) {
|
||||||
|
stats.VolumeServerVacuumingHistogram.WithLabelValues("commit").Observe(time.Since(start).Seconds())
|
||||||
|
}(start)
|
||||||
|
|
||||||
resp := &volume_server_pb.VacuumVolumeCommitResponse{}
|
resp := &volume_server_pb.VacuumVolumeCommitResponse{}
|
||||||
|
|
||||||
readOnly, err := vs.store.CommitCompactVolume(needle.VolumeId(req.VolumeId))
|
readOnly, err := vs.store.CommitCompactVolume(needle.VolumeId(req.VolumeId))
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("commit volume %d: %v", req.VolumeId, err)
|
glog.Errorf("failed commit volume %d: %v", req.VolumeId, err)
|
||||||
} else {
|
} else {
|
||||||
glog.V(1).Infof("commit volume %d", req.VolumeId)
|
glog.V(1).Infof("commit volume %d", req.VolumeId)
|
||||||
}
|
}
|
||||||
|
stats.VolumeServerVacuumingCommitCounter.WithLabelValues(strconv.FormatBool(err == nil)).Inc()
|
||||||
resp.IsReadOnly = readOnly
|
resp.IsReadOnly = readOnly
|
||||||
return resp, err
|
return resp, err
|
||||||
|
|
||||||
|
@ -88,7 +101,7 @@ func (vs *VolumeServer) VacuumVolumeCleanup(ctx context.Context, req *volume_ser
|
||||||
err := vs.store.CommitCleanupVolume(needle.VolumeId(req.VolumeId))
|
err := vs.store.CommitCleanupVolume(needle.VolumeId(req.VolumeId))
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("cleanup volume %d: %v", req.VolumeId, err)
|
glog.Errorf("failed cleanup volume %d: %v", req.VolumeId, err)
|
||||||
} else {
|
} else {
|
||||||
glog.V(1).Infof("cleanup volume %d", req.VolumeId)
|
glog.V(1).Infof("cleanup volume %d", req.VolumeId)
|
||||||
}
|
}
|
||||||
|
|
|
@ -137,6 +137,31 @@ var (
|
||||||
Help: "Counter of volume server requests.",
|
Help: "Counter of volume server requests.",
|
||||||
}, []string{"type"})
|
}, []string{"type"})
|
||||||
|
|
||||||
|
VolumeServerVacuumingCompactCounter = prometheus.NewCounterVec(
|
||||||
|
prometheus.CounterOpts{
|
||||||
|
Namespace: Namespace,
|
||||||
|
Subsystem: "volumeServer",
|
||||||
|
Name: "vacuuming_compact_count",
|
||||||
|
Help: "Counter of volume vacuuming Compact counter",
|
||||||
|
}, []string{"success"})
|
||||||
|
|
||||||
|
VolumeServerVacuumingCommitCounter = prometheus.NewCounterVec(
|
||||||
|
prometheus.CounterOpts{
|
||||||
|
Namespace: Namespace,
|
||||||
|
Subsystem: "volumeServer",
|
||||||
|
Name: "vacuuming_commit_count",
|
||||||
|
Help: "Counter of volume vacuuming commit counter",
|
||||||
|
}, []string{"success"})
|
||||||
|
|
||||||
|
VolumeServerVacuumingHistogram = prometheus.NewHistogramVec(
|
||||||
|
prometheus.HistogramOpts{
|
||||||
|
Namespace: Namespace,
|
||||||
|
Subsystem: "volumeServer",
|
||||||
|
Name: "vacuuming_seconds",
|
||||||
|
Help: "Bucketed histogram of volume server vacuuming processing time.",
|
||||||
|
Buckets: prometheus.ExponentialBuckets(0.0001, 2, 24),
|
||||||
|
}, []string{"type"})
|
||||||
|
|
||||||
VolumeServerRequestHistogram = prometheus.NewHistogramVec(
|
VolumeServerRequestHistogram = prometheus.NewHistogramVec(
|
||||||
prometheus.HistogramOpts{
|
prometheus.HistogramOpts{
|
||||||
Namespace: Namespace,
|
Namespace: Namespace,
|
||||||
|
@ -223,6 +248,9 @@ func init() {
|
||||||
|
|
||||||
Gather.MustRegister(VolumeServerRequestCounter)
|
Gather.MustRegister(VolumeServerRequestCounter)
|
||||||
Gather.MustRegister(VolumeServerRequestHistogram)
|
Gather.MustRegister(VolumeServerRequestHistogram)
|
||||||
|
Gather.MustRegister(VolumeServerVacuumingCompactCounter)
|
||||||
|
Gather.MustRegister(VolumeServerVacuumingCommitCounter)
|
||||||
|
Gather.MustRegister(VolumeServerVacuumingHistogram)
|
||||||
Gather.MustRegister(VolumeServerVolumeCounter)
|
Gather.MustRegister(VolumeServerVolumeCounter)
|
||||||
Gather.MustRegister(VolumeServerMaxVolumeCounter)
|
Gather.MustRegister(VolumeServerMaxVolumeCounter)
|
||||||
Gather.MustRegister(VolumeServerReadOnlyVolumeGauge)
|
Gather.MustRegister(VolumeServerReadOnlyVolumeGauge)
|
||||||
|
|
|
@ -69,6 +69,9 @@ func (df *DiskFile) Truncate(off int64) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (df *DiskFile) Close() error {
|
func (df *DiskFile) Close() error {
|
||||||
|
if err := df.Sync(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
return df.File.Close()
|
return df.File.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -364,7 +364,7 @@ func (l *DiskLocation) VolumesLen() int {
|
||||||
func (l *DiskLocation) SetStopping() {
|
func (l *DiskLocation) SetStopping() {
|
||||||
l.volumesLock.Lock()
|
l.volumesLock.Lock()
|
||||||
for _, v := range l.volumes {
|
for _, v := range l.volumes {
|
||||||
v.SetStopping()
|
v.SyncToDisk()
|
||||||
}
|
}
|
||||||
l.volumesLock.Unlock()
|
l.volumesLock.Unlock()
|
||||||
|
|
||||||
|
|
|
@ -125,6 +125,7 @@ func (ev *EcVolume) Close() {
|
||||||
ev.ecjFileAccessLock.Unlock()
|
ev.ecjFileAccessLock.Unlock()
|
||||||
}
|
}
|
||||||
if ev.ecxFile != nil {
|
if ev.ecxFile != nil {
|
||||||
|
_ = ev.ecxFile.Sync()
|
||||||
_ = ev.ecxFile.Close()
|
_ = ev.ecxFile.Close()
|
||||||
ev.ecxFile = nil
|
ev.ecxFile = nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -86,7 +86,10 @@ func (cm *MemDb) SaveToIdx(idxName string) (ret error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
defer idxFile.Close()
|
defer func() {
|
||||||
|
idxFile.Sync()
|
||||||
|
idxFile.Close()
|
||||||
|
}()
|
||||||
|
|
||||||
return cm.AscendingVisit(func(value NeedleValue) error {
|
return cm.AscendingVisit(func(value NeedleValue) error {
|
||||||
if value.Offset.IsZero() || value.Size.IsDeleted() {
|
if value.Offset.IsZero() || value.Size.IsDeleted() {
|
||||||
|
|
|
@ -180,21 +180,6 @@ func (v *Volume) DiskType() types.DiskType {
|
||||||
return v.location.DiskType
|
return v.location.DiskType
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v *Volume) SetStopping() {
|
|
||||||
v.dataFileAccessLock.Lock()
|
|
||||||
defer v.dataFileAccessLock.Unlock()
|
|
||||||
if v.nm != nil {
|
|
||||||
if err := v.nm.Sync(); err != nil {
|
|
||||||
glog.Warningf("Volume SetStopping fail to sync volume idx %d", v.Id)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if v.DataBackend != nil {
|
|
||||||
if err := v.DataBackend.Sync(); err != nil {
|
|
||||||
glog.Warningf("Volume SetStopping fail to sync volume %d", v.Id)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (v *Volume) SyncToDisk() {
|
func (v *Volume) SyncToDisk() {
|
||||||
v.dataFileAccessLock.Lock()
|
v.dataFileAccessLock.Lock()
|
||||||
defer v.dataFileAccessLock.Unlock()
|
defer v.dataFileAccessLock.Unlock()
|
||||||
|
@ -228,10 +213,9 @@ func (v *Volume) Close() {
|
||||||
v.nm = nil
|
v.nm = nil
|
||||||
}
|
}
|
||||||
if v.DataBackend != nil {
|
if v.DataBackend != nil {
|
||||||
if err := v.DataBackend.Sync(); err != nil {
|
if err := v.DataBackend.Close(); err != nil {
|
||||||
glog.Warningf("Volume Close fail to sync volume %d", v.Id)
|
glog.Warningf("Volume Close fail to sync volume %d", v.Id)
|
||||||
}
|
}
|
||||||
_ = v.DataBackend.Close()
|
|
||||||
v.DataBackend = nil
|
v.DataBackend = nil
|
||||||
stats.VolumeServerVolumeCounter.WithLabelValues(v.Collection, "volume").Dec()
|
stats.VolumeServerVolumeCounter.WithLabelValues(v.Collection, "volume").Dec()
|
||||||
}
|
}
|
||||||
|
|
|
@ -55,10 +55,10 @@ func (v *Volume) Compact(preallocate int64, compactionBytePerSecond int64) error
|
||||||
v.lastCompactRevision = v.SuperBlock.CompactionRevision
|
v.lastCompactRevision = v.SuperBlock.CompactionRevision
|
||||||
glog.V(3).Infof("creating copies for volume %d ,last offset %d...", v.Id, v.lastCompactIndexOffset)
|
glog.V(3).Infof("creating copies for volume %d ,last offset %d...", v.Id, v.lastCompactIndexOffset)
|
||||||
if err := v.DataBackend.Sync(); err != nil {
|
if err := v.DataBackend.Sync(); err != nil {
|
||||||
glog.V(0).Infof("compact fail to sync volume %d", v.Id)
|
glog.V(0).Infof("compact failed to sync volume %d", v.Id)
|
||||||
}
|
}
|
||||||
if err := v.nm.Sync(); err != nil {
|
if err := v.nm.Sync(); err != nil {
|
||||||
glog.V(0).Infof("compact fail to sync volume idx %d", v.Id)
|
glog.V(0).Infof("compact failed to sync volume idx %d", v.Id)
|
||||||
}
|
}
|
||||||
return v.copyDataAndGenerateIndexFile(v.FileName(".cpd"), v.FileName(".cpx"), preallocate, compactionBytePerSecond)
|
return v.copyDataAndGenerateIndexFile(v.FileName(".cpd"), v.FileName(".cpx"), preallocate, compactionBytePerSecond)
|
||||||
}
|
}
|
||||||
|
@ -83,10 +83,10 @@ func (v *Volume) Compact2(preallocate int64, compactionBytePerSecond int64, prog
|
||||||
return fmt.Errorf("volume %d backend is empty remote:%v", v.Id, v.HasRemoteFile())
|
return fmt.Errorf("volume %d backend is empty remote:%v", v.Id, v.HasRemoteFile())
|
||||||
}
|
}
|
||||||
if err := v.DataBackend.Sync(); err != nil {
|
if err := v.DataBackend.Sync(); err != nil {
|
||||||
glog.V(0).Infof("compact2 fail to sync volume dat %d: %v", v.Id, err)
|
glog.V(0).Infof("compact2 failed to sync volume dat %d: %v", v.Id, err)
|
||||||
}
|
}
|
||||||
if err := v.nm.Sync(); err != nil {
|
if err := v.nm.Sync(); err != nil {
|
||||||
glog.V(0).Infof("compact2 fail to sync volume idx %d: %v", v.Id, err)
|
glog.V(0).Infof("compact2 failed to sync volume idx %d: %v", v.Id, err)
|
||||||
}
|
}
|
||||||
return v.copyDataBasedOnIndexFile(
|
return v.copyDataBasedOnIndexFile(
|
||||||
v.FileName(".dat"), v.FileName(".idx"),
|
v.FileName(".dat"), v.FileName(".idx"),
|
||||||
|
@ -120,7 +120,7 @@ func (v *Volume) CommitCompact() error {
|
||||||
}
|
}
|
||||||
if v.DataBackend != nil {
|
if v.DataBackend != nil {
|
||||||
if err := v.DataBackend.Close(); err != nil {
|
if err := v.DataBackend.Close(); err != nil {
|
||||||
glog.V(0).Infof("fail to close volume %d", v.Id)
|
glog.V(0).Infof("failed to close volume %d", v.Id)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
v.DataBackend = nil
|
v.DataBackend = nil
|
||||||
|
@ -270,7 +270,11 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
|
||||||
return fmt.Errorf("open idx file %s failed: %v", newIdxFileName, err)
|
return fmt.Errorf("open idx file %s failed: %v", newIdxFileName, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
defer idx.Close()
|
defer func() {
|
||||||
|
idx.Sync()
|
||||||
|
idx.Close()
|
||||||
|
}()
|
||||||
|
|
||||||
stat, err := idx.Stat()
|
stat, err := idx.Stat()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("stat file %s: %v", idx.Name(), err)
|
return fmt.Errorf("stat file %s: %v", idx.Name(), err)
|
||||||
|
@ -387,9 +391,7 @@ func (scanner *VolumeFileScanner4Vacuum) VisitNeedle(n *needle.Needle, offset in
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, preallocate int64, compactionBytePerSecond int64) (err error) {
|
func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, preallocate int64, compactionBytePerSecond int64) (err error) {
|
||||||
var (
|
var dst backend.BackendStorageFile
|
||||||
dst backend.BackendStorageFile
|
|
||||||
)
|
|
||||||
if dst, err = backend.CreateVolumeFile(dstName, preallocate, 0); err != nil {
|
if dst, err = backend.CreateVolumeFile(dstName, preallocate, 0); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -493,7 +495,10 @@ func (v *Volume) copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, da
|
||||||
glog.Errorf("cannot open Volume Index %s: %v", datIdxName, err)
|
glog.Errorf("cannot open Volume Index %s: %v", datIdxName, err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
defer indexFile.Close()
|
defer func() {
|
||||||
|
indexFile.Sync()
|
||||||
|
indexFile.Close()
|
||||||
|
}()
|
||||||
if v.tmpNm != nil {
|
if v.tmpNm != nil {
|
||||||
v.tmpNm.Close()
|
v.tmpNm.Close()
|
||||||
v.tmpNm = nil
|
v.tmpNm = nil
|
||||||
|
|
Loading…
Reference in a new issue