Issue 11: Failed to write to replicas for volumen 3

Avoid unnecessary master lookup
This commit is contained in:
Chris Lu 2012-11-12 01:26:18 -08:00
parent ccab4217e4
commit ecd0399f8d
2 changed files with 20 additions and 9 deletions

View file

@ -67,13 +67,13 @@ func vacuumVolumeCompactHandler(w http.ResponseWriter, r *http.Request) {
debug("compacted volume =", r.FormValue("volume"), ", error =", err) debug("compacted volume =", r.FormValue("volume"), ", error =", err)
} }
func vacuumVolumeCommitHandler(w http.ResponseWriter, r *http.Request) { func vacuumVolumeCommitHandler(w http.ResponseWriter, r *http.Request) {
count, err := store.CommitCompactVolume(r.FormValue("volume")) count, err := store.CommitCompactVolume(r.FormValue("volume"))
if err == nil { if err == nil {
writeJson(w, r, map[string]interface{}{"error": "", "size":count}) writeJson(w, r, map[string]interface{}{"error": "", "size": count})
} else { } else {
writeJson(w, r, map[string]string{"error": err.Error()}) writeJson(w, r, map[string]string{"error": err.Error()})
} }
debug("commit compact volume =", r.FormValue("volume"), ", error =", err) debug("commit compact volume =", r.FormValue("volume"), ", error =", err)
} }
func storeHandler(w http.ResponseWriter, r *http.Request) { func storeHandler(w http.ResponseWriter, r *http.Request) {
switch r.Method { switch r.Method {
@ -146,7 +146,11 @@ func PostHandler(w http.ResponseWriter, r *http.Request) {
} else { } else {
ret := store.Write(volumeId, needle) ret := store.Write(volumeId, needle)
errorStatus := "" errorStatus := ""
if ret > 0 || !store.HasVolume(volumeId) { //send to other replica locations needToReplicate := !store.HasVolume(volumeId)
if !needToReplicate && ret > 0 {
needToReplicate = store.GetVolume(volumeId).NeedToReplicate()
}
if needToReplicate { //send to other replica locations
if r.FormValue("type") != "standard" { if r.FormValue("type") != "standard" {
if !distributedOperation(volumeId, func(location operation.Location) bool { if !distributedOperation(volumeId, func(location operation.Location) bool {
_, err := operation.Upload("http://"+location.Url+r.URL.Path+"?type=standard", filename, bytes.NewReader(needle.Data)) _, err := operation.Upload("http://"+location.Url+r.URL.Path+"?type=standard", filename, bytes.NewReader(needle.Data))
@ -201,7 +205,11 @@ func DeleteHandler(w http.ResponseWriter, r *http.Request) {
n.Size = 0 n.Size = 0
ret := store.Delete(volumeId, n) ret := store.Delete(volumeId, n)
if ret > 0 || !store.HasVolume(volumeId) { //send to other replica locations needToReplicate := !store.HasVolume(volumeId)
if !needToReplicate && ret > 0 {
needToReplicate = store.GetVolume(volumeId).NeedToReplicate()
}
if needToReplicate { //send to other replica locations
if r.FormValue("type") != "standard" { if r.FormValue("type") != "standard" {
if !distributedOperation(volumeId, func(location operation.Location) bool { if !distributedOperation(volumeId, func(location operation.Location) bool {
return nil == operation.Delete("http://"+location.Url+r.URL.Path+"?type=standard") return nil == operation.Delete("http://"+location.Url+r.URL.Path+"?type=standard")

View file

@ -73,6 +73,9 @@ func (v *Volume) readSuperBlock() {
v.replicaType, _ = NewReplicationTypeFromByte(header[1]) v.replicaType, _ = NewReplicationTypeFromByte(header[1])
} }
} }
func (v *Volume) NeedToReplicate() bool{
return v.replicaType.GetCopyCount()>1
}
func (v *Volume) write(n *Needle) uint32 { func (v *Volume) write(n *Needle) uint32 {
v.accessLock.Lock() v.accessLock.Lock()