diff --git a/weed/command/backup.go b/weed/command/backup.go index 0b3994027..7983f7dab 100644 --- a/weed/command/backup.go +++ b/weed/command/backup.go @@ -75,7 +75,7 @@ func runBackup(cmd *Command, args []string) bool { return true } - v, err := storage.NewVolume(*s.dir, *s.collection, vid, storage.NeedleMapInMemory, replication, ttl) + v, err := storage.NewVolume(*s.dir, *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0) if err != nil { fmt.Printf("Error creating or reading from volume %d: %v\n", vid, err) return true diff --git a/weed/command/compact.go b/weed/command/compact.go index db11880ec..ae54db115 100644 --- a/weed/command/compact.go +++ b/weed/command/compact.go @@ -20,10 +20,11 @@ var cmdCompact = &Command{ } var ( - compactVolumePath = cmdCompact.Flag.String("dir", ".", "data directory to store files") - compactVolumeCollection = cmdCompact.Flag.String("collection", "", "volume collection name") - compactVolumeId = cmdCompact.Flag.Int("volumeId", -1, "a volume id. The volume should already exist in the dir.") - compactMethod = cmdCompact.Flag.Int("method", 0, "option to choose which compact method. use 0 or 1.") + compactVolumePath = cmdCompact.Flag.String("dir", ".", "data directory to store files") + compactVolumeCollection = cmdCompact.Flag.String("collection", "", "volume collection name") + compactVolumeId = cmdCompact.Flag.Int("volumeId", -1, "a volume id. The volume should already exist in the dir.") + compactMethod = cmdCompact.Flag.Int("method", 0, "option to choose which compact method. use 0 or 1.") + compactVolumePreallocate = cmdCompact.Flag.Int64("preallocate", 0, "preallocate volume disk space") ) func runCompact(cmd *Command, args []string) bool { @@ -34,7 +35,7 @@ func runCompact(cmd *Command, args []string) bool { vid := storage.VolumeId(*compactVolumeId) v, err := storage.NewVolume(*compactVolumePath, *compactVolumeCollection, vid, - storage.NeedleMapInMemory, nil, nil) + storage.NeedleMapInMemory, nil, nil, *compactVolumePreallocate) if err != nil { glog.Fatalf("Load Volume [ERROR] %s\n", err) } diff --git a/weed/command/master.go b/weed/command/master.go index cd15defce..ec54fbd7b 100644 --- a/weed/command/master.go +++ b/weed/command/master.go @@ -35,6 +35,7 @@ var ( metaFolder = cmdMaster.Flag.String("mdir", os.TempDir(), "data directory to store meta data") masterPeers = cmdMaster.Flag.String("peers", "", "other master nodes in comma separated ip:port list, example: 127.0.0.1:9093,127.0.0.1:9094") volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 30*1000, "Master stops directing writes to oversized volumes.") + volumePreallocate = cmdMaster.Flag.Bool("volumePreallocate", false, "Preallocate disk space for volumes.") mpulse = cmdMaster.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats") confFile = cmdMaster.Flag.String("conf", "/etc/weedfs/weedfs.conf", "Deprecating! xml configuration file") defaultReplicaPlacement = cmdMaster.Flag.String("defaultReplication", "000", "Default replication type if not specified.") @@ -73,7 +74,8 @@ func runMaster(cmd *Command, args []string) bool { r := mux.NewRouter() ms := weed_server.NewMasterServer(r, *mport, *metaFolder, - *volumeSizeLimitMB, *mpulse, *confFile, *defaultReplicaPlacement, *garbageThreshold, + *volumeSizeLimitMB, *volumePreallocate, + *mpulse, *confFile, *defaultReplicaPlacement, *garbageThreshold, masterWhiteList, *masterSecureKey, ) diff --git a/weed/command/server.go b/weed/command/server.go index 027ba191d..87146940f 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -61,6 +61,7 @@ var ( masterPort = cmdServer.Flag.Int("master.port", 9333, "master server http listen port") masterMetaFolder = cmdServer.Flag.String("master.dir", "", "data directory to store meta data, default to same as -dir specified") masterVolumeSizeLimitMB = cmdServer.Flag.Uint("master.volumeSizeLimitMB", 30*1000, "Master stops directing writes to oversized volumes.") + masterVolumePreallocate = cmdServer.Flag.Bool("master.volumePreallocate", false, "Preallocate disk space for volumes.") masterConfFile = cmdServer.Flag.String("master.conf", "/etc/weedfs/weedfs.conf", "xml configuration file") masterDefaultReplicaPlacement = cmdServer.Flag.String("master.defaultReplicaPlacement", "000", "Default replication type if not specified.") volumePort = cmdServer.Flag.Int("volume.port", 8080, "volume server http listen port") @@ -204,7 +205,8 @@ func runServer(cmd *Command, args []string) bool { go func() { r := mux.NewRouter() ms := weed_server.NewMasterServer(r, *masterPort, *masterMetaFolder, - *masterVolumeSizeLimitMB, *volumePulse, *masterConfFile, *masterDefaultReplicaPlacement, *serverGarbageThreshold, + *masterVolumeSizeLimitMB, *masterVolumePreallocate, + *volumePulse, *masterConfFile, *masterDefaultReplicaPlacement, *serverGarbageThreshold, serverWhiteList, *serverSecureKey, ) diff --git a/weed/server/master_server.go b/weed/server/master_server.go index 61bda6988..9f59c2400 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -20,6 +20,7 @@ type MasterServer struct { port int metaFolder string volumeSizeLimitMB uint + preallocate int64 pulseSeconds int defaultReplicaPlacement string garbageThreshold string @@ -34,6 +35,7 @@ type MasterServer struct { func NewMasterServer(r *mux.Router, port int, metaFolder string, volumeSizeLimitMB uint, + preallocate bool, pulseSeconds int, confFile string, defaultReplicaPlacement string, @@ -41,9 +43,15 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string, whiteList []string, secureKey string, ) *MasterServer { + + var preallocateSize int64 + if preallocate { + preallocateSize = int64(volumeSizeLimitMB) * (1 << 20) + } ms := &MasterServer{ port: port, volumeSizeLimitMB: volumeSizeLimitMB, + preallocate: preallocateSize, pulseSeconds: pulseSeconds, defaultReplicaPlacement: defaultReplicaPlacement, garbageThreshold: garbageThreshold, diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go index a762bf416..efe81bf89 100644 --- a/weed/server/master_server_handlers_admin.go +++ b/weed/server/master_server_handlers_admin.go @@ -181,10 +181,18 @@ func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGr if err != nil { return nil, err } + preallocate := ms.preallocate + if r.FormValue("preallocate") != "" { + preallocate, err = strconv.ParseInt(r.FormValue("preallocate"), 10, 64) + if err != nil { + return nil, fmt.Errorf("Failed to parse int64 preallocate = %s: %v", r.FormValue("preallocate"), err) + } + } volumeGrowOption := &topology.VolumeGrowOption{ Collection: r.FormValue("collection"), ReplicaPlacement: replicaPlacement, Ttl: ttl, + Prealloacte: preallocate, DataCenter: r.FormValue("dataCenter"), Rack: r.FormValue("rack"), DataNode: r.FormValue("dataNode"), diff --git a/weed/server/volume_server_handlers_admin.go b/weed/server/volume_server_handlers_admin.go index ae9817ef6..28631dac7 100644 --- a/weed/server/volume_server_handlers_admin.go +++ b/weed/server/volume_server_handlers_admin.go @@ -3,6 +3,7 @@ package weed_server import ( "net/http" "path/filepath" + "strconv" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/stats" @@ -17,13 +18,29 @@ func (vs *VolumeServer) statusHandler(w http.ResponseWriter, r *http.Request) { } func (vs *VolumeServer) assignVolumeHandler(w http.ResponseWriter, r *http.Request) { - err := vs.store.AddVolume(r.FormValue("volume"), r.FormValue("collection"), vs.needleMapKind, r.FormValue("replication"), r.FormValue("ttl")) + var err error + preallocate := int64(0) + if r.FormValue("preallocate") != "" { + preallocate, err = strconv.ParseInt(r.FormValue("preallocate"), 10, 64) + if err != nil { + glog.V(0).Infoln("ignoring invalid int64 value for preallocate = %v", r.FormValue("preallocate")) + } + } + err = vs.store.AddVolume( + r.FormValue("volume"), + r.FormValue("collection"), + vs.needleMapKind, + r.FormValue("replication"), + r.FormValue("ttl"), + preallocate, + ) if err == nil { writeJsonQuiet(w, r, http.StatusAccepted, map[string]string{"error": ""}) } else { writeJsonError(w, r, http.StatusNotAcceptable, err) } - glog.V(2).Infoln("assign volume =", r.FormValue("volume"), ", collection =", r.FormValue("collection"), ", replication =", r.FormValue("replication"), ", error =", err) + glog.V(2).Infoln("assign volume = %s, collection = %s , replication = %s, error = %v", + r.FormValue("volume"), r.FormValue("collection"), r.FormValue("replication"), err) } func (vs *VolumeServer) deleteCollectionHandler(w http.ResponseWriter, r *http.Request) { @@ -33,7 +50,7 @@ func (vs *VolumeServer) deleteCollectionHandler(w http.ResponseWriter, r *http.R } else { writeJsonError(w, r, http.StatusInternalServerError, err) } - glog.V(2).Infoln("deleting collection =", r.FormValue("collection"), ", error =", err) + glog.V(2).Infof("deleting collection = %s, error = %v", r.FormValue("collection"), err) } func (vs *VolumeServer) statsDiskHandler(w http.ResponseWriter, r *http.Request) { diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go index 039b4f3b9..496e0dd57 100644 --- a/weed/storage/disk_location.go +++ b/weed/storage/disk_location.go @@ -36,7 +36,7 @@ func (l *DiskLocation) loadExistingVolume(dir os.FileInfo, needleMapKind NeedleM _, found := l.volumes[vid] mutex.RUnlock() if !found { - if v, e := NewVolume(l.Directory, collection, vid, needleMapKind, nil, nil); e == nil { + if v, e := NewVolume(l.Directory, collection, vid, needleMapKind, nil, nil, 0); e == nil { mutex.Lock() l.volumes[vid] = v mutex.Unlock() diff --git a/weed/storage/needle_read_write.go b/weed/storage/needle_read_write.go index ff43effb3..4f03ce396 100644 --- a/weed/storage/needle_read_write.go +++ b/weed/storage/needle_read_write.go @@ -146,18 +146,13 @@ func (n *Needle) Append(w io.Writer, version Version) (size uint32, actualSize i util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value()) _, err = w.Write(header[0 : NeedleChecksumSize+padding]) - actualSize = NeedleHeaderSize + int64(n.Size) + NeedleChecksumSize + int64(padding) - - return n.DataSize, actualSize, err + return n.DataSize, getActualSize(n.Size), err } return 0, 0, fmt.Errorf("Unsupported Version! (%d)", version) } func ReadNeedleBlob(r *os.File, offset int64, size uint32) (dataSlice []byte, block *Block, err error) { - NeedleWithoutPaddingSize := NeedleHeaderSize + size + NeedleChecksumSize - padding := NeedlePaddingSize - (NeedleWithoutPaddingSize % NeedlePaddingSize) - readSize := NeedleWithoutPaddingSize + padding - return getBytesForFileBlock(r, offset, int(readSize)) + return getBytesForFileBlock(r, offset, int(getActualSize(size))) } func (n *Needle) ReadData(r *os.File, offset int64, size uint32, version Version) (err error) { diff --git a/weed/storage/store.go b/weed/storage/store.go index 37a3904bd..be2044d64 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -95,7 +95,7 @@ func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts } return } -func (s *Store) AddVolume(volumeListString string, collection string, needleMapKind NeedleMapType, replicaPlacement string, ttlString string) error { +func (s *Store) AddVolume(volumeListString string, collection string, needleMapKind NeedleMapType, replicaPlacement string, ttlString string, preallocate int64) error { rt, e := NewReplicaPlacementFromString(replicaPlacement) if e != nil { return e @@ -111,7 +111,7 @@ func (s *Store) AddVolume(volumeListString string, collection string, needleMapK if err != nil { return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", id_string) } - e = s.addVolume(VolumeId(id), collection, needleMapKind, rt, ttl) + e = s.addVolume(VolumeId(id), collection, needleMapKind, rt, ttl, preallocate) } else { pair := strings.Split(range_string, "-") start, start_err := strconv.ParseUint(pair[0], 10, 64) @@ -123,7 +123,7 @@ func (s *Store) AddVolume(volumeListString string, collection string, needleMapK return fmt.Errorf("Volume End Id %s is not a valid unsigned integer!", pair[1]) } for id := start; id <= end; id++ { - if err := s.addVolume(VolumeId(id), collection, needleMapKind, rt, ttl); err != nil { + if err := s.addVolume(VolumeId(id), collection, needleMapKind, rt, ttl, preallocate); err != nil { e = err } } @@ -160,14 +160,14 @@ func (s *Store) findFreeLocation() (ret *DiskLocation) { } return ret } -func (s *Store) addVolume(vid VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *TTL) error { +func (s *Store) addVolume(vid VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *TTL, preallocate int64) error { if s.findVolume(vid) != nil { return fmt.Errorf("Volume Id %d already exists!", vid) } if location := s.findFreeLocation(); location != nil { glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v", location.Directory, vid, collection, replicaPlacement, ttl) - if volume, err := NewVolume(location.Directory, collection, vid, needleMapKind, replicaPlacement, ttl); err == nil { + if volume, err := NewVolume(location.Directory, collection, vid, needleMapKind, replicaPlacement, ttl, preallocate); err == nil { location.SetVolume(vid, volume) return nil } else { diff --git a/weed/storage/volume.go b/weed/storage/volume.go index 11ee600df..df9f0b7a7 100644 --- a/weed/storage/volume.go +++ b/weed/storage/volume.go @@ -29,11 +29,11 @@ type Volume struct { lastCompactRevision uint16 } -func NewVolume(dirname string, collection string, id VolumeId, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) { +func NewVolume(dirname string, collection string, id VolumeId, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *TTL, preallocate int64) (v *Volume, e error) { v = &Volume{dir: dirname, Collection: collection, Id: id} v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl} v.needleMapKind = needleMapKind - e = v.load(true, true, needleMapKind) + e = v.load(true, true, needleMapKind, preallocate) return } func (v *Volume) String() string { diff --git a/weed/storage/volume_create.go b/weed/storage/volume_create.go new file mode 100644 index 000000000..6b3a17439 --- /dev/null +++ b/weed/storage/volume_create.go @@ -0,0 +1,17 @@ +// +build !linux + +package storage + +import ( + "os" + + "github.com/chrislusf/seaweedfs/weed/glog" +) + +func createVolumeFile(fileName string, preallocate int64) (file *os.File, e error) { + file, e = os.OpenFile(fileName, os.O_RDWR|os.O_CREATE, 0644) + if preallocate > 0 { + glog.V(0).Infof("Preallocated disk space for %s is not supported", fileName) + } + return file, e +} diff --git a/weed/storage/volume_create_linux.go b/weed/storage/volume_create_linux.go new file mode 100644 index 000000000..8f6bab2fe --- /dev/null +++ b/weed/storage/volume_create_linux.go @@ -0,0 +1,19 @@ +// +build linux + +package storage + +import ( + "os" + "syscall" + + "github.com/chrislusf/seaweedfs/weed/glog" +) + +func createVolumeFile(fileName string, preallocate int64) (file *os.File, e error) { + file, e = os.OpenFile(fileName, os.O_RDWR|os.O_CREATE, 0644) + if preallocate != 0 { + syscall.Fallocate(int(file.Fd()), 1, 0, preallocate) + glog.V(0).Infof("Preallocated %d bytes disk space for %s", preallocate, fileName) + } + return file, e +} diff --git a/weed/storage/volume_loading.go b/weed/storage/volume_loading.go index 7bc65a4a3..c4f1aae9b 100644 --- a/weed/storage/volume_loading.go +++ b/weed/storage/volume_loading.go @@ -12,11 +12,11 @@ func loadVolumeWithoutIndex(dirname string, collection string, id VolumeId, need v = &Volume{dir: dirname, Collection: collection, Id: id} v.SuperBlock = SuperBlock{} v.needleMapKind = needleMapKind - e = v.load(false, false, needleMapKind) + e = v.load(false, false, needleMapKind, 0) return } -func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind NeedleMapType) error { +func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind NeedleMapType, preallocate int64) error { var e error fileName := v.FileName() @@ -34,7 +34,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind } } else { if createDatIfMissing { - v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644) + v.dataFile, e = createVolumeFile(fileName+".dat", preallocate) } else { return fmt.Errorf("Volume Data file %s.dat does not exist.", fileName) } diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go index f3ded5ff2..13072d1fb 100644 --- a/weed/storage/volume_vacuum.go +++ b/weed/storage/volume_vacuum.go @@ -24,7 +24,7 @@ func (v *Volume) Compact() error { v.lastCompactIndexOffset = v.nm.IndexFileSize() v.lastCompactRevision = v.SuperBlock.CompactRevision glog.V(3).Infof("creating copies for volume %d ,last offset %d...", v.Id, v.lastCompactIndexOffset) - return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx") + return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx", v.dataFileSize) } func (v *Volume) Compact2() error { @@ -66,7 +66,7 @@ func (v *Volume) commitCompact() error { //glog.V(3).Infof("Pretending to be vacuuming...") //time.Sleep(20 * time.Second) glog.V(3).Infof("Loading Commit file...") - if e = v.load(true, false, v.needleMapKind); e != nil { + if e = v.load(true, false, v.needleMapKind, 0); e != nil { return e } return nil @@ -207,11 +207,11 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI return nil } -func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string) (err error) { +func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, preallocate int64) (err error) { var ( dst, idx *os.File ) - if dst, err = os.OpenFile(dstName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644); err != nil { + if dst, err = createVolumeFile(dstName, preallocate); err != nil { return } defer dst.Close() diff --git a/weed/topology/allocate_volume.go b/weed/topology/allocate_volume.go index 7b267a805..ebf8ecbf0 100644 --- a/weed/topology/allocate_volume.go +++ b/weed/topology/allocate_volume.go @@ -20,6 +20,7 @@ func AllocateVolume(dn *DataNode, vid storage.VolumeId, option *VolumeGrowOption values.Add("collection", option.Collection) values.Add("replication", option.ReplicaPlacement.String()) values.Add("ttl", option.Ttl.String()) + values.Add("preallocate", fmt.Sprintf("%d", option.Prealloacte)) jsonBlob, err := util.Post("http://"+dn.Url()+"/admin/assign_volume", values) if err != nil { return err diff --git a/weed/topology/volume_growth.go b/weed/topology/volume_growth.go index 3a1c9c567..ddf687419 100644 --- a/weed/topology/volume_growth.go +++ b/weed/topology/volume_growth.go @@ -21,6 +21,7 @@ type VolumeGrowOption struct { Collection string ReplicaPlacement *storage.ReplicaPlacement Ttl *storage.TTL + Prealloacte int64 DataCenter string Rack string DataNode string