storage: Fix hadoop compatibility issues. #TASK-7320
j-coll committed Jan 17, 2025
1 parent 3349ee6 commit 74dda67
Showing 9 changed files with 42 additions and 7 deletions.
@@ -36,4 +36,6 @@ public static HBaseCompatApi getInstance() {
     public abstract List<ServerName> getServerList(Admin admin) throws IOException;
 
     public abstract byte[][] getTableStartKeys(Admin admin, Table table) throws IOException;
+
+    public abstract boolean isSnappyAvailable();
 }
@@ -7,6 +7,7 @@
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.compress.SnappyCodec;
 import org.opencb.opencga.storage.hadoop.variant.annotation.phoenix.PhoenixCompat;
 import org.opencb.opencga.storage.hadoop.variant.annotation.phoenix.PhoenixCompatApi;
 
@@ -45,4 +46,9 @@ public byte[][] getTableStartKeys(Admin admin, Table table) throws IOException {
         }
         return startKeys;
     }
+
+    @Override
+    public boolean isSnappyAvailable() {
+        return SnappyCodec.isNativeCodeLoaded();
+    }
 }
@@ -7,6 +7,7 @@
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.compress.SnappyCodec;
 import org.opencb.opencga.storage.hadoop.variant.annotation.phoenix.PhoenixCompat;
 import org.opencb.opencga.storage.hadoop.variant.annotation.phoenix.PhoenixCompatApi;
 
@@ -45,4 +46,9 @@ public byte[][] getTableStartKeys(Admin admin, Table table) throws IOException {
         }
         return startKeys;
     }
+
+    @Override
+    public boolean isSnappyAvailable() {
+        return SnappyCodec.isNativeCodeLoaded();
+    }
 }
@@ -32,7 +32,7 @@
 
     <properties>
         <hbase.version>2.4.17</hbase.version>
-        <hadoop.version>2.10.0</hadoop.version>
+        <hadoop.version>3.3.4</hadoop.version>
         <phoenix.version>5.1.3</phoenix.version>
         <phoenix-thirdparty.version>1.1.0</phoenix-thirdparty.version>
     </properties>
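Note: the bump from Hadoop 2.10.0 to 3.3.4 is what makes the rest of this commit safe. As the comment in the hadoop3 compat hunk below records, HADOOP-17125 (shipped in 3.3.1) reimplemented SnappyCodec on top of the pure-Java snappy-java library, so Snappy compression no longer depends on the native libhadoop build.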
@@ -38,4 +38,10 @@ public List<ServerName> getServerList(Admin admin) throws IOException {
     public byte[][] getTableStartKeys(Admin admin, Table table) throws IOException {
         return table.getRegionLocator().getStartKeys();
     }
+
+    @Override
+    public boolean isSnappyAvailable() {
+        // [HADOOP-17125] - Using snappy-java in SnappyCodec - 3.3.1, 3.4.0
+        return true;
+    }
 }
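The unconditional return true above leans on that snappy-java implementation. A minimal sketch of why this holds, assuming Hadoop 3.3.1+ on the classpath; the SnappyCheck class and the round-trip below are illustrative only, not part of this commit:

    import java.io.ByteArrayOutputStream;
    import java.io.OutputStream;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.compress.SnappyCodec;

    public class SnappyCheck {
        public static void main(String[] args) throws Exception {
            // Since HADOOP-17125 (Hadoop 3.3.1+), SnappyCodec delegates to the
            // bundled snappy-java library, so no native libhadoop is required.
            SnappyCodec codec = new SnappyCodec();
            codec.setConf(new Configuration());
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            OutputStream out = codec.createOutputStream(buffer);
            out.write("snappy round-trip".getBytes("UTF-8"));
            out.close();
            // On Hadoop 2.x this would throw unless native snappy was loaded.
            System.out.println("compressed to " + buffer.size() + " bytes");
        }
    }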
@@ -448,6 +448,9 @@
                 <ignoredUsedUndeclaredDependency>
                     org.apache.tephra:tephra-core
                 </ignoredUsedUndeclaredDependency>
+                <ignoredUsedUndeclaredDependency>
+                    org.apache.tephra:tephra-core-shaded
+                </ignoredUsedUndeclaredDependency>
                 <ignoredUsedUndeclaredDependency>
                     com.lmax:disruptor
                 </ignoredUsedUndeclaredDependency>
@@ -9,7 +9,6 @@
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DeflateCodec;
 import org.apache.hadoop.io.compress.GzipCodec;
-import org.apache.hadoop.io.compress.SnappyCodec;
 import org.apache.hadoop.mapred.JobContext;
 import org.apache.hadoop.mapreduce.*;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
@@ -24,6 +23,7 @@
 import org.opencb.biodata.models.variant.avro.GeneCancerAssociation;
 import org.opencb.biodata.models.variant.avro.VariantAvro;
 import org.opencb.opencga.storage.core.variant.io.VariantWriterFactory;
+import org.opencb.opencga.storage.hadoop.HBaseCompat;
 import org.opencb.opencga.storage.hadoop.variant.AbstractVariantsTableDriver;
 import org.opencb.opencga.storage.hadoop.variant.mr.VariantFileOutputFormat;
 import org.opencb.opencga.storage.hadoop.variant.mr.VariantLocusKey;
@@ -144,7 +144,7 @@ protected void setupJob(Job job) throws IOException {
             LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
             outputFormatClass = LazyOutputFormat.class;
         }
-        if (SnappyCodec.isNativeCodeLoaded()) {
+        if (HBaseCompat.getInstance().isSnappyAvailable()) {
             FileOutputFormat.setCompressOutput(job, true);
             // FIXME: SnappyCodec might not be available in client side
             // FileOutputFormat.setOutputCompressorClass(job, SnappyCodec.class);
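On the FIXME above: a hedged alternative that keeps Snappy output without ever loading SnappyCodec in the client JVM is to set the codec by class name, deferring resolution to the task side. A sketch only; the helper name enableSnappyByName is hypothetical, and this is not what the commit does (it leaves the default codec in place):

    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class OutputCompressionSketch {
        // Hypothetical helper mirroring the setupJob(Job) context above.
        static void enableSnappyByName(Job job) {
            FileOutputFormat.setCompressOutput(job, true);
            // Setting the codec by name means the client never calls
            // Class.forName on SnappyCodec; tasks resolve the string at run time.
            job.getConfiguration().set(FileOutputFormat.COMPRESS_CODEC,
                    "org.apache.hadoop.io.compress.SnappyCodec");
        }
    }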
@@ -5,7 +5,6 @@
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DeflateCodec;
 import org.apache.hadoop.io.compress.GzipCodec;
-import org.apache.hadoop.io.compress.SnappyCodec;
 import org.apache.hadoop.mapred.JobContext;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.OutputFormat;
@@ -17,6 +16,7 @@
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.opencb.opencga.storage.core.variant.io.VariantWriterFactory;
+import org.opencb.opencga.storage.hadoop.HBaseCompat;
 import org.opencb.opencga.storage.hadoop.utils.ValueOnlyTextOutputFormat;
 import org.opencb.opencga.storage.hadoop.variant.io.VariantDriver;
 import org.slf4j.Logger;
@@ -164,7 +164,7 @@ protected void setupJob(Job job) throws IOException {
             outputFormatClass = LazyOutputFormat.class;
 
         job.setOutputFormatClass(ValueOnlyTextOutputFormat.class);
-        if (SnappyCodec.isNativeCodeLoaded()) {
+        if (HBaseCompat.getInstance().isSnappyAvailable()) {
             FileOutputFormat.setCompressOutput(job, true);
             // FIXME: SnappyCodec might not be available in client side
             // FileOutputFormat.setOutputCompressorClass(job, SnappyCodec.class);
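Both export drivers make the same substitution, so the Snappy decision now lives behind the compat API: the Hadoop 2-era compat modules keep the SnappyCodec.isNativeCodeLoaded() check, while the hadoop3 module returns true unconditionally per HADOOP-17125.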
@@ -20,7 +20,6 @@
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.GzipCodec;
@@ -54,6 +53,15 @@
  */
 public class VariantFileOutputFormat extends FileOutputFormat<Variant, NullWritable> {
 
+    private static Class<?> abfsOutputStreamClass;
+
+    static {
+        try {
+            abfsOutputStreamClass = Class.forName("org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream");
+        } catch (ClassNotFoundException e) {
+            abfsOutputStreamClass = null;
+        }
+    }
 
     public static final String VARIANT_OUTPUT_FORMAT = "variant.output_format";
 
@@ -74,7 +82,7 @@ public RecordWriter<Variant, NullWritable> getRecordWriter(TaskAttemptContext job)
         FileSystem fs = file.getFileSystem(conf);
         FSDataOutputStream fsOs = fs.create(file, false);
         OutputStream out;
-        if (fsOs.getWrappedStream() instanceof AbfsOutputStream) {
+        if (isAbfsOutputStream(fsOs)) {
             // Disable flush on ABFS. See HADOOP-16548
             out = new FilterOutputStream(fsOs) {
                 @Override
@@ -92,6 +100,10 @@ public void flush() throws IOException {
         return new VariantRecordWriter(configureWriter(job, countingOut), countingOut);
     }
 
+    private static boolean isAbfsOutputStream(FSDataOutputStream fsOs) {
+        return abfsOutputStreamClass != null && abfsOutputStreamClass.isInstance(fsOs.getWrappedStream());
+    }
+
     private DataWriter<Variant> configureWriter(final TaskAttemptContext job, OutputStream fileOut) throws IOException {
         // job.getCounter(VcfDataWriter.class.getName(), "failed").increment(0); // init
         final Configuration conf = job.getConfiguration();
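A note on the design choice above: loading AbfsOutputStream via Class.forName replaces the hard import, so VariantFileOutputFormat no longer fails to link when hadoop-azure is absent from the classpath. When the class is missing, abfsOutputStreamClass stays null, isAbfsOutputStream() returns false, and the HADOOP-16548 flush workaround is simply skipped.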
