mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
Merge remote-tracking branch 'choose_remote_name/cdb'
This commit is contained in:
commit
be83a56bb9
96
weed-fs/src/cmd/dump/main.go
Normal file
96
weed-fs/src/cmd/dump/main.go
Normal file
|
@ -0,0 +1,96 @@
|
|||
// Copyright Tamás Gulácsi 2013 All rights reserved
|
||||
// Use of this source is governed by the same rules as the weed-fs library.
|
||||
// If this would be ambigous, than Apache License 2.0 has to be used.
|
||||
//
|
||||
// dump dumps the files of a volume to tar or unique files.
|
||||
// Each file will have id#mimetype#original_name file format
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"archive/tar"
|
||||
"bytes"
|
||||
"flag"
|
||||
"fmt"
|
||||
// "io"
|
||||
"log"
|
||||
"os"
|
||||
"pkg/storage"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
var (
|
||||
volumePath = flag.String("dir", "/tmp", "volume directory")
|
||||
volumeId = flag.Int("id", 0, "volume Id")
|
||||
dest = flag.String("out", "-", "output path. Produces tar if path ends with .tar; creates files otherwise.")
|
||||
tarFh *tar.Writer
|
||||
tarHeader tar.Header
|
||||
counter int
|
||||
)
|
||||
|
||||
func main() {
|
||||
var err error
|
||||
|
||||
flag.Parse()
|
||||
|
||||
if *dest == "-" {
|
||||
*dest = ""
|
||||
}
|
||||
if *dest == "" || strings.HasSuffix(*dest, ".tar") {
|
||||
var fh *os.File
|
||||
if *dest == "" {
|
||||
fh = os.Stdout
|
||||
} else {
|
||||
if fh, err = os.Create(*dest); err != nil {
|
||||
log.Printf("cannot open output tar %s: %s", *dest, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
defer fh.Close()
|
||||
tarFh = tar.NewWriter(fh)
|
||||
defer tarFh.Close()
|
||||
t := time.Now()
|
||||
tarHeader = tar.Header{Mode: 0644,
|
||||
ModTime: t, Uid: os.Getuid(), Gid: os.Getgid(),
|
||||
Typeflag: tar.TypeReg,
|
||||
AccessTime: t, ChangeTime: t}
|
||||
}
|
||||
|
||||
v, err := storage.NewVolume(*volumePath, storage.VolumeId(*volumeId), storage.CopyNil)
|
||||
if v == nil || v.Version() == 0 || err != nil {
|
||||
log.Printf("cannot load volume %d from %s (%s): %s", *volumeId, *volumePath, v, err)
|
||||
return
|
||||
}
|
||||
log.Printf("volume: %s (ver. %d)", v, v.Version())
|
||||
if err := v.WalkValues(walker); err != nil {
|
||||
log.Printf("error while walking: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
log.Printf("%d files written.", counter)
|
||||
}
|
||||
|
||||
func walker(n *storage.Needle) (err error) {
|
||||
// log.Printf("Id=%d Size=%d Name=%s mime=%s", n.Id, n.Size, n.Name, n.Mime)
|
||||
nm := fmt.Sprintf("%d#%s#%s", n.Id, bytes.Replace(n.Mime, []byte{'/'}, []byte{'_'}, -1), n.Name)
|
||||
// log.Print(nm)
|
||||
if tarFh != nil {
|
||||
tarHeader.Name, tarHeader.Size = nm, int64(len(n.Data))
|
||||
if err = tarFh.WriteHeader(&tarHeader); err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = tarFh.Write(n.Data)
|
||||
} else {
|
||||
if fh, e := os.Create(*dest + "/" + nm); e != nil {
|
||||
return e
|
||||
} else {
|
||||
defer fh.Close()
|
||||
_, err = fh.Write(n.Data)
|
||||
}
|
||||
}
|
||||
if err == nil {
|
||||
counter++
|
||||
}
|
||||
return
|
||||
}
|
|
@ -24,7 +24,6 @@ type Command struct {
|
|||
|
||||
// Flag is a set of flags specific to this command.
|
||||
Flag flag.FlagSet
|
||||
|
||||
}
|
||||
|
||||
// Name returns the command's name: the first word in the usage line.
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"log"
|
||||
"os"
|
||||
"path"
|
||||
|
@ -33,24 +34,36 @@ func runFix(cmd *Command, args []string) bool {
|
|||
}
|
||||
|
||||
fileName := strconv.Itoa(*volumeId)
|
||||
dataFile, e := os.OpenFile(path.Join(*dir, fileName+".dat"), os.O_RDONLY, 0644)
|
||||
|
||||
if err := createIndexFile(path.Join(*dir, fileName+".dat")); err != nil {
|
||||
log.Fatalf("[ERROR] " + err.Error())
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func createIndexFile(datafn string) error {
|
||||
dataFile, e := os.OpenFile(datafn, os.O_RDONLY, 0644)
|
||||
if e != nil {
|
||||
log.Fatalf("Read Volume [ERROR] %s\n", e)
|
||||
return errors.New("Read Volume " + e.Error())
|
||||
}
|
||||
defer dataFile.Close()
|
||||
indexFile, ie := os.OpenFile(path.Join(*dir, fileName+".idx"), os.O_WRONLY|os.O_CREATE, 0644)
|
||||
// log.Printf("dataFile=%s", dataFile)
|
||||
indexFile, ie := os.OpenFile(datafn[:len(datafn)-4]+".idx", os.O_WRONLY|os.O_CREATE, 0644)
|
||||
if ie != nil {
|
||||
log.Fatalf("Create Volume Index [ERROR] %s\n", ie)
|
||||
return errors.New("Create Volume Index " + ie.Error())
|
||||
}
|
||||
defer indexFile.Close()
|
||||
|
||||
dataFile.Seek(0, 0)
|
||||
header := make([]byte, storage.SuperBlockSize)
|
||||
if _, e := dataFile.Read(header); e != nil {
|
||||
log.Fatalf("cannot read superblock: %s", e)
|
||||
return errors.New("cannot read superblock: " + e.Error())
|
||||
}
|
||||
|
||||
ver, _, _ := storage.ParseSuperBlock(header)
|
||||
ver, _, e := storage.ParseSuperBlock(header)
|
||||
if e != nil {
|
||||
return errors.New("cannot parse superblock: " + e.Error())
|
||||
}
|
||||
|
||||
n, rest := storage.ReadNeedleHeader(dataFile, ver)
|
||||
dataFile.Seek(int64(rest), 1)
|
||||
|
@ -66,5 +79,5 @@ func runFix(cmd *Command, args []string) bool {
|
|||
n, rest = storage.ReadNeedleHeader(dataFile, ver)
|
||||
dataFile.Seek(int64(rest), 1)
|
||||
}
|
||||
return true
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -107,8 +107,14 @@ func dirJoinHandler(w http.ResponseWriter, r *http.Request) {
|
|||
if ip == "" {
|
||||
ip = r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")]
|
||||
}
|
||||
port, _ := strconv.Atoi(r.FormValue("port"))
|
||||
maxVolumeCount, _ := strconv.Atoi(r.FormValue("maxVolumeCount"))
|
||||
port, err := strconv.Atoi(r.FormValue("port"))
|
||||
if err != nil {
|
||||
log.Printf("ERROR bad port number %s: %s", r.FormValue("port"), err)
|
||||
}
|
||||
maxVolumeCount, err := strconv.Atoi(r.FormValue("maxVolumeCount"))
|
||||
if err != nil {
|
||||
log.Printf("ERROR bad maxVolumeCount %s: %s", r.FormValue("maxVolumeCount"), err)
|
||||
}
|
||||
s := r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")+1] + r.FormValue("port")
|
||||
publicUrl := r.FormValue("publicUrl")
|
||||
volumes := new([]storage.VolumeInfo)
|
||||
|
|
|
@ -2,8 +2,8 @@ package main
|
|||
|
||||
import (
|
||||
"bufio"
|
||||
"os"
|
||||
"fmt"
|
||||
"os"
|
||||
)
|
||||
|
||||
func init() {
|
||||
|
@ -18,8 +18,7 @@ var cmdShell = &Command{
|
|||
`,
|
||||
}
|
||||
|
||||
var (
|
||||
)
|
||||
var ()
|
||||
|
||||
func runShell(command *Command, args []string) bool {
|
||||
r := bufio.NewReader(os.Stdin)
|
||||
|
@ -28,11 +27,11 @@ func runShell(command *Command, args []string) bool {
|
|||
prompt := func() {
|
||||
o.WriteString("> ")
|
||||
o.Flush()
|
||||
};
|
||||
}
|
||||
readLine := func() string {
|
||||
ret, err := r.ReadString('\n')
|
||||
if err != nil {
|
||||
fmt.Fprint(e,err);
|
||||
fmt.Fprint(e, err)
|
||||
os.Exit(1)
|
||||
}
|
||||
return ret
|
||||
|
|
|
@ -156,7 +156,9 @@ func GetHandler(w http.ResponseWriter, r *http.Request) {
|
|||
if strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") {
|
||||
w.Header().Set("Content-Encoding", "gzip")
|
||||
} else {
|
||||
n.Data = storage.UnGzipData(n.Data)
|
||||
if n.Data, err = storage.UnGzipData(n.Data); err != nil {
|
||||
debug("lookup error:", err, r.URL.Path)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ var server *string
|
|||
|
||||
var commands = []*Command{
|
||||
cmdFix,
|
||||
cmdFreeze,
|
||||
cmdMaster,
|
||||
cmdUpload,
|
||||
cmdShell,
|
||||
|
|
|
@ -3,8 +3,8 @@ package directory
|
|||
import (
|
||||
"encoding/hex"
|
||||
"pkg/storage"
|
||||
"strings"
|
||||
"pkg/util"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type FileId struct {
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
package operation
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"log"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
func Delete(url string) error {
|
||||
|
|
|
@ -2,11 +2,11 @@ package operation
|
|||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
_ "fmt"
|
||||
"net/url"
|
||||
"pkg/storage"
|
||||
"pkg/util"
|
||||
_ "fmt"
|
||||
"errors"
|
||||
)
|
||||
|
||||
type Location struct {
|
||||
|
|
|
@ -3,13 +3,13 @@ package operation
|
|||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
_ "fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"mime/multipart"
|
||||
"net/http"
|
||||
"errors"
|
||||
)
|
||||
|
||||
type UploadResult struct {
|
||||
|
|
|
@ -127,4 +127,3 @@ func TestReserveOneVolume(t *testing.T) {
|
|||
t.Log("reserved", c)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -2,10 +2,10 @@ package sequence
|
|||
|
||||
import (
|
||||
"encoding/gob"
|
||||
"log"
|
||||
"os"
|
||||
"path"
|
||||
"sync"
|
||||
"log"
|
||||
)
|
||||
|
||||
const (
|
||||
|
|
112
weed-fs/src/pkg/storage/cdb_map.go
Normal file
112
weed-fs/src/pkg/storage/cdb_map.go
Normal file
|
@ -0,0 +1,112 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"github.com/tgulacsi/go-cdb"
|
||||
"io"
|
||||
"log"
|
||||
"os"
|
||||
"pkg/util"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type CdbMap struct {
|
||||
db *cdb.Cdb
|
||||
transient []byte
|
||||
Filename string
|
||||
}
|
||||
|
||||
// Opens the CDB file and servers as a needle map
|
||||
func NewCdbMap(filename string) (*CdbMap, error) {
|
||||
m, err := cdb.Open(filename)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &CdbMap{db: m, transient: make([]byte, 8),
|
||||
Filename: filename}, nil
|
||||
}
|
||||
|
||||
// writes the content of the index file to a CDB and returns that
|
||||
func NewCdbMapFromIndex(indexFile *os.File) (*CdbMap, error) {
|
||||
nm := indexFile.Name()
|
||||
nm = nm[:strings.LastIndex(nm, ".")+1] + "cdb"
|
||||
|
||||
var (
|
||||
key uint64
|
||||
offset uint32
|
||||
ok bool
|
||||
)
|
||||
deleted := make(map[uint64]bool, 16)
|
||||
gatherDeletes := func(buf []byte) error {
|
||||
key = util.BytesToUint64(buf[:8])
|
||||
offset = util.BytesToUint32(buf[8:12])
|
||||
if offset > 0 {
|
||||
if _, ok = deleted[key]; ok { //undelete
|
||||
delete(deleted, key)
|
||||
}
|
||||
} else {
|
||||
deleted[key] = true
|
||||
}
|
||||
return nil
|
||||
}
|
||||
if err := readIndexFile(indexFile, gatherDeletes); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
log.Printf("deleted: %s\nnm=%s", deleted, nm)
|
||||
w, err := cdb.NewWriter(nm)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
iterFun := func(buf []byte) error {
|
||||
key = util.BytesToUint64(buf[:8])
|
||||
log.Printf("iter key=%d", key)
|
||||
if _, ok = deleted[key]; !ok {
|
||||
w.PutPair(buf[:8], buf[8:16])
|
||||
}
|
||||
return nil
|
||||
}
|
||||
indexFile.Seek(0, 0)
|
||||
err = readIndexFile(indexFile, iterFun)
|
||||
w.Close()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err = util.SetFilePerm(nil, nm, 0444, -1); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return NewCdbMap(nm)
|
||||
}
|
||||
|
||||
func (m *CdbMap) Get(key Key) (element *NeedleValue, ok bool) {
|
||||
util.Uint64toBytes(m.transient, uint64(key))
|
||||
data, err := m.db.Data(m.transient)
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
return nil, false
|
||||
}
|
||||
log.Printf("error getting %s: %s", key, err)
|
||||
return nil, false
|
||||
}
|
||||
return &NeedleValue{Key: key,
|
||||
Offset: util.BytesToUint32(data[:4]),
|
||||
Size: util.BytesToUint32(data[4:8]),
|
||||
}, true
|
||||
}
|
||||
|
||||
func (m *CdbMap) Walk(pedestrian func(*NeedleValue) error) (err error) {
|
||||
r, err := os.Open(m.Filename)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer r.Close()
|
||||
|
||||
iterFunc := func(elt cdb.Element) error {
|
||||
return pedestrian(&NeedleValue{
|
||||
Key: Key(util.BytesToUint64(elt.Key[:8])),
|
||||
Offset: util.BytesToUint32(elt.Data[:4]),
|
||||
Size: util.BytesToUint32(elt.Data[4:8]),
|
||||
})
|
||||
}
|
||||
return cdb.DumpMap(r, iterFunc)
|
||||
}
|
|
@ -109,8 +109,8 @@ type CompactMap struct {
|
|||
list []CompactSection
|
||||
}
|
||||
|
||||
func NewCompactMap() CompactMap {
|
||||
return CompactMap{}
|
||||
func NewCompactMap() *CompactMap {
|
||||
return &CompactMap{}
|
||||
}
|
||||
|
||||
func (cm *CompactMap) Set(key Key, offset uint32, size uint32) uint32 {
|
||||
|
@ -175,3 +175,23 @@ func (cm *CompactMap) Peek() {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
// iterate over the keys by calling iterate on each key till error is returned
|
||||
func (cm *CompactMap) Walk(pedestrian func(*NeedleValue) error) (err error) {
|
||||
var i int
|
||||
for _, cs := range cm.list {
|
||||
for key := cs.start; key < cs.end; key++ {
|
||||
if i = cs.binarySearchValues(key); i >= 0 {
|
||||
if err = pedestrian(&cs.values[i]); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
for _, val := range cs.overflow {
|
||||
if err = pedestrian(val); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -1,10 +1,10 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"log"
|
||||
"os"
|
||||
"pkg/util"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestMemoryUsage(t *testing.T) {
|
||||
|
|
|
@ -15,49 +15,35 @@ func IsGzippable(ext, mtype string) bool {
|
|||
if strings.HasPrefix(mtype, "text/") {
|
||||
return true
|
||||
}
|
||||
if ext == ".zip" {
|
||||
switch ext {
|
||||
case ".zip", ".rar", ".gz", ".bz2", ".xz":
|
||||
return false
|
||||
}
|
||||
if ext == ".rar" {
|
||||
return false
|
||||
}
|
||||
if ext == ".gz" {
|
||||
return false
|
||||
}
|
||||
if ext == ".pdf" {
|
||||
return true
|
||||
}
|
||||
if ext == ".css" {
|
||||
return true
|
||||
}
|
||||
if ext == ".js" {
|
||||
return true
|
||||
}
|
||||
if ext == ".json" {
|
||||
case ".pdf", ".txt", ".html", ".css", ".js", ".json":
|
||||
return true
|
||||
}
|
||||
if strings.HasPrefix(mtype, "application/") {
|
||||
if strings.HasSuffix(mtype, "xml") {
|
||||
return true
|
||||
}
|
||||
if strings.HasSuffix(mtype, "script") {
|
||||
if strings.HasSuffix(mtype, "xml") ||
|
||||
strings.HasSuffix(mtype, "script") {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
func GzipData(input []byte) []byte {
|
||||
|
||||
func GzipData(input []byte) ([]byte, error) {
|
||||
buf := new(bytes.Buffer)
|
||||
w, _ := gzip.NewWriterLevel(buf, flate.BestCompression)
|
||||
if _, err := w.Write(input); err != nil {
|
||||
println("error compressing data:", err)
|
||||
return nil, err
|
||||
}
|
||||
if err := w.Close(); err != nil {
|
||||
println("error closing compressed data:", err)
|
||||
return nil, err
|
||||
}
|
||||
return buf.Bytes()
|
||||
return buf.Bytes(), nil
|
||||
}
|
||||
func UnGzipData(input []byte) []byte {
|
||||
func UnGzipData(input []byte) ([]byte, error) {
|
||||
buf := bytes.NewBuffer(input)
|
||||
r, _ := gzip.NewReader(buf)
|
||||
defer r.Close()
|
||||
|
@ -65,5 +51,5 @@ func UnGzipData(input []byte) []byte {
|
|||
if err != nil {
|
||||
println("error uncompressing data:", err)
|
||||
}
|
||||
return output
|
||||
return output, err
|
||||
}
|
||||
|
|
|
@ -64,7 +64,9 @@ func NewNeedle(r *http.Request) (n *Needle, fname string, e error) {
|
|||
mtype = contentType
|
||||
}
|
||||
if IsGzippable(ext, mtype) {
|
||||
data = GzipData(data)
|
||||
if data, e = GzipData(data); e != nil {
|
||||
return
|
||||
}
|
||||
n.SetGzipped()
|
||||
}
|
||||
if ext == ".gz" {
|
||||
|
|
|
@ -1,14 +1,18 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"io"
|
||||
"log"
|
||||
"os"
|
||||
"pkg/util"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type NeedleMap struct {
|
||||
indexFile *os.File
|
||||
m CompactMap
|
||||
m MapGetSetter // modifiable map
|
||||
fm MapGetter // frozen map
|
||||
|
||||
//transient
|
||||
bytes []byte
|
||||
|
@ -19,36 +23,100 @@ type NeedleMap struct {
|
|||
fileByteCounter uint64
|
||||
}
|
||||
|
||||
// Map interface for frozen maps
|
||||
type MapGetter interface {
|
||||
Get(key Key) (element *NeedleValue, ok bool)
|
||||
Walk(pedestrian func(*NeedleValue) error) error
|
||||
}
|
||||
|
||||
// Modifiable map interface
|
||||
type MapSetter interface {
|
||||
Set(key Key, offset, size uint32) (oldsize uint32)
|
||||
Delete(key Key) uint32
|
||||
}
|
||||
|
||||
// Settable and gettable map
|
||||
type MapGetSetter interface {
|
||||
MapGetter
|
||||
MapSetter
|
||||
}
|
||||
|
||||
// New in-memory needle map, backed by "file" index file
|
||||
func NewNeedleMap(file *os.File) *NeedleMap {
|
||||
nm := &NeedleMap{
|
||||
return &NeedleMap{
|
||||
m: NewCompactMap(),
|
||||
bytes: make([]byte, 16),
|
||||
indexFile: file,
|
||||
}
|
||||
return nm
|
||||
}
|
||||
|
||||
// Nes frozen (on-disk, not modifiable(!)) needle map
|
||||
func NewFrozenNeedleMap(fileName string) (*NeedleMap, error) {
|
||||
if strings.HasSuffix(fileName, ".dat") {
|
||||
fileName = fileName[:4]
|
||||
}
|
||||
var (
|
||||
fm *CdbMap
|
||||
indexExists bool
|
||||
)
|
||||
file, err := os.Open(fileName + ".idx")
|
||||
if err != nil && os.IsNotExist(err) {
|
||||
if fm, err = NewCdbMap(fileName + ".cdb"); err != nil {
|
||||
log.Printf("error opening %s.cdb: %s", fileName, err)
|
||||
fm = nil
|
||||
} else {
|
||||
if dstat, e := os.Stat(fileName + ".dat"); e == nil {
|
||||
if cstat, e := os.Stat(fileName + ".cdb"); e == nil {
|
||||
if cstat.ModTime().Before(dstat.ModTime()) {
|
||||
return nil, errors.New("CDB file " + fileName +
|
||||
".cdb is older than data file " + fileName + ".dat!")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
indexExists = true
|
||||
}
|
||||
if fm == nil {
|
||||
fm, err = NewCdbMapFromIndex(file)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if indexExists {
|
||||
os.Remove(fileName + ".idx")
|
||||
}
|
||||
}
|
||||
return &NeedleMap{
|
||||
fm: fm,
|
||||
bytes: make([]byte, 16),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (nm NeedleMap) IsFrozen() bool {
|
||||
return nm.m == nil && nm.fm != nil
|
||||
}
|
||||
|
||||
const (
|
||||
RowsToRead = 1024
|
||||
)
|
||||
|
||||
func LoadNeedleMap(file *os.File) *NeedleMap {
|
||||
var MapIsFrozen = errors.New("Map is frozen!")
|
||||
|
||||
func LoadNeedleMap(file *os.File) (*NeedleMap, error) {
|
||||
nm := NewNeedleMap(file)
|
||||
bytes := make([]byte, 16*RowsToRead)
|
||||
count, e := nm.indexFile.Read(bytes)
|
||||
if count > 0 {
|
||||
fstat, _ := file.Stat()
|
||||
log.Println("Loading index file", fstat.Name(), "size", fstat.Size())
|
||||
}
|
||||
for count > 0 && e == nil {
|
||||
for i := 0; i < count; i += 16 {
|
||||
key := util.BytesToUint64(bytes[i : i+8])
|
||||
offset := util.BytesToUint32(bytes[i+8 : i+12])
|
||||
size := util.BytesToUint32(bytes[i+12 : i+16])
|
||||
|
||||
var (
|
||||
key uint64
|
||||
offset, size, oldSize uint32
|
||||
)
|
||||
iterFun := func(buf []byte) error {
|
||||
key = util.BytesToUint64(buf[:8])
|
||||
offset = util.BytesToUint32(buf[8:12])
|
||||
size = util.BytesToUint32(buf[12:16])
|
||||
nm.fileCounter++
|
||||
nm.fileByteCounter = nm.fileByteCounter + uint64(size)
|
||||
if offset > 0 {
|
||||
oldSize := nm.m.Set(Key(key), offset, size)
|
||||
oldSize = nm.m.Set(Key(key), offset, size)
|
||||
//log.Println("reading key", key, "offset", offset, "size", size, "oldSize", oldSize)
|
||||
if oldSize > 0 {
|
||||
nm.deletionCounter++
|
||||
|
@ -60,14 +128,43 @@ func LoadNeedleMap(file *os.File) *NeedleMap {
|
|||
nm.deletionCounter++
|
||||
nm.deletionByteCounter = nm.deletionByteCounter + uint64(size)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
if err := readIndexFile(file, iterFun); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return nm, nil
|
||||
}
|
||||
|
||||
count, e = nm.indexFile.Read(bytes)
|
||||
// calls iterFun with each row (raw 16 bytes)
|
||||
func readIndexFile(indexFile *os.File, iterFun func([]byte) error) error {
|
||||
buf := make([]byte, 16*RowsToRead)
|
||||
count, e := io.ReadAtLeast(indexFile, buf, 16)
|
||||
if e != nil && count > 0 {
|
||||
fstat, err := indexFile.Stat()
|
||||
if err != nil {
|
||||
log.Println("ERROR stating %s: %s", indexFile, err)
|
||||
} else {
|
||||
log.Println("Loading index file", fstat.Name(), "size", fstat.Size())
|
||||
}
|
||||
return nm
|
||||
}
|
||||
for count > 0 && e == nil {
|
||||
for i := 0; i < count; i += 16 {
|
||||
if e = iterFun(buf[i : i+16]); e != nil {
|
||||
return e
|
||||
}
|
||||
}
|
||||
|
||||
count, e = io.ReadAtLeast(indexFile, buf, 16)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) (int, error) {
|
||||
if nm.IsFrozen() {
|
||||
return 0, MapIsFrozen
|
||||
}
|
||||
oldSize := nm.m.Set(Key(key), offset, size)
|
||||
util.Uint64toBytes(nm.bytes[0:8], key)
|
||||
util.Uint32toBytes(nm.bytes[8:12], offset)
|
||||
|
@ -81,16 +178,24 @@ func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) (int, error) {
|
|||
return nm.indexFile.Write(nm.bytes)
|
||||
}
|
||||
func (nm *NeedleMap) Get(key uint64) (element *NeedleValue, ok bool) {
|
||||
if nm.m != nil {
|
||||
element, ok = nm.m.Get(Key(key))
|
||||
} else {
|
||||
element, ok = nm.fm.Get(Key(key))
|
||||
}
|
||||
return
|
||||
}
|
||||
func (nm *NeedleMap) Delete(key uint64) {
|
||||
func (nm *NeedleMap) Delete(key uint64) error {
|
||||
if nm.IsFrozen() {
|
||||
return MapIsFrozen
|
||||
}
|
||||
nm.deletionByteCounter = nm.deletionByteCounter + uint64(nm.m.Delete(Key(key)))
|
||||
util.Uint64toBytes(nm.bytes[0:8], key)
|
||||
util.Uint32toBytes(nm.bytes[8:12], 0)
|
||||
util.Uint32toBytes(nm.bytes[12:16], 0)
|
||||
nm.indexFile.Write(nm.bytes)
|
||||
nm.deletionCounter++
|
||||
return nil
|
||||
}
|
||||
func (nm *NeedleMap) Close() {
|
||||
nm.indexFile.Close()
|
||||
|
@ -98,3 +203,11 @@ func (nm *NeedleMap) Close() {
|
|||
func (nm *NeedleMap) ContentSize() uint64 {
|
||||
return nm.fileByteCounter
|
||||
}
|
||||
|
||||
// iterate through all needles using the iterator function
|
||||
func (nm *NeedleMap) Walk(pedestrian func(*NeedleValue) error) (err error) {
|
||||
if nm.m != nil {
|
||||
return nm.m.Walk(pedestrian)
|
||||
}
|
||||
return nm.fm.Walk(pedestrian)
|
||||
}
|
||||
|
|
|
@ -2,10 +2,10 @@ package storage
|
|||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"pkg/util"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
func (n *Needle) Append(w io.Writer, version Version) uint32 {
|
||||
|
@ -62,7 +62,8 @@ func (n *Needle) Append(w io.Writer, version Version) uint32 {
|
|||
return n.Size
|
||||
}
|
||||
func (n *Needle) Read(r io.Reader, size uint32, version Version) (int, error) {
|
||||
if version == Version1 {
|
||||
switch version {
|
||||
case Version1:
|
||||
bytes := make([]byte, NeedleHeaderSize+size+NeedleChecksumSize)
|
||||
ret, e := r.Read(bytes)
|
||||
n.readNeedleHeader(bytes)
|
||||
|
@ -72,7 +73,7 @@ func (n *Needle) Read(r io.Reader, size uint32, version Version) (int, error) {
|
|||
return 0, errors.New("CRC error! Data On Disk Corrupted!")
|
||||
}
|
||||
return ret, e
|
||||
} else if version == Version2 {
|
||||
case Version2:
|
||||
if size == 0 {
|
||||
return 0, nil
|
||||
}
|
||||
|
@ -95,7 +96,7 @@ func (n *Needle) Read(r io.Reader, size uint32, version Version) (int, error) {
|
|||
}
|
||||
return ret, e
|
||||
}
|
||||
return 0, errors.New("Unsupported Version!")
|
||||
return 0, fmt.Errorf("Unsupported Version! (%d)", version)
|
||||
}
|
||||
func (n *Needle) readNeedleHeader(bytes []byte) {
|
||||
n.Cookie = util.BytesToUint32(bytes[0:4])
|
||||
|
|
|
@ -33,6 +33,8 @@ func NewStore(port int, ip, publicUrl, dirname string, maxVolumeCount int) (s *S
|
|||
log.Println("Store started on dir:", dirname, "with", len(s.volumes), "volumes")
|
||||
return
|
||||
}
|
||||
|
||||
// adds a volume to the store
|
||||
func (s *Store) AddVolume(volumeListString string, replicationType string) error {
|
||||
rt, e := NewReplicationTypeFromString(replicationType)
|
||||
if e != nil {
|
||||
|
@ -65,15 +67,16 @@ func (s *Store) AddVolume(volumeListString string, replicationType string) error
|
|||
}
|
||||
return e
|
||||
}
|
||||
func (s *Store) addVolume(vid VolumeId, replicationType ReplicationType) error {
|
||||
func (s *Store) addVolume(vid VolumeId, replicationType ReplicationType) (err error) {
|
||||
if s.volumes[vid] != nil {
|
||||
return errors.New("Volume Id " + vid.String() + " already exists!")
|
||||
}
|
||||
log.Println("In dir", s.dir, "adds volume =", vid, ", replicationType =", replicationType)
|
||||
s.volumes[vid] = NewVolume(s.dir, vid, replicationType)
|
||||
return nil
|
||||
s.volumes[vid], err = NewVolume(s.dir, vid, replicationType)
|
||||
return err
|
||||
}
|
||||
|
||||
// checks whether compaction is needed
|
||||
func (s *Store) CheckCompactVolume(volumeIdString string, garbageThresholdString string) (error, bool) {
|
||||
vid, err := NewVolumeId(volumeIdString)
|
||||
if err != nil {
|
||||
|
@ -85,6 +88,8 @@ func (s *Store) CheckCompactVolume(volumeIdString string, garbageThresholdString
|
|||
}
|
||||
return nil, garbageThreshold < s.volumes[vid].garbageLevel()
|
||||
}
|
||||
|
||||
// compacts the volume
|
||||
func (s *Store) CompactVolume(volumeIdString string) error {
|
||||
vid, err := NewVolumeId(volumeIdString)
|
||||
if err != nil {
|
||||
|
@ -92,6 +97,8 @@ func (s *Store) CompactVolume(volumeIdString string) error {
|
|||
}
|
||||
return s.volumes[vid].compact()
|
||||
}
|
||||
|
||||
// commits the compaction
|
||||
func (s *Store) CommitCompactVolume(volumeIdString string) error {
|
||||
vid, err := NewVolumeId(volumeIdString)
|
||||
if err != nil {
|
||||
|
@ -99,6 +106,8 @@ func (s *Store) CommitCompactVolume(volumeIdString string) error {
|
|||
}
|
||||
return s.volumes[vid].commitCompact()
|
||||
}
|
||||
|
||||
// reads directory and loads volumes
|
||||
func (s *Store) loadExistingVolumes() {
|
||||
if dirs, err := ioutil.ReadDir(s.dir); err == nil {
|
||||
for _, dir := range dirs {
|
||||
|
@ -107,9 +116,12 @@ func (s *Store) loadExistingVolumes() {
|
|||
base := name[:len(name)-len(".dat")]
|
||||
if vid, err := NewVolumeId(base); err == nil {
|
||||
if s.volumes[vid] == nil {
|
||||
v := NewVolume(s.dir, vid, CopyNil)
|
||||
if v, e := NewVolume(s.dir, vid, CopyNil); e == nil {
|
||||
s.volumes[vid] = v
|
||||
log.Println("In dir", s.dir, "read volume =", vid, "replicationType =", v.replicaType, "version =", v.version, "size =", v.Size())
|
||||
log.Println("In dir", s.dir, "read volume =", vid, "replicationType =", v.replicaType, "version =", v.version, "size =", v.Size(), "frozen?", !v.IsWritable())
|
||||
} else {
|
||||
log.Println("ERROR loading volume", vid, "in dir", s.dir, ":", e.Error())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -119,8 +131,16 @@ func (s *Store) loadExistingVolumes() {
|
|||
func (s *Store) Status() []*VolumeInfo {
|
||||
var stats []*VolumeInfo
|
||||
for k, v := range s.volumes {
|
||||
s := new(VolumeInfo)
|
||||
s.Id, s.Size, s.RepType, s.Version, s.FileCount, s.DeleteCount, s.DeletedByteCount = VolumeId(k), v.ContentSize(), v.replicaType, v.Version(), v.nm.fileCounter, v.nm.deletionCounter, v.nm.deletionByteCounter
|
||||
s := &VolumeInfo{
|
||||
Id: VolumeId(k),
|
||||
Size: v.ContentSize(),
|
||||
RepType: v.replicaType,
|
||||
Version: v.Version(),
|
||||
FileCount: v.nm.fileCounter,
|
||||
DeleteCount: v.nm.deletionCounter,
|
||||
DeletedByteCount: v.nm.deletionByteCounter,
|
||||
Frozen: !v.IsWritable(),
|
||||
}
|
||||
stats = append(stats, s)
|
||||
}
|
||||
return stats
|
||||
|
@ -133,6 +153,8 @@ type JoinResult struct {
|
|||
func (s *Store) SetMaster(mserver string) {
|
||||
s.masterNode = mserver
|
||||
}
|
||||
|
||||
// call master's /dir/join
|
||||
func (s *Store) Join() error {
|
||||
stats := new([]*VolumeInfo)
|
||||
for k, v := range s.volumes {
|
||||
|
@ -170,7 +192,8 @@ func (s *Store) Close() {
|
|||
func (s *Store) Write(i VolumeId, n *Needle) uint32 {
|
||||
if v := s.volumes[i]; v != nil {
|
||||
size := v.write(n)
|
||||
if s.volumeSizeLimit < v.ContentSize()+uint64(size) && s.volumeSizeLimit >= v.ContentSize() {
|
||||
if s.volumeSizeLimit < v.ContentSize()+uint64(size) &&
|
||||
s.volumeSizeLimit >= v.ContentSize() {
|
||||
log.Println("volume", i, "size is", v.ContentSize(), "close to", s.volumeSizeLimit)
|
||||
s.Join()
|
||||
}
|
||||
|
|
|
@ -3,8 +3,10 @@ package storage
|
|||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"path"
|
||||
"pkg/util"
|
||||
"sync"
|
||||
)
|
||||
|
||||
|
@ -24,18 +26,25 @@ type Volume struct {
|
|||
accessLock sync.Mutex
|
||||
}
|
||||
|
||||
func NewVolume(dirname string, id VolumeId, replicationType ReplicationType) (v *Volume) {
|
||||
func NewVolume(dirname string, id VolumeId, replicationType ReplicationType) (v *Volume, e error) {
|
||||
v = &Volume{dir: dirname, Id: id, replicaType: replicationType}
|
||||
v.load()
|
||||
e = v.load()
|
||||
return
|
||||
}
|
||||
func (v *Volume) load() error {
|
||||
var e error
|
||||
fileName := path.Join(v.dir, v.Id.String())
|
||||
v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
|
||||
if e != nil {
|
||||
if os.IsPermission(e) {
|
||||
if util.FileExists(fileName + ".cdb") {
|
||||
v.dataFile, e = os.Open(fileName + ".dat")
|
||||
}
|
||||
}
|
||||
if e != nil {
|
||||
return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e)
|
||||
}
|
||||
}
|
||||
if v.replicaType == CopyNil {
|
||||
if e = v.readSuperBlock(); e != nil {
|
||||
return e
|
||||
|
@ -43,13 +52,19 @@ func (v *Volume) load() error {
|
|||
} else {
|
||||
v.maybeWriteSuperBlock()
|
||||
}
|
||||
// TODO: if .idx not exists, but .cdb exists, then use (but don't load!) that
|
||||
if !util.FileIsWritable(v.dataFile.Name()) { //Read-Only
|
||||
v.nm, e = NewFrozenNeedleMap(fileName)
|
||||
} else {
|
||||
indexFile, ie := os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644)
|
||||
if ie != nil {
|
||||
return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e)
|
||||
}
|
||||
v.nm = LoadNeedleMap(indexFile)
|
||||
return nil
|
||||
v.nm, e = LoadNeedleMap(indexFile)
|
||||
}
|
||||
return e
|
||||
}
|
||||
|
||||
func (v *Volume) Version() Version {
|
||||
return v.version
|
||||
}
|
||||
|
@ -63,6 +78,18 @@ func (v *Volume) Size() int64 {
|
|||
fmt.Printf("Failed to read file size %s %s\n", v.dataFile.Name(), e.Error())
|
||||
return -1
|
||||
}
|
||||
|
||||
// a volume is writable, if its data file is writable and the index is not frozen
|
||||
func (v *Volume) IsWritable() bool {
|
||||
stat, e := v.dataFile.Stat()
|
||||
if e != nil {
|
||||
log.Printf("Failed to read file permission %s %s\n", v.dataFile.Name(), e.Error())
|
||||
return false
|
||||
}
|
||||
// 4 for r, 2 for w, 1 for x
|
||||
return stat.Mode().Perm()&0222 > 0 && !v.nm.IsFrozen()
|
||||
}
|
||||
|
||||
func (v *Volume) Close() {
|
||||
v.accessLock.Lock()
|
||||
defer v.accessLock.Unlock()
|
||||
|
@ -79,21 +106,23 @@ func (v *Volume) maybeWriteSuperBlock() {
|
|||
v.dataFile.Write(header)
|
||||
}
|
||||
}
|
||||
func (v *Volume) readSuperBlock() error {
|
||||
func (v *Volume) readSuperBlock() (err error) {
|
||||
v.dataFile.Seek(0, 0)
|
||||
header := make([]byte, SuperBlockSize)
|
||||
if _, e := v.dataFile.Read(header); e != nil {
|
||||
return fmt.Errorf("cannot read superblock: %s", e)
|
||||
}
|
||||
var err error
|
||||
v.version, v.replicaType, err = ParseSuperBlock(header)
|
||||
return err
|
||||
}
|
||||
func ParseSuperBlock(header []byte) (version Version, replicaType ReplicationType, e error) {
|
||||
func ParseSuperBlock(header []byte) (version Version, replicaType ReplicationType, err error) {
|
||||
version = Version(header[0])
|
||||
var err error
|
||||
if version == 0 {
|
||||
err = errors.New("Zero version impossible - bad superblock!")
|
||||
return
|
||||
}
|
||||
if replicaType, err = NewReplicationTypeFromByte(header[1]); err != nil {
|
||||
e = fmt.Errorf("cannot read replica type: %s", err)
|
||||
err = fmt.Errorf("cannot read replica type: %s", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
@ -221,3 +250,39 @@ func (v *Volume) copyDataAndGenerateIndexFile(srcName, dstName, idxName string)
|
|||
func (v *Volume) ContentSize() uint64 {
|
||||
return v.nm.fileByteCounter
|
||||
}
|
||||
|
||||
// Walk over the contained needles (call the function with each NeedleValue till error is returned)
|
||||
func (v *Volume) WalkValues(pedestrian func(*Needle) error) error {
|
||||
pedplus := func(nv *NeedleValue) (err error) {
|
||||
n := new(Needle)
|
||||
if nv.Offset > 0 {
|
||||
v.dataFile.Seek(int64(nv.Offset)*NeedlePaddingSize, 0)
|
||||
if _, err = n.Read(v.dataFile, nv.Size, v.version); err != nil {
|
||||
return
|
||||
}
|
||||
if err = pedestrian(n); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
return v.nm.Walk(pedplus)
|
||||
}
|
||||
|
||||
// Walk over the keys
|
||||
func (v *Volume) WalkKeys(pedestrian func(Key) error) error {
|
||||
pedplus := func(nv *NeedleValue) (err error) {
|
||||
if nv.Offset > 0 && nv.Key > 0 {
|
||||
if err = pedestrian(nv.Key); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
return v.nm.Walk(pedplus)
|
||||
}
|
||||
|
||||
func (v *Volume) String() string {
|
||||
return fmt.Sprintf("%d@%s:v%d:r%s", v.Id, v.dataFile.Name(),
|
||||
v.Version(), v.replicaType)
|
||||
}
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
)
|
||||
|
||||
type VolumeId uint32
|
||||
|
||||
func NewVolumeId(vid string) (VolumeId, error) {
|
||||
volumeId, err := strconv.ParseUint(vid, 10, 64)
|
||||
return VolumeId(volumeId), err
|
||||
|
|
|
@ -10,4 +10,5 @@ type VolumeInfo struct {
|
|||
FileCount int
|
||||
DeleteCount int
|
||||
DeletedByteCount uint64
|
||||
Frozen bool
|
||||
}
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
)
|
||||
import ()
|
||||
|
||||
type Version uint8
|
||||
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
package topology
|
||||
|
||||
import (
|
||||
)
|
||||
import ()
|
||||
|
||||
type DataCenter struct {
|
||||
NodeImpl
|
||||
|
|
|
@ -1,9 +1,9 @@
|
|||
package topology
|
||||
|
||||
import (
|
||||
_ "fmt"
|
||||
"strconv"
|
||||
"testing"
|
||||
_ "fmt"
|
||||
)
|
||||
|
||||
func TestXYZ(t *testing.T) {
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
package topology
|
||||
|
||||
import (
|
||||
)
|
||||
import ()
|
||||
|
||||
func (t *Topology) ToMap() interface{} {
|
||||
m := make(map[string]interface{})
|
||||
|
|
|
@ -5,12 +5,15 @@ import (
|
|||
"fmt"
|
||||
"math/rand"
|
||||
"pkg/storage"
|
||||
"sort"
|
||||
)
|
||||
|
||||
type volumeIdList []storage.VolumeId
|
||||
|
||||
type VolumeLayout struct {
|
||||
repType storage.ReplicationType
|
||||
vid2location map[storage.VolumeId]*VolumeLocationList
|
||||
writables []storage.VolumeId // transient array of writable volume id
|
||||
writables volumeIdList // transient (sorted!) array of writable volume Ids
|
||||
pulse int64
|
||||
volumeSizeLimit uint64
|
||||
}
|
||||
|
@ -19,7 +22,7 @@ func NewVolumeLayout(repType storage.ReplicationType, volumeSizeLimit uint64, pu
|
|||
return &VolumeLayout{
|
||||
repType: repType,
|
||||
vid2location: make(map[storage.VolumeId]*VolumeLocationList),
|
||||
writables: *new([]storage.VolumeId),
|
||||
writables: make(volumeIdList, 0, 4),
|
||||
pulse: pulse,
|
||||
volumeSizeLimit: volumeSizeLimit,
|
||||
}
|
||||
|
@ -33,13 +36,18 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
|
|||
if len(vl.vid2location[v.Id].list) == v.RepType.GetCopyCount() {
|
||||
if vl.isWritable(v) {
|
||||
vl.writables = append(vl.writables, v.Id)
|
||||
if len(vl.writables) > 1 {
|
||||
vl.writables.Sort()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (vl *VolumeLayout) isWritable(v *storage.VolumeInfo) bool {
|
||||
return uint64(v.Size) < vl.volumeSizeLimit && v.Version == storage.CurrentVersion
|
||||
return !v.Frozen &&
|
||||
uint64(v.Size) < vl.volumeSizeLimit &&
|
||||
v.Version == storage.CurrentVersion
|
||||
}
|
||||
|
||||
func (vl *VolumeLayout) Lookup(vid storage.VolumeId) []*DataNode {
|
||||
|
@ -52,7 +60,13 @@ func (vl *VolumeLayout) PickForWrite(count int) (*storage.VolumeId, int, *Volume
|
|||
fmt.Println("No more writable volumes!")
|
||||
return nil, 0, nil, errors.New("No more writable volumes!")
|
||||
}
|
||||
vid := vl.writables[rand.Intn(len_writers)]
|
||||
var vid storage.VolumeId
|
||||
if len_writers == 1 {
|
||||
vid = vl.writables[0]
|
||||
} else {
|
||||
// skew for lesser indices
|
||||
vid = vl.writables[rand.Intn(len_writers+1)%len_writers]
|
||||
}
|
||||
locationList := vl.vid2location[vid]
|
||||
if locationList != nil {
|
||||
return &vid, count, locationList, nil
|
||||
|
@ -80,8 +94,12 @@ func (vl *VolumeLayout) setVolumeWritable(vid storage.VolumeId) bool {
|
|||
return false
|
||||
}
|
||||
}
|
||||
// FIXME: how to refuse if volume is unwritable/frozen?
|
||||
fmt.Println("Volume", vid, "becomes writable")
|
||||
vl.writables = append(vl.writables, vid)
|
||||
if len(vl.writables) > 1 {
|
||||
vl.writables.Sort()
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
|
@ -114,3 +132,18 @@ func (vl *VolumeLayout) ToMap() interface{} {
|
|||
//m["locations"] = vl.vid2location
|
||||
return m
|
||||
}
|
||||
|
||||
func (vls volumeIdList) Len() int { return len(vls) }
|
||||
|
||||
func (vls volumeIdList) Less(i, j int) bool {
|
||||
return vls[i] < vls[j]
|
||||
}
|
||||
|
||||
func (vls volumeIdList) Swap(i, j int) {
|
||||
vls[i], vls[j] = vls[j], vls[i]
|
||||
}
|
||||
|
||||
// convienence sorting
|
||||
func (vls volumeIdList) Sort() {
|
||||
sort.Sort(vls)
|
||||
}
|
||||
|
|
|
@ -31,4 +31,3 @@ func Uint32toBytes(b []byte, v uint32){
|
|||
func Uint8toBytes(b []byte, v uint8) {
|
||||
b[0] = byte(v)
|
||||
}
|
||||
|
||||
|
|
63
weed-fs/src/pkg/util/file.go
Normal file
63
weed-fs/src/pkg/util/file.go
Normal file
|
@ -0,0 +1,63 @@
|
|||
package util
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"log"
|
||||
"os"
|
||||
)
|
||||
|
||||
// sets file (fh if not nil, otherwise fileName) permission to mask
|
||||
// it will
|
||||
// AND with the permission iff direction < 0
|
||||
// OR with the permission iff direction > 0
|
||||
// otherwise it will SET the permission to the mask
|
||||
func SetFilePerm(fh *os.File, fileName string, mask os.FileMode, direction int8) (err error) {
|
||||
var stat os.FileInfo
|
||||
if fh == nil {
|
||||
stat, err = os.Stat(fileName)
|
||||
} else {
|
||||
stat, err = fh.Stat()
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
mode := stat.Mode() & ^os.ModePerm
|
||||
// log.Printf("mode1=%d mask=%d", mode, mask)
|
||||
if direction == 0 {
|
||||
mode |= mask
|
||||
} else if direction > 0 {
|
||||
mode |= stat.Mode().Perm() | mask
|
||||
} else {
|
||||
mode |= stat.Mode().Perm() & mask
|
||||
}
|
||||
log.Printf("pmode=%d operm=%d => nmode=%d nperm=%d",
|
||||
stat.Mode(), stat.Mode()&os.ModePerm,
|
||||
mode, mode&os.ModePerm)
|
||||
if mode == 0 {
|
||||
return errors.New("Zero FileMode")
|
||||
}
|
||||
if fh == nil {
|
||||
err = os.Chmod(fileName, mode)
|
||||
} else {
|
||||
err = fh.Chmod(mode)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// returns whether the filename exists - errors doesn't mean not exists!
|
||||
func FileExists(fileName string) bool {
|
||||
if _, e := os.Stat(fileName); e != nil && os.IsNotExist(e) {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// returns whether the filename is POSSIBLY writable
|
||||
//- whether it has some kind of writable bit set
|
||||
func FileIsWritable(fileName string) bool {
|
||||
if stat, e := os.Stat(fileName); e == nil {
|
||||
return stat.Mode().Perm()&0222 > 0
|
||||
}
|
||||
return false
|
||||
}
|
Loading…
Reference in a new issue