diff --git a/extensions/Anthropic/Client/RawAnthropicClient.cs b/extensions/Anthropic/Client/RawAnthropicClient.cs index a0b1249b8..d24945e8c 100644 --- a/extensions/Anthropic/Client/RawAnthropicClient.cs +++ b/extensions/Anthropic/Client/RawAnthropicClient.cs @@ -64,8 +64,8 @@ internal async IAsyncEnumerable CallClaudeStreamingAsy if (!response.IsSuccessStatusCode) { var responseError = await response.Content.ReadAsStringAsync(cancellationToken).ConfigureAwait(false); - var isTransient = (new List { 500, 502, 503, 504 }).Contains((int)response.StatusCode); - throw new KernelMemoryException($"Failed to send request: {response.StatusCode} - {responseError}", isTransient: isTransient); + throw new KernelMemoryException($"Failed to send request: {response.StatusCode} - {responseError}", + isTransient: response.StatusCode.IsTransientError()); } var responseStream = await response.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false); diff --git a/extensions/AzureAIDocIntel/AzureAIDocIntelEngine.cs b/extensions/AzureAIDocIntel/AzureAIDocIntelEngine.cs index 7b5cdb262..0f28ae663 100644 --- a/extensions/AzureAIDocIntel/AzureAIDocIntelEngine.cs +++ b/extensions/AzureAIDocIntel/AzureAIDocIntelEngine.cs @@ -68,7 +68,7 @@ public async Task ExtractTextFromImageAsync(Stream imageContent, Cancell return operationResponse.Value.Content; } - catch (RequestFailedException e) when (e.Status is >= 400 and < 500) + catch (RequestFailedException e) when (HttpErrors.IsFatalError(e.Status)) { throw new AzureAIDocIntelException(e.Message, e, isTransient: false); } diff --git a/extensions/AzureOpenAI/AzureOpenAI/AzureOpenAITextEmbeddingGenerator.cs b/extensions/AzureOpenAI/AzureOpenAI/AzureOpenAITextEmbeddingGenerator.cs index 51631ce8f..f600c7dd2 100644 --- a/extensions/AzureOpenAI/AzureOpenAI/AzureOpenAITextEmbeddingGenerator.cs +++ b/extensions/AzureOpenAI/AzureOpenAI/AzureOpenAITextEmbeddingGenerator.cs @@ -126,7 +126,7 @@ public Task GenerateEmbeddingAsync(string text, CancellationToken can { return this._client.GenerateEmbeddingAsync(text, cancellationToken); } - catch (HttpOperationException e) when (e.StatusCode.HasValue && (int)e.StatusCode >= 400 && (int)e.StatusCode < 500) + catch (HttpOperationException e) when (e.StatusCode.IsFatalError()) { throw new AzureOpenAIException(e.Message, e, isTransient: false); } @@ -142,7 +142,7 @@ public async Task GenerateEmbeddingBatchAsync(IEnumerable t IList> embeddings = await this._client.GenerateEmbeddingsAsync(list, cancellationToken: cancellationToken).ConfigureAwait(false); return embeddings.Select(e => new Embedding(e)).ToArray(); } - catch (HttpOperationException e) when (e.StatusCode.HasValue && (int)e.StatusCode >= 400 && (int)e.StatusCode < 500) + catch (HttpOperationException e) when (e.StatusCode.IsFatalError()) { throw new AzureOpenAIException(e.Message, e, isTransient: false); } diff --git a/extensions/AzureOpenAI/AzureOpenAI/AzureOpenAITextGenerator.cs b/extensions/AzureOpenAI/AzureOpenAI/AzureOpenAITextGenerator.cs index 3a38238c2..47896fd0e 100644 --- a/extensions/AzureOpenAI/AzureOpenAI/AzureOpenAITextGenerator.cs +++ b/extensions/AzureOpenAI/AzureOpenAI/AzureOpenAITextGenerator.cs @@ -146,7 +146,7 @@ public async IAsyncEnumerable GenerateTextAsync( { result = this._client.GetStreamingTextContentsAsync(prompt, skOptions, cancellationToken: cancellationToken); } - catch (HttpOperationException e) when (e.StatusCode.HasValue && (int)e.StatusCode >= 400 && (int)e.StatusCode < 500) + catch (HttpOperationException e) when (e.StatusCode.IsFatalError()) { throw new AzureOpenAIException(e.Message, e, isTransient: false); } diff --git a/extensions/AzureQueues/AzureQueuesPipeline.cs b/extensions/AzureQueues/AzureQueuesPipeline.cs index 27e8183c8..be4d2282c 100644 --- a/extensions/AzureQueues/AzureQueuesPipeline.cs +++ b/extensions/AzureQueues/AzureQueuesPipeline.cs @@ -209,7 +209,7 @@ public void OnDequeue(Func> processMessageAction) await this.UnlockMessageAsync(message, backoffDelay, cancellationToken: default).ConfigureAwait(false); break; - case ResultType.UnrecoverableError: + case ResultType.FatalError: this._log.LogError("Message '{0}' failed to process due to a non-recoverable error, moving to poison queue", message.MessageId); await this.MoveMessageToPoisonQueueAsync(message, cancellationToken: default).ConfigureAwait(false); break; diff --git a/extensions/OpenAI/OpenAI/OpenAITextEmbeddingGenerator.cs b/extensions/OpenAI/OpenAI/OpenAITextEmbeddingGenerator.cs index 0ea58cc55..d36423abb 100644 --- a/extensions/OpenAI/OpenAI/OpenAITextEmbeddingGenerator.cs +++ b/extensions/OpenAI/OpenAI/OpenAITextEmbeddingGenerator.cs @@ -127,7 +127,7 @@ public Task GenerateEmbeddingAsync(string text, CancellationToken can { return this._client.GenerateEmbeddingAsync(text, cancellationToken); } - catch (HttpOperationException e) when (e.StatusCode.HasValue && (int)e.StatusCode >= 400 && (int)e.StatusCode < 500) + catch (HttpOperationException e) when (e.StatusCode.IsFatalError()) { throw new OpenAIException(e.Message, e, isTransient: false); } @@ -143,7 +143,7 @@ public async Task GenerateEmbeddingBatchAsync(IEnumerable t var embeddings = await this._client.GenerateEmbeddingsAsync(list, cancellationToken: cancellationToken).ConfigureAwait(false); return embeddings.Select(e => new Embedding(e)).ToArray(); } - catch (HttpOperationException e) when (e.StatusCode.HasValue && (int)e.StatusCode >= 400 && (int)e.StatusCode < 500) + catch (HttpOperationException e) when (e.StatusCode.IsFatalError()) { throw new OpenAIException(e.Message, e, isTransient: false); } diff --git a/extensions/OpenAI/OpenAI/OpenAITextGenerator.cs b/extensions/OpenAI/OpenAI/OpenAITextGenerator.cs index caccc43df..ef069df88 100644 --- a/extensions/OpenAI/OpenAI/OpenAITextGenerator.cs +++ b/extensions/OpenAI/OpenAI/OpenAITextGenerator.cs @@ -146,7 +146,7 @@ public async IAsyncEnumerable GenerateTextAsync( { result = this._client.GetStreamingTextContentsAsync(prompt, skOptions, cancellationToken: cancellationToken); } - catch (HttpOperationException e) when (e.StatusCode.HasValue && (int)e.StatusCode >= 400 && (int)e.StatusCode < 500) + catch (HttpOperationException e) when (e.StatusCode.IsFatalError()) { throw new OpenAIException(e.Message, e, isTransient: false); } diff --git a/extensions/RabbitMQ/RabbitMQ/RabbitMQPipeline.cs b/extensions/RabbitMQ/RabbitMQ/RabbitMQPipeline.cs index 63af63a39..3168e66a5 100644 --- a/extensions/RabbitMQ/RabbitMQ/RabbitMQPipeline.cs +++ b/extensions/RabbitMQ/RabbitMQ/RabbitMQPipeline.cs @@ -220,7 +220,7 @@ public void OnDequeue(Func> processMessageAction) this._channel.BasicNack(args.DeliveryTag, multiple: false, requeue: true); break; - case ResultType.UnrecoverableError: + case ResultType.FatalError: this._log.LogError("Message '{0}' failed to process due to a non-recoverable error, moving to poison queue", args.BasicProperties?.MessageId); this._channel.BasicNack(args.DeliveryTag, multiple: false, requeue: false); break; diff --git a/service/Abstractions/Diagnostics/HttpErrors.cs b/service/Abstractions/Diagnostics/HttpErrors.cs new file mode 100644 index 000000000..7d52e9718 --- /dev/null +++ b/service/Abstractions/Diagnostics/HttpErrors.cs @@ -0,0 +1,59 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System.Collections.Generic; +using System.Net; + +namespace Microsoft.KernelMemory.Diagnostics; + +public static class HttpErrors +{ + // Errors that might disappear by retrying + private static readonly HashSet s_transientErrors = + [ + (int)HttpStatusCode.InternalServerError, + (int)HttpStatusCode.BadGateway, + (int)HttpStatusCode.ServiceUnavailable, + (int)HttpStatusCode.GatewayTimeout, + (int)HttpStatusCode.InsufficientStorage + ]; + + public static bool IsTransientError(this HttpStatusCode statusCode) + { + return s_transientErrors.Contains((int)statusCode); + } + + public static bool IsTransientError(this HttpStatusCode? statusCode) + { + return statusCode.HasValue && s_transientErrors.Contains((int)statusCode.Value); + } + + public static bool IsTransientError(int statusCode) + { + return s_transientErrors.Contains(statusCode); + } + + public static bool IsFatalError(this HttpStatusCode statusCode) + { + return IsError(statusCode) && !IsTransientError(statusCode); + } + + public static bool IsFatalError(this HttpStatusCode? statusCode) + { + return statusCode.HasValue && IsError(statusCode) && !IsTransientError(statusCode); + } + + public static bool IsFatalError(int statusCode) + { + return IsError(statusCode) && !IsTransientError(statusCode); + } + + private static bool IsError(this HttpStatusCode? statusCode) + { + return statusCode.HasValue && (int)statusCode.Value >= 400; + } + + private static bool IsError(int statusCode) + { + return statusCode >= 400; + } +} diff --git a/service/Abstractions/Pipeline/ResultType.cs b/service/Abstractions/Pipeline/ResultType.cs index 09064614a..b949fe2ca 100644 --- a/service/Abstractions/Pipeline/ResultType.cs +++ b/service/Abstractions/Pipeline/ResultType.cs @@ -6,5 +6,5 @@ public enum ResultType { Success = 0, TransientError = 1, - UnrecoverableError = 2, + FatalError = 2, } diff --git a/service/Core/Pipeline/DistributedPipelineOrchestrator.cs b/service/Core/Pipeline/DistributedPipelineOrchestrator.cs index 8c25592e8..98214ab5b 100644 --- a/service/Core/Pipeline/DistributedPipelineOrchestrator.cs +++ b/service/Core/Pipeline/DistributedPipelineOrchestrator.cs @@ -93,7 +93,7 @@ public override async Task AddHandlerAsync( if (pipelinePointer == null) { this.Log.LogError("Pipeline pointer deserialization failed, queue `{0}`. Message discarded.", handler.StepName); - return ResultType.UnrecoverableError; + return ResultType.FatalError; } DataPipeline? pipeline; @@ -121,7 +121,7 @@ public override async Task AddHandlerAsync( } this.Log.LogError("Pipeline `{0}/{1}` not found, cancelling step `{2}`", pipelinePointer.Index, pipelinePointer.DocumentId, handler.StepName); - return ResultType.UnrecoverableError; + return ResultType.FatalError; } catch (InvalidPipelineDataException) { @@ -232,7 +232,7 @@ private async Task RunPipelineStepAsync( this.Log.LogError("Handler {0} failed to process pipeline {1}", currentStepName, pipeline.DocumentId); break; - case ResultType.UnrecoverableError: + case ResultType.FatalError: this.Log.LogError("Handler {0} failed to process pipeline {1} due to an unrecoverable error", currentStepName, pipeline.DocumentId); break; diff --git a/service/Core/Pipeline/InProcessPipelineOrchestrator.cs b/service/Core/Pipeline/InProcessPipelineOrchestrator.cs index 40df40790..c8dbc9a95 100644 --- a/service/Core/Pipeline/InProcessPipelineOrchestrator.cs +++ b/service/Core/Pipeline/InProcessPipelineOrchestrator.cs @@ -189,7 +189,7 @@ public override async Task RunPipelineAsync(DataPipeline pipeline, CancellationT this.Log.LogError("Handler '{0}' failed to process pipeline '{1}/{2}'", currentStepName, pipeline.Index, pipeline.DocumentId); throw new OrchestrationException($"Pipeline error, step {currentStepName} failed", isTransient: true); - case ResultType.UnrecoverableError: + case ResultType.FatalError: this.Log.LogError("Handler '{0}' failed to process pipeline '{1}/{2}' due to an unrecoverable error", currentStepName, pipeline.Index, pipeline.DocumentId); throw new OrchestrationException($"Unrecoverable pipeline error, step {currentStepName} failed and cannot be retried", isTransient: false); diff --git a/service/Core/Pipeline/Queue/DevTools/SimpleQueues.cs b/service/Core/Pipeline/Queue/DevTools/SimpleQueues.cs index 0d6a0156e..8fe2ed713 100644 --- a/service/Core/Pipeline/Queue/DevTools/SimpleQueues.cs +++ b/service/Core/Pipeline/Queue/DevTools/SimpleQueues.cs @@ -216,7 +216,7 @@ public void OnDequeue(Func> processMessageAction) break; - case ResultType.UnrecoverableError: + case ResultType.FatalError: this._log.LogError("Message '{0}' failed to process due to a non-recoverable error, moving to poison queue", message.Id); poison = true; break;