This commit is contained in:
Chris Lu 2020-05-10 03:50:30 -07:00
parent 6bf3eb69cb
commit 39e72fb23c
13 changed files with 15 additions and 14 deletions

View file

@ -232,7 +232,7 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
Reader: &FakeReader{id: uint64(id), size: fileSize, random: random}, Reader: &FakeReader{id: uint64(id), size: fileSize, random: random},
FileSize: fileSize, FileSize: fileSize,
MimeType: "image/bench", // prevent gzip benchmark content MimeType: "image/bench", // prevent gzip benchmark content
Fsync: *b.fsync, Fsync: *b.fsync,
} }
ar := &operation.VolumeAssignRequest{ ar := &operation.VolumeAssignRequest{
Count: 1, Count: 1,

View file

@ -428,7 +428,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File,
uploadError = fmt.Errorf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error) uploadError = fmt.Errorf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error)
return return
} }
chunksChan <- uploadResult.ToPbFileChunk(assignResult.FileId, i * chunkSize) chunksChan <- uploadResult.ToPbFileChunk(assignResult.FileId, i*chunkSize)
fmt.Printf("uploaded %s-%d to %s [%d,%d)\n", fileName, i+1, targetUrl, i*chunkSize, i*chunkSize+int64(uploadResult.Size)) fmt.Printf("uploaded %s-%d to %s [%d,%d)\n", fileName, i+1, targetUrl, i*chunkSize, i*chunkSize+int64(uploadResult.Size))
}(i) }(i)

View file

@ -59,7 +59,7 @@ func LookupFn(filerClient filer_pb.FilerClient) LookupFileIdFunctionType {
func NewChunkReaderAtFromClient(filerClient filer_pb.FilerClient, chunkViews []*ChunkView, chunkCache *chunk_cache.ChunkCache) *ChunkReadAt { func NewChunkReaderAtFromClient(filerClient filer_pb.FilerClient, chunkViews []*ChunkView, chunkCache *chunk_cache.ChunkCache) *ChunkReadAt {
return &ChunkReadAt{ return &ChunkReadAt{
chunkViews: chunkViews, chunkViews: chunkViews,
lookupFileId: LookupFn(filerClient), lookupFileId: LookupFn(filerClient),
bufferOffset: -1, bufferOffset: -1,
chunkCache: chunkCache, chunkCache: chunkCache,

View file

@ -103,7 +103,7 @@ func NewChunkStreamReader(filerClient filer_pb.FilerClient, chunks []*filer_pb.F
chunkViews := ViewFromChunks(chunks, 0, math.MaxInt32) chunkViews := ViewFromChunks(chunks, 0, math.MaxInt32)
return &ChunkStreamReader{ return &ChunkStreamReader{
chunkViews: chunkViews, chunkViews: chunkViews,
lookupFileId: LookupFn(filerClient), lookupFileId: LookupFn(filerClient),
} }
} }

View file

@ -37,8 +37,7 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
// IsTransient: true, // IsTransient: true,
} }
if err = stream.Send(&messaging_pb.BrokerMessage{ if err = stream.Send(&messaging_pb.BrokerMessage{}); err != nil {
}); err != nil {
return err return err
} }

View file

@ -1,8 +1,8 @@
package broker package broker
import ( import (
"github.com/cespare/xxhash"
"github.com/buraksezer/consistent" "github.com/buraksezer/consistent"
"github.com/cespare/xxhash"
) )
type Member string type Member string

View file

@ -18,7 +18,7 @@ func TestPickMember(t *testing.T) {
total := 1000 total := 1000
distribution := make(map[string]int) distribution := make(map[string]int)
for i:=0;i<total;i++{ for i := 0; i < total; i++ {
tp := fmt.Sprintf("tp:%2d", i) tp := fmt.Sprintf("tp:%2d", i)
m := PickMember(servers, []byte(tp)) m := PickMember(servers, []byte(tp))
// println(tp, "=>", m) // println(tp, "=>", m)

View file

@ -16,9 +16,11 @@ type TopicPartition struct {
Topic string Topic string
Partition int32 Partition int32
} }
const ( const (
TopicPartitionFmt = "%s/%s_%02d" TopicPartitionFmt = "%s/%s_%02d"
) )
func (tp *TopicPartition) String() string { func (tp *TopicPartition) String() string {
return fmt.Sprintf(TopicPartitionFmt, tp.Namespace, tp.Topic, tp.Partition) return fmt.Sprintf(TopicPartitionFmt, tp.Namespace, tp.Topic, tp.Partition)
} }

View file

@ -28,7 +28,6 @@ func NewMessagingClient(bootstrapBrokers ...string) *MessagingClient {
} }
} }
func (mc *MessagingClient) findBroker(tp broker.TopicPartition) (*grpc.ClientConn, error) { func (mc *MessagingClient) findBroker(tp broker.TopicPartition) (*grpc.ClientConn, error) {
for _, broker := range mc.bootstrapBrokers { for _, broker := range mc.bootstrapBrokers {

View file

@ -16,6 +16,7 @@ type Publisher struct {
messageCount uint64 messageCount uint64
publisherId string publisherId string
} }
/* /*
func (mc *MessagingClient) NewPublisher(publisherId, namespace, topic string) (*Publisher, error) { func (mc *MessagingClient) NewPublisher(publisherId, namespace, topic string) (*Publisher, error) {
// read topic configuration // read topic configuration

View file

@ -5,8 +5,8 @@ import (
"io" "io"
"time" "time"
"google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
"google.golang.org/grpc"
) )
type Subscriber struct { type Subscriber struct {