diff --git a/weed/server/volume_grpc_tail.go b/weed/server/volume_grpc_tail.go index 820c28a1d..667131e9f 100644 --- a/weed/server/volume_grpc_tail.go +++ b/weed/server/volume_grpc_tail.go @@ -111,7 +111,7 @@ func (vs *VolumeServer) VolumeTailReceiver(ctx context.Context, req *volume_serv defer glog.V(1).Infof("receive tailing volume %d finished", v.Id) return resp, operation.TailVolumeFromSource(req.SourceVolumeServer, vs.grpcDialOption, v.Id, req.SinceNs, int(req.IdleTimeoutSeconds), func(n *needle.Needle) error { - _, err := vs.store.Write(v.Id, n) + _, _, err := vs.store.Write(v.Id, n) return err }) diff --git a/weed/server/volume_server_handlers_write.go b/weed/server/volume_server_handlers_write.go index 45c868c33..188d88ddf 100644 --- a/weed/server/volume_server_handlers_write.go +++ b/weed/server/volume_server_handlers_write.go @@ -41,11 +41,14 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) { } ret := operation.UploadResult{} - _, errorStatus := topology.ReplicatedWrite(vs.GetMaster(), vs.store, volumeId, needle, r) + _, isUnchanged, writeError := topology.ReplicatedWrite(vs.GetMaster(), vs.store, volumeId, needle, r) httpStatus := http.StatusCreated - if errorStatus != "" { + if isUnchanged { + httpStatus = http.StatusNotModified + } + if writeError != nil { httpStatus = http.StatusInternalServerError - ret.Error = errorStatus + ret.Error = writeError.Error() } if needle.HasName() { ret.Name = string(needle.Name) diff --git a/weed/storage/store.go b/weed/storage/store.go index b870cee4e..3b2bb2c34 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -192,7 +192,7 @@ func (s *Store) Close() { } } -func (s *Store) Write(i needle.VolumeId, n *needle.Needle) (size uint32, err error) { +func (s *Store) Write(i needle.VolumeId, n *needle.Needle) (size uint32, isUnchanged bool, err error) { if v := s.findVolume(i); v != nil { if v.readOnly { err = fmt.Errorf("Volume %d is read only", i) @@ -200,7 +200,7 @@ func (s *Store) Write(i needle.VolumeId, n *needle.Needle) (size uint32, err err } // TODO: count needle size ahead if MaxPossibleVolumeSize >= v.ContentSize()+uint64(size) { - _, size, err = v.writeNeedle(n) + _, size, isUnchanged, err = v.writeNeedle(n) } else { err = fmt.Errorf("Volume Size Limit %d Exceeded! Current size is %d", s.GetVolumeSizeLimit(), v.ContentSize()) } diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_read_write.go index 6899ebbc1..bd268c1a6 100644 --- a/weed/storage/volume_read_write.go +++ b/weed/storage/volume_read_write.go @@ -77,7 +77,7 @@ func (v *Volume) AppendBlob(b []byte) (offset int64, err error) { return } -func (v *Volume) writeNeedle(n *needle.Needle) (offset uint64, size uint32, err error) { +func (v *Volume) writeNeedle(n *needle.Needle) (offset uint64, size uint32, isUnchanged bool, err error) { glog.V(4).Infof("writing needle %s", needle.NewFileIdFromNeedle(v.Id, n).String()) if v.readOnly { err = fmt.Errorf("%s is read-only", v.dataFile.Name()) @@ -87,7 +87,7 @@ func (v *Volume) writeNeedle(n *needle.Needle) (offset uint64, size uint32, err defer v.dataFileAccessLock.Unlock() if v.isFileUnchanged(n) { size = n.DataSize - glog.V(4).Infof("needle is unchanged!") + isUnchanged = true return } diff --git a/weed/storage/volume_vacuum_test.go b/weed/storage/volume_vacuum_test.go index 5192d23b8..d038eeda3 100644 --- a/weed/storage/volume_vacuum_test.go +++ b/weed/storage/volume_vacuum_test.go @@ -124,7 +124,7 @@ func TestCompaction(t *testing.T) { } func doSomeWritesDeletes(i int, v *Volume, t *testing.T, infos []*needleInfo) { n := newRandomNeedle(uint64(i)) - _, size, err := v.writeNeedle(n) + _, size, _, err := v.writeNeedle(n) if err != nil { t.Fatalf("write file %d: %v", i, err) } diff --git a/weed/topology/store_replicate.go b/weed/topology/store_replicate.go index fd19cbfba..d4076d548 100644 --- a/weed/topology/store_replicate.go +++ b/weed/topology/store_replicate.go @@ -20,19 +20,18 @@ import ( func ReplicatedWrite(masterNode string, s *storage.Store, volumeId needle.VolumeId, n *needle.Needle, - r *http.Request) (size uint32, errorStatus string) { + r *http.Request) (size uint32, isUnchanged bool, err error) { //check JWT jwt := security.GetJwt(r) - ret, err := s.Write(volumeId, n) - needToReplicate := !s.HasVolume(volumeId) + size, isUnchanged, err = s.Write(volumeId, n) if err != nil { - errorStatus = "Failed to write to local disk (" + err.Error() + ")" - size = ret + err = fmt.Errorf("failed to write to local disk: %v", err) return } + needToReplicate := !s.HasVolume(volumeId) needToReplicate = needToReplicate || s.GetVolume(volumeId).NeedToReplicate() if !needToReplicate { needToReplicate = s.GetVolume(volumeId).NeedToReplicate() @@ -75,12 +74,11 @@ func ReplicatedWrite(masterNode string, s *storage.Store, pairMap, jwt) return err }); err != nil { - ret = 0 - errorStatus = fmt.Sprintf("Failed to write to replicas for volume %d: %v", volumeId, err) + size = 0 + err = fmt.Errorf("failed to write to replicas for volume %d: %v", volumeId, err) } } } - size = ret return }