mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2024-01-19 02:48:24 +00:00
ensure monotonic n.AppendAtNs in each place (#3880)
https://github.com/seaweedfs/seaweedfs/issues/3852 Co-authored-by: Chris Lu <chrislusf@users.noreply.github.com>
This commit is contained in:
parent
7b90696601
commit
6253058d9d
|
@ -142,6 +142,14 @@ func (n *Needle) ParsePath(fid string) (err error) {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func GetAppendAtNs(volumeLastAppendAtNs uint64) uint64 {
|
||||||
|
return max(uint64(time.Now().UnixNano()), volumeLastAppendAtNs+1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *Needle) UpdateAppendAtNs(volumeLastAppendAtNs uint64) {
|
||||||
|
n.AppendAtNs = max(uint64(time.Now().UnixNano()), volumeLastAppendAtNs+1)
|
||||||
|
}
|
||||||
|
|
||||||
func ParseNeedleIdCookie(key_hash_string string) (NeedleId, Cookie, error) {
|
func ParseNeedleIdCookie(key_hash_string string) (NeedleId, Cookie, error) {
|
||||||
if len(key_hash_string) <= CookieSize*2 {
|
if len(key_hash_string) <= CookieSize*2 {
|
||||||
return NeedleIdEmpty, 0, fmt.Errorf("KeyHash is too short.")
|
return NeedleIdEmpty, 0, fmt.Errorf("KeyHash is too short.")
|
||||||
|
@ -164,3 +172,10 @@ func ParseNeedleIdCookie(key_hash_string string) (NeedleId, Cookie, error) {
|
||||||
func (n *Needle) LastModifiedString() string {
|
func (n *Needle) LastModifiedString() string {
|
||||||
return time.Unix(int64(n.LastModified), 0).Format("2006-01-02T15:04:05")
|
return time.Unix(int64(n.LastModified), 0).Format("2006-01-02T15:04:05")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func max(x, y uint64) uint64 {
|
||||||
|
if x <= y {
|
||||||
|
return y
|
||||||
|
}
|
||||||
|
return x
|
||||||
|
}
|
||||||
|
|
|
@ -4,13 +4,11 @@ import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||||
"github.com/seaweedfs/seaweedfs/weed/storage/backend"
|
"github.com/seaweedfs/seaweedfs/weed/storage/backend"
|
||||||
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
|
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
|
||||||
. "github.com/seaweedfs/seaweedfs/weed/storage/types"
|
. "github.com/seaweedfs/seaweedfs/weed/storage/types"
|
||||||
|
"os"
|
||||||
)
|
)
|
||||||
|
|
||||||
var ErrorNotFound = errors.New("not found")
|
var ErrorNotFound = errors.New("not found")
|
||||||
|
@ -157,7 +155,7 @@ func (v *Volume) doWriteRequest(n *needle.Needle, checkCookie bool) (offset uint
|
||||||
}
|
}
|
||||||
|
|
||||||
// append to dat file
|
// append to dat file
|
||||||
n.AppendAtNs = max(uint64(time.Now().UnixNano()), v.lastAppendAtNs+1)
|
n.UpdateAppendAtNs(v.lastAppendAtNs)
|
||||||
offset, size, _, err = n.Append(v.DataBackend, v.Version())
|
offset, size, _, err = n.Append(v.DataBackend, v.Version())
|
||||||
v.checkReadWriteError(err)
|
v.checkReadWriteError(err)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -218,6 +216,7 @@ func (v *Volume) doDeleteRequest(n *needle.Needle) (Size, error) {
|
||||||
size := nv.Size
|
size := nv.Size
|
||||||
if !v.hasRemoteFile {
|
if !v.hasRemoteFile {
|
||||||
n.Data = nil
|
n.Data = nil
|
||||||
|
n.UpdateAppendAtNs(v.lastAppendAtNs)
|
||||||
n.AppendAtNs = uint64(time.Now().UnixNano())
|
n.AppendAtNs = uint64(time.Now().UnixNano())
|
||||||
offset, _, _, err = n.Append(v.DataBackend, v.Version())
|
offset, _, _, err = n.Append(v.DataBackend, v.Version())
|
||||||
v.checkReadWriteError(err)
|
v.checkReadWriteError(err)
|
||||||
|
@ -318,7 +317,7 @@ func (v *Volume) WriteNeedleBlob(needleId NeedleId, needleBlob []byte, size Size
|
||||||
return fmt.Errorf("volume size limit %d exceeded! current size is %d", MaxPossibleVolumeSize, v.nm.ContentSize())
|
return fmt.Errorf("volume size limit %d exceeded! current size is %d", MaxPossibleVolumeSize, v.nm.ContentSize())
|
||||||
}
|
}
|
||||||
|
|
||||||
appendAtNs := uint64(time.Now().UnixNano())
|
appendAtNs := needle.GetAppendAtNs(v.lastAppendAtNs)
|
||||||
offset, err := needle.WriteNeedleBlob(v.DataBackend, needleBlob, size, appendAtNs, v.Version())
|
offset, err := needle.WriteNeedleBlob(v.DataBackend, needleBlob, size, appendAtNs, v.Version())
|
||||||
|
|
||||||
v.checkReadWriteError(err)
|
v.checkReadWriteError(err)
|
||||||
|
@ -334,10 +333,3 @@ func (v *Volume) WriteNeedleBlob(needleId NeedleId, needleBlob []byte, size Size
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func max(x, y uint64) uint64 {
|
|
||||||
if x <= y {
|
|
||||||
return y
|
|
||||||
}
|
|
||||||
return x
|
|
||||||
}
|
|
||||||
|
|
Loading…
Reference in a new issue