diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go index 4a9a9619a..8bb585d91 100644 --- a/weed/command/benchmark.go +++ b/weed/command/benchmark.go @@ -282,14 +282,19 @@ func readFiles(fileIdLineChan chan string, s *stat) { start := time.Now() var bytesRead int var err error - url, err := b.masterClient.LookupFileId(fid) + urls, err := b.masterClient.LookupFileId(fid) if err != nil { s.failed++ println("!!!! ", fid, " location not found!!!!!") continue } var bytes []byte - bytes, err = util.Get(url) + for _, url := range urls { + bytes, err = util.Get(url) + if err == nil { + break + } + } bytesRead = len(bytes) if err == nil { s.completed++ diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go index 37b172357..6fcd7abc2 100644 --- a/weed/filer/filechunk_manifest.go +++ b/weed/filer/filechunk_manifest.go @@ -84,21 +84,26 @@ func ResolveOneChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunk *fil // TODO fetch from cache for weed mount? func fetchChunk(lookupFileIdFn LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool) ([]byte, error) { - urlString, err := lookupFileIdFn(fileId) + urlStrings, err := lookupFileIdFn(fileId) if err != nil { glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err) return nil, err } var buffer bytes.Buffer - err = util.ReadUrlAsStream(urlString, cipherKey, isGzipped, true, 0, 0, func(data []byte) { - buffer.Write(data) - }) - if err != nil { - glog.V(0).Infof("read %s failed, err: %v", fileId, err) - return nil, err + + for _, urlString := range urlStrings { + err = util.ReadUrlAsStream(urlString, cipherKey, isGzipped, true, 0, 0, func(data []byte) { + buffer.Write(data) + }) + if err != nil { + glog.V(0).Infof("read %s failed, err: %v", fileId, err) + buffer.Reset() + } else { + break + } } - return buffer.Bytes(), nil + return buffer.Bytes(), err } func MaybeManifestize(saveFunc SaveDataAsChunkFunctionType, inputChunks []*filer_pb.FileChunk) (chunks []*filer_pb.FileChunk, err error) { diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go index 3aeb9a94d..74f037557 100644 --- a/weed/filer/reader_at.go +++ b/weed/filer/reader_at.go @@ -5,7 +5,6 @@ import ( "fmt" "github.com/golang/groupcache/singleflight" "io" - "math/rand" "sync" "github.com/chrislusf/seaweedfs/weed/glog" @@ -17,24 +16,24 @@ import ( type ChunkReadAt struct { masterClient *wdclient.MasterClient chunkViews []*ChunkView - lookupFileId func(fileId string) (targetUrl string, err error) + lookupFileId LookupFileIdFunctionType readerLock sync.Mutex fileSize int64 - fetchGroup singleflight.Group - lastChunkFileId string - lastChunkData []byte - chunkCache chunk_cache.ChunkCache + fetchGroup singleflight.Group + lastChunkFileId string + lastChunkData []byte + chunkCache chunk_cache.ChunkCache } // var _ = io.ReaderAt(&ChunkReadAt{}) -type LookupFileIdFunctionType func(fileId string) (targetUrl string, err error) +type LookupFileIdFunctionType func(fileId string) (targetUrls []string, err error) func LookupFn(filerClient filer_pb.FilerClient) LookupFileIdFunctionType { vidCache := make(map[string]*filer_pb.Locations) - return func(fileId string) (targetUrl string, err error) { + return func(fileId string) (targetUrls []string, err error) { vid := VolumeId(fileId) locations, found := vidCache[vid] @@ -59,8 +58,11 @@ func LookupFn(filerClient filer_pb.FilerClient) LookupFileIdFunctionType { }) } - volumeServerAddress := filerClient.AdjustedUrl(locations.Locations[rand.Intn(len(locations.Locations))].Url) - targetUrl = fmt.Sprintf("http://%s/%s", volumeServerAddress, fileId) + for _, loc := range locations.Locations { + volumeServerAddress := filerClient.AdjustedUrl(loc.Url) + targetUrl := fmt.Sprintf("http://%s/%s", volumeServerAddress, fileId) + targetUrls = append(targetUrls, targetUrl) + } return } @@ -142,7 +144,7 @@ func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) { } -func (c *ChunkReadAt) readFromWholeChunkData(chunkView *ChunkView, nextChunkViews... *ChunkView) (chunkData []byte, err error) { +func (c *ChunkReadAt) readFromWholeChunkData(chunkView *ChunkView, nextChunkViews ...*ChunkView) (chunkData []byte, err error) { if c.lastChunkFileId == chunkView.FileId { return c.lastChunkData, nil diff --git a/weed/filer/stream.go b/weed/filer/stream.go index dc6e414ca..d98229379 100644 --- a/weed/filer/stream.go +++ b/weed/filer/stream.go @@ -17,27 +17,32 @@ func StreamContent(masterClient *wdclient.MasterClient, w io.Writer, chunks []*f // fmt.Printf("start to stream content for chunks: %+v\n", chunks) chunkViews := ViewFromChunks(masterClient.LookupFileId, chunks, offset, size) - fileId2Url := make(map[string]string) + fileId2Url := make(map[string][]string) for _, chunkView := range chunkViews { - urlString, err := masterClient.LookupFileId(chunkView.FileId) + urlStrings, err := masterClient.LookupFileId(chunkView.FileId) if err != nil { glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err) return err } - fileId2Url[chunkView.FileId] = urlString + fileId2Url[chunkView.FileId] = urlStrings } for _, chunkView := range chunkViews { - urlString := fileId2Url[chunkView.FileId] - err := util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) { - w.Write(data) - }) - if err != nil { - glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err) - return err + urlStrings := fileId2Url[chunkView.FileId] + for _, urlString := range urlStrings { + err := util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) { + w.Write(data) + }) + if err != nil { + // data already written to w would be wrong + // but usually there are nothing written if fails to read + glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err) + } else { + break + } } } @@ -51,24 +56,28 @@ func ReadAll(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) buffer := bytes.Buffer{} - lookupFileIdFn := func(fileId string) (targetUrl string, err error) { + lookupFileIdFn := func(fileId string) (targetUrls []string, err error) { return masterClient.LookupFileId(fileId) } chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64) for _, chunkView := range chunkViews { - urlString, err := lookupFileIdFn(chunkView.FileId) + urlStrings, err := lookupFileIdFn(chunkView.FileId) if err != nil { glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err) return nil, err } - err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) { - buffer.Write(data) - }) - if err != nil { - glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err) - return nil, err + for _, urlString := range urlStrings { + err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) { + buffer.Write(data) + }) + if err != nil { + glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err) + buffer.Reset() + } else { + break + } } } return buffer.Bytes(), nil @@ -89,7 +98,7 @@ var _ = io.ReadSeeker(&ChunkStreamReader{}) func NewChunkStreamReaderFromFiler(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader { - lookupFileIdFn := func(fileId string) (targetUrl string, err error) { + lookupFileIdFn := func(fileId string) (targetUrl []string, err error) { return masterClient.LookupFileId(fileId) } @@ -169,17 +178,24 @@ func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) { } func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error { - urlString, err := c.lookupFileId(chunkView.FileId) + urlStrings, err := c.lookupFileId(chunkView.FileId) if err != nil { glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err) return err } var buffer bytes.Buffer - err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) { - buffer.Write(data) - }) + for _, urlString := range urlStrings { + err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) { + buffer.Write(data) + }) + if err != nil { + glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err) + buffer.Reset() + } else { + break + } + } if err != nil { - glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err) return err } c.buffer = buffer.Bytes() diff --git a/weed/replication/repl_util/replication_utli.go b/weed/replication/repl_util/replication_utli.go new file mode 100644 index 000000000..9b18275b5 --- /dev/null +++ b/weed/replication/repl_util/replication_utli.go @@ -0,0 +1,40 @@ +package repl_util + +import ( + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/replication/source" + "github.com/chrislusf/seaweedfs/weed/util" +) + +func CopyFromChunkViews(chunkViews []*filer.ChunkView, filerSource *source.FilerSource, writeFunc func(data []byte) error) error { + + for _, chunk := range chunkViews { + + fileUrls, err := filerSource.LookupFileId(chunk.FileId) + if err != nil { + return err + } + + var writeErr error + + for _, fileUrl := range fileUrls { + err = util.ReadUrlAsStream(fileUrl+"?readDeleted=true", nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) { + writeErr = writeFunc(data) + }) + if err != nil { + glog.V(1).Infof("read from %s: %v", fileUrl, err) + } else if writeErr != nil { + glog.V(1).Infof("copy from %s: %v", fileUrl, writeErr) + } else { + break + } + } + + if err != nil { + return err + } + + } + return nil +} diff --git a/weed/replication/sink/azuresink/azure_sink.go b/weed/replication/sink/azuresink/azure_sink.go index dab5cf4f4..df70be64b 100644 --- a/weed/replication/sink/azuresink/azure_sink.go +++ b/weed/replication/sink/azuresink/azure_sink.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/replication/repl_util" "net/url" "strings" @@ -107,25 +108,13 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry, signatures [] return err } - for _, chunk := range chunkViews { - - fileUrl, err := g.filerSource.LookupFileId(chunk.FileId) - if err != nil { - return err - } - - var writeErr error - readErr := util.ReadUrlAsStream(fileUrl+"?readDeleted=true", nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) { - _, writeErr = appendBlobURL.AppendBlock(context.Background(), bytes.NewReader(data), azblob.AppendBlobAccessConditions{}, nil) - }) - - if readErr != nil { - return readErr - } - if writeErr != nil { - return writeErr - } + writeFunc := func(data []byte) error { + _, writeErr := appendBlobURL.AppendBlock(context.Background(), bytes.NewReader(data), azblob.AppendBlobAccessConditions{}, nil) + return writeErr + } + if err := repl_util.CopyFromChunkViews(chunkViews, g.filerSource, writeFunc); err != nil { + return err } return nil diff --git a/weed/replication/sink/b2sink/b2_sink.go b/weed/replication/sink/b2sink/b2_sink.go index cf212f129..24f0ecbbc 100644 --- a/weed/replication/sink/b2sink/b2_sink.go +++ b/weed/replication/sink/b2sink/b2_sink.go @@ -2,6 +2,7 @@ package B2Sink import ( "context" + "github.com/chrislusf/seaweedfs/weed/replication/repl_util" "strings" "github.com/chrislusf/seaweedfs/weed/filer" @@ -95,31 +96,18 @@ func (g *B2Sink) CreateEntry(key string, entry *filer_pb.Entry, signatures []int targetObject := bucket.Object(key) writer := targetObject.NewWriter(context.Background()) - for _, chunk := range chunkViews { - - fileUrl, err := g.filerSource.LookupFileId(chunk.FileId) - if err != nil { - return err - } - - var writeErr error - readErr := util.ReadUrlAsStream(fileUrl+"?readDeleted=true", nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) { - _, err := writer.Write(data) - if err != nil { - writeErr = err - } - }) - - if readErr != nil { - return readErr - } - if writeErr != nil { - return writeErr - } - + writeFunc := func(data []byte) error { + _, writeErr := writer.Write(data) + return writeErr } - return writer.Close() + defer writer.Close() + + if err := repl_util.CopyFromChunkViews(chunkViews, g.filerSource, writeFunc); err != nil { + return err + } + + return nil } diff --git a/weed/replication/sink/gcssink/gcs_sink.go b/weed/replication/sink/gcssink/gcs_sink.go index c6bfa212a..badabc32c 100644 --- a/weed/replication/sink/gcssink/gcs_sink.go +++ b/weed/replication/sink/gcssink/gcs_sink.go @@ -3,6 +3,7 @@ package gcssink import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/replication/repl_util" "os" "cloud.google.com/go/storage" @@ -93,25 +94,14 @@ func (g *GcsSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []in chunkViews := filer.ViewFromChunks(g.filerSource.LookupFileId, entry.Chunks, 0, int64(totalSize)) wc := g.client.Bucket(g.bucket).Object(key).NewWriter(context.Background()) + defer wc.Close() - for _, chunk := range chunkViews { - - fileUrl, err := g.filerSource.LookupFileId(chunk.FileId) - if err != nil { - return err - } - - err = util.ReadUrlAsStream(fileUrl+"?readDeleted=true", nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) { - wc.Write(data) - }) - - if err != nil { - return err - } - + writeFunc := func(data []byte) error { + _, writeErr := wc.Write(data) + return writeErr } - if err := wc.Close(); err != nil { + if err := repl_util.CopyFromChunkViews(chunkViews, g.filerSource, writeFunc); err != nil { return err } diff --git a/weed/replication/sink/s3sink/s3_write.go b/weed/replication/sink/s3sink/s3_write.go index 8a8e7a92b..45265d1ba 100644 --- a/weed/replication/sink/s3sink/s3_write.go +++ b/weed/replication/sink/s3sink/s3_write.go @@ -157,11 +157,18 @@ func (s3sink *S3Sink) uploadPartCopy(key, uploadId string, partId int64, copySou } func (s3sink *S3Sink) buildReadSeeker(chunk *filer.ChunkView) (io.ReadSeeker, error) { - fileUrl, err := s3sink.filerSource.LookupFileId(chunk.FileId) + fileUrls, err := s3sink.filerSource.LookupFileId(chunk.FileId) if err != nil { return nil, err } buf := make([]byte, chunk.Size) - util.ReadUrl(fileUrl, nil, false, false, chunk.Offset, int(chunk.Size), buf) + for _, fileUrl := range fileUrls { + _, err = util.ReadUrl(fileUrl, nil, false, false, chunk.Offset, int(chunk.Size), buf) + if err != nil { + glog.V(1).Infof("read from %s: %v", fileUrl, err) + } else { + break + } + } return bytes.NewReader(buf), nil } diff --git a/weed/replication/source/filer_source.go b/weed/replication/source/filer_source.go index 9106ee98b..c3ef8835c 100644 --- a/weed/replication/source/filer_source.go +++ b/weed/replication/source/filer_source.go @@ -41,7 +41,7 @@ func (fs *FilerSource) DoInitialize(grpcAddress string, dir string) (err error) return nil } -func (fs *FilerSource) LookupFileId(part string) (fileUrl string, err error) { +func (fs *FilerSource) LookupFileId(part string) (fileUrls []string, err error) { vid2Locations := make(map[string]*filer_pb.Locations) @@ -64,29 +64,38 @@ func (fs *FilerSource) LookupFileId(part string) (fileUrl string, err error) { if err != nil { glog.V(1).Infof("LookupFileId volume id %s: %v", vid, err) - return "", fmt.Errorf("LookupFileId volume id %s: %v", vid, err) + return nil, fmt.Errorf("LookupFileId volume id %s: %v", vid, err) } locations := vid2Locations[vid] if locations == nil || len(locations.Locations) == 0 { glog.V(1).Infof("LookupFileId locate volume id %s: %v", vid, err) - return "", fmt.Errorf("LookupFileId locate volume id %s: %v", vid, err) + return nil, fmt.Errorf("LookupFileId locate volume id %s: %v", vid, err) } - fileUrl = fmt.Sprintf("http://%s/%s", locations.Locations[0].Url, part) + for _, loc := range locations.Locations { + fileUrls = append(fileUrls, fmt.Sprintf("http://%s/%s", loc.Url, part)) + } return } func (fs *FilerSource) ReadPart(part string) (filename string, header http.Header, resp *http.Response, err error) { - fileUrl, err := fs.LookupFileId(part) + fileUrls, err := fs.LookupFileId(part) if err != nil { return "", nil, nil, err } - filename, header, resp, err = util.DownloadFile(fileUrl) + for _, fileUrl := range fileUrls { + filename, header, resp, err = util.DownloadFile(fileUrl) + if err != nil { + glog.V(1).Infof("fail to read from %s: %v", fileUrl, err) + } else { + break + } + } return filename, header, resp, err } diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index ecd23413f..5166f0896 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -135,16 +135,19 @@ func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVol return resp, nil } -func (fs *FilerServer) lookupFileId(fileId string) (targetUrl string, err error) { +func (fs *FilerServer) lookupFileId(fileId string) (targetUrls []string, err error) { fid, err := needle.ParseFileIdFromString(fileId) if err != nil { - return "", err + return nil, err } locations, found := fs.filer.MasterClient.GetLocations(uint32(fid.VolumeId)) if !found || len(locations) == 0 { - return "", fmt.Errorf("not found volume %d in %s", fid.VolumeId, fileId) + return nil, fmt.Errorf("not found volume %d in %s", fid.VolumeId, fileId) } - return fmt.Sprintf("http://%s/%s", locations[0].Url, fileId), nil + for _, loc := range locations { + targetUrls = append(targetUrls, fmt.Sprintf("http://%s/%s", loc.Url, fileId)) + } + return } func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntryRequest) (resp *filer_pb.CreateEntryResponse, err error) { diff --git a/weed/wdclient/vid_map.go b/weed/wdclient/vid_map.go index 97df49cb6..cee2da6e1 100644 --- a/weed/wdclient/vid_map.go +++ b/weed/wdclient/vid_map.go @@ -44,38 +44,36 @@ func (vc *vidMap) getLocationIndex(length int) (int, error) { return int(atomic.AddInt32(&vc.cursor, 1)) % length, nil } -func (vc *vidMap) LookupVolumeServerUrl(vid string) (serverUrl string, err error) { +func (vc *vidMap) LookupVolumeServerUrl(vid string) (serverUrls []string, err error) { id, err := strconv.Atoi(vid) if err != nil { glog.V(1).Infof("Unknown volume id %s", vid) - return "", err + return nil, err } - return vc.GetRandomLocation(uint32(id)) + locations, found := vc.GetLocations(uint32(id)) + if !found { + return nil, fmt.Errorf("volume %d not found", id) + } + for _, loc := range locations { + serverUrls = append(serverUrls, loc.Url) + } + return } -func (vc *vidMap) LookupFileId(fileId string) (fullUrl string, err error) { +func (vc *vidMap) LookupFileId(fileId string) (fullUrls []string, err error) { parts := strings.Split(fileId, ",") if len(parts) != 2 { - return "", errors.New("Invalid fileId " + fileId) + return nil, errors.New("Invalid fileId " + fileId) } - serverUrl, lookupError := vc.LookupVolumeServerUrl(parts[0]) + serverUrls, lookupError := vc.LookupVolumeServerUrl(parts[0]) if lookupError != nil { - return "", lookupError + return nil, lookupError } - return "http://" + serverUrl + "/" + fileId, nil -} - -func (vc *vidMap) LookupVolumeServer(fileId string) (volumeServer string, err error) { - parts := strings.Split(fileId, ",") - if len(parts) != 2 { - return "", errors.New("Invalid fileId " + fileId) + for _, serverUrl := range serverUrls { + fullUrls = append(fullUrls, "http://"+serverUrl+"/"+fileId) } - serverUrl, lookupError := vc.LookupVolumeServerUrl(parts[0]) - if lookupError != nil { - return "", lookupError - } - return serverUrl, nil + return } func (vc *vidMap) GetVidLocations(vid string) (locations []Location, err error) { @@ -99,23 +97,6 @@ func (vc *vidMap) GetLocations(vid uint32) (locations []Location, found bool) { return } -func (vc *vidMap) GetRandomLocation(vid uint32) (serverUrl string, err error) { - vc.RLock() - defer vc.RUnlock() - - locations := vc.vid2Locations[vid] - if len(locations) == 0 { - return "", fmt.Errorf("volume %d not found", vid) - } - - index, err := vc.getLocationIndex(len(locations)) - if err != nil { - return "", fmt.Errorf("volume %d: %v", vid, err) - } - - return locations[index].Url, nil -} - func (vc *vidMap) addLocation(vid uint32, location Location) { vc.Lock() defer vc.Unlock()