diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index c5f58cd72..521418e2a 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -15,7 +15,7 @@ jobs: uses: actions/checkout@v2 - name: Go Release Binaries - uses: wangyoucao577/go-release-action@feature/customize-target-release + uses: wangyoucao577/go-release-action@v1.10 with: github_token: ${{ secrets.GITHUB_TOKEN }} goos: linux # default is @@ -28,7 +28,7 @@ jobs: project_path: weed binary_name: weed-large-disk - name: Go Release Binaries - uses: wangyoucao577/go-release-action@feature/customize-target-release + uses: wangyoucao577/go-release-action@v1.10 with: github_token: ${{ secrets.GITHUB_TOKEN }} goos: linux # default is diff --git a/go.mod b/go.mod index 08e784510..51c24bcf0 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( github.com/aws/aws-sdk-go v1.33.5 github.com/buraksezer/consistent v0.0.0-20191006190839-693edf70fd72 github.com/cespare/xxhash v1.1.0 - github.com/chrislusf/raft v1.0.2-0.20201002174524-b13c3bfdb011 + github.com/chrislusf/raft v1.0.3 github.com/coreos/go-semver v0.3.0 // indirect github.com/dgrijalva/jwt-go v3.2.0+incompatible github.com/disintegration/imaging v1.6.2 @@ -24,6 +24,7 @@ require ( github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4 // indirect github.com/fclairamb/ftpserverlib v0.8.0 github.com/frankban/quicktest v1.7.2 // indirect + github.com/go-errors/errors v1.1.1 // indirect github.com/go-redis/redis v6.15.7+incompatible github.com/go-sql-driver/mysql v1.5.0 github.com/gocql/gocql v0.0.0-20190829130954-e163eff7a8c6 @@ -45,6 +46,7 @@ require ( github.com/klauspost/reedsolomon v1.9.2 github.com/kurin/blazer v0.5.3 github.com/lib/pq v1.2.0 + github.com/lunixbochs/vtclean v1.0.0 // indirect github.com/magiconair/properties v1.8.1 // indirect github.com/mattn/go-ieproxy v0.0.0-20190805055040-f9202b1cfdeb // indirect github.com/mattn/go-runewidth v0.0.4 // indirect @@ -67,6 +69,9 @@ require ( github.com/tidwall/gjson v1.3.2 github.com/tidwall/match v1.0.1 github.com/valyala/bytebufferpool v1.0.0 + github.com/viant/assertly v0.5.4 // indirect + github.com/viant/ptrie v0.3.0 + github.com/viant/toolbox v0.33.2 // indirect github.com/willf/bitset v1.1.10 // indirect github.com/willf/bloom v2.0.3+incompatible github.com/wsxiaoys/terminal v0.0.0-20160513160801-0940f3fc43a0 // indirect diff --git a/go.sum b/go.sum index 11bc7b7b4..51d96d997 100644 --- a/go.sum +++ b/go.sum @@ -84,8 +84,8 @@ github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/chrislusf/raft v1.0.2-0.20201002174524-b13c3bfdb011 h1:vN1GvfLgDg8kIPCdhuVKAjlYpxG1B86jiKejB6MC/Q0= -github.com/chrislusf/raft v1.0.2-0.20201002174524-b13c3bfdb011/go.mod h1:Ep5DP+mJSosjfKiix1uU7Lc2Df/SX4oGJEpZlXH5l68= +github.com/chrislusf/raft v1.0.3 h1:11YrnzJtVa5z7m9lhY2p8VcPHoUlC1UswyoAo+U1m1k= +github.com/chrislusf/raft v1.0.3/go.mod h1:Ep5DP+mJSosjfKiix1uU7Lc2Df/SX4oGJEpZlXH5l68= github.com/clbanning/x2j v0.0.0-20191024224557-825249438eec/go.mod h1:jMjuTZXRI4dUb/I5gc9Hdhagfvm9+RyrPryS/auMzxE= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= @@ -159,6 +159,8 @@ github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= +github.com/go-errors/errors v1.1.1 h1:ljK/pL5ltg3qoN+OtN6yCv9HWSfMwxSx90GJCZQxYNg= +github.com/go-errors/errors v1.1.1/go.mod h1:psDX2osz5VnTOnFWbDeWwS7yejl+uV3FEWEp4lssFEs= github.com/go-ini/ini v1.25.4/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8= github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= @@ -386,6 +388,8 @@ github.com/lib/pq v1.2.0 h1:LXpIM/LZ5xGFhOpXAQUIMM1HdyqzVYM13zNdjCEEcA0= github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-bc2310a04743/go.mod h1:qklhhLq1aX+mtWk9cPHPzaBjWImj5ULL6C7HFJtXQMM= github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4= +github.com/lunixbochs/vtclean v1.0.0 h1:xu2sLAri4lGiovBDQKxl5mrXyESr3gUr5m5SM5+LVb8= +github.com/lunixbochs/vtclean v1.0.0/go.mod h1:pHhQNgMf3btfWnGBVipUOjRYhoOsdGqdm/+2c2E2WMI= github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ= github.com/magiconair/properties v1.8.0 h1:LLgXmsheXeRoUOBOjtwPQCWIYqM/LU1ayDtDePerRcY= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= @@ -621,6 +625,12 @@ github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijb github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= +github.com/viant/assertly v0.5.4 h1:5Hh4U3pLZa6uhCFAGpYOxck/8l9TZczEzoHNfJAhHEQ= +github.com/viant/assertly v0.5.4/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49uaYMPRU= +github.com/viant/ptrie v0.3.0 h1:SDaRd7Gqr1+ItCNz0GpTxRdK21nOfqjV6YtBm9jGlMY= +github.com/viant/ptrie v0.3.0/go.mod h1:VguMnbGfz95Zw+V5VarYSqtqslDxJbOv++xLzxkMhec= +github.com/viant/toolbox v0.33.2 h1:Av844IIeGz81gT672qZemyptGfbrcxqGymA5RFnIPjE= +github.com/viant/toolbox v0.33.2/go.mod h1:OxMCG57V0PXuIP2HNQrtJf2CjqdmbrOx5EkMILuUhzM= github.com/willf/bitset v1.1.10 h1:NotGKqX0KwQ72NUzqrjZq5ipPNDQex9lo3WpaS8L2sc= github.com/willf/bitset v1.1.10/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4= github.com/willf/bloom v2.0.3+incompatible h1:QDacWdqcAUI1MPOwIQZRy9kOR7yxfyEmxX8Wdm2/JPA= diff --git a/k8s/seaweedfs/Chart.yaml b/k8s/seaweedfs/Chart.yaml index 99bb96dce..ee08dbec5 100644 --- a/k8s/seaweedfs/Chart.yaml +++ b/k8s/seaweedfs/Chart.yaml @@ -1,4 +1,4 @@ apiVersion: v1 description: SeaweedFS name: seaweedfs -version: 2.10 \ No newline at end of file +version: 2.11 \ No newline at end of file diff --git a/k8s/seaweedfs/values.yaml b/k8s/seaweedfs/values.yaml index 414330dd2..1bc619cb6 100644 --- a/k8s/seaweedfs/values.yaml +++ b/k8s/seaweedfs/values.yaml @@ -4,7 +4,7 @@ global: registry: "" repository: "" imageName: chrislusf/seaweedfs - imageTag: "2.10" + imageTag: "2.11" imagePullPolicy: IfNotPresent imagePullSecrets: imagepullsecret restartPolicy: Always diff --git a/other/java/client/pom.xml b/other/java/client/pom.xml index 09f891a8a..5a4bbaead 100644 --- a/other/java/client/pom.xml +++ b/other/java/client/pom.xml @@ -5,7 +5,7 @@ com.github.chrislusf seaweedfs-client - 1.5.3 + 1.5.4 org.sonatype.oss diff --git a/other/java/client/pom.xml.deploy b/other/java/client/pom.xml.deploy index 28a62d66f..e24bcca9f 100644 --- a/other/java/client/pom.xml.deploy +++ b/other/java/client/pom.xml.deploy @@ -5,7 +5,7 @@ com.github.chrislusf seaweedfs-client - 1.5.3 + 1.5.4 org.sonatype.oss diff --git a/other/java/client/pom_debug.xml b/other/java/client/pom_debug.xml index 78e4986cf..724c3c3b9 100644 --- a/other/java/client/pom_debug.xml +++ b/other/java/client/pom_debug.xml @@ -5,7 +5,7 @@ com.github.chrislusf seaweedfs-client - 1.5.3 + 1.5.4 org.sonatype.oss diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java index 035b2c852..7338d5bee 100644 --- a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java +++ b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java @@ -275,9 +275,9 @@ public class FilerClient { try { FilerProto.CreateEntryResponse createEntryResponse = filerGrpcClient.getBlockingStub().createEntry(FilerProto.CreateEntryRequest.newBuilder() - .setDirectory(parent) - .setEntry(entry) - .build()); + .setDirectory(parent) + .setEntry(entry) + .build()); if (Strings.isNullOrEmpty(createEntryResponse.getError())) { return true; } @@ -333,4 +333,13 @@ public class FilerClient { return true; } + public Iterator watch(String prefix, String clientName, long sinceNs) { + return filerGrpcClient.getBlockingStub().subscribeMetadata(FilerProto.SubscribeMetadataRequest.newBuilder() + .setPathPrefix(prefix) + .setClientName(clientName) + .setSinceNs(sinceNs) + .build() + ); + } + } diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java index a9ddd51db..2b530d2dd 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java @@ -28,20 +28,26 @@ public class SeaweedRead { List chunkViews = viewFromVisibles(visibleIntervals, position, bufferLength); + Map knownLocations = new HashMap<>(); + FilerProto.LookupVolumeRequest.Builder lookupRequest = FilerProto.LookupVolumeRequest.newBuilder(); for (ChunkView chunkView : chunkViews) { String vid = parseVolumeId(chunkView.fileId); - if (volumeIdCache.getLocations(vid)==null){ + FilerProto.Locations locations = volumeIdCache.getLocations(vid); + if (locations == null) { lookupRequest.addVolumeIds(vid); + } else { + knownLocations.put(vid, locations); } } - if (lookupRequest.getVolumeIdsCount()>0){ + if (lookupRequest.getVolumeIdsCount() > 0) { FilerProto.LookupVolumeResponse lookupResponse = filerGrpcClient .getBlockingStub().lookupVolume(lookupRequest.build()); Map vid2Locations = lookupResponse.getLocationsMapMap(); - for (Map.Entry entry : vid2Locations.entrySet()) { + for (Map.Entry entry : vid2Locations.entrySet()) { volumeIdCache.setLocations(entry.getKey(), entry.getValue()); + knownLocations.put(entry.getKey(), entry.getValue()); } } @@ -57,7 +63,7 @@ public class SeaweedRead { startOffset += gap; } - FilerProto.Locations locations = volumeIdCache.getLocations(parseVolumeId(chunkView.fileId)); + FilerProto.Locations locations = knownLocations.get(parseVolumeId(chunkView.fileId)); if (locations == null || locations.getLocationsCount() == 0) { LOG.error("failed to locate {}", chunkView.fileId); // log here! diff --git a/other/java/client/src/main/java/seaweedfs/client/VolumeIdCache.java b/other/java/client/src/main/java/seaweedfs/client/VolumeIdCache.java index 38daa14ac..fd2649cc2 100644 --- a/other/java/client/src/main/java/seaweedfs/client/VolumeIdCache.java +++ b/other/java/client/src/main/java/seaweedfs/client/VolumeIdCache.java @@ -15,7 +15,7 @@ public class VolumeIdCache { } this.cache = CacheBuilder.newBuilder() .maximumSize(maxEntries) - .expireAfterAccess(1, TimeUnit.HOURS) + .expireAfterAccess(5, TimeUnit.MINUTES) .build(); } diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index f75caec4e..4d9398897 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -362,6 +362,7 @@ message FilerConf { SSD = 1; } DiskType disk_type = 5; + bool fsync = 6; } repeated PathConf locations = 2; } diff --git a/other/java/unzip/pom.xml b/other/java/examples/pom.xml similarity index 92% rename from other/java/unzip/pom.xml rename to other/java/examples/pom.xml index 1f86bb688..7cbb56ec3 100644 --- a/other/java/unzip/pom.xml +++ b/other/java/examples/pom.xml @@ -11,13 +11,13 @@ com.github.chrislusf seaweedfs-client - 1.5.3 + 1.5.4 compile com.github.chrislusf seaweedfs-hadoop2-client - 1.5.3 + 1.5.4 compile diff --git a/other/java/unzip/src/main/java/com/example/test/Example.java b/other/java/examples/src/main/java/com/seaweedfs/examples/UnzipFile.java similarity index 66% rename from other/java/unzip/src/main/java/com/example/test/Example.java rename to other/java/examples/src/main/java/com/seaweedfs/examples/UnzipFile.java index 393ccb5ab..0529a5c73 100644 --- a/other/java/unzip/src/main/java/com/example/test/Example.java +++ b/other/java/examples/src/main/java/com/seaweedfs/examples/UnzipFile.java @@ -1,4 +1,4 @@ -package com.example.test; +package com.seaweedfs.examples; import seaweed.hdfs.SeaweedInputStream; import seaweedfs.client.FilerClient; @@ -10,17 +10,20 @@ import java.io.InputStream; import java.util.zip.ZipEntry; import java.util.zip.ZipInputStream; -public class Example { - - public static FilerClient filerClient = new FilerClient("localhost", 18888); - public static FilerGrpcClient filerGrpcClient = new FilerGrpcClient("localhost", 18888); +public class UnzipFile { public static void main(String[] args) throws IOException { - // 本地模式,速度很快 + FilerGrpcClient filerGrpcClient = new FilerGrpcClient("localhost", 18888); + FilerClient filerClient = new FilerClient(filerGrpcClient); + + long startTime = System.currentTimeMillis(); parseZip("/Users/chris/tmp/test.zip"); - // swfs读取,慢 + long startTime2 = System.currentTimeMillis(); + + long localProcessTime = startTime2 - startTime; + SeaweedInputStream seaweedInputStream = new SeaweedInputStream( filerGrpcClient, new org.apache.hadoop.fs.FileSystem.Statistics(""), @@ -29,6 +32,11 @@ public class Example { ); parseZip(seaweedInputStream); + long swProcessTime = System.currentTimeMillis() - startTime2; + + System.out.println("Local time: " + localProcessTime); + System.out.println("SeaweedFS time: " + swProcessTime); + } public static void parseZip(String filename) throws IOException { diff --git a/other/java/examples/src/main/java/com/seaweedfs/examples/WatchFiles.java b/other/java/examples/src/main/java/com/seaweedfs/examples/WatchFiles.java new file mode 100644 index 000000000..e489cb3b1 --- /dev/null +++ b/other/java/examples/src/main/java/com/seaweedfs/examples/WatchFiles.java @@ -0,0 +1,46 @@ +package com.seaweedfs.examples; + +import seaweedfs.client.FilerClient; +import seaweedfs.client.FilerProto; + +import java.io.IOException; +import java.util.Date; +import java.util.Iterator; + +public class WatchFiles { + + public static void main(String[] args) throws IOException { + + FilerClient filerClient = new FilerClient("localhost", 18888); + + long sinceNs = (System.currentTimeMillis() - 3600 * 1000) * 1000000L; + + Iterator watch = filerClient.watch( + "/buckets", + "exampleClientName", + sinceNs + ); + + System.out.println("Connected to filer, subscribing from " + new Date()); + + while (watch.hasNext()) { + FilerProto.SubscribeMetadataResponse event = watch.next(); + FilerProto.EventNotification notification = event.getEventNotification(); + if (!event.getDirectory().equals(notification.getNewParentPath())) { + // move an entry to a new directory, possibly with a new name + if (notification.hasOldEntry() && notification.hasNewEntry()) { + System.out.println("moved " + event.getDirectory() + "/" + notification.getOldEntry().getName() + " to " + notification.getNewParentPath() + "/" + notification.getNewEntry().getName()); + } else { + System.out.println("this should not happen."); + } + } else if (notification.hasNewEntry() && !notification.hasOldEntry()) { + System.out.println("created entry " + event.getDirectory() + "/" + notification.getNewEntry().getName()); + } else if (!notification.hasNewEntry() && notification.hasOldEntry()) { + System.out.println("deleted entry " + event.getDirectory() + "/" + notification.getOldEntry().getName()); + } else if (notification.hasNewEntry() && notification.hasOldEntry()) { + System.out.println("updated entry " + event.getDirectory() + "/" + notification.getNewEntry().getName()); + } + } + + } +} diff --git a/other/java/hdfs2/dependency-reduced-pom.xml b/other/java/hdfs2/dependency-reduced-pom.xml index 4be9ff180..b598d8402 100644 --- a/other/java/hdfs2/dependency-reduced-pom.xml +++ b/other/java/hdfs2/dependency-reduced-pom.xml @@ -301,7 +301,7 @@ - 1.5.3 + 1.5.4 2.9.2 diff --git a/other/java/hdfs2/pom.xml b/other/java/hdfs2/pom.xml index 478ae40ad..de518a0dc 100644 --- a/other/java/hdfs2/pom.xml +++ b/other/java/hdfs2/pom.xml @@ -5,7 +5,7 @@ 4.0.0 - 1.5.3 + 1.5.4 2.9.2 diff --git a/other/java/hdfs3/dependency-reduced-pom.xml b/other/java/hdfs3/dependency-reduced-pom.xml index c09ab0040..262c3ca80 100644 --- a/other/java/hdfs3/dependency-reduced-pom.xml +++ b/other/java/hdfs3/dependency-reduced-pom.xml @@ -309,7 +309,7 @@ - 1.5.3 + 1.5.4 3.1.1 diff --git a/other/java/hdfs3/pom.xml b/other/java/hdfs3/pom.xml index d579daa76..1952305e9 100644 --- a/other/java/hdfs3/pom.xml +++ b/other/java/hdfs3/pom.xml @@ -5,7 +5,7 @@ 4.0.0 - 1.5.3 + 1.5.4 3.1.1 diff --git a/weed/filer/filer.go b/weed/filer/filer.go index 0c4d610c5..105c8e04f 100644 --- a/weed/filer/filer.go +++ b/weed/filer/filer.go @@ -41,6 +41,7 @@ type Filer struct { metaLogReplication string MetaAggregator *MetaAggregator Signature int32 + FilerConf *FilerConf } func NewFiler(masters []string, grpcDialOption grpc.DialOption, @@ -49,6 +50,7 @@ func NewFiler(masters []string, grpcDialOption grpc.DialOption, MasterClient: wdclient.NewMasterClient(grpcDialOption, "filer", filerHost, filerGrpcPort, dataCenter, masters), fileIdDeletionQueue: util.NewUnboundedQueue(), GrpcDialOption: grpcDialOption, + FilerConf: NewFilerConf(), } f.LocalMetaLogBuffer = log_buffer.NewLogBuffer(LogFlushInterval, f.logFlushFunc, notifyFn) f.metaLogCollection = collection diff --git a/weed/filer/filer_conf.go b/weed/filer/filer_conf.go new file mode 100644 index 000000000..182449d49 --- /dev/null +++ b/weed/filer/filer_conf.go @@ -0,0 +1,139 @@ +package filer + +import ( + "bytes" + "context" + "io" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/golang/protobuf/jsonpb" + "github.com/golang/protobuf/proto" + "github.com/viant/ptrie" +) + +const ( + DirectoryEtc = "/etc" + FilerConfName = "filer.conf" +) + +type FilerConf struct { + rules ptrie.Trie +} + +func NewFilerConf() (fc *FilerConf) { + fc = &FilerConf{ + rules: ptrie.New(), + } + return fc +} + +func (fc *FilerConf) loadFromFiler(filer *Filer) (err error) { + filerConfPath := util.NewFullPath(DirectoryEtc, FilerConfName) + entry, err := filer.FindEntry(context.Background(), filerConfPath) + if err != nil { + if err == filer_pb.ErrNotFound { + return nil + } + glog.Errorf("read filer conf entry %s: %v", filerConfPath, err) + return + } + + return fc.loadFromChunks(filer, entry.Chunks) +} + +func (fc *FilerConf) loadFromChunks(filer *Filer, chunks []*filer_pb.FileChunk) (err error) { + data, err := filer.readEntry(chunks) + if err != nil { + glog.Errorf("read filer conf content: %v", err) + return + } + + return fc.LoadFromBytes(data) +} + +func (fc *FilerConf) LoadFromBytes(data []byte) (err error) { + conf := &filer_pb.FilerConf{} + + if err := jsonpb.Unmarshal(bytes.NewReader(data), conf); err != nil { + + err = proto.UnmarshalText(string(data), conf) + if err != nil { + glog.Errorf("unable to parse filer conf: %v", err) + // this is not recoverable + return nil + } + + return nil + } + + return fc.doLoadConf(conf) +} + +func (fc *FilerConf) doLoadConf(conf *filer_pb.FilerConf) (err error) { + for _, location := range conf.Locations { + err = fc.AddLocationConf(location) + if err != nil { + // this is not recoverable + return nil + } + } + return nil +} + +func (fc *FilerConf) AddLocationConf(locConf *filer_pb.FilerConf_PathConf) (err error) { + err = fc.rules.Put([]byte(locConf.LocationPrefix), locConf) + if err != nil { + glog.Errorf("put location prefix: %v", err) + } + return +} + +func (fc *FilerConf) DeleteLocationConf(locationPrefix string) { + rules := ptrie.New() + fc.rules.Walk(func(key []byte, value interface{}) bool { + if string(key) == locationPrefix { + return true + } + rules.Put(key, value) + return true + }) + fc.rules = rules + return +} + +var ( + EmptyFilerConfPathConf = &filer_pb.FilerConf_PathConf{} +) + +func (fc *FilerConf) MatchStorageRule(path string) (pathConf *filer_pb.FilerConf_PathConf) { + fc.rules.MatchPrefix([]byte(path), func(key []byte, value interface{}) bool { + pathConf = value.(*filer_pb.FilerConf_PathConf) + return true + }) + if pathConf == nil { + return EmptyFilerConfPathConf + } + return pathConf +} + +func (fc *FilerConf) ToProto() *filer_pb.FilerConf { + m := &filer_pb.FilerConf{} + fc.rules.Walk(func(key []byte, value interface{}) bool { + pathConf := value.(*filer_pb.FilerConf_PathConf) + m.Locations = append(m.Locations, pathConf) + return true + }) + return m +} + +func (fc *FilerConf) ToText(writer io.Writer) error { + + m := jsonpb.Marshaler{ + EmitDefaults: false, + Indent: " ", + } + + return m.Marshal(writer, fc.ToProto()) +} diff --git a/weed/filer/filer_conf_test.go b/weed/filer/filer_conf_test.go new file mode 100644 index 000000000..1bfe8bcfe --- /dev/null +++ b/weed/filer/filer_conf_test.go @@ -0,0 +1,29 @@ +package filer + +import ( + "testing" + + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/stretchr/testify/assert" +) + +func TestFilerConf(t *testing.T) { + + fc := NewFilerConf() + + conf := &filer_pb.FilerConf{Locations: []*filer_pb.FilerConf_PathConf{ + { + LocationPrefix: "/buckets/abc", + Collection: "abc", + }, + { + LocationPrefix: "/buckets/abcd", + Collection: "abcd", + }, + }} + fc.doLoadConf(conf) + + assert.Equal(t, "abc", fc.MatchStorageRule("/buckets/abc/jasdf").Collection) + assert.Equal(t, "abcd", fc.MatchStorageRule("/buckets/abcd/jasdf").Collection) + +} diff --git a/weed/filer/filer_notify_append.go b/weed/filer/filer_notify_append.go index b1836b046..19da97f6c 100644 --- a/weed/filer/filer_notify_append.go +++ b/weed/filer/filer_notify_append.go @@ -13,7 +13,7 @@ import ( func (f *Filer) appendToFile(targetFile string, data []byte) error { - assignResult, uploadResult, err2 := f.assignAndUpload(data) + assignResult, uploadResult, err2 := f.assignAndUpload(targetFile, data) if err2 != nil { return err2 } @@ -46,14 +46,16 @@ func (f *Filer) appendToFile(targetFile string, data []byte) error { return err } -func (f *Filer) assignAndUpload(data []byte) (*operation.AssignResult, *operation.UploadResult, error) { +func (f *Filer) assignAndUpload(targetFile string, data []byte) (*operation.AssignResult, *operation.UploadResult, error) { // assign a volume location + rule := f.FilerConf.MatchStorageRule(targetFile) assignRequest := &operation.VolumeAssignRequest{ Count: 1, - Collection: f.metaLogCollection, - Replication: f.metaLogReplication, + Collection: util.Nvl(f.metaLogCollection, rule.Collection), + Replication: util.Nvl(f.metaLogReplication, rule.Replication), WritableVolumeCount: 1, } + assignResult, err := operation.Assign(f.GetMaster(), f.GrpcDialOption, assignRequest) if err != nil { return nil, nil, fmt.Errorf("AssignVolume: %v", err) diff --git a/weed/filer/filer_on_meta_event.go b/weed/filer/filer_on_meta_event.go new file mode 100644 index 000000000..3de27da6e --- /dev/null +++ b/weed/filer/filer_on_meta_event.go @@ -0,0 +1,61 @@ +package filer + +import ( + "bytes" + "math" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" +) + +// onMetadataChangeEvent is triggered after filer processed change events from local or remote filers +func (f *Filer) onMetadataChangeEvent(event *filer_pb.SubscribeMetadataResponse) { + if DirectoryEtc != event.Directory { + if DirectoryEtc != event.EventNotification.NewParentPath { + return + } + } + + entry := event.EventNotification.NewEntry + if entry == nil { + return + } + + glog.V(0).Infof("procesing %v", event) + if entry.Name == FilerConfName { + f.reloadFilerConfiguration(entry) + } + +} + +func (f *Filer) readEntry(chunks []*filer_pb.FileChunk) ([]byte, error) { + var buf bytes.Buffer + err := StreamContent(f.MasterClient, &buf, chunks, 0, math.MaxInt64) + if err != nil { + return nil, err + } + return buf.Bytes(), nil +} + +func (f *Filer) reloadFilerConfiguration(entry *filer_pb.Entry) { + fc := NewFilerConf() + err := fc.loadFromChunks(f, entry.Chunks) + if err != nil { + glog.Errorf("read filer conf chunks: %v", err) + return + } + f.FilerConf = fc +} + +func (f *Filer) LoadFilerConf() { + fc := NewFilerConf() + err := util.Retry("loadFilerConf", func() error { + return fc.loadFromFiler(f) + }) + if err != nil { + glog.Errorf("read filer conf: %v", err) + return + } + f.FilerConf = fc +} diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go index b90457339..9437e9992 100644 --- a/weed/filer/meta_aggregator.go +++ b/weed/filer/meta_aggregator.go @@ -141,6 +141,9 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, peer string return fmt.Errorf("process %v: %v", resp, err) } lastTsNs = resp.TsNs + + f.onMetadataChangeEvent(resp) + } }) if err != nil { diff --git a/weed/filesys/file.go b/weed/filesys/file.go index 9e1342370..3bffa156e 100644 --- a/weed/filesys/file.go +++ b/weed/filesys/file.go @@ -294,7 +294,7 @@ func (file *File) addChunks(chunks []*filer_pb.FileChunk) { // find the earliest incoming chunk newChunks := chunks earliestChunk := newChunks[0] - for i:=1;i 0 { - ttlStr = strconv.Itoa(int(req.TtlSec)) - } - collection, replication, _ := fs.detectCollection(req.Path, req.Collection, req.Replication) + so := fs.detectStorageOption(req.Path, req.Collection, req.Replication, req.TtlSec, req.DataCenter, req.Rack) - var altRequest *operation.VolumeAssignRequest + assignRequest, altRequest := so.ToAssignRequests(int(req.Count)) - dataCenter := req.DataCenter - if dataCenter == "" { - dataCenter = fs.option.DataCenter - } - rack := req.Rack - if rack == "" { - rack = fs.option.Rack - } - - assignRequest := &operation.VolumeAssignRequest{ - Count: uint64(req.Count), - Replication: replication, - Collection: collection, - Ttl: ttlStr, - DataCenter: dataCenter, - Rack: rack, - } - if dataCenter != "" || rack != "" { - altRequest = &operation.VolumeAssignRequest{ - Count: uint64(req.Count), - Replication: replication, - Collection: collection, - Ttl: ttlStr, - DataCenter: "", - Rack: "", - } - } assignResult, err := operation.Assign(fs.filer.GetMaster(), fs.grpcDialOption, assignRequest, altRequest) if err != nil { glog.V(3).Infof("AssignVolume: %v", err) @@ -374,8 +350,8 @@ func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVol Url: assignResult.Url, PublicUrl: assignResult.PublicUrl, Auth: string(assignResult.Auth), - Collection: collection, - Replication: replication, + Collection: so.Collection, + Replication: so.Replication, }, nil } diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index dc93ae062..ba2d9989c 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -131,6 +131,8 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) fs.filer.LoadBuckets() + fs.filer.LoadFilerConf() + grace.OnInterrupt(func() { fs.filer.Shutdown() }) diff --git a/weed/server/filer_server_handlers.go b/weed/server/filer_server_handlers.go index a64b23927..451e2a2de 100644 --- a/weed/server/filer_server_handlers.go +++ b/weed/server/filer_server_handlers.go @@ -27,7 +27,7 @@ func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) { case "DELETE": stats.FilerRequestCounter.WithLabelValues("delete").Inc() if _, ok := r.URL.Query()["tagging"]; ok { - fs.DeleteTaggingHandler(w,r) + fs.DeleteTaggingHandler(w, r) } else { fs.DeleteHandler(w, r) } @@ -35,7 +35,7 @@ func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) { case "PUT": stats.FilerRequestCounter.WithLabelValues("put").Inc() if _, ok := r.URL.Query()["tagging"]; ok { - fs.PutTaggingHandler(w,r) + fs.PutTaggingHandler(w, r) } else { fs.PostHandler(w, r) } diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go index 267b8752d..5806b0c1f 100644 --- a/weed/server/filer_server_handlers_write.go +++ b/weed/server/filer_server_handlers_write.go @@ -29,30 +29,13 @@ type FilerPostResult struct { Url string `json:"url,omitempty"` } -func (fs *FilerServer) assignNewFileInfo(replication, collection, dataCenter, rack, ttlString string, fsync bool) (fileId, urlLocation string, auth security.EncodedJwt, err error) { +func (fs *FilerServer) assignNewFileInfo(so *operation.StorageOption) (fileId, urlLocation string, auth security.EncodedJwt, err error) { stats.FilerRequestCounter.WithLabelValues("assign").Inc() start := time.Now() defer func() { stats.FilerRequestHistogram.WithLabelValues("assign").Observe(time.Since(start).Seconds()) }() - ar := &operation.VolumeAssignRequest{ - Count: 1, - Replication: replication, - Collection: collection, - Ttl: ttlString, - DataCenter: dataCenter, - } - var altRequest *operation.VolumeAssignRequest - if dataCenter != "" || rack != "" { - altRequest = &operation.VolumeAssignRequest{ - Count: 1, - Replication: replication, - Collection: collection, - Ttl: ttlString, - DataCenter: "", - Rack: "", - } - } + ar, altRequest := so.ToAssignRequests(1) assignResult, ae := operation.Assign(fs.filer.GetMaster(), fs.grpcDialOption, ar, altRequest) if ae != nil { @@ -62,7 +45,7 @@ func (fs *FilerServer) assignNewFileInfo(replication, collection, dataCenter, ra } fileId = assignResult.Fid urlLocation = "http://" + assignResult.Url + "/" + assignResult.Fid - if fsync { + if so.Fsync { urlLocation += "?fsync=true" } auth = assignResult.Auth @@ -74,25 +57,15 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { ctx := context.Background() query := r.URL.Query() - collection, replication, fsync := fs.detectCollection(r.RequestURI, query.Get("collection"), query.Get("replication")) - dataCenter := query.Get("dataCenter") - if dataCenter == "" { - dataCenter = fs.option.DataCenter - } - rack := query.Get("rack") - if dataCenter == "" { - rack = fs.option.Rack - } - ttlString := r.URL.Query().Get("ttl") + so := fs.detectStorageOption0(r.RequestURI, + query.Get("collection"), + query.Get("replication"), + query.Get("ttl"), + query.Get("dataCenter"), + query.Get("rack"), + ) - // read ttl in seconds - ttl, err := needle.ReadTTL(ttlString) - ttlSeconds := int32(0) - if err == nil { - ttlSeconds = int32(ttl.Minutes()) * 60 - } - - fs.autoChunk(ctx, w, r, replication, collection, dataCenter, rack, ttlSeconds, ttlString, fsync) + fs.autoChunk(ctx, w, r, so) } @@ -130,21 +103,12 @@ func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusNoContent) } -func (fs *FilerServer) detectCollection(requestURI, qCollection, qReplication string) (collection, replication string, fsync bool) { - // default - collection = fs.option.Collection - replication = fs.option.DefaultReplication - - // get default collection settings - if qCollection != "" { - collection = qCollection - } - if qReplication != "" { - replication = qReplication - } +func (fs *FilerServer) detectStorageOption(requestURI, qCollection, qReplication string, ttlSeconds int32, dataCenter, rack string) *operation.StorageOption { + collection := util.Nvl(qCollection, fs.option.Collection) + replication := util.Nvl(qReplication, fs.option.DefaultReplication) // required by buckets folder - bucketDefaultReplication := "" + bucketDefaultReplication, fsync := "", false if strings.HasPrefix(requestURI, fs.filer.DirBucketsPath+"/") { bucketAndObjectKey := requestURI[len(fs.filer.DirBucketsPath)+1:] t := strings.Index(bucketAndObjectKey, "/") @@ -160,5 +124,32 @@ func (fs *FilerServer) detectCollection(requestURI, qCollection, qReplication st replication = bucketDefaultReplication } - return + rule := fs.filer.FilerConf.MatchStorageRule(requestURI) + + if ttlSeconds == 0 { + ttl, err := needle.ReadTTL(rule.GetTtl()) + if err != nil { + glog.Errorf("fail to parse %s ttl setting %s: %v", rule.LocationPrefix, rule.Ttl, err) + } + ttlSeconds = int32(ttl.Minutes()) * 60 + } + + return &operation.StorageOption{ + Replication: util.Nvl(replication, rule.Replication), + Collection: util.Nvl(collection, rule.Collection), + DataCenter: util.Nvl(dataCenter, fs.option.DataCenter), + Rack: util.Nvl(rack, fs.option.Rack), + TtlSeconds: ttlSeconds, + Fsync: fsync || rule.Fsync, + } +} + +func (fs *FilerServer) detectStorageOption0(requestURI, qCollection, qReplication string, qTtl string, dataCenter, rack string) *operation.StorageOption { + + ttl, err := needle.ReadTTL(qTtl) + if err != nil { + glog.Errorf("fail to parse ttl %s: %v", qTtl, err) + } + + return fs.detectStorageOption(requestURI, qCollection, qReplication, int32(ttl.Minutes())*60, dataCenter, rack) } diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go index ee0af9aab..fd2db884f 100644 --- a/weed/server/filer_server_handlers_write_autochunk.go +++ b/weed/server/filer_server_handlers_write_autochunk.go @@ -25,7 +25,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" ) -func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, replication string, collection string, dataCenter string, rack string, ttlSec int32, ttlString string, fsync bool) { +func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, so *operation.StorageOption) { // autoChunking can be set at the command-line level or as a query param. Query param overrides command-line query := r.URL.Query() @@ -51,10 +51,10 @@ func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r * if r.Header.Get("Content-Type") == "" && strings.HasSuffix(r.URL.Path, "/") { reply, err = fs.mkdir(ctx, w, r) } else { - reply, md5bytes, err = fs.doPostAutoChunk(ctx, w, r, chunkSize, replication, collection, dataCenter, rack, ttlSec, ttlString, fsync) + reply, md5bytes, err = fs.doPostAutoChunk(ctx, w, r, chunkSize, so) } } else { - reply, md5bytes, err = fs.doPutAutoChunk(ctx, w, r, chunkSize, replication, collection, dataCenter, rack, ttlSec, ttlString, fsync) + reply, md5bytes, err = fs.doPutAutoChunk(ctx, w, r, chunkSize, so) } if err != nil { writeJsonError(w, r, http.StatusInternalServerError, err) @@ -66,7 +66,7 @@ func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r * } } -func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, chunkSize int32, replication string, collection string, dataCenter string, rack string, ttlSec int32, ttlString string, fsync bool) (filerResult *FilerPostResult, md5bytes []byte, replyerr error) { +func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, chunkSize int32, so *operation.StorageOption) (filerResult *FilerPostResult, md5bytes []byte, replyerr error) { multipartReader, multipartReaderErr := r.MultipartReader() if multipartReaderErr != nil { @@ -87,46 +87,46 @@ func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWrite contentType = "" } - fileChunks, md5Hash, chunkOffset, err := fs.uploadReaderToChunks(w, r, part1, chunkSize, replication, collection, dataCenter, rack, ttlString, fileName, contentType, fsync) + fileChunks, md5Hash, chunkOffset, err := fs.uploadReaderToChunks(w, r, part1, chunkSize, fileName, contentType, so) if err != nil { return nil, nil, err } - fileChunks, replyerr = filer.MaybeManifestize(fs.saveAsChunk(replication, collection, dataCenter, rack, ttlString, fsync), fileChunks) + fileChunks, replyerr = filer.MaybeManifestize(fs.saveAsChunk(so), fileChunks) if replyerr != nil { glog.V(0).Infof("manifestize %s: %v", r.RequestURI, replyerr) return } md5bytes = md5Hash.Sum(nil) - filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, replication, collection, ttlSec, contentType, md5bytes, fileChunks, chunkOffset) + filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, contentType, so, md5bytes, fileChunks, chunkOffset) return } -func (fs *FilerServer) doPutAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, chunkSize int32, replication string, collection string, dataCenter string, rack string, ttlSec int32, ttlString string, fsync bool) (filerResult *FilerPostResult, md5bytes []byte, replyerr error) { +func (fs *FilerServer) doPutAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, chunkSize int32, so *operation.StorageOption) (filerResult *FilerPostResult, md5bytes []byte, replyerr error) { fileName := "" contentType := "" - fileChunks, md5Hash, chunkOffset, err := fs.uploadReaderToChunks(w, r, r.Body, chunkSize, replication, collection, dataCenter, rack, ttlString, fileName, contentType, fsync) + fileChunks, md5Hash, chunkOffset, err := fs.uploadReaderToChunks(w, r, r.Body, chunkSize, fileName, contentType, so) if err != nil { return nil, nil, err } - fileChunks, replyerr = filer.MaybeManifestize(fs.saveAsChunk(replication, collection, dataCenter, rack, ttlString, fsync), fileChunks) + fileChunks, replyerr = filer.MaybeManifestize(fs.saveAsChunk(so), fileChunks) if replyerr != nil { glog.V(0).Infof("manifestize %s: %v", r.RequestURI, replyerr) return } md5bytes = md5Hash.Sum(nil) - filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, replication, collection, ttlSec, contentType, md5bytes, fileChunks, chunkOffset) + filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, contentType, so, md5bytes, fileChunks, chunkOffset) return } -func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileName string, replication string, collection string, ttlSec int32, contentType string, md5bytes []byte, fileChunks []*filer_pb.FileChunk, chunkOffset int64) (filerResult *FilerPostResult, replyerr error) { +func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileName string, contentType string, so *operation.StorageOption, md5bytes []byte, fileChunks []*filer_pb.FileChunk, chunkOffset int64) (filerResult *FilerPostResult, replyerr error) { // detect file mode modeStr := r.URL.Query().Get("mode") @@ -163,9 +163,9 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa Mode: os.FileMode(mode), Uid: OS_UID, Gid: OS_GID, - Replication: replication, - Collection: collection, - TtlSec: ttlSec, + Replication: so.Replication, + Collection: so.Collection, + TtlSec: so.TtlSeconds, Mime: contentType, Md5: md5bytes, FileSize: uint64(chunkOffset), @@ -199,7 +199,7 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa return filerResult, replyerr } -func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, replication string, collection string, dataCenter string, rack string, ttlString string, fileName string, contentType string, fsync bool) ([]*filer_pb.FileChunk, hash.Hash, int64, error) { +func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, so *operation.StorageOption) ([]*filer_pb.FileChunk, hash.Hash, int64, error) { var fileChunks []*filer_pb.FileChunk md5Hash := md5.New() @@ -211,7 +211,7 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque limitedReader := io.LimitReader(partReader, int64(chunkSize)) // assign one file id for one chunk - fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(replication, collection, dataCenter, rack, ttlString, fsync) + fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(so) if assignErr != nil { return nil, nil, 0, assignErr } @@ -255,11 +255,11 @@ func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *ht return uploadResult, err } -func (fs *FilerServer) saveAsChunk(replication string, collection string, dataCenter string, rack string, ttlString string, fsync bool) filer.SaveDataAsChunkFunctionType { +func (fs *FilerServer) saveAsChunk(so *operation.StorageOption) filer.SaveDataAsChunkFunctionType { return func(reader io.Reader, name string, offset int64) (*filer_pb.FileChunk, string, string, error) { // assign one file id for one chunk - fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(replication, collection, dataCenter, rack, ttlString, fsync) + fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(so) if assignErr != nil { return nil, "", "", assignErr } @@ -270,7 +270,7 @@ func (fs *FilerServer) saveAsChunk(replication string, collection string, dataCe return nil, "", "", uploadErr } - return uploadResult.ToPbFileChunk(fileId, offset), collection, replication, nil + return uploadResult.ToPbFileChunk(fileId, offset), so.Collection, so.Replication, nil } } diff --git a/weed/server/filer_server_handlers_write_cipher.go b/weed/server/filer_server_handlers_write_cipher.go index 720d97027..3cc0d0c41 100644 --- a/weed/server/filer_server_handlers_write_cipher.go +++ b/weed/server/filer_server_handlers_write_cipher.go @@ -16,12 +16,12 @@ import ( ) // handling single chunk POST or PUT upload -func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *http.Request, replication string, collection string, dataCenter string, rack string, ttlSeconds int32, ttlString string, fsync bool) (filerResult *FilerPostResult, err error) { +func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *http.Request, so *operation.StorageOption) (filerResult *FilerPostResult, err error) { - fileId, urlLocation, auth, err := fs.assignNewFileInfo(replication, collection, dataCenter, rack, ttlString, fsync) + fileId, urlLocation, auth, err := fs.assignNewFileInfo(so) if err != nil || fileId == "" || urlLocation == "" { - return nil, fmt.Errorf("fail to allocate volume for %s, collection:%s, datacenter:%s", r.URL.Path, collection, dataCenter) + return nil, fmt.Errorf("fail to allocate volume for %s, collection:%s, datacenter:%s", r.URL.Path, so.Collection, so.DataCenter) } glog.V(4).Infof("write %s to %v", r.URL.Path, urlLocation) @@ -65,9 +65,9 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht Mode: 0660, Uid: OS_UID, Gid: OS_GID, - Replication: replication, - Collection: collection, - TtlSec: ttlSeconds, + Replication: so.Replication, + Collection: so.Collection, + TtlSec: so.TtlSeconds, Mime: pu.MimeType, Md5: util.Base64Md5ToBytes(pu.ContentMd5), }, diff --git a/weed/shell/command_fs_configure.go b/weed/shell/command_fs_configure.go new file mode 100644 index 000000000..d4d70048d --- /dev/null +++ b/weed/shell/command_fs_configure.go @@ -0,0 +1,126 @@ +package shell + +import ( + "bytes" + "flag" + "fmt" + "io" + "math" + "net/http" + + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" +) + +func init() { + Commands = append(Commands, &commandFsConfigure{}) +} + +type commandFsConfigure struct { +} + +func (c *commandFsConfigure) Name() string { + return "fs.configure" +} + +func (c *commandFsConfigure) Help() string { + return `configure and apply storage options for each location + + # see the possible configuration file content + fs.configure + + # trying the changes and see the possible configuration file content + fs.configure -locationPrfix=/my/folder -collection=abc + fs.configure -locationPrfix=/my/folder -collection=abc -ttl=7d + + # apply the changes + fs.configure -locationPrfix=/my/folder -collection=abc -apply + + # delete the changes + fs.configure -locationPrfix=/my/folder -delete -apply + +` +} + +func (c *commandFsConfigure) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + + fsConfigureCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) + locationPrefix := fsConfigureCommand.String("locationPrefix", "", "path prefix, required to update the path-specific configuration") + collection := fsConfigureCommand.String("collection", "", "assign writes to this collection") + replication := fsConfigureCommand.String("replication", "", "assign writes with this replication") + ttl := fsConfigureCommand.String("ttl", "", "assign writes with this ttl") + fsync := fsConfigureCommand.Bool("fsync", false, "fsync for the writes") + isDelete := fsConfigureCommand.Bool("delete", false, "delete the configuration by locationPrefix") + apply := fsConfigureCommand.Bool("apply", false, "update and apply filer configuration") + if err = fsConfigureCommand.Parse(args); err != nil { + return nil + } + + var buf bytes.Buffer + if err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + + request := &filer_pb.LookupDirectoryEntryRequest{ + Directory: filer.DirectoryEtc, + Name: filer.FilerConfName, + } + respLookupEntry, err := filer_pb.LookupEntry(client, request) + if err != nil { + return err + } + + return filer.StreamContent(commandEnv.MasterClient, &buf, respLookupEntry.Entry.Chunks, 0, math.MaxInt64) + + }); err != nil { + return err + } + + fc := filer.NewFilerConf() + if err = fc.LoadFromBytes(buf.Bytes()); err != nil { + return err + } + + if *locationPrefix != "" { + locConf := &filer_pb.FilerConf_PathConf{ + LocationPrefix: *locationPrefix, + Collection: *collection, + Replication: *replication, + Ttl: *ttl, + Fsync: *fsync, + } + if *isDelete { + fc.DeleteLocationConf(*locationPrefix) + } else { + fc.AddLocationConf(locConf) + } + } + + buf.Reset() + fc.ToText(&buf) + + fmt.Fprintf(writer, string(buf.Bytes())) + fmt.Fprintln(writer) + + if *apply { + + target := fmt.Sprintf("http://%s:%d%s/%s", commandEnv.option.FilerHost, commandEnv.option.FilerPort, filer.DirectoryEtc, filer.FilerConfName) + + // set the HTTP method, url, and request body + req, err := http.NewRequest(http.MethodPut, target, &buf) + if err != nil { + return err + } + + // set the request header Content-Type for json + req.Header.Set("Content-Type", "text/plain; charset=utf-8") + resp, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + util.CloseResponse(resp) + + } + + return nil + +} diff --git a/weed/util/constants.go b/weed/util/constants.go index f1864cf95..0aeee111e 100644 --- a/weed/util/constants.go +++ b/weed/util/constants.go @@ -5,7 +5,7 @@ import ( ) var ( - VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 10) + VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 11) COMMIT = "" ) diff --git a/weed/util/http_util.go b/weed/util/http_util.go index 48823c1c3..7851d8293 100644 --- a/weed/util/http_util.go +++ b/weed/util/http_util.go @@ -1,7 +1,6 @@ package util import ( - "bytes" "compress/gzip" "encoding/json" "errors" @@ -29,22 +28,6 @@ func init() { } } -func PostBytes(url string, body []byte) ([]byte, error) { - r, err := client.Post(url, "", bytes.NewReader(body)) - if err != nil { - return nil, fmt.Errorf("Post to %s: %v", url, err) - } - defer r.Body.Close() - b, err := ioutil.ReadAll(r.Body) - if err != nil { - return nil, fmt.Errorf("Read response body: %v", err) - } - if r.StatusCode >= 400 { - return nil, fmt.Errorf("%s: %s", url, r.Status) - } - return b, nil -} - func Post(url string, values url.Values) ([]byte, error) { r, err := client.PostForm(url, values) if err != nil { diff --git a/weed/util/retry.go b/weed/util/retry.go index faaab0351..85c4d150d 100644 --- a/weed/util/retry.go +++ b/weed/util/retry.go @@ -29,3 +29,13 @@ func Retry(name string, job func() error) (err error) { } return err } + +// return the first non empty string +func Nvl(values ...string) string { + for _, s := range values { + if s != "" { + return s + } + } + return "" +}