mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
Merge branch 'master' into messaging
This commit is contained in:
commit
8050f9202f
|
@ -185,7 +185,7 @@ func (ma *MetaAggregator) doSubscribeToOneFiler(f *Filer, self pb.ServerAddress,
|
|||
return nil
|
||||
}
|
||||
|
||||
glog.V(4).Infof("subscribing remote %s meta change: %v, clientId:%d", peer, time.Unix(0, lastTsNs), ma.filer.UniqueFileId)
|
||||
glog.V(0).Infof("subscribing remote %s meta change: %v, clientId:%d", peer, time.Unix(0, lastTsNs), ma.filer.UniqueFileId)
|
||||
err = pb.WithFilerClient(true, peer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
|
|
@ -76,7 +76,9 @@ func (rc *ReaderCache) ReadChunkAt(buffer []byte, fileId string, cipherKey []byt
|
|||
rc.Lock()
|
||||
defer rc.Unlock()
|
||||
if cacher, found := rc.downloaders[fileId]; found {
|
||||
return cacher.readChunkAt(buffer, offset)
|
||||
if n, err := cacher.readChunkAt(buffer, offset); n != 0 && err == nil {
|
||||
return n, err
|
||||
}
|
||||
}
|
||||
if shouldCache || rc.lookupFileIdFn == nil {
|
||||
n, err := rc.chunkCache.ReadChunkAt(buffer, fileId, uint64(offset))
|
||||
|
@ -176,6 +178,9 @@ func (s *SingleChunkCacher) startCaching() {
|
|||
}
|
||||
|
||||
func (s *SingleChunkCacher) destroy() {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
if s.data != nil {
|
||||
mem.Free(s.data)
|
||||
s.data = nil
|
||||
|
@ -194,6 +199,10 @@ func (s *SingleChunkCacher) readChunkAt(buf []byte, offset int64) (int, error) {
|
|||
return 0, s.err
|
||||
}
|
||||
|
||||
if len(s.data) == 0 {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
return copy(buf, s.data[offset:]), nil
|
||||
|
||||
}
|
||||
|
|
|
@ -445,9 +445,9 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader
|
|||
func setEtag(w http.ResponseWriter, etag string) {
|
||||
if etag != "" {
|
||||
if strings.HasPrefix(etag, "\"") {
|
||||
w.Header().Set("ETag", etag)
|
||||
w.Header()["ETag"] = []string{etag}
|
||||
} else {
|
||||
w.Header().Set("ETag", "\""+etag+"\"")
|
||||
w.Header()["ETag"] = []string{"\"" + etag + "\""}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -95,12 +95,15 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
|
|||
|
||||
alreadyKnown, clientName := fs.addClient(req.ClientName, peerAddress, req.ClientId)
|
||||
if alreadyKnown {
|
||||
return fmt.Errorf("duplicated local subscription detected for client %s id %d", clientName, req.ClientId)
|
||||
return fmt.Errorf("duplicated local subscription detected for client %s clientId:%d", clientName, req.ClientId)
|
||||
}
|
||||
defer fs.deleteClient(clientName, req.ClientId)
|
||||
defer func() {
|
||||
glog.V(0).Infof(" - %v local subscribe %s clientId:%d", clientName, req.PathPrefix, req.ClientId)
|
||||
fs.deleteClient(clientName, req.ClientId)
|
||||
}()
|
||||
|
||||
lastReadTime := time.Unix(0, req.SinceNs)
|
||||
glog.V(0).Infof(" %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
|
||||
glog.V(0).Infof(" + %v local subscribe %s from %+v clientId:%d", clientName, req.PathPrefix, lastReadTime, req.ClientId)
|
||||
|
||||
eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName)
|
||||
|
||||
|
|
Loading…
Reference in a new issue