From 3b687111399fd08468e4a6232bcbe6068df961bf Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 12 Nov 2013 02:21:22 -0800 Subject: [PATCH] support for collections! --- go/operation/allocate_volume.go | 3 +- go/replication/volume_growth.go | 34 +++++++++++----------- go/storage/store.go | 29 ++++++++++++------- go/storage/volume.go | 32 ++++++++++++--------- go/storage/volume_info.go | 1 + go/topology/collection.go | 38 +++++++++++++++++++++++++ go/topology/topo_test.go | 7 +++-- go/topology/topology.go | 39 +++++++++++++------------- go/topology/topology_compact.go | 14 +++++---- go/topology/topology_event_handling.go | 6 ++-- go/topology/topology_map.go | 10 +++++-- go/weed/compact.go | 7 +++-- go/weed/export.go | 3 +- go/weed/fix.go | 10 +++++-- go/weed/master.go | 16 ++++++----- go/weed/volume.go | 6 ++-- 16 files changed, 163 insertions(+), 92 deletions(-) create mode 100644 go/topology/collection.go diff --git a/go/operation/allocate_volume.go b/go/operation/allocate_volume.go index ea34901ef..dee114f21 100644 --- a/go/operation/allocate_volume.go +++ b/go/operation/allocate_volume.go @@ -13,9 +13,10 @@ type AllocateVolumeResult struct { Error string } -func AllocateVolume(dn *topology.DataNode, vid storage.VolumeId, repType storage.ReplicationType) error { +func AllocateVolume(dn *topology.DataNode, vid storage.VolumeId, collection string, repType storage.ReplicationType) error { values := make(url.Values) values.Add("volume", vid.String()) + values.Add("collection", collection) values.Add("replicationType", repType.String()) jsonBlob, err := util.Post("http://"+dn.Url()+"/admin/assign_volume", values) if err != nil { diff --git a/go/replication/volume_growth.go b/go/replication/volume_growth.go index 2adf72dee..6e5bf1f5c 100644 --- a/go/replication/volume_growth.go +++ b/go/replication/volume_growth.go @@ -32,27 +32,27 @@ func NewDefaultVolumeGrowth() *VolumeGrowth { return &VolumeGrowth{copy1factor: 7, copy2factor: 6, copy3factor: 3} } -func (vg *VolumeGrowth) AutomaticGrowByType(repType storage.ReplicationType, dataCenter string, topo *topology.Topology) (count int, err error) { +func (vg *VolumeGrowth) AutomaticGrowByType(collection string, repType storage.ReplicationType, dataCenter string, topo *topology.Topology) (count int, err error) { factor := 1 switch repType { case storage.Copy000: factor = 1 - count, err = vg.GrowByCountAndType(vg.copy1factor, repType, dataCenter, topo) + count, err = vg.GrowByCountAndType(vg.copy1factor, collection, repType, dataCenter, topo) case storage.Copy001: factor = 2 - count, err = vg.GrowByCountAndType(vg.copy2factor, repType, dataCenter, topo) + count, err = vg.GrowByCountAndType(vg.copy2factor, collection, repType, dataCenter, topo) case storage.Copy010: factor = 2 - count, err = vg.GrowByCountAndType(vg.copy2factor, repType, dataCenter, topo) + count, err = vg.GrowByCountAndType(vg.copy2factor, collection, repType, dataCenter, topo) case storage.Copy100: factor = 2 - count, err = vg.GrowByCountAndType(vg.copy2factor, repType, dataCenter, topo) + count, err = vg.GrowByCountAndType(vg.copy2factor, collection, repType, dataCenter, topo) case storage.Copy110: factor = 3 - count, err = vg.GrowByCountAndType(vg.copy3factor, repType, dataCenter, topo) + count, err = vg.GrowByCountAndType(vg.copy3factor, collection, repType, dataCenter, topo) case storage.Copy200: factor = 3 - count, err = vg.GrowByCountAndType(vg.copy3factor, repType, dataCenter, topo) + count, err = vg.GrowByCountAndType(vg.copy3factor, collection, repType, dataCenter, topo) default: err = errors.New("Unknown Replication Type!") } @@ -61,7 +61,7 @@ func (vg *VolumeGrowth) AutomaticGrowByType(repType storage.ReplicationType, dat } return count, err } -func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.ReplicationType, dataCenter string, topo *topology.Topology) (counter int, err error) { +func (vg *VolumeGrowth) GrowByCountAndType(count int, collection string, repType storage.ReplicationType, dataCenter string, topo *topology.Topology) (counter int, err error) { vg.accessLock.Lock() defer vg.accessLock.Unlock() @@ -70,7 +70,7 @@ func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.Replicatio case storage.Copy000: for i := 0; i < count; i++ { if ok, server, vid := topo.RandomlyReserveOneVolume(dataCenter); ok { - if err = vg.grow(topo, *vid, repType, server); err == nil { + if err = vg.grow(topo, *vid, collection, repType, server); err == nil { counter++ } else { return counter, err @@ -89,7 +89,7 @@ func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.Replicatio newNodeList := topology.NewNodeList(rack.Children(), exclusion) if newNodeList.FreeSpace() > 0 { if ok2, server2 := newNodeList.ReserveOneVolume(rand.Intn(newNodeList.FreeSpace()), *vid); ok2 { - if err = vg.grow(topo, *vid, repType, server1, server2); err == nil { + if err = vg.grow(topo, *vid, collection, repType, server1, server2); err == nil { counter++ } } @@ -107,7 +107,7 @@ func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.Replicatio newNodeList := topology.NewNodeList(dc.Children(), exclusion) if newNodeList.FreeSpace() > 0 { if ok2, server2 := newNodeList.ReserveOneVolume(rand.Intn(newNodeList.FreeSpace()), *vid); ok2 { - if err = vg.grow(topo, *vid, repType, server1, server2); err == nil { + if err = vg.grow(topo, *vid, collection, repType, server1, server2); err == nil { counter++ } } @@ -129,7 +129,7 @@ func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.Replicatio } } if len(servers) == 2 { - if err = vg.grow(topo, vid, repType, servers...); err == nil { + if err = vg.grow(topo, vid, collection, repType, servers...); err == nil { counter++ } } @@ -168,7 +168,7 @@ func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.Replicatio } } if len(servers) == 3 { - if err = vg.grow(topo, vid, repType, servers...); err == nil { + if err = vg.grow(topo, vid, collection, repType, servers...); err == nil { counter++ } } @@ -189,7 +189,7 @@ func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.Replicatio } } if len(servers) == 3 { - if err = vg.grow(topo, vid, repType, servers...); err == nil { + if err = vg.grow(topo, vid, collection, repType, servers...); err == nil { counter++ } } @@ -198,10 +198,10 @@ func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.Replicatio } return } -func (vg *VolumeGrowth) grow(topo *topology.Topology, vid storage.VolumeId, repType storage.ReplicationType, servers ...*topology.DataNode) error { +func (vg *VolumeGrowth) grow(topo *topology.Topology, vid storage.VolumeId, collection string, repType storage.ReplicationType, servers ...*topology.DataNode) error { for _, server := range servers { - if err := operation.AllocateVolume(server, vid, repType); err == nil { - vi := storage.VolumeInfo{Id: vid, Size: 0, RepType: repType, Version: storage.CurrentVersion} + if err := operation.AllocateVolume(server, vid, collection, repType); err == nil { + vi := storage.VolumeInfo{Id: vid, Size: 0, Collection: collection, RepType: repType, Version: storage.CurrentVersion} server.AddOrUpdateVolume(vi) topo.RegisterVolumeLayout(&vi, server) glog.V(0).Infoln("Created Volume", vid, "on", server) diff --git a/go/storage/store.go b/go/storage/store.go index e5dc92bf6..cc9fef7d0 100644 --- a/go/storage/store.go +++ b/go/storage/store.go @@ -39,7 +39,7 @@ func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts } return } -func (s *Store) AddVolume(volumeListString string, replicationType string) error { +func (s *Store) AddVolume(volumeListString string, collection string, replicationType string) error { rt, e := NewReplicationTypeFromString(replicationType) if e != nil { return e @@ -51,7 +51,7 @@ func (s *Store) AddVolume(volumeListString string, replicationType string) error if err != nil { return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", id_string) } - e = s.addVolume(VolumeId(id), rt) + e = s.addVolume(VolumeId(id), collection, rt) } else { pair := strings.Split(range_string, "-") start, start_err := strconv.ParseUint(pair[0], 10, 64) @@ -63,7 +63,7 @@ func (s *Store) AddVolume(volumeListString string, replicationType string) error return fmt.Errorf("Volume End Id %s is not a valid unsigned integer!", pair[1]) } for id := start; id <= end; id++ { - if err := s.addVolume(VolumeId(id), rt); err != nil { + if err := s.addVolume(VolumeId(id), collection, rt); err != nil { e = err } } @@ -90,13 +90,13 @@ func (s *Store) findFreeLocation() (ret *DiskLocation) { } return ret } -func (s *Store) addVolume(vid VolumeId, replicationType ReplicationType) error { +func (s *Store) addVolume(vid VolumeId, collection string, replicationType ReplicationType) error { if s.findVolume(vid) != nil { return fmt.Errorf("Volume Id %s already exists!", vid) } if location := s.findFreeLocation(); location != nil { - glog.V(0).Infoln("In dir", location.directory, "adds volume =", vid, ", replicationType =", replicationType) - if volume, err := NewVolume(location.directory, vid, replicationType); err == nil { + glog.V(0).Infoln("In dir", location.directory, "adds volume =", vid, ", collection =", collection, ", replicationType =", replicationType) + if volume, err := NewVolume(location.directory, collection, vid, replicationType); err == nil { location.volumes[vid] = volume return nil } else { @@ -158,12 +158,17 @@ func (l *DiskLocation) loadExistingVolumes() { for _, dir := range dirs { name := dir.Name() if !dir.IsDir() && strings.HasSuffix(name, ".dat") { + collection := "" base := name[:len(name)-len(".dat")] + i := strings.Index(base, "_") + if i > 0 { + collection, base = base[0:i], base[i+1:] + } if vid, err := NewVolumeId(base); err == nil { if l.volumes[vid] == nil { - if v, e := NewVolume(l.directory, vid, CopyNil); e == nil { + if v, e := NewVolume(l.directory, collection, vid, CopyNil); e == nil { l.volumes[vid] = v - glog.V(0).Infoln("In dir", l.directory, "read volume =", vid, "replicationType =", v.ReplicaType, "version =", v.Version(), "size =", v.Size()) + glog.V(0).Infoln("data file", l.directory+"/"+name, "replicationType =", v.ReplicaType, "version =", v.Version(), "size =", v.Size()) } } } @@ -177,7 +182,9 @@ func (s *Store) Status() []*VolumeInfo { for _, location := range s.locations { for k, v := range location.volumes { s := &VolumeInfo{Id: VolumeId(k), Size: v.ContentSize(), - RepType: v.ReplicaType, Version: v.Version(), + Collection: v.Collection, + RepType: v.ReplicaType, + Version: v.Version(), FileCount: v.nm.FileCount(), DeleteCount: v.nm.DeletedCount(), DeletedByteCount: v.nm.DeletedSize(), @@ -208,7 +215,9 @@ func (s *Store) Join() error { maxVolumeCount = maxVolumeCount + location.maxVolumeCount for k, v := range location.volumes { s := &VolumeInfo{Id: VolumeId(k), Size: uint64(v.Size()), - RepType: v.ReplicaType, Version: v.Version(), + Collection: v.Collection, + RepType: v.ReplicaType, + Version: v.Version(), FileCount: v.nm.FileCount(), DeleteCount: v.nm.DeletedCount(), DeletedByteCount: v.nm.DeletedSize(), diff --git a/go/storage/volume.go b/go/storage/volume.go index bdc07fc58..db3a66c1b 100644 --- a/go/storage/volume.go +++ b/go/storage/volume.go @@ -29,32 +29,38 @@ func (s *SuperBlock) Bytes() []byte { } type Volume struct { - Id VolumeId - dir string - dataFile *os.File - nm NeedleMapper - readOnly bool + Id VolumeId + dir string + Collection string + dataFile *os.File + nm NeedleMapper + readOnly bool SuperBlock accessLock sync.Mutex } -func NewVolume(dirname string, id VolumeId, replicationType ReplicationType) (v *Volume, e error) { - v = &Volume{dir: dirname, Id: id} +func NewVolume(dirname string, collection string, id VolumeId, replicationType ReplicationType) (v *Volume, e error) { + v = &Volume{dir: dirname, Collection: collection, Id: id} v.SuperBlock = SuperBlock{ReplicaType: replicationType} e = v.load(true) return } -func loadVolumeWithoutIndex(dirname string, id VolumeId) (v *Volume, e error) { - v = &Volume{dir: dirname, Id: id} +func loadVolumeWithoutIndex(dirname string, collection string, id VolumeId) (v *Volume, e error) { + v = &Volume{dir: dirname, Collection: collection, Id: id} v.SuperBlock = SuperBlock{ReplicaType: CopyNil} e = v.load(false) return } func (v *Volume) load(alsoLoadIndex bool) error { var e error - fileName := path.Join(v.dir, v.Id.String()) + var fileName string + if v.Collection == "" { + fileName = path.Join(v.dir, v.Id.String()) + } else { + fileName = path.Join(v.dir, v.Collection+"_"+v.Id.String()) + } if exists, canRead, canWrite, _ := checkFile(fileName + ".dat"); exists && !canRead { return fmt.Errorf("cannot read Volume Data file %s.dat", fileName) } else if !exists || canWrite { @@ -309,11 +315,11 @@ func (v *Volume) freeze() error { return nil } -func ScanVolumeFile(dirname string, id VolumeId, +func ScanVolumeFile(dirname string, collection string, id VolumeId, visitSuperBlock func(SuperBlock) error, visitNeedle func(n *Needle, offset int64) error) (err error) { var v *Volume - if v, err = loadVolumeWithoutIndex(dirname, id); err != nil { + if v, err = loadVolumeWithoutIndex(dirname, collection, id); err != nil { return } if err = visitSuperBlock(v.SuperBlock); err != nil { @@ -365,7 +371,7 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string) (err erro nm := NewNeedleMap(idx) new_offset := int64(SuperBlockSize) - err = ScanVolumeFile(v.dir, v.Id, func(superBlock SuperBlock) error { + err = ScanVolumeFile(v.dir, v.Collection, v.Id, func(superBlock SuperBlock) error { _, err = dst.Write(superBlock.Bytes()) return err }, func(n *Needle, offset int64) error { diff --git a/go/storage/volume_info.go b/go/storage/volume_info.go index 5a83b6e36..c8eb7612e 100644 --- a/go/storage/volume_info.go +++ b/go/storage/volume_info.go @@ -6,6 +6,7 @@ type VolumeInfo struct { Id VolumeId Size uint64 RepType ReplicationType + Collection string Version Version FileCount int DeleteCount int diff --git a/go/topology/collection.go b/go/topology/collection.go new file mode 100644 index 000000000..0a7971424 --- /dev/null +++ b/go/topology/collection.go @@ -0,0 +1,38 @@ +package topology + +import ( + "code.google.com/p/weed-fs/go/glog" + "code.google.com/p/weed-fs/go/storage" +) + +type Collection struct { + Name string + volumeSizeLimit uint64 + replicaType2VolumeLayout []*VolumeLayout +} + +func NewCollection(name string, volumeSizeLimit uint64) *Collection { + c := &Collection{Name: name, volumeSizeLimit: volumeSizeLimit} + c.replicaType2VolumeLayout = make([]*VolumeLayout, storage.LengthRelicationType) + return c +} + +func (c *Collection) GetOrCreateVolumeLayout(repType storage.ReplicationType) *VolumeLayout { + replicationTypeIndex := repType.GetReplicationLevelIndex() + if c.replicaType2VolumeLayout[replicationTypeIndex] == nil { + glog.V(0).Infoln("collection", c.Name, "adding replication type", repType) + c.replicaType2VolumeLayout[replicationTypeIndex] = NewVolumeLayout(repType, c.volumeSizeLimit) + } + return c.replicaType2VolumeLayout[replicationTypeIndex] +} + +func (c *Collection) Lookup(vid storage.VolumeId) []*DataNode { + for _, vl := range c.replicaType2VolumeLayout { + if vl != nil { + if list := vl.Lookup(vid); list != nil { + return list + } + } + } + return nil +} diff --git a/go/topology/topo_test.go b/go/topology/topo_test.go index 36f4963db..c0edca7c1 100644 --- a/go/topology/topo_test.go +++ b/go/topology/topo_test.go @@ -99,9 +99,10 @@ func setup(topologyLayout string) *Topology { for _, v := range serverMap["volumes"].([]interface{}) { m := v.(map[string]interface{}) vi := storage.VolumeInfo{ - Id: storage.VolumeId(int64(m["id"].(float64))), - Size: uint64(m["size"].(float64)), - Version: storage.CurrentVersion} + Id: storage.VolumeId(int64(m["id"].(float64))), + Size: uint64(m["size"].(float64)), + Collection: "testingCollection", + Version: storage.CurrentVersion} server.AddOrUpdateVolume(vi) } server.UpAdjustMaxVolumeCountDelta(int(serverMap["limit"].(float64))) diff --git a/go/topology/topology.go b/go/topology/topology.go index b21601210..5b3d29e0b 100644 --- a/go/topology/topology.go +++ b/go/topology/topology.go @@ -12,8 +12,7 @@ import ( type Topology struct { NodeImpl - //transient vid~servers mapping for each replication type - replicaType2VolumeLayout []*VolumeLayout + collectionMap map[string]*Collection pulse int64 @@ -34,7 +33,7 @@ func NewTopology(id string, confFile string, seq sequence.Sequencer, volumeSizeL t.nodeType = "Topology" t.NodeImpl.value = t t.children = make(map[NodeId]Node) - t.replicaType2VolumeLayout = make([]*VolumeLayout, storage.LengthRelicationType) + t.collectionMap = make(map[string]*Collection) t.pulse = int64(pulse) t.volumeSizeLimit = volumeSizeLimit @@ -60,13 +59,18 @@ func (t *Topology) loadConfiguration(configurationFile string) error { return nil } -func (t *Topology) Lookup(vid storage.VolumeId) []*DataNode { - for _, vl := range t.replicaType2VolumeLayout { - if vl != nil { - if list := vl.Lookup(vid); list != nil { +func (t *Topology) Lookup(collection string, vid storage.VolumeId) []*DataNode { + //maybe an issue if lots of collections? + if collection == "" { + for _, c := range t.collectionMap { + if list := c.Lookup(vid); list != nil { return list } } + } else { + if c, ok := t.collectionMap[collection]; ok { + return c.Lookup(vid) + } } return nil } @@ -86,12 +90,8 @@ func (t *Topology) NextVolumeId() storage.VolumeId { return vid.Next() } -func (t *Topology) PickForWrite(repType storage.ReplicationType, count int, dataCenter string) (string, int, *DataNode, error) { - replicationTypeIndex := repType.GetReplicationLevelIndex() - if t.replicaType2VolumeLayout[replicationTypeIndex] == nil { - t.replicaType2VolumeLayout[replicationTypeIndex] = NewVolumeLayout(repType, t.volumeSizeLimit, t.pulse) - } - vid, count, datanodes, err := t.replicaType2VolumeLayout[replicationTypeIndex].PickForWrite(count, dataCenter) +func (t *Topology) PickForWrite(collectionName string, repType storage.ReplicationType, count int, dataCenter string) (string, int, *DataNode, error) { + vid, count, datanodes, err := t.GetVolumeLayout(collectionName, repType).PickForWrite(count, dataCenter) if err != nil || datanodes.Length() == 0 { return "", 0, nil, errors.New("No writable volumes avalable!") } @@ -99,17 +99,16 @@ func (t *Topology) PickForWrite(repType storage.ReplicationType, count int, data return storage.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil } -func (t *Topology) GetVolumeLayout(repType storage.ReplicationType) *VolumeLayout { - replicationTypeIndex := repType.GetReplicationLevelIndex() - if t.replicaType2VolumeLayout[replicationTypeIndex] == nil { - glog.V(0).Infoln("adding replication type", repType) - t.replicaType2VolumeLayout[replicationTypeIndex] = NewVolumeLayout(repType, t.volumeSizeLimit, t.pulse) +func (t *Topology) GetVolumeLayout(collectionName string, repType storage.ReplicationType) *VolumeLayout { + _, ok := t.collectionMap[collectionName] + if !ok { + t.collectionMap[collectionName] = NewCollection(collectionName, t.volumeSizeLimit) } - return t.replicaType2VolumeLayout[replicationTypeIndex] + return t.collectionMap[collectionName].GetOrCreateVolumeLayout(repType) } func (t *Topology) RegisterVolumeLayout(v *storage.VolumeInfo, dn *DataNode) { - t.GetVolumeLayout(v.RepType).RegisterVolume(v, dn) + t.GetVolumeLayout(v.Collection, v.RepType).RegisterVolume(v, dn) } func (t *Topology) RegisterVolumes(init bool, volumeInfos []storage.VolumeInfo, ip string, port int, publicUrl string, maxVolumeCount int, dcName string, rackName string) { diff --git a/go/topology/topology_compact.go b/go/topology/topology_compact.go index 4ba77a4a5..a1d6d2564 100644 --- a/go/topology/topology_compact.go +++ b/go/topology/topology_compact.go @@ -79,12 +79,14 @@ func batchVacuumVolumeCommit(vl *VolumeLayout, vid storage.VolumeId, locationlis return isCommitSuccess } func (t *Topology) Vacuum(garbageThreshold string) int { - for _, vl := range t.replicaType2VolumeLayout { - if vl != nil { - for vid, locationlist := range vl.vid2location { - if batchVacuumVolumeCheck(vl, vid, locationlist, garbageThreshold) { - if batchVacuumVolumeCompact(vl, vid, locationlist) { - batchVacuumVolumeCommit(vl, vid, locationlist) + for _, c := range t.collectionMap { + for _, vl := range c.replicaType2VolumeLayout { + if vl != nil { + for vid, locationlist := range vl.vid2location { + if batchVacuumVolumeCheck(vl, vid, locationlist, garbageThreshold) { + if batchVacuumVolumeCompact(vl, vid, locationlist) { + batchVacuumVolumeCommit(vl, vid, locationlist) + } } } } diff --git a/go/topology/topology_event_handling.go b/go/topology/topology_event_handling.go index f3b09c649..7f81d8184 100644 --- a/go/topology/topology_event_handling.go +++ b/go/topology/topology_event_handling.go @@ -37,7 +37,7 @@ func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) { }() } func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool { - vl := t.GetVolumeLayout(volumeInfo.RepType) + vl := t.GetVolumeLayout(volumeInfo.Collection, volumeInfo.RepType) if !vl.SetVolumeCapacityFull(volumeInfo.Id) { return false } @@ -49,7 +49,7 @@ func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool { func (t *Topology) UnRegisterDataNode(dn *DataNode) { for _, v := range dn.volumes { glog.V(0).Infoln("Removing Volume", v.Id, "from the dead volume server", dn) - vl := t.GetVolumeLayout(v.RepType) + vl := t.GetVolumeLayout(v.Collection, v.RepType) vl.SetVolumeUnavailable(dn, v.Id) } dn.UpAdjustVolumeCountDelta(-dn.GetVolumeCount()) @@ -59,7 +59,7 @@ func (t *Topology) UnRegisterDataNode(dn *DataNode) { } func (t *Topology) RegisterRecoveredDataNode(dn *DataNode) { for _, v := range dn.volumes { - vl := t.GetVolumeLayout(v.RepType) + vl := t.GetVolumeLayout(v.Collection, v.RepType) if vl.isWritable(&v) { vl.SetVolumeAvailable(dn, v.Id) } diff --git a/go/topology/topology_map.go b/go/topology/topology_map.go index b416ee943..f66d4c251 100644 --- a/go/topology/topology_map.go +++ b/go/topology/topology_map.go @@ -13,9 +13,13 @@ func (t *Topology) ToMap() interface{} { } m["DataCenters"] = dcs var layouts []interface{} - for _, layout := range t.replicaType2VolumeLayout { - if layout != nil { - layouts = append(layouts, layout.ToMap()) + for _, c := range t.collectionMap { + for _, layout := range c.replicaType2VolumeLayout { + if layout != nil { + tmp := layout.ToMap() + tmp["collection"] = c.Name + layouts = append(layouts, tmp) + } } } m["layouts"] = layouts diff --git a/go/weed/compact.go b/go/weed/compact.go index 30ae6abd2..2600b3362 100644 --- a/go/weed/compact.go +++ b/go/weed/compact.go @@ -21,8 +21,9 @@ var cmdCompact = &Command{ } var ( - compactVolumePath = cmdCompact.Flag.String("dir", "/tmp", "data directory to store files") - compactVolumeId = cmdCompact.Flag.Int("volumeId", -1, "a volume id. The volume should already exist in the dir.") + compactVolumePath = cmdCompact.Flag.String("dir", "/tmp", "data directory to store files") + compactVolumeCollection = cmdCompact.Flag.String("collection", "", "volume collection name") + compactVolumeId = cmdCompact.Flag.Int("volumeId", -1, "a volume id. The volume should already exist in the dir.") ) func runCompact(cmd *Command, args []string) bool { @@ -32,7 +33,7 @@ func runCompact(cmd *Command, args []string) bool { } vid := storage.VolumeId(*compactVolumeId) - v, err := storage.NewVolume(*compactVolumePath, vid, storage.CopyNil) + v, err := storage.NewVolume(*compactVolumePath, *compactVolumeCollection, vid, storage.CopyNil) if err != nil { glog.Fatalf("Load Volume [ERROR] %s\n", err) } diff --git a/go/weed/export.go b/go/weed/export.go index 0c5a6c227..e6644adc7 100644 --- a/go/weed/export.go +++ b/go/weed/export.go @@ -35,6 +35,7 @@ var cmdExport = &Command{ var ( exportVolumePath = cmdExport.Flag.String("dir", "/tmp", "input data directory to store volume data files") + exportCollection = cmdExport.Flag.String("collection", "", "the volume collection name") exportVolumeId = cmdExport.Flag.Int("volumeId", -1, "a volume id. The volume should already exist in the dir. The volume index file should not exist.") dest = cmdExport.Flag.String("o", "", "output tar file name, must ends with .tar, or just a \"-\" for stdout") format = cmdExport.Flag.String("fileNameFormat", defaultFnFormat, "filename format, default to {{.Mime}}/{{.Id}}:{{.Name}}") @@ -95,7 +96,7 @@ func runExport(cmd *Command, args []string) bool { var version storage.Version - err = storage.ScanVolumeFile(*exportVolumePath, vid, func(superBlock storage.SuperBlock) error { + err = storage.ScanVolumeFile(*exportVolumePath, *exportCollection, vid, func(superBlock storage.SuperBlock) error { version = superBlock.Version return nil }, func(n *storage.Needle, offset int64) error { diff --git a/go/weed/fix.go b/go/weed/fix.go index c97fd60d3..159e2dbde 100644 --- a/go/weed/fix.go +++ b/go/weed/fix.go @@ -22,8 +22,9 @@ var cmdFix = &Command{ } var ( - fixVolumePath = cmdFix.Flag.String("dir", "/tmp", "data directory to store files") - fixVolumeId = cmdFix.Flag.Int("volumeId", -1, "a volume id. The volume should already exist in the dir. The volume index file should not exist.") + fixVolumePath = cmdFix.Flag.String("dir", "/tmp", "data directory to store files") + fixVolumeCollection = cmdFix.Flag.String("collection", "", "the volume collection name") + fixVolumeId = cmdFix.Flag.Int("volumeId", -1, "a volume id. The volume should already exist in the dir. The volume index file should not exist.") ) func runFix(cmd *Command, args []string) bool { @@ -33,6 +34,9 @@ func runFix(cmd *Command, args []string) bool { } fileName := strconv.Itoa(*fixVolumeId) + if *fixVolumeCollection != "" { + fileName = *fixVolumeCollection + "_" + fileName + } indexFile, err := os.OpenFile(path.Join(*fixVolumePath, fileName+".idx"), os.O_WRONLY|os.O_CREATE, 0644) if err != nil { glog.Fatalf("Create Volume Index [ERROR] %s\n", err) @@ -43,7 +47,7 @@ func runFix(cmd *Command, args []string) bool { defer nm.Close() vid := storage.VolumeId(*fixVolumeId) - err = storage.ScanVolumeFile(*fixVolumePath, vid, func(superBlock storage.SuperBlock) error { + err = storage.ScanVolumeFile(*fixVolumePath, *fixVolumeCollection, vid, func(superBlock storage.SuperBlock) error { return nil }, func(n *storage.Needle, offset int64) error { debug("key", n.Id, "offset", offset, "size", n.Size, "disk_size", n.DiskSize(), "gzip", n.IsGzipped()) diff --git a/go/weed/master.go b/go/weed/master.go index 950aaca6d..3beecaaf9 100644 --- a/go/weed/master.go +++ b/go/weed/master.go @@ -56,13 +56,14 @@ var vgLock sync.Mutex func dirLookupHandler(w http.ResponseWriter, r *http.Request) { vid := r.FormValue("volumeId") + collection := r.FormValue("collection") //optional, but can be faster if too many collections commaSep := strings.Index(vid, ",") if commaSep > 0 { vid = vid[0:commaSep] } volumeId, err := storage.NewVolumeId(vid) if err == nil { - machines := topo.Lookup(volumeId) + machines := topo.Lookup(collection, volumeId) if machines != nil { ret := []map[string]string{} for _, dn := range machines { @@ -88,6 +89,7 @@ func dirAssignHandler(w http.ResponseWriter, r *http.Request) { if repType == "" { repType = *defaultRepType } + collection := r.FormValue("collection") dataCenter := r.FormValue("dataCenter") rt, err := storage.NewReplicationTypeFromString(repType) if err != nil { @@ -96,7 +98,7 @@ func dirAssignHandler(w http.ResponseWriter, r *http.Request) { return } - if topo.GetVolumeLayout(rt).GetActiveVolumeCount(dataCenter) <= 0 { + if topo.GetVolumeLayout(collection, rt).GetActiveVolumeCount(dataCenter) <= 0 { if topo.FreeSpace() <= 0 { w.WriteHeader(http.StatusNotFound) writeJsonQuiet(w, r, map[string]string{"error": "No free volumes left!"}) @@ -104,15 +106,15 @@ func dirAssignHandler(w http.ResponseWriter, r *http.Request) { } else { vgLock.Lock() defer vgLock.Unlock() - if topo.GetVolumeLayout(rt).GetActiveVolumeCount(dataCenter) <= 0 { - if _, err = vg.AutomaticGrowByType(rt, dataCenter, topo); err != nil { + if topo.GetVolumeLayout(collection, rt).GetActiveVolumeCount(dataCenter) <= 0 { + if _, err = vg.AutomaticGrowByType(collection, rt, dataCenter, topo); err != nil { writeJsonQuiet(w, r, map[string]string{"error": "Cannot grow volume group! " + err.Error()}) return } } } } - fid, count, dn, err := topo.PickForWrite(rt, c, dataCenter) + fid, count, dn, err := topo.PickForWrite(collection, rt, c, dataCenter) if err == nil { writeJsonQuiet(w, r, map[string]interface{}{"fid": fid, "url": dn.Url(), "publicUrl": dn.PublicUrl, "count": count}) } else { @@ -168,7 +170,7 @@ func volumeGrowHandler(w http.ResponseWriter, r *http.Request) { if topo.FreeSpace() < count*rt.GetCopyCount() { err = errors.New("Only " + strconv.Itoa(topo.FreeSpace()) + " volumes left! Not enough for " + strconv.Itoa(count*rt.GetCopyCount())) } else { - count, err = vg.GrowByCountAndType(count, rt, r.FormValue("dataCneter"), topo) + count, err = vg.GrowByCountAndType(count, r.FormValue("collection"), rt, r.FormValue("dataCneter"), topo) } } else { err = errors.New("parameter count is not found") @@ -197,7 +199,7 @@ func redirectHandler(w http.ResponseWriter, r *http.Request) { debug("parsing error:", err, r.URL.Path) return } - machines := topo.Lookup(volumeId) + machines := topo.Lookup("", volumeId) if machines != nil && len(machines) > 0 { http.Redirect(w, r, "http://"+machines[0].PublicUrl+r.URL.Path, http.StatusMovedPermanently) } else { diff --git a/go/weed/volume.go b/go/weed/volume.go index cf58af799..87d42e227 100644 --- a/go/weed/volume.go +++ b/go/weed/volume.go @@ -56,13 +56,13 @@ func statusHandler(w http.ResponseWriter, r *http.Request) { writeJsonQuiet(w, r, m) } func assignVolumeHandler(w http.ResponseWriter, r *http.Request) { - err := store.AddVolume(r.FormValue("volume"), r.FormValue("replicationType")) + err := store.AddVolume(r.FormValue("volume"), r.FormValue("collection"), r.FormValue("replicationType")) if err == nil { writeJsonQuiet(w, r, map[string]string{"error": ""}) } else { writeJsonQuiet(w, r, map[string]string{"error": err.Error()}) } - debug("assign volume =", r.FormValue("volume"), ", replicationType =", r.FormValue("replicationType"), ", error =", err) + debug("assign volume =", r.FormValue("volume"), ", collection =", r.FormValue("collection"), ", replicationType =", r.FormValue("replicationType"), ", error =", err) } func vacuumVolumeCheckHandler(w http.ResponseWriter, r *http.Request) { err, ret := store.CheckCompactVolume(r.FormValue("volume"), r.FormValue("garbageThreshold")) @@ -112,6 +112,8 @@ func storeHandler(w http.ResponseWriter, r *http.Request) { GetOrHeadHandler(w, r, false) case "DELETE": secure(volumeWhiteList, DeleteHandler)(w, r) + case "PUT": + secure(volumeWhiteList, PostHandler)(w, r) case "POST": secure(volumeWhiteList, PostHandler)(w, r) }