Merge pull request #4 from chrislusf/master

merge
This commit is contained in:
yourchanges 2015-02-02 15:21:55 +08:00
commit a2b1afce78
14 changed files with 109 additions and 40 deletions

View file

@ -7,7 +7,7 @@ RUN echo insecure >> ~/.curlrc
RUN \
curl -Lks https://bintray.com$(curl -Lk http://bintray.com/chrislusf/Weed-FS/seaweed/_latestVersion | grep linux_amd64.tar.gz | sed -n "/href/ s/.*href=['\"]\([^'\"]*\)['\"].*/\1/gp") | gunzip | tar -xf - -C /opt/weed/ && \
mv weed_* bin && \
mv weed_*/* /bin && \
chmod +x ./bin/weed
EXPOSE 8080

View file

@ -25,6 +25,8 @@ Seaweed-FS costs only 40 bytes disk storage for each file's metadata. It is so s
![](https://api.bintray.com/packages/chrislusf/Weed-FS/seaweed/images/download.png)
https://bintray.com/chrislusf/Weed-FS/seaweed Download latest compiled binaries for different platforms here.
http://groups.google.com/group/weed-file-system Seaweed File System Discussion Group
## Additional Features
* Can choose no replication or different replication level, rack and data center aware
* Automatic master servers failover. No single point of failure, SPOF.
@ -58,23 +60,25 @@ Seaweed-FS uses HTTP REST operations to write, read, delete. The return results
```
### Write File ###
Here is a simple usage on how to save a file:
To upload a file, first, send a HTTP POST, PUT, or GET request to `/dir/assign` to get an fid and a volume server url:
```
> curl http://localhost:9333/dir/assign
> curl -X POST http://localhost:9333/dir/assign
{"count":1,"fid":"3,01637037d6","url":"127.0.0.1:8080","publicUrl":"localhost:8080"}
```
First, send a HTTP request to get an fid and a volume server url.
Second, to store the file content, send a HTTP multipart PUT or POST request to `url + '/' + fid` from the response:
```
> curl -F file=@/home/chris/myphoto.jpg http://127.0.0.1:8080/3,01637037d6
> curl -X PUT -F file=@/home/chris/myphoto.jpg http://127.0.0.1:8080/3,01637037d6
{"size": 43234}
```
Second, send a HTTP multipart POST request to the volume server url+'/'+fid, to really store the file content.
For update, send another POST request with updated file content.
For update, send another PUT or POST request with updated file content.
For deletion, send an HTTP DELETE request to the same `url + '/' + fid` URL:
For deletion, send a http DELETE request
```
> curl -X DELETE http://127.0.0.1:8080/3,01637037d6
```
@ -92,20 +96,28 @@ If stored as a string, in theory, you would need 8+1+16+8=33 bytes. A char(33) w
If space is really a concern, you can store the file id in your own format. You would need one 4-byte integer for volume id, 8-byte long number for file key, 4-byte integer for file cookie. So 16 bytes are enough (more than enough).
### Read File ###
Here is the example on how to render the URL.
First lookup the volume server's URLs by the file's volumeId:
```
> curl http://localhost:9333/dir/lookup?volumeId=3
{"locations":[{"publicUrl":"localhost:8080","url":"localhost:8080"}]}
```
First lookup the volume server's URLs by the file's volumeId. However, since usually there are not too many volume servers, and volumes does not move often, you can cache the results most of the time. Depends on the replication type, one volume can have multiple replica locations. Just randomly pick one location to read.
(However, since usually there are not too many volume servers, and volumes does not move often, you can cache the results most of the time. Depends on the replication type, one volume can have multiple replica locations. Just randomly pick one location to read.)
Now you can take the public url, render the url or directly read from the volume server via url:
```
http://localhost:8080/3,01637037d6.jpg
```
Notice we add an file extension ".jpg" here. It's optional and just one way for the client to specify the file content type.
If you want a nicer URL, you can use one of these alternative URL formats:
```
http://localhost:8080/3/01637037d6/my_preferred_name.jpg
http://localhost:8080/3/01637037d6.jpg
@ -116,11 +128,13 @@ If you want a nicer URL, you can use one of these alternative URL formats:
### Rack-Aware and Data Center-Aware Replication ###
Seaweed-FS apply the replication strategy on a volume level. So when you are getting a file id, you can specify the replication strategy. For example:
```
curl http://localhost:9333/dir/assign?replication=001
curl -X POST http://localhost:9333/dir/assign?replication=001
```
Here is the meaning of the replication parameter
Here is the meaning of the replication parameter:
```
000: no replication
001: replicate once on the same rack
@ -136,14 +150,18 @@ https://code.google.com/p/weed-fs/wiki/RackDataCenterAwareReplication
You can also set the default replication strategy when starting the master server.
### Allocate File Key on specific data center ###
Volume servers can start with a specific data center name.
Volume servers can start with a specific data center name:
```
weed volume -dir=/tmp/1 -port=8080 -dataCenter=dc1
weed volume -dir=/tmp/2 -port=8081 -dataCenter=dc2
```
Or the master server can determine the data center via volume server's IP address and settings in weed.conf file.
Now when requesting a file key, an optional "dataCenter" parameter can limit the assigned volume to the specific data center. For example, this specify
Now when requesting a file key, an optional "dataCenter" parameter can limit the assigned volume to the specific data center. For example, this specifies that the assigned volume should be limited to 'dc1':
```
http://localhost:9333/dir/assign?dataCenter=dc1
```
@ -182,9 +200,9 @@ When a client needs to read a file based on <volume id, file key, file cookie>,
Please see the example for details on write-read process.
### Storage Size ###
In current implementation, each volume can be size of 8x2^32^=32G bytes. This is because of aligning contents to 8 bytes. We can be easily increased to 64G, or 128G, or more, by changing 2 lines of code, at the cost of some wasted padding space due to alignment.
In current implementation, each volume can be size of 8x2^32 bytes (32GiB). This is because of aligning contents to 8 bytes. We can be easily increased to 64G, or 128G, or more, by changing 2 lines of code, at the cost of some wasted padding space due to alignment.
There can be 2^32^ volumes. So total system size is 8 x 2^32^ x 2^32^ = 8 x 4G x 4G = 128GG bytes. (Sorry, I don't know the word for giga of giga bytes.)
There can be 2^32 volumes. So total system size is 8 x 2^32 bytes x 2^32 = 8 x 4GiB x 4Gi = 128EiB (2^67 bytes, or 128 exbibytes).
Each individual file size is limited to the volume size.

View file

@ -14,7 +14,7 @@ It has these top-level messages:
*/
package operation
import "github.com/golang/protobuf/proto"
import proto "github.com/golang/protobuf/proto"
import math "math"
// Reference imports to suppress errors if they are not otherwise used.
@ -121,6 +121,7 @@ type JoinMessage struct {
DataCenter *string `protobuf:"bytes,7,opt,name=data_center" json:"data_center,omitempty"`
Rack *string `protobuf:"bytes,8,opt,name=rack" json:"rack,omitempty"`
Volumes []*VolumeInformationMessage `protobuf:"bytes,9,rep,name=volumes" json:"volumes,omitempty"`
AdminPort *uint32 `protobuf:"varint,10,opt,name=admin_port" json:"admin_port,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -191,5 +192,12 @@ func (m *JoinMessage) GetVolumes() []*VolumeInformationMessage {
return nil
}
func (m *JoinMessage) GetAdminPort() uint32 {
if m != nil && m.AdminPort != nil {
return *m.AdminPort
}
return 0
}
func init() {
}

View file

@ -23,4 +23,5 @@ message JoinMessage {
optional string data_center = 7;
optional string rack = 8;
repeated VolumeInformationMessage volumes = 9;
optional uint32 admin_port = 10;
}

View file

@ -73,8 +73,9 @@ func (mn *MasterNodes) findMaster() (string, error) {
* A VolumeServer contains one Store
*/
type Store struct {
Port int
Ip string
Port int
AdminPort int
PublicUrl string
Locations []*DiskLocation
dataCenter string //optional informaton, overwriting master setting if exists
@ -89,8 +90,8 @@ func (s *Store) String() (str string) {
return
}
func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int) (s *Store) {
s = &Store{Port: port, Ip: ip, PublicUrl: publicUrl}
func NewStore(port, adminPort int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int) (s *Store) {
s = &Store{Port: port, AdminPort: adminPort, Ip: ip, PublicUrl: publicUrl}
s.Locations = make([]*DiskLocation, 0)
for i := 0; i < len(dirnames); i++ {
location := &DiskLocation{Directory: dirnames[i], MaxVolumeCount: maxVolumeCounts[i]}
@ -308,6 +309,7 @@ func (s *Store) Join() (masterNode string, e error) {
DataCenter: proto.String(s.dataCenter),
Rack: proto.String(s.rack),
Volumes: volumeMessages,
AdminPort: proto.Uint32(uint32(s.AdminPort)),
}
data, err := proto.Marshal(joinMessage)

View file

@ -19,7 +19,7 @@ func AllocateVolume(dn *DataNode, vid storage.VolumeId, option *VolumeGrowOption
values.Add("collection", option.Collection)
values.Add("replication", option.ReplicaPlacement.String())
values.Add("ttl", option.Ttl.String())
jsonBlob, err := util.Post("http://"+dn.PublicUrl+"/admin/assign_volume", values)
jsonBlob, err := util.Post("http://"+dn.AdminUrl()+"/admin/assign_volume", values)
if err != nil {
return err
}

View file

@ -13,6 +13,7 @@ type DataNode struct {
volumes map[storage.VolumeId]storage.VolumeInfo
Ip string
Port int
AdminPort int
PublicUrl string
LastSeen int64 // unix time in seconds
Dead bool
@ -28,7 +29,7 @@ func NewDataNode(id string) *DataNode {
}
func (dn *DataNode) String() string {
return fmt.Sprintf("NodeImpl:%s ,volumes:%v, Ip:%s, Port:%d, PublicUrl:%s, Dead:%v", dn.NodeImpl.String(), dn.volumes, dn.Ip, dn.Port, dn.PublicUrl, dn.Dead)
return fmt.Sprintf("Node:%s, volumes:%v, Ip:%s, Port:%d, PublicUrl:%s, Dead:%v", dn.NodeImpl.String(), dn.volumes, dn.Ip, dn.Port, dn.PublicUrl, dn.Dead)
}
func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) {
@ -89,6 +90,10 @@ func (dn *DataNode) Url() string {
return dn.Ip + ":" + strconv.Itoa(dn.Port)
}
func (dn *DataNode) AdminUrl() string {
return dn.Ip + ":" + strconv.Itoa(dn.AdminPort)
}
func (dn *DataNode) ToMap() interface{} {
ret := make(map[string]interface{})
ret["Url"] = dn.Url()

View file

@ -27,7 +27,7 @@ func (r *Rack) FindDataNode(ip string, port int) *DataNode {
}
return nil
}
func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVolumeCount int) *DataNode {
func (r *Rack) GetOrCreateDataNode(ip string, port, adminPort int, publicUrl string, maxVolumeCount int) *DataNode {
for _, c := range r.Children() {
dn := c.(*DataNode)
if dn.MatchLocation(ip, port) {
@ -43,6 +43,7 @@ func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVol
dn := NewDataNode(ip + ":" + strconv.Itoa(port))
dn.Ip = ip
dn.Port = port
dn.AdminPort = adminPort
dn.PublicUrl = publicUrl
dn.maxVolumeCount = maxVolumeCount
dn.LastSeen = time.Now().Unix()

View file

@ -157,7 +157,13 @@ func (t *Topology) ProcessJoinMessage(joinMessage *operation.JoinMessage) {
if *joinMessage.IsInit && dn != nil {
t.UnRegisterDataNode(dn)
}
dn = rack.GetOrCreateDataNode(*joinMessage.Ip, int(*joinMessage.Port), *joinMessage.PublicUrl, int(*joinMessage.MaxVolumeCount))
adminPort := *joinMessage.Port
if joinMessage.AdminPort != nil {
adminPort = *joinMessage.AdminPort
}
dn = rack.GetOrCreateDataNode(*joinMessage.Ip,
int(*joinMessage.Port), int(adminPort), *joinMessage.PublicUrl,
int(*joinMessage.MaxVolumeCount))
var volumeInfos []storage.VolumeInfo
for _, v := range joinMessage.Volumes {
if vi, err := storage.NewVolumeInfo(v); err == nil {

View file

@ -23,7 +23,7 @@ func batchVacuumVolumeCheck(vl *VolumeLayout, vid storage.VolumeId, locationlist
//glog.V(0).Infoln(index, "Checked vacuuming", vid, "on", url, "needVacuum", ret)
ch <- ret
}
}(index, dn.Url(), vid)
}(index, dn.AdminUrl(), vid)
}
isCheckSuccess := true
for _ = range locationlist.list {
@ -50,7 +50,7 @@ func batchVacuumVolumeCompact(vl *VolumeLayout, vid storage.VolumeId, locationli
glog.V(0).Infoln(index, "Complete vacuuming", vid, "on", url)
ch <- true
}
}(index, dn.Url(), vid)
}(index, dn.AdminUrl(), vid)
}
isVacuumSuccess := true
for _ = range locationlist.list {
@ -66,12 +66,12 @@ func batchVacuumVolumeCompact(vl *VolumeLayout, vid storage.VolumeId, locationli
func batchVacuumVolumeCommit(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) bool {
isCommitSuccess := true
for _, dn := range locationlist.list {
glog.V(0).Infoln("Start Commiting vacuum", vid, "on", dn.Url())
if e := vacuumVolume_Commit(dn.Url(), vid); e != nil {
glog.V(0).Infoln("Error when committing vacuum", vid, "on", dn.Url(), e)
glog.V(0).Infoln("Start Commiting vacuum", vid, "on", dn.AdminUrl())
if e := vacuumVolume_Commit(dn.AdminUrl(), vid); e != nil {
glog.V(0).Infoln("Error when committing vacuum", vid, "on", dn.AdminUrl(), e)
isCommitSuccess = false
} else {
glog.V(0).Infoln("Complete Commiting vacuum", vid, "on", dn.Url())
glog.V(0).Infoln("Complete Commiting vacuum", vid, "on", dn.AdminUrl())
}
if isCommitSuccess {
vl.SetVolumeAvailable(dn, vid)

View file

@ -201,7 +201,7 @@ func (vg *VolumeGrowth) grow(topo *Topology, vid storage.VolumeId, option *Volum
}
server.AddOrUpdateVolume(vi)
topo.RegisterVolumeLayout(vi, server)
glog.V(0).Infoln("Created Volume", vid, "on", server)
glog.V(0).Infoln("Created Volume", vid, "on", server.NodeImpl.String())
} else {
glog.V(0).Infoln("Failed to assign", vid, "to", servers, "error", err)
return fmt.Errorf("Failed to assign %d: %v", vid, err)

View file

@ -64,6 +64,7 @@ var (
masterConfFile = cmdServer.Flag.String("master.conf", "/etc/weedfs/weedfs.conf", "xml configuration file")
masterDefaultReplicaPlacement = cmdServer.Flag.String("master.defaultReplicaPlacement", "000", "Default replication type if not specified.")
volumePort = cmdServer.Flag.Int("volume.port", 8080, "volume server http listen port")
volumeAdminPort = cmdServer.Flag.Int("volume.port.admin", 0, "volume server admin port to talk with master and other volume servers")
volumeDataFolders = cmdServer.Flag.String("dir", os.TempDir(), "directories to store data files. dir[,dir]...")
volumeMaxDataVolumeCounts = cmdServer.Flag.String("volume.max", "7", "maximum numbers of volumes, count[,count]...")
volumePulse = cmdServer.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats")
@ -225,7 +226,9 @@ func runServer(cmd *Command, args []string) bool {
volumeWait.Wait()
time.Sleep(100 * time.Millisecond)
r := http.NewServeMux()
volumeServer := weed_server.NewVolumeServer(r, r, *serverIp, *volumePort, *serverPublicIp, folders, maxCounts,
volumeServer := weed_server.NewVolumeServer(r, r,
*serverIp, *volumePort, *volumeAdminPort, *serverPublicIp,
folders, maxCounts,
*serverIp+":"+strconv.Itoa(*masterPort), *volumePulse, *serverDataCenter, *serverRack,
serverWhiteList, *volumeFixJpgOrientation,
)

View file

@ -38,7 +38,7 @@ type VolumeServerOptions struct {
func init() {
cmdVolume.Run = runVolume // break init cycle
v.port = cmdVolume.Flag.Int("port", 8080, "http listen port")
v.adminPort = cmdVolume.Flag.Int("port.admin", 8443, "https admin port, active when SSL certs are specified. Not ready yet.")
v.adminPort = cmdVolume.Flag.Int("port.admin", 0, "admin port to talk with master and other volume servers")
v.ip = cmdVolume.Flag.String("ip", "", "ip or server name")
v.publicIp = cmdVolume.Flag.String("publicIp", "", "Publicly accessible <ip|server_name>")
v.bindIp = cmdVolume.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to")
@ -105,28 +105,50 @@ func runVolume(cmd *Command, args []string) bool {
}
}
r := http.NewServeMux()
if *v.adminPort == 0 {
*v.adminPort = *v.port
}
isSeperatedAdminPort := *v.adminPort != *v.port
volumeServer := weed_server.NewVolumeServer(r, r, *v.ip, *v.port, *v.publicIp, v.folders, v.folderMaxLimits,
publicMux := http.NewServeMux()
adminMux := publicMux
if isSeperatedAdminPort {
adminMux = http.NewServeMux()
}
volumeServer := weed_server.NewVolumeServer(publicMux, adminMux,
*v.ip, *v.port, *v.adminPort, *v.publicIp,
v.folders, v.folderMaxLimits,
*v.master, *v.pulseSeconds, *v.dataCenter, *v.rack,
v.whiteList,
*v.fixJpgOrientation,
)
listeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.port)
glog.V(0).Infoln("Start Seaweed volume server", util.VERSION, "at", listeningAddress)
listener, e := util.NewListener(listeningAddress, time.Duration(*v.idleConnectionTimeout)*time.Second)
if e != nil {
glog.Fatalf("Volume server listener error:%v", e)
}
if isSeperatedAdminPort {
adminListeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.adminPort)
glog.V(0).Infoln("Start Seaweed volume server", util.VERSION, "admin at", adminListeningAddress)
adminListener, e := util.NewListener(adminListeningAddress, time.Duration(*v.idleConnectionTimeout)*time.Second)
if e != nil {
glog.Fatalf("Volume server listener error:%v", e)
}
go func() {
if e := http.Serve(adminListener, adminMux); e != nil {
glog.Fatalf("Volume server fail to serve admin: %v", e)
}
}()
}
OnInterrupt(func() {
volumeServer.Shutdown()
})
if e := http.Serve(listener, r); e != nil {
if e := http.Serve(listener, publicMux); e != nil {
glog.Fatalf("Volume server fail to serve: %v", e)
}
return true

View file

@ -22,7 +22,9 @@ type VolumeServer struct {
FixJpgOrientation bool
}
func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, port int, publicIp string, folders []string, maxCounts []int,
func NewVolumeServer(publicMux, adminMux *http.ServeMux, ip string,
port, adminPort int, publicIp string,
folders []string, maxCounts []int,
masterNode string, pulseSeconds int,
dataCenter string, rack string,
whiteList []string,
@ -35,7 +37,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, port int, pu
rack: rack,
FixJpgOrientation: fixJpgOrientation,
}
vs.store = storage.NewStore(port, ip, publicUrl, folders, maxCounts)
vs.store = storage.NewStore(port, adminPort, ip, publicUrl, folders, maxCounts)
vs.guard = security.NewGuard(whiteList, "")
@ -62,6 +64,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, port int, pu
if err == nil {
if !connected {
connected = true
vs.masterNode = master
glog.V(0).Infoln("Volume Server Connected with master at", master)
}
} else {