diff --git a/go/storage/cdb_map.go b/go/storage/cdb_map.go index 0d790cc0f..8be302111 100644 --- a/go/storage/cdb_map.go +++ b/go/storage/cdb_map.go @@ -64,6 +64,10 @@ func (m *cdbMap) Close() { } } +func (m *cdbMap) Destroy() error { + return errors.New("Can not delete readonly volumes") +} + func (m cdbMap) ContentSize() uint64 { return m.FileByteCounter } diff --git a/go/storage/needle_map.go b/go/storage/needle_map.go index 29b71ae52..ef7d3d6fd 100644 --- a/go/storage/needle_map.go +++ b/go/storage/needle_map.go @@ -14,6 +14,7 @@ type NeedleMapper interface { Get(key uint64) (element *NeedleValue, ok bool) Delete(key uint64) error Close() + Destroy() error ContentSize() uint64 DeletedSize() uint64 FileCount() int @@ -155,6 +156,10 @@ func (nm *NeedleMap) Delete(key uint64) error { func (nm *NeedleMap) Close() { _ = nm.indexFile.Close() } +func (nm *NeedleMap) Destroy() error { + nm.Close() + return os.Remove(nm.indexFile.Name()) +} func (nm NeedleMap) ContentSize() uint64 { return nm.FileByteCounter } diff --git a/go/storage/store.go b/go/storage/store.go index 2df0e6cb7..e1babd1e5 100644 --- a/go/storage/store.go +++ b/go/storage/store.go @@ -111,6 +111,20 @@ func (s *Store) AddVolume(volumeListString string, collection string, replicaPla } return e } +func (s *Store) DeleteCollection(collection string) (e error) { + for _, location := range s.locations { + for k, v := range location.volumes { + if v.Collection == collection { + e = v.Destroy() + if e != nil { + return + } + delete(location.volumes, k) + } + } + } + return +} func (s *Store) findVolume(vid VolumeId) *Volume { for _, location := range s.locations { if v, found := location.volumes[vid]; found { diff --git a/go/storage/volume.go b/go/storage/volume.go index 69817a6d4..0301d7968 100644 --- a/go/storage/volume.go +++ b/go/storage/volume.go @@ -197,6 +197,21 @@ func (v *Volume) isFileUnchanged(n *Needle) bool { } return false } + +func (v *Volume) Destroy() (err error) { + if v.readOnly { + err = fmt.Errorf("%s is read-only", v.dataFile) + return + } + v.Close() + err = os.Remove(v.dataFile.Name()) + if err != nil { + return + } + err = v.nm.Destroy() + return +} + func (v *Volume) write(n *Needle) (size uint32, err error) { if v.readOnly { err = fmt.Errorf("%s is read-only", v.dataFile) diff --git a/go/topology/collection.go b/go/topology/collection.go index 8042369a9..b21122d22 100644 --- a/go/topology/collection.go +++ b/go/topology/collection.go @@ -36,3 +36,14 @@ func (c *Collection) Lookup(vid storage.VolumeId) []*DataNode { } return nil } + +func (c *Collection) ListVolumeServers() (nodes []*DataNode) { + for _, vl := range c.replicaType2VolumeLayout { + if vl != nil { + if list := vl.ListVolumeServers(); list != nil { + nodes = append(nodes, list...) + } + } + } + return +} diff --git a/go/topology/data_node.go b/go/topology/data_node.go index 3a6edb447..a83647939 100644 --- a/go/topology/data_node.go +++ b/go/topology/data_node.go @@ -1,8 +1,8 @@ package topology import ( + "code.google.com/p/weed-fs/go/glog" "code.google.com/p/weed-fs/go/storage" - _ "fmt" "strconv" ) @@ -28,12 +28,32 @@ func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) { if _, ok := dn.volumes[v.Id]; !ok { dn.volumes[v.Id] = v dn.UpAdjustVolumeCountDelta(1) - dn.UpAdjustActiveVolumeCountDelta(1) + if !v.ReadOnly { + dn.UpAdjustActiveVolumeCountDelta(1) + } dn.UpAdjustMaxVolumeId(v.Id) } else { dn.volumes[v.Id] = v } } +func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) { + actualVolumeMap := make(map[storage.VolumeId]storage.VolumeInfo) + for _, v := range actualVolumes { + actualVolumeMap[v.Id] = v + } + for vid, _ := range dn.volumes { + glog.V(2).Infoln("Checking volume id:", vid) + if _, ok := actualVolumeMap[vid]; !ok { + glog.V(0).Infoln("Deleting volume id:", vid) + delete(dn.volumes, vid) + dn.UpAdjustVolumeCountDelta(-1) + dn.UpAdjustActiveVolumeCountDelta(-1) + } + } //TODO: adjust max volume id, if need to reclaim volume ids + for _, v := range actualVolumes { + dn.AddOrUpdateVolume(v) + } +} func (dn *DataNode) GetDataCenter() *DataCenter { return dn.Parent().Parent().(*NodeImpl).value.(*DataCenter) } diff --git a/go/topology/topology.go b/go/topology/topology.go index 1426f7a12..d72879035 100644 --- a/go/topology/topology.go +++ b/go/topology/topology.go @@ -99,6 +99,15 @@ func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPla return t.collectionMap[collectionName].GetOrCreateVolumeLayout(rp) } +func (t *Topology) GetCollection(collectionName string) (collection *Collection, ok bool) { + collection, ok = t.collectionMap[collectionName] + return +} + +func (t *Topology) DeleteCollection(collectionName string) { + delete(t.collectionMap, collectionName) +} + func (t *Topology) RegisterVolumeLayout(v *storage.VolumeInfo, dn *DataNode) { t.GetVolumeLayout(v.Collection, v.ReplicaPlacement).RegisterVolume(v, dn) } @@ -112,8 +121,8 @@ func (t *Topology) RegisterVolumes(init bool, volumeInfos []storage.VolumeInfo, t.UnRegisterDataNode(dn) } dn = rack.GetOrCreateDataNode(ip, port, publicUrl, maxVolumeCount) + dn.UpdateVolumes(volumeInfos) for _, v := range volumeInfos { - dn.AddOrUpdateVolume(v) t.RegisterVolumeLayout(&v, dn) } } diff --git a/go/topology/topology_event_handling.go b/go/topology/topology_event_handling.go index 5740c9a03..710e7b2ae 100644 --- a/go/topology/topology_event_handling.go +++ b/go/topology/topology_event_handling.go @@ -46,7 +46,9 @@ func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool { return false } for _, dn := range vl.vid2location[volumeInfo.Id].list { - dn.UpAdjustActiveVolumeCountDelta(-1) + if !volumeInfo.ReadOnly { + dn.UpAdjustActiveVolumeCountDelta(-1) + } } return true } diff --git a/go/topology/volume_layout.go b/go/topology/volume_layout.go index 40628b4a0..1a35faa5c 100644 --- a/go/topology/volume_layout.go +++ b/go/topology/volume_layout.go @@ -8,6 +8,7 @@ import ( "sync" ) +// mapping from volume to its locations, inverted from server to volume type VolumeLayout struct { rp *storage.ReplicaPlacement vid2location map[storage.VolumeId]*VolumeLocationList @@ -56,6 +57,13 @@ func (vl *VolumeLayout) Lookup(vid storage.VolumeId) []*DataNode { return nil } +func (vl *VolumeLayout) ListVolumeServers() (nodes []*DataNode) { + for _, location := range vl.vid2location { + nodes = append(nodes, location.list...) + } + return +} + func (vl *VolumeLayout) PickForWrite(count int, dataCenter string) (*storage.VolumeId, int, *VolumeLocationList, error) { len_writers := len(vl.writables) if len_writers <= 0 { @@ -134,10 +142,12 @@ func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid storage.VolumeId) vl.accessLock.Lock() defer vl.accessLock.Unlock() - if vl.vid2location[vid].Remove(dn) { - if vl.vid2location[vid].Length() < vl.rp.GetCopyCount() { - glog.V(0).Infoln("Volume", vid, "has", vl.vid2location[vid].Length(), "replica, less than required", vl.rp.GetCopyCount()) - return vl.removeFromWritable(vid) + if location, ok := vl.vid2location[vid]; ok { + if location.Remove(dn) { + if location.Length() < vl.rp.GetCopyCount() { + glog.V(0).Infoln("Volume", vid, "has", location.Length(), "replica, less than required", vl.rp.GetCopyCount()) + return vl.removeFromWritable(vid) + } } } return false diff --git a/go/weed/weed_server/master_server.go b/go/weed/weed_server/master_server.go index c1870e79e..d17e57531 100644 --- a/go/weed/weed_server/master_server.go +++ b/go/weed/weed_server/master_server.go @@ -60,6 +60,7 @@ func NewMasterServer(r *mux.Router, version string, port int, metaFolder string, r.HandleFunc("/dir/lookup", ms.proxyToLeader(secure(ms.whiteList, ms.dirLookupHandler))) r.HandleFunc("/dir/join", ms.proxyToLeader(secure(ms.whiteList, ms.dirJoinHandler))) r.HandleFunc("/dir/status", ms.proxyToLeader(secure(ms.whiteList, ms.dirStatusHandler))) + r.HandleFunc("/col/delete", ms.proxyToLeader(secure(ms.whiteList, ms.collectionDeleteHandler))) r.HandleFunc("/vol/grow", ms.proxyToLeader(secure(ms.whiteList, ms.volumeGrowHandler))) r.HandleFunc("/vol/status", ms.proxyToLeader(secure(ms.whiteList, ms.volumeStatusHandler))) r.HandleFunc("/vol/vacuum", ms.proxyToLeader(secure(ms.whiteList, ms.volumeVacuumHandler))) diff --git a/go/weed/weed_server/master_server_handlers.go b/go/weed/weed_server/master_server_handlers.go index 0c710429c..64786dc10 100644 --- a/go/weed/weed_server/master_server_handlers.go +++ b/go/weed/weed_server/master_server_handlers.go @@ -2,6 +2,7 @@ package weed_server import ( "code.google.com/p/weed-fs/go/storage" + "code.google.com/p/weed-fs/go/util" "encoding/json" "errors" "net/http" @@ -78,6 +79,22 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request) } } +func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.Request) { + collection, ok := ms.topo.GetCollection(r.FormValue("collection")) + if !ok { + writeJsonQuiet(w, r, map[string]interface{}{"error": "collection " + r.FormValue("collection") + "does not exist!"}) + return + } + for _, server := range collection.ListVolumeServers() { + _, err := util.Get("http://" + server.Ip + ":" + strconv.Itoa(server.Port) + "/admin/delete_collection?collection=" + r.FormValue("collection")) + if err != nil { + writeJsonQuiet(w, r, map[string]string{"error": err.Error()}) + return + } + } + ms.topo.DeleteCollection(r.FormValue("collection")) +} + func (ms *MasterServer) dirJoinHandler(w http.ResponseWriter, r *http.Request) { init := r.FormValue("init") == "true" ip := r.FormValue("ip") diff --git a/go/weed/weed_server/volume_server.go b/go/weed/weed_server/volume_server.go index a2939f848..b4fc4f4ea 100644 --- a/go/weed/weed_server/volume_server.go +++ b/go/weed/weed_server/volume_server.go @@ -39,6 +39,7 @@ func NewVolumeServer(r *http.ServeMux, version string, ip string, port int, publ r.HandleFunc("/admin/vacuum_volume_compact", secure(vs.whiteList, vs.vacuumVolumeCompactHandler)) r.HandleFunc("/admin/vacuum_volume_commit", secure(vs.whiteList, vs.vacuumVolumeCommitHandler)) r.HandleFunc("/admin/freeze_volume", secure(vs.whiteList, vs.freezeVolumeHandler)) + r.HandleFunc("/admin/delete_collection", secure(vs.whiteList, vs.deleteCollectionHandler)) r.HandleFunc("/", vs.storeHandler) go func() { diff --git a/go/weed/weed_server/volume_server_handlers.go b/go/weed/weed_server/volume_server_handlers.go index 2f4673763..a7ea8ad87 100644 --- a/go/weed/weed_server/volume_server_handlers.go +++ b/go/weed/weed_server/volume_server_handlers.go @@ -29,6 +29,19 @@ func (vs *VolumeServer) assignVolumeHandler(w http.ResponseWriter, r *http.Reque } glog.V(2).Infoln("assign volume =", r.FormValue("volume"), ", collection =", r.FormValue("collection"), ", replication =", r.FormValue("replication"), ", error =", err) } +func (vs *VolumeServer) deleteCollectionHandler(w http.ResponseWriter, r *http.Request) { + if "benchmark" != r.FormValue("collection") { + glog.V(0).Infoln("deleting collection =", r.FormValue("collection"), "!!!") + return + } + err := vs.store.DeleteCollection(r.FormValue("collection")) + if err == nil { + writeJsonQuiet(w, r, map[string]string{"error": ""}) + } else { + writeJsonQuiet(w, r, map[string]string{"error": err.Error()}) + } + glog.V(2).Infoln("deleting collection =", r.FormValue("collection"), ", error =", err) +} func (vs *VolumeServer) vacuumVolumeCheckHandler(w http.ResponseWriter, r *http.Request) { err, ret := vs.store.CheckCompactVolume(r.FormValue("volume"), r.FormValue("garbageThreshold")) if err == nil {