From 8425705643695df7e8f910c41b2f40ad71ecdedc Mon Sep 17 00:00:00 2001 From: nivekuil Date: Fri, 2 Jul 2021 13:02:26 -0700 Subject: [PATCH] Cassandra: Use TokenAwareHostPolicy by default with fallback See https://pkg.go.dev/github.com/gocql/gocql#hdr-Data_center_awareness_and_query_routing --- weed/command/scaffold.go | 2 ++ weed/filer/cassandra/cassandra_store.go | 9 ++++++++- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go index 2d6729bd3..8e940f608 100644 --- a/weed/command/scaffold.go +++ b/weed/command/scaffold.go @@ -217,6 +217,8 @@ username="" password="" # This changes the data layout. Only add new directories. Removing/Updating will cause data loss. superLargeDirectories = [] +# Name of the datacenter local to this filer, used as host selection fallback. +localDC="" [hbase] enabled = false diff --git a/weed/filer/cassandra/cassandra_store.go b/weed/filer/cassandra/cassandra_store.go index 0398f5117..f4856657e 100644 --- a/weed/filer/cassandra/cassandra_store.go +++ b/weed/filer/cassandra/cassandra_store.go @@ -32,6 +32,7 @@ func (store *CassandraStore) Initialize(configuration util.Configuration, prefix configuration.GetString(prefix+"username"), configuration.GetString(prefix+"password"), configuration.GetStringSlice(prefix+"superLargeDirectories"), + configuration.GetString(prefix+"localDC"), ) } @@ -40,13 +41,19 @@ func (store *CassandraStore) isSuperLargeDirectory(dir string) (dirHash string, return } -func (store *CassandraStore) initialize(keyspace string, hosts []string, username string, password string, superLargeDirectories []string) (err error) { +func (store *CassandraStore) initialize(keyspace string, hosts []string, username string, password string, superLargeDirectories []string, localDC string) (err error) { store.cluster = gocql.NewCluster(hosts...) if username != "" && password != "" { store.cluster.Authenticator = gocql.PasswordAuthenticator{Username: username, Password: password} } store.cluster.Keyspace = keyspace + fallback := gocql.RoundRobinHostPolicy() + if localDC != "" { + fallback = gocql.DCAwareRoundRobinPolicy(localDC) + } + store.cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(fallback) store.cluster.Consistency = gocql.LocalQuorum + store.session, err = store.cluster.CreateSession() if err != nil { glog.V(0).Infof("Failed to open cassandra store, hosts %v, keyspace %s", hosts, keyspace)