filer: redis store reduce from 2 redis operations to 1 for updates.

This commit is contained in:
chrislu 2022-10-12 23:50:09 -07:00
parent f95c25e113
commit f5d4952d73
3 changed files with 56 additions and 35 deletions

View file

@ -35,6 +35,22 @@ func (store *UniversalRedisStore) RollbackTransaction(ctx context.Context) error
func (store *UniversalRedisStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) { func (store *UniversalRedisStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
if err = store.doInsertEntry(ctx, entry); err != nil {
return err
}
dir, name := entry.FullPath.DirAndName()
if name != "" {
_, err = store.Client.SAdd(ctx, genDirectoryListKey(dir), name).Result()
if err != nil {
return fmt.Errorf("persisting %s in parent dir: %v", entry.FullPath, err)
}
}
return nil
}
func (store *UniversalRedisStore) doInsertEntry(ctx context.Context, entry *filer.Entry) error {
value, err := entry.EncodeAttributesAndChunks() value, err := entry.EncodeAttributesAndChunks()
if err != nil { if err != nil {
return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
@ -49,21 +65,12 @@ func (store *UniversalRedisStore) InsertEntry(ctx context.Context, entry *filer.
if err != nil { if err != nil {
return fmt.Errorf("persisting %s : %v", entry.FullPath, err) return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
} }
dir, name := entry.FullPath.DirAndName()
if name != "" {
_, err = store.Client.SAdd(ctx, genDirectoryListKey(dir), name).Result()
if err != nil {
return fmt.Errorf("persisting %s in parent dir: %v", entry.FullPath, err)
}
}
return nil return nil
} }
func (store *UniversalRedisStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) { func (store *UniversalRedisStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
return store.InsertEntry(ctx, entry) return store.doInsertEntry(ctx, entry)
} }
func (store *UniversalRedisStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) { func (store *UniversalRedisStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) {

View file

@ -47,17 +47,8 @@ func (store *UniversalRedis2Store) RollbackTransaction(ctx context.Context) erro
func (store *UniversalRedis2Store) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) { func (store *UniversalRedis2Store) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
value, err := entry.EncodeAttributesAndChunks() if err = store.doInsertEntry(ctx, entry); err != nil {
if err != nil { return err
return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
}
if len(entry.Chunks) > filer.CountEntryChunksForGzip {
value = util.MaybeGzipData(value)
}
if err = store.Client.Set(ctx, string(entry.FullPath), value, time.Duration(entry.TtlSec)*time.Second).Err(); err != nil {
return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
} }
dir, name := entry.FullPath.DirAndName() dir, name := entry.FullPath.DirAndName()
@ -74,9 +65,25 @@ func (store *UniversalRedis2Store) InsertEntry(ctx context.Context, entry *filer
return nil return nil
} }
func (store *UniversalRedis2Store) doInsertEntry(ctx context.Context, entry *filer.Entry) error {
value, err := entry.EncodeAttributesAndChunks()
if err != nil {
return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
}
if len(entry.Chunks) > filer.CountEntryChunksForGzip {
value = util.MaybeGzipData(value)
}
if err = store.Client.Set(ctx, string(entry.FullPath), value, time.Duration(entry.TtlSec)*time.Second).Err(); err != nil {
return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
}
return nil
}
func (store *UniversalRedis2Store) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) { func (store *UniversalRedis2Store) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
return store.InsertEntry(ctx, entry) return store.doInsertEntry(ctx, entry)
} }
func (store *UniversalRedis2Store) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) { func (store *UniversalRedis2Store) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) {

View file

@ -3,11 +3,11 @@ package redis3
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/go-redsync/redsync/v4"
"time" "time"
"github.com/go-redis/redis/v8" "github.com/go-redis/redis/v8"
redsync "github.com/go-redsync/redsync/v4"
"github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
@ -35,17 +35,8 @@ func (store *UniversalRedis3Store) RollbackTransaction(ctx context.Context) erro
func (store *UniversalRedis3Store) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) { func (store *UniversalRedis3Store) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
value, err := entry.EncodeAttributesAndChunks() if err = store.doInsertEntry(ctx, entry); err != nil {
if err != nil { return err
return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
}
if len(entry.Chunks) > filer.CountEntryChunksForGzip {
value = util.MaybeGzipData(value)
}
if err = store.Client.Set(ctx, string(entry.FullPath), value, time.Duration(entry.TtlSec)*time.Second).Err(); err != nil {
return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
} }
dir, name := entry.FullPath.DirAndName() dir, name := entry.FullPath.DirAndName()
@ -59,9 +50,25 @@ func (store *UniversalRedis3Store) InsertEntry(ctx context.Context, entry *filer
return nil return nil
} }
func (store *UniversalRedis3Store) doInsertEntry(ctx context.Context, entry *filer.Entry) error {
value, err := entry.EncodeAttributesAndChunks()
if err != nil {
return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
}
if len(entry.Chunks) > filer.CountEntryChunksForGzip {
value = util.MaybeGzipData(value)
}
if err = store.Client.Set(ctx, string(entry.FullPath), value, time.Duration(entry.TtlSec)*time.Second).Err(); err != nil {
return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
}
return nil
}
func (store *UniversalRedis3Store) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) { func (store *UniversalRedis3Store) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
return store.InsertEntry(ctx, entry) return store.doInsertEntry(ctx, entry)
} }
func (store *UniversalRedis3Store) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) { func (store *UniversalRedis3Store) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) {