package storage import ( "context" "fmt" "io" "os" "sort" "sync" "time" "github.com/klauspost/reedsolomon" "github.com/chrislusf/seaweedfs/weed/util/log" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" "github.com/chrislusf/seaweedfs/weed/stats" "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/storage/types" ) func (s *Store) CollectErasureCodingHeartbeat() *master_pb.Heartbeat { var ecShardMessages []*master_pb.VolumeEcShardInformationMessage collectionEcShardSize := make(map[string]int64) for _, location := range s.Locations { location.ecVolumesLock.RLock() for _, ecShards := range location.ecVolumes { ecShardMessages = append(ecShardMessages, ecShards.ToVolumeEcShardInformationMessage()...) for _, ecShard := range ecShards.Shards { collectionEcShardSize[ecShards.Collection] += ecShard.Size() } } location.ecVolumesLock.RUnlock() } for col, size := range collectionEcShardSize { stats.VolumeServerDiskSizeGauge.WithLabelValues(col, "ec").Set(float64(size)) } return &master_pb.Heartbeat{ EcShards: ecShardMessages, HasNoEcShards: len(ecShardMessages) == 0, } } func (s *Store) MountEcShards(collection string, vid needle.VolumeId, shardId erasure_coding.ShardId) error { for _, location := range s.Locations { if err := location.LoadEcShard(collection, vid, shardId); err == nil { log.Infof("MountEcShards %d.%d", vid, shardId) var shardBits erasure_coding.ShardBits s.NewEcShardsChan <- master_pb.VolumeEcShardInformationMessage{ Id: uint32(vid), Collection: collection, EcIndexBits: uint32(shardBits.AddShardId(shardId)), } return nil } else if err == os.ErrNotExist { continue } else { return fmt.Errorf("%s load ec shard %d.%d: %v", location.Directory, vid, shardId, err) } } return fmt.Errorf("MountEcShards %d.%d not found on disk", vid, shardId) } func (s *Store) UnmountEcShards(vid needle.VolumeId, shardId erasure_coding.ShardId) error { ecShard, found := s.findEcShard(vid, shardId) if !found { return nil } var shardBits erasure_coding.ShardBits message := master_pb.VolumeEcShardInformationMessage{ Id: uint32(vid), Collection: ecShard.Collection, EcIndexBits: uint32(shardBits.AddShardId(shardId)), } for _, location := range s.Locations { if deleted := location.UnloadEcShard(vid, shardId); deleted { log.Infof("UnmountEcShards %d.%d", vid, shardId) s.DeletedEcShardsChan <- message return nil } } return fmt.Errorf("UnmountEcShards %d.%d not found on disk", vid, shardId) } func (s *Store) findEcShard(vid needle.VolumeId, shardId erasure_coding.ShardId) (*erasure_coding.EcVolumeShard, bool) { for _, location := range s.Locations { if v, found := location.FindEcShard(vid, shardId); found { return v, found } } return nil, false } func (s *Store) FindEcVolume(vid needle.VolumeId) (*erasure_coding.EcVolume, bool) { for _, location := range s.Locations { if s, found := location.FindEcVolume(vid); found { return s, true } } return nil, false } func (s *Store) DestroyEcVolume(vid needle.VolumeId) { for _, location := range s.Locations { location.DestroyEcVolume(vid) } } func (s *Store) ReadEcShardNeedle(vid needle.VolumeId, n *needle.Needle) (int, error) { for _, location := range s.Locations { if localEcVolume, found := location.FindEcVolume(vid); found { offset, size, intervals, err := localEcVolume.LocateEcShardNeedle(n.Id, localEcVolume.Version) if err != nil { return 0, fmt.Errorf("locate in local ec volume: %v", err) } if size.IsDeleted() { return 0, ErrorDeleted } log.Tracef("read ec volume %d offset %d size %d intervals:%+v", vid, offset.ToAcutalOffset(), size, intervals) if len(intervals) > 1 { log.Tracef("ReadEcShardNeedle needle id %s intervals:%+v", n.String(), intervals) } bytes, isDeleted, err := s.readEcShardIntervals(vid, n.Id, localEcVolume, intervals) if err != nil { return 0, fmt.Errorf("ReadEcShardIntervals: %v", err) } if isDeleted { return 0, ErrorDeleted } err = n.ReadBytes(bytes, offset.ToAcutalOffset(), size, localEcVolume.Version) if err != nil { return 0, fmt.Errorf("readbytes: %v", err) } return len(bytes), nil } } return 0, fmt.Errorf("ec shard %d not found", vid) } func (s *Store) readEcShardIntervals(vid needle.VolumeId, needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, intervals []erasure_coding.Interval) (data []byte, is_deleted bool, err error) { if err = s.cachedLookupEcShardLocations(ecVolume); err != nil { return nil, false, fmt.Errorf("failed to locate shard via master grpc %s: %v", s.MasterAddress, err) } for i, interval := range intervals { if d, isDeleted, e := s.readOneEcShardInterval(needleId, ecVolume, interval); e != nil { return nil, isDeleted, e } else { if isDeleted { is_deleted = true } if i == 0 { data = d } else { data = append(data, d...) } } } return } func (s *Store) readOneEcShardInterval(needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, interval erasure_coding.Interval) (data []byte, is_deleted bool, err error) { shardId, actualOffset := interval.ToShardIdAndOffset(erasure_coding.ErasureCodingLargeBlockSize, erasure_coding.ErasureCodingSmallBlockSize) data = make([]byte, interval.Size) if shard, found := ecVolume.FindEcVolumeShard(shardId); found { if _, err = shard.ReadAt(data, actualOffset); err != nil { log.Infof("read local ec shard %d.%d offset %d: %v", ecVolume.VolumeId, shardId, actualOffset, err) return } } else { ecVolume.ShardLocationsLock.RLock() sourceDataNodes, hasShardIdLocation := ecVolume.ShardLocations[shardId] ecVolume.ShardLocationsLock.RUnlock() // try reading directly if hasShardIdLocation { _, is_deleted, err = s.readRemoteEcShardInterval(sourceDataNodes, needleId, ecVolume.VolumeId, shardId, data, actualOffset) if err == nil { return } log.Infof("clearing ec shard %d.%d locations: %v", ecVolume.VolumeId, shardId, err) forgetShardId(ecVolume, shardId) } // try reading by recovering from other shards _, is_deleted, err = s.recoverOneRemoteEcShardInterval(needleId, ecVolume, shardId, data, actualOffset) if err == nil { return } log.Infof("recover ec shard %d.%d : %v", ecVolume.VolumeId, shardId, err) } return } func forgetShardId(ecVolume *erasure_coding.EcVolume, shardId erasure_coding.ShardId) { // failed to access the source data nodes, clear it up ecVolume.ShardLocationsLock.Lock() delete(ecVolume.ShardLocations, shardId) ecVolume.ShardLocationsLock.Unlock() } func (s *Store) cachedLookupEcShardLocations(ecVolume *erasure_coding.EcVolume) (err error) { shardCount := len(ecVolume.ShardLocations) if shardCount < erasure_coding.DataShardsCount && ecVolume.ShardLocationsRefreshTime.Add(11*time.Second).After(time.Now()) || shardCount == erasure_coding.TotalShardsCount && ecVolume.ShardLocationsRefreshTime.Add(37*time.Minute).After(time.Now()) || shardCount >= erasure_coding.DataShardsCount && ecVolume.ShardLocationsRefreshTime.Add(7*time.Minute).After(time.Now()) { // still fresh return nil } log.Tracef("lookup and cache ec volume %d locations", ecVolume.VolumeId) err = operation.WithMasterServerClient(s.MasterAddress, s.grpcDialOption, func(masterClient master_pb.SeaweedClient) error { req := &master_pb.LookupEcVolumeRequest{ VolumeId: uint32(ecVolume.VolumeId), } resp, err := masterClient.LookupEcVolume(context.Background(), req) if err != nil { return fmt.Errorf("lookup ec volume %d: %v", ecVolume.VolumeId, err) } if len(resp.ShardIdLocations) < erasure_coding.DataShardsCount { return fmt.Errorf("only %d shards found but %d required", len(resp.ShardIdLocations), erasure_coding.DataShardsCount) } ecVolume.ShardLocationsLock.Lock() for _, shardIdLocations := range resp.ShardIdLocations { shardId := erasure_coding.ShardId(shardIdLocations.ShardId) delete(ecVolume.ShardLocations, shardId) for _, loc := range shardIdLocations.Locations { ecVolume.ShardLocations[shardId] = append(ecVolume.ShardLocations[shardId], loc.Url) } } ecVolume.ShardLocationsRefreshTime = time.Now() ecVolume.ShardLocationsLock.Unlock() return nil }) return } func (s *Store) readRemoteEcShardInterval(sourceDataNodes []string, needleId types.NeedleId, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) { if len(sourceDataNodes) == 0 { return 0, false, fmt.Errorf("failed to find ec shard %d.%d", vid, shardId) } for _, sourceDataNode := range sourceDataNodes { log.Tracef("read remote ec shard %d.%d from %s", vid, shardId, sourceDataNode) n, is_deleted, err = s.doReadRemoteEcShardInterval(sourceDataNode, needleId, vid, shardId, buf, offset) if err == nil { return } log.Debugf("read remote ec shard %d.%d from %s: %v", vid, shardId, sourceDataNode, err) } return } func (s *Store) doReadRemoteEcShardInterval(sourceDataNode string, needleId types.NeedleId, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) { err = operation.WithVolumeServerClient(sourceDataNode, s.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { // copy data slice shardReadClient, err := client.VolumeEcShardRead(context.Background(), &volume_server_pb.VolumeEcShardReadRequest{ VolumeId: uint32(vid), ShardId: uint32(shardId), Offset: offset, Size: int64(len(buf)), FileKey: uint64(needleId), }) if err != nil { return fmt.Errorf("failed to start reading ec shard %d.%d from %s: %v", vid, shardId, sourceDataNode, err) } for { resp, receiveErr := shardReadClient.Recv() if receiveErr == io.EOF { break } if receiveErr != nil { return fmt.Errorf("receiving ec shard %d.%d from %s: %v", vid, shardId, sourceDataNode, err) } if resp.IsDeleted { is_deleted = true } copy(buf[n:n+len(resp.Data)], resp.Data) n += len(resp.Data) } return nil }) if err != nil { return 0, is_deleted, fmt.Errorf("read ec shard %d.%d from %s: %v", vid, shardId, sourceDataNode, err) } return } func (s *Store) recoverOneRemoteEcShardInterval(needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, shardIdToRecover erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) { log.Tracef("recover ec shard %d.%d from other locations", ecVolume.VolumeId, shardIdToRecover) enc, err := reedsolomon.New(erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount) if err != nil { return 0, false, fmt.Errorf("failed to create encoder: %v", err) } bufs := make([][]byte, erasure_coding.TotalShardsCount) var wg sync.WaitGroup ecVolume.ShardLocationsLock.RLock() for shardId, locations := range ecVolume.ShardLocations { // skip currnent shard or empty shard if shardId == shardIdToRecover { continue } if len(locations) == 0 { log.Tracef("readRemoteEcShardInterval missing %d.%d from %+v", ecVolume.VolumeId, shardId, locations) continue } // read from remote locations wg.Add(1) go func(shardId erasure_coding.ShardId, locations []string) { defer wg.Done() data := make([]byte, len(buf)) nRead, isDeleted, readErr := s.readRemoteEcShardInterval(locations, needleId, ecVolume.VolumeId, shardId, data, offset) if readErr != nil { log.Tracef("recover: readRemoteEcShardInterval %d.%d %d bytes from %+v: %v", ecVolume.VolumeId, shardId, nRead, locations, readErr) forgetShardId(ecVolume, shardId) } if isDeleted { is_deleted = true } if nRead == len(buf) { bufs[shardId] = data } }(shardId, locations) } ecVolume.ShardLocationsLock.RUnlock() wg.Wait() if err = enc.ReconstructData(bufs); err != nil { log.Tracef("recovered ec shard %d.%d failed: %v", ecVolume.VolumeId, shardIdToRecover, err) return 0, false, err } log.Tracef("recovered ec shard %d.%d from other locations", ecVolume.VolumeId, shardIdToRecover) copy(buf, bufs[shardIdToRecover]) return len(buf), is_deleted, nil } func (s *Store) EcVolumes() (ecVolumes []*erasure_coding.EcVolume) { for _, location := range s.Locations { location.ecVolumesLock.RLock() for _, v := range location.ecVolumes { ecVolumes = append(ecVolumes, v) } location.ecVolumesLock.RUnlock() } sort.Slice(ecVolumes, func(i, j int) bool { return ecVolumes[i].VolumeId > ecVolumes[j].VolumeId }) return ecVolumes }