Merge pull request #2634 from kmlebedev/errorMetrics

error metrics for filer and store
This commit is contained in:
Chris Lu 2022-02-04 22:35:13 -08:00 committed by GitHub
commit a23fcb9a7c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 18 additions and 0 deletions

View file

@ -162,6 +162,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
if offset+size <= int64(len(entry.Content)) { if offset+size <= int64(len(entry.Content)) {
_, err := writer.Write(entry.Content[offset : offset+size]) _, err := writer.Write(entry.Content[offset : offset+size])
if err != nil { if err != nil {
stats.FilerRequestCounter.WithLabelValues("write.entry.failed").Inc()
glog.Errorf("failed to write entry content: %v", err) glog.Errorf("failed to write entry content: %v", err)
} }
return err return err
@ -173,6 +174,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
Directory: dir, Directory: dir,
Name: name, Name: name,
}); err != nil { }); err != nil {
stats.FilerRequestCounter.WithLabelValues("read.cache.failed").Inc()
glog.Errorf("CacheRemoteObjectToLocalCluster %s: %v", entry.FullPath, err) glog.Errorf("CacheRemoteObjectToLocalCluster %s: %v", entry.FullPath, err)
return fmt.Errorf("cache %s: %v", entry.FullPath, err) return fmt.Errorf("cache %s: %v", entry.FullPath, err)
} else { } else {
@ -182,6 +184,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
err = filer.StreamContent(fs.filer.MasterClient, writer, chunks, offset, size) err = filer.StreamContent(fs.filer.MasterClient, writer, chunks, offset, size)
if err != nil { if err != nil {
stats.FilerRequestCounter.WithLabelValues("read.stream.failed").Inc()
glog.Errorf("failed to stream content %s: %v", r.URL, err) glog.Errorf("failed to stream content %s: %v", r.URL, err)
} }
return err return err

View file

@ -5,6 +5,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/storage/backend" "github.com/chrislusf/seaweedfs/weed/storage/backend"
. "github.com/chrislusf/seaweedfs/weed/storage/types" . "github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"
@ -217,9 +218,11 @@ func (n *Needle) ReadBytes(bytes []byte, offset int64, size Size, version Versio
if n.Size != size { if n.Size != size {
// cookie is not always passed in for this API. Use size to do preliminary checking. // cookie is not always passed in for this API. Use size to do preliminary checking.
if OffsetSize == 4 && offset < int64(MaxPossibleVolumeSize) { if OffsetSize == 4 && offset < int64(MaxPossibleVolumeSize) {
stats.VolumeServerRequestCounter.WithLabelValues("errorSizeMismatchOffsetSize").Inc()
glog.Errorf("entry not found1: offset %d found id %x size %d, expected size %d", offset, n.Id, n.Size, size) glog.Errorf("entry not found1: offset %d found id %x size %d, expected size %d", offset, n.Id, n.Size, size)
return ErrorSizeMismatch return ErrorSizeMismatch
} }
stats.VolumeServerRequestCounter.WithLabelValues("errorSizeMismatch").Inc()
return fmt.Errorf("entry not found: offset %d found id %x size %d, expected size %d", offset, n.Id, n.Size, size) return fmt.Errorf("entry not found: offset %d found id %x size %d, expected size %d", offset, n.Id, n.Size, size)
} }
switch version { switch version {
@ -235,6 +238,7 @@ func (n *Needle) ReadBytes(bytes []byte, offset int64, size Size, version Versio
checksum := util.BytesToUint32(bytes[NeedleHeaderSize+size : NeedleHeaderSize+size+NeedleChecksumSize]) checksum := util.BytesToUint32(bytes[NeedleHeaderSize+size : NeedleHeaderSize+size+NeedleChecksumSize])
newChecksum := NewCRC(n.Data) newChecksum := NewCRC(n.Data)
if checksum != newChecksum.Value() { if checksum != newChecksum.Value() {
stats.VolumeServerRequestCounter.WithLabelValues("errorCRC").Inc()
return errors.New("CRC error! Data On Disk Corrupted") return errors.New("CRC error! Data On Disk Corrupted")
} }
n.Checksum = newChecksum n.Checksum = newChecksum
@ -267,6 +271,7 @@ func (n *Needle) readNeedleDataVersion2(bytes []byte) (err error) {
n.DataSize = util.BytesToUint32(bytes[index : index+4]) n.DataSize = util.BytesToUint32(bytes[index : index+4])
index = index + 4 index = index + 4
if int(n.DataSize)+index > lenBytes { if int(n.DataSize)+index > lenBytes {
stats.VolumeServerRequestCounter.WithLabelValues("errorIndexOutOfRange").Inc()
return fmt.Errorf("index out of range %d", 1) return fmt.Errorf("index out of range %d", 1)
} }
n.Data = bytes[index : index+int(n.DataSize)] n.Data = bytes[index : index+int(n.DataSize)]
@ -278,6 +283,7 @@ func (n *Needle) readNeedleDataVersion2(bytes []byte) (err error) {
n.NameSize = uint8(bytes[index]) n.NameSize = uint8(bytes[index])
index = index + 1 index = index + 1
if int(n.NameSize)+index > lenBytes { if int(n.NameSize)+index > lenBytes {
stats.VolumeServerRequestCounter.WithLabelValues("errorIndexOutOfRange").Inc()
return fmt.Errorf("index out of range %d", 2) return fmt.Errorf("index out of range %d", 2)
} }
n.Name = bytes[index : index+int(n.NameSize)] n.Name = bytes[index : index+int(n.NameSize)]
@ -287,6 +293,7 @@ func (n *Needle) readNeedleDataVersion2(bytes []byte) (err error) {
n.MimeSize = uint8(bytes[index]) n.MimeSize = uint8(bytes[index])
index = index + 1 index = index + 1
if int(n.MimeSize)+index > lenBytes { if int(n.MimeSize)+index > lenBytes {
stats.VolumeServerRequestCounter.WithLabelValues("errorIndexOutOfRange").Inc()
return fmt.Errorf("index out of range %d", 3) return fmt.Errorf("index out of range %d", 3)
} }
n.Mime = bytes[index : index+int(n.MimeSize)] n.Mime = bytes[index : index+int(n.MimeSize)]
@ -294,6 +301,7 @@ func (n *Needle) readNeedleDataVersion2(bytes []byte) (err error) {
} }
if index < lenBytes && n.HasLastModifiedDate() { if index < lenBytes && n.HasLastModifiedDate() {
if LastModifiedBytesLength+index > lenBytes { if LastModifiedBytesLength+index > lenBytes {
stats.VolumeServerRequestCounter.WithLabelValues("errorIndexOutOfRange").Inc()
return fmt.Errorf("index out of range %d", 4) return fmt.Errorf("index out of range %d", 4)
} }
n.LastModified = util.BytesToUint64(bytes[index : index+LastModifiedBytesLength]) n.LastModified = util.BytesToUint64(bytes[index : index+LastModifiedBytesLength])
@ -301,6 +309,7 @@ func (n *Needle) readNeedleDataVersion2(bytes []byte) (err error) {
} }
if index < lenBytes && n.HasTtl() { if index < lenBytes && n.HasTtl() {
if TtlBytesLength+index > lenBytes { if TtlBytesLength+index > lenBytes {
stats.VolumeServerRequestCounter.WithLabelValues("errorIndexOutOfRange").Inc()
return fmt.Errorf("index out of range %d", 5) return fmt.Errorf("index out of range %d", 5)
} }
n.Ttl = LoadTTLFromBytes(bytes[index : index+TtlBytesLength]) n.Ttl = LoadTTLFromBytes(bytes[index : index+TtlBytesLength])
@ -308,11 +317,13 @@ func (n *Needle) readNeedleDataVersion2(bytes []byte) (err error) {
} }
if index < lenBytes && n.HasPairs() { if index < lenBytes && n.HasPairs() {
if 2+index > lenBytes { if 2+index > lenBytes {
stats.VolumeServerRequestCounter.WithLabelValues("errorIndexOutOfRange").Inc()
return fmt.Errorf("index out of range %d", 6) return fmt.Errorf("index out of range %d", 6)
} }
n.PairsSize = util.BytesToUint16(bytes[index : index+2]) n.PairsSize = util.BytesToUint16(bytes[index : index+2])
index += 2 index += 2
if int(n.PairsSize)+index > lenBytes { if int(n.PairsSize)+index > lenBytes {
stats.VolumeServerRequestCounter.WithLabelValues("errorIndexOutOfRange").Inc()
return fmt.Errorf("index out of range %d", 7) return fmt.Errorf("index out of range %d", 7)
} }
end := index + int(n.PairsSize) end := index + int(n.PairsSize)

View file

@ -13,6 +13,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/storage/types" "github.com/chrislusf/seaweedfs/weed/storage/types"
@ -44,6 +45,7 @@ func ReplicatedWrite(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOpt
if s.GetVolume(volumeId) != nil { if s.GetVolume(volumeId) != nil {
isUnchanged, err = s.WriteVolumeNeedle(volumeId, n, true, fsync) isUnchanged, err = s.WriteVolumeNeedle(volumeId, n, true, fsync)
if err != nil { if err != nil {
stats.VolumeServerRequestCounter.WithLabelValues("errorWriteToLocalDisk").Inc()
err = fmt.Errorf("failed to write to local disk: %v", err) err = fmt.Errorf("failed to write to local disk: %v", err)
glog.V(0).Infoln(err) glog.V(0).Infoln(err)
return return
@ -74,6 +76,7 @@ func ReplicatedWrite(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOpt
tmpMap := make(map[string]string) tmpMap := make(map[string]string)
err := json.Unmarshal(n.Pairs, &tmpMap) err := json.Unmarshal(n.Pairs, &tmpMap)
if err != nil { if err != nil {
stats.VolumeServerRequestCounter.WithLabelValues("errorUnmarshalPairs").Inc()
glog.V(0).Infoln("Unmarshal pairs error:", err) glog.V(0).Infoln("Unmarshal pairs error:", err)
} }
for k, v := range tmpMap { for k, v := range tmpMap {
@ -95,6 +98,7 @@ func ReplicatedWrite(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOpt
_, err := operation.UploadData(n.Data, uploadOption) _, err := operation.UploadData(n.Data, uploadOption)
return err return err
}); err != nil { }); err != nil {
stats.VolumeServerRequestCounter.WithLabelValues("errorWriteToReplicas").Inc()
err = fmt.Errorf("failed to write to replicas for volume %d: %v", volumeId, err) err = fmt.Errorf("failed to write to replicas for volume %d: %v", volumeId, err)
glog.V(0).Infoln(err) glog.V(0).Infoln(err)
} }