Skip to content

Commit

Permalink
Merge pull request #64 from koculu/63-enhancement-reduce-overall-memo…
Browse files Browse the repository at this point in the history
…ry-consumption-and-improve-read-speed

63 enhancement reduce overall memory consumption and improve read speed
  • Loading branch information
koculu authored Jul 19, 2024
2 parents 3486123 + 91cc5d6 commit 85a5a81
Show file tree
Hide file tree
Showing 81 changed files with 1,565 additions and 878 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ Note: For small data you don't need a maintainer.
.OpenOrCreate();

using var maintainer = zoneTree.CreateMaintainer();
maintainer.EnableJobForCleaningInactiveCaches = true;

// 2. Read/Write data
zoneTree.Upsert(39, "Hello ZoneTree!");
Expand Down
7 changes: 0 additions & 7 deletions src/Playground/Benchmark/OldTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ public static void Insert(WriteAheadLogMode mode, int count)
using var zoneTree = OpenOrCreateZoneTree(mode, dataPath);
using var basicMaintainer = zoneTree.CreateMaintainer();
basicMaintainer.ThresholdForMergeOperationStart = TestConfig.ThresholdForMergeOperationStart;
basicMaintainer.MinimumSparseArrayLength = TestConfig.MinimumSparseArrayLength;
new StatsCollector().LogWithColor(
"Loaded in:",
stopWatch.ElapsedMilliseconds,
Expand Down Expand Up @@ -117,8 +116,6 @@ public static void ShowBottomSegments(WriteAheadLogMode mode, int count)
string dataPath = GetDataPath(mode, count);
var stopWatch = new Stopwatch();
stopWatch.Start();
TestConfig.DiskSegmentMaximumCachedBlockCount = 400;
TestConfig.MinimumSparseArrayLength = 33;
using var zoneTree = OpenOrCreateZoneTree(mode, dataPath);

new StatsCollector().LogWithColor(
Expand Down Expand Up @@ -173,7 +170,6 @@ public static void InsertSingleAndMerge(WriteAheadLogMode mode, int count, int k
using var zoneTree = OpenOrCreateZoneTree(mode, dataPath);
using var basicMaintainer = zoneTree.CreateMaintainer();
basicMaintainer.ThresholdForMergeOperationStart = TestConfig.ThresholdForMergeOperationStart;
basicMaintainer.MinimumSparseArrayLength = TestConfig.MinimumSparseArrayLength;
new StatsCollector().LogWithColor(
"Loaded in:",
stopWatch.ElapsedMilliseconds,
Expand Down Expand Up @@ -274,9 +270,7 @@ private static IZoneTree<int, int> OpenOrCreateZoneTree(WriteAheadLogMode mode,
return new ZoneTreeFactory<int, int>()
.DisableDeleteValueConfigurationValidation(false)
.SetMutableSegmentMaxItemCount(TestConfig.MutableSegmentMaxItemCount)
.SetDiskSegmentCompression(TestConfig.EnableDiskSegmentCompression)
.SetDiskSegmentCompressionBlockSize(TestConfig.DiskCompressionBlockSize)
.SetDiskSegmentMaximumCachedBlockCount(TestConfig.DiskSegmentMaximumCachedBlockCount)
.SetDataDirectory(dataPath)
.SetWriteAheadLogDirectory(dataPath)
.ConfigureDiskSegmentOptions(x => x.DiskSegmentMode = TestConfig.DiskSegmentMode)
Expand All @@ -286,7 +280,6 @@ private static IZoneTree<int, int> OpenOrCreateZoneTree(WriteAheadLogMode mode,
x.WriteAheadLogMode = mode;
x.EnableIncrementalBackup = TestConfig.EnableIncrementalBackup;
})
.SetInitialSparseArrayLength(TestConfig.MinimumSparseArrayLength)
.OpenOrCreate();
}
}
4 changes: 2 additions & 2 deletions src/Playground/Benchmark/StevesChallenge.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ namespace Playground.Benchmark;
/// How about a billion variable-length records averaging 8-byte keys and 8-byte values?
/// That's a little more challenging. 😀
/// </summary>
public sealed class StevesChallenge : ZoneTreeTestBase<byte[], byte[]>
public sealed class StevesChallenge : ZoneTreeTestBase<Memory<byte>, Memory<byte>>
{
const string FolderName = "-byte-byte";

public override string DataPath =>
public override string DataPath =>
RootPath + WALMode + "-" + Count.ToHuman()
+ "_" + CompressionMethod + "_"
+ FolderName;
Expand Down
11 changes: 1 addition & 10 deletions src/Playground/Benchmark/TestConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,16 @@ public static class TestConfig

public static bool EnableIncrementalBackup = true;

public static bool EnableDiskSegmentCompression = true;

public static int WALCompressionBlockSize = 32768;

public static int DiskCompressionBlockSize = 32768;

public static int DiskSegmentMaximumCachedBlockCount = 1000;

public static int MinimumSparseArrayLength = 1_000_000;

public static bool EnableParalelInserts = false;

public static DiskSegmentMode DiskSegmentMode = DiskSegmentMode.MultiPartDiskSegment;

public static CompressionMethod CompressionMethod = CompressionMethod.LZ4;

public static int CompressionLevel = 0;

public static void PrintConfig()
Expand All @@ -39,11 +33,8 @@ public static void PrintConfig()
Console.WriteLine($"MutableSegmentMaxItemCount: {MutableSegmentMaxItemCount}");
Console.WriteLine($"DiskSegmentMaxItemCount: {DiskSegmentMaxItemCount}");
Console.WriteLine($"EnableIncrementalBackup: {EnableIncrementalBackup}");
Console.WriteLine($"EnableDiskSegmentCompression: {EnableDiskSegmentCompression}");
Console.WriteLine($"WALCompressionBlockSize: {WALCompressionBlockSize}");
Console.WriteLine($"DiskCompressionBlockSize: {DiskCompressionBlockSize}");
Console.WriteLine($"DiskSegmentMaximumCachedBlockCount: {DiskSegmentMaximumCachedBlockCount}");
Console.WriteLine($"MinimumSparseArrayLength: {MinimumSparseArrayLength}");
Console.WriteLine($"EnableParalelInserts: {EnableParalelInserts}");
Console.WriteLine($"DiskSegmentMode: {DiskSegmentMode}");
Console.WriteLine($"CompressionMethod: {CompressionMethod}");
Expand Down
4 changes: 0 additions & 4 deletions src/Playground/Benchmark/ZoneTreeTestBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,8 @@ protected ZoneTreeFactory<TKey, TValue> GetFactory()
.DisableDeleteValueConfigurationValidation(false)
.SetMutableSegmentMaxItemCount(TestConfig.MutableSegmentMaxItemCount)
.SetDiskSegmentMaxItemCount(TestConfig.DiskSegmentMaxItemCount)
.SetDiskSegmentCompression(TestConfig.EnableDiskSegmentCompression)
.SetDiskSegmentCompressionBlockSize(TestConfig.DiskCompressionBlockSize)
.SetDiskSegmentMaximumCachedBlockCount(TestConfig.DiskSegmentMaximumCachedBlockCount)
.SetDataDirectory(DataPath)
.SetInitialSparseArrayLength(TestConfig.MinimumSparseArrayLength)
.ConfigureDiskSegmentOptions(x =>
{
x.DiskSegmentMode = TestConfig.DiskSegmentMode;
Expand Down Expand Up @@ -70,7 +67,6 @@ protected IMaintainer CreateMaintainer(IZoneTree<TKey, TValue> zoneTree)
{
var maintainer = zoneTree.CreateMaintainer();
maintainer.ThresholdForMergeOperationStart = TestConfig.ThresholdForMergeOperationStart;
maintainer.MinimumSparseArrayLength = TestConfig.MinimumSparseArrayLength;
return maintainer;
}

Expand Down
2 changes: 0 additions & 2 deletions src/Playground/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,8 @@
TestConfig.ThresholdForMergeOperationStart = 2_000_000;
TestConfig.RecreateDatabases = true;
TestConfig.EnableParalelInserts = false;
TestConfig.DiskSegmentMaximumCachedBlockCount = 1;
TestConfig.DiskCompressionBlockSize = 1024 * 1024 * 10;
TestConfig.WALCompressionBlockSize = 1024 * 32 * 8;
TestConfig.MinimumSparseArrayLength = 0;
TestConfig.DiskSegmentMode = DiskSegmentMode.SingleDiskSegment;
ConsoleLogger.DefaultLogLevel = LogLevel.Info;

Expand Down
2 changes: 0 additions & 2 deletions src/Playground/RecoverFile.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ public static void Recover1()
RandomAccessDeviceManager = deviceManager,
DiskSegmentOptions = new()
{
EnableCompression = true
},
KeySerializer = new Utf8StringSerializer(),
ValueSerializer = new Utf8StringSerializer(),
Expand Down Expand Up @@ -62,7 +61,6 @@ public static void Recover2()
DiskSegmentOptions = new()
{
CompressionBlockSize = meta.DiskSegmentOptions.CompressionBlockSize,
EnableCompression = true,
},
KeySerializer = new Int32Serializer(),
ValueSerializer = new Int32Serializer(),
Expand Down
12 changes: 5 additions & 7 deletions src/Playground/Test1.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public static void SeveralParallelTransactions()
})
.OpenOrCreateTransactional();
using var basicMaintainer = new ZoneTreeMaintainer<int, int>(zoneTree);

Console.WriteLine("Loaded: " + stopWatch.ElapsedMilliseconds);

stopWatch.Restart();
Expand Down Expand Up @@ -88,7 +88,6 @@ public static void TestReverseIterator(
})
.ConfigureDiskSegmentOptions(x =>
{
x.EnableCompression = true;
x.DiskSegmentMode = DiskSegmentMode.MultiPartDiskSegment;
x.CompressionBlockSize = 1024 * 1024 * 20;
x.MinimumRecordCount = 10_000;
Expand Down Expand Up @@ -151,7 +150,7 @@ public static void TestReverseIterator(

if (2 * count != iterateCount)
{
throw new Exception($"iterateCount != {2*count} " + iterateCount);
throw new Exception($"iterateCount != {2 * count} " + iterateCount);
}
stopwatchAll.Stop();
Console.WriteLine($"All Time:{stopwatchAll.Elapsed}");
Expand Down Expand Up @@ -201,7 +200,6 @@ public static void TestIteratorBehavior(
})
.ConfigureDiskSegmentOptions(x =>
{
x.EnableCompression = true;
x.DiskSegmentMode = DiskSegmentMode.SingleDiskSegment;
})
.OpenOrCreate();
Expand All @@ -210,7 +208,7 @@ public static void TestIteratorBehavior(
using var maintainer = zoneTree1.CreateMaintainer();
var t1 = Task.Run(() =>
{
for(var i = 0; i < count; ++i)
for (var i = 0; i < count; ++i)
{
zoneTree1.Upsert(i, i);
}
Expand All @@ -223,7 +221,7 @@ public static void TestIteratorBehavior(
var c = zoneTree1.Count();
var s = 0;
Console.WriteLine("count:" + c);
using var it = reverse ?
using var it = reverse ?
zoneTree1.CreateReverseIterator() :
zoneTree1.CreateIterator();
it.Next();
Expand All @@ -236,7 +234,7 @@ public static void TestIteratorBehavior(
var k = it.CurrentKey;
if (Math.Abs(k - p) > 1)
{
Console.WriteLine($"{k} - {k-p}");
Console.WriteLine($"{k} - {k - p}");
}
p = k;
}
Expand Down
59 changes: 58 additions & 1 deletion src/ZoneTree.UnitTests/StringTreeTests.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using Tenray.ZoneTree.AbstractFileStream;
using Newtonsoft.Json.Linq;
using Tenray.ZoneTree.AbstractFileStream;
using Tenray.ZoneTree.Comparers;
using Tenray.ZoneTree.Exceptions;
using Tenray.ZoneTree.Serializers;
using Tenray.ZoneTree.WAL;

Expand Down Expand Up @@ -72,4 +74,59 @@ public void TestSingleCharacter()

}
}

[Test]
public void HelloWorldTest()
{
var dataPath = "data/HelloWorldTest";
if (Directory.Exists(dataPath))
Directory.Delete(dataPath, true);

using var zoneTree = new ZoneTreeFactory<int, string>()
.OpenOrCreate();
zoneTree.Upsert(39, "Hello Zone Tree");
zoneTree.TryGet(39, out var value);
Assert.That(value, Is.EqualTo("Hello Zone Tree"));
}

[Test]
public void HelloWorldTest2()
{
var dataPath = "data/HelloWorldTest2";
if (Directory.Exists(dataPath))
Directory.Delete(dataPath, true);

using var zoneTree = new ZoneTreeFactory<int, string>()
.SetComparer(new Int32ComparerAscending())
.SetDataDirectory(dataPath)
.SetKeySerializer(new Int32Serializer())
.SetValueSerializer(new Utf8StringSerializer())
.OpenOrCreate();

// atomic (thread-safe) on single mutable-segment.
zoneTree.Upsert(39, "Hello Zone Tree!");

zoneTree.TryGet(39, out var value);
Assert.That(value, Is.EqualTo("Hello Zone Tree!"));
// atomic across all segments
zoneTree.TryAtomicAddOrUpdate(39, "a",
bool (ref string x) =>
{
x += "b";
return true;
});
zoneTree.TryGet(39, out value);
Assert.That(value, Is.EqualTo("Hello Zone Tree!b"));
}

[Test]
public void HelloWorldTest3()
{
var dataPath = "data/HelloWorldTest";
if (Directory.Exists(dataPath))
Directory.Delete(dataPath, true);

Assert.Throws<MissingOptionException>(() => new ZoneTreeFactory<int, int>()
.OpenOrCreate());
}
}
67 changes: 38 additions & 29 deletions src/ZoneTree/AbstractFileStream/IFileStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,67 +17,76 @@ public interface IFileStream : IDisposable
bool CanRead { get; }

int ReadTimeout { get; set; }

int WriteTimeout { get; set; }

IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state);

IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state);

void Close();

void CopyTo(Stream destination, int bufferSize);

void CopyTo(Stream destination);

Task CopyToAsync(Stream destination);

Task CopyToAsync(Stream destination, int bufferSize);

Task CopyToAsync(Stream destination, CancellationToken cancellationToken);

Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken);

ValueTask DisposeAsync();

int EndRead(IAsyncResult asyncResult);

void EndWrite(IAsyncResult asyncResult);

void Flush();

void Flush(bool flushToDisk);

Task FlushAsync();

Task FlushAsync(CancellationToken cancellationToken);

int Read(Span<byte> buffer);

int Read(byte[] buffer, int offset, int count);


/// <summary>
/// Custom ReadExactly method that does not use internal buffer.
/// </summary>
/// <param name="buffer"></param>
/// <param name="offset"></param>
/// <param name="count"></param>
/// <returns></returns>
int ReadFaster(byte[] buffer, int offset, int count);

ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default);

Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken);

Task<int> ReadAsync(byte[] buffer, int offset, int count);

int ReadByte();

long Seek(long offset, SeekOrigin origin);

void SetLength(long value);

void Write(byte[] buffer, int offset, int count);

void Write(ReadOnlySpan<byte> buffer);

Task WriteAsync(byte[] buffer, int offset, int count);

Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken);

ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default);

void WriteByte(byte value);

Stream ToStream();
Expand Down
Loading

0 comments on commit 85a5a81

Please sign in to comment.