Skip to content

Commit

Permalink
Refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
dluc committed Nov 18, 2024
1 parent c57657b commit cd0d270
Show file tree
Hide file tree
Showing 21 changed files with 147 additions and 67 deletions.
3 changes: 2 additions & 1 deletion extensions/Anthropic/Client/RawAnthropicClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ internal async IAsyncEnumerable<StreamingResponseMessage> CallClaudeStreamingAsy
if (!response.IsSuccessStatusCode)
{
var responseError = await response.Content.ReadAsStringAsync(cancellationToken).ConfigureAwait(false);
throw new KernelMemoryException($"Failed to send request: {response.StatusCode} - {responseError}");
var isTransient = (new List<int> { 500, 502, 503, 504 }).Contains((int)response.StatusCode);
throw new KernelMemoryException($"Failed to send request: {response.StatusCode} - {responseError}", isTransient: isTransient);
}

var responseStream = await response.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false);
Expand Down
2 changes: 1 addition & 1 deletion extensions/AzureAIDocIntel/AzureAIDocIntelEngine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public async Task<string> ExtractTextFromImageAsync(Stream imageContent, Cancell
}
catch (RequestFailedException e) when (e.Status is >= 400 and < 500)
{
throw new NonRetriableException(e.Message, e);
throw new AzureAIDocIntelException(e.Message, e, isTransient: false);
}
}
}
26 changes: 26 additions & 0 deletions extensions/AzureAIDocIntel/AzureAIDocIntelException.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright (c) Microsoft. All rights reserved.

using System;

namespace Microsoft.KernelMemory.DataFormats.AzureAIDocIntel;

public class AzureAIDocIntelException : KernelMemoryException
{
/// <inheritdoc />
public AzureAIDocIntelException(bool? isTransient = null)
{
this.IsTransient = isTransient;
}

/// <inheritdoc />
public AzureAIDocIntelException(string message, bool? isTransient = null) : base(message)
{
this.IsTransient = isTransient;
}

/// <inheritdoc />
public AzureAIDocIntelException(string message, Exception? innerException, bool? isTransient = null) : base(message, innerException)
{
this.IsTransient = isTransient;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,21 @@ namespace Microsoft.KernelMemory.MemoryDb.AzureAISearch;
public class AzureAISearchMemoryException : KernelMemoryException
{
/// <inheritdoc />
public AzureAISearchMemoryException()
public AzureAISearchMemoryException(bool? isTransient = null)
{
this.IsTransient = isTransient;
}

/// <inheritdoc />
public AzureAISearchMemoryException(string? message) : base(message)
public AzureAISearchMemoryException(string message, bool? isTransient = null) : base(message)
{
this.IsTransient = isTransient;
}

/// <inheritdoc />
public AzureAISearchMemoryException(string? message, Exception? innerException) : base(message, innerException)
public AzureAISearchMemoryException(string message, Exception? innerException, bool? isTransient = null) : base(message, innerException)
{
this.IsTransient = isTransient;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -13,41 +13,41 @@ public void Validate(bool vectorSizeRequired = false)
{
if (this.Fields.Count == 0)
{
throw new KernelMemoryException("The schema is empty");
throw new AzureAISearchMemoryException("The schema is empty", isTransient: false);
}

if (this.Fields.All(x => x.Type != MemoryDbField.FieldType.Vector))
{
throw new KernelMemoryException("The schema doesn't contain a vector field");
throw new AzureAISearchMemoryException("The schema doesn't contain a vector field", isTransient: false);
}

int keys = this.Fields.Count(x => x.IsKey);
switch (keys)
{
case 0:
throw new KernelMemoryException("The schema doesn't contain a key field");
throw new AzureAISearchMemoryException("The schema doesn't contain a key field", isTransient: false);
case > 1:
throw new KernelMemoryException("The schema cannot contain more than one key");
throw new AzureAISearchMemoryException("The schema cannot contain more than one key", isTransient: false);
}

if (vectorSizeRequired && this.Fields.Any(x => x is { Type: MemoryDbField.FieldType.Vector, VectorSize: 0 }))
{
throw new KernelMemoryException("Vector fields must have a size greater than zero defined");
throw new AzureAISearchMemoryException("Vector fields must have a size greater than zero defined", isTransient: false);
}

if (this.Fields.Any(x => x is { Type: MemoryDbField.FieldType.Bool, IsKey: true }))
{
throw new KernelMemoryException("Boolean fields cannot be used as unique keys");
throw new AzureAISearchMemoryException("Boolean fields cannot be used as unique keys", isTransient: false);
}

if (this.Fields.Any(x => x is { Type: MemoryDbField.FieldType.ListOfStrings, IsKey: true }))
{
throw new KernelMemoryException("Collection fields cannot be used as unique keys");
throw new AzureAISearchMemoryException("Collection fields cannot be used as unique keys", isTransient: false);
}

if (this.Fields.Any(x => x is { Type: MemoryDbField.FieldType.Vector, IsKey: true }))
{
throw new KernelMemoryException("Vector fields cannot be used as unique keys");
throw new AzureAISearchMemoryException("Vector fields cannot be used as unique keys", isTransient: false);
}
}
}
26 changes: 26 additions & 0 deletions extensions/AzureOpenAI/AzureOpenAI/AzureOpenAIException.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright (c) Microsoft. All rights reserved.

using System;

namespace Microsoft.KernelMemory.AI.AzureOpenAI;

public class AzureOpenAIException : KernelMemoryException
{
/// <inheritdoc />
public AzureOpenAIException(bool? isTransient = null)
{
this.IsTransient = isTransient;
}

/// <inheritdoc />
public AzureOpenAIException(string message, bool? isTransient = null) : base(message)
{
this.IsTransient = isTransient;
}

/// <inheritdoc />
public AzureOpenAIException(string message, Exception? innerException, bool? isTransient = null) : base(message, innerException)
{
this.IsTransient = isTransient;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public Task<Embedding> GenerateEmbeddingAsync(string text, CancellationToken can
}
catch (HttpOperationException e) when (e.StatusCode.HasValue && (int)e.StatusCode >= 400 && (int)e.StatusCode < 500)
{
throw new NonRetriableException(e.Message, e);
throw new AzureOpenAIException(e.Message, e, isTransient: false);
}
}

Expand All @@ -144,7 +144,7 @@ public async Task<Embedding[]> GenerateEmbeddingBatchAsync(IEnumerable<string> t
}
catch (HttpOperationException e) when (e.StatusCode.HasValue && (int)e.StatusCode >= 400 && (int)e.StatusCode < 500)
{
throw new NonRetriableException(e.Message, e);
throw new AzureOpenAIException(e.Message, e, isTransient: false);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public async IAsyncEnumerable<string> GenerateTextAsync(
}
catch (HttpOperationException e) when (e.StatusCode.HasValue && (int)e.StatusCode >= 400 && (int)e.StatusCode < 500)
{
throw new NonRetriableException(e.Message, e);
throw new AzureOpenAIException(e.Message, e, isTransient: false);
}

await foreach (StreamingTextContent x in result.WithCancellation(cancellationToken))
Expand Down
6 changes: 3 additions & 3 deletions extensions/AzureQueues/AzureQueuesPipeline.cs
Original file line number Diff line number Diff line change
Expand Up @@ -202,14 +202,14 @@ public void OnDequeue(Func<string, Task<ResultType>> processMessageAction)
await this.DeleteMessageAsync(message, cancellationToken: default).ConfigureAwait(false);
break;
case ResultType.RetriableError:
case ResultType.TransientError:
var backoffDelay = TimeSpan.FromSeconds(1 * message.DequeueCount);
this._log.LogWarning("Message '{0}' failed to process, putting message back in the queue with a delay of {1} msecs",
message.MessageId, backoffDelay.TotalMilliseconds);
await this.UnlockMessageAsync(message, backoffDelay, cancellationToken: default).ConfigureAwait(false);
break;
case ResultType.NonRetriableError:
case ResultType.UnrecoverableError:
this._log.LogError("Message '{0}' failed to process due to a non-recoverable error, moving to poison queue", message.MessageId);
await this.MoveMessageToPoisonQueueAsync(message, cancellationToken: default).ConfigureAwait(false);
break;
Expand All @@ -224,7 +224,7 @@ public void OnDequeue(Func<string, Task<ResultType>> processMessageAction)
await this.MoveMessageToPoisonQueueAsync(message, cancellationToken: default).ConfigureAwait(false);
}
}
catch (NonRetriableException e)
catch (KernelMemoryException e) when (e.IsTransient.HasValue && !e.IsTransient.Value)
{
this._log.LogError(e, "Message '{0}' failed to process due to a non-recoverable error, moving to poison queue", message.MessageId);
await this.MoveMessageToPoisonQueueAsync(message, cancellationToken: default).ConfigureAwait(false);
Expand Down
26 changes: 26 additions & 0 deletions extensions/OpenAI/OpenAI/OpenAIException.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright (c) Microsoft. All rights reserved.

using System;

namespace Microsoft.KernelMemory.AI.OpenAI;

public class OpenAIException : KernelMemoryException
{
/// <inheritdoc />
public OpenAIException(bool? isTransient = null)
{
this.IsTransient = isTransient;
}

/// <inheritdoc />
public OpenAIException(string message, bool? isTransient = null) : base(message)
{
this.IsTransient = isTransient;
}

/// <inheritdoc />
public OpenAIException(string message, Exception? innerException, bool? isTransient = null) : base(message, innerException)
{
this.IsTransient = isTransient;
}
}
4 changes: 2 additions & 2 deletions extensions/OpenAI/OpenAI/OpenAITextEmbeddingGenerator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public Task<Embedding> GenerateEmbeddingAsync(string text, CancellationToken can
}
catch (HttpOperationException e) when (e.StatusCode.HasValue && (int)e.StatusCode >= 400 && (int)e.StatusCode < 500)
{
throw new NonRetriableException(e.Message, e);
throw new OpenAIException(e.Message, e, isTransient: false);
}
}

Expand All @@ -145,7 +145,7 @@ public async Task<Embedding[]> GenerateEmbeddingBatchAsync(IEnumerable<string> t
}
catch (HttpOperationException e) when (e.StatusCode.HasValue && (int)e.StatusCode >= 400 && (int)e.StatusCode < 500)
{
throw new NonRetriableException(e.Message, e);
throw new OpenAIException(e.Message, e, isTransient: false);
}
}
}
2 changes: 1 addition & 1 deletion extensions/OpenAI/OpenAI/OpenAITextGenerator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public async IAsyncEnumerable<string> GenerateTextAsync(
}
catch (HttpOperationException e) when (e.StatusCode.HasValue && (int)e.StatusCode >= 400 && (int)e.StatusCode < 500)
{
throw new NonRetriableException(e.Message, e);
throw new OpenAIException(e.Message, e, isTransient: false);
}

await foreach (StreamingTextContent x in result.WithCancellation(cancellationToken))
Expand Down
2 changes: 1 addition & 1 deletion extensions/RabbitMQ/RabbitMQ.TestApplication/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public static async Task Main()
{
Console.WriteLine($"{++counter} Received message: {msg}");
await Task.Delay(0);
return ResultType.RetriableError;
return ResultType.TransientError;
});

await pipeline.ConnectToQueueAsync(QueueName, QueueOptions.PubSub);
Expand Down
6 changes: 3 additions & 3 deletions extensions/RabbitMQ/RabbitMQ/RabbitMQPipeline.cs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ public void OnDequeue(Func<string, Task<ResultType>> processMessageAction)
this._channel.BasicAck(args.DeliveryTag, multiple: false);
break;
case ResultType.RetriableError:
case ResultType.TransientError:
if (attemptNumber < this._maxAttempts)
{
this._log.LogWarning("Message '{0}' failed to process (attempt {1} of {2}), putting message back in the queue",
Expand All @@ -220,7 +220,7 @@ public void OnDequeue(Func<string, Task<ResultType>> processMessageAction)
this._channel.BasicNack(args.DeliveryTag, multiple: false, requeue: true);
break;
case ResultType.NonRetriableError:
case ResultType.UnrecoverableError:
this._log.LogError("Message '{0}' failed to process due to a non-recoverable error, moving to poison queue", args.BasicProperties?.MessageId);
this._channel.BasicNack(args.DeliveryTag, multiple: false, requeue: false);
break;
Expand All @@ -229,7 +229,7 @@ public void OnDequeue(Func<string, Task<ResultType>> processMessageAction)
throw new ArgumentOutOfRangeException($"Unknown {resultType:G} result");
}
}
catch (NonRetriableException e)
catch (KernelMemoryException e) when (e.IsTransient.HasValue && !e.IsTransient.Value)
{
this._log.LogError(e, "Message '{0}' failed to process due to a non-recoverable error, moving to poison queue", args.BasicProperties?.MessageId);
this._channel.BasicNack(args.DeliveryTag, multiple: false, requeue: false);
Expand Down
14 changes: 11 additions & 3 deletions service/Abstractions/KernelMemoryException.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,35 @@ namespace Microsoft.KernelMemory;
/// </summary>
public class KernelMemoryException : Exception
{
public bool? IsTransient { get; protected init; } = null;

/// <summary>
/// Initializes a new instance of the <see cref="KernelMemoryException"/> class with a default message.
/// </summary>
public KernelMemoryException()
/// <param name="isTransient">Optional parameter to indicate if the error is temporary and might disappear by retrying.</param>
public KernelMemoryException(bool? isTransient = null)
{
this.IsTransient = isTransient;
}

/// <summary>
/// Initializes a new instance of the <see cref="KernelMemoryException"/> class with its message set to <paramref name="message"/>.
/// </summary>
/// <param name="message">A string that describes the error.</param>
public KernelMemoryException(string? message) : base(message)
/// <param name="isTransient">Optional parameter to indicate if the error is temporary and might disappear by retrying.</param>
public KernelMemoryException(string? message, bool? isTransient = null) : base(message)
{
this.IsTransient = isTransient;
}

/// <summary>
/// Initializes a new instance of the <see cref="KernelMemoryException"/> class with its message set to <paramref name="message"/>.
/// </summary>
/// <param name="message">A string that describes the error.</param>
/// <param name="innerException">The exception that is the cause of the current exception.</param>
public KernelMemoryException(string? message, Exception? innerException) : base(message, innerException)
/// <param name="isTransient">Optional parameter to indicate if the error is temporary and might disappear by retrying.</param>
public KernelMemoryException(string? message, Exception? innerException, bool? isTransient = null) : base(message, innerException)
{
this.IsTransient = isTransient;
}
}
20 changes: 0 additions & 20 deletions service/Abstractions/Pipeline/NonRetriableException.cs

This file was deleted.

15 changes: 12 additions & 3 deletions service/Abstractions/Pipeline/OrchestrationException.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,20 @@ namespace Microsoft.KernelMemory.Pipeline;
public class OrchestrationException : KernelMemoryException
{
/// <inheritdoc />
public OrchestrationException() { }
public OrchestrationException(bool? isTransient = null)
{
this.IsTransient = isTransient;
}

/// <inheritdoc />
public OrchestrationException(string message) : base(message) { }
public OrchestrationException(string message, bool? isTransient = null) : base(message)
{
this.IsTransient = isTransient;
}

/// <inheritdoc />
public OrchestrationException(string message, Exception? innerException) : base(message, innerException) { }
public OrchestrationException(string message, Exception? innerException, bool? isTransient = null) : base(message, innerException)
{
this.IsTransient = isTransient;
}
}
4 changes: 2 additions & 2 deletions service/Abstractions/Pipeline/ResultType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ namespace Microsoft.KernelMemory.Pipeline;
public enum ResultType
{
Success = 0,
RetriableError = 1,
NonRetriableError = 2,
TransientError = 1,
UnrecoverableError = 2,
}
Loading

0 comments on commit cd0d270

Please sign in to comment.