Improve Anthropic rate limit reporting
This commit is contained in:
@@ -20,6 +20,9 @@ pub enum ApiError {
|
||||
message: Option<String>,
|
||||
body: String,
|
||||
retryable: bool,
|
||||
request_id: Option<String>,
|
||||
retry_after: Option<Duration>,
|
||||
rate_limit_reset_after: Option<Duration>,
|
||||
},
|
||||
RetriesExhausted {
|
||||
attempts: u32,
|
||||
@@ -57,6 +60,15 @@ impl ApiError {
|
||||
| Self::BackoffOverflow { .. } => false,
|
||||
}
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn retry_after(&self) -> Option<Duration> {
|
||||
match self {
|
||||
Self::Api { retry_after, .. } => *retry_after,
|
||||
Self::RetriesExhausted { last_error, .. } => last_error.retry_after(),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for ApiError {
|
||||
@@ -85,12 +97,19 @@ impl Display for ApiError {
|
||||
error_type,
|
||||
message,
|
||||
body,
|
||||
request_id,
|
||||
retry_after,
|
||||
rate_limit_reset_after,
|
||||
..
|
||||
} => match (error_type, message) {
|
||||
(Some(error_type), Some(message)) => {
|
||||
write!(f, "api returned {status} ({error_type}): {message}")
|
||||
write!(f, "api returned {status} ({error_type}): {message}")?;
|
||||
write_api_error_hints(f, *status, request_id, *retry_after, *rate_limit_reset_after)
|
||||
}
|
||||
_ => {
|
||||
write!(f, "api returned {status}: {body}")?;
|
||||
write_api_error_hints(f, *status, request_id, *retry_after, *rate_limit_reset_after)
|
||||
}
|
||||
_ => write!(f, "api returned {status}: {body}"),
|
||||
},
|
||||
Self::RetriesExhausted {
|
||||
attempts,
|
||||
@@ -110,6 +129,26 @@ impl Display for ApiError {
|
||||
|
||||
// Empty impl: `ApiError` relies entirely on the default methods of `std::error::Error`.
impl std::error::Error for ApiError {}
|
||||
|
||||
fn write_api_error_hints(
|
||||
f: &mut Formatter<'_>,
|
||||
status: reqwest::StatusCode,
|
||||
request_id: &Option<String>,
|
||||
retry_after: Option<Duration>,
|
||||
rate_limit_reset_after: Option<Duration>,
|
||||
) -> std::fmt::Result {
|
||||
if let Some(request_id) = request_id {
|
||||
write!(f, " [request id {request_id}]")?;
|
||||
}
|
||||
if status == reqwest::StatusCode::TOO_MANY_REQUESTS {
|
||||
if let Some(retry_after) = retry_after {
|
||||
write!(f, " [retry after ~{}s]", retry_after.as_secs())?;
|
||||
} else if let Some(reset_after) = rate_limit_reset_after {
|
||||
write!(f, " [quota resets in ~{}s]", reset_after.as_secs())?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
impl From<reqwest::Error> for ApiError {
|
||||
fn from(value: reqwest::Error) -> Self {
|
||||
Self::Http(value)
|
||||
|
||||
@@ -2,6 +2,7 @@ use std::collections::VecDeque;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
||||
|
||||
use reqwest::header::HeaderMap;
|
||||
use runtime::format_usd;
|
||||
use runtime::{
|
||||
load_oauth_credentials, save_oauth_credentials, OAuthConfig, OAuthRefreshRequest,
|
||||
@@ -21,6 +22,10 @@ use crate::types::{MessageDeltaEvent, MessageRequest, MessageResponse, StreamEve
|
||||
pub const DEFAULT_BASE_URL: &str = "https://api.anthropic.com";
|
||||
const REQUEST_ID_HEADER: &str = "request-id";
|
||||
const ALT_REQUEST_ID_HEADER: &str = "x-request-id";
|
||||
// Response header read by `retry_after_from_headers`; its value is parsed as a number of seconds.
const RETRY_AFTER_HEADER: &str = "retry-after";
|
||||
// Reset header consulted by `rate_limit_reset_after_from_headers`; parsed as an integer
// and compared against the current Unix time in seconds.
const RATE_LIMIT_RESET_HEADER: &str = "anthropic-ratelimit-unified-reset";
|
||||
// Reset header consulted by `rate_limit_reset_after_from_headers`; parsed the same way as
// the unified reset header. NOTE(review): presumably the 5-hour quota window — confirm.
const RATE_LIMIT_5H_RESET_HEADER: &str = "anthropic-ratelimit-unified-5h-reset";
|
||||
// Reset header consulted by `rate_limit_reset_after_from_headers`; parsed the same way as
// the unified reset header. NOTE(review): presumably the 7-day quota window — confirm.
const RATE_LIMIT_7D_RESET_HEADER: &str = "anthropic-ratelimit-unified-7d-reset";
|
||||
const DEFAULT_INITIAL_BACKOFF: Duration = Duration::from_millis(200);
|
||||
const DEFAULT_MAX_BACKOFF: Duration = Duration::from_secs(2);
|
||||
const DEFAULT_MAX_RETRIES: u32 = 2;
|
||||
@@ -470,7 +475,11 @@ impl AnthropicClient {
|
||||
break;
|
||||
}
|
||||
|
||||
tokio::time::sleep(self.backoff_for_attempt(attempts)?).await;
|
||||
let retry_delay = last_error
|
||||
.as_ref()
|
||||
.and_then(ApiError::retry_after)
|
||||
.unwrap_or(self.backoff_for_attempt(attempts)?);
|
||||
tokio::time::sleep(retry_delay).await;
|
||||
}
|
||||
|
||||
Err(ApiError::RetriesExhausted {
|
||||
@@ -826,6 +835,9 @@ async fn expect_success(response: reqwest::Response) -> Result<reqwest::Response
|
||||
return Ok(response);
|
||||
}
|
||||
|
||||
let request_id = request_id_from_headers(response.headers());
|
||||
let retry_after = retry_after_from_headers(response.headers());
|
||||
let rate_limit_reset_after = rate_limit_reset_after_from_headers(response.headers());
|
||||
let body = response.text().await.unwrap_or_else(|_| String::new());
|
||||
let parsed_error = serde_json::from_str::<AnthropicErrorEnvelope>(&body).ok();
|
||||
let retryable = is_retryable_status(status);
|
||||
@@ -840,9 +852,37 @@ async fn expect_success(response: reqwest::Response) -> Result<reqwest::Response
|
||||
.map(|error| error.error.message.clone()),
|
||||
body,
|
||||
retryable,
|
||||
request_id,
|
||||
retry_after,
|
||||
rate_limit_reset_after,
|
||||
})
|
||||
}
|
||||
|
||||
fn retry_after_from_headers(headers: &HeaderMap) -> Option<Duration> {
|
||||
headers
|
||||
.get(RETRY_AFTER_HEADER)
|
||||
.and_then(|value| value.to_str().ok())
|
||||
.and_then(|value| value.trim().parse::<f64>().ok())
|
||||
.filter(|seconds| *seconds >= 0.0)
|
||||
.map(Duration::from_secs_f64)
|
||||
}
|
||||
|
||||
fn rate_limit_reset_after_from_headers(headers: &HeaderMap) -> Option<Duration> {
|
||||
let now = SystemTime::now().duration_since(UNIX_EPOCH).ok()?.as_secs();
|
||||
[
|
||||
RATE_LIMIT_RESET_HEADER,
|
||||
RATE_LIMIT_5H_RESET_HEADER,
|
||||
RATE_LIMIT_7D_RESET_HEADER,
|
||||
]
|
||||
.into_iter()
|
||||
.filter_map(|name| headers.get(name))
|
||||
.filter_map(|value| value.to_str().ok())
|
||||
.filter_map(|value| value.trim().parse::<u64>().ok())
|
||||
.filter(|epoch| *epoch > now)
|
||||
.min()
|
||||
.map(|epoch| Duration::from_secs(epoch - now))
|
||||
}
|
||||
|
||||
const fn is_retryable_status(status: reqwest::StatusCode) -> bool {
|
||||
matches!(status.as_u16(), 408 | 409 | 429 | 500 | 502 | 503 | 504)
|
||||
}
|
||||
@@ -1246,6 +1286,40 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
/// A plain integer `retry-after` header is read back as that many seconds.
#[test]
fn retry_after_prefers_header_seconds() {
    let mut response_headers = reqwest::header::HeaderMap::new();
    response_headers.insert(super::RETRY_AFTER_HEADER, "7".parse().expect("header"));

    let parsed = super::retry_after_from_headers(&response_headers);
    let expected = Some(Duration::from_secs(7));

    assert_eq!(parsed, expected);
}
|
||||
|
||||
/// With several future reset timestamps present, the nearest one is chosen.
#[test]
fn rate_limit_reset_uses_soonest_future_reset_header() {
    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("time")
        .as_secs();

    // Same insertion order as before: 7d (+3600s), then 5h (+30s), then unified (+90s).
    let resets = [
        (super::RATE_LIMIT_7D_RESET_HEADER, 3600_u64),
        (super::RATE_LIMIT_5H_RESET_HEADER, 30),
        (super::RATE_LIMIT_RESET_HEADER, 90),
    ];

    let mut headers = reqwest::header::HeaderMap::new();
    for (header_name, offset_secs) in resets {
        headers.insert(
            header_name,
            (now + offset_secs).to_string().parse().expect("header"),
        );
    }

    let reset_after = super::rate_limit_reset_after_from_headers(&headers)
        .expect("reset after should parse");

    // The function samples the clock after this test does, so the result may
    // be slightly under 30s but never more.
    assert!(reset_after.as_secs() <= 30);
}
|
||||
|
||||
#[test]
|
||||
fn auth_source_applies_headers() {
|
||||
let auth = AuthSource::ApiKeyAndBearer {
|
||||
|
||||
@@ -918,6 +918,9 @@ async fn expect_success(response: reqwest::Response) -> Result<reqwest::Response
|
||||
.and_then(|error| error.error.message.clone()),
|
||||
body,
|
||||
retryable,
|
||||
request_id: None,
|
||||
retry_after: None,
|
||||
rate_limit_reset_after: None,
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user