adding filer!!!

This commit is contained in:
Chris Lu 2014-03-30 20:57:25 -07:00
parent d6aa6239de
commit 67be8a5af8
4 changed files with 138 additions and 35 deletions

79
go/weed/filer.go Normal file
View file

@ -0,0 +1,79 @@
package main
import (
"code.google.com/p/weed-fs/go/glog"
"code.google.com/p/weed-fs/go/util"
"code.google.com/p/weed-fs/go/weed/weed_server"
"net/http"
"os"
"strconv"
"time"
)
var (
f FilerOptions
)
type FilerOptions struct {
master *string
port *int
collection *string
defaultReplicaPlacement *string
dir *string
}
func init() {
cmdFiler.Run = runFiler // break init cycle
f.master = cmdFiler.Flag.String("master", "localhost:9333", "master server location")
f.collection = cmdFiler.Flag.String("collection", "", "all data will be stored in this collection")
f.port = cmdFiler.Flag.Int("port", 8888, "filer server http listen port")
f.dir = cmdFiler.Flag.String("dir", os.TempDir(), "directory to store meta data")
f.defaultReplicaPlacement = cmdFiler.Flag.String("defaultReplicaPlacement", "000", "Default replication type if not specified.")
}
var cmdFiler = &Command{
UsageLine: "filer -port=8888 -dir=/tmp -master=<ip:port>",
Short: "start a file server that points to a master server",
Long: `start a file server which accepts REST operation for any files.
//create or overwrite the file, the directories /path/to will be automatically created
POST /path/to/file
//get the file content
GET /path/to/file
//create or overwrite the file, the filename in the multipart request will be used
POST /path/to/
//return a json format subdirectory and files listing
GET /path/to/
Current <fullpath~fileid> mapping metadata store is local embedded leveldb.
It should be highly scalable to hundreds of millions of files on a modest machine.
Future we will ensure it can avoid of being SPOF.
`,
}
func runFiler(cmd *Command, args []string) bool {
if err := util.TestFolderWritable(*f.dir); err != nil {
glog.Fatalf("Check Meta Folder (-dir) Writable %s : %s", *f.dir, err)
}
r := http.NewServeMux()
_, nfs_err := weed_server.NewFilerServer(r, *f.port, *f.master, *f.dir, *f.collection)
if nfs_err != nil {
glog.Fatalf(nfs_err.Error())
}
glog.V(0).Infoln("Start Weed Filer", util.VERSION, "at port", strconv.Itoa(*f.port))
filerListener, e := util.NewListener(
":"+strconv.Itoa(*f.port),
time.Duration(10)*time.Second,
)
if e != nil {
glog.Fatalf(e.Error())
}
if e := http.Serve(filerListener, r); e != nil {
glog.Fatalf("Filer Fail to serve:%s", e.Error())
}
return true
}

View file

@ -14,6 +14,10 @@ import (
"time" "time"
) )
var (
filer FilerOptions
)
func init() { func init() {
cmdServer.Run = runServer // break init cycle cmdServer.Run = runServer // break init cycle
} }
@ -28,10 +32,6 @@ var cmdServer = &Command{
The servers are the same as starting them separately. The servers are the same as starting them separately.
So other volume servers can use this embedded master server also. So other volume servers can use this embedded master server also.
However, this may change very soon.
The target is to start both volume server and embedded master server on all instances,
and use a leader election process to auto choose a master server.
`, `,
} }
@ -53,23 +53,32 @@ var (
volumeDataFolders = cmdServer.Flag.String("dir", os.TempDir(), "directories to store data files. dir[,dir]...") volumeDataFolders = cmdServer.Flag.String("dir", os.TempDir(), "directories to store data files. dir[,dir]...")
volumeMaxDataVolumeCounts = cmdServer.Flag.String("max", "7", "maximum numbers of volumes, count[,count]...") volumeMaxDataVolumeCounts = cmdServer.Flag.String("max", "7", "maximum numbers of volumes, count[,count]...")
volumePulse = cmdServer.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats") volumePulse = cmdServer.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats")
isStartingFiler = cmdServer.Flag.Bool("filer", false, "whether to start filer")
serverWhiteList []string serverWhiteList []string
) )
func init() {
filer.master = cmdServer.Flag.String("filer.master", "", "default to current master server")
filer.collection = cmdServer.Flag.String("filer.collection", "", "all data will be stored in this collection")
filer.port = cmdServer.Flag.Int("filer.port", 8888, "filer server http listen port")
filer.dir = cmdServer.Flag.String("filer.dir", "", "directory to store meta data, default to a 'filer' sub directory of what -mdir is specified")
filer.defaultReplicaPlacement = cmdServer.Flag.String("filer.defaultReplicaPlacement", "", "Default replication type if not specified during runtime.")
}
func runServer(cmd *Command, args []string) bool { func runServer(cmd *Command, args []string) bool {
*filer.master = *serverIp + ":" + strconv.Itoa(*masterPort)
if *filer.defaultReplicaPlacement == "" {
*filer.defaultReplicaPlacement = *masterDefaultReplicaPlacement
}
if *serverMaxCpu < 1 { if *serverMaxCpu < 1 {
*serverMaxCpu = runtime.NumCPU() *serverMaxCpu = runtime.NumCPU()
} }
runtime.GOMAXPROCS(*serverMaxCpu) runtime.GOMAXPROCS(*serverMaxCpu)
if *masterMetaFolder == "" {
*masterMetaFolder = *volumeDataFolders
}
if err := util.TestFolderWritable(*masterMetaFolder); err != nil {
glog.Fatalf("Check Meta Folder (-mdir) Writable %s : %s", *masterMetaFolder, err)
}
folders := strings.Split(*volumeDataFolders, ",") folders := strings.Split(*volumeDataFolders, ",")
maxCountStrings := strings.Split(*volumeMaxDataVolumeCounts, ",") maxCountStrings := strings.Split(*volumeMaxDataVolumeCounts, ",")
maxCounts := make([]int, 0) maxCounts := make([]int, 0)
@ -89,6 +98,20 @@ func runServer(cmd *Command, args []string) bool {
} }
} }
if *masterMetaFolder == "" {
*masterMetaFolder = folders[0]
}
if *filer.dir == "" {
*filer.dir = *masterMetaFolder + "/filer"
os.MkdirAll(*filer.dir, 0700)
}
if err := util.TestFolderWritable(*masterMetaFolder); err != nil {
glog.Fatalf("Check Meta Folder (-mdir=\"%s\") Writable: %s", *masterMetaFolder, err)
}
if err := util.TestFolderWritable(*filer.dir); err != nil {
glog.Fatalf("Check Mapping Meta Folder (-filer.dir=\"%s\") Writable: %s", *filer.dir, err)
}
if *volumePublicUrl == "" { if *volumePublicUrl == "" {
*volumePublicUrl = *serverIp + ":" + strconv.Itoa(*volumePort) *volumePublicUrl = *serverIp + ":" + strconv.Itoa(*volumePort)
} }
@ -96,24 +119,26 @@ func runServer(cmd *Command, args []string) bool {
serverWhiteList = strings.Split(*serverWhiteListOption, ",") serverWhiteList = strings.Split(*serverWhiteListOption, ",")
} }
go func() { if *isStartingFiler {
r := http.NewServeMux() go func() {
_, nfs_err := weed_server.NewFilerServer(r, *serverIp+":"+strconv.Itoa(*masterPort), *volumeDataFolders) r := http.NewServeMux()
if nfs_err != nil { _, nfs_err := weed_server.NewFilerServer(r, *filer.port, *filer.master, *filer.dir, *filer.collection)
glog.Fatalf(nfs_err.Error()) if nfs_err != nil {
} glog.Fatalf(nfs_err.Error())
glog.V(0).Infoln("Start Weed Filer", util.VERSION, "at port", *serverIp+":"+strconv.Itoa(8888)) }
filerListener, e := util.NewListener( glog.V(0).Infoln("Start Weed Filer", util.VERSION, "at port", strconv.Itoa(*filer.port))
*serverIp+":"+strconv.Itoa(8888), filerListener, e := util.NewListener(
time.Duration(*serverTimeout)*time.Second, ":"+strconv.Itoa(*filer.port),
) time.Duration(10)*time.Second,
if e != nil { )
glog.Fatalf(e.Error()) if e != nil {
} glog.Fatalf(e.Error())
if e := http.Serve(filerListener, r); e != nil { }
glog.Fatalf("Master Fail to serve:%s", e.Error()) if e := http.Serve(filerListener, r); e != nil {
} glog.Fatalf("Filer Fail to serve:%s", e.Error())
}() }
}()
}
var raftWaitForMaster sync.WaitGroup var raftWaitForMaster sync.WaitGroup
var volumeWait sync.WaitGroup var volumeWait sync.WaitGroup

View file

@ -21,6 +21,7 @@ var server *string
var commands = []*Command{ var commands = []*Command{
cmdBenchmark, cmdBenchmark,
cmdCompact, cmdCompact,
cmdFiler,
cmdFix, cmdFix,
cmdServer, cmdServer,
cmdMaster, cmdMaster,

View file

@ -1,11 +1,11 @@
package weed_server package weed_server
import ( import (
"code.google.com/p/weed-fs/go/glog"
"errors" "errors"
"github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/util" "github.com/syndtr/goleveldb/leveldb/util"
"net/http" "net/http"
"strconv"
"strings" "strings"
) )
@ -25,11 +25,11 @@ type FilerServer struct {
db *leveldb.DB db *leveldb.DB
} }
func NewFilerServer(r *http.ServeMux, master string, dir string) (fs *FilerServer, err error) { func NewFilerServer(r *http.ServeMux, port int, master string, dir string, collection string) (fs *FilerServer, err error) {
fs = &FilerServer{ fs = &FilerServer{
master: master, master: master,
collection: "", collection: collection,
port: ":8888", port: ":" + strconv.Itoa(port),
} }
if fs.db, err = leveldb.OpenFile(dir, nil); err != nil { if fs.db, err = leveldb.OpenFile(dir, nil); err != nil {
@ -38,8 +38,6 @@ func NewFilerServer(r *http.ServeMux, master string, dir string) (fs *FilerServe
r.HandleFunc("/", fs.filerHandler) r.HandleFunc("/", fs.filerHandler)
glog.V(0).Infoln("file server started on port ", fs.port)
return fs, nil return fs, nil
} }