seaweedfs/weed/remote_storage/hdfs/hdfs_storage_client.go
2021-08-29 19:13:48 -07:00

179 lines
4.4 KiB
Go

package hdfs
import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/remote_pb"
"github.com/chrislusf/seaweedfs/weed/remote_storage"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/colinmarc/hdfs/v2"
"io"
"os"
"path"
)
func init() {
remote_storage.RemoteStorageClientMakers["hdfs"] = new(hdfsRemoteStorageMaker)
}
// hdfsRemoteStorageMaker creates HDFS-backed remote storage clients via Make.
// It carries no state; init registers an instance under the "hdfs" key.
type hdfsRemoteStorageMaker struct{}
// HasBucket always reports false for the HDFS maker.
// NOTE(review): presumably this tells callers that HDFS locations carry no
// bucket component — confirm against the remote_storage interface.
func (hdfsRemoteStorageMaker) HasBucket() bool {
	return false
}
// Make builds an HDFS-backed remote storage client from conf.
//
// The namenode addresses come from conf.HdfsNamenodes. When
// conf.HdfsServicePrincipalName is set, the connection is configured for
// Kerberos using the client returned by getKerberosClient, and
// conf.HdfsDataTransferProtection is applied if it is non-empty. Otherwise
// the connection uses conf.HdfsUsername as the user.
//
// It returns an error if the Kerberos client cannot be obtained or if
// hdfs.NewClient fails; in both cases the returned client is nil.
func (s hdfsRemoteStorageMaker) Make(conf *remote_pb.RemoteConf) (remote_storage.RemoteStorageClient, error) {
	opts := hdfs.ClientOptions{
		Addresses:           conf.HdfsNamenodes,
		UseDatanodeHostname: false,
	}

	switch principal := conf.HdfsServicePrincipalName; principal {
	case "":
		// No service principal configured: plain user-name authentication.
		opts.User = conf.HdfsUsername
	default:
		krbClient, krbErr := getKerberosClient()
		if krbErr != nil {
			return nil, fmt.Errorf("get kerberos authentication: %s", krbErr)
		}
		opts.KerberosClient = krbClient
		opts.KerberosServicePrincipleName = principal
		if protection := conf.HdfsDataTransferProtection; protection != "" {
			opts.DataTransferProtection = protection
		}
	}

	hdfsClient, err := hdfs.NewClient(opts)
	if err != nil {
		return nil, err
	}

	return &hdfsRemoteStorageClient{
		conf:   conf,
		client: hdfsClient,
	}, nil
}
// hdfsRemoteStorageClient performs remote storage operations against HDFS.
// Instances are created by hdfsRemoteStorageMaker.Make.
type hdfsRemoteStorageClient struct {
// conf is the remote configuration this client was built from; its Name is
// reported as StorageName in the remote entries this client produces.
conf *remote_pb.RemoteConf
// client is the underlying HDFS connection created by hdfs.NewClient.
client *hdfs.Client
}
// Compile-time check that *hdfsRemoteStorageClient satisfies
// remote_storage.RemoteStorageClient.
var _ remote_storage.RemoteStorageClient = (*hdfsRemoteStorageClient)(nil)
// Traverse walks the HDFS tree rooted at loc.Path by delegating to
// remote_storage.TraverseBfs, calling visitFn for every child of every
// directory that is listed.
//
// Each child is reported with its parent directory, its name, whether it is a
// directory, and a RemoteEntry carrying the storage name from the client
// configuration, the modification time in Unix seconds and the size. HDFS
// entries are reported with an empty RemoteETag and a zero LastLocalSyncTsNs.
//
// An error from listing a directory or from visitFn stops the listing of that
// directory and is returned to TraverseBfs.
func (c *hdfsRemoteStorageClient) Traverse(loc *remote_pb.RemoteStorageLocation, visitFn remote_storage.VisitFunc) (err error) {
	return remote_storage.TraverseBfs(func(parentDir util.FullPath, visitFn remote_storage.VisitFunc) error {
		children, err := c.client.ReadDir(string(parentDir))
		if err != nil {
			return err
		}
		for _, child := range children {
			remoteEntry := &filer_pb.RemoteEntry{
				StorageName:       c.conf.Name,
				LastLocalSyncTsNs: 0,
				RemoteETag:        "",
				RemoteMtime:       child.ModTime().Unix(),
				RemoteSize:        child.Size(),
			}
			if err := visitFn(string(parentDir), child.Name(), child.IsDir(), remoteEntry); err != nil {
				// Propagate the visitor's failure instead of silently
				// reporting the directory as successfully processed.
				return err
			}
		}
		return nil
	}, util.FullPath(loc.Path), visitFn)
}
// ReadFile opens the HDFS file at loc.Path and reads into a buffer of size
// bytes starting at offset.
//
// If the file cannot be opened, the open error is returned with nil data.
// Otherwise the full size-byte buffer is returned together with whatever
// error ReadAt reported, so on a read error the buffer may be only partially
// filled.
func (c *hdfsRemoteStorageClient) ReadFile(loc *remote_pb.RemoteStorageLocation, offset int64, size int64) (data []byte, err error) {
	file, openErr := c.client.Open(loc.Path)
	if openErr != nil {
		return nil, openErr
	}
	defer file.Close()

	buf := make([]byte, size)
	_, readErr := file.ReadAt(buf, offset)
	return buf, readErr
}
// WriteDirectory creates the directory loc.Path, including any missing
// parents, using the file mode taken from entry.Attributes.FileMode.
func (c *hdfsRemoteStorageClient) WriteDirectory(loc *remote_pb.RemoteStorageLocation, entry *filer_pb.Entry) (err error) {
	dirPath := loc.Path
	mode := os.FileMode(entry.Attributes.FileMode)
	err = c.client.MkdirAll(dirPath, mode)
	return err
}
// RemoveDirectory removes loc.Path through the HDFS client's RemoveAll and
// returns its error unchanged.
func (c *hdfsRemoteStorageClient) RemoveDirectory(loc *remote_pb.RemoteStorageLocation) (err error) {
	err = c.client.RemoveAll(loc.Path)
	return err
}
// WriteFile replaces the HDFS file at loc.Path with the content of reader.
//
// It makes sure the parent directory exists (mode 0755), removes any file
// already present at loc.Path, creates a new file and streams reader into it.
// If copying or closing fails, the partially written file is removed on a
// best-effort basis (a failed removal is only logged) and the original error
// is returned.
//
// On success the returned RemoteEntry carries the storage name from the
// client configuration plus the modification time (Unix seconds) and size
// reported by a final Stat; RemoteETag is left empty. The entry argument is
// not used.
func (c *hdfsRemoteStorageClient) WriteFile(loc *remote_pb.RemoteStorageLocation, entry *filer_pb.Entry, reader io.Reader) (remoteEntry *filer_pb.RemoteEntry, err error) {

	// ensure parent directory
	if err = c.client.MkdirAll(path.Dir(loc.Path), 0755); err != nil {
		return nil, err
	}

	// remove existing file; a failing Stat is treated as "nothing to remove"
	if _, statErr := c.client.Stat(loc.Path); statErr == nil {
		if err = c.client.Remove(loc.Path); err != nil {
			return nil, err
		}
	}

	// create new file
	out, err := c.client.Create(loc.Path)
	if err != nil {
		return nil, err
	}

	cleanup := func() {
		if removeErr := c.client.Remove(loc.Path); removeErr != nil {
			glog.Errorf("clean up %s%s: %v", loc.Name, loc.Path, removeErr)
		}
	}

	if _, err = io.Copy(out, reader); err != nil {
		// Release the writer before deleting the partial file so it is not
		// leaked. The close error is deliberately ignored: the copy failure
		// is the error worth reporting.
		_ = out.Close()
		cleanup()
		return nil, err
	}

	if err = out.Close(); err != nil {
		cleanup()
		return nil, err
	}

	info, err := c.client.Stat(loc.Path)
	if err != nil {
		return nil, err
	}

	return &filer_pb.RemoteEntry{
		RemoteMtime: info.ModTime().Unix(),
		RemoteSize:  info.Size(),
		RemoteETag:  "",
		StorageName: c.conf.Name,
	}, nil
}
// UpdateFileMetadata applies metadata changes between oldEntry and newEntry
// to the HDFS file at loc.Path. Only the file mode is compared: when it
// differs, the new mode is applied with Chmod and any Chmod error is
// returned. When the modes are equal nothing is done and nil is returned.
func (c *hdfsRemoteStorageClient) UpdateFileMetadata(loc *remote_pb.RemoteStorageLocation, oldEntry *filer_pb.Entry, newEntry *filer_pb.Entry) error {
	if oldEntry.Attributes.FileMode == newEntry.Attributes.FileMode {
		return nil
	}
	return c.client.Chmod(loc.Path, os.FileMode(newEntry.Attributes.FileMode))
}
// DeleteFile removes the HDFS file at loc.Path. A failure is returned as a
// new error whose message includes the path and the underlying error text.
func (c *hdfsRemoteStorageClient) DeleteFile(loc *remote_pb.RemoteStorageLocation) (err error) {
	removeErr := c.client.Remove(loc.Path)
	if removeErr == nil {
		return nil
	}
	return fmt.Errorf("hdfs delete %s: %v", loc.Path, removeErr)
}