mirror of https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00

go fmt and fix some typo

This commit is contained in:
parent 6ddfaf33cb
commit ab6be025d7
@@ -77,13 +77,13 @@ func iterateEntries(datFile, idxFile *os.File, visitNeedle func(n *storage.Needl
 		readerOffset += int64(count)
 
 	// start to read dat file
-	superblock, err := storage.ReadSuperBlock(datFile)
+	superBlock, err := storage.ReadSuperBlock(datFile)
 	if err != nil {
 		fmt.Printf("cannot read dat file super block: %v", err)
 		return
 	}
-	offset := int64(superblock.BlockSize())
-	version := superblock.Version()
+	offset := int64(superBlock.BlockSize())
+	version := superBlock.Version()
 	n, rest, err := storage.ReadNeedleHeader(datFile, version, offset)
 	if err != nil {
 		fmt.Printf("cannot read needle header: %v", err)
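
For context, iterateEntries walks a volume's .dat file needle by needle: it skips the fixed-size super block, then repeatedly reads a needle header and advances past the body. A minimal sketch of that loop, using the storage API visible in this hunk (ReadSuperBlock, Version, BlockSize, ReadNeedleHeader); the exported storage.NeedleHeaderSize constant and the exact advance arithmetic are assumptions, not shown in this diff:

```go
package sketch

import (
	"fmt"
	"io"
	"os"

	"github.com/chrislusf/seaweedfs/weed/storage"
)

// scanNeedles sketches the export scan: read the super block to learn the
// version and where needle data starts, then walk headers sequentially.
func scanNeedles(datFile *os.File, visit func(n *storage.Needle)) error {
	superBlock, err := storage.ReadSuperBlock(datFile)
	if err != nil {
		return fmt.Errorf("cannot read dat file super block: %v", err)
	}
	version := superBlock.Version()
	offset := int64(superBlock.BlockSize()) // needle data starts after the super block
	for {
		n, bodyLength, err := storage.ReadNeedleHeader(datFile, version, offset)
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return fmt.Errorf("cannot read needle header: %v", err)
		}
		visit(n)
		// NeedleHeaderSize is assumed here; the real loop advances by
		// header size plus the body length returned above.
		offset += int64(storage.NeedleHeaderSize) + bodyLength
	}
}
```
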
@@ -29,7 +29,7 @@ func main() {
 		rand.Read(data)
 		reader := bytes.NewReader(data)
 
-		targetUrl := fmt.Sprintf("http://%s/%s", assignResult.Url,assignResult.Fid)
+		targetUrl := fmt.Sprintf("http://%s/%s", assignResult.Url, assignResult.Fid)
 
 		_, err = operation.Upload(targetUrl, fmt.Sprintf("test%d", i), reader, false, "", nil, "")
 		if err != nil {
@@ -38,7 +38,7 @@ var cmdBackup = &Command{
 	This will help to backup future new volumes.
 
 	Usually backing up is just copying the .dat (and .idx) files.
-	But it's tricky to incremententally copy the differences.
+	But it's tricky to incrementally copy the differences.
 
 	The complexity comes when there are multiple addition, deletion and compaction.
 	This tool will handle them correctly and efficiently, avoiding unnecessary data transporation.
@@ -45,7 +45,7 @@ var (
 )
 
 func init() {
-	cmdBenchmark.Run = runbenchmark // break init cycle
+	cmdBenchmark.Run = runBenchmark // break init cycle
 	cmdBenchmark.IsDebug = cmdBenchmark.Flag.Bool("debug", false, "verbose debug information")
 	b.masters = cmdBenchmark.Flag.String("master", "localhost:9333", "SeaweedFS master location")
 	b.concurrency = cmdBenchmark.Flag.Int("c", 16, "number of concurrent write or read processes")
@@ -101,7 +101,7 @@ var (
 	readStats *stats
 )
 
-func runbenchmark(cmd *Command, args []string) bool {
+func runBenchmark(cmd *Command, args []string) bool {
 	fmt.Printf("This is SeaweedFS version %s %s %s\n", util.VERSION, runtime.GOOS, runtime.GOARCH)
 	if *b.maxCpu < 1 {
 		*b.maxCpu = runtime.NumCPU()
@@ -121,17 +121,17 @@ func runbenchmark(cmd *Command, args []string) bool {
 	masterClient.WaitUntilConnected()
 
 	if *b.write {
-		bench_write()
+		benchWrite()
 	}
 
 	if *b.read {
-		bench_read()
+		benchRead()
 	}
 
 	return true
 }
 
-func bench_write() {
+func benchWrite() {
 	fileIdLineChan := make(chan string)
 	finishChan := make(chan bool)
 	writeStats = newStats(*b.concurrency)
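
The renames in this hunk follow the naming convention from Effective Go: multiword identifiers use MixedCaps (camelCase for unexported names) rather than underscores. A trivial illustration:

```go
package sketch

// Idiomatic: unexported multiword names are camelCase.
func benchWrite() {}

// Legal but unidiomatic: underscore names compile, yet linters such as
// golint flag them.
func bench_write() {}
```
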
@@ -158,7 +158,7 @@ func bench_write() {
 	writeStats.printStats()
 }
 
-func bench_read() {
+func benchRead() {
 	fileIdLineChan := make(chan string)
 	finishChan := make(chan bool)
 	readStats = newStats(*b.concurrency)
@@ -315,7 +315,7 @@ func uploadFileInChunks(filerAddress, filerGrpcAddress string, urlFolder string,
 
 func detectMimeType(f *os.File) string {
 	head := make([]byte, 512)
-	f.Seek(0, 0)
+	f.Seek(0, io.SeekStart)
 	n, err := f.Read(head)
 	if err == io.EOF {
 		return ""
@@ -324,7 +324,7 @@ func detectMimeType(f *os.File) string {
 		fmt.Printf("read head of %v: %v\n", f.Name(), err)
 		return "application/octet-stream"
 	}
-	f.Seek(0, 0)
+	f.Seek(0, io.SeekStart)
 	mimeType := http.DetectContentType(head[:n])
 	return mimeType
 }
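
Both Seek fixes replace a bare 0 whence argument with the named constant introduced in Go 1.7. The constants' values are fixed by package io, so behavior is unchanged; only readability improves:

```go
package main

import (
	"fmt"
	"io"
	"os"
)

func main() {
	// The io whence constants are 0, 1, 2 by definition.
	fmt.Println(io.SeekStart, io.SeekCurrent, io.SeekEnd) // 0 1 2

	f, err := os.Open("example.dat") // hypothetical file, for illustration
	if err != nil {
		panic(err)
	}
	defer f.Close()

	f.Seek(0, io.SeekStart) // rewind to the beginning, as detectMimeType does
}
```
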
@@ -14,7 +14,7 @@ func init() {
 }
 
 var cmdFilerExport = &Command{
-	UsageLine: "filer.export -sourceStore=mysql -targetStroe=cassandra",
+	UsageLine: "filer.export -sourceStore=mysql -targetStore=cassandra",
 	Short: "export meta data in filer store",
 	Long: `Iterate the file tree and export all metadata out
 
@@ -25,7 +25,7 @@ func (store *UniversalRedisStore) InsertEntry(entry *filer2.Entry) (err error) {
 		return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
 	}
 
-	_, err = store.Client.Set(string(entry.FullPath), value, time.Duration(entry.TtlSec) * time.Second).Result()
+	_, err = store.Client.Set(string(entry.FullPath), value, time.Duration(entry.TtlSec)*time.Second).Result()
 
 	if err != nil {
 		return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
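
Beyond the gofmt spacing, this line is a common pattern worth noting: time.Duration counts nanoseconds, so an integer second count must be converted before multiplying by time.Second. A small demonstration; the zero-TTL remark is an assumption about the third-party redis client, not shown in this diff:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	ttlSec := int32(90) // stands in for entry.TtlSec
	d := time.Duration(ttlSec) * time.Second
	fmt.Println(d) // 1m30s

	// A TtlSec of 0 yields a zero Duration, which the redis client is
	// assumed to treat as "no expiration".
	fmt.Println(time.Duration(0) * time.Second) // 0s
}
```
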
@@ -6,10 +6,10 @@ import (
 	"syscall"
 	"time"
 
-	"github.com/seaweedfs/fuse"
-	"github.com/seaweedfs/fuse/fs"
 	"github.com/chrislusf/seaweedfs/weed/glog"
 	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+	"github.com/seaweedfs/fuse"
+	"github.com/seaweedfs/fuse/fs"
 )
 
 var _ = fs.NodeSymlinker(&Dir{})
@@ -1,9 +1,9 @@
 package filesys
 
 import (
+	"context"
 	"github.com/seaweedfs/fuse"
 	"github.com/seaweedfs/fuse/fs"
-	"context"
 	"math"
 	"path/filepath"
 
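
Both import hunks are grouping fixes. The convention gofmt and goimports encourage is standard-library imports first, then third-party imports, each group sorted and separated by a blank line. A sketch of this file's imports fully grouped; the blank-identifier uses only keep the fragment compilable, since Go rejects unused imports, and the fuse types are assumed from the fork for illustration:

```go
package filesys

import (
	"context"
	"math"
	"path/filepath"

	"github.com/seaweedfs/fuse"
	"github.com/seaweedfs/fuse/fs"
)

// Blank-identifier uses so this sketch compiles on its own.
var (
	_ = context.Background
	_ = math.Pi
	_ = filepath.Separator
	_ *fuse.Conn // type assumed from the fuse fork, illustration only
	_ fs.FS      // ditto
)
```
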
@@ -58,7 +58,7 @@ type statsCache struct {
 func NewSeaweedFileSystem(option *Option) *WFS {
 	wfs := &WFS{
 		option: option,
-		listDirectoryEntriesCache: ccache.New(ccache.Configure().MaxSize(1024*8).ItemsToPrune(100)),
+		listDirectoryEntriesCache: ccache.New(ccache.Configure().MaxSize(1024 * 8).ItemsToPrune(100)),
 		pathToHandleIndex: make(map[string]int),
 		bufPool: sync.Pool{
 			New: func() interface{} {
@@ -174,11 +174,11 @@ func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse.
 	resp.Blocks = totalDiskSize / blockSize
 
 	// Compute the number of used blocks
-	numblocks := uint64(usedDiskSize / blockSize)
+	numBlocks := uint64(usedDiskSize / blockSize)
 
 	// Report the number of free and available blocks for the block size
-	resp.Bfree = resp.Blocks - numblocks
-	resp.Bavail = resp.Blocks - numblocks
+	resp.Bfree = resp.Blocks - numBlocks
+	resp.Bavail = resp.Blocks - numBlocks
 	resp.Bsize = uint32(blockSize)
 
 	// Report the total number of possible files in the file system (and those free)
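
The renamed numBlocks sits in straightforward block accounting: everything Statfs reports is denominated in fixed-size blocks. A worked example with made-up numbers:

```go
package main

import "fmt"

func main() {
	// The Statfs math from the hunk above; values are illustrative only.
	const blockSize = 512
	totalDiskSize := uint64(10 * 1024 * 1024) // 10 MiB quota
	usedDiskSize := uint64(3 * 1024 * 1024)   // 3 MiB in use

	blocks := totalDiskSize / blockSize   // 20480 total blocks
	numBlocks := usedDiskSize / blockSize // 6144 used blocks

	bfree := blocks - numBlocks  // blocks free for root
	bavail := blocks - numBlocks // blocks free for unprivileged users

	fmt.Println(blocks, numBlocks, bfree, bavail) // 20480 6144 14336 14336
}
```
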
@@ -201,7 +201,7 @@ func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (ret
 }
 
 func upload_one_chunk(filename string, reader io.Reader, master,
 	fileUrl string, jwt security.EncodedJwt,
 ) (size uint32, e error) {
 	glog.V(4).Info("Uploading part ", filename, " to ", fileUrl, "...")
 	uploadResult, uploadError := Upload(fileUrl, filename, reader, false,
@@ -29,12 +29,12 @@ type Needle struct {
 
 	DataSize uint32 `comment:"Data size"` //version2
 	Data []byte `comment:"The actual file data"`
 	Flags byte `comment:"boolean flags"` //version2
 	NameSize uint8 //version2
 	Name []byte `comment:"maximum 256 characters"` //version2
 	MimeSize uint8 //version2
 	Mime []byte `comment:"maximum 256 characters"` //version2
 	PairsSize uint16 //version2
 	Pairs []byte `comment:"additional name value pairs, json format, maximum 64kB"`
 	LastModified uint64 //only store LastModifiedBytesLength bytes, which is 5 bytes to disk
 	Ttl *TTL
@@ -4,11 +4,11 @@ import (
 	"testing"
 )
 
-func TestReplicaPlacemnetSerialDeserial(t *testing.T) {
+func TestReplicaPlacementSerialDeserial(t *testing.T) {
 	rp, _ := NewReplicaPlacementFromString("001")
-	new_rp, _ := NewReplicaPlacementFromByte(rp.Byte())
-	if rp.String() != new_rp.String() {
-		println("expected:", rp.String(), "actual:", new_rp.String())
+	newRp, _ := NewReplicaPlacementFromByte(rp.Byte())
+	if rp.String() != newRp.String() {
+		println("expected:", rp.String(), "actual:", newRp.String())
 		t.Fail()
 	}
 }
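
The test round-trips a replica placement through its single-byte encoding. In SeaweedFS a placement string such as "001" is three digits: copies on other data centers, copies on other racks, and copies on other servers within the same rack, so "001" asks for one extra copy on a different server of the same rack. A hedged sketch against the package layout of this era, where ReplicaPlacement lived in weed/storage:

```go
package main

import (
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/storage"
)

func main() {
	rp, err := storage.NewReplicaPlacementFromString("001")
	if err != nil {
		panic(err)
	}
	// Byte() packs the three digits into one byte; FromByte reverses it.
	newRp, err := storage.NewReplicaPlacementFromByte(rp.Byte())
	if err != nil {
		panic(err)
	}
	fmt.Println(rp.String() == newRp.String()) // true
}
```
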
@@ -163,7 +163,7 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
 			}
 			volumeMessages = append(volumeMessages, volumeMessage)
 		} else {
-			if v.exiredLongEnough(MAX_TTL_VOLUME_REMOVAL_DELAY) {
+			if v.expiredLongEnough(MAX_TTL_VOLUME_REMOVAL_DELAY) {
 				location.deleteVolumeById(v.Id)
 				glog.V(0).Infoln("volume", v.Id, "is deleted.")
 			} else {
@@ -265,4 +265,3 @@ func (s *Store) DeleteVolume(i VolumeId) error {
 
 	return fmt.Errorf("Volume %d not found on disk", i)
 }
-
@@ -120,7 +120,7 @@ func (v *Volume) expired(volumeSizeLimit uint64) bool {
 }
 
 // wait either maxDelayMinutes or 10% of ttl minutes
-func (v *Volume) exiredLongEnough(maxDelayMinutes uint32) bool {
+func (v *Volume) expiredLongEnough(maxDelayMinutes uint32) bool {
 	if v.Ttl == nil || v.Ttl.Minutes() == 0 {
 		return false
 	}
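
The doc comment above expiredLongEnough is terse. One plausible reading, used in the sketch below, is that removal waits an extra grace period of 10% of the volume's TTL, capped at maxDelayMinutes; this interpretation is an assumption, since the body is not shown in the diff:

```go
package main

import "fmt"

// removalDelayMinutes models an assumed grace period before a TTL volume
// is deleted: 10% of the TTL, but no more than maxDelayMinutes.
func removalDelayMinutes(ttlMinutes, maxDelayMinutes uint32) uint32 {
	delay := ttlMinutes / 10
	if delay > maxDelayMinutes {
		delay = maxDelayMinutes
	}
	return delay
}

func main() {
	fmt.Println(removalDelayMinutes(60, 10))  // 6: 10% of a 1h TTL
	fmt.Println(removalDelayMinutes(600, 10)) // 10: capped by maxDelayMinutes
}
```
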
@@ -137,10 +137,10 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
 	}
 	incrementedHasUpdatedIndexEntry := make(map[NeedleId]keyField)
 
-	for idx_offset := indexSize - NeedleEntrySize; uint64(idx_offset) >= v.lastCompactIndexOffset; idx_offset -= NeedleEntrySize {
+	for idxOffset := indexSize - NeedleEntrySize; uint64(idxOffset) >= v.lastCompactIndexOffset; idxOffset -= NeedleEntrySize {
 		var IdxEntry []byte
-		if IdxEntry, err = readIndexEntryAtOffset(oldIdxFile, idx_offset); err != nil {
-			return fmt.Errorf("readIndexEntry %s at offset %d failed: %v", oldIdxFileName, idx_offset, err)
+		if IdxEntry, err = readIndexEntryAtOffset(oldIdxFile, idxOffset); err != nil {
+			return fmt.Errorf("readIndexEntry %s at offset %d failed: %v", oldIdxFileName, idxOffset, err)
 		}
 		key, offset, size := IdxFileEntry(IdxEntry)
 		glog.V(4).Infof("key %d offset %d size %d", key, offset, size)
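
makeupDiff walks the .idx file backwards from its end toward the last compaction point, one fixed-size entry at a time. The sketch below reproduces that access pattern under two assumptions flagged in the comments: a 16-byte entry layout (8-byte key, 4-byte offset, 4-byte size) and big-endian encoding. It also stops at offset zero rather than at v.lastCompactIndexOffset, which the real loop uses:

```go
package main

import (
	"encoding/binary"
	"fmt"
	"os"
)

// Assumed layout of one index entry; NeedleEntrySize in the real code.
const needleEntrySize = 16

func main() {
	idxFile, err := os.Open("1.idx") // hypothetical index file
	if err != nil {
		panic(err)
	}
	defer idxFile.Close()

	stat, err := idxFile.Stat()
	if err != nil {
		panic(err)
	}

	buf := make([]byte, needleEntrySize)
	// Walk entries from the last one toward the start of the file.
	for idxOffset := stat.Size() - needleEntrySize; idxOffset >= 0; idxOffset -= needleEntrySize {
		if _, err := idxFile.ReadAt(buf, idxOffset); err != nil {
			panic(err)
		}
		key := binary.BigEndian.Uint64(buf[0:8]) // encoding assumed big-endian
		offset := binary.BigEndian.Uint32(buf[8:12])
		size := binary.BigEndian.Uint32(buf[12:16])
		fmt.Printf("key %d offset %d size %d\n", key, offset, size)
	}
}
```
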
@@ -86,9 +86,9 @@ func (dn *DataNode) GetVolumes() (ret []storage.VolumeInfo) {
 func (dn *DataNode) GetVolumesById(id storage.VolumeId) (storage.VolumeInfo, error) {
 	dn.RLock()
 	defer dn.RUnlock()
-	v_info, ok := dn.volumes[id]
+	vInfo, ok := dn.volumes[id]
 	if ok {
-		return v_info, nil
+		return vInfo, nil
 	} else {
 		return storage.VolumeInfo{}, fmt.Errorf("volumeInfo not found")
 	}
@@ -103,7 +103,7 @@ func (n *NodeImpl) RandomlyPickNodes(numberOfNodes int, filterFirstNodeFn func(d
 	}
 	if !ret {
 		glog.V(2).Infoln(n.Id(), "failed to pick", numberOfNodes-1, "from rest", len(candidates), "node candidates")
-		err = errors.New("Not enough data node found!")
+		err = errors.New("No enough data node found!")
 	}
 	return
 }
@@ -31,7 +31,7 @@ func ReplicatedWrite(masterNode string, s *storage.Store,
 		size = ret
 		return
 	}
 
 	needToReplicate = needToReplicate || s.GetVolume(volumeId).NeedToReplicate()
 	if !needToReplicate {
 		needToReplicate = s.GetVolume(volumeId).NeedToReplicate()
@@ -136,12 +136,12 @@ func vacuumOneVolumeLayout(volumeLayout *VolumeLayout, c *Collection, garbageThr
 
 	volumeLayout.accessLock.RLock()
 	tmpMap := make(map[storage.VolumeId]*VolumeLocationList)
-	for vid, locationlist := range volumeLayout.vid2location {
-		tmpMap[vid] = locationlist
+	for vid, locationList := range volumeLayout.vid2location {
+		tmpMap[vid] = locationList
 	}
 	volumeLayout.accessLock.RUnlock()
 
-	for vid, locationlist := range tmpMap {
+	for vid, locationList := range tmpMap {
 
 		volumeLayout.accessLock.RLock()
 		isReadOnly, hasValue := volumeLayout.readonlyVolumes[vid]
@@ -152,11 +152,11 @@ func vacuumOneVolumeLayout(volumeLayout *VolumeLayout, c *Collection, garbageThr
 		}
 
 		glog.V(2).Infof("check vacuum on collection:%s volume:%d", c.Name, vid)
-		if batchVacuumVolumeCheck(volumeLayout, vid, locationlist, garbageThreshold) {
-			if batchVacuumVolumeCompact(volumeLayout, vid, locationlist, preallocate) {
-				batchVacuumVolumeCommit(volumeLayout, vid, locationlist)
+		if batchVacuumVolumeCheck(volumeLayout, vid, locationList, garbageThreshold) {
+			if batchVacuumVolumeCompact(volumeLayout, vid, locationList, preallocate) {
+				batchVacuumVolumeCommit(volumeLayout, vid, locationList)
 			} else {
-				batchVacuumVolumeCleanup(volumeLayout, vid, locationlist)
+				batchVacuumVolumeCleanup(volumeLayout, vid, locationList)
 			}
 		}
 	}
@@ -126,7 +126,7 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum
 	}
 
 	//find main rack and other racks
-	mainRack, otherRacks, rack_err := mainDataCenter.(*DataCenter).RandomlyPickNodes(rp.DiffRackCount+1, func(node Node) error {
+	mainRack, otherRacks, rackErr := mainDataCenter.(*DataCenter).RandomlyPickNodes(rp.DiffRackCount+1, func(node Node) error {
 		if option.Rack != "" && node.IsRack() && node.Id() != NodeId(option.Rack) {
 			return fmt.Errorf("Not matching preferred rack:%s", option.Rack)
 		}
@@ -148,12 +148,12 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum
 		}
 		return nil
 	})
-	if rack_err != nil {
-		return nil, rack_err
+	if rackErr != nil {
+		return nil, rackErr
 	}
 
 	//find main rack and other racks
-	mainServer, otherServers, server_err := mainRack.(*Rack).RandomlyPickNodes(rp.SameRackCount+1, func(node Node) error {
+	mainServer, otherServers, serverErr := mainRack.(*Rack).RandomlyPickNodes(rp.SameRackCount+1, func(node Node) error {
 		if option.DataNode != "" && node.IsDataNode() && node.Id() != NodeId(option.DataNode) {
 			return fmt.Errorf("Not matching preferred data node:%s", option.DataNode)
 		}
@@ -162,8 +162,8 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum
 		}
 		return nil
 	})
-	if server_err != nil {
-		return nil, server_err
+	if serverErr != nil {
+		return nil, serverErr
 	}
 
 	servers = append(servers, mainServer.(*DataNode))
@@ -55,8 +55,8 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
 	vl.vid2location[v.Id].Set(dn)
 	// glog.V(4).Infof("volume %d added to %s len %d copy %d", v.Id, dn.Id(), vl.vid2location[v.Id].Length(), v.ReplicaPlacement.GetCopyCount())
 	for _, dn := range vl.vid2location[v.Id].list {
-		if v_info, err := dn.GetVolumesById(v.Id); err == nil {
-			if v_info.ReadOnly {
+		if vInfo, err := dn.GetVolumesById(v.Id); err == nil {
+			if vInfo.ReadOnly {
 				glog.V(3).Infof("vid %d removed from writable", v.Id)
 				vl.removeFromWritable(v.Id)
 				vl.readonlyVolumes[v.Id] = true
@@ -145,13 +145,13 @@ func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (*s
 	vl.accessLock.RLock()
 	defer vl.accessLock.RUnlock()
 
-	len_writers := len(vl.writables)
-	if len_writers <= 0 {
+	lenWriters := len(vl.writables)
+	if lenWriters <= 0 {
 		glog.V(0).Infoln("No more writable volumes!")
 		return nil, 0, nil, errors.New("No more writable volumes!")
 	}
 	if option.DataCenter == "" {
-		vid := vl.writables[rand.Intn(len_writers)]
+		vid := vl.writables[rand.Intn(lenWriters)]
 		locationList := vl.vid2location[vid]
 		if locationList != nil {
 			return &vid, count, locationList, nil
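
The renamed lenWriters feeds a uniform random choice over the writable volume list, which spreads writes across volumes while the read lock is held. A minimal model of the selection; the names and types here are illustrative, not the real ones:

```go
package main

import (
	"errors"
	"fmt"
	"math/rand"
)

// pickWritable chooses one volume id uniformly at random, mirroring the
// rand.Intn(lenWriters) selection in the hunk above.
func pickWritable(writables []uint32) (uint32, error) {
	lenWriters := len(writables)
	if lenWriters <= 0 {
		return 0, errors.New("No more writable volumes!")
	}
	return writables[rand.Intn(lenWriters)], nil
}

func main() {
	vid, err := pickWritable([]uint32{1, 3, 7})
	fmt.Println(vid, err)
}
```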