reading by recover from other shards

This commit is contained in:
Chris Lu 2019-05-28 23:48:39 -07:00
parent 3f9ecee40f
commit 5dd67f9acf
2 changed files with 80 additions and 10 deletions

View file

@ -122,7 +122,7 @@ func readEcFile(datSize int64, ecFiles []*os.File, offset types.Offset, size uin
func readOneInterval(interval Interval, ecFiles []*os.File) (data []byte, err error) {
ecFileOffset, ecFileIndex := interval.ToShardIdAndOffset(largeBlockSize, smallBlockSize)
ecFileIndex, ecFileOffset := interval.ToShardIdAndOffset(largeBlockSize, smallBlockSize)
data = make([]byte, interval.Size)
err = readFromFile(ecFiles[ecFileIndex], data, ecFileOffset)

View file

@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io"
"sync"
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
@ -12,6 +13,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/klauspost/reedsolomon"
)
func (s *Store) CollectErasureCodingHeartbeat() *master_pb.Heartbeat {
@ -160,11 +162,19 @@ func (s *Store) readOneEcShardInterval(ctx context.Context, ecVolume *erasure_co
if !found || len(sourceDataNodes) == 0 {
return nil, fmt.Errorf("failed to find ec shard %d.%d", ecVolume.VolumeId, shardId)
}
glog.V(4).Infof("read remote ec shard %d.%d from %s", ecVolume.VolumeId, shardId, sourceDataNodes[0])
_, err = s.readOneRemoteEcShardInterval(ctx, sourceDataNodes[0], ecVolume.VolumeId, shardId, data, actualOffset)
if err != nil {
glog.V(1).Infof("failed to read from %s for ec shard %d.%d : %v", sourceDataNodes[0], ecVolume.VolumeId, shardId, err)
// try reading directly
_, err = s.readRemoteEcShardInterval(ctx, sourceDataNodes, ecVolume.VolumeId, shardId, data, actualOffset)
if err == nil {
return
}
// try reading by recovering from other shards
_, err = s.recoverOneRemoteEcShardInterval(ctx, ecVolume, shardId, data, actualOffset)
if err == nil {
return
}
glog.V(0).Infof("recover ec shard %d.%d : %v", ecVolume.VolumeId, shardId, err)
}
return
}
@ -203,7 +213,21 @@ func (s *Store) cachedLookupEcShardLocations(ctx context.Context, ecVolume *eras
return
}
func (s *Store) readOneRemoteEcShardInterval(ctx context.Context, sourceDataNode string, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, err error) {
func (s *Store) readRemoteEcShardInterval(ctx context.Context, sourceDataNodes []string, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, err error) {
for _, sourceDataNode := range sourceDataNodes {
glog.V(4).Infof("read remote ec shard %d.%d from %s", vid, shardId, sourceDataNode)
n, err = s.doReadRemoteEcShardInterval(ctx, sourceDataNode, vid, shardId, buf, offset)
if err == nil {
return
}
glog.V(1).Infof("read remote ec shard %d.%d from %s: %v", vid, shardId, sourceDataNode, err)
}
return
}
func (s *Store) doReadRemoteEcShardInterval(ctx context.Context, sourceDataNode string, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, err error) {
err = operation.WithVolumeServerClient(sourceDataNode, s.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
@ -239,8 +263,54 @@ func (s *Store) readOneRemoteEcShardInterval(ctx context.Context, sourceDataNode
return
}
func (s *Store) recoverOneRemoteEcShardInterval(ctx context.Context, string, vid needle.VolumeId, shardIdToRecover erasure_coding.ShardId, buf []byte, offset int64) (n int, err error) {
glog.V(1).Infof("recover ec shard %d.%d from other locations", vid, shardIdToRecover)
// TODO add recovering
return 0, fmt.Errorf("recover is not implemented yet")
func (s *Store) recoverOneRemoteEcShardInterval(ctx context.Context, ecVolume *erasure_coding.EcVolume, shardIdToRecover erasure_coding.ShardId, buf []byte, offset int64) (n int, err error) {
glog.V(1).Infof("recover ec shard %d.%d from other locations", ecVolume.VolumeId, shardIdToRecover)
enc, err := reedsolomon.New(erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount)
if err != nil {
return 0, fmt.Errorf("failed to create encoder: %v", err)
}
bufs := make([][]byte, erasure_coding.TotalShardsCount)
var wg sync.WaitGroup
ecVolume.ShardLocationsLock.RLock()
for shardId, locations := range ecVolume.ShardLocations {
// skip currnent shard or empty shard
if shardId == shardIdToRecover {
continue
}
if len(locations) == 0 {
glog.V(3).Infof("readRemoteEcShardInterval missing %d.%d from %+v", ecVolume.VolumeId, shardId, locations)
continue
}
// read from remote locations
wg.Add(1)
go func(shardId erasure_coding.ShardId, locations []string) {
defer wg.Done()
data := make([]byte, len(buf))
n, err = s.readRemoteEcShardInterval(ctx, locations, ecVolume.VolumeId, shardId, data, offset)
if err != nil {
glog.V(3).Infof("readRemoteEcShardInterval %d.%d from %+v", ecVolume.VolumeId, shardId, locations)
}
if n == len(buf) {
bufs[shardId] = data
return
}
}(shardId, locations)
}
ecVolume.ShardLocationsLock.RUnlock()
wg.Wait()
if err = enc.ReconstructData(bufs); err != nil {
return 0, err
}
glog.V(3).Infof("recovered ec shard %d.%d from other locations", ecVolume.VolumeId, shardIdToRecover)
copy(buf, bufs[shardIdToRecover])
return len(buf), nil
}