diff --git a/README.md b/README.md index 487feb1..4227224 100644 --- a/README.md +++ b/README.md @@ -1,42 +1,46 @@ -dCache Endit Provider -============================================== +# dCache Endit Provider This [dCache] plugin interfaces with the [Endit] TSM integration system. To compile the plugin, run: +``` +mvn package +``` - mvn package +To install the plugin, unpack the resulting tarball in the dCache's plugin directory (usually `/usr/local/share/dcache/plugins`). -To install the plugin, unpack the resulting tarball in the dCache -plugin directory (usually `/usr/local/share/dcache/plugins`). - -## Configuration +## Watching provider To use, define a nearline storage in the dCache admin interface: +``` +hsm create osm the-hsm-name endit -directory=/path/to/endit/directory +``` - hsm create osm the-hsm-name endit -directory=/path/to/endit/directory - -The endit directory must be on the same file system as the pool's -data directory. +The endit directory must be on the same file system as the pool's data directory. -The above will create a provider that uses the JVMs file event -notification feature which in most cases maps directly to a native -file event notification facility of the operating system. +The above will create a provider that uses JVM's file event notification feature which in most cases maps directly to a native file event notification facility of the operating system. ## Polling provider To use a provider that polls for changes, use: +``` +hsm create osm the-hsm-name endit-polling -directory=/path/to/endit/directory +``` - hsm create osm osm the-hsm-name -directory=/path/to/endit/directory +This provider accepts two additional options with the following default values: +* `threads=1` +* `period=5000` -This provider accepts two additional options with the following default -values: +The first is the number of threads used for polling for file changes and the second is the poll period in milliseconds. - -threads=1 - -period=5000 +[dCache]: http://www.dcache.org/ +[Endit]: https://github.com/maswan/endit -The first is the number of threads used for polling for file changes -and the second is the poll period in milliseconds. +### Notes on the provider behaviour -[dCache]: http://www.dcache.org/ -[Endit]: https://github.com/neicnordic/endit +* The polling provider does *not* monitor the request files, once they are created. Editing or deleting them has no consequences from the perspective of dCache. +* The polling provider will check whether a requested file does exist already in the `/in` folder, before it writes a new request file and, if so, move it into the pool's inventory without staging anything. +* The polling provider will *overwrite* existing request files, when the pool receives a request (that isn't satisfied by the content of the `/in` folder). That is important regarding *retries* of recalls from the pool and *pool restarts*! +* The polling provider will check for *error files* with every poll. If such a file exists for a requested file, it's content is read and verbatim raised as an exception from the staging task. Because the exception is raised, the task will be aborted and all related files should get purged. + * The error file's path has to be `/request/.err` +* Shutting down the polling provider and/or the pool does clean up existing request files. diff --git a/pom.xml b/pom.xml index cd26069..30e07b4 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ org.ndgf dcache-endit-provider - 1.0.5 + 1.0.8 jar dCache Endit provider @@ -44,8 +44,18 @@ com.google.guava guava - 24.1.1 + 24.1.1-jre + + commons-io + commons-io + 2.6 + + + com.google.code.gson + gson + 2.8.6 + org.dcache dcache-nearline-spi @@ -67,6 +77,7 @@ 1.7 1.7 + -Xlint:deprecation diff --git a/src/main/java/org/ndgf/endit/.FlushTask.java.swo b/src/main/java/org/ndgf/endit/.FlushTask.java.swo new file mode 100644 index 0000000..cb662a8 Binary files /dev/null and b/src/main/java/org/ndgf/endit/.FlushTask.java.swo differ diff --git a/src/main/java/org/ndgf/endit/AbstractEnditNearlineStorage.java b/src/main/java/org/ndgf/endit/AbstractEnditNearlineStorage.java index 9a3b875..7775a07 100644 --- a/src/main/java/org/ndgf/endit/AbstractEnditNearlineStorage.java +++ b/src/main/java/org/ndgf/endit/AbstractEnditNearlineStorage.java @@ -112,7 +112,7 @@ public ListenableFuture remove(final RemoveRequest request) @Override protected ListenableFuture> flush(FlushRequest request) { - final PollingTask> task = new FlushTask(request, outDir, type, name); + final PollingTask> task = new FlushTask(request, requestDir, outDir, type, name); return Futures.transformAsync(request.activate(), new AsyncFunction>() { diff --git a/src/main/java/org/ndgf/endit/FlushTask.java b/src/main/java/org/ndgf/endit/FlushTask.java index ac8d700..905b9f3 100644 --- a/src/main/java/org/ndgf/endit/FlushTask.java +++ b/src/main/java/org/ndgf/endit/FlushTask.java @@ -17,8 +17,7 @@ */ package org.ndgf.endit; -import diskCacheV111.util.PnfsId; - +import java.nio.charset.StandardCharsets; import java.io.File; import java.io.IOException; import java.net.URI; @@ -35,8 +34,15 @@ import org.dcache.pool.nearline.spi.FlushRequest; +import diskCacheV111.util.PnfsId; + import static java.util.Arrays.asList; +import org.dcache.util.Checksum; + +import com.google.gson.JsonObject; +import org.apache.commons.io.FileUtils; + class FlushTask implements PollingTask> { private final Path outFile; @@ -44,16 +50,28 @@ class FlushTask implements PollingTask> private final PnfsId pnfsId; private final String type; private final String name; - + private final Path requestFile; + private final long size; + private final String storageClass; + private final String path; + + private final Set checksums; + private final static Logger LOGGER = LoggerFactory.getLogger(FlushTask.class); - public FlushTask(FlushRequest request, Path outDir, String type, String name) + public FlushTask(FlushRequest request, Path requestDir, Path outDir, String type, String name) { this.type = type; this.name = name; file = request.getFile(); outFile = outDir.resolve(file.getName()); pnfsId = request.getFileAttributes().getPnfsId(); + requestFile = requestDir.resolve(pnfsId.toString()); + size = request.getFileAttributes().getSize(); + storageClass =request.getFileAttributes().getStorageClass(); + path = request.getFileAttributes().getStorageInfo().getMap().get("path"); + checksums = request.getFileAttributes().getChecksums(); + } public List getFilesToWatch() @@ -64,6 +82,26 @@ public List getFilesToWatch() @Override public Set start() throws IOException { + String checksumType=""; + String checksumValue=""; + + for (Checksum checksum: checksums) { + checksumType = checksum.getType().getName().toLowerCase(); + checksumValue = checksum.getValue(); + } + + + JsonObject jsObj = new JsonObject(); + jsObj.addProperty("file_size", size); + jsObj.addProperty("time", System.currentTimeMillis() / 1000); + jsObj.addProperty("storage_class", storageClass); + jsObj.addProperty("action", "migrate"); + jsObj.addProperty("path", path); + jsObj.addProperty("checksumType", checksumType); + jsObj.addProperty("checksumValue", checksumValue); + + FileUtils.write(requestFile.toFile(), jsObj.toString(), StandardCharsets.UTF_8); + try { Files.createLink(outFile, file.toPath()); } catch (FileAlreadyExistsException ignored) { @@ -72,7 +110,7 @@ public Set start() throws IOException } @Override - public Set poll() throws URISyntaxException + public Set poll() throws URISyntaxException, IOException { if (!Files.exists(outFile)) { LOGGER.debug("File " + name + " deleted"); @@ -83,7 +121,9 @@ public Set poll() throws URISyntaxException // and : The store and group name of the file as provided by the arguments to this executable. // : The unique identifier needed to restore or remove the file if necessary. LOGGER.debug("Send back uri: " + uri.toString()); - return Collections.singleton(uri); + Files.deleteIfExists(requestFile); + + return Collections.singleton(uri); } return null; } @@ -91,6 +131,6 @@ public Set poll() throws URISyntaxException @Override public boolean abort() throws IOException { - return Files.deleteIfExists(outFile); + return Files.deleteIfExists(outFile) && Files.deleteIfExists(requestFile); } } diff --git a/src/main/java/org/ndgf/endit/RemoveTask.java b/src/main/java/org/ndgf/endit/RemoveTask.java index b0e537d..c98c8de 100644 --- a/src/main/java/org/ndgf/endit/RemoveTask.java +++ b/src/main/java/org/ndgf/endit/RemoveTask.java @@ -17,9 +17,8 @@ */ package org.ndgf.endit; -import com.google.common.base.Charsets; import com.google.common.base.Splitter; - +import java.nio.charset.StandardCharsets; import java.io.IOException; import java.net.URI; import java.nio.file.Files; @@ -49,7 +48,7 @@ public Void call() throws IOException /* Tell Endit to remove it from tape. */ - Files.write(trashDir.resolve(id), uri.toASCIIString().getBytes(Charsets.UTF_8)); + Files.write(trashDir.resolve(id), uri.toASCIIString().getBytes(StandardCharsets.UTF_8)); return null; } diff --git a/src/main/java/org/ndgf/endit/StageTask.java b/src/main/java/org/ndgf/endit/StageTask.java index a8da767..0dea0a7 100644 --- a/src/main/java/org/ndgf/endit/StageTask.java +++ b/src/main/java/org/ndgf/endit/StageTask.java @@ -19,7 +19,7 @@ */ package org.ndgf.endit; -import com.google.common.base.Charsets; +import java.nio.charset.StandardCharsets; import com.sun.jna.Library; import com.sun.jna.Native; @@ -39,6 +39,8 @@ import org.dcache.vehicles.FileAttributes; import static java.util.Arrays.asList; +import com.google.gson.JsonObject; +import org.apache.commons.io.FileUtils; class StageTask implements PollingTask> { @@ -55,6 +57,8 @@ class StageTask implements PollingTask> private final Path errorFile; private final Path requestFile; private final long size; + private final String storageClass; + private final String path; StageTask(StageRequest request, Path requestDir, Path inDir) { @@ -65,6 +69,8 @@ class StageTask implements PollingTask> inFile = inDir.resolve(id); errorFile = requestDir.resolve(id + ".err"); requestFile = requestDir.resolve(id); + storageClass = fileAttributes.getStorageClass(); + path = request.getFileAttributes().getStorageInfo().getMap().get("path"); } @Override @@ -80,9 +86,17 @@ public Set start() throws Exception Files.move(inFile, file, StandardCopyOption.ATOMIC_MOVE); return Collections.emptySet(); } - String s = String.format("{ \"file_size\": %d, \"parent_pid\": %d, \"time\": %d }", - size, PID, System.currentTimeMillis() / 1000); - Files.write(requestFile, s.getBytes(Charsets.UTF_8)); + + JsonObject jsObj = new JsonObject(); + jsObj.addProperty("file_size", size); + jsObj.addProperty("parent_pid", PID); + jsObj.addProperty("time", System.currentTimeMillis() / 1000); + jsObj.addProperty("storage_class", storageClass); + jsObj.addProperty("action", "recall"); + jsObj.addProperty("path", path); + + FileUtils.write(requestFile.toFile(), jsObj.toString(), StandardCharsets.UTF_8); + return null; } @@ -93,7 +107,7 @@ public Set poll() throws IOException, InterruptedException, EnditExcep List lines; try { Thread.sleep(ERROR_GRACE_PERIOD); - lines = Files.readAllLines(errorFile, Charsets.UTF_8); + lines = Files.readAllLines(errorFile, StandardCharsets.UTF_8); } finally { Files.deleteIfExists(inFile); Files.deleteIfExists(errorFile); @@ -117,12 +131,8 @@ public Set poll() throws IOException, InterruptedException, EnditExcep @Override public boolean abort() throws Exception { - if (Files.deleteIfExists(requestFile)) { - Files.deleteIfExists(errorFile); - Files.deleteIfExists(inFile); - return true; - } - return false; + return Files.deleteIfExists(requestFile) && Files.deleteIfExists(errorFile) && Files.deleteIfExists(inFile); + } private interface CLibrary extends Library