This commit is contained in:
chrislu 2022-10-13 23:10:49 -07:00
commit e05637c42c
10 changed files with 108 additions and 69 deletions

View file

@ -411,6 +411,8 @@ func AssembleEntryWithAcp(objectEntry *filer_pb.Entry, objectOwner string, grant
if len(objectOwner) > 0 { if len(objectOwner) > 0 {
objectEntry.Extended[s3_constants.ExtAmzOwnerKey] = []byte(objectOwner) objectEntry.Extended[s3_constants.ExtAmzOwnerKey] = []byte(objectOwner)
} else {
delete(objectEntry.Extended, s3_constants.ExtAmzOwnerKey)
} }
if len(grants) > 0 { if len(grants) > 0 {
@ -420,6 +422,8 @@ func AssembleEntryWithAcp(objectEntry *filer_pb.Entry, objectOwner string, grant
return s3err.ErrInvalidRequest return s3err.ErrInvalidRequest
} }
objectEntry.Extended[s3_constants.ExtAmzAclKey] = grantsBytes objectEntry.Extended[s3_constants.ExtAmzAclKey] = grantsBytes
} else {
delete(objectEntry.Extended, s3_constants.ExtAmzAclKey)
} }
return s3err.ErrNone return s3err.ErrNone

View file

@ -487,8 +487,9 @@ func TestDetermineReqGrants(t *testing.T) {
func TestAssembleEntryWithAcp(t *testing.T) { func TestAssembleEntryWithAcp(t *testing.T) {
defaultOwner := "admin" defaultOwner := "admin"
{
//case1 //case1
//assemble with non-empty grants
expectOwner := "accountS" expectOwner := "accountS"
expectGrants := []*s3.Grant{ expectGrants := []*s3.Grant{
{ {
@ -512,22 +513,19 @@ func TestAssembleEntryWithAcp(t *testing.T) {
if !grantsEquals(resultGrants, expectGrants) { if !grantsEquals(resultGrants, expectGrants) {
t.Fatal("grants not expect") t.Fatal("grants not expect")
} }
}
{
//case2
entry := &filer_pb.Entry{}
AssembleEntryWithAcp(entry, "", nil)
resultOwner := GetAcpOwner(entry.Extended, defaultOwner) //case2
//assemble with empty grants (override)
AssembleEntryWithAcp(entry, "", nil)
resultOwner = GetAcpOwner(entry.Extended, defaultOwner)
if resultOwner != defaultOwner { if resultOwner != defaultOwner {
t.Fatalf("owner not expect") t.Fatalf("owner not expect")
} }
resultGrants := GetAcpGrants(entry.Extended) resultGrants = GetAcpGrants(entry.Extended)
if len(resultGrants) != 0 { if len(resultGrants) != 0 {
t.Fatal("grants not expect") t.Fatal("grants not expect")
} }
}
} }

View file

@ -2,6 +2,9 @@ package weed_server
import ( import (
"context" "context"
"github.com/seaweedfs/seaweedfs/weed/stats"
"strconv"
"time"
"github.com/prometheus/procfs" "github.com/prometheus/procfs"
"github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/glog"
@ -29,6 +32,10 @@ func (vs *VolumeServer) VacuumVolumeCheck(ctx context.Context, req *volume_serve
} }
func (vs *VolumeServer) VacuumVolumeCompact(req *volume_server_pb.VacuumVolumeCompactRequest, stream volume_server_pb.VolumeServer_VacuumVolumeCompactServer) error { func (vs *VolumeServer) VacuumVolumeCompact(req *volume_server_pb.VacuumVolumeCompactRequest, stream volume_server_pb.VolumeServer_VacuumVolumeCompactServer) error {
start := time.Now()
defer func(start time.Time) {
stats.VolumeServerVacuumingHistogram.WithLabelValues("compact").Observe(time.Since(start).Seconds())
}(start)
resp := &volume_server_pb.VacuumVolumeCompactResponse{} resp := &volume_server_pb.VacuumVolumeCompactResponse{}
reportInterval := int64(1024 * 1024 * 128) reportInterval := int64(1024 * 1024 * 128)
@ -51,12 +58,13 @@ func (vs *VolumeServer) VacuumVolumeCompact(req *volume_server_pb.VacuumVolumeCo
return true return true
}) })
stats.VolumeServerVacuumingCompactCounter.WithLabelValues(strconv.FormatBool(err == nil && sendErr == nil)).Inc()
if err != nil { if err != nil {
glog.Errorf("compact volume %d: %v", req.VolumeId, err) glog.Errorf("failed compact volume %d: %v", req.VolumeId, err)
return err return err
} }
if sendErr != nil { if sendErr != nil {
glog.Errorf("compact volume %d report progress: %v", req.VolumeId, sendErr) glog.Errorf("failed compact volume %d report progress: %v", req.VolumeId, sendErr)
return sendErr return sendErr
} }
@ -66,16 +74,21 @@ func (vs *VolumeServer) VacuumVolumeCompact(req *volume_server_pb.VacuumVolumeCo
} }
func (vs *VolumeServer) VacuumVolumeCommit(ctx context.Context, req *volume_server_pb.VacuumVolumeCommitRequest) (*volume_server_pb.VacuumVolumeCommitResponse, error) { func (vs *VolumeServer) VacuumVolumeCommit(ctx context.Context, req *volume_server_pb.VacuumVolumeCommitRequest) (*volume_server_pb.VacuumVolumeCommitResponse, error) {
start := time.Now()
defer func(start time.Time) {
stats.VolumeServerVacuumingHistogram.WithLabelValues("commit").Observe(time.Since(start).Seconds())
}(start)
resp := &volume_server_pb.VacuumVolumeCommitResponse{} resp := &volume_server_pb.VacuumVolumeCommitResponse{}
readOnly, err := vs.store.CommitCompactVolume(needle.VolumeId(req.VolumeId)) readOnly, err := vs.store.CommitCompactVolume(needle.VolumeId(req.VolumeId))
if err != nil { if err != nil {
glog.Errorf("commit volume %d: %v", req.VolumeId, err) glog.Errorf("failed commit volume %d: %v", req.VolumeId, err)
} else { } else {
glog.V(1).Infof("commit volume %d", req.VolumeId) glog.V(1).Infof("commit volume %d", req.VolumeId)
} }
stats.VolumeServerVacuumingCommitCounter.WithLabelValues(strconv.FormatBool(err == nil)).Inc()
resp.IsReadOnly = readOnly resp.IsReadOnly = readOnly
return resp, err return resp, err
@ -88,7 +101,7 @@ func (vs *VolumeServer) VacuumVolumeCleanup(ctx context.Context, req *volume_ser
err := vs.store.CommitCleanupVolume(needle.VolumeId(req.VolumeId)) err := vs.store.CommitCleanupVolume(needle.VolumeId(req.VolumeId))
if err != nil { if err != nil {
glog.Errorf("cleanup volume %d: %v", req.VolumeId, err) glog.Errorf("failed cleanup volume %d: %v", req.VolumeId, err)
} else { } else {
glog.V(1).Infof("cleanup volume %d", req.VolumeId) glog.V(1).Infof("cleanup volume %d", req.VolumeId)
} }

View file

@ -137,6 +137,31 @@ var (
Help: "Counter of volume server requests.", Help: "Counter of volume server requests.",
}, []string{"type"}) }, []string{"type"})
VolumeServerVacuumingCompactCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: Namespace,
Subsystem: "volumeServer",
Name: "vacuuming_compact_count",
Help: "Counter of volume vacuuming Compact counter",
}, []string{"success"})
VolumeServerVacuumingCommitCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: Namespace,
Subsystem: "volumeServer",
Name: "vacuuming_commit_count",
Help: "Counter of volume vacuuming commit counter",
}, []string{"success"})
VolumeServerVacuumingHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: Namespace,
Subsystem: "volumeServer",
Name: "vacuuming_seconds",
Help: "Bucketed histogram of volume server vacuuming processing time.",
Buckets: prometheus.ExponentialBuckets(0.0001, 2, 24),
}, []string{"type"})
VolumeServerRequestHistogram = prometheus.NewHistogramVec( VolumeServerRequestHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{ prometheus.HistogramOpts{
Namespace: Namespace, Namespace: Namespace,
@ -223,6 +248,9 @@ func init() {
Gather.MustRegister(VolumeServerRequestCounter) Gather.MustRegister(VolumeServerRequestCounter)
Gather.MustRegister(VolumeServerRequestHistogram) Gather.MustRegister(VolumeServerRequestHistogram)
Gather.MustRegister(VolumeServerVacuumingCompactCounter)
Gather.MustRegister(VolumeServerVacuumingCommitCounter)
Gather.MustRegister(VolumeServerVacuumingHistogram)
Gather.MustRegister(VolumeServerVolumeCounter) Gather.MustRegister(VolumeServerVolumeCounter)
Gather.MustRegister(VolumeServerMaxVolumeCounter) Gather.MustRegister(VolumeServerMaxVolumeCounter)
Gather.MustRegister(VolumeServerReadOnlyVolumeGauge) Gather.MustRegister(VolumeServerReadOnlyVolumeGauge)

View file

@ -69,6 +69,9 @@ func (df *DiskFile) Truncate(off int64) error {
} }
func (df *DiskFile) Close() error { func (df *DiskFile) Close() error {
if err := df.Sync(); err != nil {
return err
}
return df.File.Close() return df.File.Close()
} }

View file

@ -364,7 +364,7 @@ func (l *DiskLocation) VolumesLen() int {
func (l *DiskLocation) SetStopping() { func (l *DiskLocation) SetStopping() {
l.volumesLock.Lock() l.volumesLock.Lock()
for _, v := range l.volumes { for _, v := range l.volumes {
v.SetStopping() v.SyncToDisk()
} }
l.volumesLock.Unlock() l.volumesLock.Unlock()

View file

@ -125,6 +125,7 @@ func (ev *EcVolume) Close() {
ev.ecjFileAccessLock.Unlock() ev.ecjFileAccessLock.Unlock()
} }
if ev.ecxFile != nil { if ev.ecxFile != nil {
_ = ev.ecxFile.Sync()
_ = ev.ecxFile.Close() _ = ev.ecxFile.Close()
ev.ecxFile = nil ev.ecxFile = nil
} }

View file

@ -86,7 +86,10 @@ func (cm *MemDb) SaveToIdx(idxName string) (ret error) {
if err != nil { if err != nil {
return return
} }
defer idxFile.Close() defer func() {
idxFile.Sync()
idxFile.Close()
}()
return cm.AscendingVisit(func(value NeedleValue) error { return cm.AscendingVisit(func(value NeedleValue) error {
if value.Offset.IsZero() || value.Size.IsDeleted() { if value.Offset.IsZero() || value.Size.IsDeleted() {

View file

@ -180,21 +180,6 @@ func (v *Volume) DiskType() types.DiskType {
return v.location.DiskType return v.location.DiskType
} }
func (v *Volume) SetStopping() {
v.dataFileAccessLock.Lock()
defer v.dataFileAccessLock.Unlock()
if v.nm != nil {
if err := v.nm.Sync(); err != nil {
glog.Warningf("Volume SetStopping fail to sync volume idx %d", v.Id)
}
}
if v.DataBackend != nil {
if err := v.DataBackend.Sync(); err != nil {
glog.Warningf("Volume SetStopping fail to sync volume %d", v.Id)
}
}
}
func (v *Volume) SyncToDisk() { func (v *Volume) SyncToDisk() {
v.dataFileAccessLock.Lock() v.dataFileAccessLock.Lock()
defer v.dataFileAccessLock.Unlock() defer v.dataFileAccessLock.Unlock()
@ -228,10 +213,9 @@ func (v *Volume) Close() {
v.nm = nil v.nm = nil
} }
if v.DataBackend != nil { if v.DataBackend != nil {
if err := v.DataBackend.Sync(); err != nil { if err := v.DataBackend.Close(); err != nil {
glog.Warningf("Volume Close fail to sync volume %d", v.Id) glog.Warningf("Volume Close fail to sync volume %d", v.Id)
} }
_ = v.DataBackend.Close()
v.DataBackend = nil v.DataBackend = nil
stats.VolumeServerVolumeCounter.WithLabelValues(v.Collection, "volume").Dec() stats.VolumeServerVolumeCounter.WithLabelValues(v.Collection, "volume").Dec()
} }

View file

@ -55,10 +55,10 @@ func (v *Volume) Compact(preallocate int64, compactionBytePerSecond int64) error
v.lastCompactRevision = v.SuperBlock.CompactionRevision v.lastCompactRevision = v.SuperBlock.CompactionRevision
glog.V(3).Infof("creating copies for volume %d ,last offset %d...", v.Id, v.lastCompactIndexOffset) glog.V(3).Infof("creating copies for volume %d ,last offset %d...", v.Id, v.lastCompactIndexOffset)
if err := v.DataBackend.Sync(); err != nil { if err := v.DataBackend.Sync(); err != nil {
glog.V(0).Infof("compact fail to sync volume %d", v.Id) glog.V(0).Infof("compact failed to sync volume %d", v.Id)
} }
if err := v.nm.Sync(); err != nil { if err := v.nm.Sync(); err != nil {
glog.V(0).Infof("compact fail to sync volume idx %d", v.Id) glog.V(0).Infof("compact failed to sync volume idx %d", v.Id)
} }
return v.copyDataAndGenerateIndexFile(v.FileName(".cpd"), v.FileName(".cpx"), preallocate, compactionBytePerSecond) return v.copyDataAndGenerateIndexFile(v.FileName(".cpd"), v.FileName(".cpx"), preallocate, compactionBytePerSecond)
} }
@ -83,10 +83,10 @@ func (v *Volume) Compact2(preallocate int64, compactionBytePerSecond int64, prog
return fmt.Errorf("volume %d backend is empty remote:%v", v.Id, v.HasRemoteFile()) return fmt.Errorf("volume %d backend is empty remote:%v", v.Id, v.HasRemoteFile())
} }
if err := v.DataBackend.Sync(); err != nil { if err := v.DataBackend.Sync(); err != nil {
glog.V(0).Infof("compact2 fail to sync volume dat %d: %v", v.Id, err) glog.V(0).Infof("compact2 failed to sync volume dat %d: %v", v.Id, err)
} }
if err := v.nm.Sync(); err != nil { if err := v.nm.Sync(); err != nil {
glog.V(0).Infof("compact2 fail to sync volume idx %d: %v", v.Id, err) glog.V(0).Infof("compact2 failed to sync volume idx %d: %v", v.Id, err)
} }
return v.copyDataBasedOnIndexFile( return v.copyDataBasedOnIndexFile(
v.FileName(".dat"), v.FileName(".idx"), v.FileName(".dat"), v.FileName(".idx"),
@ -120,7 +120,7 @@ func (v *Volume) CommitCompact() error {
} }
if v.DataBackend != nil { if v.DataBackend != nil {
if err := v.DataBackend.Close(); err != nil { if err := v.DataBackend.Close(); err != nil {
glog.V(0).Infof("fail to close volume %d", v.Id) glog.V(0).Infof("failed to close volume %d", v.Id)
} }
} }
v.DataBackend = nil v.DataBackend = nil
@ -270,7 +270,11 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
return fmt.Errorf("open idx file %s failed: %v", newIdxFileName, err) return fmt.Errorf("open idx file %s failed: %v", newIdxFileName, err)
} }
defer idx.Close() defer func() {
idx.Sync()
idx.Close()
}()
stat, err := idx.Stat() stat, err := idx.Stat()
if err != nil { if err != nil {
return fmt.Errorf("stat file %s: %v", idx.Name(), err) return fmt.Errorf("stat file %s: %v", idx.Name(), err)
@ -387,9 +391,7 @@ func (scanner *VolumeFileScanner4Vacuum) VisitNeedle(n *needle.Needle, offset in
} }
func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, preallocate int64, compactionBytePerSecond int64) (err error) { func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, preallocate int64, compactionBytePerSecond int64) (err error) {
var ( var dst backend.BackendStorageFile
dst backend.BackendStorageFile
)
if dst, err = backend.CreateVolumeFile(dstName, preallocate, 0); err != nil { if dst, err = backend.CreateVolumeFile(dstName, preallocate, 0); err != nil {
return err return err
} }
@ -493,7 +495,10 @@ func (v *Volume) copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, da
glog.Errorf("cannot open Volume Index %s: %v", datIdxName, err) glog.Errorf("cannot open Volume Index %s: %v", datIdxName, err)
return err return err
} }
defer indexFile.Close() defer func() {
indexFile.Sync()
indexFile.Close()
}()
if v.tmpNm != nil { if v.tmpNm != nil {
v.tmpNm.Close() v.tmpNm.Close()
v.tmpNm = nil v.tmpNm = nil