Skip to content

Commit

Permalink
Refactor ServiceFactory and Dependencies (#1560)
Browse files Browse the repository at this point in the history
* Update dependencies for OpenTelemetry

* Update service dependency logic and add first version of telemetry service

* Remove telemetry service and related code
  • Loading branch information
ogabrielluiz committed Mar 25, 2024
1 parent f672a11 commit e8630da
Show file tree
Hide file tree
Showing 17 changed files with 246 additions and 133 deletions.
36 changes: 36 additions & 0 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions pyproject.toml
Expand Up @@ -105,6 +105,12 @@ python-socketio = "^5.11.0"
llama-index = "^0.10.13"
langchain-openai = "^0.0.5"
unstructured = { extras = ["md"], version = "^0.12.4" }
opentelemetry-api = "^1.23.0"
opentelemetry-sdk = "^1.23.0"
opentelemetry-exporter-otlp = "^1.23.0"
opentelemetry-instrumentation-fastapi = "^0.44b0"
opentelemetry-instrumentation-httpx = "^0.44b0"
opentelemetry-instrumentation-asgi = "^0.44b0"
dspy-ai = "^2.4.0"
crewai = "^0.22.5"
langchain-anthropic = "^0.1.4"
Expand Down
12 changes: 9 additions & 3 deletions src/backend/langflow/main.py
Expand Up @@ -20,7 +20,9 @@
def get_lifespan(fix_migration=False, socketio_server=None):
@asynccontextmanager
async def lifespan(app: FastAPI):
initialize_services(fix_migration=fix_migration, socketio_server=socketio_server)
initialize_services(
fix_migration=fix_migration, socketio_server=socketio_server
)
setup_llm_caching()
LangfuseInstance.update()
create_or_update_starter_projects()
Expand All @@ -34,7 +36,9 @@ def create_app():
"""Create the FastAPI app and include the router."""

configure()
socketio_server = socketio.AsyncServer(async_mode="asgi", cors_allowed_origins="*", logger=True)
socketio_server = socketio.AsyncServer(
async_mode="asgi", cors_allowed_origins="*", logger=True
)
lifespan = get_lifespan(socketio_server=socketio_server)
app = FastAPI(lifespan=lifespan)
origins = ["*"]
Expand Down Expand Up @@ -101,7 +105,9 @@ def get_static_files_dir():
return frontend_path / "frontend"


def setup_app(static_files_dir: Optional[Path] = None, backend_only: bool = False) -> FastAPI:
def setup_app(
static_files_dir: Optional[Path] = None, backend_only: bool = False
) -> FastAPI:
"""Setup the FastAPI app."""
# get the directory of the current file
if not static_files_dir:
Expand Down
2 changes: 1 addition & 1 deletion src/backend/langflow/services/cache/base.py
Expand Up @@ -6,7 +6,7 @@
from langflow.services.base import Service


class BaseCacheService(Service):
class CacheService(Service):
"""
Abstract base class for a cache.
"""
Expand Down
2 changes: 1 addition & 1 deletion src/backend/langflow/services/cache/factory.py
Expand Up @@ -15,7 +15,7 @@

class CacheServiceFactory(ServiceFactory):
def __init__(self):
super().__init__(BaseCacheService)
super().__init__(CacheService)

def create(self, settings_service: "SettingsService"):
# Here you would have logic to create and configure a CacheService
Expand Down
27 changes: 21 additions & 6 deletions src/backend/langflow/services/cache/service.py
Expand Up @@ -68,7 +68,10 @@ def _get_without_lock(self, key):
Retrieve an item from the cache without acquiring the lock.
"""
if item := self._cache.get(key):
if self.expiration_time is None or time.time() - item["time"] < self.expiration_time:
if (
self.expiration_time is None
or time.time() - item["time"] < self.expiration_time
):
# Move the key to the end to make it recently used
self._cache.move_to_end(key)
# Check if the value is pickled
Expand Down Expand Up @@ -113,7 +116,11 @@ def upsert(self, key, value, lock: Optional[threading.Lock] = None):
"""
with lock or self._lock:
existing_value = self._get_without_lock(key)
if existing_value is not None and isinstance(existing_value, dict) and isinstance(value, dict):
if (
existing_value is not None
and isinstance(existing_value, dict)
and isinstance(value, dict)
):
existing_value.update(value)
value = existing_value

Expand Down Expand Up @@ -179,7 +186,7 @@ def __repr__(self):
return f"InMemoryCache(max_size={self.max_size}, expiration_time={self.expiration_time})"


class RedisCache(BaseCacheService, Service):
class RedisCache(CacheService):
"""
A Redis-based cache implementation.
Expand All @@ -202,7 +209,9 @@ class RedisCache(BaseCacheService, Service):
b = cache["b"]
"""

def __init__(self, host="localhost", port=6379, db=0, url=None, expiration_time=60 * 60):
def __init__(
self, host="localhost", port=6379, db=0, url=None, expiration_time=60 * 60
):
"""
Initialize a new RedisCache instance.
Expand Down Expand Up @@ -270,7 +279,9 @@ def set(self, key, value):
if not result:
raise ValueError("RedisCache could not set the value.")
except TypeError as exc:
raise TypeError("RedisCache only accepts values that can be pickled. ") from exc
raise TypeError(
"RedisCache only accepts values that can be pickled. "
) from exc

def upsert(self, key, value):
"""
Expand All @@ -282,7 +293,11 @@ def upsert(self, key, value):
value: The value to insert or update.
"""
existing_value = self.get(key)
if existing_value is not None and isinstance(existing_value, dict) and isinstance(value, dict):
if (
existing_value is not None
and isinstance(existing_value, dict)
and isinstance(value, dict)
):
existing_value.update(value)
value = existing_value

Expand Down
7 changes: 4 additions & 3 deletions src/backend/langflow/services/deps.py
Expand Up @@ -4,7 +4,9 @@
from langflow.services import ServiceType, service_manager

if TYPE_CHECKING:
from langflow.services.cache.service import BaseCacheService
from sqlmodel import Session

from langflow.services.cache.service import CacheService
from langflow.services.chat.service import ChatService
from langflow.services.credentials.service import CredentialService
from langflow.services.database.service import DatabaseService
Expand All @@ -16,11 +18,10 @@
from langflow.services.storage.service import StorageService
from langflow.services.store.service import StoreService
from langflow.services.task.service import TaskService
from sqlmodel import Session


def get_socket_service() -> "SocketIOService":
return service_manager.get(ServiceType.SOCKET_IO_SERVICE) # type: ignore
return service_manager.get(ServiceType.SOCKETIO_SERVICE) # type: ignore


def get_storage_service() -> "StorageService":
Expand Down
85 changes: 82 additions & 3 deletions src/backend/langflow/services/factory.py
@@ -1,12 +1,91 @@
from typing import TYPE_CHECKING
import importlib
import inspect
from typing import TYPE_CHECKING, Type, get_type_hints

from cachetools import LRUCache, cached
from loguru import logger

from langflow.services.schema import ServiceType

if TYPE_CHECKING:
from langflow.services.base import Service


class ServiceFactory:
def __init__(self, service_class):
def __init__(
self,
service_class,
):
self.service_class = service_class
self.dependencies = infer_service_types(self, import_all_services_into_a_dict())

def create(self, *args, **kwargs) -> "Service":
raise NotImplementedError
raise self.service_class(*args, **kwargs)


def hash_factory(factory: ServiceFactory) -> str:
return factory.service_class.__name__


def hash_dict(d: dict) -> str:
return str(d)


def hash_infer_service_types_args(
factory_class: Type[ServiceFactory], available_services=None
) -> str:
factory_hash = hash_factory(factory_class)
services_hash = hash_dict(available_services)
return f"{factory_hash}_{services_hash}"


@cached(cache=LRUCache(maxsize=10), key=hash_infer_service_types_args)
def infer_service_types(
factory_class: Type[ServiceFactory], available_services=None
) -> "ServiceType":
create_method = factory_class.create
type_hints = get_type_hints(create_method, globalns=available_services)
service_types = []
for param_name, param_type in type_hints.items():
# Skip the return type if it's included in type hints
if param_name == "return":
continue

# Convert the type to the expected enum format directly without appending "_SERVICE"
type_name = param_type.__name__.upper().replace("SERVICE", "_SERVICE")

try:
# Attempt to find a matching enum value
service_type = ServiceType[type_name]
service_types.append(service_type)
except KeyError:
raise ValueError(
f"No matching ServiceType for parameter type: {param_type.__name__}"
)
return service_types


@cached(cache=LRUCache(maxsize=1))
def import_all_services_into_a_dict():
# Services are all in langflow.services.{service_name}.service
# and are subclass of Service
# We want to import all of them and put them in a dict
# to use as globals
from langflow.services.base import Service

services = {}
for service_type in ServiceType:
try:
service_name = ServiceType(service_type).value.replace("_service", "")
module_name = f"langflow.services.{service_name}.service"
module = importlib.import_module(module_name)
for name, obj in inspect.getmembers(module, inspect.isclass):
if issubclass(obj, Service) and obj is not Service:
services[name] = obj
break
except Exception as exc:
logger.exception(exc)
raise RuntimeError(
f"Could not initialize services. Please check your settings."
) from exc
return services

0 comments on commit e8630da

Please sign in to comment.