Can now delete a collection! Is this a dangerous feature? Only deletion of the "benchmark" collection is enabled for now.
This commit is contained in:
parent a121453188
commit cd10c277b2
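For orientation, the diff below wires the feature up in three layers: the master gains a /col/delete route, each volume server gains /admin/delete_collection, and the storage layer gains Destroy/DeleteCollection methods. A minimal sketch of exercising the new master endpoint, assuming a master listening on localhost:9333 (the address and this client are illustrative, not part of the commit):

```go
package main

import (
    "fmt"
    "io/ioutil"
    "net/http"
)

func main() {
    // The master handler reads the collection name from the "collection" form value;
    // in this commit only the name "benchmark" is accepted by the volume servers.
    resp, err := http.Get("http://localhost:9333/col/delete?collection=benchmark")
    if err != nil {
        fmt.Println("request failed:", err)
        return
    }
    defer resp.Body.Close()
    body, _ := ioutil.ReadAll(resp.Body)
    fmt.Println(resp.Status, string(body))
}
```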
@@ -64,6 +64,10 @@ func (m *cdbMap) Close() {
     }
 }
 
+func (m *cdbMap) Destroy() error {
+    return errors.New("Can not delete readonly volumes")
+}
+
 func (m cdbMap) ContentSize() uint64 {
     return m.FileByteCounter
 }

@@ -14,6 +14,7 @@ type NeedleMapper interface {
     Get(key uint64) (element *NeedleValue, ok bool)
     Delete(key uint64) error
     Close()
+    Destroy() error
     ContentSize() uint64
     DeletedSize() uint64
     FileCount() int

@@ -155,6 +156,10 @@ func (nm *NeedleMap) Delete(key uint64) error {
 func (nm *NeedleMap) Close() {
     _ = nm.indexFile.Close()
 }
+func (nm *NeedleMap) Destroy() error {
+    nm.Close()
+    return os.Remove(nm.indexFile.Name())
+}
 func (nm NeedleMap) ContentSize() uint64 {
     return nm.FileByteCounter
 }

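NeedleMap.Destroy above follows a close-then-remove pattern on the index file. A tiny standalone illustration using a temp file (nothing here is from the codebase):

```go
package main

import (
    "fmt"
    "io/ioutil"
    "os"
)

func main() {
    f, err := ioutil.TempFile("", "needle-index-*.idx")
    if err != nil {
        fmt.Println(err)
        return
    }
    name := f.Name()
    // Close before removing, mirroring nm.Close() followed by os.Remove(nm.indexFile.Name()).
    _ = f.Close()
    if err := os.Remove(name); err != nil {
        fmt.Println("remove failed:", err)
        return
    }
    fmt.Println("removed", name)
}
```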
@@ -111,6 +111,20 @@ func (s *Store) AddVolume(volumeListString string, collection string, replicaPla
     }
     return e
 }
+func (s *Store) DeleteCollection(collection string) (e error) {
+    for _, location := range s.locations {
+        for k, v := range location.volumes {
+            if v.Collection == collection {
+                e = v.Destroy()
+                if e != nil {
+                    return
+                }
+                delete(location.volumes, k)
+            }
+        }
+    }
+    return
+}
 func (s *Store) findVolume(vid VolumeId) *Volume {
     for _, location := range s.locations {
         if v, found := location.volumes[vid]; found {

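A note on the DeleteCollection loop above: it deletes entries from location.volumes while ranging over that same map, which Go explicitly permits; keys removed during iteration are simply not visited again. A standalone illustration with throwaway names (not from the codebase):

```go
package main

import "fmt"

func main() {
    volumes := map[int]string{1: "benchmark", 2: "photos", 3: "benchmark"}
    // Go allows delete() on the map being ranged over; removed keys
    // will simply not be produced later in the iteration.
    for id, collection := range volumes {
        if collection == "benchmark" {
            delete(volumes, id)
        }
    }
    fmt.Println(volumes) // map[2:photos]
}
```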
@@ -197,6 +197,21 @@ func (v *Volume) isFileUnchanged(n *Needle) bool {
     }
     return false
 }
+
+func (v *Volume) Destroy() (err error) {
+    if v.readOnly {
+        err = fmt.Errorf("%s is read-only", v.dataFile)
+        return
+    }
+    v.Close()
+    err = os.Remove(v.dataFile.Name())
+    if err != nil {
+        return
+    }
+    err = v.nm.Destroy()
+    return
+}
+
 func (v *Volume) write(n *Needle) (size uint32, err error) {
     if v.readOnly {
         err = fmt.Errorf("%s is read-only", v.dataFile)

@@ -36,3 +36,14 @@ func (c *Collection) Lookup(vid storage.VolumeId) []*DataNode {
     }
     return nil
 }
+
+func (c *Collection) ListVolumeServers() (nodes []*DataNode) {
+    for _, vl := range c.replicaType2VolumeLayout {
+        if vl != nil {
+            if list := vl.ListVolumeServers(); list != nil {
+                nodes = append(nodes, list...)
+            }
+        }
+    }
+    return
+}

@@ -1,8 +1,8 @@
 package topology
 
 import (
+    "code.google.com/p/weed-fs/go/glog"
     "code.google.com/p/weed-fs/go/storage"
-    _ "fmt"
     "strconv"
 )
 

@@ -28,12 +28,32 @@ func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) {
     if _, ok := dn.volumes[v.Id]; !ok {
         dn.volumes[v.Id] = v
         dn.UpAdjustVolumeCountDelta(1)
+        if !v.ReadOnly {
             dn.UpAdjustActiveVolumeCountDelta(1)
+        }
         dn.UpAdjustMaxVolumeId(v.Id)
     } else {
         dn.volumes[v.Id] = v
     }
 }
+func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) {
+    actualVolumeMap := make(map[storage.VolumeId]storage.VolumeInfo)
+    for _, v := range actualVolumes {
+        actualVolumeMap[v.Id] = v
+    }
+    for vid, _ := range dn.volumes {
+        glog.V(2).Infoln("Checking volume id:", vid)
+        if _, ok := actualVolumeMap[vid]; !ok {
+            glog.V(0).Infoln("Deleting volume id:", vid)
+            delete(dn.volumes, vid)
+            dn.UpAdjustVolumeCountDelta(-1)
+            dn.UpAdjustActiveVolumeCountDelta(-1)
+        }
+    } //TODO: adjust max volume id, if need to reclaim volume ids
+    for _, v := range actualVolumes {
+        dn.AddOrUpdateVolume(v)
+    }
+}
 func (dn *DataNode) GetDataCenter() *DataCenter {
     return dn.Parent().Parent().(*NodeImpl).value.(*DataCenter)
 }

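The new UpdateVolumes follows a reconcile pattern: index the reported volumes by id, drop any known volume that was not reported, then add or update everything that was reported. A generic sketch of the same pattern with placeholder types instead of storage.VolumeInfo:

```go
package main

import "fmt"

// reconcile makes `known` match `actual`: entries absent from actual are
// deleted, and everything present in actual is (re)inserted.
func reconcile(known, actual map[string]int) {
    for k := range known {
        if _, ok := actual[k]; !ok {
            delete(known, k)
        }
    }
    for k, v := range actual {
        known[k] = v
    }
}

func main() {
    known := map[string]int{"v1": 1, "v2": 1}
    actual := map[string]int{"v2": 2, "v3": 1}
    reconcile(known, actual)
    fmt.Println(known) // map[v2:2 v3:1]
}
```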
@@ -99,6 +99,15 @@ func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPla
     return t.collectionMap[collectionName].GetOrCreateVolumeLayout(rp)
 }
 
+func (t *Topology) GetCollection(collectionName string) (collection *Collection, ok bool) {
+    collection, ok = t.collectionMap[collectionName]
+    return
+}
+
+func (t *Topology) DeleteCollection(collectionName string) {
+    delete(t.collectionMap, collectionName)
+}
+
 func (t *Topology) RegisterVolumeLayout(v *storage.VolumeInfo, dn *DataNode) {
     t.GetVolumeLayout(v.Collection, v.ReplicaPlacement).RegisterVolume(v, dn)
 }

@@ -112,8 +121,8 @@ func (t *Topology) RegisterVolumes(init bool, volumeInfos []storage.VolumeInfo,
         t.UnRegisterDataNode(dn)
     }
     dn = rack.GetOrCreateDataNode(ip, port, publicUrl, maxVolumeCount)
+    dn.UpdateVolumes(volumeInfos)
     for _, v := range volumeInfos {
-        dn.AddOrUpdateVolume(v)
         t.RegisterVolumeLayout(&v, dn)
     }
 }

@@ -46,8 +46,10 @@ func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool {
         return false
     }
     for _, dn := range vl.vid2location[volumeInfo.Id].list {
+        if !volumeInfo.ReadOnly {
             dn.UpAdjustActiveVolumeCountDelta(-1)
         }
+    }
     return true
 }
 func (t *Topology) UnRegisterDataNode(dn *DataNode) {

@@ -8,6 +8,7 @@ import (
     "sync"
 )
 
+// mapping from volume to its locations, inverted from server to volume
 type VolumeLayout struct {
     rp           *storage.ReplicaPlacement
     vid2location map[storage.VolumeId]*VolumeLocationList

@@ -56,6 +57,13 @@ func (vl *VolumeLayout) Lookup(vid storage.VolumeId) []*DataNode {
     return nil
 }
 
+func (vl *VolumeLayout) ListVolumeServers() (nodes []*DataNode) {
+    for _, location := range vl.vid2location {
+        nodes = append(nodes, location.list...)
+    }
+    return
+}
+
 func (vl *VolumeLayout) PickForWrite(count int, dataCenter string) (*storage.VolumeId, int, *VolumeLocationList, error) {
     len_writers := len(vl.writables)
     if len_writers <= 0 {

@@ -134,12 +142,14 @@ func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid storage.VolumeId)
     vl.accessLock.Lock()
     defer vl.accessLock.Unlock()
 
-    if vl.vid2location[vid].Remove(dn) {
-        if vl.vid2location[vid].Length() < vl.rp.GetCopyCount() {
-            glog.V(0).Infoln("Volume", vid, "has", vl.vid2location[vid].Length(), "replica, less than required", vl.rp.GetCopyCount())
+    if location, ok := vl.vid2location[vid]; ok {
+        if location.Remove(dn) {
+            if location.Length() < vl.rp.GetCopyCount() {
+                glog.V(0).Infoln("Volume", vid, "has", location.Length(), "replica, less than required", vl.rp.GetCopyCount())
                 return vl.removeFromWritable(vid)
             }
         }
+    }
     return false
 }
 func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid storage.VolumeId) bool {

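The change above replaces a direct vl.vid2location[vid] access with a comma-ok lookup, presumably to avoid calling Remove on the nil value returned for an unknown volume id. A self-contained sketch of that hazard and the guarded pattern (illustrative types only, not the real VolumeLocationList):

```go
package main

import "fmt"

type locationList struct{ nodes []string }

func (l *locationList) Remove(n string) bool {
    // Accesses l.nodes, so calling this on the nil value a missing map key
    // returns would panic; that is the hazard the guarded lookup avoids.
    for i, v := range l.nodes {
        if v == n {
            l.nodes = append(l.nodes[:i], l.nodes[i+1:]...)
            return true
        }
    }
    return false
}

func main() {
    m := map[int]*locationList{1: {nodes: []string{"a", "b"}}}
    // Comma-ok lookup, as in the patched SetVolumeUnavailable.
    if loc, ok := m[2]; ok {
        loc.Remove("a")
    } else {
        fmt.Println("volume 2 has no location entry; skipping")
    }
}
```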
@@ -60,6 +60,7 @@ func NewMasterServer(r *mux.Router, version string, port int, metaFolder string,
     r.HandleFunc("/dir/lookup", ms.proxyToLeader(secure(ms.whiteList, ms.dirLookupHandler)))
     r.HandleFunc("/dir/join", ms.proxyToLeader(secure(ms.whiteList, ms.dirJoinHandler)))
     r.HandleFunc("/dir/status", ms.proxyToLeader(secure(ms.whiteList, ms.dirStatusHandler)))
+    r.HandleFunc("/col/delete", ms.proxyToLeader(secure(ms.whiteList, ms.collectionDeleteHandler)))
     r.HandleFunc("/vol/grow", ms.proxyToLeader(secure(ms.whiteList, ms.volumeGrowHandler)))
     r.HandleFunc("/vol/status", ms.proxyToLeader(secure(ms.whiteList, ms.volumeStatusHandler)))
     r.HandleFunc("/vol/vacuum", ms.proxyToLeader(secure(ms.whiteList, ms.volumeVacuumHandler)))

@@ -2,6 +2,7 @@ package weed_server
 
 import (
     "code.google.com/p/weed-fs/go/storage"
+    "code.google.com/p/weed-fs/go/util"
     "encoding/json"
     "errors"
     "net/http"

@@ -78,6 +79,22 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request)
         }
     }
 }
+
+func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.Request) {
+    collection, ok := ms.topo.GetCollection(r.FormValue("collection"))
+    if !ok {
+        writeJsonQuiet(w, r, map[string]interface{}{"error": "collection " + r.FormValue("collection") + "does not exist!"})
+        return
+    }
+    for _, server := range collection.ListVolumeServers() {
+        _, err := util.Get("http://" + server.Ip + ":" + strconv.Itoa(server.Port) + "/admin/delete_collection?collection=" + r.FormValue("collection"))
+        if err != nil {
+            writeJsonQuiet(w, r, map[string]string{"error": err.Error()})
+            return
+        }
+    }
+    ms.topo.DeleteCollection(r.FormValue("collection"))
+}
+
 func (ms *MasterServer) dirJoinHandler(w http.ResponseWriter, r *http.Request) {
     init := r.FormValue("init") == "true"
     ip := r.FormValue("ip")

@@ -39,6 +39,7 @@ func NewVolumeServer(r *http.ServeMux, version string, ip string, port int, publ
     r.HandleFunc("/admin/vacuum_volume_compact", secure(vs.whiteList, vs.vacuumVolumeCompactHandler))
     r.HandleFunc("/admin/vacuum_volume_commit", secure(vs.whiteList, vs.vacuumVolumeCommitHandler))
     r.HandleFunc("/admin/freeze_volume", secure(vs.whiteList, vs.freezeVolumeHandler))
+    r.HandleFunc("/admin/delete_collection", secure(vs.whiteList, vs.deleteCollectionHandler))
     r.HandleFunc("/", vs.storeHandler)
 
     go func() {

@@ -29,6 +29,19 @@ func (vs *VolumeServer) assignVolumeHandler(w http.ResponseWriter, r *http.Reque
     }
     glog.V(2).Infoln("assign volume =", r.FormValue("volume"), ", collection =", r.FormValue("collection"), ", replication =", r.FormValue("replication"), ", error =", err)
 }
+func (vs *VolumeServer) deleteCollectionHandler(w http.ResponseWriter, r *http.Request) {
+    if "benchmark" != r.FormValue("collection") {
+        glog.V(0).Infoln("deleting collection =", r.FormValue("collection"), "!!!")
+        return
+    }
+    err := vs.store.DeleteCollection(r.FormValue("collection"))
+    if err == nil {
+        writeJsonQuiet(w, r, map[string]string{"error": ""})
+    } else {
+        writeJsonQuiet(w, r, map[string]string{"error": err.Error()})
+    }
+    glog.V(2).Infoln("deleting collection =", r.FormValue("collection"), ", error =", err)
+}
 func (vs *VolumeServer) vacuumVolumeCheckHandler(w http.ResponseWriter, r *http.Request) {
     err, ret := vs.store.CheckCompactVolume(r.FormValue("volume"), r.FormValue("garbageThreshold"))
     if err == nil {