mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
Merge pull request #259 from tnextday/master
Fix chunk manifest replicate upload error
This commit is contained in:
commit
f7f9129b05
|
@ -5,6 +5,8 @@ import (
|
||||||
"net/http"
|
"net/http"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
|
||||||
|
"net/url"
|
||||||
|
|
||||||
"github.com/chrislusf/seaweedfs/go/glog"
|
"github.com/chrislusf/seaweedfs/go/glog"
|
||||||
"github.com/chrislusf/seaweedfs/go/operation"
|
"github.com/chrislusf/seaweedfs/go/operation"
|
||||||
"github.com/chrislusf/seaweedfs/go/security"
|
"github.com/chrislusf/seaweedfs/go/security"
|
||||||
|
@ -33,9 +35,24 @@ func ReplicatedWrite(masterNode string, s *storage.Store,
|
||||||
}
|
}
|
||||||
if needToReplicate { //send to other replica locations
|
if needToReplicate { //send to other replica locations
|
||||||
if r.FormValue("type") != "replicate" {
|
if r.FormValue("type") != "replicate" {
|
||||||
|
|
||||||
if !distributedOperation(masterNode, s, volumeId, func(location operation.Location) bool {
|
if !distributedOperation(masterNode, s, volumeId, func(location operation.Location) bool {
|
||||||
_, err := operation.Upload(
|
u := url.URL{
|
||||||
"http://"+location.Url+r.URL.Path+"?type=replicate&ts="+strconv.FormatUint(needle.LastModified, 10),
|
Scheme: "http",
|
||||||
|
Host: location.Url,
|
||||||
|
Path: r.URL.Path,
|
||||||
|
}
|
||||||
|
q := url.Values{
|
||||||
|
"type": {"replicate"},
|
||||||
|
}
|
||||||
|
if needle.LastModified > 0 {
|
||||||
|
q.Set("ts", strconv.FormatUint(needle.LastModified, 10))
|
||||||
|
}
|
||||||
|
if needle.IsChunkedManifest() {
|
||||||
|
q.Set("cm", "true")
|
||||||
|
}
|
||||||
|
u.RawQuery = q.Encode()
|
||||||
|
_, err := operation.Upload(u.String(),
|
||||||
string(needle.Name), bytes.NewReader(needle.Data), needle.IsGzipped(), string(needle.Mime),
|
string(needle.Name), bytes.NewReader(needle.Data), needle.IsGzipped(), string(needle.Mime),
|
||||||
jwt)
|
jwt)
|
||||||
return err == nil
|
return err == nil
|
||||||
|
|
|
@ -11,6 +11,8 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"net/url"
|
||||||
|
|
||||||
"github.com/chrislusf/seaweedfs/go/glog"
|
"github.com/chrislusf/seaweedfs/go/glog"
|
||||||
"github.com/chrislusf/seaweedfs/go/images"
|
"github.com/chrislusf/seaweedfs/go/images"
|
||||||
"github.com/chrislusf/seaweedfs/go/operation"
|
"github.com/chrislusf/seaweedfs/go/operation"
|
||||||
|
@ -46,7 +48,15 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
|
||||||
lookupResult, err := operation.Lookup(vs.GetMasterNode(), volumeId.String())
|
lookupResult, err := operation.Lookup(vs.GetMasterNode(), volumeId.String())
|
||||||
glog.V(2).Infoln("volume", volumeId, "found on", lookupResult, "error", err)
|
glog.V(2).Infoln("volume", volumeId, "found on", lookupResult, "error", err)
|
||||||
if err == nil && len(lookupResult.Locations) > 0 {
|
if err == nil && len(lookupResult.Locations) > 0 {
|
||||||
http.Redirect(w, r, util.NormalizeUrl(lookupResult.Locations[0].PublicUrl)+r.URL.Path, http.StatusMovedPermanently)
|
u, _ := url.Parse(util.NormalizeUrl(lookupResult.Locations[0].PublicUrl))
|
||||||
|
u.Path = r.URL.Path
|
||||||
|
arg := url.Values{}
|
||||||
|
if c := r.FormValue("collection"); c != "" {
|
||||||
|
arg.Set("collection", c)
|
||||||
|
}
|
||||||
|
u.RawQuery = arg.Encode()
|
||||||
|
http.Redirect(w, r, u.String(), http.StatusMovedPermanently)
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
glog.V(2).Infoln("lookup error:", err, r.URL.Path)
|
glog.V(2).Infoln("lookup error:", err, r.URL.Path)
|
||||||
w.WriteHeader(http.StatusNotFound)
|
w.WriteHeader(http.StatusNotFound)
|
||||||
|
|
Loading…
Reference in a new issue