Use TF filesystem API to read HDFS
372046933 committed May 11, 2022
1 parent 5360168 commit b8da5aa
Showing 3 changed files with 195 additions and 22 deletions.
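For context: the commit drops the bundled libhdfspp client and routes ORC's HDFS reads through TensorFlow's filesystem layer instead, so any scheme tensorflow::Env can resolve is served by one code path. A minimal sketch of the call pattern the new code relies on follows; the URI is a placeholder, and the errors.h include is an assumption (the patched file appears to pick up errors::IsOutOfRange transitively):

    #include <iostream>
    #include <memory>
    #include <string>

    #include "tensorflow/core/platform/env.h"
    #include "tensorflow/core/platform/errors.h"  // assumption: for errors::IsOutOfRange

    int main() {
      // Placeholder URI; Env::Default() dispatches on the scheme prefix.
      const std::string path = "hdfs://namenode:8020/data/part-00000.orc";

      std::unique_ptr<tensorflow::RandomAccessFile> file;
      tensorflow::Status s =
          tensorflow::Env::Default()->NewRandomAccessFile(path, &file);
      if (!s.ok()) {
        std::cerr << s.ToString() << std::endl;
        return 1;
      }

      // Positional read: OutOfRange signals a short read near EOF, with the
      // bytes actually read still available through `result`.
      char scratch[4096];
      tensorflow::StringPiece result;
      s = file->Read(/*offset=*/0, /*n=*/sizeof(scratch), &result, scratch);
      if (!s.ok() && !tensorflow::errors::IsOutOfRange(s)) {
        std::cerr << s.ToString() << std::endl;
        return 1;
      }
      std::cout << "read " << result.size() << " bytes" << std::endl;
      return 0;
    }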
5 changes: 3 additions & 2 deletions WORKSPACE
@@ -588,8 +588,9 @@ http_archive(
 http_archive(
     name = "liborc",
     build_file = "//third_party:liborc.BUILD",
-    patch_cmds = [
-        "tar -xzf c++/libs/libhdfspp/libhdfspp.tar.gz -C c++/libs/libhdfspp",
+    patch_args = ["-p1"],
+    patches = [
+        "//third_party:liborc.patch",
     ],
     sha256 = "39d983f4c7feb8ea1e8ab8e3e53e9afc643282b7a500b3a93c91aa6490f65c17",
     strip_prefix = "orc-rel-release-1.6.14",
23 changes: 3 additions & 20 deletions third_party/liborc.BUILD
@@ -39,6 +39,7 @@ cc_library(
     ],
     copts = [],
     defines = [],
+    local_defines = ["BUILD_LIBHDFSPP"],
     includes = [
         "c++/include",
         "c++/src",
@@ -49,34 +50,16 @@ cc_library(
     linkopts = [],
     visibility = ["//visibility:public"],
     deps = [
-        ":libhdfspp",
         ":orc_cc_proto",
+        "@local_config_tf//:libtensorflow_framework",
+        "@local_config_tf//:tf_header_lib",
         "@lz4",
         "@snappy",
         "@zlib",
         "@zstd",
     ],
 )
 
-cc_library(
-    name = "libhdfspp",
-    srcs = glob(
-        [
-            "c++/libs/libhdfspp/include/hdfspp/*.h",
-        ],
-        exclude = [
-        ],
-    ),
-    hdrs = [
-    ],
-    copts = [],
-    defines = [],
-    includes = [
-        "c++/libs/libhdfspp/include",
-    ],
-    deps = [],
-)
-
 proto_library(
     name = "orc_proto",
     srcs = ["proto/orc_proto.proto"],
189 changes: 189 additions & 0 deletions third_party/liborc.patch
@@ -0,0 +1,189 @@
--- a/c++/src/OrcHdfsFile.cc 2022-04-11 04:30:41.000000000 +0800
+++ b/c++/src/OrcHdfsFile.cc 2022-04-11 19:56:37.206680217 +0800
@@ -29,145 +29,57 @@
#include <sys/types.h>
#include <unistd.h>

-#include "hdfspp/hdfspp.h"
+#include "tensorflow/core/platform/env.h"
+#include "tensorflow/core/platform/file_system.h"
+#include "tensorflow/core/platform/logging.h"
+#include "tensorflow/core/platform/status.h"
+#include "tensorflow/core/platform/types.h"

namespace orc {

- class HdfsFileInputStream : public InputStream {
- private:
- std::string filename;
- std::unique_ptr<hdfs::FileHandle> file;
- std::unique_ptr<hdfs::FileSystem> file_system;
- uint64_t totalLength;
- const uint64_t READ_SIZE = 1024 * 1024; //1 MB
-
- public:
- HdfsFileInputStream(std::string _filename) {
- filename = _filename ;
-
- //Building a URI object from the given uri_path
- hdfs::URI uri;
- try {
- uri = hdfs::URI::parse_from_string(filename);
- } catch (const hdfs::uri_parse_error&) {
- throw ParseError("Malformed URI: " + filename);
- }
-
- //This sets conf path to default "$HADOOP_CONF_DIR" or "/etc/hadoop/conf"
- //and loads configs core-site.xml and hdfs-site.xml from the conf path
- hdfs::ConfigParser parser;
- if(!parser.LoadDefaultResources()){
- throw ParseError("Could not load default resources. ");
- }
- auto stats = parser.ValidateResources();
- //validating core-site.xml
- if(!stats[0].second.ok()){
- throw ParseError(stats[0].first + " is invalid: " + stats[0].second.ToString());
- }
- //validating hdfs-site.xml
- if(!stats[1].second.ok()){
- throw ParseError(stats[1].first + " is invalid: " + stats[1].second.ToString());
- }
- hdfs::Options options;
- if(!parser.get_options(options)){
- throw ParseError("Could not load Options object. ");
- }
- hdfs::IoService * io_service = hdfs::IoService::New();
- //Wrapping file_system into a unique pointer to guarantee deletion
- file_system = std::unique_ptr<hdfs::FileSystem>(
- hdfs::FileSystem::New(io_service, "", options));
- if (file_system.get() == nullptr) {
- throw ParseError("Can't create FileSystem object. ");
- }
- hdfs::Status status;
- //Checking if the user supplied the host
- if(!uri.get_host().empty()){
- //Using port if supplied, otherwise using "" to look up port in configs
- std::string port = uri.has_port() ?
- std::to_string(uri.get_port()) : "";
- status = file_system->Connect(uri.get_host(), port);
- if (!status.ok()) {
- throw ParseError("Can't connect to " + uri.get_host()
- + ":" + port + ". " + status.ToString());
- }
- } else {
- status = file_system->ConnectToDefaultFs();
- if (!status.ok()) {
- if(!options.defaultFS.get_host().empty()){
- throw ParseError("Error connecting to " +
- options.defaultFS.str() + ". " + status.ToString());
- } else {
- throw ParseError(
- "Error connecting to the cluster: defaultFS is empty. "
- + status.ToString());
- }
- }
- }
-
- if (file_system.get() == nullptr) {
- throw ParseError("Can't connect the file system. ");
- }
-
- hdfs::FileHandle *file_raw = nullptr;
- status = file_system->Open(uri.get_path(), &file_raw);
- if (!status.ok()) {
- throw ParseError("Can't open "
- + uri.get_path() + ". " + status.ToString());
- }
- //Wrapping file_raw into a unique pointer to guarantee deletion
- file.reset(file_raw);
-
- hdfs::StatInfo stat_info;
- status = file_system->GetFileInfo(uri.get_path(), stat_info);
- if (!status.ok()) {
- throw ParseError("Can't stat "
- + uri.get_path() + ". " + status.ToString());
- }
- totalLength = stat_info.length;
+class HdfsFileInputStream : public InputStream {
+ private:
+ std::string filename_;
+ std::unique_ptr<tensorflow::RandomAccessFile> file_;
+ uint64_t total_length_;
+ const uint64_t READ_SIZE = 1024 * 1024; // 1 MB
+
+ public:
+ HdfsFileInputStream(std::string filename) {
+ filename_ = filename;
+ tensorflow::Status status =
+ tensorflow::Env::Default()->NewRandomAccessFile(filename_, &file_);
+ if (!status.ok()) {
+ LOG(FATAL) << status.ToString();
}

- uint64_t getLength() const override {
- return totalLength;
- }
+ TF_CHECK_OK(tensorflow::Env::Default()->GetFileSize(filename_, &total_length_));
+ }

- uint64_t getNaturalReadSize() const override {
- return READ_SIZE;
- }
+ uint64_t getLength() const override { return total_length_; }

- void read(void* buf,
- uint64_t length,
- uint64_t offset) override {
-
- if (!buf) {
- throw ParseError("Buffer is null");
- }
-
- hdfs::Status status;
- size_t total_bytes_read = 0;
- size_t last_bytes_read = 0;
-
- do {
- status = file->PositionRead(buf,
- static_cast<size_t>(length) - total_bytes_read,
- static_cast<off_t>(offset + total_bytes_read), &last_bytes_read);
- if(!status.ok()) {
- throw ParseError("Error reading the file: " + status.ToString());
- }
- total_bytes_read += last_bytes_read;
- } while (total_bytes_read < length);
- }
+ uint64_t getNaturalReadSize() const override { return READ_SIZE; }

- const std::string& getName() const override {
- return filename;
+ void read(void* buf, uint64_t length, uint64_t offset) override {
+ if (!buf) {
+ LOG(FATAL) << "Null buf";
+ }
+ tensorflow::StringPiece sp;
+ tensorflow::Status s =
+ file_->Read(offset, length, &sp, static_cast<char*>(buf));
+ if (!(s.ok() || tensorflow::errors::IsOutOfRange(s))) {
+ LOG(FATAL) << s.ToString();
}
+ }

- ~HdfsFileInputStream() override;
- };
+ const std::string& getName() const override { return filename_; }

- HdfsFileInputStream::~HdfsFileInputStream() {
- }
+ ~HdfsFileInputStream() override;
+};

- std::unique_ptr<InputStream> readHdfsFile(const std::string& path) {
- return std::unique_ptr<InputStream>(new HdfsFileInputStream(path));
- }
+HdfsFileInputStream::~HdfsFileInputStream() {}
+
+std::unique_ptr<InputStream> readHdfsFile(const std::string& path) {
+ return std::unique_ptr<InputStream>(new HdfsFileInputStream(path));
}
+} // namespace orc
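With that in place, callers are unaffected: an ORC file on HDFS opens exactly as before, only the transport underneath changed. A usage sketch under the same assumptions (the namenode URI is a placeholder):

    #include <iostream>
    #include <memory>
    #include <utility>

    #include "orc/OrcFile.hh"

    int main() {
      // readFile() hands hdfs:// URIs to the patched HdfsFileInputStream,
      // which now reads through tensorflow::Env instead of libhdfspp.
      std::unique_ptr<orc::InputStream> in =
          orc::readFile("hdfs://namenode:8020/data/part-00000.orc");  // placeholder

      orc::ReaderOptions options;
      std::unique_ptr<orc::Reader> reader =
          orc::createReader(std::move(in), options);
      std::cout << "rows: " << reader->getNumberOfRows() << std::endl;
      return 0;
    }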
