Skip to content

Commit

Permalink
Merge pull request #2 from hmushegh/FZK
Browse files Browse the repository at this point in the history
Testing looks good
  • Loading branch information
krishna-veni authored Feb 15, 2021
2 parents aa5e473 + 1a9c42a commit c02ea99
Show file tree
Hide file tree
Showing 7 changed files with 111 additions and 47 deletions.
50 changes: 27 additions & 23 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,42 +1,46 @@
dCache Endit Provider
==============================================
# dCache Endit Provider

This [dCache] plugin interfaces with the [Endit] TSM integration system.

To compile the plugin, run:
```
mvn package
```

mvn package
To install the plugin, unpack the resulting tarball in the dCache's plugin directory (usually `/usr/local/share/dcache/plugins`).

To install the plugin, unpack the resulting tarball in the dCache
plugin directory (usually `/usr/local/share/dcache/plugins`).

## Configuration
## Watching provider

To use, define a nearline storage in the dCache admin interface:
```
hsm create osm the-hsm-name endit -directory=/path/to/endit/directory
```

hsm create osm the-hsm-name endit -directory=/path/to/endit/directory

The endit directory must be on the same file system as the pool's
data directory.
The endit directory must be on the same file system as the pool's data directory.

The above will create a provider that uses the JVMs file event
notification feature which in most cases maps directly to a native
file event notification facility of the operating system.
The above will create a provider that uses JVM's file event notification feature which in most cases maps directly to a native file event notification facility of the operating system.

## Polling provider

To use a provider that polls for changes, use:
```
hsm create osm the-hsm-name endit-polling -directory=/path/to/endit/directory
```

hsm create osm osm the-hsm-name -directory=/path/to/endit/directory
This provider accepts two additional options with the following default values:
* `threads=1`
* `period=5000`

This provider accepts two additional options with the following default
values:
The first is the number of threads used for polling for file changes and the second is the poll period in milliseconds.

-threads=1
-period=5000
[dCache]: http://www.dcache.org/
[Endit]: https://github.com/maswan/endit

The first is the number of threads used for polling for file changes
and the second is the poll period in milliseconds.
### Notes on the provider behaviour

[dCache]: http://www.dcache.org/
[Endit]: https://github.com/neicnordic/endit
* The polling provider does *not* monitor the request files, once they are created. Editing or deleting them has no consequences from the perspective of dCache.
* The polling provider will check whether a requested file does exist already in the `/in` folder, before it writes a new request file and, if so, move it into the pool's inventory without staging anything.
* The polling provider will *overwrite* existing request files, when the pool receives a request (that isn't satisfied by the content of the `/in` folder). That is important regarding *retries* of recalls from the pool and *pool restarts*!
* The polling provider will check for *error files* with every poll. If such a file exists for a requested file, it's content is read and verbatim raised as an exception from the staging task. Because the exception is raised, the task will be aborted and all related files should get purged.
* The error file's path has to be `/request/<pnfsid>.err`
* Shutting down the polling provider and/or the pool does clean up existing request files.
15 changes: 13 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

<groupId>org.ndgf</groupId>
<artifactId>dcache-endit-provider</artifactId>
<version>1.0.5</version>
<version>1.0.8</version>
<packaging>jar</packaging>

<name>dCache Endit provider</name>
Expand Down Expand Up @@ -44,8 +44,18 @@
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>24.1.1</version>
<version>24.1.1-jre</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.6</version>
</dependency>
<dependency>
<groupId>org.dcache</groupId>
<artifactId>dcache-nearline-spi</artifactId>
Expand All @@ -67,6 +77,7 @@
<configuration>
<source>1.7</source>
<target>1.7</target>

<compilerArgs>
<arg>-Xlint:deprecation</arg>
</compilerArgs>
Expand Down
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public ListenableFuture<Void> remove(final RemoveRequest request)
@Override
protected ListenableFuture<Set<URI>> flush(FlushRequest request)
{
final PollingTask<Set<URI>> task = new FlushTask(request, outDir, type, name);
final PollingTask<Set<URI>> task = new FlushTask(request, requestDir, outDir, type, name);
return Futures.transformAsync(request.activate(),
new AsyncFunction<Void, Set<URI>>()
{
Expand Down
54 changes: 47 additions & 7 deletions src/main/java/org/ndgf/endit/FlushTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@
*/
package org.ndgf.endit;

import diskCacheV111.util.PnfsId;

import java.nio.charset.StandardCharsets;
import java.io.File;
import java.io.IOException;
import java.net.URI;
Expand All @@ -35,25 +34,44 @@

import org.dcache.pool.nearline.spi.FlushRequest;

import diskCacheV111.util.PnfsId;

import static java.util.Arrays.asList;

import org.dcache.util.Checksum;

import com.google.gson.JsonObject;
import org.apache.commons.io.FileUtils;

class FlushTask implements PollingTask<Set<URI>>
{
private final Path outFile;
private final File file;
private final PnfsId pnfsId;
private final String type;
private final String name;

private final Path requestFile;
private final long size;
private final String storageClass;
private final String path;

private final Set<Checksum> checksums;

private final static Logger LOGGER = LoggerFactory.getLogger(FlushTask.class);

public FlushTask(FlushRequest request, Path outDir, String type, String name)
public FlushTask(FlushRequest request, Path requestDir, Path outDir, String type, String name)
{
this.type = type;
this.name = name;
file = request.getFile();
outFile = outDir.resolve(file.getName());
pnfsId = request.getFileAttributes().getPnfsId();
requestFile = requestDir.resolve(pnfsId.toString());
size = request.getFileAttributes().getSize();
storageClass =request.getFileAttributes().getStorageClass();
path = request.getFileAttributes().getStorageInfo().getMap().get("path");
checksums = request.getFileAttributes().getChecksums();

}

public List<Path> getFilesToWatch()
Expand All @@ -64,6 +82,26 @@ public List<Path> getFilesToWatch()
@Override
public Set<URI> start() throws IOException
{
String checksumType="";
String checksumValue="";

for (Checksum checksum: checksums) {
checksumType = checksum.getType().getName().toLowerCase();
checksumValue = checksum.getValue();
}


JsonObject jsObj = new JsonObject();
jsObj.addProperty("file_size", size);
jsObj.addProperty("time", System.currentTimeMillis() / 1000);
jsObj.addProperty("storage_class", storageClass);
jsObj.addProperty("action", "migrate");
jsObj.addProperty("path", path);
jsObj.addProperty("checksumType", checksumType);
jsObj.addProperty("checksumValue", checksumValue);

FileUtils.write(requestFile.toFile(), jsObj.toString(), StandardCharsets.UTF_8);

try {
Files.createLink(outFile, file.toPath());
} catch (FileAlreadyExistsException ignored) {
Expand All @@ -72,7 +110,7 @@ public Set<URI> start() throws IOException
}

@Override
public Set<URI> poll() throws URISyntaxException
public Set<URI> poll() throws URISyntaxException, IOException
{
if (!Files.exists(outFile)) {
LOGGER.debug("File " + name + " deleted");
Expand All @@ -83,14 +121,16 @@ public Set<URI> poll() throws URISyntaxException
// <storename> and <groupname> : The store and group name of the file as provided by the arguments to this executable.
// <bfid>: The unique identifier needed to restore or remove the file if necessary.
LOGGER.debug("Send back uri: " + uri.toString());
return Collections.singleton(uri);
Files.deleteIfExists(requestFile);

return Collections.singleton(uri);
}
return null;
}

@Override
public boolean abort() throws IOException
{
return Files.deleteIfExists(outFile);
return Files.deleteIfExists(outFile) && Files.deleteIfExists(requestFile);
}
}
5 changes: 2 additions & 3 deletions src/main/java/org/ndgf/endit/RemoveTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@
*/
package org.ndgf.endit;

import com.google.common.base.Charsets;
import com.google.common.base.Splitter;

import java.nio.charset.StandardCharsets;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
Expand Down Expand Up @@ -49,7 +48,7 @@ public Void call() throws IOException

/* Tell Endit to remove it from tape.
*/
Files.write(trashDir.resolve(id), uri.toASCIIString().getBytes(Charsets.UTF_8));
Files.write(trashDir.resolve(id), uri.toASCIIString().getBytes(StandardCharsets.UTF_8));
return null;
}

Expand Down
32 changes: 21 additions & 11 deletions src/main/java/org/ndgf/endit/StageTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
*/
package org.ndgf.endit;

import com.google.common.base.Charsets;
import java.nio.charset.StandardCharsets;
import com.sun.jna.Library;
import com.sun.jna.Native;

Expand All @@ -39,6 +39,8 @@
import org.dcache.vehicles.FileAttributes;

import static java.util.Arrays.asList;
import com.google.gson.JsonObject;
import org.apache.commons.io.FileUtils;

class StageTask implements PollingTask<Set<Checksum>>
{
Expand All @@ -55,6 +57,8 @@ class StageTask implements PollingTask<Set<Checksum>>
private final Path errorFile;
private final Path requestFile;
private final long size;
private final String storageClass;
private final String path;

StageTask(StageRequest request, Path requestDir, Path inDir)
{
Expand All @@ -65,6 +69,8 @@ class StageTask implements PollingTask<Set<Checksum>>
inFile = inDir.resolve(id);
errorFile = requestDir.resolve(id + ".err");
requestFile = requestDir.resolve(id);
storageClass = fileAttributes.getStorageClass();
path = request.getFileAttributes().getStorageInfo().getMap().get("path");
}

@Override
Expand All @@ -80,9 +86,17 @@ public Set<Checksum> start() throws Exception
Files.move(inFile, file, StandardCopyOption.ATOMIC_MOVE);
return Collections.emptySet();
}
String s = String.format("{ \"file_size\": %d, \"parent_pid\": %d, \"time\": %d }",
size, PID, System.currentTimeMillis() / 1000);
Files.write(requestFile, s.getBytes(Charsets.UTF_8));

JsonObject jsObj = new JsonObject();
jsObj.addProperty("file_size", size);
jsObj.addProperty("parent_pid", PID);
jsObj.addProperty("time", System.currentTimeMillis() / 1000);
jsObj.addProperty("storage_class", storageClass);
jsObj.addProperty("action", "recall");
jsObj.addProperty("path", path);

FileUtils.write(requestFile.toFile(), jsObj.toString(), StandardCharsets.UTF_8);

return null;
}

Expand All @@ -93,7 +107,7 @@ public Set<Checksum> poll() throws IOException, InterruptedException, EnditExcep
List<String> lines;
try {
Thread.sleep(ERROR_GRACE_PERIOD);
lines = Files.readAllLines(errorFile, Charsets.UTF_8);
lines = Files.readAllLines(errorFile, StandardCharsets.UTF_8);
} finally {
Files.deleteIfExists(inFile);
Files.deleteIfExists(errorFile);
Expand All @@ -117,12 +131,8 @@ public Set<Checksum> poll() throws IOException, InterruptedException, EnditExcep
@Override
public boolean abort() throws Exception
{
if (Files.deleteIfExists(requestFile)) {
Files.deleteIfExists(errorFile);
Files.deleteIfExists(inFile);
return true;
}
return false;
return Files.deleteIfExists(requestFile) && Files.deleteIfExists(errorFile) && Files.deleteIfExists(inFile);

}

private interface CLibrary extends Library
Expand Down

0 comments on commit c02ea99

Please sign in to comment.