Skip to content

Commit

Permalink
Merge pull request #88 from Etherna/feature/BHM-148-import-bee-load-b…
Browse files Browse the repository at this point in the history
…alancer

Feature/bhm 148 import bee load balancer
  • Loading branch information
tmm360 authored Dec 21, 2024
2 parents 4bda8eb + 666a8eb commit 2b82c31
Show file tree
Hide file tree
Showing 35 changed files with 1,083 additions and 72 deletions.
2 changes: 1 addition & 1 deletion src/Beehive.Domain/Beehive.Domain.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Bee.Net.Core" Version="0.4.0-alpha.73" />
<PackageReference Include="Bee.Net.Core" Version="0.4.0-alpha.75" />
<PackageReference Include="Etherna.DomainEvents" Version="1.4.0" />
<PackageReference Include="MongODM.Core" Version="0.25.0-alpha.6" />
<PackageReference Include="Nethereum.Web3" Version="4.26.0" />
Expand Down
2 changes: 1 addition & 1 deletion src/Beehive.Services/Beehive.Services.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Bee.Net.Client" Version="0.4.0-alpha.73" />
<PackageReference Include="Bee.Net.Client" Version="0.4.0-alpha.75" />
<PackageReference Include="Etherna.DomainEvents.AspNetCore" Version="1.4.0" />
<PackageReference Include="MongODM.Hangfire" Version="0.25.0-alpha.6" />
<PackageReference Include="Nethereum.JsonRpc.WebSocketClient" Version="4.26.0" />
Expand Down
35 changes: 18 additions & 17 deletions src/Beehive.Services/Utilities/BeeNodeLiveManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
using Etherna.BeeNet.Exceptions;
using Etherna.BeeNet.Models;
using Etherna.MongoDB.Driver.Linq;
using MoreLinq;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
Expand All @@ -39,7 +38,7 @@ internal sealed class BeeNodeLiveManager(IBeehiveDbContext dbContext)
: IBeeNodeLiveManager, IDisposable
{
// Consts.
private const int HeartbeatPeriod = 10000; //10s
private readonly TimeSpan HeartbeatPeriod = TimeSpan.FromSeconds(10);

// Fields.
private Timer? heartbeatTimer;
Expand Down Expand Up @@ -80,9 +79,6 @@ public async Task<BeeNodeLiveInstance> GetBeeNodeLiveInstanceAsync(string nodeId
return await AddBeeNodeAsync(beeNode);
}

public BeeNodeLiveInstance GetBeeNodeLiveInstanceByOwnedPostageBatch(PostageBatchId batchId) =>
AllNodes.First(n => n.Status.PostageBatchesId.Contains(batchId));

public IEnumerable<BeeNodeLiveInstance> GetBeeNodeLiveInstancesByPinnedContent(string hash, bool requireAliveNodes) =>
AllNodes.Where(n => n.Status.PinnedHashes.Contains(hash) &&
(!requireAliveNodes || n.Status.IsAlive));
Expand All @@ -98,28 +94,33 @@ public async Task LoadAllNodesAsync()
public bool RemoveBeeNode(string nodeId) =>
beeNodeInstances.TryRemove(nodeId, out _);

public BeeNodeLiveInstance SelectDownloadNode(SwarmAddress address)
public Task<BeeNodeLiveInstance> SelectDownloadNodeAsync(SwarmAddress address) =>
SelectDownloadNodeAsync(address.Hash);

public Task<BeeNodeLiveInstance> SelectDownloadNodeAsync(SwarmHash hash) =>
SelectHealthyNodeAsync();

public async Task<BeeNodeLiveInstance> SelectHealthyNodeAsync(
BeeNodeSelectionMode mode = BeeNodeSelectionMode.RoundRobin,
string? selectionContext = null,
Func<BeeNodeLiveInstance, Task<bool>>? isValidPredicate = null)
{
// Select all alive nodes.
var beeNodeLiveInstances = HealthyNodes.ToArray();
if (beeNodeLiveInstances.Length == 0)
throw new InvalidOperationException("Can't select a valid node");

//select a random one
return beeNodeLiveInstances.RandomSubset(1).First();
var node = await TrySelectHealthyNodeAsync(mode, selectionContext, isValidPredicate);
return node ?? throw new InvalidOperationException();
}

public BeeNodeLiveInstance SelectDownloadNode(SwarmHash hash) =>
SelectDownloadNode(new SwarmAddress(hash));
public BeeNodeLiveInstance SelectUploadNode(PostageBatchId batchId) =>
AllNodes.First(n => n.Status.PostageBatchesId.Contains(batchId));

public void StartHealthHeartbeat() =>
heartbeatTimer = new Timer(async _ => await HeartbeatCallbackAsync(), null, 0, HeartbeatPeriod);
heartbeatTimer = new Timer(async _ =>
await HeartbeatCallbackAsync(), null, 0, (int)HeartbeatPeriod.TotalMilliseconds);

public void StopHealthHeartbeat() =>
heartbeatTimer?.Change(Timeout.Infinite, 0);

public async Task<BeeNodeLiveInstance?> TrySelectHealthyNodeAsync(
BeeNodeSelectionMode mode,
BeeNodeSelectionMode mode = BeeNodeSelectionMode.RoundRobin,
string? selectionContext = null,
Func<BeeNodeLiveInstance, Task<bool>>? isValidPredicate = null)
{
Expand Down
12 changes: 8 additions & 4 deletions src/Beehive.Services/Utilities/IBeeNodeLiveManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,20 @@ public interface IBeeNodeLiveManager
// Methods.
Task<BeeNodeLiveInstance> AddBeeNodeAsync(BeeNode beeNode);
Task<BeeNodeLiveInstance> GetBeeNodeLiveInstanceAsync(string nodeId);
BeeNodeLiveInstance GetBeeNodeLiveInstanceByOwnedPostageBatch(PostageBatchId batchId);
IEnumerable<BeeNodeLiveInstance> GetBeeNodeLiveInstancesByPinnedContent(string hash, bool requireAliveNodes);
Task LoadAllNodesAsync();
bool RemoveBeeNode(string nodeId);
BeeNodeLiveInstance SelectDownloadNode(SwarmAddress address);
BeeNodeLiveInstance SelectDownloadNode(SwarmHash hash);
Task<BeeNodeLiveInstance> SelectDownloadNodeAsync(SwarmAddress address);
Task<BeeNodeLiveInstance> SelectDownloadNodeAsync(SwarmHash hash);
Task<BeeNodeLiveInstance> SelectHealthyNodeAsync(
BeeNodeSelectionMode mode = BeeNodeSelectionMode.RoundRobin,
string? selectionContext = null,
Func<BeeNodeLiveInstance, Task<bool>>? isValidPredicate = null);
BeeNodeLiveInstance SelectUploadNode(PostageBatchId batchId);
void StartHealthHeartbeat();
void StopHealthHeartbeat();
Task<BeeNodeLiveInstance?> TrySelectHealthyNodeAsync(
BeeNodeSelectionMode mode,
BeeNodeSelectionMode mode = BeeNodeSelectionMode.RoundRobin,
string? selectionContext = null,
Func<BeeNodeLiveInstance, Task<bool>>? isValidPredicate = null);
}
Expand Down
53 changes: 53 additions & 0 deletions src/Beehive/Areas/Api/Controllers/BytesController.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright 2021-present Etherna SA
// This file is part of Beehive.
//
// Beehive is free software: you can redistribute it and/or modify it under the terms of the
// GNU Affero General Public License as published by the Free Software Foundation,
// either version 3 of the License, or (at your option) any later version.
//
// Beehive is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
// without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
// See the GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License along with Beehive.
// If not, see <https://www.gnu.org/licenses/>.

using Etherna.Beehive.Areas.Api.DtoModels;
using Etherna.Beehive.Areas.Api.Services;
using Etherna.Beehive.Attributes;
using Etherna.BeeNet.Models;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using System.ComponentModel.DataAnnotations;
using System.Threading.Tasks;

namespace Etherna.Beehive.Areas.Api.Controllers
{
[ApiController]
[Route("bytes")]
[Route("v{api-version:apiVersion}/bytes")]
public class BytesController(IBytesControllerService service)
: ControllerBase
{
// Get.

[HttpGet("{*hash:minlength(1)}")]
[SimpleExceptionFilter]
[ProducesResponseType(StatusCodes.Status200OK)]
[ProducesResponseType(StatusCodes.Status400BadRequest)]
[ProducesResponseType(StatusCodes.Status404NotFound)]
public Task<IResult> DownloadBytesAsync(SwarmHash hash) =>
service.DownloadBytesAsync(hash, HttpContext);

// Post.

[HttpPost]
[SimpleExceptionFilter]
[ProducesResponseType(typeof(ChunkReferenceDto), StatusCodes.Status201Created)]
[ProducesResponseType(StatusCodes.Status400BadRequest)]
[ProducesResponseType(StatusCodes.Status402PaymentRequired)]
public Task<IResult> UploadBytesAsync(
[FromHeader(Name = SwarmHttpConsts.SwarmPostageBatchIdHeader), Required] PostageBatchId batchId) =>
service.UploadBytesAsync(batchId, HttpContext);
}
}
15 changes: 14 additions & 1 deletion src/Beehive/Areas/Api/Controllers/BzzController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@
// You should have received a copy of the GNU Affero General Public License along with Beehive.
// If not, see <https://www.gnu.org/licenses/>.

using Etherna.Beehive.Areas.Api.DtoModels;
using Etherna.Beehive.Areas.Api.Services;
using Etherna.Beehive.Attributes;
using Etherna.BeeNet.Models;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using System.ComponentModel.DataAnnotations;
using System.Threading.Tasks;

namespace Etherna.Beehive.Areas.Api.Controllers
Expand All @@ -35,6 +37,17 @@ public class BzzController(IBzzControllerService service)
[ProducesResponseType(StatusCodes.Status400BadRequest)]
[ProducesResponseType(StatusCodes.Status404NotFound)]
public Task<IResult> DownloadBzzAsync(SwarmAddress address) =>
service.DownloadBzzAsync(address);
service.DownloadBzzAsync(address, HttpContext);

// Post.

[HttpPost]
[SimpleExceptionFilter]
[ProducesResponseType(typeof(ChunkReferenceDto), StatusCodes.Status201Created)]
[ProducesResponseType(StatusCodes.Status400BadRequest)]
[ProducesResponseType(StatusCodes.Status402PaymentRequired)]
public Task<IResult> UploadBzzAsync(
[FromHeader(Name = SwarmHttpConsts.SwarmPostageBatchIdHeader), Required] PostageBatchId batchId) =>
service.UploadBzzAsync(batchId, HttpContext);
}
}
44 changes: 19 additions & 25 deletions src/Beehive/Areas/Api/Controllers/ChunksController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@
// You should have received a copy of the GNU Affero General Public License along with Beehive.
// If not, see <https://www.gnu.org/licenses/>.

using Etherna.Beehive.Areas.Api.DtoModels;
using Etherna.Beehive.Areas.Api.Services;
using Etherna.Beehive.Attributes;
using Etherna.BeeNet.Models;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using System;
using System.IO;
using System.Linq;
using System.ComponentModel.DataAnnotations;
using System.Threading.Tasks;

namespace Etherna.Beehive.Areas.Api.Controllers
Expand All @@ -42,36 +42,30 @@ public Task<IResult> DownloadChunkAsync(SwarmHash hash) =>

// Post.

[HttpPost("~/chunks/bulk-upload")]
[HttpPost]
[SimpleExceptionFilter]
[ProducesResponseType(typeof(ChunkReferenceDto), StatusCodes.Status201Created)]
[ProducesResponseType(StatusCodes.Status400BadRequest)]
[ProducesResponseType(StatusCodes.Status402PaymentRequired)]
public Task<IActionResult> UploadChunkAsync(
[FromHeader(Name = SwarmHttpConsts.SwarmPostageBatchIdHeader), Required] PostageBatchId batchId) =>
service.UploadChunkAsync(batchId, HttpContext);

[Obsolete("Used with BeeTurbo")]
[HttpPost("~/chunks/bulk-upload")]
[SimpleExceptionFilter]
[ProducesResponseType(StatusCodes.Status200OK)]
[ProducesResponseType(StatusCodes.Status400BadRequest)]
public Task ChunksBulkUploadBeeTurboAsync() =>
ChunksBulkUploadAsync();
public Task BulkUploadChunksBeeTurboAsync(
[FromHeader(Name = SwarmHttpConsts.SwarmPostageBatchIdHeader), Required] PostageBatchId batchId) =>
service.BulkUploadChunksAsync(batchId, HttpContext);

[HttpPost("~/ev1/chunks/bulk-upload")]
[SimpleExceptionFilter]
[ProducesResponseType(StatusCodes.Status200OK)]
[ProducesResponseType(StatusCodes.Status201Created)]
[ProducesResponseType(StatusCodes.Status400BadRequest)]
public async Task ChunksBulkUploadAsync()
{
// Get headers.
HttpContext.Request.Headers.TryGetValue(
SwarmHttpConsts.SwarmPostageBatchId,
out var batchIdHeaderValue);
var batchId = PostageBatchId.FromString(batchIdHeaderValue.Single()!);

// Read payload.
await using var memoryStream = new MemoryStream();
await HttpContext.Request.Body.CopyToAsync(memoryStream);
var payload = memoryStream.ToArray();

// Invoke service.
var statusCode = await service.ChunksBulkUploadAsync(
batchId,
payload);
HttpContext.Response.StatusCode = statusCode;
}
public Task BulkUploadChunksAsync(
[FromHeader(Name = SwarmHttpConsts.SwarmPostageBatchIdHeader), Required] PostageBatchId batchId) =>
service.BulkUploadChunksAsync(batchId, HttpContext);
}
}
55 changes: 55 additions & 0 deletions src/Beehive/Areas/Api/Controllers/FeedsController.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright 2021-present Etherna SA
// This file is part of Beehive.
//
// Beehive is free software: you can redistribute it and/or modify it under the terms of the
// GNU Affero General Public License as published by the Free Software Foundation,
// either version 3 of the License, or (at your option) any later version.
//
// Beehive is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
// without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
// See the GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License along with Beehive.
// If not, see <https://www.gnu.org/licenses/>.

using Etherna.Beehive.Areas.Api.Services;
using Etherna.Beehive.Attributes;
using Etherna.BeeNet.Models;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using System.ComponentModel.DataAnnotations;
using System.Threading.Tasks;

namespace Etherna.Beehive.Areas.Api.Controllers
{
[ApiController]
[Route("feeds")]
[Route("v{api-version:apiVersion}/feeds")]
public class FeedsController(IFeedsControllerService service)
: ControllerBase
{
// Get.

[HttpGet("{owner:length(40)}/{topic}")]
[SimpleExceptionFilter]
[ProducesResponseType(StatusCodes.Status200OK)]
[ProducesResponseType(StatusCodes.Status400BadRequest)]
[ProducesResponseType(StatusCodes.Status404NotFound)]
public Task<IResult> FindFeedUpdateAsync(string owner, string topic) =>
service.FindFeedUpdateAsync(owner, topic, HttpContext);

// Post.

[HttpPost("{owner:length(40)}/{topic}")]
[SimpleExceptionFilter]
[ProducesResponseType(StatusCodes.Status201Created)]
[ProducesResponseType(StatusCodes.Status400BadRequest)]
[ProducesResponseType(StatusCodes.Status401Unauthorized)]
[ProducesResponseType(StatusCodes.Status402PaymentRequired)]
public Task<IResult> CreateFeedRootManifestAsync(
string owner,
string topic,
[FromHeader(Name = SwarmHttpConsts.SwarmPostageBatchIdHeader), Required] PostageBatchId batchId) =>
service.CreateFeedRootManifestAsync(owner, topic, batchId, HttpContext);
}
}
44 changes: 44 additions & 0 deletions src/Beehive/Areas/Api/Controllers/PssController.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2021-present Etherna SA
// This file is part of Beehive.
//
// Beehive is free software: you can redistribute it and/or modify it under the terms of the
// GNU Affero General Public License as published by the Free Software Foundation,
// either version 3 of the License, or (at your option) any later version.
//
// Beehive is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
// without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
// See the GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License along with Beehive.
// If not, see <https://www.gnu.org/licenses/>.

using Etherna.Beehive.Areas.Api.Services;
using Etherna.Beehive.Attributes;
using Etherna.BeeNet.Models;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using System.ComponentModel.DataAnnotations;
using System.Threading.Tasks;

namespace Etherna.Beehive.Areas.Api.Controllers
{
[ApiController]
[Route("pss")]
[Route("v{api-version:apiVersion}/pss")]
public class PssController(IPssControllerService service)
: ControllerBase
{
// Post.

[HttpPost("send/{topic}/{targets}")]
[SimpleExceptionFilter]
[ProducesResponseType(StatusCodes.Status201Created)]
[ProducesResponseType(StatusCodes.Status400BadRequest)]
[ProducesResponseType(StatusCodes.Status402PaymentRequired)]
public Task<IResult> SendPssMessageAsync(
string topic,
string targets,
[FromHeader(Name = SwarmHttpConsts.SwarmPostageBatchIdHeader), Required] PostageBatchId batchId) =>
service.SendPssMessageAsync(topic, targets, batchId, HttpContext);
}
}
Loading

0 comments on commit 2b82c31

Please sign in to comment.