diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go index e86c33bda..bb2791622 100644 --- a/weed/server/volume_server.go +++ b/weed/server/volume_server.go @@ -58,6 +58,8 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, adminMux.HandleFunc("/admin/sync/status", vs.guard.WhiteList(vs.getVolumeSyncStatusHandler)) adminMux.HandleFunc("/admin/sync/index", vs.guard.WhiteList(vs.getVolumeIndexContentHandler)) adminMux.HandleFunc("/admin/sync/data", vs.guard.WhiteList(vs.getVolumeDataContentHandler)) + adminMux.HandleFunc("/admin/volume/mount", vs.guard.WhiteList(vs.getVolumeMountHandler)) + adminMux.HandleFunc("/admin/volume/unmount", vs.guard.WhiteList(vs.getVolumeUnmountHandler)) adminMux.HandleFunc("/stats/counter", vs.guard.WhiteList(statsCounterHandler)) adminMux.HandleFunc("/stats/memory", vs.guard.WhiteList(statsMemoryHandler)) adminMux.HandleFunc("/stats/disk", vs.guard.WhiteList(vs.statsDiskHandler)) diff --git a/weed/server/volume_server_handlers_sync.go b/weed/server/volume_server_handlers_sync.go index 8a2e30743..438f17b25 100644 --- a/weed/server/volume_server_handlers_sync.go +++ b/weed/server/volume_server_handlers_sync.go @@ -68,20 +68,51 @@ func (vs *VolumeServer) getVolumeDataContentHandler(w http.ResponseWriter, r *ht w.Write(content) } -func (vs *VolumeServer) getVolume(volumeParameterName string, r *http.Request) (*storage.Volume, error) { +func (vs *VolumeServer) getVolumeId(volumeParameterName string, r *http.Request) (storage.VolumeId, error) { volumeIdString := r.FormValue(volumeParameterName) + if volumeIdString == "" { err := fmt.Errorf("Empty Volume Id: Need to pass in %s=the_volume_id.", volumeParameterName) - return nil, err + return 0, err } + vid, err := storage.NewVolumeId(volumeIdString) if err != nil { err = fmt.Errorf("Volume Id %s is not a valid unsigned integer", volumeIdString) + return 0, err + } + + return vid, err +} + +func (vs *VolumeServer) getVolume(volumeParameterName string, r *http.Request) (*storage.Volume, error) { + vid, err := vs.getVolumeId(volumeParameterName, r) + if err != nil { return nil, err } v := vs.store.GetVolume(vid) if v == nil { - return nil, fmt.Errorf("Not Found Volume Id %s: %d", volumeIdString, vid) + return nil, fmt.Errorf("Not Found Volume Id %d", vid) } return v, nil } + +func (vs *VolumeServer) getVolumeMountHandler(w http.ResponseWriter, r *http.Request) { + vid, err := vs.getVolumeId("volume", r) + if err != nil { + writeJsonError(w, r, http.StatusNotFound, err) + return + } + vs.store.MountVolume(vid) + writeJsonQuiet(w, r, http.StatusOK, "Volume mounted") +} + +func (vs *VolumeServer) getVolumeUnmountHandler(w http.ResponseWriter, r *http.Request) { + vid, err := vs.getVolumeId("volume", r) + if err != nil { + writeJsonError(w, r, http.StatusNotFound, err) + return + } + vs.store.UnmountVolume(vid) + writeJsonQuiet(w, r, http.StatusOK, "Volume unmounted") +} diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go index 496e0dd57..8f5527d30 100644 --- a/weed/storage/disk_location.go +++ b/weed/storage/disk_location.go @@ -7,6 +7,7 @@ import ( "sync" "github.com/chrislusf/seaweedfs/weed/glog" + "fmt" ) type DiskLocation struct { @@ -22,16 +23,27 @@ func NewDiskLocation(dir string, maxVolumeCount int) *DiskLocation { return location } -func (l *DiskLocation) loadExistingVolume(dir os.FileInfo, needleMapKind NeedleMapType, mutex *sync.RWMutex) { +func (l *DiskLocation) volumeIdFromPath(dir os.FileInfo) (VolumeId, string, error) { name := dir.Name() if !dir.IsDir() && strings.HasSuffix(name, ".dat") { collection := "" - base := name[:len(name)-len(".dat")] + base := name[:len(name) - len(".dat")] i := strings.LastIndex(base, "_") if i > 0 { - collection, base = base[0:i], base[i+1:] + collection, base = base[0:i], base[i + 1:] } - if vid, err := NewVolumeId(base); err == nil { + vol, err := NewVolumeId(base); + return vol, collection, err + } + + return 0, "", fmt.Errorf("Path is not a volume: %s", name) +} + +func (l *DiskLocation) loadExistingVolume(dir os.FileInfo, needleMapKind NeedleMapType, mutex *sync.RWMutex) { + name := dir.Name() + if !dir.IsDir() && strings.HasSuffix(name, ".dat") { + vid, collection, err := l.volumeIdFromPath(dir) + if err == nil { mutex.RLock() _, found := l.volumes[vid] mutex.RUnlock() @@ -125,6 +137,30 @@ func (l *DiskLocation) deleteVolumeById(vid VolumeId) (e error) { return } +func (l *DiskLocation) LoadVolume(vid VolumeId, needleMapKind NeedleMapType) bool { + if dirs, err := ioutil.ReadDir(l.Directory); err == nil { + for _, dir := range dirs { + volId, _, err := l.volumeIdFromPath(dir) + if vid == volId && err == nil { + var mutex sync.RWMutex + l.loadExistingVolume(dir, needleMapKind, &mutex) + return true + } + } + } + + return false +} + +func (l *DiskLocation) UnloadVolume(vid VolumeId) (e error) { + _, ok := l.volumes[vid] + if !ok { + return fmt.Errorf("Volume not loaded, VolumeId: %d", vid) + } + delete(l.volumes, vid) + return nil +} + func (l *DiskLocation) SetVolume(vid VolumeId, volume *Volume) { l.Lock() defer l.Unlock() diff --git a/weed/storage/store.go b/weed/storage/store.go index f70ebe0ee..045b48220 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -77,6 +77,7 @@ type Store struct { connected bool VolumeSizeLimit uint64 //read from the master Client pb.Seaweed_SendHeartbeatClient + NeedleMapType NeedleMapType } func (s *Store) String() (str string) { @@ -85,7 +86,7 @@ func (s *Store) String() (str string) { } func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, needleMapKind NeedleMapType) (s *Store) { - s = &Store{Port: port, Ip: ip, PublicUrl: publicUrl} + s = &Store{Port: port, Ip: ip, PublicUrl: publicUrl, NeedleMapType: needleMapKind} s.Locations = make([]*DiskLocation, 0) for i := 0; i < len(dirnames); i++ { location := NewDiskLocation(dirnames[i], maxVolumeCounts[i]) @@ -261,6 +262,7 @@ func (s *Store) Close() { location.Close() } } + func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) { if v := s.findVolume(i); v != nil { if v.readOnly { @@ -275,11 +277,7 @@ func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) { } if s.VolumeSizeLimit < v.ContentSize()+3*uint64(size) { glog.V(0).Infoln("volume", i, "size", v.ContentSize(), "will exceed limit", s.VolumeSizeLimit) - if s.Client != nil { - if e := s.Client.Send(s.CollectHeartbeat()); e != nil { - glog.V(0).Infoln("error when reporting size:", e) - } - } + s.updateMaster() } return } @@ -287,17 +285,27 @@ func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) { err = fmt.Errorf("Volume %d not found!", i) return } + +func (s *Store) updateMaster() { + if s.Client != nil { + if e := s.Client.Send(s.CollectHeartbeat()); e != nil { + glog.V(0).Infoln("error when reporting size:", e) + } + } +} + func (s *Store) Delete(i VolumeId, n *Needle) (uint32, error) { if v := s.findVolume(i); v != nil && !v.readOnly { return v.deleteNeedle(n) } return 0, nil } + func (s *Store) ReadVolumeNeedle(i VolumeId, n *Needle) (int, error) { if v := s.findVolume(i); v != nil { return v.readNeedle(n) } - return 0, fmt.Errorf("Volume %v not found!", i) + return 0, fmt.Errorf("Volume %d not found!", i) } func (s *Store) GetVolume(i VolumeId) *Volume { return s.findVolume(i) @@ -307,3 +315,26 @@ func (s *Store) HasVolume(i VolumeId) bool { v := s.findVolume(i) return v != nil } + +func (s *Store) MountVolume(i VolumeId) error { + for _, location := range s.Locations { + if found := location.LoadVolume(i, s.NeedleMapType); found == true { + s.updateMaster() + return nil + } + } + + return fmt.Errorf("Volume %d not found on disk", i) +} + +func (s *Store) UnmountVolume(i VolumeId) error { + for _, location := range s.Locations { + if err := location.UnloadVolume(i); err == nil { + s.updateMaster() + return nil + } + } + + return fmt.Errorf("Volume %d not found on disk", i) +} +