2018-05-28 06:53:10 +00:00
package weed_server
import (
2019-03-15 22:55:34 +00:00
"context"
2020-04-08 15:12:00 +00:00
"crypto/md5"
2018-05-28 06:53:10 +00:00
"io"
2020-04-08 15:12:00 +00:00
"io/ioutil"
2018-05-28 06:53:10 +00:00
"net/http"
"path"
"strconv"
2018-07-22 00:47:59 +00:00
"strings"
2018-05-28 06:59:49 +00:00
"time"
2018-05-28 06:53:10 +00:00
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
2019-02-15 08:09:19 +00:00
"github.com/chrislusf/seaweedfs/weed/security"
2019-06-23 05:53:52 +00:00
"github.com/chrislusf/seaweedfs/weed/stats"
2020-03-23 07:01:34 +00:00
"github.com/chrislusf/seaweedfs/weed/util"
2018-05-28 06:53:10 +00:00
)
2019-03-18 07:35:15 +00:00
func ( fs * FilerServer ) autoChunk ( ctx context . Context , w http . ResponseWriter , r * http . Request ,
2020-04-12 06:37:10 +00:00
replication string , collection string , dataCenter string , ttlSec int32 , ttlString string , fsync bool ) bool {
2018-05-28 06:53:10 +00:00
if r . Method != "POST" {
glog . V ( 4 ) . Infoln ( "AutoChunking not supported for method" , r . Method )
return false
}
// autoChunking can be set at the command-line level or as a query param. Query param overrides command-line
query := r . URL . Query ( )
parsedMaxMB , _ := strconv . ParseInt ( query . Get ( "maxMB" ) , 10 , 32 )
maxMB := int32 ( parsedMaxMB )
2018-07-07 09:18:47 +00:00
if maxMB <= 0 && fs . option . MaxMB > 0 {
maxMB = int32 ( fs . option . MaxMB )
2018-05-28 06:53:10 +00:00
}
if maxMB <= 0 {
glog . V ( 4 ) . Infoln ( "AutoChunking not enabled" )
return false
}
glog . V ( 4 ) . Infoln ( "AutoChunking level set to" , maxMB , "(MB)" )
chunkSize := 1024 * 1024 * maxMB
contentLength := int64 ( 0 )
if contentLengthHeader := r . Header [ "Content-Length" ] ; len ( contentLengthHeader ) == 1 {
contentLength , _ = strconv . ParseInt ( contentLengthHeader [ 0 ] , 10 , 64 )
if contentLength <= int64 ( chunkSize ) {
glog . V ( 4 ) . Infoln ( "Content-Length of" , contentLength , "is less than the chunk size of" , chunkSize , "so autoChunking will be skipped." )
return false
}
}
if contentLength <= 0 {
glog . V ( 4 ) . Infoln ( "Content-Length value is missing or unexpected so autoChunking will be skipped." )
return false
}
2020-04-12 06:37:10 +00:00
reply , err := fs . doAutoChunk ( ctx , w , r , contentLength , chunkSize , replication , collection , dataCenter , ttlSec , ttlString , fsync )
2018-05-28 06:53:10 +00:00
if err != nil {
writeJsonError ( w , r , http . StatusInternalServerError , err )
} else if reply != nil {
writeJsonQuiet ( w , r , http . StatusCreated , reply )
}
return true
}
2019-03-18 07:35:15 +00:00
func ( fs * FilerServer ) doAutoChunk ( ctx context . Context , w http . ResponseWriter , r * http . Request ,
2020-04-12 06:37:10 +00:00
contentLength int64 , chunkSize int32 , replication string , collection string , dataCenter string , ttlSec int32 , ttlString string , fsync bool ) ( filerResult * FilerPostResult , replyerr error ) {
2018-05-28 06:53:10 +00:00
2019-06-23 05:53:52 +00:00
stats . FilerRequestCounter . WithLabelValues ( "postAutoChunk" ) . Inc ( )
start := time . Now ( )
2019-06-23 08:57:51 +00:00
defer func ( ) {
stats . FilerRequestHistogram . WithLabelValues ( "postAutoChunk" ) . Observe ( time . Since ( start ) . Seconds ( ) )
} ( )
2019-06-23 05:53:52 +00:00
2018-05-28 06:53:10 +00:00
multipartReader , multipartReaderErr := r . MultipartReader ( )
if multipartReaderErr != nil {
return nil , multipartReaderErr
}
part1 , part1Err := multipartReader . NextPart ( )
if part1Err != nil {
return nil , part1Err
}
fileName := part1 . FileName ( )
if fileName != "" {
fileName = path . Base ( fileName )
}
2020-03-09 04:39:33 +00:00
contentType := part1 . Header . Get ( "Content-Type" )
2018-05-28 06:53:10 +00:00
var fileChunks [ ] * filer_pb . FileChunk
2020-04-08 15:12:00 +00:00
md5Hash := md5 . New ( )
var partReader = ioutil . NopCloser ( io . TeeReader ( part1 , md5Hash ) )
2018-05-28 06:53:10 +00:00
chunkOffset := int64 ( 0 )
2020-02-04 01:04:06 +00:00
for chunkOffset < contentLength {
2020-04-08 15:12:00 +00:00
limitedReader := io . LimitReader ( partReader , int64 ( chunkSize ) )
2018-05-28 06:53:10 +00:00
2020-02-04 01:04:06 +00:00
// assign one file id for one chunk
2020-07-20 00:59:43 +00:00
fileId , urlLocation , auth , assignErr := fs . assignNewFileInfo ( replication , collection , dataCenter , ttlString , fsync )
2020-02-04 01:04:06 +00:00
if assignErr != nil {
return nil , assignErr
2018-05-28 06:53:10 +00:00
}
2020-02-04 01:04:06 +00:00
// upload the chunk to the volume server
2020-03-09 04:39:33 +00:00
uploadResult , uploadErr := fs . doUpload ( urlLocation , w , r , limitedReader , fileName , contentType , nil , auth )
2020-02-04 01:04:06 +00:00
if uploadErr != nil {
return nil , uploadErr
}
2018-05-28 06:53:10 +00:00
2020-02-04 01:04:06 +00:00
// if last chunk exhausted the reader exactly at the border
2020-03-07 14:06:58 +00:00
if uploadResult . Size == 0 {
2018-05-28 06:53:10 +00:00
break
}
2020-02-04 01:04:06 +00:00
// Save to chunk manifest structure
2020-05-01 00:20:44 +00:00
fileChunks = append ( fileChunks , uploadResult . ToPbFileChunk ( fileId , chunkOffset ) )
2020-02-04 01:04:06 +00:00
2020-03-07 14:06:58 +00:00
glog . V ( 4 ) . Infof ( "uploaded %s chunk %d to %s [%d,%d) of %d" , fileName , len ( fileChunks ) , fileId , chunkOffset , chunkOffset + int64 ( uploadResult . Size ) , contentLength )
2020-02-04 01:04:06 +00:00
2020-03-03 04:27:14 +00:00
// reset variables for the next chunk
2020-03-07 14:06:58 +00:00
chunkOffset = chunkOffset + int64 ( uploadResult . Size )
2020-03-03 04:27:14 +00:00
2020-02-04 01:04:06 +00:00
// if last chunk was not at full chunk size, but already exhausted the reader
2020-03-07 14:06:58 +00:00
if int64 ( uploadResult . Size ) < int64 ( chunkSize ) {
2020-02-04 01:04:06 +00:00
break
2018-05-28 06:53:10 +00:00
}
}
2020-07-20 00:59:43 +00:00
fileChunks , replyerr = filer2 . MaybeManifestize ( fs . saveAsChunk ( replication , collection , dataCenter , ttlString , fsync ) , fileChunks )
if replyerr != nil {
glog . V ( 0 ) . Infof ( "manifestize %s: %v" , r . RequestURI , replyerr )
return
}
2018-05-28 06:53:10 +00:00
path := r . URL . Path
2018-07-22 00:47:59 +00:00
if strings . HasSuffix ( path , "/" ) {
if fileName != "" {
path += fileName
2018-05-28 06:53:10 +00:00
}
}
glog . V ( 4 ) . Infoln ( "saving" , path )
entry := & filer2 . Entry {
2020-03-23 07:01:34 +00:00
FullPath : util . FullPath ( path ) ,
2018-05-28 06:53:10 +00:00
Attr : filer2 . Attr {
2018-06-10 23:57:32 +00:00
Mtime : time . Now ( ) ,
Crtime : time . Now ( ) ,
Mode : 0660 ,
2018-07-22 00:47:59 +00:00
Uid : OS_UID ,
Gid : OS_GID ,
2018-06-10 23:57:32 +00:00
Replication : replication ,
Collection : collection ,
2020-03-09 07:16:10 +00:00
TtlSec : ttlSec ,
2020-03-09 04:39:33 +00:00
Mime : contentType ,
2020-04-08 15:12:00 +00:00
Md5 : md5Hash . Sum ( nil ) ,
2018-05-28 06:53:10 +00:00
} ,
Chunks : fileChunks ,
}
2020-02-04 01:04:06 +00:00
filerResult = & FilerPostResult {
Name : fileName ,
Size : chunkOffset ,
}
2020-07-01 05:53:53 +00:00
if dbErr := fs . filer . CreateEntry ( ctx , entry , false , false ) ; dbErr != nil {
2019-12-13 08:23:05 +00:00
fs . filer . DeleteChunks ( entry . Chunks )
2019-06-23 05:53:52 +00:00
replyerr = dbErr
filerResult . Error = dbErr . Error ( )
glog . V ( 0 ) . Infof ( "failing to write %s to filer server : %v" , path , dbErr )
2018-05-28 06:53:10 +00:00
return
}
return
}
2020-03-08 22:42:44 +00:00
func ( fs * FilerServer ) doUpload ( urlLocation string , w http . ResponseWriter , r * http . Request , limitedReader io . Reader , fileName string , contentType string , pairMap map [ string ] string , auth security . EncodedJwt ) ( * operation . UploadResult , error ) {
2018-05-28 06:53:10 +00:00
2019-06-23 05:53:52 +00:00
stats . FilerRequestCounter . WithLabelValues ( "postAutoChunkUpload" ) . Inc ( )
start := time . Now ( )
2019-06-23 08:57:51 +00:00
defer func ( ) {
stats . FilerRequestHistogram . WithLabelValues ( "postAutoChunkUpload" ) . Observe ( time . Since ( start ) . Seconds ( ) )
} ( )
2019-06-23 05:53:52 +00:00
2020-03-28 20:41:58 +00:00
uploadResult , err , _ := operation . Upload ( urlLocation , fileName , fs . option . Cipher , limitedReader , false , contentType , pairMap , auth )
return uploadResult , err
2018-05-28 06:53:10 +00:00
}
2020-07-20 00:59:43 +00:00
// saveAsChunk returns a filer2.SaveDataAsChunkFunctionType bound to the given
// storage options. Each call of the returned function assigns a fresh file id,
// uploads the reader's content to the assigned location with an empty content
// type, and returns the resulting chunk positioned at offset together with the
// collection and replication this saver was created with.
func (fs *FilerServer) saveAsChunk(replication string, collection string, dataCenter string, ttlString string, fsync bool) filer2.SaveDataAsChunkFunctionType {

	var saver filer2.SaveDataAsChunkFunctionType = func(data io.Reader, chunkName string, chunkOffset int64) (*filer_pb.FileChunk, string, string, error) {
		// Every chunk gets its own file id.
		fid, target, jwt, allocErr := fs.assignNewFileInfo(replication, collection, dataCenter, ttlString, fsync)
		if allocErr != nil {
			return nil, "", "", allocErr
		}

		// Push the data to the volume server that owns the new file id.
		stored, putErr, _ := operation.Upload(target, chunkName, fs.option.Cipher, data, false, "", nil, jwt)
		if putErr != nil {
			return nil, "", "", putErr
		}

		chunk := stored.ToPbFileChunk(fid, chunkOffset)
		return chunk, collection, replication, nil
	}

	return saver
}