support for collections!

This commit is contained in:
Chris Lu 2013-11-12 02:21:22 -08:00
parent 8f0e2f31af
commit 3b68711139
16 changed files with 163 additions and 92 deletions

View file

@ -13,9 +13,10 @@ type AllocateVolumeResult struct {
Error string Error string
} }
func AllocateVolume(dn *topology.DataNode, vid storage.VolumeId, repType storage.ReplicationType) error { func AllocateVolume(dn *topology.DataNode, vid storage.VolumeId, collection string, repType storage.ReplicationType) error {
values := make(url.Values) values := make(url.Values)
values.Add("volume", vid.String()) values.Add("volume", vid.String())
values.Add("collection", collection)
values.Add("replicationType", repType.String()) values.Add("replicationType", repType.String())
jsonBlob, err := util.Post("http://"+dn.Url()+"/admin/assign_volume", values) jsonBlob, err := util.Post("http://"+dn.Url()+"/admin/assign_volume", values)
if err != nil { if err != nil {

View file

@ -32,27 +32,27 @@ func NewDefaultVolumeGrowth() *VolumeGrowth {
return &VolumeGrowth{copy1factor: 7, copy2factor: 6, copy3factor: 3} return &VolumeGrowth{copy1factor: 7, copy2factor: 6, copy3factor: 3}
} }
func (vg *VolumeGrowth) AutomaticGrowByType(repType storage.ReplicationType, dataCenter string, topo *topology.Topology) (count int, err error) { func (vg *VolumeGrowth) AutomaticGrowByType(collection string, repType storage.ReplicationType, dataCenter string, topo *topology.Topology) (count int, err error) {
factor := 1 factor := 1
switch repType { switch repType {
case storage.Copy000: case storage.Copy000:
factor = 1 factor = 1
count, err = vg.GrowByCountAndType(vg.copy1factor, repType, dataCenter, topo) count, err = vg.GrowByCountAndType(vg.copy1factor, collection, repType, dataCenter, topo)
case storage.Copy001: case storage.Copy001:
factor = 2 factor = 2
count, err = vg.GrowByCountAndType(vg.copy2factor, repType, dataCenter, topo) count, err = vg.GrowByCountAndType(vg.copy2factor, collection, repType, dataCenter, topo)
case storage.Copy010: case storage.Copy010:
factor = 2 factor = 2
count, err = vg.GrowByCountAndType(vg.copy2factor, repType, dataCenter, topo) count, err = vg.GrowByCountAndType(vg.copy2factor, collection, repType, dataCenter, topo)
case storage.Copy100: case storage.Copy100:
factor = 2 factor = 2
count, err = vg.GrowByCountAndType(vg.copy2factor, repType, dataCenter, topo) count, err = vg.GrowByCountAndType(vg.copy2factor, collection, repType, dataCenter, topo)
case storage.Copy110: case storage.Copy110:
factor = 3 factor = 3
count, err = vg.GrowByCountAndType(vg.copy3factor, repType, dataCenter, topo) count, err = vg.GrowByCountAndType(vg.copy3factor, collection, repType, dataCenter, topo)
case storage.Copy200: case storage.Copy200:
factor = 3 factor = 3
count, err = vg.GrowByCountAndType(vg.copy3factor, repType, dataCenter, topo) count, err = vg.GrowByCountAndType(vg.copy3factor, collection, repType, dataCenter, topo)
default: default:
err = errors.New("Unknown Replication Type!") err = errors.New("Unknown Replication Type!")
} }
@ -61,7 +61,7 @@ func (vg *VolumeGrowth) AutomaticGrowByType(repType storage.ReplicationType, dat
} }
return count, err return count, err
} }
func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.ReplicationType, dataCenter string, topo *topology.Topology) (counter int, err error) { func (vg *VolumeGrowth) GrowByCountAndType(count int, collection string, repType storage.ReplicationType, dataCenter string, topo *topology.Topology) (counter int, err error) {
vg.accessLock.Lock() vg.accessLock.Lock()
defer vg.accessLock.Unlock() defer vg.accessLock.Unlock()
@ -70,7 +70,7 @@ func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.Replicatio
case storage.Copy000: case storage.Copy000:
for i := 0; i < count; i++ { for i := 0; i < count; i++ {
if ok, server, vid := topo.RandomlyReserveOneVolume(dataCenter); ok { if ok, server, vid := topo.RandomlyReserveOneVolume(dataCenter); ok {
if err = vg.grow(topo, *vid, repType, server); err == nil { if err = vg.grow(topo, *vid, collection, repType, server); err == nil {
counter++ counter++
} else { } else {
return counter, err return counter, err
@ -89,7 +89,7 @@ func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.Replicatio
newNodeList := topology.NewNodeList(rack.Children(), exclusion) newNodeList := topology.NewNodeList(rack.Children(), exclusion)
if newNodeList.FreeSpace() > 0 { if newNodeList.FreeSpace() > 0 {
if ok2, server2 := newNodeList.ReserveOneVolume(rand.Intn(newNodeList.FreeSpace()), *vid); ok2 { if ok2, server2 := newNodeList.ReserveOneVolume(rand.Intn(newNodeList.FreeSpace()), *vid); ok2 {
if err = vg.grow(topo, *vid, repType, server1, server2); err == nil { if err = vg.grow(topo, *vid, collection, repType, server1, server2); err == nil {
counter++ counter++
} }
} }
@ -107,7 +107,7 @@ func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.Replicatio
newNodeList := topology.NewNodeList(dc.Children(), exclusion) newNodeList := topology.NewNodeList(dc.Children(), exclusion)
if newNodeList.FreeSpace() > 0 { if newNodeList.FreeSpace() > 0 {
if ok2, server2 := newNodeList.ReserveOneVolume(rand.Intn(newNodeList.FreeSpace()), *vid); ok2 { if ok2, server2 := newNodeList.ReserveOneVolume(rand.Intn(newNodeList.FreeSpace()), *vid); ok2 {
if err = vg.grow(topo, *vid, repType, server1, server2); err == nil { if err = vg.grow(topo, *vid, collection, repType, server1, server2); err == nil {
counter++ counter++
} }
} }
@ -129,7 +129,7 @@ func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.Replicatio
} }
} }
if len(servers) == 2 { if len(servers) == 2 {
if err = vg.grow(topo, vid, repType, servers...); err == nil { if err = vg.grow(topo, vid, collection, repType, servers...); err == nil {
counter++ counter++
} }
} }
@ -168,7 +168,7 @@ func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.Replicatio
} }
} }
if len(servers) == 3 { if len(servers) == 3 {
if err = vg.grow(topo, vid, repType, servers...); err == nil { if err = vg.grow(topo, vid, collection, repType, servers...); err == nil {
counter++ counter++
} }
} }
@ -189,7 +189,7 @@ func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.Replicatio
} }
} }
if len(servers) == 3 { if len(servers) == 3 {
if err = vg.grow(topo, vid, repType, servers...); err == nil { if err = vg.grow(topo, vid, collection, repType, servers...); err == nil {
counter++ counter++
} }
} }
@ -198,10 +198,10 @@ func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.Replicatio
} }
return return
} }
func (vg *VolumeGrowth) grow(topo *topology.Topology, vid storage.VolumeId, repType storage.ReplicationType, servers ...*topology.DataNode) error { func (vg *VolumeGrowth) grow(topo *topology.Topology, vid storage.VolumeId, collection string, repType storage.ReplicationType, servers ...*topology.DataNode) error {
for _, server := range servers { for _, server := range servers {
if err := operation.AllocateVolume(server, vid, repType); err == nil { if err := operation.AllocateVolume(server, vid, collection, repType); err == nil {
vi := storage.VolumeInfo{Id: vid, Size: 0, RepType: repType, Version: storage.CurrentVersion} vi := storage.VolumeInfo{Id: vid, Size: 0, Collection: collection, RepType: repType, Version: storage.CurrentVersion}
server.AddOrUpdateVolume(vi) server.AddOrUpdateVolume(vi)
topo.RegisterVolumeLayout(&vi, server) topo.RegisterVolumeLayout(&vi, server)
glog.V(0).Infoln("Created Volume", vid, "on", server) glog.V(0).Infoln("Created Volume", vid, "on", server)

View file

@ -39,7 +39,7 @@ func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts
} }
return return
} }
func (s *Store) AddVolume(volumeListString string, replicationType string) error { func (s *Store) AddVolume(volumeListString string, collection string, replicationType string) error {
rt, e := NewReplicationTypeFromString(replicationType) rt, e := NewReplicationTypeFromString(replicationType)
if e != nil { if e != nil {
return e return e
@ -51,7 +51,7 @@ func (s *Store) AddVolume(volumeListString string, replicationType string) error
if err != nil { if err != nil {
return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", id_string) return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", id_string)
} }
e = s.addVolume(VolumeId(id), rt) e = s.addVolume(VolumeId(id), collection, rt)
} else { } else {
pair := strings.Split(range_string, "-") pair := strings.Split(range_string, "-")
start, start_err := strconv.ParseUint(pair[0], 10, 64) start, start_err := strconv.ParseUint(pair[0], 10, 64)
@ -63,7 +63,7 @@ func (s *Store) AddVolume(volumeListString string, replicationType string) error
return fmt.Errorf("Volume End Id %s is not a valid unsigned integer!", pair[1]) return fmt.Errorf("Volume End Id %s is not a valid unsigned integer!", pair[1])
} }
for id := start; id <= end; id++ { for id := start; id <= end; id++ {
if err := s.addVolume(VolumeId(id), rt); err != nil { if err := s.addVolume(VolumeId(id), collection, rt); err != nil {
e = err e = err
} }
} }
@ -90,13 +90,13 @@ func (s *Store) findFreeLocation() (ret *DiskLocation) {
} }
return ret return ret
} }
func (s *Store) addVolume(vid VolumeId, replicationType ReplicationType) error { func (s *Store) addVolume(vid VolumeId, collection string, replicationType ReplicationType) error {
if s.findVolume(vid) != nil { if s.findVolume(vid) != nil {
return fmt.Errorf("Volume Id %s already exists!", vid) return fmt.Errorf("Volume Id %s already exists!", vid)
} }
if location := s.findFreeLocation(); location != nil { if location := s.findFreeLocation(); location != nil {
glog.V(0).Infoln("In dir", location.directory, "adds volume =", vid, ", replicationType =", replicationType) glog.V(0).Infoln("In dir", location.directory, "adds volume =", vid, ", collection =", collection, ", replicationType =", replicationType)
if volume, err := NewVolume(location.directory, vid, replicationType); err == nil { if volume, err := NewVolume(location.directory, collection, vid, replicationType); err == nil {
location.volumes[vid] = volume location.volumes[vid] = volume
return nil return nil
} else { } else {
@ -158,12 +158,17 @@ func (l *DiskLocation) loadExistingVolumes() {
for _, dir := range dirs { for _, dir := range dirs {
name := dir.Name() name := dir.Name()
if !dir.IsDir() && strings.HasSuffix(name, ".dat") { if !dir.IsDir() && strings.HasSuffix(name, ".dat") {
collection := ""
base := name[:len(name)-len(".dat")] base := name[:len(name)-len(".dat")]
i := strings.Index(base, "_")
if i > 0 {
collection, base = base[0:i], base[i+1:]
}
if vid, err := NewVolumeId(base); err == nil { if vid, err := NewVolumeId(base); err == nil {
if l.volumes[vid] == nil { if l.volumes[vid] == nil {
if v, e := NewVolume(l.directory, vid, CopyNil); e == nil { if v, e := NewVolume(l.directory, collection, vid, CopyNil); e == nil {
l.volumes[vid] = v l.volumes[vid] = v
glog.V(0).Infoln("In dir", l.directory, "read volume =", vid, "replicationType =", v.ReplicaType, "version =", v.Version(), "size =", v.Size()) glog.V(0).Infoln("data file", l.directory+"/"+name, "replicationType =", v.ReplicaType, "version =", v.Version(), "size =", v.Size())
} }
} }
} }
@ -177,7 +182,9 @@ func (s *Store) Status() []*VolumeInfo {
for _, location := range s.locations { for _, location := range s.locations {
for k, v := range location.volumes { for k, v := range location.volumes {
s := &VolumeInfo{Id: VolumeId(k), Size: v.ContentSize(), s := &VolumeInfo{Id: VolumeId(k), Size: v.ContentSize(),
RepType: v.ReplicaType, Version: v.Version(), Collection: v.Collection,
RepType: v.ReplicaType,
Version: v.Version(),
FileCount: v.nm.FileCount(), FileCount: v.nm.FileCount(),
DeleteCount: v.nm.DeletedCount(), DeleteCount: v.nm.DeletedCount(),
DeletedByteCount: v.nm.DeletedSize(), DeletedByteCount: v.nm.DeletedSize(),
@ -208,7 +215,9 @@ func (s *Store) Join() error {
maxVolumeCount = maxVolumeCount + location.maxVolumeCount maxVolumeCount = maxVolumeCount + location.maxVolumeCount
for k, v := range location.volumes { for k, v := range location.volumes {
s := &VolumeInfo{Id: VolumeId(k), Size: uint64(v.Size()), s := &VolumeInfo{Id: VolumeId(k), Size: uint64(v.Size()),
RepType: v.ReplicaType, Version: v.Version(), Collection: v.Collection,
RepType: v.ReplicaType,
Version: v.Version(),
FileCount: v.nm.FileCount(), FileCount: v.nm.FileCount(),
DeleteCount: v.nm.DeletedCount(), DeleteCount: v.nm.DeletedCount(),
DeletedByteCount: v.nm.DeletedSize(), DeletedByteCount: v.nm.DeletedSize(),

View file

@ -29,32 +29,38 @@ func (s *SuperBlock) Bytes() []byte {
} }
type Volume struct { type Volume struct {
Id VolumeId Id VolumeId
dir string dir string
dataFile *os.File Collection string
nm NeedleMapper dataFile *os.File
readOnly bool nm NeedleMapper
readOnly bool
SuperBlock SuperBlock
accessLock sync.Mutex accessLock sync.Mutex
} }
func NewVolume(dirname string, id VolumeId, replicationType ReplicationType) (v *Volume, e error) { func NewVolume(dirname string, collection string, id VolumeId, replicationType ReplicationType) (v *Volume, e error) {
v = &Volume{dir: dirname, Id: id} v = &Volume{dir: dirname, Collection: collection, Id: id}
v.SuperBlock = SuperBlock{ReplicaType: replicationType} v.SuperBlock = SuperBlock{ReplicaType: replicationType}
e = v.load(true) e = v.load(true)
return return
} }
func loadVolumeWithoutIndex(dirname string, id VolumeId) (v *Volume, e error) { func loadVolumeWithoutIndex(dirname string, collection string, id VolumeId) (v *Volume, e error) {
v = &Volume{dir: dirname, Id: id} v = &Volume{dir: dirname, Collection: collection, Id: id}
v.SuperBlock = SuperBlock{ReplicaType: CopyNil} v.SuperBlock = SuperBlock{ReplicaType: CopyNil}
e = v.load(false) e = v.load(false)
return return
} }
func (v *Volume) load(alsoLoadIndex bool) error { func (v *Volume) load(alsoLoadIndex bool) error {
var e error var e error
fileName := path.Join(v.dir, v.Id.String()) var fileName string
if v.Collection == "" {
fileName = path.Join(v.dir, v.Id.String())
} else {
fileName = path.Join(v.dir, v.Collection+"_"+v.Id.String())
}
if exists, canRead, canWrite, _ := checkFile(fileName + ".dat"); exists && !canRead { if exists, canRead, canWrite, _ := checkFile(fileName + ".dat"); exists && !canRead {
return fmt.Errorf("cannot read Volume Data file %s.dat", fileName) return fmt.Errorf("cannot read Volume Data file %s.dat", fileName)
} else if !exists || canWrite { } else if !exists || canWrite {
@ -309,11 +315,11 @@ func (v *Volume) freeze() error {
return nil return nil
} }
func ScanVolumeFile(dirname string, id VolumeId, func ScanVolumeFile(dirname string, collection string, id VolumeId,
visitSuperBlock func(SuperBlock) error, visitSuperBlock func(SuperBlock) error,
visitNeedle func(n *Needle, offset int64) error) (err error) { visitNeedle func(n *Needle, offset int64) error) (err error) {
var v *Volume var v *Volume
if v, err = loadVolumeWithoutIndex(dirname, id); err != nil { if v, err = loadVolumeWithoutIndex(dirname, collection, id); err != nil {
return return
} }
if err = visitSuperBlock(v.SuperBlock); err != nil { if err = visitSuperBlock(v.SuperBlock); err != nil {
@ -365,7 +371,7 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string) (err erro
nm := NewNeedleMap(idx) nm := NewNeedleMap(idx)
new_offset := int64(SuperBlockSize) new_offset := int64(SuperBlockSize)
err = ScanVolumeFile(v.dir, v.Id, func(superBlock SuperBlock) error { err = ScanVolumeFile(v.dir, v.Collection, v.Id, func(superBlock SuperBlock) error {
_, err = dst.Write(superBlock.Bytes()) _, err = dst.Write(superBlock.Bytes())
return err return err
}, func(n *Needle, offset int64) error { }, func(n *Needle, offset int64) error {

View file

@ -6,6 +6,7 @@ type VolumeInfo struct {
Id VolumeId Id VolumeId
Size uint64 Size uint64
RepType ReplicationType RepType ReplicationType
Collection string
Version Version Version Version
FileCount int FileCount int
DeleteCount int DeleteCount int

38
go/topology/collection.go Normal file
View file

@ -0,0 +1,38 @@
package topology
import (
"code.google.com/p/weed-fs/go/glog"
"code.google.com/p/weed-fs/go/storage"
)
type Collection struct {
Name string
volumeSizeLimit uint64
replicaType2VolumeLayout []*VolumeLayout
}
func NewCollection(name string, volumeSizeLimit uint64) *Collection {
c := &Collection{Name: name, volumeSizeLimit: volumeSizeLimit}
c.replicaType2VolumeLayout = make([]*VolumeLayout, storage.LengthRelicationType)
return c
}
func (c *Collection) GetOrCreateVolumeLayout(repType storage.ReplicationType) *VolumeLayout {
replicationTypeIndex := repType.GetReplicationLevelIndex()
if c.replicaType2VolumeLayout[replicationTypeIndex] == nil {
glog.V(0).Infoln("collection", c.Name, "adding replication type", repType)
c.replicaType2VolumeLayout[replicationTypeIndex] = NewVolumeLayout(repType, c.volumeSizeLimit)
}
return c.replicaType2VolumeLayout[replicationTypeIndex]
}
func (c *Collection) Lookup(vid storage.VolumeId) []*DataNode {
for _, vl := range c.replicaType2VolumeLayout {
if vl != nil {
if list := vl.Lookup(vid); list != nil {
return list
}
}
}
return nil
}

View file

@ -99,9 +99,10 @@ func setup(topologyLayout string) *Topology {
for _, v := range serverMap["volumes"].([]interface{}) { for _, v := range serverMap["volumes"].([]interface{}) {
m := v.(map[string]interface{}) m := v.(map[string]interface{})
vi := storage.VolumeInfo{ vi := storage.VolumeInfo{
Id: storage.VolumeId(int64(m["id"].(float64))), Id: storage.VolumeId(int64(m["id"].(float64))),
Size: uint64(m["size"].(float64)), Size: uint64(m["size"].(float64)),
Version: storage.CurrentVersion} Collection: "testingCollection",
Version: storage.CurrentVersion}
server.AddOrUpdateVolume(vi) server.AddOrUpdateVolume(vi)
} }
server.UpAdjustMaxVolumeCountDelta(int(serverMap["limit"].(float64))) server.UpAdjustMaxVolumeCountDelta(int(serverMap["limit"].(float64)))

View file

@ -12,8 +12,7 @@ import (
type Topology struct { type Topology struct {
NodeImpl NodeImpl
//transient vid~servers mapping for each replication type collectionMap map[string]*Collection
replicaType2VolumeLayout []*VolumeLayout
pulse int64 pulse int64
@ -34,7 +33,7 @@ func NewTopology(id string, confFile string, seq sequence.Sequencer, volumeSizeL
t.nodeType = "Topology" t.nodeType = "Topology"
t.NodeImpl.value = t t.NodeImpl.value = t
t.children = make(map[NodeId]Node) t.children = make(map[NodeId]Node)
t.replicaType2VolumeLayout = make([]*VolumeLayout, storage.LengthRelicationType) t.collectionMap = make(map[string]*Collection)
t.pulse = int64(pulse) t.pulse = int64(pulse)
t.volumeSizeLimit = volumeSizeLimit t.volumeSizeLimit = volumeSizeLimit
@ -60,13 +59,18 @@ func (t *Topology) loadConfiguration(configurationFile string) error {
return nil return nil
} }
func (t *Topology) Lookup(vid storage.VolumeId) []*DataNode { func (t *Topology) Lookup(collection string, vid storage.VolumeId) []*DataNode {
for _, vl := range t.replicaType2VolumeLayout { //maybe an issue if lots of collections?
if vl != nil { if collection == "" {
if list := vl.Lookup(vid); list != nil { for _, c := range t.collectionMap {
if list := c.Lookup(vid); list != nil {
return list return list
} }
} }
} else {
if c, ok := t.collectionMap[collection]; ok {
return c.Lookup(vid)
}
} }
return nil return nil
} }
@ -86,12 +90,8 @@ func (t *Topology) NextVolumeId() storage.VolumeId {
return vid.Next() return vid.Next()
} }
func (t *Topology) PickForWrite(repType storage.ReplicationType, count int, dataCenter string) (string, int, *DataNode, error) { func (t *Topology) PickForWrite(collectionName string, repType storage.ReplicationType, count int, dataCenter string) (string, int, *DataNode, error) {
replicationTypeIndex := repType.GetReplicationLevelIndex() vid, count, datanodes, err := t.GetVolumeLayout(collectionName, repType).PickForWrite(count, dataCenter)
if t.replicaType2VolumeLayout[replicationTypeIndex] == nil {
t.replicaType2VolumeLayout[replicationTypeIndex] = NewVolumeLayout(repType, t.volumeSizeLimit, t.pulse)
}
vid, count, datanodes, err := t.replicaType2VolumeLayout[replicationTypeIndex].PickForWrite(count, dataCenter)
if err != nil || datanodes.Length() == 0 { if err != nil || datanodes.Length() == 0 {
return "", 0, nil, errors.New("No writable volumes avalable!") return "", 0, nil, errors.New("No writable volumes avalable!")
} }
@ -99,17 +99,16 @@ func (t *Topology) PickForWrite(repType storage.ReplicationType, count int, data
return storage.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil return storage.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil
} }
func (t *Topology) GetVolumeLayout(repType storage.ReplicationType) *VolumeLayout { func (t *Topology) GetVolumeLayout(collectionName string, repType storage.ReplicationType) *VolumeLayout {
replicationTypeIndex := repType.GetReplicationLevelIndex() _, ok := t.collectionMap[collectionName]
if t.replicaType2VolumeLayout[replicationTypeIndex] == nil { if !ok {
glog.V(0).Infoln("adding replication type", repType) t.collectionMap[collectionName] = NewCollection(collectionName, t.volumeSizeLimit)
t.replicaType2VolumeLayout[replicationTypeIndex] = NewVolumeLayout(repType, t.volumeSizeLimit, t.pulse)
} }
return t.replicaType2VolumeLayout[replicationTypeIndex] return t.collectionMap[collectionName].GetOrCreateVolumeLayout(repType)
} }
func (t *Topology) RegisterVolumeLayout(v *storage.VolumeInfo, dn *DataNode) { func (t *Topology) RegisterVolumeLayout(v *storage.VolumeInfo, dn *DataNode) {
t.GetVolumeLayout(v.RepType).RegisterVolume(v, dn) t.GetVolumeLayout(v.Collection, v.RepType).RegisterVolume(v, dn)
} }
func (t *Topology) RegisterVolumes(init bool, volumeInfos []storage.VolumeInfo, ip string, port int, publicUrl string, maxVolumeCount int, dcName string, rackName string) { func (t *Topology) RegisterVolumes(init bool, volumeInfos []storage.VolumeInfo, ip string, port int, publicUrl string, maxVolumeCount int, dcName string, rackName string) {

View file

@ -79,12 +79,14 @@ func batchVacuumVolumeCommit(vl *VolumeLayout, vid storage.VolumeId, locationlis
return isCommitSuccess return isCommitSuccess
} }
func (t *Topology) Vacuum(garbageThreshold string) int { func (t *Topology) Vacuum(garbageThreshold string) int {
for _, vl := range t.replicaType2VolumeLayout { for _, c := range t.collectionMap {
if vl != nil { for _, vl := range c.replicaType2VolumeLayout {
for vid, locationlist := range vl.vid2location { if vl != nil {
if batchVacuumVolumeCheck(vl, vid, locationlist, garbageThreshold) { for vid, locationlist := range vl.vid2location {
if batchVacuumVolumeCompact(vl, vid, locationlist) { if batchVacuumVolumeCheck(vl, vid, locationlist, garbageThreshold) {
batchVacuumVolumeCommit(vl, vid, locationlist) if batchVacuumVolumeCompact(vl, vid, locationlist) {
batchVacuumVolumeCommit(vl, vid, locationlist)
}
} }
} }
} }

View file

@ -37,7 +37,7 @@ func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) {
}() }()
} }
func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool { func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool {
vl := t.GetVolumeLayout(volumeInfo.RepType) vl := t.GetVolumeLayout(volumeInfo.Collection, volumeInfo.RepType)
if !vl.SetVolumeCapacityFull(volumeInfo.Id) { if !vl.SetVolumeCapacityFull(volumeInfo.Id) {
return false return false
} }
@ -49,7 +49,7 @@ func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool {
func (t *Topology) UnRegisterDataNode(dn *DataNode) { func (t *Topology) UnRegisterDataNode(dn *DataNode) {
for _, v := range dn.volumes { for _, v := range dn.volumes {
glog.V(0).Infoln("Removing Volume", v.Id, "from the dead volume server", dn) glog.V(0).Infoln("Removing Volume", v.Id, "from the dead volume server", dn)
vl := t.GetVolumeLayout(v.RepType) vl := t.GetVolumeLayout(v.Collection, v.RepType)
vl.SetVolumeUnavailable(dn, v.Id) vl.SetVolumeUnavailable(dn, v.Id)
} }
dn.UpAdjustVolumeCountDelta(-dn.GetVolumeCount()) dn.UpAdjustVolumeCountDelta(-dn.GetVolumeCount())
@ -59,7 +59,7 @@ func (t *Topology) UnRegisterDataNode(dn *DataNode) {
} }
func (t *Topology) RegisterRecoveredDataNode(dn *DataNode) { func (t *Topology) RegisterRecoveredDataNode(dn *DataNode) {
for _, v := range dn.volumes { for _, v := range dn.volumes {
vl := t.GetVolumeLayout(v.RepType) vl := t.GetVolumeLayout(v.Collection, v.RepType)
if vl.isWritable(&v) { if vl.isWritable(&v) {
vl.SetVolumeAvailable(dn, v.Id) vl.SetVolumeAvailable(dn, v.Id)
} }

View file

@ -13,9 +13,13 @@ func (t *Topology) ToMap() interface{} {
} }
m["DataCenters"] = dcs m["DataCenters"] = dcs
var layouts []interface{} var layouts []interface{}
for _, layout := range t.replicaType2VolumeLayout { for _, c := range t.collectionMap {
if layout != nil { for _, layout := range c.replicaType2VolumeLayout {
layouts = append(layouts, layout.ToMap()) if layout != nil {
tmp := layout.ToMap()
tmp["collection"] = c.Name
layouts = append(layouts, tmp)
}
} }
} }
m["layouts"] = layouts m["layouts"] = layouts

View file

@ -21,8 +21,9 @@ var cmdCompact = &Command{
} }
var ( var (
compactVolumePath = cmdCompact.Flag.String("dir", "/tmp", "data directory to store files") compactVolumePath = cmdCompact.Flag.String("dir", "/tmp", "data directory to store files")
compactVolumeId = cmdCompact.Flag.Int("volumeId", -1, "a volume id. The volume should already exist in the dir.") compactVolumeCollection = cmdCompact.Flag.String("collection", "", "volume collection name")
compactVolumeId = cmdCompact.Flag.Int("volumeId", -1, "a volume id. The volume should already exist in the dir.")
) )
func runCompact(cmd *Command, args []string) bool { func runCompact(cmd *Command, args []string) bool {
@ -32,7 +33,7 @@ func runCompact(cmd *Command, args []string) bool {
} }
vid := storage.VolumeId(*compactVolumeId) vid := storage.VolumeId(*compactVolumeId)
v, err := storage.NewVolume(*compactVolumePath, vid, storage.CopyNil) v, err := storage.NewVolume(*compactVolumePath, *compactVolumeCollection, vid, storage.CopyNil)
if err != nil { if err != nil {
glog.Fatalf("Load Volume [ERROR] %s\n", err) glog.Fatalf("Load Volume [ERROR] %s\n", err)
} }

View file

@ -35,6 +35,7 @@ var cmdExport = &Command{
var ( var (
exportVolumePath = cmdExport.Flag.String("dir", "/tmp", "input data directory to store volume data files") exportVolumePath = cmdExport.Flag.String("dir", "/tmp", "input data directory to store volume data files")
exportCollection = cmdExport.Flag.String("collection", "", "the volume collection name")
exportVolumeId = cmdExport.Flag.Int("volumeId", -1, "a volume id. The volume should already exist in the dir. The volume index file should not exist.") exportVolumeId = cmdExport.Flag.Int("volumeId", -1, "a volume id. The volume should already exist in the dir. The volume index file should not exist.")
dest = cmdExport.Flag.String("o", "", "output tar file name, must ends with .tar, or just a \"-\" for stdout") dest = cmdExport.Flag.String("o", "", "output tar file name, must ends with .tar, or just a \"-\" for stdout")
format = cmdExport.Flag.String("fileNameFormat", defaultFnFormat, "filename format, default to {{.Mime}}/{{.Id}}:{{.Name}}") format = cmdExport.Flag.String("fileNameFormat", defaultFnFormat, "filename format, default to {{.Mime}}/{{.Id}}:{{.Name}}")
@ -95,7 +96,7 @@ func runExport(cmd *Command, args []string) bool {
var version storage.Version var version storage.Version
err = storage.ScanVolumeFile(*exportVolumePath, vid, func(superBlock storage.SuperBlock) error { err = storage.ScanVolumeFile(*exportVolumePath, *exportCollection, vid, func(superBlock storage.SuperBlock) error {
version = superBlock.Version version = superBlock.Version
return nil return nil
}, func(n *storage.Needle, offset int64) error { }, func(n *storage.Needle, offset int64) error {

View file

@ -22,8 +22,9 @@ var cmdFix = &Command{
} }
var ( var (
fixVolumePath = cmdFix.Flag.String("dir", "/tmp", "data directory to store files") fixVolumePath = cmdFix.Flag.String("dir", "/tmp", "data directory to store files")
fixVolumeId = cmdFix.Flag.Int("volumeId", -1, "a volume id. The volume should already exist in the dir. The volume index file should not exist.") fixVolumeCollection = cmdFix.Flag.String("collection", "", "the volume collection name")
fixVolumeId = cmdFix.Flag.Int("volumeId", -1, "a volume id. The volume should already exist in the dir. The volume index file should not exist.")
) )
func runFix(cmd *Command, args []string) bool { func runFix(cmd *Command, args []string) bool {
@ -33,6 +34,9 @@ func runFix(cmd *Command, args []string) bool {
} }
fileName := strconv.Itoa(*fixVolumeId) fileName := strconv.Itoa(*fixVolumeId)
if *fixVolumeCollection != "" {
fileName = *fixVolumeCollection + "_" + fileName
}
indexFile, err := os.OpenFile(path.Join(*fixVolumePath, fileName+".idx"), os.O_WRONLY|os.O_CREATE, 0644) indexFile, err := os.OpenFile(path.Join(*fixVolumePath, fileName+".idx"), os.O_WRONLY|os.O_CREATE, 0644)
if err != nil { if err != nil {
glog.Fatalf("Create Volume Index [ERROR] %s\n", err) glog.Fatalf("Create Volume Index [ERROR] %s\n", err)
@ -43,7 +47,7 @@ func runFix(cmd *Command, args []string) bool {
defer nm.Close() defer nm.Close()
vid := storage.VolumeId(*fixVolumeId) vid := storage.VolumeId(*fixVolumeId)
err = storage.ScanVolumeFile(*fixVolumePath, vid, func(superBlock storage.SuperBlock) error { err = storage.ScanVolumeFile(*fixVolumePath, *fixVolumeCollection, vid, func(superBlock storage.SuperBlock) error {
return nil return nil
}, func(n *storage.Needle, offset int64) error { }, func(n *storage.Needle, offset int64) error {
debug("key", n.Id, "offset", offset, "size", n.Size, "disk_size", n.DiskSize(), "gzip", n.IsGzipped()) debug("key", n.Id, "offset", offset, "size", n.Size, "disk_size", n.DiskSize(), "gzip", n.IsGzipped())

View file

@ -56,13 +56,14 @@ var vgLock sync.Mutex
func dirLookupHandler(w http.ResponseWriter, r *http.Request) { func dirLookupHandler(w http.ResponseWriter, r *http.Request) {
vid := r.FormValue("volumeId") vid := r.FormValue("volumeId")
collection := r.FormValue("collection") //optional, but can be faster if too many collections
commaSep := strings.Index(vid, ",") commaSep := strings.Index(vid, ",")
if commaSep > 0 { if commaSep > 0 {
vid = vid[0:commaSep] vid = vid[0:commaSep]
} }
volumeId, err := storage.NewVolumeId(vid) volumeId, err := storage.NewVolumeId(vid)
if err == nil { if err == nil {
machines := topo.Lookup(volumeId) machines := topo.Lookup(collection, volumeId)
if machines != nil { if machines != nil {
ret := []map[string]string{} ret := []map[string]string{}
for _, dn := range machines { for _, dn := range machines {
@ -88,6 +89,7 @@ func dirAssignHandler(w http.ResponseWriter, r *http.Request) {
if repType == "" { if repType == "" {
repType = *defaultRepType repType = *defaultRepType
} }
collection := r.FormValue("collection")
dataCenter := r.FormValue("dataCenter") dataCenter := r.FormValue("dataCenter")
rt, err := storage.NewReplicationTypeFromString(repType) rt, err := storage.NewReplicationTypeFromString(repType)
if err != nil { if err != nil {
@ -96,7 +98,7 @@ func dirAssignHandler(w http.ResponseWriter, r *http.Request) {
return return
} }
if topo.GetVolumeLayout(rt).GetActiveVolumeCount(dataCenter) <= 0 { if topo.GetVolumeLayout(collection, rt).GetActiveVolumeCount(dataCenter) <= 0 {
if topo.FreeSpace() <= 0 { if topo.FreeSpace() <= 0 {
w.WriteHeader(http.StatusNotFound) w.WriteHeader(http.StatusNotFound)
writeJsonQuiet(w, r, map[string]string{"error": "No free volumes left!"}) writeJsonQuiet(w, r, map[string]string{"error": "No free volumes left!"})
@ -104,15 +106,15 @@ func dirAssignHandler(w http.ResponseWriter, r *http.Request) {
} else { } else {
vgLock.Lock() vgLock.Lock()
defer vgLock.Unlock() defer vgLock.Unlock()
if topo.GetVolumeLayout(rt).GetActiveVolumeCount(dataCenter) <= 0 { if topo.GetVolumeLayout(collection, rt).GetActiveVolumeCount(dataCenter) <= 0 {
if _, err = vg.AutomaticGrowByType(rt, dataCenter, topo); err != nil { if _, err = vg.AutomaticGrowByType(collection, rt, dataCenter, topo); err != nil {
writeJsonQuiet(w, r, map[string]string{"error": "Cannot grow volume group! " + err.Error()}) writeJsonQuiet(w, r, map[string]string{"error": "Cannot grow volume group! " + err.Error()})
return return
} }
} }
} }
} }
fid, count, dn, err := topo.PickForWrite(rt, c, dataCenter) fid, count, dn, err := topo.PickForWrite(collection, rt, c, dataCenter)
if err == nil { if err == nil {
writeJsonQuiet(w, r, map[string]interface{}{"fid": fid, "url": dn.Url(), "publicUrl": dn.PublicUrl, "count": count}) writeJsonQuiet(w, r, map[string]interface{}{"fid": fid, "url": dn.Url(), "publicUrl": dn.PublicUrl, "count": count})
} else { } else {
@ -168,7 +170,7 @@ func volumeGrowHandler(w http.ResponseWriter, r *http.Request) {
if topo.FreeSpace() < count*rt.GetCopyCount() { if topo.FreeSpace() < count*rt.GetCopyCount() {
err = errors.New("Only " + strconv.Itoa(topo.FreeSpace()) + " volumes left! Not enough for " + strconv.Itoa(count*rt.GetCopyCount())) err = errors.New("Only " + strconv.Itoa(topo.FreeSpace()) + " volumes left! Not enough for " + strconv.Itoa(count*rt.GetCopyCount()))
} else { } else {
count, err = vg.GrowByCountAndType(count, rt, r.FormValue("dataCneter"), topo) count, err = vg.GrowByCountAndType(count, r.FormValue("collection"), rt, r.FormValue("dataCneter"), topo)
} }
} else { } else {
err = errors.New("parameter count is not found") err = errors.New("parameter count is not found")
@ -197,7 +199,7 @@ func redirectHandler(w http.ResponseWriter, r *http.Request) {
debug("parsing error:", err, r.URL.Path) debug("parsing error:", err, r.URL.Path)
return return
} }
machines := topo.Lookup(volumeId) machines := topo.Lookup("", volumeId)
if machines != nil && len(machines) > 0 { if machines != nil && len(machines) > 0 {
http.Redirect(w, r, "http://"+machines[0].PublicUrl+r.URL.Path, http.StatusMovedPermanently) http.Redirect(w, r, "http://"+machines[0].PublicUrl+r.URL.Path, http.StatusMovedPermanently)
} else { } else {

View file

@ -56,13 +56,13 @@ func statusHandler(w http.ResponseWriter, r *http.Request) {
writeJsonQuiet(w, r, m) writeJsonQuiet(w, r, m)
} }
func assignVolumeHandler(w http.ResponseWriter, r *http.Request) { func assignVolumeHandler(w http.ResponseWriter, r *http.Request) {
err := store.AddVolume(r.FormValue("volume"), r.FormValue("replicationType")) err := store.AddVolume(r.FormValue("volume"), r.FormValue("collection"), r.FormValue("replicationType"))
if err == nil { if err == nil {
writeJsonQuiet(w, r, map[string]string{"error": ""}) writeJsonQuiet(w, r, map[string]string{"error": ""})
} else { } else {
writeJsonQuiet(w, r, map[string]string{"error": err.Error()}) writeJsonQuiet(w, r, map[string]string{"error": err.Error()})
} }
debug("assign volume =", r.FormValue("volume"), ", replicationType =", r.FormValue("replicationType"), ", error =", err) debug("assign volume =", r.FormValue("volume"), ", collection =", r.FormValue("collection"), ", replicationType =", r.FormValue("replicationType"), ", error =", err)
} }
func vacuumVolumeCheckHandler(w http.ResponseWriter, r *http.Request) { func vacuumVolumeCheckHandler(w http.ResponseWriter, r *http.Request) {
err, ret := store.CheckCompactVolume(r.FormValue("volume"), r.FormValue("garbageThreshold")) err, ret := store.CheckCompactVolume(r.FormValue("volume"), r.FormValue("garbageThreshold"))
@ -112,6 +112,8 @@ func storeHandler(w http.ResponseWriter, r *http.Request) {
GetOrHeadHandler(w, r, false) GetOrHeadHandler(w, r, false)
case "DELETE": case "DELETE":
secure(volumeWhiteList, DeleteHandler)(w, r) secure(volumeWhiteList, DeleteHandler)(w, r)
case "PUT":
secure(volumeWhiteList, PostHandler)(w, r)
case "POST": case "POST":
secure(volumeWhiteList, PostHandler)(w, r) secure(volumeWhiteList, PostHandler)(w, r)
} }