mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
add metastore, switching sequence to use it
metastore is for storing metadata. This will be used later when moving to distributed master mode.
This commit is contained in:
parent
b579451db9
commit
5cb6590eae
30
go/metastore/backing_test.go
Normal file
30
go/metastore/backing_test.go
Normal file
|
@ -0,0 +1,30 @@
|
||||||
|
package metastore
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestMemoryBacking(t *testing.T) {
|
||||||
|
ms := &MetaStore{NewMetaStoreMemoryBacking()}
|
||||||
|
verifySetGet(t, ms)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFileBacking(t *testing.T) {
|
||||||
|
ms := &MetaStore{NewMetaStoreFileBacking()}
|
||||||
|
verifySetGet(t, ms)
|
||||||
|
}
|
||||||
|
|
||||||
|
func verifySetGet(t *testing.T, ms *MetaStore) {
|
||||||
|
data := uint64(234234)
|
||||||
|
ms.SetUint64(data, "/tmp", "sequence")
|
||||||
|
if !ms.Has("/tmp", "sequence") {
|
||||||
|
t.Errorf("Failed to set data")
|
||||||
|
}
|
||||||
|
if val, err := ms.GetUint64("/tmp", "sequence"); err == nil {
|
||||||
|
if val != data {
|
||||||
|
t.Errorf("Set %d, but read back %d", data, val)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
t.Errorf("Failed to get back data:%s", err)
|
||||||
|
}
|
||||||
|
}
|
34
go/metastore/file_backing.go
Normal file
34
go/metastore/file_backing.go
Normal file
|
@ -0,0 +1,34 @@
|
||||||
|
package metastore
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io/ioutil"
|
||||||
|
"os"
|
||||||
|
"path"
|
||||||
|
)
|
||||||
|
|
||||||
|
// store data on disk, enough for most cases
|
||||||
|
|
||||||
|
type MetaStoreFileBacking struct {
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewMetaStoreFileBacking() MetaStoreFileBacking {
|
||||||
|
mms := MetaStoreFileBacking{}
|
||||||
|
return mms
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mms MetaStoreFileBacking) Set(val []byte, elem ...string) error {
|
||||||
|
return ioutil.WriteFile(path.Join(elem...), val, 0644)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mms MetaStoreFileBacking) Get(elem ...string) (val []byte, err error) {
|
||||||
|
return ioutil.ReadFile(path.Join(elem...))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mms MetaStoreFileBacking) Has(elem ...string) (ok bool) {
|
||||||
|
seqFile, se := os.OpenFile(path.Join(elem...), os.O_RDONLY, 0644)
|
||||||
|
if se != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
defer seqFile.Close()
|
||||||
|
return true
|
||||||
|
}
|
37
go/metastore/memory_backing.go
Normal file
37
go/metastore/memory_backing.go
Normal file
|
@ -0,0 +1,37 @@
|
||||||
|
package metastore
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"path"
|
||||||
|
)
|
||||||
|
|
||||||
|
//this is for testing only
|
||||||
|
|
||||||
|
type MetaStoreMemoryBacking struct {
|
||||||
|
m map[string][]byte
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewMetaStoreMemoryBacking() MetaStoreMemoryBacking {
|
||||||
|
mms := MetaStoreMemoryBacking{}
|
||||||
|
mms.m = make(map[string][]byte)
|
||||||
|
return mms
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mms MetaStoreMemoryBacking) Set(val []byte, elem ...string) error {
|
||||||
|
mms.m[path.Join(elem...)] = val
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mms MetaStoreMemoryBacking) Get(elem ...string) (val []byte, err error) {
|
||||||
|
var ok bool
|
||||||
|
val, ok = mms.m[path.Join(elem...)]
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("Missing value for %s", path.Join(elem...))
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mms MetaStoreMemoryBacking) Has(elem ...string) (ok bool) {
|
||||||
|
_, ok = mms.m[path.Join(elem...)]
|
||||||
|
return
|
||||||
|
}
|
35
go/metastore/metastore.go
Normal file
35
go/metastore/metastore.go
Normal file
|
@ -0,0 +1,35 @@
|
||||||
|
package metastore
|
||||||
|
|
||||||
|
import (
|
||||||
|
"code.google.com/p/weed-fs/go/util"
|
||||||
|
"errors"
|
||||||
|
"path"
|
||||||
|
)
|
||||||
|
|
||||||
|
type MetaStoreBacking interface {
|
||||||
|
Get(elem ...string) ([]byte, error)
|
||||||
|
Set(val []byte, elem ...string) error
|
||||||
|
Has(elem ...string) bool
|
||||||
|
}
|
||||||
|
|
||||||
|
type MetaStore struct {
|
||||||
|
MetaStoreBacking
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MetaStore) SetUint64(val uint64, elem ...string) error {
|
||||||
|
b := make([]byte, 8)
|
||||||
|
util.Uint64toBytes(b, val)
|
||||||
|
return m.Set(b, elem...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MetaStore) GetUint64(elem ...string) (val uint64, err error) {
|
||||||
|
if b, e := m.Get(elem...); e == nil && len(b) == 8 {
|
||||||
|
val = util.BytesToUint64(b)
|
||||||
|
} else {
|
||||||
|
if e != nil {
|
||||||
|
return 0, e
|
||||||
|
}
|
||||||
|
err = errors.New("Not found value for " + path.Join(elem...))
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
|
@ -1,9 +1,10 @@
|
||||||
package sequence
|
package sequence
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"code.google.com/p/weed-fs/go/glog"
|
"code.google.com/p/weed-fs/go/glog"
|
||||||
|
"code.google.com/p/weed-fs/go/metastore"
|
||||||
"encoding/gob"
|
"encoding/gob"
|
||||||
"os"
|
|
||||||
"path"
|
"path"
|
||||||
"sync"
|
"sync"
|
||||||
)
|
)
|
||||||
|
@ -24,25 +25,28 @@ type SequencerImpl struct {
|
||||||
|
|
||||||
FileIdSequence uint64
|
FileIdSequence uint64
|
||||||
fileIdCounter uint64
|
fileIdCounter uint64
|
||||||
|
|
||||||
|
metaStore *metastore.MetaStore
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSequencer(dirname string, filename string) (m *SequencerImpl) {
|
func NewSequencer(dirname string, filename string) (m *SequencerImpl) {
|
||||||
m = &SequencerImpl{dir: dirname, fileName: filename}
|
m = &SequencerImpl{dir: dirname, fileName: filename}
|
||||||
|
m.metaStore = &metastore.MetaStore{metastore.NewMetaStoreFileBacking()}
|
||||||
|
|
||||||
seqFile, se := os.OpenFile(path.Join(m.dir, m.fileName+".seq"), os.O_RDONLY, 0644)
|
if !m.metaStore.Has(m.dir, m.fileName+".seq") {
|
||||||
if se != nil {
|
|
||||||
m.FileIdSequence = FileIdSaveInterval
|
m.FileIdSequence = FileIdSaveInterval
|
||||||
glog.V(0).Infoln("Setting file id sequence", m.FileIdSequence)
|
glog.V(0).Infoln("Setting file id sequence", m.FileIdSequence)
|
||||||
} else {
|
} else {
|
||||||
decoder := gob.NewDecoder(seqFile)
|
var err error
|
||||||
defer seqFile.Close()
|
if m.FileIdSequence, err = m.metaStore.GetUint64(m.dir, m.fileName+".seq"); err != nil {
|
||||||
if se = decoder.Decode(&m.FileIdSequence); se != nil {
|
if data, err := m.metaStore.Get(m.dir, m.fileName+".seq"); err == nil {
|
||||||
glog.V(0).Infof("error decoding FileIdSequence: %s", se)
|
m.FileIdSequence = decode(data)
|
||||||
m.FileIdSequence = FileIdSaveInterval
|
glog.V(0).Infoln("Decoding old version of FileIdSequence", m.FileIdSequence)
|
||||||
glog.V(0).Infoln("Setting file id sequence", m.FileIdSequence)
|
} else {
|
||||||
|
glog.V(0).Infof("No existing FileIdSequence: %s", err)
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
glog.V(0).Infoln("Loading file id sequence", m.FileIdSequence, "=>", m.FileIdSequence+FileIdSaveInterval)
|
glog.V(0).Infoln("Loading file id sequence", m.FileIdSequence)
|
||||||
m.FileIdSequence += FileIdSaveInterval
|
|
||||||
}
|
}
|
||||||
//in case the server stops between intervals
|
//in case the server stops between intervals
|
||||||
}
|
}
|
||||||
|
@ -66,13 +70,18 @@ func (m *SequencerImpl) NextFileId(count int) (uint64, int) {
|
||||||
}
|
}
|
||||||
func (m *SequencerImpl) saveSequence() {
|
func (m *SequencerImpl) saveSequence() {
|
||||||
glog.V(0).Infoln("Saving file id sequence", m.FileIdSequence, "to", path.Join(m.dir, m.fileName+".seq"))
|
glog.V(0).Infoln("Saving file id sequence", m.FileIdSequence, "to", path.Join(m.dir, m.fileName+".seq"))
|
||||||
seqFile, e := os.OpenFile(path.Join(m.dir, m.fileName+".seq"), os.O_CREATE|os.O_WRONLY, 0644)
|
if e := m.metaStore.SetUint64(m.FileIdSequence, m.dir, m.fileName+".seq"); e != nil {
|
||||||
if e != nil {
|
glog.Fatalf("Sequence id Save [ERROR] %s", e)
|
||||||
glog.Fatalf("Sequence File Save [ERROR] %s", e)
|
|
||||||
}
|
|
||||||
defer seqFile.Close()
|
|
||||||
encoder := gob.NewEncoder(seqFile)
|
|
||||||
if e = encoder.Encode(m.FileIdSequence); e != nil {
|
|
||||||
glog.Fatalf("Sequence File Save [ERROR] %s", e)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//decode are for backward compatible purpose
|
||||||
|
func decode(input []byte) uint64 {
|
||||||
|
var x uint64
|
||||||
|
b := bytes.NewReader(input)
|
||||||
|
decoder := gob.NewDecoder(b)
|
||||||
|
if e := decoder.Decode(&x); e == nil {
|
||||||
|
return x
|
||||||
|
}
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue