Skip to content

Commit

Permalink
Refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
dluc committed Nov 18, 2024
1 parent cd0d270 commit 87ef910
Show file tree
Hide file tree
Showing 13 changed files with 66 additions and 17 deletions.
4 changes: 2 additions & 2 deletions extensions/Anthropic/Client/RawAnthropicClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ internal async IAsyncEnumerable<StreamingResponseMessage> CallClaudeStreamingAsy
if (!response.IsSuccessStatusCode)
{
var responseError = await response.Content.ReadAsStringAsync(cancellationToken).ConfigureAwait(false);
var isTransient = (new List<int> { 500, 502, 503, 504 }).Contains((int)response.StatusCode);
throw new KernelMemoryException($"Failed to send request: {response.StatusCode} - {responseError}", isTransient: isTransient);
throw new KernelMemoryException($"Failed to send request: {response.StatusCode} - {responseError}",
isTransient: response.StatusCode.IsTransientError());

Check failure on line 68 in extensions/Anthropic/Client/RawAnthropicClient.cs

View workflow job for this annotation

GitHub Actions / Build (8.0.x, ubuntu-latest, Debug)

'HttpStatusCode' does not contain a definition for 'IsTransientError' and the best extension method overload 'HttpErrors.IsTransientError(HttpStatusCode?)' requires a receiver of type 'System.Net.HttpStatusCode?'

Check failure on line 68 in extensions/Anthropic/Client/RawAnthropicClient.cs

View workflow job for this annotation

GitHub Actions / Build (8.0.x, ubuntu-latest, Debug)

'HttpStatusCode' does not contain a definition for 'IsTransientError' and the best extension method overload 'HttpErrors.IsTransientError(HttpStatusCode?)' requires a receiver of type 'System.Net.HttpStatusCode?'

Check failure on line 68 in extensions/Anthropic/Client/RawAnthropicClient.cs

View workflow job for this annotation

GitHub Actions / Unit Tests (8.0.x, ubuntu-latest)

'HttpStatusCode' does not contain a definition for 'IsTransientError' and the best extension method overload 'HttpErrors.IsTransientError(HttpStatusCode?)' requires a receiver of type 'System.Net.HttpStatusCode?'

Check failure on line 68 in extensions/Anthropic/Client/RawAnthropicClient.cs

View workflow job for this annotation

GitHub Actions / Unit Tests (8.0.x, ubuntu-latest)

'HttpStatusCode' does not contain a definition for 'IsTransientError' and the best extension method overload 'HttpErrors.IsTransientError(HttpStatusCode?)' requires a receiver of type 'System.Net.HttpStatusCode?'

Check failure on line 68 in extensions/Anthropic/Client/RawAnthropicClient.cs

View workflow job for this annotation

GitHub Actions / Build (8.0.x, ubuntu-latest, Release)

'HttpStatusCode' does not contain a definition for 'IsTransientError' and the best extension method overload 'HttpErrors.IsTransientError(HttpStatusCode?)' requires a receiver of type 'System.Net.HttpStatusCode?'

Check failure on line 68 in extensions/Anthropic/Client/RawAnthropicClient.cs

View workflow job for this annotation

GitHub Actions / Build (8.0.x, ubuntu-latest, Release)

'HttpStatusCode' does not contain a definition for 'IsTransientError' and the best extension method overload 'HttpErrors.IsTransientError(HttpStatusCode?)' requires a receiver of type 'System.Net.HttpStatusCode?'

Check failure on line 68 in extensions/Anthropic/Client/RawAnthropicClient.cs

View workflow job for this annotation

GitHub Actions / Unit Tests (8.0.x, windows-latest)

'HttpStatusCode' does not contain a definition for 'IsTransientError' and the best extension method overload 'HttpErrors.IsTransientError(HttpStatusCode?)' requires a receiver of type 'System.Net.HttpStatusCode?'

Check failure on line 68 in extensions/Anthropic/Client/RawAnthropicClient.cs

View workflow job for this annotation

GitHub Actions / Unit Tests (8.0.x, windows-latest)

'HttpStatusCode' does not contain a definition for 'IsTransientError' and the best extension method overload 'HttpErrors.IsTransientError(HttpStatusCode?)' requires a receiver of type 'System.Net.HttpStatusCode?'
}

var responseStream = await response.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false);
Expand Down
2 changes: 1 addition & 1 deletion extensions/AzureAIDocIntel/AzureAIDocIntelEngine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public async Task<string> ExtractTextFromImageAsync(Stream imageContent, Cancell

return operationResponse.Value.Content;
}
catch (RequestFailedException e) when (e.Status is >= 400 and < 500)
catch (RequestFailedException e) when (HttpErrors.IsFatalError(e.Status))
{
throw new AzureAIDocIntelException(e.Message, e, isTransient: false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public Task<Embedding> GenerateEmbeddingAsync(string text, CancellationToken can
{
return this._client.GenerateEmbeddingAsync(text, cancellationToken);
}
catch (HttpOperationException e) when (e.StatusCode.HasValue && (int)e.StatusCode >= 400 && (int)e.StatusCode < 500)
catch (HttpOperationException e) when (e.StatusCode.IsFatalError())
{
throw new AzureOpenAIException(e.Message, e, isTransient: false);
}
Expand All @@ -142,7 +142,7 @@ public async Task<Embedding[]> GenerateEmbeddingBatchAsync(IEnumerable<string> t
IList<ReadOnlyMemory<float>> embeddings = await this._client.GenerateEmbeddingsAsync(list, cancellationToken: cancellationToken).ConfigureAwait(false);
return embeddings.Select(e => new Embedding(e)).ToArray();
}
catch (HttpOperationException e) when (e.StatusCode.HasValue && (int)e.StatusCode >= 400 && (int)e.StatusCode < 500)
catch (HttpOperationException e) when (e.StatusCode.IsFatalError())
{
throw new AzureOpenAIException(e.Message, e, isTransient: false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public async IAsyncEnumerable<string> GenerateTextAsync(
{
result = this._client.GetStreamingTextContentsAsync(prompt, skOptions, cancellationToken: cancellationToken);
}
catch (HttpOperationException e) when (e.StatusCode.HasValue && (int)e.StatusCode >= 400 && (int)e.StatusCode < 500)
catch (HttpOperationException e) when (e.StatusCode.IsFatalError())
{
throw new AzureOpenAIException(e.Message, e, isTransient: false);
}
Expand Down
2 changes: 1 addition & 1 deletion extensions/AzureQueues/AzureQueuesPipeline.cs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ public void OnDequeue(Func<string, Task<ResultType>> processMessageAction)
await this.UnlockMessageAsync(message, backoffDelay, cancellationToken: default).ConfigureAwait(false);
break;
case ResultType.UnrecoverableError:
case ResultType.FatalError:
this._log.LogError("Message '{0}' failed to process due to a non-recoverable error, moving to poison queue", message.MessageId);
await this.MoveMessageToPoisonQueueAsync(message, cancellationToken: default).ConfigureAwait(false);
break;
Expand Down
4 changes: 2 additions & 2 deletions extensions/OpenAI/OpenAI/OpenAITextEmbeddingGenerator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public Task<Embedding> GenerateEmbeddingAsync(string text, CancellationToken can
{
return this._client.GenerateEmbeddingAsync(text, cancellationToken);
}
catch (HttpOperationException e) when (e.StatusCode.HasValue && (int)e.StatusCode >= 400 && (int)e.StatusCode < 500)
catch (HttpOperationException e) when (e.StatusCode.IsFatalError())
{
throw new OpenAIException(e.Message, e, isTransient: false);
}
Expand All @@ -143,7 +143,7 @@ public async Task<Embedding[]> GenerateEmbeddingBatchAsync(IEnumerable<string> t
var embeddings = await this._client.GenerateEmbeddingsAsync(list, cancellationToken: cancellationToken).ConfigureAwait(false);
return embeddings.Select(e => new Embedding(e)).ToArray();
}
catch (HttpOperationException e) when (e.StatusCode.HasValue && (int)e.StatusCode >= 400 && (int)e.StatusCode < 500)
catch (HttpOperationException e) when (e.StatusCode.IsFatalError())
{
throw new OpenAIException(e.Message, e, isTransient: false);
}
Expand Down
2 changes: 1 addition & 1 deletion extensions/OpenAI/OpenAI/OpenAITextGenerator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public async IAsyncEnumerable<string> GenerateTextAsync(
{
result = this._client.GetStreamingTextContentsAsync(prompt, skOptions, cancellationToken: cancellationToken);
}
catch (HttpOperationException e) when (e.StatusCode.HasValue && (int)e.StatusCode >= 400 && (int)e.StatusCode < 500)
catch (HttpOperationException e) when (e.StatusCode.IsFatalError())
{
throw new OpenAIException(e.Message, e, isTransient: false);
}
Expand Down
2 changes: 1 addition & 1 deletion extensions/RabbitMQ/RabbitMQ/RabbitMQPipeline.cs
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ public void OnDequeue(Func<string, Task<ResultType>> processMessageAction)
this._channel.BasicNack(args.DeliveryTag, multiple: false, requeue: true);
break;
case ResultType.UnrecoverableError:
case ResultType.FatalError:
this._log.LogError("Message '{0}' failed to process due to a non-recoverable error, moving to poison queue", args.BasicProperties?.MessageId);
this._channel.BasicNack(args.DeliveryTag, multiple: false, requeue: false);
break;
Expand Down
49 changes: 49 additions & 0 deletions service/Abstractions/Diagnostics/HttpErrors.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright (c) Microsoft. All rights reserved.

using System.Collections.Generic;
using System.Net;

namespace Microsoft.KernelMemory.Diagnostics;

public static class HttpErrors
{
// Errors that might disappear by retrying
private static readonly HashSet<int> s_transientErrors =
[
(int)HttpStatusCode.InternalServerError,
(int)HttpStatusCode.BadGateway,
(int)HttpStatusCode.ServiceUnavailable,
(int)HttpStatusCode.GatewayTimeout,
(int)HttpStatusCode.InsufficientStorage
];

public static bool IsTransientError(this HttpStatusCode? statusCode)
{
return statusCode.HasValue && s_transientErrors.Contains((int)statusCode.Value);
}

public static bool IsTransientError(int statusCode)
{
return s_transientErrors.Contains(statusCode);
}

public static bool IsFatalError(this HttpStatusCode? statusCode)
{
return statusCode.HasValue && IsError(statusCode) && !IsTransientError(statusCode);
}

public static bool IsFatalError(int statusCode)
{
return IsError(statusCode) && !IsTransientError(statusCode);
}

private static bool IsError(this HttpStatusCode? statusCode)
{
return statusCode.HasValue && (int)statusCode.Value >= 400;
}

private static bool IsError(int statusCode)
{
return statusCode >= 400;
}
}
2 changes: 1 addition & 1 deletion service/Abstractions/Pipeline/ResultType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@ public enum ResultType
{
Success = 0,
TransientError = 1,
UnrecoverableError = 2,
FatalError = 2,
}
6 changes: 3 additions & 3 deletions service/Core/Pipeline/DistributedPipelineOrchestrator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public override async Task AddHandlerAsync(
if (pipelinePointer == null)
{
this.Log.LogError("Pipeline pointer deserialization failed, queue `{0}`. Message discarded.", handler.StepName);
return ResultType.UnrecoverableError;
return ResultType.FatalError;
}
DataPipeline? pipeline;
Expand Down Expand Up @@ -121,7 +121,7 @@ public override async Task AddHandlerAsync(
}
this.Log.LogError("Pipeline `{0}/{1}` not found, cancelling step `{2}`", pipelinePointer.Index, pipelinePointer.DocumentId, handler.StepName);
return ResultType.UnrecoverableError;
return ResultType.FatalError;
}
catch (InvalidPipelineDataException)
{
Expand Down Expand Up @@ -232,7 +232,7 @@ private async Task<ResultType> RunPipelineStepAsync(
this.Log.LogError("Handler {0} failed to process pipeline {1}", currentStepName, pipeline.DocumentId);
break;

case ResultType.UnrecoverableError:
case ResultType.FatalError:
this.Log.LogError("Handler {0} failed to process pipeline {1} due to an unrecoverable error", currentStepName, pipeline.DocumentId);
break;

Expand Down
2 changes: 1 addition & 1 deletion service/Core/Pipeline/InProcessPipelineOrchestrator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ public override async Task RunPipelineAsync(DataPipeline pipeline, CancellationT
this.Log.LogError("Handler '{0}' failed to process pipeline '{1}/{2}'", currentStepName, pipeline.Index, pipeline.DocumentId);
throw new OrchestrationException($"Pipeline error, step {currentStepName} failed", isTransient: true);

case ResultType.UnrecoverableError:
case ResultType.FatalError:
this.Log.LogError("Handler '{0}' failed to process pipeline '{1}/{2}' due to an unrecoverable error", currentStepName, pipeline.Index, pipeline.DocumentId);
throw new OrchestrationException($"Unrecoverable pipeline error, step {currentStepName} failed and cannot be retried", isTransient: false);

Expand Down
2 changes: 1 addition & 1 deletion service/Core/Pipeline/Queue/DevTools/SimpleQueues.cs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ public void OnDequeue(Func<string, Task<ResultType>> processMessageAction)
break;
case ResultType.UnrecoverableError:
case ResultType.FatalError:
this._log.LogError("Message '{0}' failed to process due to a non-recoverable error, moving to poison queue", message.Id);
poison = true;
break;
Expand Down

0 comments on commit 87ef910

Please sign in to comment.