mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
merge changes from about dealing with read only volumes.
97482255d5
.diff
This commit is contained in:
parent
4d8ce2fe26
commit
a4369b35a7
|
@ -80,7 +80,11 @@ func setup(topologyLayout string) *topology.Topology {
|
||||||
fmt.Println("data:", data)
|
fmt.Println("data:", data)
|
||||||
|
|
||||||
//need to connect all nodes first before server adding volumes
|
//need to connect all nodes first before server adding volumes
|
||||||
topo := topology.NewTopology("mynetwork", "/etc/weedfs/weedfs.conf", "/tmp", "testing", 32*1024, 5)
|
topo, err := topology.NewTopology("mynetwork", "/etc/weedfs/weedfs.conf",
|
||||||
|
"/tmp", "testing", 32*1024, 5)
|
||||||
|
if err != nil {
|
||||||
|
panic("error: " + err.Error())
|
||||||
|
}
|
||||||
mTopology := data.(map[string]interface{})
|
mTopology := data.(map[string]interface{})
|
||||||
for dcKey, dcValue := range mTopology {
|
for dcKey, dcValue := range mTopology {
|
||||||
dc := topology.NewDataCenter(dcKey)
|
dc := topology.NewDataCenter(dcKey)
|
||||||
|
|
|
@ -120,9 +120,10 @@ func (s *Store) loadExistingVolumes() {
|
||||||
func (s *Store) Status() []*VolumeInfo {
|
func (s *Store) Status() []*VolumeInfo {
|
||||||
var stats []*VolumeInfo
|
var stats []*VolumeInfo
|
||||||
for k, v := range s.volumes {
|
for k, v := range s.volumes {
|
||||||
s := new(VolumeInfo)
|
s := &VolumeInfo{Id: VolumeId(k), Size: v.ContentSize(),
|
||||||
s.Id, s.Size, s.RepType, s.Version, s.FileCount, s.DeleteCount, s.DeletedByteCount =
|
RepType: v.ReplicaType, Version: v.Version(), FileCount: v.nm.fileCounter,
|
||||||
VolumeId(k), v.ContentSize(), v.ReplicaType, v.Version(), v.nm.fileCounter, v.nm.deletionCounter, v.nm.deletionByteCounter
|
DeleteCount: v.nm.deletionCounter, DeletedByteCount: v.nm.deletionByteCounter,
|
||||||
|
ReadOnly: v.readOnly}
|
||||||
stats = append(stats, s)
|
stats = append(stats, s)
|
||||||
}
|
}
|
||||||
return stats
|
return stats
|
||||||
|
@ -138,9 +139,10 @@ func (s *Store) SetMaster(mserver string) {
|
||||||
func (s *Store) Join() error {
|
func (s *Store) Join() error {
|
||||||
stats := new([]*VolumeInfo)
|
stats := new([]*VolumeInfo)
|
||||||
for k, v := range s.volumes {
|
for k, v := range s.volumes {
|
||||||
s := new(VolumeInfo)
|
s := &VolumeInfo{Id: VolumeId(k), Size: uint64(v.Size()),
|
||||||
s.Id, s.Size, s.RepType, s.Version, s.FileCount, s.DeleteCount, s.DeletedByteCount =
|
RepType: v.ReplicaType, Version: v.Version(), FileCount: v.nm.fileCounter,
|
||||||
VolumeId(k), uint64(v.Size()), v.ReplicaType, v.Version(), v.nm.fileCounter, v.nm.deletionCounter, v.nm.deletionByteCounter
|
DeleteCount: v.nm.deletionCounter, DeletedByteCount: v.nm.deletionByteCounter,
|
||||||
|
ReadOnly: v.readOnly}
|
||||||
*stats = append(*stats, s)
|
*stats = append(*stats, s)
|
||||||
}
|
}
|
||||||
bytes, _ := json.Marshal(stats)
|
bytes, _ := json.Marshal(stats)
|
||||||
|
@ -171,7 +173,7 @@ func (s *Store) Close() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) {
|
func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) {
|
||||||
if v := s.volumes[i]; v != nil {
|
if v := s.volumes[i]; v != nil && !v.readOnly {
|
||||||
size, err = v.write(n)
|
size, err = v.write(n)
|
||||||
if err != nil && s.volumeSizeLimit < v.ContentSize()+uint64(size) && s.volumeSizeLimit >= v.ContentSize() {
|
if err != nil && s.volumeSizeLimit < v.ContentSize()+uint64(size) && s.volumeSizeLimit >= v.ContentSize() {
|
||||||
log.Println("volume", i, "size is", v.ContentSize(), "close to", s.volumeSizeLimit)
|
log.Println("volume", i, "size is", v.ContentSize(), "close to", s.volumeSizeLimit)
|
||||||
|
@ -185,7 +187,7 @@ func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
func (s *Store) Delete(i VolumeId, n *Needle) (uint32, error) {
|
func (s *Store) Delete(i VolumeId, n *Needle) (uint32, error) {
|
||||||
if v := s.volumes[i]; v != nil {
|
if v := s.volumes[i]; v != nil && !v.readOnly {
|
||||||
return v.delete(n)
|
return v.delete(n)
|
||||||
}
|
}
|
||||||
return 0, nil
|
return 0, nil
|
||||||
|
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"log"
|
||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
"sync"
|
"sync"
|
||||||
|
@ -30,6 +31,7 @@ type Volume struct {
|
||||||
dir string
|
dir string
|
||||||
dataFile *os.File
|
dataFile *os.File
|
||||||
nm *NeedleMap
|
nm *NeedleMap
|
||||||
|
readOnly bool
|
||||||
|
|
||||||
SuperBlock
|
SuperBlock
|
||||||
|
|
||||||
|
@ -53,8 +55,15 @@ func (v *Volume) load(alsoLoadIndex bool) error {
|
||||||
fileName := path.Join(v.dir, v.Id.String())
|
fileName := path.Join(v.dir, v.Id.String())
|
||||||
v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
|
v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
|
||||||
if e != nil {
|
if e != nil {
|
||||||
|
if !os.IsPermission(e) {
|
||||||
return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e)
|
return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e)
|
||||||
}
|
}
|
||||||
|
if v.dataFile, e = os.Open(fileName + ".dat"); e != nil {
|
||||||
|
return fmt.Errorf("cannot open Volume Data %s.dat: %s", fileName, e)
|
||||||
|
}
|
||||||
|
log.Printf("opening " + fileName + ".dat in READONLY mode")
|
||||||
|
v.readOnly = true
|
||||||
|
}
|
||||||
if v.ReplicaType == CopyNil {
|
if v.ReplicaType == CopyNil {
|
||||||
e = v.readSuperBlock()
|
e = v.readSuperBlock()
|
||||||
} else {
|
} else {
|
||||||
|
@ -97,6 +106,14 @@ func (v *Volume) maybeWriteSuperBlock() error {
|
||||||
if stat.Size() == 0 {
|
if stat.Size() == 0 {
|
||||||
v.SuperBlock.Version = CurrentVersion
|
v.SuperBlock.Version = CurrentVersion
|
||||||
_, e = v.dataFile.Write(v.SuperBlock.Bytes())
|
_, e = v.dataFile.Write(v.SuperBlock.Bytes())
|
||||||
|
if e != nil && os.IsPermission(e) {
|
||||||
|
//read-only, but zero length - recreate it!
|
||||||
|
if v.dataFile, e = os.Create(v.dataFile.Name()); e == nil {
|
||||||
|
if _, e = v.dataFile.Write(v.SuperBlock.Bytes()); e == nil {
|
||||||
|
v.readOnly = false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return e
|
return e
|
||||||
}
|
}
|
||||||
|
@ -123,6 +140,10 @@ func (v *Volume) NeedToReplicate() bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v *Volume) write(n *Needle) (size uint32, err error) {
|
func (v *Volume) write(n *Needle) (size uint32, err error) {
|
||||||
|
if v.readOnly {
|
||||||
|
err = fmt.Errorf("%s is read-only", v.dataFile)
|
||||||
|
return
|
||||||
|
}
|
||||||
v.accessLock.Lock()
|
v.accessLock.Lock()
|
||||||
defer v.accessLock.Unlock()
|
defer v.accessLock.Unlock()
|
||||||
var offset int64
|
var offset int64
|
||||||
|
@ -142,6 +163,9 @@ func (v *Volume) write(n *Needle) (size uint32, err error) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
func (v *Volume) delete(n *Needle) (uint32, error) {
|
func (v *Volume) delete(n *Needle) (uint32, error) {
|
||||||
|
if v.readOnly {
|
||||||
|
return 0, fmt.Errorf("%s is read-only", v.dataFile)
|
||||||
|
}
|
||||||
v.accessLock.Lock()
|
v.accessLock.Lock()
|
||||||
defer v.accessLock.Unlock()
|
defer v.accessLock.Unlock()
|
||||||
nv, ok := v.nm.Get(n.Id)
|
nv, ok := v.nm.Get(n.Id)
|
||||||
|
|
|
@ -10,4 +10,5 @@ type VolumeInfo struct {
|
||||||
FileCount int
|
FileCount int
|
||||||
DeleteCount int
|
DeleteCount int
|
||||||
DeletedByteCount uint64
|
DeletedByteCount uint64
|
||||||
|
ReadOnly bool
|
||||||
}
|
}
|
||||||
|
|
|
@ -39,7 +39,9 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (vl *VolumeLayout) isWritable(v *storage.VolumeInfo) bool {
|
func (vl *VolumeLayout) isWritable(v *storage.VolumeInfo) bool {
|
||||||
return uint64(v.Size) < vl.volumeSizeLimit && v.Version == storage.CurrentVersion
|
return uint64(v.Size) < vl.volumeSizeLimit &&
|
||||||
|
v.Version == storage.CurrentVersion &&
|
||||||
|
!v.ReadOnly
|
||||||
}
|
}
|
||||||
|
|
||||||
func (vl *VolumeLayout) Lookup(vid storage.VolumeId) []*DataNode {
|
func (vl *VolumeLayout) Lookup(vid storage.VolumeId) []*DataNode {
|
||||||
|
|
Loading…
Reference in a new issue