Skip to content

Commit

Permalink
add seek
Browse files Browse the repository at this point in the history
Signed-off-by: Kefu Chai <[email protected]>
  • Loading branch information
tchaikov committed Jul 2, 2024
1 parent e34f7ad commit e81841b
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 0 deletions.
6 changes: 6 additions & 0 deletions include/seastar/core/file.hh
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ public:
virtual future<struct stat> stat() = 0;
virtual future<> truncate(uint64_t length) = 0;
virtual future<> discard(uint64_t offset, uint64_t length) = 0;
virtual future<uint64_t> seek(off_t offset, int whence) = 0;
virtual future<int> ioctl(uint64_t cmd, void* argp) noexcept;
virtual future<int> ioctl_short(uint64_t cmd, void* argp) noexcept;
virtual future<int> fcntl(int op, uintptr_t arg) noexcept;
Expand Down Expand Up @@ -685,6 +686,11 @@ public:
/// before calling \c close().
future<> close() noexcept;

/// Set the current position of file descriptor \c fd to position pos, from
/// \c whence, and return the new position in bytes relative to the start of
/// the file.
future<uint64_t> seek(off_t offset, int whence);

/// Returns a directory listing, given that this file object is a directory.
subscription<directory_entry> list_directory(std::function<future<> (directory_entry de)> next);

Expand Down
10 changes: 10 additions & 0 deletions include/seastar/core/iostream.hh
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public:
virtual ~data_source_impl() {}
virtual future<temporary_buffer<char>> get() = 0;
virtual future<temporary_buffer<char>> skip(uint64_t n);
virtual future<temporary_buffer<char>> seek(uint64_t pos);
virtual future<> close() { return make_ready_future<>(); }
};

Expand Down Expand Up @@ -91,6 +92,15 @@ public:
return current_exception_as_future<tmp_buf>();
}
}

future<> seek(uint64_t pos) noexcept {
try {
return _dsi->seek(pos);
} catch (...) {
return current_exception_as_future<>();
}
}

future<> close() noexcept {
try {
return _dsi->close();
Expand Down
5 changes: 5 additions & 0 deletions include/seastar/core/posix.hh
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,11 @@ public:
throw_system_error_on(r == -1, "fstat");
return buf.st_size;
}
off_t seek(off_t offset, int whence) {
off_t r = ::lseek(_fd, offset, whence);
throw_system_error_on(r == static_cast<off_t>(-1), "lseek");
return r;
}
std::optional<size_t> read(void* buffer, size_t len) {
auto r = ::read(_fd, buffer, len);
if (r == -1 && errno == EAGAIN) {
Expand Down
1 change: 1 addition & 0 deletions src/core/file-impl.hh
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ public:
future<struct stat> stat() noexcept override;
future<> truncate(uint64_t length) noexcept override;
future<> discard(uint64_t offset, uint64_t length) noexcept override;
future<uint64_t> seek(off_t offset, int whence) noexcept override;
future<int> ioctl(uint64_t cmd, void* argp) noexcept override;
future<int> ioctl_short(uint64_t cmd, void* argp) noexcept override;
future<int> fcntl(int op, uintptr_t arg) noexcept override;
Expand Down
19 changes: 19 additions & 0 deletions src/core/file.cc
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,16 @@ posix_file_impl::discard(uint64_t offset, uint64_t length) noexcept {
});
}

future<uint64_t>
posix_file_impl::seek(off_t offset, int whence) noexcept {
return engine()._thread_pool->submit<syscall_result<off_t>>([this, offset, whence] () mutable {
return wrap_syscall<off_t>(::lseek(_fd, offset, whence));
}).then([] (syscall_result<off_t> sr) {
sr.throw_if_error();
return make_ready_future<uint64_t>(sr.result);
});
}

future<>
posix_file_impl::allocate(uint64_t position, uint64_t length) noexcept {
#ifdef FALLOC_FL_ZERO_RANGE
Expand Down Expand Up @@ -1156,6 +1166,15 @@ future<> file::close() noexcept {
});
}

future<uint64_t> file::seek(off_t offset, int whence) {
try {
return _file_impl->seek(offset, whence);
} catch (...) {
return current_exception_as_future<uint64_t>();
}

}

subscription<directory_entry>
file::list_directory(std::function<future<>(directory_entry de)> next) {
return _file_impl->list_directory(std::move(next));
Expand Down
19 changes: 19 additions & 0 deletions src/core/fstream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,25 @@ class file_data_source_impl : public data_source_impl {
update_history_unused(dropped);
return make_ready_future<temporary_buffer<char>>();
}
virtual future<> seek(uint64_t pos) override {
_done.emplace();
if (!_reads_in_progress) {
_done->set_value();
}
_intent.cancel();
co_await _done->get_future();
uint64_t dropped = 0;
for (auto&& c : _read_buffers) {
_reactor._io_stats.fstream_read_aheads_discarded += 1;
_reactor._io_stats.fstream_read_ahead_discarded_bytes += c._size;
dropped += c._size;
ignore_read_future(std::move(c._ready));
}
update_history_unused(dropped);
co_await std::move(_dropped_reads);
co_await _file.seek(pos, SEEK_SET);
_pos = pos;
}
virtual future<> close() override {
_done.emplace();
if (!_reads_in_progress) {
Expand Down

0 comments on commit e81841b

Please sign in to comment.