add "freeze" subcommand to volume

This commit is contained in:
Tamás Gulácsi 2013-01-14 21:42:35 +01:00
parent dd685fdd8d
commit f262fed197
7 changed files with 144 additions and 16 deletions

View file

@ -1,6 +1,7 @@
package main
import (
"errors"
"log"
"os"
"path"
@ -33,24 +34,36 @@ func runFix(cmd *Command, args []string) bool {
}
fileName := strconv.Itoa(*volumeId)
dataFile, e := os.OpenFile(path.Join(*dir, fileName+".dat"), os.O_RDONLY, 0644)
if err := createIndexFile(path.Join(*dir, fileName+".dat")); err != nil {
log.Fatalf("[ERROR] " + err.Error())
}
return true
}
func createIndexFile(datafn string) error {
dataFile, e := os.OpenFile(datafn, os.O_RDONLY, 0644)
if e != nil {
log.Fatalf("Read Volume [ERROR] %s\n", e)
return errors.New("Read Volume " + e.Error())
}
defer dataFile.Close()
indexFile, ie := os.OpenFile(path.Join(*dir, fileName+".idx"), os.O_WRONLY|os.O_CREATE, 0644)
// log.Printf("dataFile=%s", dataFile)
indexFile, ie := os.OpenFile(datafn[:len(datafn)-4]+".idx", os.O_WRONLY|os.O_CREATE, 0644)
if ie != nil {
log.Fatalf("Create Volume Index [ERROR] %s\n", ie)
return errors.New("Create Volume Index " + ie.Error())
}
defer indexFile.Close()
dataFile.Seek(0, 0)
header := make([]byte, storage.SuperBlockSize)
if _, e := dataFile.Read(header); e != nil {
log.Fatalf("cannot read superblock: %s", e)
return errors.New("cannot read superblock: " + e.Error())
}
ver, _, _ := storage.ParseSuperBlock(header)
ver, _, e := storage.ParseSuperBlock(header)
if e != nil {
return errors.New("cannot parse superblock: " + e.Error())
}
n, rest := storage.ReadNeedleHeader(dataFile, ver)
dataFile.Seek(int64(rest), 1)
@ -66,5 +79,5 @@ func runFix(cmd *Command, args []string) bool {
n, rest = storage.ReadNeedleHeader(dataFile, ver)
dataFile.Seek(int64(rest), 1)
}
return true
return nil
}

View file

@ -21,6 +21,7 @@ var server *string
var commands = []*Command{
cmdFix,
cmdFreeze,
cmdMaster,
cmdUpload,
cmdShell,

View file

@ -28,7 +28,7 @@ func NewCdbMap(filename string) (*CdbMap, error) {
// writes the content of the index file to a CDB and returns that
func NewCdbMapFromIndex(indexFile *os.File) (*CdbMap, error) {
nm := indexFile.Name()
nm = nm[strings.LastIndex(nm, ".")+1:] + "cdb"
nm = nm[:strings.LastIndex(nm, ".")+1] + "cdb"
var (
key uint64
@ -52,12 +52,14 @@ func NewCdbMapFromIndex(indexFile *os.File) (*CdbMap, error) {
return nil, err
}
log.Printf("deleted: %s\nnm=%s", deleted, nm)
w, err := cdb.NewWriter(nm)
if err != nil {
return nil, err
}
iterFun := func(buf []byte) error {
key = util.BytesToUint64(buf[:8])
log.Printf("iter key=%d", key)
if _, ok = deleted[key]; !ok {
w.PutPair(buf[:8], buf[8:16])
}
@ -69,6 +71,9 @@ func NewCdbMapFromIndex(indexFile *os.File) (*CdbMap, error) {
if err != nil {
return nil, err
}
if err = util.SetFilePerm(nil, nm, 0444, -1); err != nil {
return nil, err
}
return NewCdbMap(nm)
}

View file

@ -6,6 +6,7 @@ import (
"log"
"os"
"pkg/util"
"strings"
)
type NeedleMap struct {
@ -50,10 +51,40 @@ func NewNeedleMap(file *os.File) *NeedleMap {
}
// Nes frozen (on-disk, not modifiable(!)) needle map
func NewFrozenNeedleMap(file *os.File) (*NeedleMap, error) {
fm, err := NewCdbMapFromIndex(file)
if err != nil {
return nil, err
func NewFrozenNeedleMap(fileName string) (*NeedleMap, error) {
if strings.HasSuffix(fileName, ".dat") {
fileName = fileName[:4]
}
var (
fm *CdbMap
indexExists bool
)
file, err := os.Open(fileName + ".idx")
if err != nil && os.IsNotExist(err) {
if fm, err = NewCdbMap(fileName + ".cdb"); err != nil {
log.Printf("error opening %s.cdb: %s", fileName, err)
fm = nil
} else {
if dstat, e := os.Stat(fileName + ".dat"); e == nil {
if cstat, e := os.Stat(fileName + ".cdb"); e == nil {
if cstat.ModTime().Before(dstat.ModTime()) {
return nil, errors.New("CDB file " + fileName +
".cdb is older than data file " + fileName + ".dat!")
}
}
}
}
} else {
indexExists = true
}
if fm == nil {
fm, err = NewCdbMapFromIndex(file)
if err != nil {
return nil, err
}
if indexExists {
os.Remove(fileName + ".idx")
}
}
return &NeedleMap{
fm: fm,

View file

@ -33,6 +33,8 @@ func NewStore(port int, ip, publicUrl, dirname string, maxVolumeCount int) (s *S
log.Println("Store started on dir:", dirname, "with", len(s.volumes), "volumes")
return
}
// adds a volume to the store
func (s *Store) AddVolume(volumeListString string, replicationType string) error {
rt, e := NewReplicationTypeFromString(replicationType)
if e != nil {
@ -74,6 +76,7 @@ func (s *Store) addVolume(vid VolumeId, replicationType ReplicationType) (err er
return err
}
// checks whether compaction is needed
func (s *Store) CheckCompactVolume(volumeIdString string, garbageThresholdString string) (error, bool) {
vid, err := NewVolumeId(volumeIdString)
if err != nil {
@ -85,6 +88,8 @@ func (s *Store) CheckCompactVolume(volumeIdString string, garbageThresholdString
}
return nil, garbageThreshold < s.volumes[vid].garbageLevel()
}
// compacts the volume
func (s *Store) CompactVolume(volumeIdString string) error {
vid, err := NewVolumeId(volumeIdString)
if err != nil {
@ -92,6 +97,8 @@ func (s *Store) CompactVolume(volumeIdString string) error {
}
return s.volumes[vid].compact()
}
// commits the compaction
func (s *Store) CommitCompactVolume(volumeIdString string) error {
vid, err := NewVolumeId(volumeIdString)
if err != nil {
@ -99,6 +106,8 @@ func (s *Store) CommitCompactVolume(volumeIdString string) error {
}
return s.volumes[vid].commitCompact()
}
// reads directory and loads volumes
func (s *Store) loadExistingVolumes() {
if dirs, err := ioutil.ReadDir(s.dir); err == nil {
for _, dir := range dirs {

View file

@ -6,6 +6,7 @@ import (
"log"
"os"
"path"
"pkg/util"
"sync"
)
@ -45,13 +46,18 @@ func (v *Volume) load() error {
v.maybeWriteSuperBlock()
}
// TODO: if .idx not exists, but .cdb exists, then use (but don't load!) that
indexFile, ie := os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644)
if ie != nil {
return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e)
if !util.FileIsWritable(v.dataFile.Name()) { //Read-Only
v.nm, e = NewFrozenNeedleMap(fileName)
} else {
indexFile, ie := os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644)
if ie != nil {
return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e)
}
v.nm, e = LoadNeedleMap(indexFile)
}
v.nm, e = LoadNeedleMap(indexFile)
return e
}
func (v *Volume) Version() Version {
return v.version
}

View file

@ -0,0 +1,63 @@
package util
import (
"errors"
"log"
"os"
)
// sets file (fh if not nil, otherwise fileName) permission to mask
// it will
// AND with the permission iff direction < 0
// OR with the permission iff direction > 0
// otherwise it will SET the permission to the mask
func SetFilePerm(fh *os.File, fileName string, mask os.FileMode, direction int8) (err error) {
var stat os.FileInfo
if fh == nil {
stat, err = os.Stat(fileName)
} else {
stat, err = fh.Stat()
}
if err != nil {
return err
}
mode := stat.Mode() & ^os.ModePerm
// log.Printf("mode1=%d mask=%d", mode, mask)
if direction == 0 {
mode |= mask
} else if direction > 0 {
mode |= stat.Mode().Perm() | mask
} else {
mode |= stat.Mode().Perm() & mask
}
log.Printf("pmode=%d operm=%d => nmode=%d nperm=%d",
stat.Mode(), stat.Mode()&os.ModePerm,
mode, mode&os.ModePerm)
if mode == 0 {
return errors.New("Zero FileMode")
}
if fh == nil {
err = os.Chmod(fileName, mode)
} else {
err = fh.Chmod(mode)
}
return err
}
// returns whether the filename exists - errors doesn't mean not exists!
func FileExists(fileName string) bool {
if _, e := os.Stat(fileName); e != nil && os.IsNotExist(e) {
return false
}
return true
}
// returns whether the filename is POSSIBLY writable
//- whether it has some kind of writable bit set
func FileIsWritable(fileName string) bool {
if stat, e := os.Stat(fileName); e == nil {
return stat.Mode().Perm()&0222 > 0
}
return false
}