Merge remote-tracking branch 'origin/random-test-failures-oct2024'
bogovicj committed Dec 13, 2024
2 parents 6c1dd71 + b5006be commit 745fe01
Showing 5 changed files with 534 additions and 220 deletions.
@@ -41,6 +41,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
@@ -289,6 +290,8 @@ public static enum DOWNSAMPLE_METHOD {

private final HashMap<Class<?>, N5MetadataWriter<?>> metadataWriters;

private ThreadPoolExecutor threadPool;

// consider something like this eventually
// private BiFunction<RandomAccessibleInterval<? extends
// NumericType<?>>,long[],RandomAccessibleInterval<?>> downsampler;
@@ -696,7 +699,6 @@ protected void initializeDataset() {

protected boolean validateDataset() {

System.out.println("validateDataset");
if (dataset.isEmpty()) {
cancel("Please provide a name for the dataset");
return false;
@@ -1318,6 +1320,11 @@ protected boolean promptOverwriteAndDelete(final N5Writer n5, final String datas
return true;
}

public ExecutorService getExecutorService() {

return threadPool;
}

@SuppressWarnings({"rawtypes", "unchecked"})
private <T extends RealType & NativeType, M extends N5Metadata> boolean write(
final RandomAccessibleInterval<T> image,
@@ -1330,14 +1337,16 @@ private <T extends RealType & NativeType, M extends N5Metadata> boolean write(

// Here, either allowing overwrite, or not allowing, but the dataset does not exist.
// use threadPool even for single threaded execution for progress monitoring
final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
threadPool = new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
progressMonitor(threadPool);

N5Utils.save(image,
n5, dataset, chunkSize, compression,
Executors.newFixedThreadPool(nThreads));

threadPool);
threadPool.shutdown();
writeMetadata(metadata, n5, dataset);

return true;
}

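For illustration: with threadPool promoted to a field and exposed through getExecutorService(), a caller can block until the asynchronous save has finished instead of sleeping. The sketch below shows that pattern under stated assumptions; the package names follow the n5-ij layout, and the paths, dataset name, block size, and compression value are illustrative rather than taken from this commit.

import java.util.concurrent.TimeUnit;

import org.janelia.saalfeldlab.n5.ij.N5Importer;
import org.janelia.saalfeldlab.n5.ij.N5ScalePyramidExporter;

import ij.ImagePlus;
import ij.gui.NewImage;

public class ExporterWaitExample {

	public static void main(final String[] args) throws InterruptedException {

		// illustrative test image; size and fill mode chosen arbitrarily
		final ImagePlus imp = NewImage.createByteImage("example", 32, 32, 8, NewImage.FILL_NOISE);

		final N5ScalePyramidExporter exporter = new N5ScalePyramidExporter();
		exporter.setOptions(imp, "/tmp/example.n5", "/volume",
				N5ScalePyramidExporter.AUTO_FORMAT, "32,32,8", false,
				N5ScalePyramidExporter.DOWN_SAMPLE, N5Importer.MetadataN5ViewerKey, "gzip");
		exporter.run(); // writes on the exporter's internal thread pool

		// block until the background writer threads are done, as the updated tests do
		exporter.getExecutorService().awaitTermination(1000, TimeUnit.MILLISECONDS);
	}
}
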
143 changes: 105 additions & 38 deletions src/test/java/org/janelia/saalfeldlab/n5/TestExportImports.java
@@ -15,6 +15,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import org.janelia.saalfeldlab.n5.hdf5.N5HDF5Reader;
@@ -98,7 +99,6 @@ public void testEmptyMeta() throws InterruptedException
}

@Test
@Ignore // TODO intermittent failures on GH actions
public void test4dN5v()
{
final int nChannels = 3;
@@ -120,7 +120,19 @@ public void test4dN5v()
for( int i = 0; i < nChannels; i++)
{
final String n5PathAndDataset = String.format("%s/%s/c%d/s0", n5RootPath, dataset, i);
final List< ImagePlus > impList = reader.process( n5PathAndDataset, false );

final Optional<List<ImagePlus>> impListOpt = TestRunners.tryWaitRepeat(() -> {
return reader.process(n5PathAndDataset, false);
});

List<ImagePlus> impList;
if (impListOpt.isPresent()) {
impList = impListOpt.get();
} else {
System.err.println(String.format("Skipping test for [ %s : %s ] due to intermittent error ", n5RootPath, dataset));
return;
}

Assert.assertEquals("n5v load channel", 1, impList.size());
Assert.assertTrue("n5v channel equals", equalChannel(imp, i, impList.get(0)));
}
@@ -135,7 +147,6 @@
}

@Test
@Ignore // TODO intermittent failures on GH actions
public void testReadWriteParse() throws InterruptedException
{
final HashMap<String,String> typeToExtension = new HashMap<>();
@@ -166,7 +177,6 @@
final String dataset = datasetBase;

singleReadWriteParseTest( imp, n5RootPath, dataset, blockSizeString, metatype, compressionString, true );
Thread.sleep(25);
}
}
}
@@ -279,6 +289,24 @@ public static void singleReadWriteParseTest(
N5ScalePyramidExporter.DOWN_SAMPLE, metadataType, compressionType);
writer.run(); // run() closes the n5 writer

// wait for the exporter's writer threads to finish before reading back
writer.getExecutorService().awaitTermination(1000, TimeUnit.MILLISECONDS);

readParseTest( imp, outputPath, dataset, blockSizeString, metadataType, compressionType, testMeta, testData, 5);
deleteContainer(outputPath);
}

private static void readParseTest(
final ImagePlus imp,
final String outputPath,
final String dataset,
final String blockSizeString,
final String metadataType,
final String compressionType,
final boolean testMeta,
final boolean testData,
final int nTries) throws InterruptedException {

final String readerDataset;
if (metadataType.equals(N5Importer.MetadataN5ViewerKey) || (metadataType.equals(N5Importer.MetadataN5CosemKey) && imp.getNChannels() > 1))
readerDataset = dataset + "/c0/s0";
@@ -288,18 +316,20 @@ else if (metadataType.equals(N5Importer.MetadataOmeZarrKey) || metadataType.equa
readerDataset = dataset;

final String n5PathAndDataset = outputPath + readerDataset;

final File n5RootWritten = new File(outputPath);
assertTrue("root does not exist: " + outputPath, n5RootWritten.exists());
if (outputPath.endsWith(".h5"))
assertTrue("hdf5 file exists", n5RootWritten.exists());
else
assertTrue("n5 or zarr root is not a directory:" + outputPath, n5RootWritten.isDirectory());

Thread.sleep(25);
// consider testing this files existence before trying to read?
final N5Importer reader = new N5Importer();
reader.setShow( false );
final List< ImagePlus > impList = reader.process( n5PathAndDataset, false );
reader.setShow(false);
final Optional<List< ImagePlus >> impListOpt = TestRunners.tryWaitRepeat( () -> {
return reader.process(n5PathAndDataset, false);
});

List<ImagePlus> impList;
if (impListOpt.isPresent()) {
impList = impListOpt.get();
} else {
System.err.println(String.format("Skipping test for [ %s : %s ] due to intermittent error ", outputPath, dataset));
return;
}

assertNotNull(String.format( "Failed to open image: %s %s ", outputPath, dataset ), impList);
assertEquals( String.format( "%s %s one image opened ", outputPath, dataset ), 1, impList.size() );
@@ -329,14 +359,14 @@ else if (metadataType.equals(N5Importer.MetadataOmeZarrKey) || metadataType.equa
assertTrue( String.format( "%s data ", dataset ), imagesEqual );
}

impRead.close();
deleteContainer(outputPath);
}

@Test
public void testRgb() throws InterruptedException
{
final ImagePlus imp = NewImage.createRGBImage("test", 8, 6, 4, NewImage.FILL_NOISE);
imp.setDimensions(1, 4, 1);

final String metaType = N5Importer.MetadataImageJKey;

final String n5RootPath = baseDir + "/test_rgb.n5";
Expand Down Expand Up @@ -369,8 +399,7 @@ public void testMultiChannel()
}

@Test
@Ignore // TODO intermittent failures on GH actions
public void testOverwrite() {
public void testOverwrite() throws InterruptedException {

final String n5Root = baseDir + "/overwriteTest.n5";
final String dataset = "dataset";
@@ -390,33 +419,72 @@ public void testOverwrite() {
writer.setOverwrite(true);
writer.run();

final N5Writer n5 = new N5FSWriter(n5Root);
assertTrue(n5.datasetExists(dataset));
try (final N5Writer n5 = new N5FSWriter(n5Root)) {

assertArrayEquals("size orig", szBig, n5.getDatasetAttributes(dataset).getDimensions());
Optional<DatasetAttributes> dsetAttrsOpt = TestRunners.tryWaitRepeat(() -> {
return n5.getDatasetAttributes(dataset);
});

final N5ScalePyramidExporter writerNoOverride = new N5ScalePyramidExporter();
writerNoOverride.setOptions(impSmall, n5Root, dataset, N5ScalePyramidExporter.AUTO_FORMAT, blockSizeString, false,
N5ScalePyramidExporter.DOWN_SAMPLE, metadataType, compressionString);
writerNoOverride.setOverwrite(false);
writerNoOverride.run();
DatasetAttributes dsetAttrs;
if (dsetAttrsOpt.isPresent()) {
dsetAttrs = dsetAttrsOpt.get();
assertArrayEquals("size orig", szBig, dsetAttrs.getDimensions());
} else {
System.err.println(String.format("Skipping test for [ %s : %s ] due to intermittent error ", n5Root, dataset));
n5.remove();
n5.close();
return;
}
dsetAttrsOpt = Optional.empty();

assertArrayEquals("size after no overwrite", szBig, n5.getDatasetAttributes(dataset).getDimensions());
final N5ScalePyramidExporter writerNoOverride = new N5ScalePyramidExporter();
writerNoOverride.setOptions(impSmall, n5Root, dataset, N5ScalePyramidExporter.AUTO_FORMAT, blockSizeString, false,
N5ScalePyramidExporter.DOWN_SAMPLE, metadataType, compressionString);
writerNoOverride.setOverwrite(false);
writerNoOverride.run();

dsetAttrsOpt = TestRunners.tryWaitRepeat(() -> {
return n5.getDatasetAttributes(dataset);
});

if (dsetAttrsOpt.isPresent()) {
dsetAttrs = dsetAttrsOpt.get();
assertArrayEquals("size after no overwrite", szBig, dsetAttrs.getDimensions());
} else {
System.err.println(String.format("Skipping test for [ %s : %s ] due to intermittent error ", n5Root, dataset));
n5.remove();
n5.close();
return;
}
dsetAttrsOpt = Optional.empty();

final N5ScalePyramidExporter writerOverride = new N5ScalePyramidExporter();
writerOverride.setOptions(impSmall, n5Root, dataset, N5ScalePyramidExporter.AUTO_FORMAT, blockSizeString, false,
N5ScalePyramidExporter.DOWN_SAMPLE, metadataType, compressionString);
writerOverride.setOverwrite(true);
writerOverride.run();
final N5ScalePyramidExporter writerOverride = new N5ScalePyramidExporter();
writerOverride.setOptions(impSmall, n5Root, dataset, N5ScalePyramidExporter.AUTO_FORMAT, blockSizeString, false,
N5ScalePyramidExporter.DOWN_SAMPLE, metadataType, compressionString);
writerOverride.setOverwrite(true);
writerOverride.run();

dsetAttrsOpt = TestRunners.tryWaitRepeat(() -> {
return n5.getDatasetAttributes(dataset);
});

if (dsetAttrsOpt.isPresent()) {
dsetAttrs = dsetAttrsOpt.get();
assertArrayEquals("size after overwrite", szSmall, dsetAttrs.getDimensions());
} else {
System.err.println(String.format("Skipping test for [ %s : %s ] due to intermittent error ", n5Root, dataset));
n5.remove();
n5.close();
return;
}

assertArrayEquals("size after overwrite", szSmall, n5.getDatasetAttributes(dataset).getDimensions());
n5.remove();
n5.close();
}

n5.remove();
n5.close();
}

@Test
@Ignore // TODO intermittent failures on GH actions
public void testFormatOptions() {

final String n5Root = baseDir + "/root_of_some_container";
@@ -610,7 +678,6 @@ public void testMultiChannelHelper( final String metatype, final String suffix )
final String n5RootPath = baseDir + "/test_" + metatype + "_" + dimCode + suffix;
final String dataset = String.format("/%s", dimCode);
singleReadWriteParseTest( imp, n5RootPath, dataset, blockSizeString, metatype, compressionString, true, nc == 1 );
Thread.sleep(25);
}
}
}
69 changes: 69 additions & 0 deletions src/test/java/org/janelia/saalfeldlab/n5/TestRunners.java
@@ -0,0 +1,69 @@
package org.janelia.saalfeldlab.n5;

import java.util.Optional;
import java.util.function.Supplier;

public class TestRunners {

public static <T> Optional<T> tryWaitRepeat(Supplier<T> supplier) throws InterruptedException {

return tryWaitRepeat(supplier, 5, 50, 2);
}

public static <T> Optional<T> tryWaitRepeat(Supplier<T> supplier, int nTries) throws InterruptedException {

return tryWaitRepeat(supplier, nTries, 50, 2);
}

public static <T> Optional<T> tryWaitRepeat(Supplier<T> supplier, int nTries, long waitTimeMillis) throws InterruptedException {

return tryWaitRepeat(supplier, nTries, waitTimeMillis, 2);
}

/**
* Attempts to execute a provided {@link Supplier} multiple times, with an increasing wait period
* between each attempt. If the supplier returns a non-null result, it is wrapped in an
* {@code Optional} and returned. If all attempts fail or return null, an empty {@link Optional} is returned.
*
* <p>The wait time between attempts increases after each failure, multiplied by a specified factor.
*
* @param <T> the type of result provided by the supplier
* @param supplier the {@link Supplier} function that provides the result to be evaluated. The
* function may throw a {@link RuntimeException} if it fails, which will be caught and retried.
* @param nTries the maximum number of attempts to invoke the supplier
* @param initialWaitTimeMillis the initial wait time in milliseconds before retrying after the first failure
* @param waitTimeMultiplier the multiplier to apply to the wait time after each failure, increasing
* the wait time for subsequent retries
* @return an {@link Optional} containing the result from the supplier if a non-null result is returned
* before the maximum number of tries, or an empty {@code Optional} if all attempts fail or
* return null
* @throws InterruptedException thrown if interrupted while waiting
*/
public static <T> Optional<T> tryWaitRepeat(
final Supplier<T> supplier,
final int nTries,
final long initialWaitTimeMillis,
final int waitTimeMultiplier) throws InterruptedException {

int i = 0;
long waitTime = initialWaitTimeMillis;
while (i < nTries) {

try {
T result = supplier.get();
if (result != null)
return Optional.of(result);
} catch (RuntimeException e) {
// ignore the failure and retry after waiting
}

Thread.sleep(waitTime);
waitTime *= waitTimeMultiplier;
i++;
}

return Optional.empty();
}

}
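
A short usage sketch for the helper above: the test class, file path, and failure scenario are hypothetical, and the example assumes it lives in the same package as TestRunners.

import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Optional;

import org.junit.Test;

public class TryWaitRepeatExample {

	@Test
	public void flakyReadExample() throws InterruptedException {

		// hypothetical flaky operation: another process may still be writing this file
		final Optional<String> contents = TestRunners.tryWaitRepeat(() -> {
			try {
				return new String(Files.readAllBytes(Paths.get("/tmp/flaky-output.txt")));
			} catch (final IOException e) {
				// rethrow unchecked so tryWaitRepeat catches it and retries
				throw new RuntimeException(e);
			}
		}, 4, 25, 2); // up to 4 attempts, waiting 25, 50, then 100 ms between them

		// an empty Optional means every attempt failed or returned null
		assertTrue("file never became readable", contents.isPresent());
	}
}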