diff --git a/weed-fs/src/cmd/weeds.go b/weed-fs/src/cmd/weeds.go index 4b3792687..5e7e9477a 100644 --- a/weed-fs/src/cmd/weeds.go +++ b/weed-fs/src/cmd/weeds.go @@ -8,6 +8,7 @@ import ( "http" "json" "log" + "rand" "strconv" "strings" ) @@ -17,6 +18,7 @@ var ( metaFolder = flag.String("mdir", "/tmp", "data directory to store mappings") capacity = flag.Int("capacity", 100, "maximum number of volumes to hold") mapper *directory.Mapper + ) func dirReadHandler(w http.ResponseWriter, r *http.Request) { @@ -25,12 +27,14 @@ func dirReadHandler(w http.ResponseWriter, r *http.Request) { writeJson(w, r, machine.Server) } func dirWriteHandler(w http.ResponseWriter, r *http.Request) { - machine := mapper.PickForWrite() + _, machine := mapper.PickForWrite() writeJson(w, r, machine) } func dirPickHandler(w http.ResponseWriter, r *http.Request) { - machine := mapper.PickForWrite() - writeJson(w, r, machine) + vid, machine := mapper.PickForWrite() + hashcode := rand.Uint32() + fid := strconv.Uitoa64(vid) + "," + strconv.Uitoa64(mapper.NextFileId())+","+strconv.Uitoa64(uint64(hashcode)) + writeJson(w, r, map[string]string{"fid":fid,"url":machine.Url}) } func dirJoinHandler(w http.ResponseWriter, r *http.Request) { s := r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")+1] + r.FormValue("port") diff --git a/weed-fs/src/pkg/directory/volume_mapping.go b/weed-fs/src/pkg/directory/volume_mapping.go index ff5b0bf64..fa2ae62d3 100644 --- a/weed-fs/src/pkg/directory/volume_mapping.go +++ b/weed-fs/src/pkg/directory/volume_mapping.go @@ -11,7 +11,8 @@ import ( ) const ( - ChunkSizeLimit = 1 * 1024 * 1024 * 1024 //1G, can not be more than max(uint32)*8 + ChunkSizeLimit = 1 * 1000 * 1000 * 1000 //1G, can not be more than max(uint32)*8 + FileIdSaveInterval = 10000 ) type MachineInfo struct { @@ -35,14 +36,17 @@ type Mapper struct { Writers []int // transient array of Writers volume id GlobalVolumeSequence uint64 + + FileIdSequence uint64 + fileIdCounter uint64 } func NewMachine(server, publicServer string, volumes []storage.VolumeInfo, capacity int) *Machine { - return &Machine{Server:MachineInfo{Url:server,PublicUrl:publicServer},Volumes:volumes,Capacity:capacity} + return &Machine{Server: MachineInfo{Url: server, PublicUrl: publicServer}, Volumes: volumes, Capacity: capacity} } func NewMapper(dirname string, filename string, capacity int) (m *Mapper) { - m = &Mapper{dir:dirname,fileName:filename,capacity:capacity} + m = &Mapper{dir: dirname, fileName: filename, capacity: capacity} log.Println("Loading volume id to maching mapping:", path.Join(m.dir, m.fileName+".map")) dataFile, e := os.OpenFile(path.Join(m.dir, m.fileName+".map"), os.O_RDONLY, 0644) m.vid2machineId = make(map[uint64]int) @@ -67,11 +71,33 @@ func NewMapper(dirname string, filename string, capacity int) (m *Mapper) { } log.Println("Loaded mapping size", len(m.Machines)) } + + seqFile, se := os.OpenFile(path.Join(m.dir, m.fileName+".seq"), os.O_RDONLY, 0644) + if se != nil { + m.FileIdSequence = FileIdSaveInterval + log.Println("Setting file id sequence", m.FileIdSequence) + } else { + decoder := gob.NewDecoder(seqFile) + defer seqFile.Close() + decoder.Decode(&m.FileIdSequence) + log.Println("Loading file id sequence", m.FileIdSequence, "=>", m.FileIdSequence + FileIdSaveInterval) + //in case the server stops between intervals + m.FileIdSequence += FileIdSaveInterval + } return } -func (m *Mapper) PickForWrite() MachineInfo { - vid := rand.Intn(len(m.Writers)) - return m.Machines[m.Writers[vid]].Server +func (m *Mapper) PickForWrite() (vid uint64, server MachineInfo) { + machine := m.Machines[m.Writers[rand.Intn(len(m.Writers))]] + vid = machine.Volumes[rand.Intn(len(machine.Volumes))].Id + return vid, machine.Server +} +func (m *Mapper) NextFileId() uint64 { + if m.fileIdCounter <= 0 { + m.fileIdCounter = FileIdSaveInterval + m.saveSequence() + } + m.fileIdCounter-- + return m.FileIdSequence - m.fileIdCounter } func (m *Mapper) Get(vid uint64) *Machine { return m.Machines[m.vid2machineId[vid]] @@ -141,3 +167,13 @@ func (m *Mapper) Save() { encoder.Encode(m.Machines) encoder.Encode(m.GlobalVolumeSequence) } +func (m *Mapper) saveSequence() { + log.Println("Saving file id sequence", m.FileIdSequence, "to", path.Join(m.dir, m.fileName+".seq")) + seqFile, e := os.OpenFile(path.Join(m.dir, m.fileName+".seq"), os.O_CREATE|os.O_WRONLY, 0644) + if e != nil { + log.Fatalf("Sequence File Save [ERROR] %s\n", e) + } + defer seqFile.Close() + encoder := gob.NewEncoder(seqFile) + encoder.Encode(m.FileIdSequence) +}