mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
refactoring
This commit is contained in:
parent
7edbee6f57
commit
560df51def
|
@ -90,7 +90,7 @@ func (vs *VolumeServer) VolumeTailReceiver(ctx context.Context, req *volume_serv
|
||||||
defer glog.V(1).Infof("receive tailing volume %d finished", v.Id)
|
defer glog.V(1).Infof("receive tailing volume %d finished", v.Id)
|
||||||
|
|
||||||
return resp, operation.TailVolumeFromSource(req.SourceVolumeServer, vs.grpcDialOption, v.Id, req.SinceNs, int(req.IdleTimeoutSeconds), func(n *needle.Needle) error {
|
return resp, operation.TailVolumeFromSource(req.SourceVolumeServer, vs.grpcDialOption, v.Id, req.SinceNs, int(req.IdleTimeoutSeconds), func(n *needle.Needle) error {
|
||||||
_, _, err := vs.store.WriteVolumeNeedle(v.Id, n)
|
_, err := vs.store.WriteVolumeNeedle(v.Id, n)
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
|
@ -49,7 +49,7 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
}
|
}
|
||||||
|
|
||||||
ret := operation.UploadResult{}
|
ret := operation.UploadResult{}
|
||||||
_, isUnchanged, writeError := topology.ReplicatedWrite(vs.GetMaster(), vs.store, volumeId, needle, r)
|
isUnchanged, writeError := topology.ReplicatedWrite(vs.GetMaster(), vs.store, volumeId, needle, r)
|
||||||
|
|
||||||
// http 204 status code does not allow body
|
// http 204 status code does not allow body
|
||||||
if writeError == nil && isUnchanged {
|
if writeError == nil && isUnchanged {
|
||||||
|
|
|
@ -227,14 +227,15 @@ func (s *Store) Close() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Store) WriteVolumeNeedle(i needle.VolumeId, n *needle.Needle) (size uint32, isUnchanged bool, err error) {
|
func (s *Store) WriteVolumeNeedle(i needle.VolumeId, n *needle.Needle) (isUnchanged bool, err error) {
|
||||||
if v := s.findVolume(i); v != nil {
|
if v := s.findVolume(i); v != nil {
|
||||||
if v.noWriteOrDelete || v.noWriteCanDelete {
|
if v.noWriteOrDelete || v.noWriteCanDelete {
|
||||||
err = fmt.Errorf("volume %d is read only", i)
|
err = fmt.Errorf("volume %d is read only", i)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if MaxPossibleVolumeSize >= v.ContentSize()+uint64(needle.GetActualSize(size, v.Version())) {
|
// using len(n.Data) here instead of n.Size before n.Size is populated in v.writeNeedle(n)
|
||||||
_, size, isUnchanged, err = v.writeNeedle(n)
|
if MaxPossibleVolumeSize >= v.ContentSize()+uint64(needle.GetActualSize(uint32(len(n.Data)), v.Version())) {
|
||||||
|
_, _, isUnchanged, err = v.writeNeedle(n)
|
||||||
} else {
|
} else {
|
||||||
err = fmt.Errorf("volume size limit %d exceeded! current size is %d", s.GetVolumeSizeLimit(), v.ContentSize())
|
err = fmt.Errorf("volume size limit %d exceeded! current size is %d", s.GetVolumeSizeLimit(), v.ContentSize())
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,9 +17,7 @@ import (
|
||||||
"github.com/chrislusf/seaweedfs/weed/util"
|
"github.com/chrislusf/seaweedfs/weed/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
func ReplicatedWrite(masterNode string, s *storage.Store,
|
func ReplicatedWrite(masterNode string, s *storage.Store, volumeId needle.VolumeId, n *needle.Needle, r *http.Request) (isUnchanged bool, err error) {
|
||||||
volumeId needle.VolumeId, n *needle.Needle,
|
|
||||||
r *http.Request) (size uint32, isUnchanged bool, err error) {
|
|
||||||
|
|
||||||
//check JWT
|
//check JWT
|
||||||
jwt := security.GetJwt(r)
|
jwt := security.GetJwt(r)
|
||||||
|
@ -33,12 +31,14 @@ func ReplicatedWrite(masterNode string, s *storage.Store,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
size, isUnchanged, err = s.WriteVolumeNeedle(volumeId, n)
|
if s.GetVolume(volumeId) != nil {
|
||||||
|
isUnchanged, err = s.WriteVolumeNeedle(volumeId, n)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err = fmt.Errorf("failed to write to local disk: %v", err)
|
err = fmt.Errorf("failed to write to local disk: %v", err)
|
||||||
glog.V(0).Infoln(err)
|
glog.V(0).Infoln(err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if len(remoteLocations) > 0 { //send to other replica locations
|
if len(remoteLocations) > 0 { //send to other replica locations
|
||||||
if err = distributedOperation(remoteLocations, s, func(location operation.Location) error {
|
if err = distributedOperation(remoteLocations, s, func(location operation.Location) error {
|
||||||
|
@ -75,7 +75,6 @@ func ReplicatedWrite(masterNode string, s *storage.Store,
|
||||||
_, err := operation.UploadData(u.String(), string(n.Name), false, n.Data, n.IsGzipped(), string(n.Mime), pairMap, jwt)
|
_, err := operation.UploadData(u.String(), string(n.Name), false, n.Data, n.IsGzipped(), string(n.Mime), pairMap, jwt)
|
||||||
return err
|
return err
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
size = 0
|
|
||||||
err = fmt.Errorf("failed to write to replicas for volume %d: %v", volumeId, err)
|
err = fmt.Errorf("failed to write to replicas for volume %d: %v", volumeId, err)
|
||||||
glog.V(0).Infoln(err)
|
glog.V(0).Infoln(err)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue