diff --git a/weed/server/volume_grpc_tail.go b/weed/server/volume_grpc_tail.go index c26d6ed8f..c3b66c5e7 100644 --- a/weed/server/volume_grpc_tail.go +++ b/weed/server/volume_grpc_tail.go @@ -90,7 +90,7 @@ func (vs *VolumeServer) VolumeTailReceiver(ctx context.Context, req *volume_serv defer glog.V(1).Infof("receive tailing volume %d finished", v.Id) return resp, operation.TailVolumeFromSource(req.SourceVolumeServer, vs.grpcDialOption, v.Id, req.SinceNs, int(req.IdleTimeoutSeconds), func(n *needle.Needle) error { - _, _, err := vs.store.WriteVolumeNeedle(v.Id, n) + _, err := vs.store.WriteVolumeNeedle(v.Id, n) return err }) diff --git a/weed/server/volume_server_handlers_write.go b/weed/server/volume_server_handlers_write.go index 101be4c43..56cebf50f 100644 --- a/weed/server/volume_server_handlers_write.go +++ b/weed/server/volume_server_handlers_write.go @@ -49,7 +49,7 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) { } ret := operation.UploadResult{} - _, isUnchanged, writeError := topology.ReplicatedWrite(vs.GetMaster(), vs.store, volumeId, needle, r) + isUnchanged, writeError := topology.ReplicatedWrite(vs.GetMaster(), vs.store, volumeId, needle, r) // http 204 status code does not allow body if writeError == nil && isUnchanged { diff --git a/weed/storage/store.go b/weed/storage/store.go index e29680f6f..19dbcb70e 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -227,14 +227,15 @@ func (s *Store) Close() { } } -func (s *Store) WriteVolumeNeedle(i needle.VolumeId, n *needle.Needle) (size uint32, isUnchanged bool, err error) { +func (s *Store) WriteVolumeNeedle(i needle.VolumeId, n *needle.Needle) (isUnchanged bool, err error) { if v := s.findVolume(i); v != nil { if v.noWriteOrDelete || v.noWriteCanDelete { err = fmt.Errorf("volume %d is read only", i) return } - if MaxPossibleVolumeSize >= v.ContentSize()+uint64(needle.GetActualSize(size, v.Version())) { - _, size, isUnchanged, err = v.writeNeedle(n) + // using len(n.Data) here instead of n.Size before n.Size is populated in v.writeNeedle(n) + if MaxPossibleVolumeSize >= v.ContentSize()+uint64(needle.GetActualSize(uint32(len(n.Data)), v.Version())) { + _, _, isUnchanged, err = v.writeNeedle(n) } else { err = fmt.Errorf("volume size limit %d exceeded! current size is %d", s.GetVolumeSizeLimit(), v.ContentSize()) } diff --git a/weed/topology/store_replicate.go b/weed/topology/store_replicate.go index 495c38cfa..6f043a601 100644 --- a/weed/topology/store_replicate.go +++ b/weed/topology/store_replicate.go @@ -17,9 +17,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" ) -func ReplicatedWrite(masterNode string, s *storage.Store, - volumeId needle.VolumeId, n *needle.Needle, - r *http.Request) (size uint32, isUnchanged bool, err error) { +func ReplicatedWrite(masterNode string, s *storage.Store, volumeId needle.VolumeId, n *needle.Needle, r *http.Request) (isUnchanged bool, err error) { //check JWT jwt := security.GetJwt(r) @@ -33,11 +31,13 @@ func ReplicatedWrite(masterNode string, s *storage.Store, } } - size, isUnchanged, err = s.WriteVolumeNeedle(volumeId, n) - if err != nil { - err = fmt.Errorf("failed to write to local disk: %v", err) - glog.V(0).Infoln(err) - return + if s.GetVolume(volumeId) != nil { + isUnchanged, err = s.WriteVolumeNeedle(volumeId, n) + if err != nil { + err = fmt.Errorf("failed to write to local disk: %v", err) + glog.V(0).Infoln(err) + return + } } if len(remoteLocations) > 0 { //send to other replica locations @@ -75,7 +75,6 @@ func ReplicatedWrite(masterNode string, s *storage.Store, _, err := operation.UploadData(u.String(), string(n.Name), false, n.Data, n.IsGzipped(), string(n.Mime), pairMap, jwt) return err }); err != nil { - size = 0 err = fmt.Errorf("failed to write to replicas for volume %d: %v", volumeId, err) glog.V(0).Infoln(err) }