Merge pull request #8 from chrislusf/master

merge seaweed master
This commit is contained in:
joeslay 2019-10-01 17:55:31 +01:00 committed by GitHub
commit 9d03aa6a38
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 76 additions and 73 deletions

View file

@ -16,7 +16,7 @@ import (
) )
var ( var (
dir = flag.String("dir", "./t", "directory to store level db files") dir = flag.String("dir", "./t", "directory to store level db files")
useHash = flag.Bool("isHash", false, "hash the path as the key") useHash = flag.Bool("isHash", false, "hash the path as the key")
dbCount = flag.Int("dbCount", 1, "the number of leveldb") dbCount = flag.Int("dbCount", 1, "the number of leveldb")
) )
@ -36,7 +36,7 @@ func main() {
var dbs []*leveldb.DB var dbs []*leveldb.DB
var chans []chan string var chans []chan string
for d := 0 ; d < *dbCount; d++ { for d := 0; d < *dbCount; d++ {
dbFolder := fmt.Sprintf("%s/%02d", *dir, d) dbFolder := fmt.Sprintf("%s/%02d", *dir, d)
os.MkdirAll(dbFolder, 0755) os.MkdirAll(dbFolder, 0755)
db, err := leveldb.OpenFile(dbFolder, opts) db, err := leveldb.OpenFile(dbFolder, opts)
@ -49,9 +49,9 @@ func main() {
} }
var wg sync.WaitGroup var wg sync.WaitGroup
for d := 0 ; d < *dbCount; d++ { for d := 0; d < *dbCount; d++ {
wg.Add(1) wg.Add(1)
go func(d int){ go func(d int) {
defer wg.Done() defer wg.Done()
ch := chans[d] ch := chans[d]
@ -60,14 +60,13 @@ func main() {
for p := range ch { for p := range ch {
if *useHash { if *useHash {
insertAsHash(db, p) insertAsHash(db, p)
}else{ } else {
insertAsFullPath(db, p) insertAsFullPath(db, p)
} }
} }
}(d) }(d)
} }
counter := int64(0) counter := int64(0)
lastResetTime := time.Now() lastResetTime := time.Now()
@ -101,7 +100,7 @@ func main() {
} }
} }
for d := 0 ; d < *dbCount; d++ { for d := 0; d < *dbCount; d++ {
close(chans[d]) close(chans[d])
} }

View file

@ -1,92 +1,91 @@
package main package main
import ( import (
"flag" "flag"
"fmt"
"os"
"path/filepath"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/storage/needle"
"os"
"path/filepath"
"fmt"
) )
var ( var (
volumePath = flag.String("dir", "/tmp", "data directory to store files") volumePath = flag.String("dir", "/tmp", "data directory to store files")
volumeCollection = flag.String("collection", "", "the volume collection name") volumeCollection = flag.String("collection", "", "the volume collection name")
volumeId = flag.Int("volumeId", -1, "a volume id. The volume should already exist in the dir. The volume index file should not exist.") volumeId = flag.Int("volumeId", -1, "a volume id. The volume should already exist in the dir. The volume index file should not exist.")
) )
func Checksum(n* needle.Needle) string { func Checksum(n *needle.Needle) string {
return fmt.Sprintf("%s%x", n.Id, n.Cookie) return fmt.Sprintf("%s%x", n.Id, n.Cookie)
} }
type VolumeFileScanner4SeeDat struct { type VolumeFileScanner4SeeDat struct {
version needle.Version version needle.Version
block storage.SuperBlock block storage.SuperBlock
dir string dir string
hashes map[string]bool hashes map[string]bool
dat * os.File dat *os.File
} }
func (scanner *VolumeFileScanner4SeeDat) VisitSuperBlock(superBlock storage.SuperBlock) error { func (scanner *VolumeFileScanner4SeeDat) VisitSuperBlock(superBlock storage.SuperBlock) error {
scanner.version = superBlock.Version() scanner.version = superBlock.Version()
scanner.block = superBlock scanner.block = superBlock
return nil return nil
} }
func (scanner *VolumeFileScanner4SeeDat) ReadNeedleBody() bool { func (scanner *VolumeFileScanner4SeeDat) ReadNeedleBody() bool {
return true return true
} }
func (scanner *VolumeFileScanner4SeeDat) VisitNeedle(n *needle.Needle, offset int64) error { func (scanner *VolumeFileScanner4SeeDat) VisitNeedle(n *needle.Needle, offset int64) error {
if scanner.dat == nil { if scanner.dat == nil {
newDatFile, err := os.Create(filepath.Join(*volumePath, "dat_fixed")) newDatFile, err := os.Create(filepath.Join(*volumePath, "dat_fixed"))
if err != nil { if err != nil {
glog.Fatalf("Write New Volume Data %v", err) glog.Fatalf("Write New Volume Data %v", err)
}
scanner.dat = newDatFile
scanner.dat.Write(scanner.block.Bytes())
} }
scanner.dat = newDatFile
scanner.dat.Write(scanner.block.Bytes())
}
checksum := Checksum(n) checksum := Checksum(n)
if scanner.hashes[checksum] { if scanner.hashes[checksum] {
glog.V(0).Infof("duplicate checksum:%s fid:%d,%s%x @ offset:%d", checksum, *volumeId, n.Id, n.Cookie, offset) glog.V(0).Infof("duplicate checksum:%s fid:%d,%s%x @ offset:%d", checksum, *volumeId, n.Id, n.Cookie, offset)
return nil return nil
} }
scanner.hashes[checksum] = true scanner.hashes[checksum] = true
_, s, _, e := n.Append(scanner.dat, scanner.version) _, s, _, e := n.Append(scanner.dat, scanner.version)
fmt.Printf("size %d error %v\n", s, e) fmt.Printf("size %d error %v\n", s, e)
return nil return nil
} }
func main() { func main() {
flag.Parse() flag.Parse()
vid := needle.VolumeId(*volumeId) vid := needle.VolumeId(*volumeId)
outpath, _ := filepath.Abs(filepath.Dir(os.Args[0])) outpath, _ := filepath.Abs(filepath.Dir(os.Args[0]))
scanner := &VolumeFileScanner4SeeDat{ scanner := &VolumeFileScanner4SeeDat{
dir: filepath.Join(outpath, "out"), dir: filepath.Join(outpath, "out"),
hashes: map[string]bool{}, hashes: map[string]bool{},
} }
if _, err := os.Stat(scanner.dir); err != nil { if _, err := os.Stat(scanner.dir); err != nil {
if err := os.MkdirAll(scanner.dir, os.ModePerm); err != nil { if err := os.MkdirAll(scanner.dir, os.ModePerm); err != nil {
glog.Fatalf("could not create output dir : %s", err) glog.Fatalf("could not create output dir : %s", err)
} }
} }
err := storage.ScanVolumeFile(*volumePath, *volumeCollection, vid, storage.NeedleMapInMemory, scanner) err := storage.ScanVolumeFile(*volumePath, *volumeCollection, vid, storage.NeedleMapInMemory, scanner)
if err != nil { if err != nil {
glog.Fatalf("Reading Volume File [ERROR] %s\n", err) glog.Fatalf("Reading Volume File [ERROR] %s\n", err)
} }
} }

View file

@ -2,12 +2,11 @@ package main
import ( import (
"flag" "flag"
"time"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/storage/needle"
"time"
) )
var ( var (
@ -45,5 +44,4 @@ func main() {
if err != nil { if err != nil {
glog.Fatalf("Reading Volume File [ERROR] %s\n", err) glog.Fatalf("Reading Volume File [ERROR] %s\n", err)
} }
} }

View file

@ -91,6 +91,11 @@ func (s3a *S3ApiServer) completeMultipartUpload(ctx context.Context, input *s3.C
} }
dirName = fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, *input.Bucket, dirName) dirName = fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, *input.Bucket, dirName)
// remove suffix '/'
if strings.HasSuffix(dirName, "/") {
dirName = dirName[:len(dirName)-1]
}
err = s3a.mkFile(ctx, dirName, entryName, finalParts) err = s3a.mkFile(ctx, dirName, entryName, finalParts)
if err != nil { if err != nil {

View file

@ -125,9 +125,11 @@ func (s3a *S3ApiServer) listFilerEntries(ctx context.Context, bucket, originalPr
} }
lastEntryName = entry.Name lastEntryName = entry.Name
if entry.IsDirectory { if entry.IsDirectory {
commonPrefixes = append(commonPrefixes, PrefixEntry{ if entry.Name != ".uploads" {
Prefix: fmt.Sprintf("%s%s/", dir, entry.Name), commonPrefixes = append(commonPrefixes, PrefixEntry{
}) Prefix: fmt.Sprintf("%s%s/", dir, entry.Name),
})
}
} else { } else {
contents = append(contents, ListEntry{ contents = append(contents, ListEntry{
Key: fmt.Sprintf("%s%s", dir, entry.Name), Key: fmt.Sprintf("%s%s", dir, entry.Name),

View file

@ -675,7 +675,7 @@ type PostResponse struct {
} }
type PrefixEntry struct { type PrefixEntry struct {
Prefix string `xml:"http://s3.amazonaws.com/doc/2006-03-01/ Prefix"` Prefix string `xml:"Prefix"`
} }
type PutObject struct { type PutObject struct {

View file

@ -275,7 +275,7 @@ func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
isRecursive := r.FormValue("recursive") == "true" isRecursive := r.FormValue("recursive") == "true"
ignoreRecursiveError := r.FormValue("ignoreRecursiveError") == "true" ignoreRecursiveError := r.FormValue("ignoreRecursiveError") == "true"
err := fs.filer.DeleteEntryMetaAndData(context.Background(), filer2.FullPath(r.URL.Path), isRecursive, ignoreRecursiveError,true) err := fs.filer.DeleteEntryMetaAndData(context.Background(), filer2.FullPath(r.URL.Path), isRecursive, ignoreRecursiveError, true)
if err != nil { if err != nil {
glog.V(1).Infoln("deleting", r.URL.Path, ":", err.Error()) glog.V(1).Infoln("deleting", r.URL.Path, ":", err.Error())
writeJsonError(w, r, http.StatusInternalServerError, err) writeJsonError(w, r, http.StatusInternalServerError, err)

View file

@ -48,7 +48,7 @@ func (ms *MasterServer) dirLookupHandler(w http.ResponseWriter, r *http.Request)
collection := r.FormValue("collection") //optional, but can be faster if too many collections collection := r.FormValue("collection") //optional, but can be faster if too many collections
location := ms.findVolumeLocation(collection, vid) location := ms.findVolumeLocation(collection, vid)
httpStatus := http.StatusOK httpStatus := http.StatusOK
if location.Error != "" { if location.Error != "" || location.Locations == nil {
httpStatus = http.StatusNotFound httpStatus = http.StatusNotFound
} else { } else {
forRead := r.FormValue("read") forRead := r.FormValue("read")
@ -60,7 +60,7 @@ func (ms *MasterServer) dirLookupHandler(w http.ResponseWriter, r *http.Request)
// findVolumeLocation finds the volume location from master topo if it is leader, // findVolumeLocation finds the volume location from master topo if it is leader,
// or from master client if not leader // or from master client if not leader
func (ms *MasterServer) findVolumeLocation(collection string, vid string) operation.LookupResult { func (ms *MasterServer) findVolumeLocation(collection, vid string) operation.LookupResult {
var locations []operation.Location var locations []operation.Location
var err error var err error
if ms.Topo.IsLeader() { if ms.Topo.IsLeader() {

View file

@ -98,7 +98,7 @@ func writeDataNodeInfo(writer io.Writer, t *master_pb.DataNodeInfo) statistics {
} }
func writeVolumeInformationMessage(writer io.Writer, t *master_pb.VolumeInformationMessage) statistics { func writeVolumeInformationMessage(writer io.Writer, t *master_pb.VolumeInformationMessage) statistics {
fmt.Fprintf(writer, " volume %+v \n", t) fmt.Fprintf(writer, " volume %+v \n", t)
return newStatiscis(t) return newStatistics(t)
} }
type statistics struct { type statistics struct {
@ -108,7 +108,7 @@ type statistics struct {
DeletedBytes uint64 DeletedBytes uint64
} }
func newStatiscis(t *master_pb.VolumeInformationMessage) statistics { func newStatistics(t *master_pb.VolumeInformationMessage) statistics {
return statistics{ return statistics{
Size: t.Size, Size: t.Size,
FileCount: t.FileCount, FileCount: t.FileCount,