Merge branch 'master' into messaging

chrislu 2022-07-13 02:30:53 -07:00
commit 9c0459685e
7 changed files with 47 additions and 51 deletions

View file

@@ -18,7 +18,7 @@ type ReaderCache struct {
 }
 
 type SingleChunkCacher struct {
-	sync.RWMutex
+	sync.Mutex
 	cond        *sync.Cond
 	parent      *ReaderCache
 	chunkFileId string
@@ -183,8 +183,8 @@ func (s *SingleChunkCacher) destroy() {
 }
 
 func (s *SingleChunkCacher) readChunkAt(buf []byte, offset int64) (int, error) {
-	s.RLock()
-	defer s.RUnlock()
+	s.Lock()
+	defer s.Unlock()
 
 	for s.completedTime.IsZero() {
 		s.cond.Wait()
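The RWMutex-to-Mutex switch matters because sync.Cond must be waited on with the very Locker it was built over: Cond.Wait calls L.Unlock, so a goroutine holding only a read lock would be unlocking a write lock it never took. With a plain Mutex, the lock passed to sync.NewCond and the lock taken in readChunkAt are the same object. A minimal runnable sketch of the wait/broadcast pattern this hunk converges on, with illustrative names rather than the actual SingleChunkCacher fields:

package main

import (
	"fmt"
	"sync"
	"time"
)

type cacher struct {
	sync.Mutex
	cond *sync.Cond
	done bool
}

func newCacher() *cacher {
	c := &cacher{}
	c.cond = sync.NewCond(&c.Mutex) // the Cond and the methods share one Mutex
	return c
}

func (c *cacher) waitUntilDone() {
	c.Lock()
	defer c.Unlock()
	for !c.done { // the guard must be re-checked after every wakeup
		c.cond.Wait() // atomically releases c.Mutex while blocked, re-locks on return
	}
}

func (c *cacher) finish() {
	c.Lock()
	c.done = true
	c.Unlock()
	c.cond.Broadcast() // wake every goroutine parked in waitUntilDone
}

func main() {
	c := newCacher()
	go func() {
		time.Sleep(10 * time.Millisecond)
		c.finish()
	}()
	c.waitUntilDone()
	fmt.Println("chunk ready")
}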

View file

@@ -1,8 +1,8 @@
 package filer
 
 type ReaderPattern struct {
-	isStreaming    bool
-	lastReadOffset int64
+	isSequentialCounter int64
+	lastReadStopOffset  int64
 }
 
 // For streaming read: only cache the first chunk
@@ -10,29 +10,20 @@ type ReaderPattern struct {
 
 func NewReaderPattern() *ReaderPattern {
 	return &ReaderPattern{
-		isStreaming:    true,
-		lastReadOffset: -1,
+		isSequentialCounter: 0,
+		lastReadStopOffset:  0,
 	}
 }
 
 func (rp *ReaderPattern) MonitorReadAt(offset int64, size int) {
-	isStreaming := true
-	if rp.lastReadOffset > offset {
-		isStreaming = false
-	}
-	if rp.lastReadOffset == -1 {
-		if offset != 0 {
-			isStreaming = false
-		}
-	}
-	rp.lastReadOffset = offset
-	rp.isStreaming = isStreaming
-}
-
-func (rp *ReaderPattern) IsStreamingMode() bool {
-	return rp.isStreaming
+	if rp.lastReadStopOffset == offset {
+		rp.isSequentialCounter++
+	} else {
+		rp.isSequentialCounter--
+	}
+	rp.lastReadStopOffset = offset + int64(size)
 }
 
 func (rp *ReaderPattern) IsRandomMode() bool {
-	return !rp.isStreaming
+	return rp.isSequentialCounter >= 0
 }
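The rewrite replaces the sticky isStreaming flag, which could flip to random once and never recover, with a signed counter: a read that starts exactly where the previous one stopped votes sequential, anything else votes random, and the sign of the running total decides the mode. A runnable sketch driving the committed logic (note the polarity exactly as committed: IsRandomMode reports true while the counter is non-negative):

package main

import "fmt"

type ReaderPattern struct {
	isSequentialCounter int64
	lastReadStopOffset  int64
}

func (rp *ReaderPattern) MonitorReadAt(offset int64, size int) {
	if rp.lastReadStopOffset == offset {
		rp.isSequentialCounter++ // read resumed where the last one stopped
	} else {
		rp.isSequentialCounter-- // a seek happened
	}
	rp.lastReadStopOffset = offset + int64(size)
}

func (rp *ReaderPattern) IsRandomMode() bool {
	return rp.isSequentialCounter >= 0 // polarity as written in this commit
}

func main() {
	rp := &ReaderPattern{}
	for _, r := range []struct {
		off  int64
		size int
	}{
		{0, 4096}, {4096, 4096}, {8192, 4096}, // a sequential run
		{0, 512}, {100000, 512}, // seeks
	} {
		rp.MonitorReadAt(r.off, r.size)
		fmt.Printf("read at %6d: random=%v counter=%d\n", r.off, rp.IsRandomMode(), rp.isSequentialCounter)
	}
}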

View file

@@ -29,14 +29,14 @@ func newPageWriter(fh *FileHandle, chunkSize int64) *PageWriter {
 	return pw
 }
 
-func (pw *PageWriter) AddPage(offset int64, data []byte, isSequentail bool) {
+func (pw *PageWriter) AddPage(offset int64, data []byte, isSequential bool) {
 	glog.V(4).Infof("%v AddPage [%d, %d)", pw.fh.fh, offset, offset+int64(len(data)))
 
 	chunkIndex := offset / pw.chunkSize
 	for i := chunkIndex; len(data) > 0; i++ {
 		writeSize := min(int64(len(data)), (i+1)*pw.chunkSize-offset)
-		pw.addToOneChunk(i, offset, data[:writeSize], isSequentail)
+		pw.addToOneChunk(i, offset, data[:writeSize], isSequential)
 		offset += writeSize
 		data = data[writeSize:]
 	}
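Aside from the isSequentail typo fix, the loop itself is the interesting part: a write at an arbitrary offset is carved into slices that never cross a chunkSize boundary. A self-contained sketch of the same arithmetic, with splitIntoChunks standing in for the real addToOneChunk plumbing:

package main

import "fmt"

func min(a, b int64) int64 {
	if a < b {
		return a
	}
	return b
}

func splitIntoChunks(offset int64, data []byte, chunkSize int64) {
	chunkIndex := offset / chunkSize
	for i := chunkIndex; len(data) > 0; i++ {
		// bytes remaining before this chunk's upper boundary
		writeSize := min(int64(len(data)), (i+1)*chunkSize-offset)
		fmt.Printf("chunk %d: [%d, %d)\n", i, offset, offset+writeSize)
		offset += writeSize
		data = data[writeSize:]
	}
}

func main() {
	// a 5 KiB write starting 1 KiB before a 4 KiB chunk boundary
	splitIntoChunks(3*1024, make([]byte, 5*1024), 4*1024)
	// chunk 0: [3072, 4096)
	// chunk 1: [4096, 8192)
}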

View file

@@ -1,9 +1,9 @@
 package mount
 
 type WriterPattern struct {
-	isStreaming     bool
-	lastWriteOffset int64
+	isSequentialCounter int64
+	lastWriteStopOffset int64
 	chunkSize int64
 }
 
 // For streaming write: only cache the first chunk
@@ -12,33 +12,21 @@ type WriterPattern struct {
 
 func NewWriterPattern(chunkSize int64) *WriterPattern {
 	return &WriterPattern{
-		isStreaming:     true,
-		lastWriteOffset: -1,
+		isSequentialCounter: 0,
+		lastWriteStopOffset: 0,
 		chunkSize: chunkSize,
 	}
 }
 
 func (rp *WriterPattern) MonitorWriteAt(offset int64, size int) {
-	if rp.lastWriteOffset > offset {
-		rp.isStreaming = false
-	}
-	if rp.lastWriteOffset == -1 {
-		if offset != 0 {
-			rp.isStreaming = false
-		}
-	}
-	rp.lastWriteOffset = offset
+	if rp.lastWriteStopOffset == offset {
+		rp.isSequentialCounter++
+	} else {
+		rp.isSequentialCounter--
+	}
+	rp.lastWriteStopOffset = offset + int64(size)
 }
 
-func (rp *WriterPattern) IsStreamingMode() bool {
-	return rp.isStreaming
-}
-
-func (rp *WriterPattern) IsRandomMode() bool {
-	return !rp.isStreaming
-}
-
-func (rp *WriterPattern) Reset() {
-	rp.isStreaming = true
-	rp.lastWriteOffset = -1
+func (rp *WriterPattern) IsSequentialMode() bool {
+	return rp.isSequentialCounter >= 0
 }
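The writer side gets the same counter heuristic and sheds Reset, IsStreamingMode, and IsRandomMode; the one remaining query, IsSequentialMode, is what Write feeds into AddPage in the next file. A runnable sketch wiring the committed type to a toy write sequence:

package main

import "fmt"

type WriterPattern struct {
	isSequentialCounter int64
	lastWriteStopOffset int64
	chunkSize           int64
}

func NewWriterPattern(chunkSize int64) *WriterPattern {
	return &WriterPattern{chunkSize: chunkSize}
}

func (rp *WriterPattern) MonitorWriteAt(offset int64, size int) {
	if rp.lastWriteStopOffset == offset {
		rp.isSequentialCounter++
	} else {
		rp.isSequentialCounter--
	}
	rp.lastWriteStopOffset = offset + int64(size)
}

func (rp *WriterPattern) IsSequentialMode() bool {
	return rp.isSequentialCounter >= 0
}

func main() {
	wp := NewWriterPattern(8 * 1024 * 1024)
	buf := make([]byte, 4096)
	for i := 0; i < 3; i++ { // appending writes at 0, 4096, 8192
		off := int64(i) * int64(len(buf))
		wp.MonitorWriteAt(off, len(buf))
		// in mount.Write this flag is passed straight on:
		// fh.dirtyPages.AddPage(off, buf, wp.IsSequentialMode())
		fmt.Println("sequential:", wp.IsSequentialMode()) // true
	}

	wp2 := NewWriterPattern(8 * 1024 * 1024)
	wp2.MonitorWriteAt(100<<20, len(buf)) // first write far from 0
	fmt.Println("sequential:", wp2.IsSequentialMode()) // false
}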

View file

@@ -58,7 +58,7 @@ func (wfs *WFS) Write(cancel <-chan struct{}, in *fuse.WriteIn, data []byte) (wr
 	entry.Attributes.FileSize = uint64(max(offset+int64(len(data)), int64(entry.Attributes.FileSize)))
 
 	// glog.V(4).Infof("%v write [%d,%d) %d", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data)), len(req.Data))
 
-	fh.dirtyPages.AddPage(offset, data, fh.dirtyPages.writerPattern.IsStreamingMode())
+	fh.dirtyPages.AddPage(offset, data, fh.dirtyPages.writerPattern.IsSequentialMode())
 
 	written = uint32(len(data))

View file

@@ -2,6 +2,7 @@ package shell
 
 import (
 	"bytes"
+	"errors"
 	"flag"
 	"fmt"
 	"github.com/chrislusf/seaweedfs/weed/filer"
@@ -164,6 +165,17 @@ func (c *commandS3Configure) Do(args []string, commandEnv *CommandEnv, writer io
 		s3cfg.Identities = append(s3cfg.Identities, &identity)
 	}
 
+	accessKeySet := make(map[string]string)
+	for _, ident := range s3cfg.Identities {
+		for _, cred := range ident.Credentials {
+			if userName, found := accessKeySet[cred.AccessKey]; !found {
+				accessKeySet[cred.AccessKey] = ident.Name
+			} else {
+				return errors.New(fmt.Sprintf("duplicate accessKey[%s], already configured in user[%s]", cred.AccessKey, userName))
+			}
+		}
+	}
+
 	buf.Reset()
 	filer.ProtoToText(&buf, s3cfg)
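The new block indexes every access key by the first identity that claims it, so a second identity reusing a key aborts s3.configure instead of silently shadowing credentials. A self-contained sketch of the same scan over stand-in structs (fmt.Errorf here is just the idiomatic spelling of the committed errors.New(fmt.Sprintf(...))):

package main

import "fmt"

type Credential struct{ AccessKey string }
type Identity struct {
	Name        string
	Credentials []Credential
}

func checkDuplicateAccessKeys(identities []Identity) error {
	accessKeySet := make(map[string]string) // access key -> first owner
	for _, ident := range identities {
		for _, cred := range ident.Credentials {
			if owner, found := accessKeySet[cred.AccessKey]; found {
				return fmt.Errorf("duplicate accessKey[%s], already configured in user[%s]", cred.AccessKey, owner)
			}
			accessKeySet[cred.AccessKey] = ident.Name
		}
	}
	return nil
}

func main() {
	ids := []Identity{
		{Name: "alice", Credentials: []Credential{{AccessKey: "AKIA1"}}},
		{Name: "bob", Credentials: []Credential{{AccessKey: "AKIA1"}}},
	}
	fmt.Println(checkDuplicateAccessKeys(ids))
	// duplicate accessKey[AKIA1], already configured in user[alice]
}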

View file

@@ -4,6 +4,7 @@ import (
 	"fmt"
 	"os"
 	"path/filepath"
+	"runtime"
 	"strings"
 	"sync"
 	"time"
@@ -206,7 +207,11 @@ func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapKind, con
 
 func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapKind) {
-	l.concurrentLoadingVolumes(needleMapKind, 10)
+	workerNum := runtime.NumCPU()
+	if workerNum <= 10 {
+		workerNum = 10
+	}
+	l.concurrentLoadingVolumes(needleMapKind, workerNum)
 
 	glog.V(0).Infof("Store started on dir: %s with %d volumes max %d", l.Directory, len(l.volumes), l.MaxVolumeCount)
 
 	l.loadAllEcShards()
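Volume-loading concurrency is now effectively max(runtime.NumCPU(), 10) instead of a fixed 10, so machines with more cores start up with more parallel loaders. A minimal worker-pool sketch using the same sizing rule, with the task body standing in for loading one volume:

package main

import (
	"fmt"
	"runtime"
	"sync"
)

func main() {
	workerNum := runtime.NumCPU()
	if workerNum <= 10 {
		workerNum = 10 // never drop below the old fixed pool size
	}

	tasks := make(chan int)
	var wg sync.WaitGroup
	for w := 0; w < workerNum; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for t := range tasks {
				_ = t // stand-in: load volume file #t
			}
		}()
	}
	for t := 0; t < 100; t++ {
		tasks <- t
	}
	close(tasks)
	wg.Wait()
	fmt.Printf("loaded 100 volumes with %d workers\n", workerNum)
}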