From 64c48c9724c77faa0f7c97049eda6845b61a1649 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 23 Dec 2020 21:44:53 -0800 Subject: [PATCH 01/11] seems compiling --- go.mod | 1 + go.sum | 15 ++ weed/filer/hbase/hbase_store.go | 223 +++++++++++++++++++++++++++++ weed/filer/hbase/hbase_store_kv.go | 67 +++++++++ 4 files changed, 306 insertions(+) create mode 100644 weed/filer/hbase/hbase_store.go create mode 100644 weed/filer/hbase/hbase_store_kv.go diff --git a/go.mod b/go.mod index 1ecbfd2a9..87bc93284 100644 --- a/go.mod +++ b/go.mod @@ -68,6 +68,7 @@ require ( github.com/syndtr/goleveldb v1.0.0 github.com/tidwall/gjson v1.3.2 github.com/tidwall/match v1.0.1 + github.com/tsuna/gohbase v0.0.0-20201125011725-348991136365 // indirect github.com/valyala/bytebufferpool v1.0.0 github.com/viant/assertly v0.5.4 // indirect github.com/viant/ptrie v0.3.0 diff --git a/go.sum b/go.sum index 387d2a8ac..66fb72a6b 100644 --- a/go.sum +++ b/go.sum @@ -179,6 +179,8 @@ github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gG github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/go-zookeeper/zk v1.0.2 h1:4mx0EYENAdX/B/rbunjlt5+4RTA/a9SMHBRuSKdGxPM= +github.com/go-zookeeper/zk v1.0.2/go.mod h1:nOB03cncLtlp4t+UAkGSV+9beXP/akpekBwL+UX1Qcw= github.com/gobuffalo/attrs v0.0.0-20190224210810-a9411de4debd/go.mod h1:4duuawTqi2wkkpB4ePgWMaai6/Kc6WEz83bhFwpHzj0= github.com/gobuffalo/depgen v0.0.0-20190329151759-d478694a28d3/go.mod h1:3STtPUQYuzV0gBVOY3vy6CfMm/ljR4pABfrTeHNLHUY= github.com/gobuffalo/depgen v0.1.0/go.mod h1:+ifsuy7fhi15RWncXQQKjWS9JPkdah5sZvtHc2RXGlg= @@ -223,6 +225,7 @@ github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4er github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y= +github.com/golang/mock v1.4.3/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs= @@ -539,6 +542,7 @@ github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhD github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563 h1:dY6ETXrvDG7Sa4vE8ZQG4yqWg6UnOcbqTAahkV813vQ= github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/go-internal v1.1.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.2.2/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= @@ -562,6 +566,8 @@ github.com/sirupsen/logrus v1.4.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPx github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q= github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= +github.com/sirupsen/logrus v1.5.0 h1:1N5EYkVAPEywqZRJd7cwnRtCb6xJx7NH3T3WUTF980Q= +github.com/sirupsen/logrus v1.5.0/go.mod h1:+F7Ogzej0PZc/94MaYx/nvG9jOFMD2osvC3s+Squfpo= github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e h1:MRM5ITcdelLK2j1vwZ3Je0FKVCfqOLp5zO6trqMLYs0= github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e/go.mod h1:XV66xRDqSt+GTGFMVlhk3ULuV0y9ZmzeVGR4mloJI3M= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= @@ -621,6 +627,8 @@ github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhV github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 h1:LnC5Kc/wtumK+WB441p7ynQJzVuNRJiqddSIE3IlSEQ= github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= +github.com/tsuna/gohbase v0.0.0-20201125011725-348991136365 h1:6iRwZdrFUzbcVYZwa8dXTIILGIxmmhjyUPJEcwzPGaU= +github.com/tsuna/gohbase v0.0.0-20201125011725-348991136365/go.mod h1:zj0GJHGvyf1ed3Jm/Tb4830c/ZKDq+YoLsCt2rGQuT0= github.com/ugorji/go v1.1.4 h1:j4s+tAvLfL3bZyefP2SEWmhBzmuIlH/eqNuPdFPgngw= github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc= github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= @@ -785,6 +793,7 @@ golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5 h1:LfCXLvNmTYH9kEmVgqbnsWfru golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201022201747-fb209a7c41cd h1:WgqgiQvkiZWz7XLhphjt2GI2GcGCTIZs9jqXMWmH+oc= golang.org/x/sys v0.0.0-20201022201747-fb209a7c41cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= @@ -930,8 +939,14 @@ honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWh honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.1-2019.2.3 h1:3JgtbtFHMiCmsznwGVTUWbgGov+pVqnlf1dEJTNAXeM= honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= +modernc.org/b v1.0.0 h1:vpvqeyp17ddcQWF29Czawql4lDdABCDRbXRAS4+aF2o= +modernc.org/b v1.0.0/go.mod h1:uZWcZfRj1BpYzfN9JTerzlNUnnPsV9O2ZA8JsRcubNg= +modernc.org/mathutil v1.1.1/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E= +modernc.org/strutil v1.1.0/go.mod h1:lstksw84oURvj9y3tn8lGvRxyRC1S2+g5uuIzNfIOBs= pack.ag/amqp v0.11.2/go.mod h1:4/cbmt4EJXSKlG6LCfWHoqmN0uFdy5i/+YFz+fTfhV4= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= +rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= +rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= sigs.k8s.io/yaml v1.1.0 h1:4A07+ZFc2wgJwo8YNlQpr1rVlgUDlxXHhPJciaPY5gs= sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o= sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0/go.mod h1:hI742Nqp5OhwiqlzhgfbWU4mW4yO10fP+LoT9WOswdU= diff --git a/weed/filer/hbase/hbase_store.go b/weed/filer/hbase/hbase_store.go new file mode 100644 index 000000000..6e1ccea35 --- /dev/null +++ b/weed/filer/hbase/hbase_store.go @@ -0,0 +1,223 @@ +package hbase + +import ( + "bytes" + "context" + "fmt" + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/tsuna/gohbase" + "github.com/tsuna/gohbase/hrpc" + "io" +) + +func init() { + filer.Stores = append(filer.Stores, &HbaseStore{}) +} + +type HbaseStore struct { + Client gohbase.Client + table []byte + cfKv string + cfMetaDir string + cfFlatDir string + column string +} + +func (store *HbaseStore) GetName() string { + return "hbase" +} + +func (store *HbaseStore) Initialize(configuration util.Configuration, prefix string) (err error) { + return store.initialize( + configuration.GetString(prefix+"zkquorum"), + configuration.GetString(prefix+"table"), + ) +} + +func (store *HbaseStore) initialize(zkquorum, table string) (err error) { + store.Client = gohbase.NewClient(zkquorum) + store.table = []byte(table) + store.cfKv = "kv" + store.cfMetaDir = "meta" + store.cfFlatDir = "flat" + store.column = "a" + + // check table exists + key := "whatever" + headers := map[string][]string{store.cfMetaDir: nil} + get, err := hrpc.NewGet(context.Background(), store.table, []byte(key), hrpc.Families(headers)) + if err != nil { + return fmt.Errorf("NewGet returned an error: %v", err) + } + _, err = store.Client.Get(get) + if err != gohbase.TableNotFound { + return nil + } + + // create table + adminClient := gohbase.NewAdminClient(zkquorum) + cFamilies := []string{store.cfKv, store.cfMetaDir, store.cfFlatDir} + cf := make(map[string]map[string]string, len(cFamilies)) + for _, f := range cFamilies { + cf[f] = nil + } + ct := hrpc.NewCreateTable(context.Background(), []byte(table), cf) + if err := adminClient.CreateTable(ct); err != nil { + return err + } + + return nil +} + +func (store *HbaseStore) InsertEntry(ctx context.Context, entry *filer.Entry) error { + value, err := entry.EncodeAttributesAndChunks() + if err != nil { + return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) + } + if len(entry.Chunks) > 50 { + value = util.MaybeGzipData(value) + } + + return store.doPut(ctx, store.cfMetaDir, []byte(entry.FullPath), value) +} + +func (store *HbaseStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) { + return store.InsertEntry(ctx, entry) +} + +func (store *HbaseStore) FindEntry(ctx context.Context, path util.FullPath) (entry *filer.Entry, err error) { + value, err := store.doGet(ctx, store.cfMetaDir, []byte(path)) + if err != nil { + if err == filer.ErrKvNotFound { + return nil, filer_pb.ErrNotFound + } + return nil, err + } + + entry = &filer.Entry{ + FullPath: path, + } + err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData(value)) + if err != nil { + return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) + } + return entry, nil +} + +func (store *HbaseStore) DeleteEntry(ctx context.Context, path util.FullPath) (err error) { + return store.doDelete(ctx, store.cfMetaDir, []byte(path)) +} + +func (store *HbaseStore) DeleteFolderChildren(ctx context.Context, path util.FullPath) (err error) { + + family := map[string][]string{store.cfMetaDir: {COLUMN_NAME}} + expectedPrefix := []byte(dirPath+"/") + scan, err := hrpc.NewScanRange(ctx, store.table, expectedPrefix, nil, hrpc.Families(family)) + if err != nil { + return err + } + + scanner := store.Client.Scan(scan) + defer scanner.Close() + for { + res, err := scanner.Next() + if err != nil { + break + } + if len(res.Cells) == 0 { + continue + } + cell := res.Cells[0] + + if !bytes.HasPrefix(cell.Row, expectedPrefix) { + break + } + + err = store.doDelete(ctx, store.cfMetaDir, cell.Row) + if err != nil { + break + } + + } + return +} + +func (store *HbaseStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*filer.Entry, error) { + return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "") +} + +func (store *HbaseStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*filer.Entry, error) { + family := map[string][]string{store.cfMetaDir: {COLUMN_NAME}} + expectedPrefix := []byte(dirPath.Child(prefix)) + scan, err := hrpc.NewScanRange(ctx, store.table, expectedPrefix, nil, hrpc.Families(family)) + if err != nil { + return nil, err + } + + var entries []*filer.Entry + scanner := store.Client.Scan(scan) + defer scanner.Close() + for { + res, err := scanner.Next() + if err == io.EOF { + break + } + if err != nil { + return entries, err + } + if len(res.Cells) == 0 { + continue + } + cell := res.Cells[0] + + if !bytes.HasPrefix(cell.Row, expectedPrefix) { + break + } + + fullpath := util.FullPath(cell.Row) + + + value := cell.Value + + _, fileName := fullpath.DirAndName() + + if fileName == startFileName && !includeStartFile { + continue + } + + limit-- + if limit < 0 { + break + } + entry := &filer.Entry{ + FullPath: fullpath, + } + if decodeErr := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(value)); decodeErr != nil { + err = decodeErr + glog.V(0).Infof("list %s : %v", entry.FullPath, err) + break + } + entries = append(entries, entry) + } + + return entries, nil +} + +func (store *HbaseStore) BeginTransaction(ctx context.Context) (context.Context, error) { + return ctx, nil +} + +func (store *HbaseStore) CommitTransaction(ctx context.Context) error { + return nil +} + +func (store *HbaseStore) RollbackTransaction(ctx context.Context) error { + return nil +} + +func (store *HbaseStore) Shutdown() { + store.Client.Close() +} diff --git a/weed/filer/hbase/hbase_store_kv.go b/weed/filer/hbase/hbase_store_kv.go new file mode 100644 index 000000000..b98eb3ed6 --- /dev/null +++ b/weed/filer/hbase/hbase_store_kv.go @@ -0,0 +1,67 @@ +package hbase + +import ( + "context" + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/tsuna/gohbase/hrpc" +) + +const( + COLUMN_NAME = "a" +) +func (store *HbaseStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) { + return store.doPut(ctx, store.cfKv, key, value) +} + +func (store *HbaseStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) { + return store.doGet(ctx, store.cfKv, key) +} + +func (store *HbaseStore) KvDelete(ctx context.Context, key []byte) (err error) { + return store.doDelete(ctx, store.cfKv, key) +} + +func (store *HbaseStore) doPut(ctx context.Context, cf string, key, value []byte) (err error) { + values := map[string]map[string][]byte{store.cfKv: map[string][]byte{}} + values[cf][COLUMN_NAME] = value + putRequest, err := hrpc.NewPut(ctx, store.table, key, values) + if err != nil { + return err + } + _, err = store.Client.Put(putRequest) + if err != nil { + return err + } + return nil +} + +func (store *HbaseStore) doGet(ctx context.Context, cf string, key []byte) (value []byte, err error) { + family := map[string][]string{cf: {COLUMN_NAME}} + getRequest, err := hrpc.NewGet(context.Background(), store.table, key, hrpc.Families(family)) + if err != nil { + return nil, err + } + getResp, err := store.Client.Get(getRequest) + if err != nil { + return nil, err + } + if len(getResp.Cells) == 0 { + return nil, filer.ErrKvNotFound + } + + return getResp.Cells[0].Value, nil +} + +func (store *HbaseStore) doDelete(ctx context.Context, cf string, key []byte) (err error) { + values := map[string]map[string][]byte{store.cfKv: map[string][]byte{}} + values[cf][COLUMN_NAME] = nil + deleteRequest, err := hrpc.NewDel(ctx, store.table, key, values) + if err != nil { + return err + } + _, err = store.Client.Delete(deleteRequest) + if err != nil { + return err + } + return nil +} From 94e3757c08909c6796c2bf02b579f8ee36128c4d Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 23 Dec 2020 21:45:16 -0800 Subject: [PATCH 02/11] fix --- weed/filer/hbase/hbase_store.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/weed/filer/hbase/hbase_store.go b/weed/filer/hbase/hbase_store.go index 6e1ccea35..fea6004be 100644 --- a/weed/filer/hbase/hbase_store.go +++ b/weed/filer/hbase/hbase_store.go @@ -114,7 +114,7 @@ func (store *HbaseStore) DeleteEntry(ctx context.Context, path util.FullPath) (e func (store *HbaseStore) DeleteFolderChildren(ctx context.Context, path util.FullPath) (err error) { family := map[string][]string{store.cfMetaDir: {COLUMN_NAME}} - expectedPrefix := []byte(dirPath+"/") + expectedPrefix := []byte(path+"/") scan, err := hrpc.NewScanRange(ctx, store.table, expectedPrefix, nil, hrpc.Families(family)) if err != nil { return err From c3d1b3b5aa5aef8d69cd1254818c296238e0fe64 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 23 Dec 2020 21:49:01 -0800 Subject: [PATCH 03/11] hook up --- weed/command/scaffold.go | 4 ++++ weed/server/filer_server.go | 1 + 2 files changed, 5 insertions(+) diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go index 04a988027..f3a094d5a 100644 --- a/weed/command/scaffold.go +++ b/weed/command/scaffold.go @@ -141,6 +141,10 @@ password="" # This changes the data layout. Only add new directories. Removing/Updating will cause data loss. superLargeDirectories = [] +[hbase] +zkquorum = "" +table = "seaweedfs" + [redis2] enabled = false address = "localhost:6379" diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 2991d14ab..2da129ab2 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -23,6 +23,7 @@ import ( _ "github.com/chrislusf/seaweedfs/weed/filer/cassandra" _ "github.com/chrislusf/seaweedfs/weed/filer/elastic/v7" _ "github.com/chrislusf/seaweedfs/weed/filer/etcd" + _ "github.com/chrislusf/seaweedfs/weed/filer/hbase" _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb" _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb2" _ "github.com/chrislusf/seaweedfs/weed/filer/mongodb" From 32955b04b83a8132624ee332427e05fd59a32e59 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 23 Dec 2020 22:08:41 -0800 Subject: [PATCH 04/11] Update go.mod --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 87bc93284..d93a44379 100644 --- a/go.mod +++ b/go.mod @@ -68,7 +68,7 @@ require ( github.com/syndtr/goleveldb v1.0.0 github.com/tidwall/gjson v1.3.2 github.com/tidwall/match v1.0.1 - github.com/tsuna/gohbase v0.0.0-20201125011725-348991136365 // indirect + github.com/tsuna/gohbase v0.0.0-20201125011725-348991136365 github.com/valyala/bytebufferpool v1.0.0 github.com/viant/assertly v0.5.4 // indirect github.com/viant/ptrie v0.3.0 From c4a202ec417554f170a21d7331b44f860ea72274 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 23 Dec 2020 23:23:05 -0800 Subject: [PATCH 05/11] fix wrong column family --- weed/filer/hbase/hbase_store.go | 4 +--- weed/filer/hbase/hbase_store_kv.go | 4 ++-- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/weed/filer/hbase/hbase_store.go b/weed/filer/hbase/hbase_store.go index fea6004be..8a9ea7fc1 100644 --- a/weed/filer/hbase/hbase_store.go +++ b/weed/filer/hbase/hbase_store.go @@ -22,7 +22,6 @@ type HbaseStore struct { table []byte cfKv string cfMetaDir string - cfFlatDir string column string } @@ -42,7 +41,6 @@ func (store *HbaseStore) initialize(zkquorum, table string) (err error) { store.table = []byte(table) store.cfKv = "kv" store.cfMetaDir = "meta" - store.cfFlatDir = "flat" store.column = "a" // check table exists @@ -59,7 +57,7 @@ func (store *HbaseStore) initialize(zkquorum, table string) (err error) { // create table adminClient := gohbase.NewAdminClient(zkquorum) - cFamilies := []string{store.cfKv, store.cfMetaDir, store.cfFlatDir} + cFamilies := []string{store.cfKv, store.cfMetaDir} cf := make(map[string]map[string]string, len(cFamilies)) for _, f := range cFamilies { cf[f] = nil diff --git a/weed/filer/hbase/hbase_store_kv.go b/weed/filer/hbase/hbase_store_kv.go index b98eb3ed6..8f5794ac1 100644 --- a/weed/filer/hbase/hbase_store_kv.go +++ b/weed/filer/hbase/hbase_store_kv.go @@ -22,7 +22,7 @@ func (store *HbaseStore) KvDelete(ctx context.Context, key []byte) (err error) { } func (store *HbaseStore) doPut(ctx context.Context, cf string, key, value []byte) (err error) { - values := map[string]map[string][]byte{store.cfKv: map[string][]byte{}} + values := map[string]map[string][]byte{cf: map[string][]byte{}} values[cf][COLUMN_NAME] = value putRequest, err := hrpc.NewPut(ctx, store.table, key, values) if err != nil { @@ -53,7 +53,7 @@ func (store *HbaseStore) doGet(ctx context.Context, cf string, key []byte) (valu } func (store *HbaseStore) doDelete(ctx context.Context, cf string, key []byte) (err error) { - values := map[string]map[string][]byte{store.cfKv: map[string][]byte{}} + values := map[string]map[string][]byte{cf: map[string][]byte{}} values[cf][COLUMN_NAME] = nil deleteRequest, err := hrpc.NewDel(ctx, store.table, key, values) if err != nil { From b5e2be635adb51808282358a1eab0b61080982d6 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 23 Dec 2020 23:49:22 -0800 Subject: [PATCH 06/11] adjust for directory listing --- weed/filer/hbase/hbase_store.go | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/weed/filer/hbase/hbase_store.go b/weed/filer/hbase/hbase_store.go index 8a9ea7fc1..cc690c48a 100644 --- a/weed/filer/hbase/hbase_store.go +++ b/weed/filer/hbase/hbase_store.go @@ -112,7 +112,7 @@ func (store *HbaseStore) DeleteEntry(ctx context.Context, path util.FullPath) (e func (store *HbaseStore) DeleteFolderChildren(ctx context.Context, path util.FullPath) (err error) { family := map[string][]string{store.cfMetaDir: {COLUMN_NAME}} - expectedPrefix := []byte(path+"/") + expectedPrefix := []byte(path + "/") scan, err := hrpc.NewScanRange(ctx, store.table, expectedPrefix, nil, hrpc.Families(family)) if err != nil { return err @@ -133,6 +133,11 @@ func (store *HbaseStore) DeleteFolderChildren(ctx context.Context, path util.Ful if !bytes.HasPrefix(cell.Row, expectedPrefix) { break } + fullpath := util.FullPath(cell.Row) + dir, _ := fullpath.DirAndName() + if dir != string(dirPath) { + continue + } err = store.doDelete(ctx, store.cfMetaDir, cell.Row) if err != nil { @@ -149,7 +154,7 @@ func (store *HbaseStore) ListDirectoryEntries(ctx context.Context, dirPath util. func (store *HbaseStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*filer.Entry, error) { family := map[string][]string{store.cfMetaDir: {COLUMN_NAME}} - expectedPrefix := []byte(dirPath.Child(prefix)) + expectedPrefix := []byte(string(dirPath) + "/" + prefix) scan, err := hrpc.NewScanRange(ctx, store.table, expectedPrefix, nil, hrpc.Families(family)) if err != nil { return nil, err @@ -176,12 +181,13 @@ func (store *HbaseStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPa } fullpath := util.FullPath(cell.Row) - + dir, fileName := fullpath.DirAndName() + if dir != string(dirPath) { + continue + } value := cell.Value - _, fileName := fullpath.DirAndName() - if fileName == startFileName && !includeStartFile { continue } From 53bc1ea25bf707afd828141255f3fb7841c23686 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 23 Dec 2020 23:53:46 -0800 Subject: [PATCH 07/11] fix compilation --- weed/filer/hbase/hbase_store.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/weed/filer/hbase/hbase_store.go b/weed/filer/hbase/hbase_store.go index cc690c48a..68170ec18 100644 --- a/weed/filer/hbase/hbase_store.go +++ b/weed/filer/hbase/hbase_store.go @@ -135,7 +135,7 @@ func (store *HbaseStore) DeleteFolderChildren(ctx context.Context, path util.Ful } fullpath := util.FullPath(cell.Row) dir, _ := fullpath.DirAndName() - if dir != string(dirPath) { + if dir != string(dir) { continue } From 2fea8cfa0fd56d7f2f459681b727d9625b62bc16 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 23 Dec 2020 23:54:15 -0800 Subject: [PATCH 08/11] fix compilation --- weed/filer/hbase/hbase_store.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/weed/filer/hbase/hbase_store.go b/weed/filer/hbase/hbase_store.go index 68170ec18..65cdfd847 100644 --- a/weed/filer/hbase/hbase_store.go +++ b/weed/filer/hbase/hbase_store.go @@ -135,7 +135,7 @@ func (store *HbaseStore) DeleteFolderChildren(ctx context.Context, path util.Ful } fullpath := util.FullPath(cell.Row) dir, _ := fullpath.DirAndName() - if dir != string(dir) { + if dir != string(path) { continue } From 9bf6c10505bef2e9f679b203ab2380672d5afb4e Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 24 Dec 2020 00:18:59 -0800 Subject: [PATCH 09/11] fix prefix --- weed/filer/hbase/hbase_store.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/weed/filer/hbase/hbase_store.go b/weed/filer/hbase/hbase_store.go index 65cdfd847..03aecfd2b 100644 --- a/weed/filer/hbase/hbase_store.go +++ b/weed/filer/hbase/hbase_store.go @@ -112,7 +112,7 @@ func (store *HbaseStore) DeleteEntry(ctx context.Context, path util.FullPath) (e func (store *HbaseStore) DeleteFolderChildren(ctx context.Context, path util.FullPath) (err error) { family := map[string][]string{store.cfMetaDir: {COLUMN_NAME}} - expectedPrefix := []byte(path + "/") + expectedPrefix := []byte(path.Child("")) scan, err := hrpc.NewScanRange(ctx, store.table, expectedPrefix, nil, hrpc.Families(family)) if err != nil { return err @@ -154,7 +154,7 @@ func (store *HbaseStore) ListDirectoryEntries(ctx context.Context, dirPath util. func (store *HbaseStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*filer.Entry, error) { family := map[string][]string{store.cfMetaDir: {COLUMN_NAME}} - expectedPrefix := []byte(string(dirPath) + "/" + prefix) + expectedPrefix := []byte(dirPath.Child(prefix)) scan, err := hrpc.NewScanRange(ctx, store.table, expectedPrefix, nil, hrpc.Families(family)) if err != nil { return nil, err From 75c6edba9e9f09ff102a059c46a55c16a8e9063e Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 24 Dec 2020 00:19:16 -0800 Subject: [PATCH 10/11] filer: hbase add enabled flag --- weed/command/scaffold.go | 1 + 1 file changed, 1 insertion(+) diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go index f3a094d5a..6cfd46427 100644 --- a/weed/command/scaffold.go +++ b/weed/command/scaffold.go @@ -142,6 +142,7 @@ password="" superLargeDirectories = [] [hbase] +enabled = false zkquorum = "" table = "seaweedfs" From 0e016bc7bd3d08d4c843388d6af55802ebec0dad Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 24 Dec 2020 12:10:35 -0800 Subject: [PATCH 11/11] hbase add ttl --- weed/filer/hbase/hbase_store.go | 2 +- weed/filer/hbase/hbase_store_kv.go | 16 ++++++++++++---- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/weed/filer/hbase/hbase_store.go b/weed/filer/hbase/hbase_store.go index 03aecfd2b..6b0ad58b9 100644 --- a/weed/filer/hbase/hbase_store.go +++ b/weed/filer/hbase/hbase_store.go @@ -79,7 +79,7 @@ func (store *HbaseStore) InsertEntry(ctx context.Context, entry *filer.Entry) er value = util.MaybeGzipData(value) } - return store.doPut(ctx, store.cfMetaDir, []byte(entry.FullPath), value) + return store.doPut(ctx, store.cfMetaDir, []byte(entry.FullPath), value, entry.TtlSec) } func (store *HbaseStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) { diff --git a/weed/filer/hbase/hbase_store_kv.go b/weed/filer/hbase/hbase_store_kv.go index 8f5794ac1..26bf763e2 100644 --- a/weed/filer/hbase/hbase_store_kv.go +++ b/weed/filer/hbase/hbase_store_kv.go @@ -4,13 +4,14 @@ import ( "context" "github.com/chrislusf/seaweedfs/weed/filer" "github.com/tsuna/gohbase/hrpc" + "time" ) const( COLUMN_NAME = "a" ) func (store *HbaseStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) { - return store.doPut(ctx, store.cfKv, key, value) + return store.doPut(ctx, store.cfKv, key, value, 0) } func (store *HbaseStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) { @@ -21,10 +22,17 @@ func (store *HbaseStore) KvDelete(ctx context.Context, key []byte) (err error) { return store.doDelete(ctx, store.cfKv, key) } -func (store *HbaseStore) doPut(ctx context.Context, cf string, key, value []byte) (err error) { +func (store *HbaseStore) doPut(ctx context.Context, cf string, key, value []byte, ttlSecond int32) (err error) { + if ttlSecond > 0 { + return store.doPutWithOptions(ctx, cf, key, value, hrpc.Durability(hrpc.AsyncWal), hrpc.TTL(time.Duration(ttlSecond)*time.Second)) + } + return store.doPutWithOptions(ctx, cf, key, value, hrpc.Durability(hrpc.AsyncWal)) +} + +func (store *HbaseStore) doPutWithOptions(ctx context.Context, cf string, key, value []byte, options ...func(hrpc.Call) error) (err error) { values := map[string]map[string][]byte{cf: map[string][]byte{}} values[cf][COLUMN_NAME] = value - putRequest, err := hrpc.NewPut(ctx, store.table, key, values) + putRequest, err := hrpc.NewPut(ctx, store.table, key, values, options...) if err != nil { return err } @@ -55,7 +63,7 @@ func (store *HbaseStore) doGet(ctx context.Context, cf string, key []byte) (valu func (store *HbaseStore) doDelete(ctx context.Context, cf string, key []byte) (err error) { values := map[string]map[string][]byte{cf: map[string][]byte{}} values[cf][COLUMN_NAME] = nil - deleteRequest, err := hrpc.NewDel(ctx, store.table, key, values) + deleteRequest, err := hrpc.NewDel(ctx, store.table, key, values, hrpc.Durability(hrpc.AsyncWal)) if err != nil { return err }