Skip to content

Commit

Permalink
#861 Refactor ILogChronicle.GetFactsAsync(LogChronicleFactFilter)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhabis committed May 16, 2023
1 parent 7fc3959 commit 6ad62b8
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 13 deletions.
12 changes: 12 additions & 0 deletions src/Azos.Sky.Server/Chronicle/Server/ChronicleServerLogic.cs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,18 @@ public async Task WriteAsync(LogBatch data)
public async Task<IEnumerable<Message>> GetAsync(LogChronicleFilter filter)
=> await m_Log.NonNull().GetAsync(filter.NonNull(nameof(filter))).ConfigureAwait(false);

//20230515 DKh #861
public async Task<IEnumerable<Fact>> GetFactsAsync(LogChronicleFactFilter filter)
{
var messages = await m_Log.NonNull().GetAsync(filter.NonNull(nameof(filter)).LogFilter.NonNull(nameof(filter.LogFilter))).ConfigureAwait(false);

//Server-side convert to facts
var facts = messages.Select(one => ArchiveConventions.LogMessageToFact(one))
.ToArray();//materialize, as conversion is costly

return facts;
}

public async Task WriteAsync(InstrumentationBatch data)
=> await m_Instrumentation.NonNull().WriteAsync(data.NonNull(nameof(data))).ConfigureAwait(false);

Expand Down
13 changes: 13 additions & 0 deletions src/Azos.Sky.Server/Chronicle/Server/Web/ConsumptionControllers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,19 @@ string esc(string s)
[ChroniclePermission(ChronicleAccessLevel.Browse)]
public async Task<object> Filter(LogChronicleFilter filter) => await ApplyFilterAsync(filter).ConfigureAwait(false);

[ApiEndpointDoc(Title = "Filter Facts",
Uri = "filter-facts",
Description = "Queries log chronicle by applying a structured filter `{@LogChronicleFilter}` and returning facts",
Methods = new[] { "POST = post filter object for query execution" },
RequestHeaders = new[] { API_DOC_HDR_ACCEPT_JSON },
ResponseHeaders = new[] { API_DOC_HDR_NO_CACHE },
RequestBody = "JSON representation of `{@LogChronicleFilter}`",
ResponseContent = "JSON filter result - enumerable of `{@Fact}`",
TypeSchemas = new[] { typeof(Fact) })]
[ActionOnPost(Name = "filter"), AcceptsJson]
[ChroniclePermission(ChronicleAccessLevel.Browse)]
public async Task<object> Filter_Facts(LogChronicleFactFilter filter) => await ApplyFilterAsync(filter).ConfigureAwait(false);

[ApiEndpointDoc(Title = "Batch",
Uri = "batch",
Description = "Uploads a batch of log messages to log chronicle using `{@LogBatch}`",
Expand Down
47 changes: 47 additions & 0 deletions src/Azos.Sky/Chronicle/ChronicleWebClientLogic.cs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ public async Task<IEnumerable<Message>> GetAsync(LogChronicleFilter filter)
=> await (filter.NonNull(nameof(filter)).CrossShard ? getCrossShard(filter)
: getOneShard(filter)).ConfigureAwait(false);

//20230515 DKh #861
public async Task<IEnumerable<Fact>> GetFactsAsync(LogChronicleFactFilter filter)
=> await (filter.NonNull(nameof(filter)).LogFilter.NonNull(nameof(filter.LogFilter)).CrossShard ? getFactsCrossShard(filter)
: getFactsOneShard(filter)).ConfigureAwait(false);

private async Task<IEnumerable<Message>> getCrossShard(LogChronicleFilter filter)
{
filter.CrossShard = false; //stop recursion, each shard should return just its own data
Expand Down Expand Up @@ -134,6 +139,48 @@ private async Task<IEnumerable<Message>> getOneShard(LogChronicleFilter filter)
return result;
}

private async Task<IEnumerable<Fact>> getFactsCrossShard(LogChronicleFactFilter filter)
{
filter.LogFilter.CrossShard = false; //stop recursion, each shard should return just its own data
var shards = m_Server.GetEndpointsForAllShards(LogServiceAddress, nameof(ILogChronicle));

var calls = shards.Select(shard => shard.Call((http, ct) => http.Client.PostAndGetJsonMapAsync("filter-facts", new { filter = filter })));

var responses = await Task.WhenAll(calls.Select(async call => {
try
{
return await call.ConfigureAwait(false);
}
catch (Exception error)
{
WriteLog(MessageType.Warning, nameof(getCrossShard), "Shard fetch error: " + error.ToMessageWithType(), error);
return null;
}
})).ConfigureAwait(false);

var result = responses.SelectMany(response => response.UnwrapPayloadArray()
.OfType<JsonDataMap>()
.Select(imap => JsonReader.ToDoc<Fact>(imap)))
.OrderBy(m => m.UtcTimestamp)
.ToArray();

return result;
}

private async Task<IEnumerable<Fact>> getFactsOneShard(LogChronicleFactFilter filter)
{
var response = await m_Server.Call(LogServiceAddress,
nameof(ILogChronicle),
new ShardKey(0u),
(http, ct) => http.Client.PostAndGetJsonMapAsync("filter-facts", new { filter = filter })).ConfigureAwait(false);

var result = response.UnwrapPayloadArray()
.OfType<JsonDataMap>()
.Select(imap => JsonReader.ToDoc<Fact>(imap));

return result;
}

public async Task WriteAsync(InstrumentationBatch data)
{
var response = await m_Server.Call(InstrumentationServiceAddress,
Expand Down
7 changes: 7 additions & 0 deletions src/Azos.Sky/Chronicle/ILogChronicle.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@ public interface ILogChronicle
/// Gets chronicle (a list) of messages satisfying the supplied LogChronicleFilter object
/// </summary>
Task<IEnumerable<Message>> GetAsync(LogChronicleFilter filter);

//20230515 DKh #861
/// <summary>
/// Gets chronicle (a list) of facts extracted from log messages on the server satisfying
/// the supplied LogChronicleFilter object
/// </summary>
Task<IEnumerable<Fact>> GetFactsAsync(LogChronicleFactFilter filter);
}

/// <summary>
Expand Down
35 changes: 35 additions & 0 deletions src/Azos.Sky/Chronicle/LogChronicleFactFilter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*<FILE_LICENSE>
* Azos (A to Z Application Operating System) Framework
* The A to Z Foundation (a.k.a. Azist) licenses this file to you under the MIT license.
* See the LICENSE file in the project root for more information.
</FILE_LICENSE>*/

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

using Azos.Apps.Injection;
using Azos.Data;
using Azos.Data.AST;
using Azos.Data.Business;
using Azos.Log;
using Azos.Serialization.Bix;
using Azos.Time;

namespace Azos.Sky.Chronicle
{
[Bix("a06fb712-7e64-435e-8325-f28d67a544e3")]
[Schema(Description = "Provides model for filtering log chronicles for facts")]
public sealed class LogChronicleFactFilter : FilterModel<IEnumerable<Fact>>
{
[Field(required: true, description: "Log message filter which facts are based on")]
public LogChronicleFilter LogFilter { get; set; }

[InjectModule] ILogChronicle m_Chronicle;

protected async override Task<SaveResult<IEnumerable<Fact>>> DoSaveAsync()
=> new SaveResult<IEnumerable<Fact>>(await m_Chronicle.GetFactsAsync(this).ConfigureAwait(false));
}

}
27 changes: 14 additions & 13 deletions src/Azos/Log/ArchiveConventions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -136,27 +136,28 @@ public static string EncodeStructuredData(object data)
}

/// <summary>
/// Converts analytics fact data into <see cref="Message"/> suitable for processing in log archives/pipes, such as chronicle.
/// Returns a message, you must to call `msg.InitDefaultFields(m_App);` if you need default values
/// Establishes fact data protocol around passed parameters converting them into into
/// <see cref="Message"/> suitable for processing in log archives/pipes, such as chronicle.
/// Returns a message, you must call <see cref="Message.InitDefaultFields(IApplication)"/> if you need default values
/// </summary>
public static Message AnalyticsFactDataToLogMessage(Atom factType,
object dims,
object metrics,
int source = 0,
Guid rel = default(Guid),
string host = null,
MessageType messageType = MessageType.Info,
DateTime utcTimeStamp = default,
string topic = null,
Atom channel = default)
public static Message FactDataToLogMessage(Atom factType,
object dims,
object metrics,
int source = 0,
Guid rel = default(Guid),
string host = null,
MessageType messageType = MessageType.Info,
DateTime utcTimeStamp = default,
Atom topic = default,
Atom channel = default)
{
factType.HasRequiredValue(nameof(factType));

var msg = new Message
{
Channel = channel.Default(CoreConsts.LOG_CHANNEL_ANALYTICS),
Host = host,
Topic = topic.Default(CoreConsts.LOG_TOPIC),
Topic = topic.Value,//null for Atom.Zero
Type = messageType,
Source = source,
From = factType.Value,
Expand Down

0 comments on commit 6ad62b8

Please sign in to comment.