From a7fc723ae0992c787629ad57c6f2c4f05c1de553 Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Wed, 10 Jan 2024 23:05:27 +0500 Subject: [PATCH] chore: add status code for request_total metrics (#5188) --- Makefile | 2 +- weed/filer/stream.go | 4 ++-- weed/operation/upload_content.go | 2 +- weed/s3api/stats.go | 20 +----------------- weed/server/filer_server_handlers.go | 20 +++++++++++------- weed/server/filer_server_handlers_read.go | 10 ++++----- weed/server/filer_server_handlers_read_dir.go | 2 +- weed/server/filer_server_handlers_write.go | 2 +- .../filer_server_handlers_write_upload.go | 12 +++++------ weed/server/master_grpc_server.go | 4 ++-- weed/server/volume_server_handlers.go | 20 +++++++++++++----- weed/server/volume_server_handlers_read.go | 4 ++-- weed/stats/http_status_recorder.go | 21 +++++++++++++++++++ weed/stats/metrics.go | 18 ++++++++++++++++ weed/storage/needle/needle_read.go | 20 +++++++++--------- weed/topology/store_replicate.go | 6 +++--- 16 files changed, 101 insertions(+), 66 deletions(-) create mode 100644 weed/stats/http_status_recorder.go diff --git a/Makefile b/Makefile index fbe357db6..269b6ec91 100644 --- a/Makefile +++ b/Makefile @@ -15,7 +15,7 @@ full_install: cd weed; go install -tags "elastic gocdk sqlite ydb tikv rclone" server: install - weed -v 4 server -s3 -filer -volume.max=0 -master.volumeSizeLimitMB=1024 -volume.preStopSeconds=1 -s3.port=8000 -s3.allowEmptyFolder=false -s3.allowDeleteBucketNotEmpty=false -s3.config=./docker/compose/s3.json + weed -v 4 server -s3 -filer -volume.max=0 -master.volumeSizeLimitMB=1024 -volume.preStopSeconds=1 -s3.port=8000 -s3.allowEmptyFolder=false -s3.allowDeleteBucketNotEmpty=false -s3.config=./docker/compose/s3.json -metricsPort=9324 benchmark: install warp_install pkill weed || true diff --git a/weed/filer/stream.go b/weed/filer/stream.go index eb8f0f3a1..73a2a219c 100644 --- a/weed/filer/stream.go +++ b/weed/filer/stream.go @@ -121,10 +121,10 @@ func StreamContentWithThrottler(masterClient wdclient.HasLookupFileIdFunction, w remaining -= int64(chunkView.ViewSize) stats.FilerRequestHistogram.WithLabelValues("chunkDownload").Observe(time.Since(start).Seconds()) if err != nil { - stats.FilerRequestCounter.WithLabelValues("chunkDownloadError").Inc() + stats.FilerHandlerCounter.WithLabelValues("chunkDownloadError").Inc() return fmt.Errorf("read chunk: %v", err) } - stats.FilerRequestCounter.WithLabelValues("chunkDownload").Inc() + stats.FilerHandlerCounter.WithLabelValues("chunkDownload").Inc() downloadThrottler.MaybeSlowdown(int64(chunkView.ViewSize)) } if remaining > 0 { diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go index 0413f433b..b48d73deb 100644 --- a/weed/operation/upload_content.go +++ b/weed/operation/upload_content.go @@ -329,7 +329,7 @@ func upload_content(fillBufferFunction func(w io.Writer) error, originalDataSize if strings.Contains(post_err.Error(), "connection reset by peer") || strings.Contains(post_err.Error(), "use of closed network connection") { glog.V(1).Infof("repeat error upload request %s: %v", option.UploadUrl, postErr) - stats.FilerRequestCounter.WithLabelValues(stats.RepeatErrorUploadContent).Inc() + stats.FilerHandlerCounter.WithLabelValues(stats.RepeatErrorUploadContent).Inc() resp, post_err = HttpClient.Do(req) defer util.CloseResponse(resp) } diff --git a/weed/s3api/stats.go b/weed/s3api/stats.go index 369c3e0f6..42366b18a 100644 --- a/weed/s3api/stats.go +++ b/weed/s3api/stats.go @@ -8,29 +8,11 @@ import ( "time" ) -type StatusRecorder struct { - http.ResponseWriter - Status int -} - -func NewStatusResponseWriter(w http.ResponseWriter) *StatusRecorder { - return &StatusRecorder{w, http.StatusOK} -} - -func (r *StatusRecorder) WriteHeader(status int) { - r.Status = status - r.ResponseWriter.WriteHeader(status) -} - -func (r *StatusRecorder) Flush() { - r.ResponseWriter.(http.Flusher).Flush() -} - func track(f http.HandlerFunc, action string) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { bucket, _ := s3_constants.GetBucketAndObject(r) w.Header().Set("Server", "SeaweedFS S3") - recorder := NewStatusResponseWriter(w) + recorder := stats_collect.NewStatusResponseWriter(w) start := time.Now() f(recorder, r) if recorder.Status == http.StatusForbidden { diff --git a/weed/server/filer_server_handlers.go b/weed/server/filer_server_handlers.go index d71b60d70..ac66514f2 100644 --- a/weed/server/filer_server_handlers.go +++ b/weed/server/filer_server_handlers.go @@ -4,6 +4,7 @@ import ( "errors" "net/http" "os" + "strconv" "strings" "sync/atomic" "time" @@ -17,7 +18,8 @@ import ( func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) { start := time.Now() - + statusRecorder := stats.NewStatusResponseWriter(w) + w = statusRecorder origin := r.Header.Get("Origin") if origin != "" { if fs.option.AllowedOrigins == nil || len(fs.option.AllowedOrigins) == 0 || fs.option.AllowedOrigins[0] == "*" { @@ -53,23 +55,23 @@ func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) { fileId = r.RequestURI[len("/?proxyChunkId="):] } if fileId != "" { - stats.FilerRequestCounter.WithLabelValues(stats.ChunkProxy).Inc() fs.proxyToVolumeServer(w, r, fileId) + stats.FilerHandlerCounter.WithLabelValues(stats.ChunkProxy).Inc() stats.FilerRequestHistogram.WithLabelValues(stats.ChunkProxy).Observe(time.Since(start).Seconds()) return } + defer func() { + stats.FilerRequestCounter.WithLabelValues(r.Method, strconv.Itoa(statusRecorder.Status)).Inc() + stats.FilerRequestHistogram.WithLabelValues(r.Method).Observe(time.Since(start).Seconds()) + }() + isReadHttpCall := r.Method == "GET" || r.Method == "HEAD" if !fs.maybeCheckJwtAuthorization(r, !isReadHttpCall) { writeJsonError(w, r, http.StatusUnauthorized, errors.New("wrong jwt")) return } - stats.FilerRequestCounter.WithLabelValues(r.Method).Inc() - defer func() { - stats.FilerRequestHistogram.WithLabelValues(r.Method).Observe(time.Since(start).Seconds()) - }() - w.Header().Set("Server", "SeaweedFS Filer "+util.VERSION) switch r.Method { @@ -115,6 +117,8 @@ func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) { func (fs *FilerServer) readonlyFilerHandler(w http.ResponseWriter, r *http.Request) { start := time.Now() + statusRecorder := stats.NewStatusResponseWriter(w) + w = statusRecorder os.Stdout.WriteString("Request: " + r.Method + " " + r.URL.String() + "\n") @@ -140,8 +144,8 @@ func (fs *FilerServer) readonlyFilerHandler(w http.ResponseWriter, r *http.Reque w.Header().Set("Access-Control-Allow-Credentials", "true") } - stats.FilerRequestCounter.WithLabelValues(r.Method).Inc() defer func() { + stats.FilerRequestCounter.WithLabelValues(r.Method, strconv.Itoa(statusRecorder.Status)).Inc() stats.FilerRequestHistogram.WithLabelValues(r.Method).Observe(time.Since(start).Seconds()) }() // We handle OPTIONS first because it never should be authenticated diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go index 3a86c265b..6bdd6a9dd 100644 --- a/weed/server/filer_server_handlers_read.go +++ b/weed/server/filer_server_handlers_read.go @@ -98,11 +98,11 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) } if err == filer_pb.ErrNotFound { glog.V(2).Infof("Not found %s: %v", path, err) - stats.FilerRequestCounter.WithLabelValues(stats.ErrorReadNotFound).Inc() + stats.FilerHandlerCounter.WithLabelValues(stats.ErrorReadNotFound).Inc() w.WriteHeader(http.StatusNotFound) } else { glog.Errorf("Internal %s: %v", path, err) - stats.FilerRequestCounter.WithLabelValues(stats.ErrorReadInternal).Inc() + stats.FilerHandlerCounter.WithLabelValues(stats.ErrorReadInternal).Inc() w.WriteHeader(http.StatusInternalServerError) } return @@ -233,7 +233,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) if offset+size <= int64(len(entry.Content)) { _, err := writer.Write(entry.Content[offset : offset+size]) if err != nil { - stats.FilerRequestCounter.WithLabelValues(stats.ErrorWriteEntry).Inc() + stats.FilerHandlerCounter.WithLabelValues(stats.ErrorWriteEntry).Inc() glog.Errorf("failed to write entry content: %v", err) } return err @@ -245,7 +245,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) Directory: dir, Name: name, }); err != nil { - stats.FilerRequestCounter.WithLabelValues(stats.ErrorReadCache).Inc() + stats.FilerHandlerCounter.WithLabelValues(stats.ErrorReadCache).Inc() glog.Errorf("CacheRemoteObjectToLocalCluster %s: %v", entry.FullPath, err) return fmt.Errorf("cache %s: %v", entry.FullPath, err) } else { @@ -255,7 +255,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) err = filer.StreamContentWithThrottler(fs.filer.MasterClient, writer, chunks, offset, size, fs.option.DownloadMaxBytesPs) if err != nil { - stats.FilerRequestCounter.WithLabelValues(stats.ErrorReadStream).Inc() + stats.FilerHandlerCounter.WithLabelValues(stats.ErrorReadStream).Inc() glog.Errorf("failed to stream content %s: %v", r.URL, err) } return err diff --git a/weed/server/filer_server_handlers_read_dir.go b/weed/server/filer_server_handlers_read_dir.go index 594bc1fa6..2060e3374 100644 --- a/weed/server/filer_server_handlers_read_dir.go +++ b/weed/server/filer_server_handlers_read_dir.go @@ -18,7 +18,7 @@ import ( // is empty. func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Request) { - stats.FilerRequestCounter.WithLabelValues(stats.DirList).Inc() + stats.FilerHandlerCounter.WithLabelValues(stats.DirList).Inc() path := r.URL.Path if strings.HasSuffix(path, "/") && len(path) > 1 { diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go index 499bfac32..0f82ca267 100644 --- a/weed/server/filer_server_handlers_write.go +++ b/weed/server/filer_server_handlers_write.go @@ -35,7 +35,7 @@ type FilerPostResult struct { func (fs *FilerServer) assignNewFileInfo(so *operation.StorageOption) (fileId, urlLocation string, auth security.EncodedJwt, err error) { - stats.FilerRequestCounter.WithLabelValues(stats.ChunkAssign).Inc() + stats.FilerHandlerCounter.WithLabelValues(stats.ChunkAssign).Inc() start := time.Now() defer func() { stats.FilerRequestHistogram.WithLabelValues(stats.ChunkAssign).Observe(time.Since(start).Seconds()) diff --git a/weed/server/filer_server_handlers_write_upload.go b/weed/server/filer_server_handlers_write_upload.go index a329a38af..e26eecbe8 100644 --- a/weed/server/filer_server_handlers_write_upload.go +++ b/weed/server/filer_server_handlers_write_upload.go @@ -86,11 +86,11 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque bytesBuffer.Read(smallContent) bufPool.Put(bytesBuffer) <-bytesBufferLimitChan - stats.FilerRequestCounter.WithLabelValues(stats.ContentSaveToFiler).Inc() + stats.FilerHandlerCounter.WithLabelValues(stats.ContentSaveToFiler).Inc() break } } else { - stats.FilerRequestCounter.WithLabelValues(stats.AutoChunk).Inc() + stats.FilerHandlerCounter.WithLabelValues(stats.AutoChunk).Inc() } wg.Add(1) @@ -143,7 +143,7 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque func (fs *FilerServer) doUpload(urlLocation string, limitedReader io.Reader, fileName string, contentType string, pairMap map[string]string, auth security.EncodedJwt) (*operation.UploadResult, error, []byte) { - stats.FilerRequestCounter.WithLabelValues(stats.ChunkUpload).Inc() + stats.FilerHandlerCounter.WithLabelValues(stats.ChunkUpload).Inc() start := time.Now() defer func() { stats.FilerRequestHistogram.WithLabelValues(stats.ChunkUpload).Observe(time.Since(start).Seconds()) @@ -160,7 +160,7 @@ func (fs *FilerServer) doUpload(urlLocation string, limitedReader io.Reader, fil } uploadResult, err, data := operation.Upload(limitedReader, uploadOption) if uploadResult != nil && uploadResult.RetryCount > 0 { - stats.FilerRequestCounter.WithLabelValues(stats.ChunkUploadRetry).Add(float64(uploadResult.RetryCount)) + stats.FilerHandlerCounter.WithLabelValues(stats.ChunkUploadRetry).Add(float64(uploadResult.RetryCount)) } return uploadResult, err, data } @@ -180,14 +180,14 @@ func (fs *FilerServer) dataToChunk(fileName, contentType string, data []byte, ch fileId, urlLocation, auth, uploadErr = fs.assignNewFileInfo(so) if uploadErr != nil { glog.V(4).Infof("retry later due to assign error: %v", uploadErr) - stats.FilerRequestCounter.WithLabelValues(stats.ChunkAssignRetry).Inc() + stats.FilerHandlerCounter.WithLabelValues(stats.ChunkAssignRetry).Inc() return uploadErr } // upload the chunk to the volume server uploadResult, uploadErr, _ = fs.doUpload(urlLocation, dataReader, fileName, contentType, nil, auth) if uploadErr != nil { glog.V(4).Infof("retry later due to upload error: %v", uploadErr) - stats.FilerRequestCounter.WithLabelValues(stats.ChunkDoUploadRetry).Inc() + stats.FilerHandlerCounter.WithLabelValues(stats.ChunkDoUploadRetry).Inc() fid, _ := filer_pb.ToFileIdObject(fileId) fileChunk := filer_pb.FileChunk{ FileId: fileId, diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go index 93569ce0d..616b28e67 100644 --- a/weed/server/master_grpc_server.go +++ b/weed/server/master_grpc_server.go @@ -168,10 +168,10 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ DataCenter: dn.GetDataCenterId(), } if len(heartbeat.NewVolumes) > 0 { - stats.FilerRequestCounter.WithLabelValues("newVolumes").Inc() + stats.MasterReceivedHeartbeatCounter.WithLabelValues("newVolumes").Inc() } if len(heartbeat.DeletedVolumes) > 0 { - stats.FilerRequestCounter.WithLabelValues("deletedVolumes").Inc() + stats.MasterReceivedHeartbeatCounter.WithLabelValues("deletedVolumes").Inc() } if len(heartbeat.NewVolumes) > 0 || len(heartbeat.DeletedVolumes) > 0 { // process delta volume ids if exists for fast volume id updates diff --git a/weed/server/volume_server_handlers.go b/weed/server/volume_server_handlers.go index 9523eee56..06134e648 100644 --- a/weed/server/volume_server_handlers.go +++ b/weed/server/volume_server_handlers.go @@ -31,6 +31,8 @@ security settings: */ func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Request) { + statusRecorder := stats.NewStatusResponseWriter(w) + w = statusRecorder w.Header().Set("Server", "SeaweedFS Volume "+util.VERSION) if r.Header.Get("Origin") != "" { w.Header().Set("Access-Control-Allow-Origin", "*") @@ -38,10 +40,10 @@ func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Reque } start := time.Now() requestMethod := r.Method - defer func(start time.Time, method *string) { - stats.VolumeServerRequestCounter.WithLabelValues(*method).Inc() + defer func(start time.Time, method *string, statusRecorder *stats.StatusRecorder) { + stats.VolumeServerRequestCounter.WithLabelValues(*method, strconv.Itoa(statusRecorder.Status)).Inc() stats.VolumeServerRequestHistogram.WithLabelValues(*method).Observe(time.Since(start).Seconds()) - }(start, &requestMethod) + }(start, &requestMethod, statusRecorder) switch r.Method { case http.MethodGet, http.MethodHead: stats.ReadRequest() @@ -63,11 +65,9 @@ func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Reque vs.inFlightDownloadDataLimitCond.L.Unlock() vs.GetOrHeadHandler(w, r) case http.MethodDelete: - stats.VolumeServerRequestCounter.WithLabelValues(r.Method).Inc() stats.DeleteRequest() vs.guard.WhiteList(vs.DeleteHandler)(w, r) case http.MethodPut, http.MethodPost: - stats.VolumeServerRequestCounter.WithLabelValues(r.Method).Inc() contentLength := getContentLength(r) // exclude the replication from the concurrentUploadLimitMB if r.URL.Query().Get("type") != "replicate" && vs.concurrentUploadLimit != 0 { @@ -124,11 +124,21 @@ func getContentLength(r *http.Request) int64 { } func (vs *VolumeServer) publicReadOnlyHandler(w http.ResponseWriter, r *http.Request) { + statusRecorder := stats.NewStatusResponseWriter(w) + w = statusRecorder w.Header().Set("Server", "SeaweedFS Volume "+util.VERSION) if r.Header.Get("Origin") != "" { w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Access-Control-Allow-Credentials", "true") } + + start := time.Now() + requestMethod := r.Method + defer func(start time.Time, method *string, statusRecorder *stats.StatusRecorder) { + stats.VolumeServerRequestCounter.WithLabelValues(*method, strconv.Itoa(statusRecorder.Status)).Inc() + stats.VolumeServerRequestHistogram.WithLabelValues(*method).Observe(time.Since(start).Seconds()) + }(start, &requestMethod, statusRecorder) + switch r.Method { case http.MethodGet, http.MethodHead: stats.ReadRequest() diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go index 590cc77f0..5f4beb77d 100644 --- a/weed/server/volume_server_handlers_read.go +++ b/weed/server/volume_server_handlers_read.go @@ -30,12 +30,12 @@ import ( var fileNameEscaper = strings.NewReplacer(`\`, `\\`, `"`, `\"`) func NotFound(w http.ResponseWriter) { - stats.VolumeServerRequestCounter.WithLabelValues(stats.ErrorGetNotFound).Inc() + stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorGetNotFound).Inc() w.WriteHeader(http.StatusNotFound) } func InternalError(w http.ResponseWriter) { - stats.VolumeServerRequestCounter.WithLabelValues(stats.ErrorGetInternal).Inc() + stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorGetInternal).Inc() w.WriteHeader(http.StatusInternalServerError) } diff --git a/weed/stats/http_status_recorder.go b/weed/stats/http_status_recorder.go new file mode 100644 index 000000000..19467ecf2 --- /dev/null +++ b/weed/stats/http_status_recorder.go @@ -0,0 +1,21 @@ +package stats + +import "net/http" + +type StatusRecorder struct { + http.ResponseWriter + Status int +} + +func NewStatusResponseWriter(w http.ResponseWriter) *StatusRecorder { + return &StatusRecorder{w, http.StatusOK} +} + +func (r *StatusRecorder) WriteHeader(status int) { + r.Status = status + r.ResponseWriter.WriteHeader(status) +} + +func (r *StatusRecorder) Flush() { + r.ResponseWriter.(http.Flusher).Flush() +} diff --git a/weed/stats/metrics.go b/weed/stats/metrics.go index 87e5f2f4e..49a3b090b 100644 --- a/weed/stats/metrics.go +++ b/weed/stats/metrics.go @@ -84,6 +84,14 @@ var ( Subsystem: "filer", Name: "request_total", Help: "Counter of filer requests.", + }, []string{"type", "code"}) + + FilerHandlerCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: Namespace, + Subsystem: "filer", + Name: "handler_total", + Help: "Counter of filer handlers.", }, []string{"type"}) FilerRequestHistogram = prometheus.NewHistogramVec( @@ -134,6 +142,14 @@ var ( Subsystem: "volumeServer", Name: "request_total", Help: "Counter of volume server requests.", + }, []string{"type", "code"}) + + VolumeServerHandlerCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: Namespace, + Subsystem: "volumeServer", + Name: "handler_total", + Help: "Counter of volume server handlers.", }, []string{"type"}) VolumeServerVacuumingCompactCounter = prometheus.NewCounterVec( @@ -245,6 +261,7 @@ func init() { Gather.MustRegister(MasterReplicaPlacementMismatch) Gather.MustRegister(FilerRequestCounter) + Gather.MustRegister(FilerHandlerCounter) Gather.MustRegister(FilerRequestHistogram) Gather.MustRegister(FilerStoreCounter) Gather.MustRegister(FilerStoreHistogram) @@ -254,6 +271,7 @@ func init() { Gather.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{})) Gather.MustRegister(VolumeServerRequestCounter) + Gather.MustRegister(VolumeServerHandlerCounter) Gather.MustRegister(VolumeServerRequestHistogram) Gather.MustRegister(VolumeServerVacuumingCompactCounter) Gather.MustRegister(VolumeServerVacuumingCommitCounter) diff --git a/weed/storage/needle/needle_read.go b/weed/storage/needle/needle_read.go index 620591898..19b801bd4 100644 --- a/weed/storage/needle/needle_read.go +++ b/weed/storage/needle/needle_read.go @@ -54,11 +54,11 @@ func (n *Needle) ReadBytes(bytes []byte, offset int64, size Size, version Versio if n.Size != size { // cookie is not always passed in for this API. Use size to do preliminary checking. if OffsetSize == 4 && offset < int64(MaxPossibleVolumeSize) { - stats.VolumeServerRequestCounter.WithLabelValues(stats.ErrorSizeMismatchOffsetSize).Inc() + stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorSizeMismatchOffsetSize).Inc() glog.Errorf("entry not found1: offset %d found id %x size %d, expected size %d", offset, n.Id, n.Size, size) return ErrorSizeMismatch } - stats.VolumeServerRequestCounter.WithLabelValues(stats.ErrorSizeMismatch).Inc() + stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorSizeMismatch).Inc() return fmt.Errorf("entry not found: offset %d found id %x size %d, expected size %d", offset, n.Id, n.Size, size) } switch version { @@ -75,7 +75,7 @@ func (n *Needle) ReadBytes(bytes []byte, offset int64, size Size, version Versio newChecksum := NewCRC(n.Data) if checksum != newChecksum.Value() && checksum != uint32(newChecksum) { // the crc.Value() function is to be deprecated. this double checking is for backward compatible. - stats.VolumeServerRequestCounter.WithLabelValues(stats.ErrorCRC).Inc() + stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorCRC).Inc() return errors.New("CRC error! Data On Disk Corrupted") } n.Checksum = newChecksum @@ -108,7 +108,7 @@ func (n *Needle) readNeedleDataVersion2(bytes []byte) (err error) { n.DataSize = util.BytesToUint32(bytes[index : index+4]) index = index + 4 if int(n.DataSize)+index > lenBytes { - stats.VolumeServerRequestCounter.WithLabelValues(stats.ErrorIndexOutOfRange).Inc() + stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorIndexOutOfRange).Inc() return fmt.Errorf("index out of range %d", 1) } n.Data = bytes[index : index+int(n.DataSize)] @@ -127,7 +127,7 @@ func (n *Needle) readNeedleDataVersion2NonData(bytes []byte) (index int, err err n.NameSize = uint8(bytes[index]) index = index + 1 if int(n.NameSize)+index > lenBytes { - stats.VolumeServerRequestCounter.WithLabelValues(stats.ErrorIndexOutOfRange).Inc() + stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorIndexOutOfRange).Inc() return index, fmt.Errorf("index out of range %d", 2) } n.Name = bytes[index : index+int(n.NameSize)] @@ -137,7 +137,7 @@ func (n *Needle) readNeedleDataVersion2NonData(bytes []byte) (index int, err err n.MimeSize = uint8(bytes[index]) index = index + 1 if int(n.MimeSize)+index > lenBytes { - stats.VolumeServerRequestCounter.WithLabelValues(stats.ErrorIndexOutOfRange).Inc() + stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorIndexOutOfRange).Inc() return index, fmt.Errorf("index out of range %d", 3) } n.Mime = bytes[index : index+int(n.MimeSize)] @@ -145,7 +145,7 @@ func (n *Needle) readNeedleDataVersion2NonData(bytes []byte) (index int, err err } if index < lenBytes && n.HasLastModifiedDate() { if LastModifiedBytesLength+index > lenBytes { - stats.VolumeServerRequestCounter.WithLabelValues(stats.ErrorIndexOutOfRange).Inc() + stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorIndexOutOfRange).Inc() return index, fmt.Errorf("index out of range %d", 4) } n.LastModified = util.BytesToUint64(bytes[index : index+LastModifiedBytesLength]) @@ -153,7 +153,7 @@ func (n *Needle) readNeedleDataVersion2NonData(bytes []byte) (index int, err err } if index < lenBytes && n.HasTtl() { if TtlBytesLength+index > lenBytes { - stats.VolumeServerRequestCounter.WithLabelValues(stats.ErrorIndexOutOfRange).Inc() + stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorIndexOutOfRange).Inc() return index, fmt.Errorf("index out of range %d", 5) } n.Ttl = LoadTTLFromBytes(bytes[index : index+TtlBytesLength]) @@ -161,13 +161,13 @@ func (n *Needle) readNeedleDataVersion2NonData(bytes []byte) (index int, err err } if index < lenBytes && n.HasPairs() { if 2+index > lenBytes { - stats.VolumeServerRequestCounter.WithLabelValues(stats.ErrorIndexOutOfRange).Inc() + stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorIndexOutOfRange).Inc() return index, fmt.Errorf("index out of range %d", 6) } n.PairsSize = util.BytesToUint16(bytes[index : index+2]) index += 2 if int(n.PairsSize)+index > lenBytes { - stats.VolumeServerRequestCounter.WithLabelValues(stats.ErrorIndexOutOfRange).Inc() + stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorIndexOutOfRange).Inc() return index, fmt.Errorf("index out of range %d", 7) } end := index + int(n.PairsSize) diff --git a/weed/topology/store_replicate.go b/weed/topology/store_replicate.go index d11a47f3b..a5a406459 100644 --- a/weed/topology/store_replicate.go +++ b/weed/topology/store_replicate.go @@ -48,7 +48,7 @@ func ReplicatedWrite(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOpt isUnchanged, err = s.WriteVolumeNeedle(volumeId, n, true, fsync) stats.VolumeServerRequestHistogram.WithLabelValues(stats.WriteToLocalDisk).Observe(time.Since(start).Seconds()) if err != nil { - stats.VolumeServerRequestCounter.WithLabelValues(stats.ErrorWriteToLocalDisk).Inc() + stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorWriteToLocalDisk).Inc() err = fmt.Errorf("failed to write to local disk: %v", err) glog.V(0).Infoln(err) return @@ -80,7 +80,7 @@ func ReplicatedWrite(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOpt tmpMap := make(map[string]string) err := json.Unmarshal(n.Pairs, &tmpMap) if err != nil { - stats.VolumeServerRequestCounter.WithLabelValues(stats.ErrorUnmarshalPairs).Inc() + stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorUnmarshalPairs).Inc() glog.V(0).Infoln("Unmarshal pairs error:", err) } for k, v := range tmpMap { @@ -109,7 +109,7 @@ func ReplicatedWrite(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOpt }) stats.VolumeServerRequestHistogram.WithLabelValues(stats.WriteToReplicas).Observe(time.Since(start).Seconds()) if err != nil { - stats.VolumeServerRequestCounter.WithLabelValues(stats.ErrorWriteToReplicas).Inc() + stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorWriteToReplicas).Inc() err = fmt.Errorf("failed to write to replicas for volume %d: %v", volumeId, err) glog.V(0).Infoln(err) return false, err