diff --git a/k8s/seaweedfs/Chart.yaml b/k8s/seaweedfs/Chart.yaml index 7ebee122b..4495526bf 100644 --- a/k8s/seaweedfs/Chart.yaml +++ b/k8s/seaweedfs/Chart.yaml @@ -1,4 +1,4 @@ apiVersion: v1 description: SeaweedFS name: seaweedfs -version: 1.79 +version: 1.81 diff --git a/k8s/seaweedfs/values.yaml b/k8s/seaweedfs/values.yaml index cfc48d828..fc416d4ce 100644 --- a/k8s/seaweedfs/values.yaml +++ b/k8s/seaweedfs/values.yaml @@ -4,7 +4,7 @@ global: registry: "" repository: "" imageName: chrislusf/seaweedfs - imageTag: "1.79" + imageTag: "1.81" imagePullPolicy: IfNotPresent imagePullSecrets: imagepullsecret restartPolicy: Always diff --git a/weed/command/master.go b/weed/command/master.go index cb6864edd..21c759f4e 100644 --- a/weed/command/master.go +++ b/weed/command/master.go @@ -33,7 +33,7 @@ type MasterOptions struct { peers *string volumeSizeLimitMB *uint volumePreallocate *bool - pulseSeconds *int + // pulseSeconds *int defaultReplication *string garbageThreshold *float64 whiteList *string @@ -51,7 +51,7 @@ func init() { m.peers = cmdMaster.Flag.String("peers", "", "all master nodes in comma separated ip:port list, example: 127.0.0.1:9093,127.0.0.1:9094,127.0.0.1:9095") m.volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 30*1000, "Master stops directing writes to oversized volumes.") m.volumePreallocate = cmdMaster.Flag.Bool("volumePreallocate", false, "Preallocate disk space for volumes.") - m.pulseSeconds = cmdMaster.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats") + // m.pulseSeconds = cmdMaster.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats") m.defaultReplication = cmdMaster.Flag.String("defaultReplication", "000", "Default replication type if not specified.") m.garbageThreshold = cmdMaster.Flag.Float64("garbageThreshold", 0.3, "threshold to vacuum and reclaim spaces") m.whiteList = cmdMaster.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.") @@ -118,7 +118,7 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) { } // start raftServer raftServer := weed_server.NewRaftServer(security.LoadClientTLS(util.GetViper(), "grpc.master"), - peers, myMasterAddress, *masterOption.metaFolder, ms.Topo, *masterOption.pulseSeconds) + peers, myMasterAddress, *masterOption.metaFolder, ms.Topo, 5) if raftServer == nil { glog.Fatalf("please verify %s is writable, see https://github.com/chrislusf/seaweedfs/issues/717", *masterOption.metaFolder) } @@ -178,7 +178,7 @@ func (m *MasterOptions) toMasterOption(whiteList []string) *weed_server.MasterOp MetaFolder: *m.metaFolder, VolumeSizeLimitMB: *m.volumeSizeLimitMB, VolumePreallocate: *m.volumePreallocate, - PulseSeconds: *m.pulseSeconds, + // PulseSeconds: *m.pulseSeconds, DefaultReplicaPlacement: *m.defaultReplication, GarbageThreshold: *m.garbageThreshold, WhiteList: whiteList, diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go index 4e83a44a0..c95626651 100644 --- a/weed/command/mount_std.go +++ b/weed/command/mount_std.go @@ -34,7 +34,7 @@ func runMount(cmd *Command, args []string) bool { return false } - if len(args)>0 { + if len(args) > 0 { return false } diff --git a/weed/command/server.go b/weed/command/server.go index c006f00eb..0af583a7f 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -55,12 +55,16 @@ var ( serverDisableHttp = cmdServer.Flag.Bool("disableHttp", false, "disable http requests, only gRPC operations are allowed.") volumeDataFolders = cmdServer.Flag.String("dir", os.TempDir(), "directories to store data files. dir[,dir]...") volumeMaxDataVolumeCounts = cmdServer.Flag.String("volume.max", "7", "maximum numbers of volumes, count[,count]... If set to zero on non-windows OS, the limit will be auto configured.") - pulseSeconds = cmdServer.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats") + volumeMinFreeSpacePercent = cmdServer.Flag.String("volume.minFreeSpacePercent", "0", "minimum free disk space(in percents). If free disk space lower this value - all volumes marks as ReadOnly") + + // pulseSeconds = cmdServer.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats") isStartingFiler = cmdServer.Flag.Bool("filer", false, "whether to start filer") isStartingS3 = cmdServer.Flag.Bool("s3", false, "whether to start S3 gateway") isStartingMsgBroker = cmdServer.Flag.Bool("msgBroker", false, "whether to start message broker") serverWhiteList []string + + False = false ) func init() { @@ -93,6 +97,7 @@ func init() { serverOptions.v.compactionMBPerSecond = cmdServer.Flag.Int("volume.compactionMBps", 0, "limit compaction speed in mega bytes per second") serverOptions.v.fileSizeLimitMB = cmdServer.Flag.Int("volume.fileSizeLimitMB", 256, "limit file size to avoid out of memory") serverOptions.v.publicUrl = cmdServer.Flag.String("volume.publicUrl", "", "publicly accessible address") + serverOptions.v.pprof = &False s3Options.port = cmdServer.Flag.Int("s3.port", 8333, "s3 server http listen port") s3Options.domainName = cmdServer.Flag.String("s3.domainName", "", "suffix of the host name, {bucket}.{domainName}") @@ -142,8 +147,8 @@ func runServer(cmd *Command, args []string) bool { serverOptions.v.rack = serverRack msgBrokerOptions.ip = serverIp - serverOptions.v.pulseSeconds = pulseSeconds - masterOptions.pulseSeconds = pulseSeconds + // serverOptions.v.pulseSeconds = pulseSeconds + // masterOptions.pulseSeconds = pulseSeconds masterOptions.whiteList = serverWhiteListOption @@ -206,7 +211,8 @@ func runServer(cmd *Command, args []string) bool { // start volume server { - go serverOptions.v.startVolumeServer(*volumeDataFolders, *volumeMaxDataVolumeCounts, *serverWhiteListOption) + go serverOptions.v.startVolumeServer(*volumeDataFolders, *volumeMaxDataVolumeCounts, *serverWhiteListOption, *volumeMinFreeSpacePercent) + } startMaster(masterOptions, serverWhiteList) diff --git a/weed/command/volume.go b/weed/command/volume.go index f942ec50b..d0fdd2ed1 100644 --- a/weed/command/volume.go +++ b/weed/command/volume.go @@ -3,6 +3,7 @@ package command import ( "fmt" "net/http" + httppprof "net/http/pprof" "os" "runtime" "runtime/pprof" @@ -10,10 +11,11 @@ import ( "strings" "time" - "github.com/chrislusf/seaweedfs/weed/util/grace" "github.com/spf13/viper" "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/util/grace" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/util/httpdown" @@ -40,7 +42,6 @@ type VolumeServerOptions struct { publicUrl *string bindIp *string masters *string - pulseSeconds *int idleConnectionTimeout *int dataCenter *string rack *string @@ -52,6 +53,9 @@ type VolumeServerOptions struct { memProfile *string compactionMBPerSecond *int fileSizeLimitMB *int + minFreeSpacePercent []float32 + pprof *bool + // pulseSeconds *int } func init() { @@ -62,7 +66,7 @@ func init() { v.publicUrl = cmdVolume.Flag.String("publicUrl", "", "Publicly accessible address") v.bindIp = cmdVolume.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to") v.masters = cmdVolume.Flag.String("mserver", "localhost:9333", "comma-separated master servers") - v.pulseSeconds = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than or equal to the master's setting") + // v.pulseSeconds = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than or equal to the master's setting") v.idleConnectionTimeout = cmdVolume.Flag.Int("idleTimeout", 30, "connection idle seconds") v.dataCenter = cmdVolume.Flag.String("dataCenter", "", "current volume server's data center name") v.rack = cmdVolume.Flag.String("rack", "", "current volume server's rack name") @@ -73,6 +77,7 @@ func init() { v.memProfile = cmdVolume.Flag.String("memprofile", "", "memory profile output file") v.compactionMBPerSecond = cmdVolume.Flag.Int("compactionMBps", 0, "limit background compaction or copying speed in mega bytes per second") v.fileSizeLimitMB = cmdVolume.Flag.Int("fileSizeLimitMB", 256, "limit file size to avoid out of memory") + v.pprof = cmdVolume.Flag.Bool("pprof", false, "enable pprof http handlers. precludes --memprofile and --cpuprofile") } var cmdVolume = &Command{ @@ -87,6 +92,7 @@ var ( volumeFolders = cmdVolume.Flag.String("dir", os.TempDir(), "directories to store data files. dir[,dir]...") maxVolumeCounts = cmdVolume.Flag.String("max", "7", "maximum numbers of volumes, count[,count]... If set to zero on non-windows OS, the limit will be auto configured.") volumeWhiteListOption = cmdVolume.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.") + minFreeSpacePercent = cmdVolume.Flag.String("minFreeSpacePercent ", "0", "minimum free disk space(in percents). If free disk space lower this value - all volumes marks as ReadOnly") ) func runVolume(cmd *Command, args []string) bool { @@ -94,14 +100,19 @@ func runVolume(cmd *Command, args []string) bool { util.LoadConfiguration("security", false) runtime.GOMAXPROCS(runtime.NumCPU()) - grace.SetupProfiling(*v.cpuProfile, *v.memProfile) - v.startVolumeServer(*volumeFolders, *maxVolumeCounts, *volumeWhiteListOption) + // If --pprof is set we assume the caller wants to be able to collect + // cpu and memory profiles via go tool pprof + if !*v.pprof { + grace.SetupProfiling(*v.cpuProfile, *v.memProfile) + } + + v.startVolumeServer(*volumeFolders, *maxVolumeCounts, *volumeWhiteListOption, *minFreeSpacePercent) return true } -func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, volumeWhiteListOption string) { +func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, volumeWhiteListOption, minFreeSpacePercent string) { // Set multiple folders and each folder's max volume count limit' v.folders = strings.Split(volumeFolders, ",") @@ -116,6 +127,16 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v if len(v.folders) != len(v.folderMaxLimits) { glog.Fatalf("%d directories by -dir, but only %d max is set by -max", len(v.folders), len(v.folderMaxLimits)) } + minFreeSpacePercentStrings := strings.Split(minFreeSpacePercent, ",") + for _, freeString := range minFreeSpacePercentStrings { + + if value, e := strconv.ParseFloat(freeString, 32); e == nil { + v.minFreeSpacePercent = append(v.minFreeSpacePercent, float32(value)) + } else { + glog.Fatalf("The value specified in -minFreeSpacePercent not a valid value %s", freeString) + } + } + for _, folder := range v.folders { if err := util.TestFolderWritable(folder); err != nil { glog.Fatalf("Check Data Folder(-dir) Writable %s : %s", folder, err) @@ -145,6 +166,14 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v publicVolumeMux = http.NewServeMux() } + if *v.pprof { + volumeMux.HandleFunc("/debug/pprof/", httppprof.Index) + volumeMux.HandleFunc("/debug/pprof/cmdline", httppprof.Cmdline) + volumeMux.HandleFunc("/debug/pprof/profile", httppprof.Profile) + volumeMux.HandleFunc("/debug/pprof/symbol", httppprof.Symbol) + volumeMux.HandleFunc("/debug/pprof/trace", httppprof.Trace) + } + volumeNeedleMapKind := storage.NeedleMapInMemory switch *v.indexType { case "leveldb": @@ -159,9 +188,9 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v volumeServer := weed_server.NewVolumeServer(volumeMux, publicVolumeMux, *v.ip, *v.port, *v.publicUrl, - v.folders, v.folderMaxLimits, + v.folders, v.folderMaxLimits, v.minFreeSpacePercent, volumeNeedleMapKind, - strings.Split(masters, ","), *v.pulseSeconds, *v.dataCenter, *v.rack, + strings.Split(masters, ","), 5, *v.dataCenter, *v.rack, v.whiteList, *v.fixJpgOrientation, *v.readRedirect, *v.compactionMBPerSecond, diff --git a/weed/filer2/reader_at.go b/weed/filer2/reader_at.go index 53fff7672..2771f878c 100644 --- a/weed/filer2/reader_at.go +++ b/weed/filer2/reader_at.go @@ -76,7 +76,7 @@ func (c *ChunkReadAt) ReadAt(p []byte, offset int64) (n int, err error) { n += readCount err = readErr if readCount == 0 { - return n, nil + return n, io.EOF } } return diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go index 372d742ea..c6637259d 100644 --- a/weed/filesys/filehandle.go +++ b/weed/filesys/filehandle.go @@ -3,6 +3,7 @@ package filesys import ( "context" "fmt" + "io" "math" "net/http" "time" @@ -98,6 +99,10 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) { totalRead, err := fh.f.reader.ReadAt(buff, offset) + if err == io.EOF { + err = nil + } + if err != nil { glog.Errorf("file handle read %s: %v", fh.f.fullpath(), err) } diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go index 67dd2a62c..2b0ef64c2 100644 --- a/weed/filesys/wfs.go +++ b/weed/filesys/wfs.go @@ -89,6 +89,7 @@ func NewSeaweedFileSystem(option *Option) *WFS { }, } if option.CacheSizeMB > 0 { + os.MkdirAll(option.CacheDir, 0755) wfs.chunkCache = chunk_cache.NewChunkCache(256, option.CacheDir, option.CacheSizeMB) grace.OnInterrupt(func() { wfs.chunkCache.Shutdown() diff --git a/weed/messaging/msgclient/publisher.go b/weed/messaging/msgclient/publisher.go index b0fc5afbf..1aa483ff8 100644 --- a/weed/messaging/msgclient/publisher.go +++ b/weed/messaging/msgclient/publisher.go @@ -26,7 +26,7 @@ func (mc *MessagingClient) NewPublisher(publisherId, namespace, topic string) (* for i := 0; i < int(topicConfiguration.PartitionCount); i++ { tp := broker.TopicPartition{ Namespace: namespace, - Topic: topic, + Topic: topic, Partition: int32(i), } grpcClientConn, err := mc.findBroker(tp) diff --git a/weed/messaging/msgclient/subscriber.go b/weed/messaging/msgclient/subscriber.go index caa795626..6c7dc1ab7 100644 --- a/weed/messaging/msgclient/subscriber.go +++ b/weed/messaging/msgclient/subscriber.go @@ -3,8 +3,8 @@ package msgclient import ( "context" "io" - "time" "sync" + "time" "github.com/chrislusf/seaweedfs/weed/messaging/broker" "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" @@ -26,12 +26,12 @@ func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string, subscriberCancels := make([]context.CancelFunc, topicConfiguration.PartitionCount) for i := 0; i < int(topicConfiguration.PartitionCount); i++ { - if partitionId>=0 && i != partitionId { + if partitionId >= 0 && i != partitionId { continue } tp := broker.TopicPartition{ Namespace: namespace, - Topic: topic, + Topic: topic, Partition: int32(i), } grpcClientConn, err := mc.findBroker(tp) diff --git a/weed/server/filer_grpc_server_rename.go b/weed/server/filer_grpc_server_rename.go index 71e2c7d17..7029c3342 100644 --- a/weed/server/filer_grpc_server_rename.go +++ b/weed/server/filer_grpc_server_rename.go @@ -32,11 +32,11 @@ func (fs *FilerServer) AtomicRenameEntry(ctx context.Context, req *filer_pb.Atom moveErr := fs.moveEntry(ctx, oldParent, oldEntry, util.FullPath(filepath.ToSlash(req.NewDirectory)), req.NewName, &events) if moveErr != nil { fs.filer.RollbackTransaction(ctx) - return nil, fmt.Errorf("%s/%s move error: %v", req.OldDirectory, req.OldName, err) + return nil, fmt.Errorf("%s/%s move error: %v", req.OldDirectory, req.OldName, moveErr) } else { if commitError := fs.filer.CommitTransaction(ctx); commitError != nil { fs.filer.RollbackTransaction(ctx) - return nil, fmt.Errorf("%s/%s move commit error: %v", req.OldDirectory, req.OldName, err) + return nil, fmt.Errorf("%s/%s move commit error: %v", req.OldDirectory, req.OldName, commitError) } } diff --git a/weed/server/master_server.go b/weed/server/master_server.go index dc3ef9cfc..9a490bb1f 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -37,7 +37,7 @@ type MasterOption struct { MetaFolder string VolumeSizeLimitMB uint VolumePreallocate bool - PulseSeconds int + // PulseSeconds int DefaultReplicaPlacement string GarbageThreshold float64 WhiteList []string @@ -103,7 +103,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *Maste if nil == seq { glog.Fatalf("create sequencer failed.") } - ms.Topo = topology.NewTopology("topo", seq, uint64(ms.option.VolumeSizeLimitMB)*1024*1024, ms.option.PulseSeconds, replicationAsMin) + ms.Topo = topology.NewTopology("topo", seq, uint64(ms.option.VolumeSizeLimitMB)*1024*1024, 5, replicationAsMin) ms.vg = topology.NewDefaultVolumeGrowth() glog.V(0).Infoln("Volume Size Limit is", ms.option.VolumeSizeLimitMB, "MB") diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go index 2d716edc1..62fbc19a7 100644 --- a/weed/server/volume_server.go +++ b/weed/server/volume_server.go @@ -35,7 +35,7 @@ type VolumeServer struct { func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, port int, publicUrl string, - folders []string, maxCounts []int, + folders []string, maxCounts []int, minFreeSpacePercent []float32, needleMapKind storage.NeedleMapType, masterNodes []string, pulseSeconds int, dataCenter string, rack string, @@ -68,8 +68,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, fileSizeLimitBytes: int64(fileSizeLimitMB) * 1024 * 1024, } vs.SeedMasterNodes = masterNodes - vs.store = storage.NewStore(vs.grpcDialOption, port, ip, publicUrl, folders, maxCounts, vs.needleMapKind) - + vs.store = storage.NewStore(vs.grpcDialOption, port, ip, publicUrl, folders, maxCounts, minFreeSpacePercent, vs.needleMapKind) vs.guard = security.NewGuard(whiteList, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec) handleStaticResources(adminMux) diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go index 37c4afd5c..d86664542 100644 --- a/weed/server/webdav_server.go +++ b/weed/server/webdav_server.go @@ -487,6 +487,10 @@ func (f *WebDavFile) Read(p []byte) (readSize int, err error) { glog.V(3).Infof("WebDavFileSystem.Read %v: [%d,%d)", f.name, f.off, f.off+int64(readSize)) f.off += int64(readSize) + if err == io.EOF { + err = nil + } + if err != nil { glog.Errorf("file read %s: %v", f.name, err) } diff --git a/weed/stats/disk_windows.go b/weed/stats/disk_windows.go index 1185e129c..3cfa52c0b 100644 --- a/weed/stats/disk_windows.go +++ b/weed/stats/disk_windows.go @@ -6,6 +6,7 @@ import ( "syscall" "unsafe" ) + var ( kernel32 = windows.NewLazySystemDLL("Kernel32.dll") getDiskFreeSpaceEx = kernel32.NewProc("GetDiskFreeSpaceExW") diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go index 088763c45..853facc49 100644 --- a/weed/storage/disk_location.go +++ b/weed/storage/disk_location.go @@ -2,10 +2,13 @@ package storage import ( "fmt" + "github.com/chrislusf/seaweedfs/weed/stats" "io/ioutil" "os" + "path/filepath" "strings" "sync" + "time" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" @@ -13,20 +16,22 @@ import ( ) type DiskLocation struct { - Directory string - MaxVolumeCount int - volumes map[needle.VolumeId]*Volume - volumesLock sync.RWMutex + Directory string + MaxVolumeCount int + MinFreeSpacePercent float32 + volumes map[needle.VolumeId]*Volume + volumesLock sync.RWMutex // erasure coding ecVolumes map[needle.VolumeId]*erasure_coding.EcVolume ecVolumesLock sync.RWMutex } -func NewDiskLocation(dir string, maxVolumeCount int) *DiskLocation { - location := &DiskLocation{Directory: dir, MaxVolumeCount: maxVolumeCount} +func NewDiskLocation(dir string, maxVolumeCount int, minFreeSpacePercent float32) *DiskLocation { + location := &DiskLocation{Directory: dir, MaxVolumeCount: maxVolumeCount, MinFreeSpacePercent: minFreeSpacePercent} location.volumes = make(map[needle.VolumeId]*Volume) location.ecVolumes = make(map[needle.VolumeId]*erasure_coding.EcVolume) + go location.CheckDiskSpace() return location } @@ -293,3 +298,21 @@ func (l *DiskLocation) UnUsedSpace(volumeSizeLimit uint64) (unUsedSpace uint64) return } + +func (l *DiskLocation) CheckDiskSpace() { + lastStat := false + t := time.NewTicker(time.Minute) + for _ = range t.C { + if dir, e := filepath.Abs(l.Directory); e == nil { + s := stats.NewDiskStatus(dir) + if (s.PercentFree < l.MinFreeSpacePercent) != lastStat { + lastStat = !lastStat + for _, v := range l.volumes { + v.SetLowDiskSpace(lastStat) + } + + } + } + } + +} diff --git a/weed/storage/store.go b/weed/storage/store.go index 14881ffde..2aff8c93f 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -48,11 +48,11 @@ func (s *Store) String() (str string) { return } -func NewStore(grpcDialOption grpc.DialOption, port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, needleMapKind NeedleMapType) (s *Store) { +func NewStore(grpcDialOption grpc.DialOption, port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, freeDiskSpaceWatermark []float32, needleMapKind NeedleMapType) (s *Store) { s = &Store{grpcDialOption: grpcDialOption, Port: port, Ip: ip, PublicUrl: publicUrl, NeedleMapType: needleMapKind} s.Locations = make([]*DiskLocation, 0) for i := 0; i < len(dirnames); i++ { - location := NewDiskLocation(dirnames[i], maxVolumeCounts[i]) + location := NewDiskLocation(dirnames[i], maxVolumeCounts[i], freeDiskSpaceWatermark[i]) location.loadExistingVolumes(needleMapKind) s.Locations = append(s.Locations, location) stats.VolumeServerMaxVolumeCounter.Add(float64(maxVolumeCounts[i])) diff --git a/weed/storage/types/offset_4bytes.go b/weed/storage/types/offset_4bytes.go index ded5b88cb..d53147e21 100644 --- a/weed/storage/types/offset_4bytes.go +++ b/weed/storage/types/offset_4bytes.go @@ -11,7 +11,7 @@ type OffsetHigher struct { } const ( - OffsetSize = 4 + OffsetSize = 4 MaxPossibleVolumeSize uint64 = 4 * 1024 * 1024 * 1024 * 8 // 32GB ) diff --git a/weed/storage/types/offset_5bytes.go b/weed/storage/types/offset_5bytes.go index 1db22d524..05c6d2f39 100644 --- a/weed/storage/types/offset_5bytes.go +++ b/weed/storage/types/offset_5bytes.go @@ -11,7 +11,7 @@ type OffsetHigher struct { } const ( - OffsetSize = 4 + 1 + OffsetSize = 4 + 1 MaxPossibleVolumeSize uint64 = 4 * 1024 * 1024 * 1024 * 8 * 256 /* 256 is from the extra byte */ // 8TB ) diff --git a/weed/storage/volume.go b/weed/storage/volume.go index df63360a1..e10f5afaa 100644 --- a/weed/storage/volume.go +++ b/weed/storage/volume.go @@ -27,6 +27,7 @@ type Volume struct { needleMapKind NeedleMapType noWriteOrDelete bool // if readonly, either noWriteOrDelete or noWriteCanDelete noWriteCanDelete bool // if readonly, either noWriteOrDelete or noWriteCanDelete + lowDiskSpace bool hasRemoteFile bool // if the volume has a remote file MemoryMapMaxSizeMb uint32 @@ -45,6 +46,11 @@ type Volume struct { volumeInfo *volume_server_pb.VolumeInfo } +func (v *Volume) SetLowDiskSpace(lowDiskSpace bool) { + glog.V(0).Infof("SetLowDiskSpace id %d value %t", v.Id, lowDiskSpace) + v.lowDiskSpace = lowDiskSpace +} + func NewVolume(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapType, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) (v *Volume, e error) { // if replicaPlacement is nil, the superblock will be loaded from disk v = &Volume{dir: dirname, Collection: collection, Id: id, MemoryMapMaxSizeMb: memoryMapMaxSizeMb, @@ -244,5 +250,5 @@ func (v *Volume) RemoteStorageNameKey() (storageName, storageKey string) { } func (v *Volume) IsReadOnly() bool { - return v.noWriteOrDelete || v.noWriteCanDelete + return v.noWriteOrDelete || v.noWriteCanDelete || v.lowDiskSpace } diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_read_write.go index dce800242..edb5f48d8 100644 --- a/weed/storage/volume_read_write.go +++ b/weed/storage/volume_read_write.go @@ -76,7 +76,7 @@ func (v *Volume) syncWrite(n *needle.Needle) (offset uint64, size uint32, isUnch defer v.dataFileAccessLock.Unlock() if MaxPossibleVolumeSize < v.nm.ContentSize()+uint64(actualSize) { - err = fmt.Errorf("volume size limit %d exceeded! current size is %d", MaxPossibleVolumeSize, v.ContentSize()) + err = fmt.Errorf("volume size limit %d exceeded! current size is %d", MaxPossibleVolumeSize, v.nm.ContentSize()) return } if v.isFileUnchanged(n) { @@ -190,7 +190,7 @@ func (v *Volume) syncDelete(n *needle.Needle) (uint32, error) { defer v.dataFileAccessLock.Unlock() if MaxPossibleVolumeSize < v.nm.ContentSize()+uint64(actualSize) { - err := fmt.Errorf("volume size limit %d exceeded! current size is %d", MaxPossibleVolumeSize, v.ContentSize()) + err := fmt.Errorf("volume size limit %d exceeded! current size is %d", MaxPossibleVolumeSize, v.nm.ContentSize()) return 0, err } diff --git a/weed/topology/data_node.go b/weed/topology/data_node.go index 0dccdd0f2..d18dd6af0 100644 --- a/weed/topology/data_node.go +++ b/weed/topology/data_node.go @@ -41,7 +41,7 @@ func (dn *DataNode) String() string { return fmt.Sprintf("Node:%s, volumes:%v, Ip:%s, Port:%d, PublicUrl:%s", dn.NodeImpl.String(), dn.volumes, dn.Ip, dn.Port, dn.PublicUrl) } -func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) (isNew bool) { +func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO bool) { dn.Lock() defer dn.Unlock() if oldV, ok := dn.volumes[v.Id]; !ok { @@ -64,12 +64,13 @@ func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) (isNew bool) { dn.UpAdjustRemoteVolumeCountDelta(-1) } } + isChangedRO = dn.volumes[v.Id].ReadOnly != v.ReadOnly dn.volumes[v.Id] = v } return } -func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolumes, deletedVolumes []storage.VolumeInfo) { +func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolumes, deletedVolumes, changeRO []storage.VolumeInfo) { actualVolumeMap := make(map[needle.VolumeId]storage.VolumeInfo) for _, v := range actualVolumes { actualVolumeMap[v.Id] = v @@ -91,10 +92,13 @@ func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolume } dn.Unlock() for _, v := range actualVolumes { - isNew := dn.AddOrUpdateVolume(v) + isNew, isChangedRO := dn.AddOrUpdateVolume(v) if isNew { newVolumes = append(newVolumes, v) } + if isChangedRO { + changeRO = append(changeRO, v) + } } return } diff --git a/weed/topology/topology.go b/weed/topology/topology.go index c24cab9d6..993f444a7 100644 --- a/weed/topology/topology.go +++ b/weed/topology/topology.go @@ -212,13 +212,18 @@ func (t *Topology) SyncDataNodeRegistration(volumes []*master_pb.VolumeInformati } } // find out the delta volumes - newVolumes, deletedVolumes = dn.UpdateVolumes(volumeInfos) + var changedVolumes []storage.VolumeInfo + newVolumes, deletedVolumes, changedVolumes = dn.UpdateVolumes(volumeInfos) for _, v := range newVolumes { t.RegisterVolumeLayout(v, dn) } for _, v := range deletedVolumes { t.UnRegisterVolumeLayout(v, dn) } + for _, v := range changedVolumes { + vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl) + vl.ensureCorrectWritables(&v) + } return } diff --git a/weed/util/constants.go b/weed/util/constants.go index 7c3927a66..6e9b83a0b 100644 --- a/weed/util/constants.go +++ b/weed/util/constants.go @@ -5,10 +5,10 @@ import ( ) var ( - VERSION = fmt.Sprintf("%s %d.%d", sizeLimit, 1, 79) - COMMIT = "" + VERSION = fmt.Sprintf("%s %d.%d", sizeLimit, 1, 81) + COMMIT = "" ) func Version() string { return VERSION + " " + COMMIT -} \ No newline at end of file +} diff --git a/weed/util/file_util.go b/weed/util/file_util.go index bef9f7cd6..ff725830b 100644 --- a/weed/util/file_util.go +++ b/weed/util/file_util.go @@ -49,6 +49,10 @@ func CheckFile(filename string) (exists, canRead, canWrite bool, modTime time.Ti exists = false return } + if err != nil { + glog.Errorf("check %s: %v", filename, err) + return + } if fi.Mode()&0400 != 0 { canRead = true } diff --git a/weed/wdclient/exclusive_locks/exclusive_locker.go b/weed/wdclient/exclusive_locks/exclusive_locker.go index 67823e7f4..1ecfe6ce2 100644 --- a/weed/wdclient/exclusive_locks/exclusive_locker.go +++ b/weed/wdclient/exclusive_locks/exclusive_locker.go @@ -42,7 +42,7 @@ func (l *ExclusiveLocker) GetToken() (token int64, lockTsNs int64) { } func (l *ExclusiveLocker) RequestLock() { - if l.isLocking { + if l.isLocking { return }