diff --git a/weed/notification/gocdk_pub_sub/gocdk_pub_sub.go b/weed/notification/gocdk_pub_sub/gocdk_pub_sub.go index 94a413ac0..ebf44ea6f 100644 --- a/weed/notification/gocdk_pub_sub/gocdk_pub_sub.go +++ b/weed/notification/gocdk_pub_sub/gocdk_pub_sub.go @@ -24,7 +24,7 @@ import ( "github.com/golang/protobuf/proto" "gocloud.dev/pubsub" _ "gocloud.dev/pubsub/awssnssqs" - _ "gocloud.dev/pubsub/azuresb" + // _ "gocloud.dev/pubsub/azuresb" _ "gocloud.dev/pubsub/gcppubsub" _ "gocloud.dev/pubsub/natspubsub" _ "gocloud.dev/pubsub/rabbitpubsub" diff --git a/weed/replication/sub/notification_gocdk_pub_sub.go b/weed/replication/sub/notification_gocdk_pub_sub.go index 9c76e6918..eddba9ff8 100644 --- a/weed/replication/sub/notification_gocdk_pub_sub.go +++ b/weed/replication/sub/notification_gocdk_pub_sub.go @@ -9,7 +9,7 @@ import ( "github.com/golang/protobuf/proto" "gocloud.dev/pubsub" _ "gocloud.dev/pubsub/awssnssqs" - _ "gocloud.dev/pubsub/azuresb" + // _ "gocloud.dev/pubsub/azuresb" _ "gocloud.dev/pubsub/gcppubsub" _ "gocloud.dev/pubsub/natspubsub" _ "gocloud.dev/pubsub/rabbitpubsub" diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go index 1a17327a0..e0d1fd174 100644 --- a/weed/server/master_grpc_server.go +++ b/weed/server/master_grpc_server.go @@ -52,8 +52,9 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ return err } + t.Sequence.SetMax(heartbeat.MaxFileKey) + if dn == nil { - t.Sequence.SetMax(heartbeat.MaxFileKey) if heartbeat.Ip == "" { if pr, ok := peer.FromContext(stream.Context()); ok { if pr.Addr != net.Addr(nil) { diff --git a/weed/server/volume_grpc_batch_delete.go b/weed/server/volume_grpc_batch_delete.go index d7fbb6edf..fdb7937d2 100644 --- a/weed/server/volume_grpc_batch_delete.go +++ b/weed/server/volume_grpc_batch_delete.go @@ -58,7 +58,7 @@ func (vs *VolumeServer) BatchDelete(ctx context.Context, req *volume_server_pb.B break } n.LastModified = now - if size, err := vs.store.Delete(volumeId, n); err != nil { + if size, err := vs.store.DeleteVolumeNeedle(volumeId, n); err != nil { resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{ FileId: fid, Status: http.StatusInternalServerError, diff --git a/weed/server/volume_grpc_tail.go b/weed/server/volume_grpc_tail.go index 698bad5b8..34c55a599 100644 --- a/weed/server/volume_grpc_tail.go +++ b/weed/server/volume_grpc_tail.go @@ -110,7 +110,7 @@ func (vs *VolumeServer) VolumeTailReceiver(ctx context.Context, req *volume_serv defer glog.V(1).Infof("receive tailing volume %d finished", v.Id) return resp, operation.TailVolumeFromSource(req.SourceVolumeServer, vs.grpcDialOption, v.Id, req.SinceNs, int(req.IdleTimeoutSeconds), func(n *needle.Needle) error { - _, _, err := vs.store.Write(v.Id, n) + _, _, err := vs.store.WriteVolumeNeedle(v.Id, n) return err }) diff --git a/weed/storage/erasure_coding/ec_shard.go b/weed/storage/erasure_coding/ec_shard.go index b280157b8..47e6d3d1e 100644 --- a/weed/storage/erasure_coding/ec_shard.go +++ b/weed/storage/erasure_coding/ec_shard.go @@ -81,7 +81,7 @@ func (shard *EcVolumeShard) Close() { func (shard *EcVolumeShard) Destroy() { os.Remove(shard.FileName() + ToExt(int(shard.ShardId))) - stats.VolumeServerVolumeCounter.WithLabelValues(shard.Collection, "ec_shards").Inc() + stats.VolumeServerVolumeCounter.WithLabelValues(shard.Collection, "ec_shards").Dec() } func (shard *EcVolumeShard) ReadAt(buf []byte, offset int64) (int, error) { diff --git a/weed/storage/store.go b/weed/storage/store.go index d2dd76d52..e99d77774 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -211,7 +211,7 @@ func (s *Store) Close() { } } -func (s *Store) Write(i needle.VolumeId, n *needle.Needle) (size uint32, isUnchanged bool, err error) { +func (s *Store) WriteVolumeNeedle(i needle.VolumeId, n *needle.Needle) (size uint32, isUnchanged bool, err error) { if v := s.findVolume(i); v != nil { if v.readOnly { err = fmt.Errorf("volume %d is read only", i) @@ -230,7 +230,7 @@ func (s *Store) Write(i needle.VolumeId, n *needle.Needle) (size uint32, isUncha return } -func (s *Store) Delete(i needle.VolumeId, n *needle.Needle) (uint32, error) { +func (s *Store) DeleteVolumeNeedle(i needle.VolumeId, n *needle.Needle) (uint32, error) { if v := s.findVolume(i); v != nil && !v.readOnly { return v.deleteNeedle(n) } diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_read_write.go index 93ce1eab9..0327e5a1f 100644 --- a/weed/storage/volume_read_write.go +++ b/weed/storage/volume_read_write.go @@ -96,13 +96,29 @@ func (v *Volume) writeNeedle(n *needle.Needle) (offset uint64, size uint32, isUn n.Ttl = v.Ttl } + // check whether existing needle cookie matches + nv, ok := v.nm.Get(n.Id) + if ok { + existingNeedle, _, _, existingNeedleReadErr := needle.ReadNeedleHeader(v.dataFile, v.Version(), nv.Offset.ToAcutalOffset()) + if existingNeedleReadErr != nil { + err = fmt.Errorf("reading existing needle: %v", existingNeedleReadErr) + return + } + if existingNeedle.Cookie != n.Cookie { + glog.V(0).Infof("write cookie mismatch: existing %x, new %x", existingNeedle.Cookie, n.Cookie) + err = fmt.Errorf("mismatching cookie %x", n.Cookie) + return + } + } + + // append to dat file n.AppendAtNs = uint64(time.Now().UnixNano()) if offset, size, _, err = n.Append(v.dataFile, v.Version()); err != nil { return } v.lastAppendAtNs = n.AppendAtNs - nv, ok := v.nm.Get(n.Id) + // add to needle map if !ok || uint64(nv.Offset.ToAcutalOffset()) < offset { if err = v.nm.Put(n.Id, ToOffset(int64(offset)), n.Size); err != nil { glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err) @@ -224,6 +240,7 @@ func ScanVolumeFileFrom(version needle.Version, dataFile *os.File, offset int64, } if err != nil { glog.V(0).Infof("visit needle error: %v", err) + return fmt.Errorf("visit needle error: %v", err) } offset += NeedleHeaderSize + rest glog.V(4).Infof("==> new entry offset %d", offset) diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go index cc7c7d6e6..3bb306649 100644 --- a/weed/storage/volume_vacuum.go +++ b/weed/storage/volume_vacuum.go @@ -53,7 +53,7 @@ func (v *Volume) CommitCompact() error { glog.V(0).Infof("fail to close volume %d", v.Id) } v.dataFile = nil - stats.VolumeServerVolumeCounter.WithLabelValues(v.Collection, "volume").Inc() + stats.VolumeServerVolumeCounter.WithLabelValues(v.Collection, "volume").Dec() var e error if e = v.makeupDiff(v.FileName()+".cpd", v.FileName()+".cpx", v.FileName()+".dat", v.FileName()+".idx"); e != nil { diff --git a/weed/topology/store_replicate.go b/weed/topology/store_replicate.go index d4076d548..d21c4d210 100644 --- a/weed/topology/store_replicate.go +++ b/weed/topology/store_replicate.go @@ -25,7 +25,7 @@ func ReplicatedWrite(masterNode string, s *storage.Store, //check JWT jwt := security.GetJwt(r) - size, isUnchanged, err = s.Write(volumeId, n) + size, isUnchanged, err = s.WriteVolumeNeedle(volumeId, n) if err != nil { err = fmt.Errorf("failed to write to local disk: %v", err) return @@ -89,7 +89,7 @@ func ReplicatedDelete(masterNode string, store *storage.Store, //check JWT jwt := security.GetJwt(r) - ret, err := store.Delete(volumeId, n) + ret, err := store.DeleteVolumeNeedle(volumeId, n) if err != nil { glog.V(0).Infoln("delete error:", err) return ret, err