Skip to content

Commit

Permalink
feat: Use Velox fs for ssd cache checkpoint file (#11783)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: #11783

Switch the ssd cache checkpoint file to use Velox filesystem for file r/w operations, so that more advanced testing can be built by leveraging features like fault injections.

Reviewed By: xiaoxmeng

Differential Revision: D66892136

fbshipit-source-id: d4da2df1f5da976b0a71c4a1a087a2c0ba569328
  • Loading branch information
zacw7 authored and facebook-github-bot committed Dec 12, 2024
1 parent d9a6012 commit ac13440
Show file tree
Hide file tree
Showing 8 changed files with 287 additions and 164 deletions.
295 changes: 170 additions & 125 deletions velox/common/caching/SsdFile.cpp

Large diffs are not rendered by default.

63 changes: 55 additions & 8 deletions velox/common/caching/SsdFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "velox/common/caching/AsyncDataCache.h"
#include "velox/common/caching/SsdFileTracker.h"
#include "velox/common/file/File.h"
#include "velox/common/file/FileInputStream.h"
#include "velox/common/file/FileSystems.h"

DECLARE_bool(ssd_odirect);
Expand Down Expand Up @@ -373,13 +374,7 @@ class SsdFile {

/// Returns the checkpoint file path.
std::string getCheckpointFilePath() const {
// Faulty file path needs to be handled manually before we switch checkpoint
// file to Velox filesystem.
const std::string faultyPrefix = "faulty:";
std::string checkpointPath = fileName_ + kCheckpointExtension;
return checkpointPath.find(faultyPrefix) == 0
? checkpointPath.substr(faultyPrefix.size())
: checkpointPath;
return fileName_ + kCheckpointExtension;
}

/// Resets this' to a post-construction empty state. See SsdCache::clear().
Expand Down Expand Up @@ -448,7 +443,7 @@ class SsdFile {
// Reads a checkpoint state file and sets 'this' accordingly if read is
// successful. Return true for successful read. A failed read deletes the
// checkpoint and leaves the log truncated open.
void readCheckpoint(std::ifstream& state);
void readCheckpoint(std::unique_ptr<common::FileInputStream> stream);

// Logs an error message, deletes the checkpoint and stop making new
// checkpoints.
Expand Down Expand Up @@ -493,6 +488,48 @@ class SsdFile {
// file system not supporting cow feature.
void disableFileCow();

// Truncates the eviction log file to 0.
void truncateEvictLogFile();

// Truncates the checkpoint file to 0.
void truncateCheckpointFile();

// Deletes the eviction log file if it exists.
void deleteEvictLogFile();

// Deletes the checkpoint file if it exists.
void deleteCheckpointFile();

// Allocates 'kCheckpointBufferSize' buffer from cache memory pool for
// checkpointing.
void allocateCheckpointBuffer();

// Frees checkpoint buffer.
void freeCheckpointBuffer();

// Appends 'size' bytes from source buffer to the checkpoint buffer and
// flushes the buffered data to disk if necessary.
void appendToCheckpointBuffer(const void* source, int32_t size);

void appendToCheckpointBuffer(const std::string& string);

template <typename T>
void appendToCheckpointBuffer(const std::vector<T>& vector) {
appendToCheckpointBuffer(vector.data(), vector.size() * sizeof(T));
}

template <typename T>
void appendToCheckpointBuffer(const T& data) {
appendToCheckpointBuffer(&data, sizeof(data));
}

// Flushs the buffered data to write file if the buffered data has exceeded
// 'kCheckpointBufferSize' or 'force' is set true.
void maybeFlushCheckpointBuffer(uint32_t appendBytes, bool force = false);

// Flushs the buffered data to disk.
void flushCheckpointFile();

// Returns true if checksum write is enabled for the given version.
static bool isChecksumEnabledOnCheckpointVersion(
const std::string& checkpointVersion) {
Expand All @@ -501,6 +538,7 @@ class SsdFile {

static constexpr const char* kLogExtension = ".log";
static constexpr const char* kCheckpointExtension = ".cpt";
static constexpr uint32_t kCheckpointBufferSize = 1 << 20; // 1MB

// Name of cache file, used as prefix for checkpoint files.
const std::string fileName_;
Expand Down Expand Up @@ -565,6 +603,9 @@ class SsdFile {
// WriteFile for evict log file.
std::unique_ptr<WriteFile> evictLogWriteFile_;

// WriteFile for checkpoint file.
std::unique_ptr<WriteFile> checkpointWriteFile_;

// Counters.
SsdCacheStats stats_;

Expand All @@ -581,6 +622,12 @@ class SsdFile {
// True if there was an error with checkpoint and the checkpoint was deleted.
bool checkpointDeleted_{false};

// Used for checkpoint buffer and is only set during the checkpoint write.
void* checkpointBuffer_ = nullptr;

// Buffered data size for checkpoint.
uint32_t checkpointBufferedDataSize_;

friend class test::SsdFileTestHelper;
friend class test::SsdCacheTestHelper;
};
Expand Down
14 changes: 14 additions & 0 deletions velox/common/caching/tests/AsyncDataCacheTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,17 @@ class AsyncDataCacheTest : public ::testing::TestWithParam<TestParam> {
}
}

void initializeMemoryManager(int64_t capacity) {
if (!memory::MemoryManager::testInstance()) {
memory::MemoryManagerOptions options;
options.useMmapAllocator = true;
options.allocatorCapacity = capacity;
options.arbitratorCapacity = capacity;
options.trackDefaultUsage = true;
memory::MemoryManager::initialize(options);
}
}

void initializeCache(
uint64_t maxBytes,
int64_t ssdBytes = 0,
Expand Down Expand Up @@ -1204,6 +1215,8 @@ TEST_P(AsyncDataCacheTest, shutdown) {
constexpr uint64_t kRamBytes = 16 << 20;
constexpr uint64_t kSsdBytes = 64UL << 20;

initializeMemoryManager(kRamBytes);

for (const auto asyncShutdown : {false, true}) {
SCOPED_TRACE(fmt::format("asyncShutdown {}", asyncShutdown));
// Initialize cache with a big checkpointIntervalBytes, giving eviction log
Expand Down Expand Up @@ -1535,6 +1548,7 @@ TEST_P(AsyncDataCacheTest, checkpoint) {
constexpr uint64_t kRamBytes = 16UL << 20; // 16 MB
constexpr uint64_t kSsdBytes = 64UL << 20; // 64 MB

initializeMemoryManager(kRamBytes);
initializeCache(
kRamBytes,
kSsdBytes,
Expand Down
10 changes: 9 additions & 1 deletion velox/common/caching/tests/SsdFileTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "velox/exec/tests/utils/TempDirectoryPath.h"

#include <fcntl.h>
#include <folly/executors/IOThreadPoolExecutor.h>
#include <folly/executors/QueuedImmediateExecutor.h>
#include <glog/logging.h>
#include <gtest/gtest.h>
Expand Down Expand Up @@ -100,7 +101,8 @@ class SsdFileTest : public testing::Test {
checkpointIntervalBytes,
disableFileCow,
checksumEnabled,
checksumReadVerificationEnabled);
checksumReadVerificationEnabled,
ssdExecutor());
ssdFile_ = std::make_unique<SsdFile>(config);
if (ssdFile_ != nullptr) {
ssdFileHelper_ =
Expand Down Expand Up @@ -167,6 +169,12 @@ class SsdFileTest : public testing::Test {
}
}

static folly::IOThreadPoolExecutor* ssdExecutor() {
static std::unique_ptr<folly::IOThreadPoolExecutor> ssdExecutor =
std::make_unique<folly::IOThreadPoolExecutor>(20);
return ssdExecutor.get();
}

// Gets consecutive entries from file 'fileId' starting at 'startOffset' with
// sizes between 'minSize' and 'maxSize'. Sizes start at 'minSize' and double
// each time and go back to 'minSize' after exceeding 'maxSize'. This stops
Expand Down
2 changes: 2 additions & 0 deletions velox/common/file/File.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,8 @@ void LocalWriteFile::truncate(int64_t newSize) {
0,
"ftruncate failed in LocalWriteFile::truncate: {}.",
folly::errnoStr(errno));
// Reposition the file offset to the end of the file for append().
::lseek(fd_, newSize, SEEK_SET);
size_ = newSize;
}

Expand Down
1 change: 1 addition & 0 deletions velox/common/memory/Memory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ MemoryManager::MemoryManager(const MemoryManagerOptions& options)
.coreOnAllocationFailureEnabled =
options.coreOnAllocationFailureEnabled})},
spillPool_{addLeafPool("__sys_spilling__")},
cachePool_{addLeafPool("__sys_caching__")},
tracePool_{addLeafPool("__sys_tracing__")},
sharedLeafPools_(createSharedLeafMemoryPools(*sysRoot_)) {
VELOX_CHECK_NOT_NULL(allocator_);
Expand Down
6 changes: 6 additions & 0 deletions velox/common/memory/Memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,11 @@ class MemoryManager {
return spillPool_.get();
}

/// Returns the process wide leaf memory pool used for ssd cache.
MemoryPool* cachePool() {
return cachePool_.get();
}

/// Returns the process wide leaf memory pool used for query tracing.
MemoryPool* tracePool() const {
return tracePool_.get();
Expand Down Expand Up @@ -311,6 +316,7 @@ class MemoryManager {

const std::shared_ptr<MemoryPool> sysRoot_;
const std::shared_ptr<MemoryPool> spillPool_;
const std::shared_ptr<MemoryPool> cachePool_;
const std::shared_ptr<MemoryPool> tracePool_;
const std::vector<std::shared_ptr<MemoryPool>> sharedLeafPools_;

Expand Down
60 changes: 30 additions & 30 deletions velox/common/memory/tests/MemoryManagerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ TEST_F(MemoryManagerTest, ctor) {
const auto kSharedPoolCount = FLAGS_velox_memory_num_shared_leaf_pools;
{
MemoryManager manager{};
ASSERT_EQ(manager.numPools(), 2);
ASSERT_EQ(manager.numPools(), 3);
ASSERT_EQ(manager.capacity(), kMaxMemory);
ASSERT_EQ(0, manager.getTotalBytes());
ASSERT_EQ(manager.alignment(), MemoryAllocator::kMaxAlignment);
Expand All @@ -67,7 +67,7 @@ TEST_F(MemoryManagerTest, ctor) {
MemoryManager manager{
{.allocatorCapacity = kCapacity, .arbitratorCapacity = kCapacity}};
ASSERT_EQ(kCapacity, manager.capacity());
ASSERT_EQ(manager.numPools(), 2);
ASSERT_EQ(manager.numPools(), 3);
ASSERT_EQ(manager.testingDefaultRoot().alignment(), manager.alignment());
}
{
Expand All @@ -81,7 +81,7 @@ TEST_F(MemoryManagerTest, ctor) {
ASSERT_EQ(manager.testingDefaultRoot().alignment(), manager.alignment());
// TODO: replace with root pool memory tracker quota check.
ASSERT_EQ(
kSharedPoolCount + 2, manager.testingDefaultRoot().getChildCount());
kSharedPoolCount + 3, manager.testingDefaultRoot().getChildCount());
ASSERT_EQ(kCapacity, manager.capacity());
ASSERT_EQ(0, manager.getTotalBytes());
}
Expand All @@ -98,7 +98,7 @@ TEST_F(MemoryManagerTest, ctor) {
ASSERT_EQ(arbitrator->stats().maxCapacityBytes, kCapacity);
ASSERT_EQ(
manager.toString(),
"Memory Manager[capacity 4.00GB alignment 64B usedBytes 0B number of pools 2\nList of root pools:\n\t__sys_root__\nMemory Allocator[MALLOC capacity 4.00GB allocated bytes 0 allocated pages 0 mapped pages 0]\nARBITRATOR[SHARED CAPACITY[4.00GB] numRequests 0 numRunning 0 numSucceded 0 numAborted 0 numFailures 0 numNonReclaimableAttempts 0 reclaimedFreeCapacity 0B reclaimedUsedCapacity 0B maxCapacity 4.00GB freeCapacity 4.00GB freeReservedCapacity 0B]]");
"Memory Manager[capacity 4.00GB alignment 64B usedBytes 0B number of pools 3\nList of root pools:\n\t__sys_root__\nMemory Allocator[MALLOC capacity 4.00GB allocated bytes 0 allocated pages 0 mapped pages 0]\nARBITRATOR[SHARED CAPACITY[4.00GB] numRequests 0 numRunning 0 numSucceded 0 numAborted 0 numFailures 0 numNonReclaimableAttempts 0 reclaimedFreeCapacity 0B reclaimedUsedCapacity 0B maxCapacity 4.00GB freeCapacity 4.00GB freeReservedCapacity 0B]]");
}
}

Expand Down Expand Up @@ -263,10 +263,10 @@ TEST_F(MemoryManagerTest, addPoolWithArbitrator) {
TEST_F(MemoryManagerTest, defaultMemoryManager) {
auto& managerA = toMemoryManager(deprecatedDefaultMemoryManager());
auto& managerB = toMemoryManager(deprecatedDefaultMemoryManager());
const auto kSharedPoolCount = FLAGS_velox_memory_num_shared_leaf_pools + 2;
ASSERT_EQ(managerA.numPools(), 2);
const auto kSharedPoolCount = FLAGS_velox_memory_num_shared_leaf_pools + 3;
ASSERT_EQ(managerA.numPools(), 3);
ASSERT_EQ(managerA.testingDefaultRoot().getChildCount(), kSharedPoolCount);
ASSERT_EQ(managerB.numPools(), 2);
ASSERT_EQ(managerB.numPools(), 3);
ASSERT_EQ(managerB.testingDefaultRoot().getChildCount(), kSharedPoolCount);

auto child1 = managerA.addLeafPool("child_1");
Expand All @@ -277,38 +277,38 @@ TEST_F(MemoryManagerTest, defaultMemoryManager) {
kSharedPoolCount + 2, managerA.testingDefaultRoot().getChildCount());
EXPECT_EQ(
kSharedPoolCount + 2, managerB.testingDefaultRoot().getChildCount());
ASSERT_EQ(managerA.numPools(), 4);
ASSERT_EQ(managerB.numPools(), 4);
auto pool = managerB.addRootPool();
ASSERT_EQ(managerA.numPools(), 5);
ASSERT_EQ(managerB.numPools(), 5);
auto pool = managerB.addRootPool();
ASSERT_EQ(managerA.numPools(), 6);
ASSERT_EQ(managerB.numPools(), 6);
ASSERT_EQ(
managerA.toString(),
"Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 5\nList of root pools:\n\t__sys_root__\n\tdefault_root_0\n\trefcount 2\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]");
"Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 6\nList of root pools:\n\t__sys_root__\n\tdefault_root_0\n\trefcount 2\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]");
ASSERT_EQ(
managerB.toString(),
"Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 5\nList of root pools:\n\t__sys_root__\n\tdefault_root_0\n\trefcount 2\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]");
"Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 6\nList of root pools:\n\t__sys_root__\n\tdefault_root_0\n\trefcount 2\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]");
child1.reset();
EXPECT_EQ(
kSharedPoolCount + 1, managerA.testingDefaultRoot().getChildCount());
child2.reset();
EXPECT_EQ(kSharedPoolCount, managerB.testingDefaultRoot().getChildCount());
ASSERT_EQ(managerA.numPools(), 4);
ASSERT_EQ(managerB.numPools(), 4);
pool.reset();
ASSERT_EQ(managerA.numPools(), 3);
ASSERT_EQ(managerB.numPools(), 3);
pool.reset();
ASSERT_EQ(managerA.numPools(), 2);
ASSERT_EQ(managerB.numPools(), 2);
ASSERT_EQ(
managerA.toString(),
"Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 2\nList of root pools:\n\t__sys_root__\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]");
"Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 3\nList of root pools:\n\t__sys_root__\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]");
ASSERT_EQ(
managerB.toString(),
"Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 2\nList of root pools:\n\t__sys_root__\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]");
"Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 3\nList of root pools:\n\t__sys_root__\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]");
const std::string detailedManagerStr = managerA.toString(true);
ASSERT_THAT(
detailedManagerStr,
testing::HasSubstr(
"Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 2\nList of root pools:\n__sys_root__ usage 0B reserved 0B peak 0B\n"));
"Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 3\nList of root pools:\n__sys_root__ usage 0B reserved 0B peak 0B\n"));
ASSERT_THAT(
detailedManagerStr,
testing::HasSubstr("__sys_spilling__ usage 0B reserved 0B peak 0B\n"));
Expand All @@ -326,7 +326,7 @@ TEST_F(MemoryManagerTest, defaultMemoryManager) {
// TODO: remove this test when remove deprecatedAddDefaultLeafMemoryPool.
TEST(MemoryHeaderTest, addDefaultLeafMemoryPool) {
auto& manager = toMemoryManager(deprecatedDefaultMemoryManager());
const auto kSharedPoolCount = FLAGS_velox_memory_num_shared_leaf_pools + 2;
const auto kSharedPoolCount = FLAGS_velox_memory_num_shared_leaf_pools + 3;
ASSERT_EQ(manager.testingDefaultRoot().getChildCount(), kSharedPoolCount);
{
auto poolA = deprecatedAddDefaultLeafMemoryPool();
Expand Down Expand Up @@ -381,7 +381,7 @@ TEST_F(MemoryManagerTest, memoryPoolManagement) {
MemoryManagerOptions options;
options.alignment = alignment;
MemoryManager manager{options};
ASSERT_EQ(manager.numPools(), 2);
ASSERT_EQ(manager.numPools(), 3);
const int numPools = 100;
std::vector<std::shared_ptr<MemoryPool>> userRootPools;
std::vector<std::shared_ptr<MemoryPool>> userLeafPools;
Expand All @@ -406,14 +406,14 @@ TEST_F(MemoryManagerTest, memoryPoolManagement) {
ASSERT_FALSE(rootUnamedPool->name().empty());
ASSERT_EQ(rootUnamedPool->kind(), MemoryPool::Kind::kAggregate);
ASSERT_EQ(rootUnamedPool->parent(), nullptr);
ASSERT_EQ(manager.numPools(), 1 + numPools + 2 + 1);
ASSERT_EQ(manager.numPools(), 1 + numPools + 3 + 1);
userLeafPools.clear();
leafUnamedPool.reset();
ASSERT_EQ(manager.numPools(), 1 + numPools / 2 + 1 + 1);
ASSERT_EQ(manager.numPools(), 1 + numPools / 2 + 1 + 1 + 1);
userRootPools.clear();
ASSERT_EQ(manager.numPools(), 1 + 2);
ASSERT_EQ(manager.numPools(), 1 + 3);
rootUnamedPool.reset();
ASSERT_EQ(manager.numPools(), 2);
ASSERT_EQ(manager.numPools(), 3);
}

// TODO: when run sequentially, e.g. `buck run dwio/memory/...`, this has side
Expand All @@ -430,7 +430,7 @@ TEST_F(MemoryManagerTest, globalMemoryManager) {
ASSERT_NE(manager, globalManager);
ASSERT_EQ(manager, memoryManager());
auto* managerII = memoryManager();
const auto kSharedPoolCount = FLAGS_velox_memory_num_shared_leaf_pools + 2;
const auto kSharedPoolCount = FLAGS_velox_memory_num_shared_leaf_pools + 3;
{
auto& rootI = manager->testingDefaultRoot();
const std::string childIName("some_child");
Expand Down Expand Up @@ -464,9 +464,9 @@ TEST_F(MemoryManagerTest, globalMemoryManager) {
ASSERT_EQ(userRootChild->kind(), MemoryPool::Kind::kAggregate);
ASSERT_EQ(rootI.getChildCount(), kSharedPoolCount + 1);
ASSERT_EQ(rootII.getChildCount(), kSharedPoolCount + 1);
ASSERT_EQ(manager->numPools(), 2 + 2);
ASSERT_EQ(manager->numPools(), 2 + 3);
}
ASSERT_EQ(manager->numPools(), 2);
ASSERT_EQ(manager->numPools(), 3);
}

TEST_F(MemoryManagerTest, alignmentOptionCheck) {
Expand Down Expand Up @@ -563,9 +563,9 @@ TEST_F(MemoryManagerTest, concurrentPoolAccess) {
}
stopCheck = true;
checkThread.join();
ASSERT_EQ(manager.numPools(), pools.size() + 2);
ASSERT_EQ(manager.numPools(), pools.size() + 3);
pools.clear();
ASSERT_EQ(manager.numPools(), 2);
ASSERT_EQ(manager.numPools(), 3);
}

TEST_F(MemoryManagerTest, quotaEnforcement) {
Expand Down Expand Up @@ -681,7 +681,7 @@ TEST_F(MemoryManagerTest, disableMemoryPoolTracking) {
ASSERT_EQ(manager.capacity(), 64LL << 20);
ASSERT_EQ(manager.shrinkPools(), 0);
// Default 1 system pool with 1 leaf child
ASSERT_EQ(manager.numPools(), 2);
ASSERT_EQ(manager.numPools(), 3);

VELOX_ASSERT_THROW(
leaf0->allocate(38LL << 20), "Exceeded memory pool capacity");
Expand Down

0 comments on commit ac13440

Please sign in to comment.