seaweedfs/weed/server/filer_server.go
Sebastian Kurfuerst 10404c4275 FEATURE: add JWT to HTTP endpoints of Filer and use them in S3 Client
- one JWT for reading and one for writing, analogous to how the JWT
  between Master and Volume Server works
- I did not implement IP `whiteList` parameter on the filer

Additionally, because http_util.DownloadFile now sets the JWT,
the `download` command should now work when `jwt.signing.read` is
configured. By looking at the code, I think this case did not work
before.

## Docs to be adjusted after a release

Page `Amazon-S3-API`:

```
# Authentication with Filer

You can use mTLS for the gRPC connection between S3-API-Proxy and the filer, as
explained in [Security-Configuration](Security-Configuration) -
controlled by the `grpc.*` configuration in `security.toml`.

Starting with version XX, it is also possible to authenticate the HTTP
operations between the S3-API-Proxy and the Filer (especially
uploading new files). This is configured by setting
`filer_jwt.signing.key` and `filer_jwt.signing.read.key` in
`security.toml`.

With both configurations (gRPC and JWT), it is possible to have Filer
and S3 communicate in fully authenticated fashion; so Filer will reject
any unauthenticated communication.
```

Page `Security Overview`:

```
The following items are not covered, yet:

- master server http REST services

Starting with version XX, the Filer HTTP REST services can be secured
with a JWT, by setting `filer_jwt.signing.key` and
`filer_jwt.signing.read.key` in `security.toml`.

...

Before version XX: "weed filer -disableHttp", disable http operations, only gRPC operations are allowed. This works with "weed mount" by FUSE. It does **not work** with the [S3 Gateway](Amazon S3 API), as this does HTTP calls to the Filer.
Starting with version XX: secured by JWT, by setting `filer_jwt.signing.key` and `filer_jwt.signing.read.key` in `security.toml`. **This now works with the [S3 Gateway](Amazon S3 API).**

...

# Securing Filer HTTP with JWT

To enable JWT-based access control for the Filer,

1. generate `security.toml` file by `weed scaffold -config=security`
2. set `filer_jwt.signing.key` to a secret string - and optionally filer_jwt.signing.read.key` as well to a secret string
3. copy the same `security.toml` file to the filers and all S3 proxies.

If `filer_jwt.signing.key` is configured: When sending upload/update/delete HTTP operations to a filer server, the request header `Authorization` should be the JWT string (`Authorization: Bearer [JwtToken]`). The operation is authorized after the filer validates the JWT with `filer_jwt.signing.key`.

If `filer_jwt.signing.read.key` is configured: When sending GET or HEAD requests to a filer server, the request header `Authorization` should be the JWT string (`Authorization: Bearer [JwtToken]`). The operation is authorized after the filer validates the JWT with `filer_jwt.signing.read.key`.

The S3 API Gateway reads the above JWT keys and sends authenticated
HTTP requests to the filer.
```

Page `Security Configuration`:

```
(update scaffold file)

...

[filer_jwt.signing]
key = "blahblahblahblah"

[filer_jwt.signing.read]
key = "blahblahblahblah"
```

Resolves: #158
2021-12-30 14:45:27 +01:00

199 lines
6.4 KiB
Go

package weed_server
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"net/http"
"os"
"sync"
"time"
"github.com/chrislusf/seaweedfs/weed/stats"
"google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/util/grace"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/filer"
_ "github.com/chrislusf/seaweedfs/weed/filer/cassandra"
_ "github.com/chrislusf/seaweedfs/weed/filer/elastic/v7"
_ "github.com/chrislusf/seaweedfs/weed/filer/etcd"
_ "github.com/chrislusf/seaweedfs/weed/filer/hbase"
_ "github.com/chrislusf/seaweedfs/weed/filer/leveldb"
_ "github.com/chrislusf/seaweedfs/weed/filer/leveldb2"
_ "github.com/chrislusf/seaweedfs/weed/filer/leveldb3"
_ "github.com/chrislusf/seaweedfs/weed/filer/mongodb"
_ "github.com/chrislusf/seaweedfs/weed/filer/mysql"
_ "github.com/chrislusf/seaweedfs/weed/filer/mysql2"
_ "github.com/chrislusf/seaweedfs/weed/filer/postgres"
_ "github.com/chrislusf/seaweedfs/weed/filer/postgres2"
_ "github.com/chrislusf/seaweedfs/weed/filer/redis"
_ "github.com/chrislusf/seaweedfs/weed/filer/redis2"
_ "github.com/chrislusf/seaweedfs/weed/filer/redis3"
_ "github.com/chrislusf/seaweedfs/weed/filer/sqlite"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/notification"
_ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs"
_ "github.com/chrislusf/seaweedfs/weed/notification/gocdk_pub_sub"
_ "github.com/chrislusf/seaweedfs/weed/notification/google_pub_sub"
_ "github.com/chrislusf/seaweedfs/weed/notification/kafka"
_ "github.com/chrislusf/seaweedfs/weed/notification/log"
"github.com/chrislusf/seaweedfs/weed/security"
)
type FilerOption struct {
Masters []pb.ServerAddress
Collection string
DefaultReplication string
DisableDirListing bool
MaxMB int
DirListingLimit int
DataCenter string
Rack string
DataNode string
DefaultLevelDbDir string
DisableHttp bool
Host pb.ServerAddress
recursiveDelete bool
Cipher bool
SaveToFilerLimit int64
ConcurrentUploadLimit int64
}
type FilerServer struct {
filer_pb.UnimplementedSeaweedFilerServer
option *FilerOption
secret security.SigningKey
filer *filer.Filer
filerGuard *security.Guard
grpcDialOption grpc.DialOption
// metrics read from the master
metricsAddress string
metricsIntervalSec int
// notifying clients
listenersLock sync.Mutex
listenersCond *sync.Cond
brokers map[string]map[string]bool
brokersLock sync.Mutex
inFlightDataSize int64
inFlightDataLimitCond *sync.Cond
}
func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) {
v := util.GetViper()
signingKey := v.GetString("jwt.filer_signing.key")
v.SetDefault("jwt.filer_signing.expires_after_seconds", 10)
expiresAfterSec := v.GetInt("jwt.filer_signing.expires_after_seconds")
readSigningKey := v.GetString("jwt.filer_signing.read.key")
v.SetDefault("jwt.filer_signing.read.expires_after_seconds", 60)
readExpiresAfterSec := v.GetInt("jwt.filer_signing.read.expires_after_seconds")
fs = &FilerServer{
option: option,
grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
brokers: make(map[string]map[string]bool),
inFlightDataLimitCond: sync.NewCond(new(sync.Mutex)),
}
fs.listenersCond = sync.NewCond(&fs.listenersLock)
if len(option.Masters) == 0 {
glog.Fatal("master list is required!")
}
fs.filer = filer.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.Collection, option.DefaultReplication, option.DataCenter, func() {
fs.listenersCond.Broadcast()
})
fs.filer.Cipher = option.Cipher
// we do not support IP whitelist right now
fs.filerGuard = security.NewGuard([]string{}, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec)
fs.checkWithMaster()
go stats.LoopPushingMetric("filer", string(fs.option.Host), fs.metricsAddress, fs.metricsIntervalSec)
go fs.filer.KeepMasterClientConnected()
if !util.LoadConfiguration("filer", false) {
v.Set("leveldb2.enabled", true)
v.Set("leveldb2.dir", option.DefaultLevelDbDir)
_, err := os.Stat(option.DefaultLevelDbDir)
if os.IsNotExist(err) {
os.MkdirAll(option.DefaultLevelDbDir, 0755)
}
glog.V(0).Infof("default to create filer store dir in %s", option.DefaultLevelDbDir)
} else {
glog.Warningf("skipping default store dir in %s", option.DefaultLevelDbDir)
}
util.LoadConfiguration("notification", false)
fs.option.recursiveDelete = v.GetBool("filer.options.recursive_delete")
v.SetDefault("filer.options.buckets_folder", "/buckets")
fs.filer.DirBucketsPath = v.GetString("filer.options.buckets_folder")
// TODO deprecated, will be be removed after 2020-12-31
// replaced by https://github.com/chrislusf/seaweedfs/wiki/Path-Specific-Configuration
// fs.filer.FsyncBuckets = v.GetStringSlice("filer.options.buckets_fsync")
fs.filer.LoadConfiguration(v)
notification.LoadConfiguration(v, "notification.")
handleStaticResources(defaultMux)
if !option.DisableHttp {
defaultMux.HandleFunc("/", fs.filerHandler)
}
if defaultMux != readonlyMux {
handleStaticResources(readonlyMux)
readonlyMux.HandleFunc("/", fs.readonlyFilerHandler)
}
fs.filer.AggregateFromPeers(option.Host)
fs.filer.LoadBuckets()
fs.filer.LoadFilerConf()
fs.filer.LoadRemoteStorageConfAndMapping()
grace.OnInterrupt(func() {
fs.filer.Shutdown()
})
return fs, nil
}
func (fs *FilerServer) checkWithMaster() {
isConnected := false
for !isConnected {
for _, master := range fs.option.Masters {
readErr := operation.WithMasterServerClient(master, fs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
if err != nil {
return fmt.Errorf("get master %s configuration: %v", master, err)
}
fs.metricsAddress, fs.metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds)
if fs.option.DefaultReplication == "" {
fs.option.DefaultReplication = resp.DefaultReplication
}
return nil
})
if readErr == nil {
isConnected = true
} else {
time.Sleep(7 * time.Second)
}
}
}
}