(fix proxy) fix logic for caching virtual keys in memory / redis #7285

Open · wants to merge 22 commits into base: main
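
For context, a minimal sketch of the TTL-reset problem this PR appears to address (the scenario below is an illustrative assumption, not code from the PR): if every cache write assigns a fresh TTL, a virtual key that is re-cached on each request keeps having its expiry pushed forward, so a stale copy can effectively never fall out of the in-memory cache. Retaining the original expiry on overwrite (Redis's KEEPTTL semantics, threaded through DualCache and InMemoryCache in this PR as a keepttl flag) bounds the lifetime of any cached copy.

import time

# Toy illustration only; this mirrors the suspected failure mode, it is not LiteLLM code.
cache: dict = {}   # key -> value
expiry: dict = {}  # key -> absolute expiry timestamp
DEFAULT_TTL = 600  # seconds, matching IN_MEMORY_CACHE_DEFAULT_TTL in this PR

def set_key(key, value, keepttl: bool = False):
    cache[key] = value
    if keepttl and key in expiry:
        return  # retain the existing expiry (Redis KEEPTTL semantics)
    expiry[key] = time.time() + DEFAULT_TTL

set_key("virtual-key:sk-abc", {"spend": 1})                # expiry set once
set_key("virtual-key:sk-abc", {"spend": 2})                # expiry silently pushed forward
set_key("virtual-key:sk-abc", {"spend": 3}, keepttl=True)  # value refreshed, expiry preserved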
156 changes: 155 additions & 1 deletion .circleci/config.yml
@@ -1252,6 +1252,154 @@ jobs:
- store_test_results:
path: test-results

multi_instance_proxy_tests:
machine:
image: ubuntu-2204:2023.10.1
resource_class: xlarge
working_directory: ~/project
steps:
- checkout
- run:
name: Install Docker CLI
command: |
sudo apt-get update
sudo apt-get install -y docker-ce docker-ce-cli containerd.io
- run:
name: Install Python 3.9
command: |
curl https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh --output miniconda.sh
bash miniconda.sh -b -p $HOME/miniconda
export PATH="$HOME/miniconda/bin:$PATH"
conda init bash
source ~/.bashrc
conda create -n myenv python=3.9 -y
conda activate myenv
python --version
- run:
name: Install Python Dependencies
command: |
pip install "pytest==7.3.1"
pip install "pytest-asyncio==0.21.1"
pip install aiohttp
python -m pip install --upgrade pip
python -m pip install -r .circleci/requirements.txt
pip install "pytest==7.3.1"
pip install "pytest-retry==1.6.3"
pip install "pytest-mock==3.12.0"
pip install "pytest-asyncio==0.21.1"
pip install mypy
pip install "google-generativeai==0.3.2"
pip install "google-cloud-aiplatform==1.43.0"
pip install pyarrow
pip install "boto3==1.34.34"
pip install "aioboto3==12.3.0"
pip install langchain
pip install "langfuse>=2.0.0"
pip install "logfire==0.29.0"
pip install numpydoc
pip install prisma
pip install fastapi
pip install jsonschema
pip install "httpx==0.24.1"
pip install "gunicorn==21.2.0"
pip install "anyio==3.7.1"
pip install "aiodynamo==23.10.1"
pip install "asyncio==3.4.3"
pip install "PyGithub==1.59.1"
pip install "openai==1.54.0"
- run:
name: Build Docker image
command: docker build -t my-app:latest -f ./docker/Dockerfile.database .
- run:
name: Run First Proxy Instance (Port 4000)
command: |
docker run -d \
-p 4000:4000 \
-e DATABASE_URL=$PROXY_DATABASE_URL \
-e REDIS_HOST=$REDIS_HOST \
-e REDIS_PASSWORD=$REDIS_PASSWORD \
-e REDIS_PORT=$REDIS_PORT \
-e LITELLM_MASTER_KEY="sk-1234" \
-e OPENAI_API_KEY=$OPENAI_API_KEY \
-e LITELLM_LICENSE=$LITELLM_LICENSE \
-e OTEL_EXPORTER="in_memory" \
-e APORIA_API_BASE_2=$APORIA_API_BASE_2 \
-e APORIA_API_KEY_2=$APORIA_API_KEY_2 \
-e APORIA_API_BASE_1=$APORIA_API_BASE_1 \
-e AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY_ID \
-e AWS_SECRET_ACCESS_KEY=$AWS_SECRET_ACCESS_KEY \
-e AWS_REGION_NAME=$AWS_REGION_NAME \
-e APORIA_API_KEY_1=$APORIA_API_KEY_1 \
-e COHERE_API_KEY=$COHERE_API_KEY \
-e GCS_FLUSH_INTERVAL="1" \
--name proxy-instance-1 \
-v $(pwd)/litellm/proxy/example_config_yaml/otel_test_config.yaml:/app/config.yaml \
-v $(pwd)/litellm/proxy/example_config_yaml/custom_guardrail.py:/app/custom_guardrail.py \
my-app:latest \
--config /app/config.yaml \
--port 4000 \
--detailed_debug
- run:
name: Run Second Proxy Instance (Port 4001)
command: |
docker run -d \
-p 4001:4001 \
-e DATABASE_URL=$PROXY_DATABASE_URL \
-e REDIS_HOST=$REDIS_HOST \
-e REDIS_PASSWORD=$REDIS_PASSWORD \
-e REDIS_PORT=$REDIS_PORT \
-e LITELLM_MASTER_KEY="sk-1234" \
-e OPENAI_API_KEY=$OPENAI_API_KEY \
-e LITELLM_LICENSE=$LITELLM_LICENSE \
-e OTEL_EXPORTER="in_memory" \
-e APORIA_API_BASE_2=$APORIA_API_BASE_2 \
-e APORIA_API_KEY_2=$APORIA_API_KEY_2 \
-e APORIA_API_BASE_1=$APORIA_API_BASE_1 \
-e AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY_ID \
-e AWS_SECRET_ACCESS_KEY=$AWS_SECRET_ACCESS_KEY \
-e AWS_REGION_NAME=$AWS_REGION_NAME \
-e APORIA_API_KEY_1=$APORIA_API_KEY_1 \
-e COHERE_API_KEY=$COHERE_API_KEY \
-e GCS_FLUSH_INTERVAL="1" \
--name proxy-instance-2 \
-v $(pwd)/litellm/proxy/example_config_yaml/otel_test_config.yaml:/app/config.yaml \
-v $(pwd)/litellm/proxy/example_config_yaml/custom_guardrail.py:/app/custom_guardrail.py \
my-app:latest \
--config /app/config.yaml \
--port 4001 \
--detailed_debug
- run:
name: Install curl and dockerize
command: |
sudo apt-get update
sudo apt-get install -y curl
sudo wget https://github.com/jwilder/dockerize/releases/download/v0.6.1/dockerize-linux-amd64-v0.6.1.tar.gz
sudo tar -C /usr/local/bin -xzvf dockerize-linux-amd64-v0.6.1.tar.gz
sudo rm dockerize-linux-amd64-v0.6.1.tar.gz
- run:
name: Start outputting logs for first instance
command: docker logs -f proxy-instance-1
background: true
- run:
name: Start outputting logs for second instance
command: docker logs -f proxy-instance-2
background: true
- run:
name: Wait for both instances to be ready
command: |
dockerize -wait http://localhost:4000 -timeout 5m
dockerize -wait http://localhost:4001 -timeout 5m
- run:
name: Run Multi-Instance Tests
command: |
pwd
ls
python -m pytest -vv tests/multi_instance_tests -x -s -v --junitxml=test-results/junit.xml --durations=5
no_output_timeout: 120m
# Store test results
- store_test_results:
path: test-results

upload-coverage:
docker:
- image: cimg/python:3.9
@@ -1582,6 +1730,12 @@ workflows:
only:
- main
- /litellm_.*/
- multi_instance_proxy_tests:
filters:
branches:
only:
- main
- /litellm_.*/
- upload-coverage:
requires:
- llm_translation_testing
Expand Down Expand Up @@ -1623,6 +1777,7 @@ workflows:
requires:
- local_testing
- build_and_test
- multi_instance_proxy_tests
- load_testing
- test_bad_database_url
- llm_translation_testing
@@ -1645,4 +1800,3 @@
branches:
only:
- main
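
The job above runs the suite under tests/multi_instance_tests, which is not part of this diff. As a hedged sketch only (the endpoint paths, payloads, and response shape are assumptions based on the LiteLLM proxy key-management API, and cache-expiry timing is glossed over), such a test might create a virtual key on one instance, update it on the other, and verify the first instance does not keep serving a stale cached copy:

import aiohttp
import pytest

PROXY_1 = "http://localhost:4000"  # first instance started by the job above
PROXY_2 = "http://localhost:4001"  # second instance
MASTER_KEY = "sk-1234"             # LITELLM_MASTER_KEY set in the job

@pytest.mark.asyncio
async def test_key_update_visible_across_instances():
    headers = {"Authorization": f"Bearer {MASTER_KEY}"}
    async with aiohttp.ClientSession(headers=headers) as session:
        # Create a virtual key via instance 1.
        async with session.post(f"{PROXY_1}/key/generate", json={}) as resp:
            assert resp.status == 200
            key = (await resp.json())["key"]

        # Update the key's metadata via instance 2.
        async with session.post(
            f"{PROXY_2}/key/update",
            json={"key": key, "metadata": {"updated": True}},
        ) as resp:
            assert resp.status == 200

        # Instance 1 should reflect the update rather than a stale in-memory copy.
        async with session.get(f"{PROXY_1}/key/info", params={"key": key}) as resp:
            assert resp.status == 200
            info = await resp.json()  # response shape assumed
            assert info["info"]["metadata"].get("updated") is True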

35 changes: 28 additions & 7 deletions litellm/caching/dual_cache.py
@@ -61,7 +61,9 @@ def __init__(
) -> None:
super().__init__()
# If in_memory_cache is not provided, use the default InMemoryCache
self.in_memory_cache = in_memory_cache or InMemoryCache()
self.in_memory_cache = in_memory_cache or InMemoryCache(
default_ttl=default_in_memory_ttl
)
# If redis_cache is not provided, use the default RedisCache
self.redis_cache = redis_cache
self.last_redis_batch_access_time = LimitedSizeOrderedDict(
@@ -299,40 +301,59 @@ async def async_batch_get_cache(
except Exception:
verbose_logger.error(traceback.format_exc())

async def async_set_cache(self, key, value, local_only: bool = False, **kwargs):
async def async_set_cache(
self, key, value, local_only: bool = False, keepttl: bool = False, **kwargs
):
print_verbose(
f"async set cache: cache key: {key}; local_only: {local_only}; value: {value}"
)
try:
if self.in_memory_cache is not None:
await self.in_memory_cache.async_set_cache(key, value, **kwargs)
await self.in_memory_cache.async_set_cache(
key, value, keepttl=keepttl, **kwargs
)

if self.redis_cache is not None and local_only is False:
await self.redis_cache.async_set_cache(key, value, **kwargs)
await self.redis_cache.async_set_cache(
key, value, keepttl=keepttl, **kwargs
)
except Exception as e:
verbose_logger.exception(
f"LiteLLM Cache: Excepton async add_cache: {str(e)}"
)

# async_batch_set_cache
async def async_set_cache_pipeline(
self, cache_list: list, local_only: bool = False, **kwargs
self,
cache_list: list,
local_only: bool = False,
keepttl: bool = False,
**kwargs,
):
"""
Batch write values to the cache

Args:
cache_list: list
local_only: bool = False
keepttl: bool. If True, retain the time-to-live already associated with the key (named after the Redis KEEPTTL parameter).
**kwargs:
"""
print_verbose(
f"async batch set cache: cache keys: {cache_list}; local_only: {local_only}"
)
try:
if self.in_memory_cache is not None:
await self.in_memory_cache.async_set_cache_pipeline(
cache_list=cache_list, **kwargs
cache_list=cache_list, keepttl=keepttl, **kwargs
)

if self.redis_cache is not None and local_only is False:
await self.redis_cache.async_set_cache_pipeline(
cache_list=cache_list, ttl=kwargs.pop("ttl", None), **kwargs
cache_list=cache_list,
ttl=kwargs.pop("ttl", None),
keepttl=keepttl,
**kwargs,
)
except Exception as e:
verbose_logger.exception(
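
A short usage sketch of the new parameter (module paths are inferred from the file paths in this diff, and the Redis wiring is omitted, so this DualCache falls back to in-memory only): a write that refreshes a value without extending its lifetime passes keepttl=True, and the flag is forwarded to both backends.

import asyncio

from litellm.caching.dual_cache import DualCache
from litellm.caching.in_memory_cache import InMemoryCache

async def main():
    cache = DualCache(in_memory_cache=InMemoryCache(default_ttl=60))

    # First write establishes the TTL (60 seconds here).
    await cache.async_set_cache("virtual-key:sk-abc", {"spend": 1.0})

    # Later refreshes keep the original expiry instead of resetting it.
    await cache.async_set_cache("virtual-key:sk-abc", {"spend": 2.0}, keepttl=True)

asyncio.run(main())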
48 changes: 38 additions & 10 deletions litellm/caching/in_memory_cache.py
@@ -10,26 +10,29 @@

import json
import time
from typing import List, Optional
from typing import List, Optional, Union

from .base_cache import BaseCache

IN_MEMORY_CACHE_DEFAULT_TTL: int = 600
IN_MEMORY_CACHE_MAX_SIZE: int = 200


class InMemoryCache(BaseCache):
def __init__(
self,
max_size_in_memory: Optional[int] = 200,
default_ttl: Optional[
int
] = 600, # default ttl is 10 minutes. At maximum litellm rate limiting logic requires objects to be in memory for 1 minute
Union[int, float]
] = None, # default ttl is 10 minutes. At maximum litellm rate limiting logic requires objects to be in memory for 1 minute
):
"""
max_size_in_memory [int]: Maximum number of items in the cache, bounded to prevent memory leaks. Defaults to 200 items.
"""
self.max_size_in_memory = (
max_size_in_memory or 200
max_size_in_memory or IN_MEMORY_CACHE_MAX_SIZE
) # set an upper bound of 200 items in-memory
self.default_ttl = default_ttl or 600
self.default_ttl: int = int(default_ttl or IN_MEMORY_CACHE_DEFAULT_TTL)

# in-memory cache
self.cache_dict: dict = {}
@@ -57,26 +60,51 @@ def evict_cache(self):
# One of the most common causes of memory leaks in Python is the retention of objects that are no longer being used.
# This can occur when an object is referenced by another object, but the reference is never removed.

def set_cache(self, key, value, **kwargs):
def set_cache(self, key, value, keepttl: bool = False, **kwargs):
"""
Set cache value

Args:
key: str
value: Any
keepttl: bool. If True, retain the time-to-live already associated with the key (named after the Redis KEEPTTL parameter).
**kwargs:
"""
if len(self.cache_dict) >= self.max_size_in_memory:
# only evict when cache is full
self.evict_cache()

self.cache_dict[key] = value
if "ttl" in kwargs and kwargs["ttl"] is not None:

if keepttl and key in self.ttl_dict:
pass
elif "ttl" in kwargs and kwargs["ttl"] is not None:
self.ttl_dict[key] = time.time() + kwargs["ttl"]
else:
self.ttl_dict[key] = time.time() + self.default_ttl

async def async_set_cache(self, key, value, **kwargs):
self.set_cache(key=key, value=value, **kwargs)

async def async_set_cache_pipeline(self, cache_list, ttl=None, **kwargs):
async def async_set_cache_pipeline(
self, cache_list, ttl: Optional[float] = None, keepttl: bool = False, **kwargs
):
"""
Use in-memory cache for bulk write operations

Args:
cache_list: List[Tuple[Any, Any]]
ttl: Optional[float] = None
keepttl: bool = False. If True, retain the time-to-live already associated with the key (named after the Redis KEEPTTL parameter).
**kwargs:
"""
for cache_key, cache_value in cache_list:
if ttl is not None:
self.set_cache(key=cache_key, value=cache_value, ttl=ttl)
self.set_cache(
key=cache_key, value=cache_value, ttl=ttl, keepttl=keepttl
)
else:
self.set_cache(key=cache_key, value=cache_value)
self.set_cache(key=cache_key, value=cache_value, keepttl=keepttl)

async def async_set_cache_sadd(self, key, value: List, ttl: Optional[float]):
"""
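
To make the keepttl branch above concrete, a minimal sketch exercising InMemoryCache directly (module path inferred from the file path; cache_dict and ttl_dict are the attributes shown in this diff):

from litellm.caching.in_memory_cache import InMemoryCache

cache = InMemoryCache(default_ttl=600)

cache.set_cache("k", "v1", ttl=5)         # expiry = now + 5s
first_expiry = cache.ttl_dict["k"]

cache.set_cache("k", "v2", keepttl=True)  # value updated, expiry untouched
assert cache.ttl_dict["k"] == first_expiry
assert cache.cache_dict["k"] == "v2"

cache.set_cache("k", "v3")                # no keepttl: expiry reset to now + default_ttl
assert cache.ttl_dict["k"] != first_expiry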