Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor ServiceFactory and Dependencies #1560

Merged
merged 4 commits into from Mar 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
36 changes: 36 additions & 0 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions pyproject.toml
Expand Up @@ -105,6 +105,12 @@ python-socketio = "^5.11.0"
llama-index = "^0.10.13"
langchain-openai = "^0.0.5"
unstructured = { extras = ["md"], version = "^0.12.4" }
opentelemetry-api = "^1.23.0"
opentelemetry-sdk = "^1.23.0"
opentelemetry-exporter-otlp = "^1.23.0"
opentelemetry-instrumentation-fastapi = "^0.44b0"
opentelemetry-instrumentation-httpx = "^0.44b0"
opentelemetry-instrumentation-asgi = "^0.44b0"
dspy-ai = "^2.4.0"
crewai = "^0.22.5"
langchain-anthropic = "^0.1.4"
Expand Down
12 changes: 9 additions & 3 deletions src/backend/langflow/main.py
Expand Up @@ -20,7 +20,9 @@
def get_lifespan(fix_migration=False, socketio_server=None):
@asynccontextmanager
async def lifespan(app: FastAPI):
initialize_services(fix_migration=fix_migration, socketio_server=socketio_server)
initialize_services(
fix_migration=fix_migration, socketio_server=socketio_server
)
setup_llm_caching()
LangfuseInstance.update()
create_or_update_starter_projects()
Expand All @@ -34,7 +36,9 @@ def create_app():
"""Create the FastAPI app and include the router."""

configure()
socketio_server = socketio.AsyncServer(async_mode="asgi", cors_allowed_origins="*", logger=True)
socketio_server = socketio.AsyncServer(
async_mode="asgi", cors_allowed_origins="*", logger=True
)
lifespan = get_lifespan(socketio_server=socketio_server)
app = FastAPI(lifespan=lifespan)
origins = ["*"]
Expand Down Expand Up @@ -101,7 +105,9 @@ def get_static_files_dir():
return frontend_path / "frontend"


def setup_app(static_files_dir: Optional[Path] = None, backend_only: bool = False) -> FastAPI:
def setup_app(
static_files_dir: Optional[Path] = None, backend_only: bool = False
) -> FastAPI:
"""Setup the FastAPI app."""
# get the directory of the current file
if not static_files_dir:
Expand Down
2 changes: 1 addition & 1 deletion src/backend/langflow/services/cache/base.py
Expand Up @@ -6,7 +6,7 @@
from langflow.services.base import Service


class BaseCacheService(Service):
class CacheService(Service):
"""
Abstract base class for a cache.
"""
Expand Down
2 changes: 1 addition & 1 deletion src/backend/langflow/services/cache/factory.py
Expand Up @@ -15,7 +15,7 @@

class CacheServiceFactory(ServiceFactory):
def __init__(self):
super().__init__(BaseCacheService)
super().__init__(CacheService)

def create(self, settings_service: "SettingsService"):
# Here you would have logic to create and configure a CacheService
Expand Down
27 changes: 21 additions & 6 deletions src/backend/langflow/services/cache/service.py
Expand Up @@ -68,7 +68,10 @@ def _get_without_lock(self, key):
Retrieve an item from the cache without acquiring the lock.
"""
if item := self._cache.get(key):
if self.expiration_time is None or time.time() - item["time"] < self.expiration_time:
if (
self.expiration_time is None
or time.time() - item["time"] < self.expiration_time
):
# Move the key to the end to make it recently used
self._cache.move_to_end(key)
# Check if the value is pickled
Expand Down Expand Up @@ -113,7 +116,11 @@ def upsert(self, key, value, lock: Optional[threading.Lock] = None):
"""
with lock or self._lock:
existing_value = self._get_without_lock(key)
if existing_value is not None and isinstance(existing_value, dict) and isinstance(value, dict):
if (
existing_value is not None
and isinstance(existing_value, dict)
and isinstance(value, dict)
):
existing_value.update(value)
value = existing_value

Expand Down Expand Up @@ -179,7 +186,7 @@ def __repr__(self):
return f"InMemoryCache(max_size={self.max_size}, expiration_time={self.expiration_time})"


class RedisCache(BaseCacheService, Service):
class RedisCache(CacheService):
"""
A Redis-based cache implementation.

Expand All @@ -202,7 +209,9 @@ class RedisCache(BaseCacheService, Service):
b = cache["b"]
"""

def __init__(self, host="localhost", port=6379, db=0, url=None, expiration_time=60 * 60):
def __init__(
self, host="localhost", port=6379, db=0, url=None, expiration_time=60 * 60
):
"""
Initialize a new RedisCache instance.

Expand Down Expand Up @@ -270,7 +279,9 @@ def set(self, key, value):
if not result:
raise ValueError("RedisCache could not set the value.")
except TypeError as exc:
raise TypeError("RedisCache only accepts values that can be pickled. ") from exc
raise TypeError(
"RedisCache only accepts values that can be pickled. "
) from exc

def upsert(self, key, value):
"""
Expand All @@ -282,7 +293,11 @@ def upsert(self, key, value):
value: The value to insert or update.
"""
existing_value = self.get(key)
if existing_value is not None and isinstance(existing_value, dict) and isinstance(value, dict):
if (
existing_value is not None
and isinstance(existing_value, dict)
and isinstance(value, dict)
):
existing_value.update(value)
value = existing_value

Expand Down
7 changes: 4 additions & 3 deletions src/backend/langflow/services/deps.py
Expand Up @@ -4,7 +4,9 @@
from langflow.services import ServiceType, service_manager

if TYPE_CHECKING:
from langflow.services.cache.service import BaseCacheService
from sqlmodel import Session

from langflow.services.cache.service import CacheService
from langflow.services.chat.service import ChatService
from langflow.services.credentials.service import CredentialService
from langflow.services.database.service import DatabaseService
Expand All @@ -16,11 +18,10 @@
from langflow.services.storage.service import StorageService
from langflow.services.store.service import StoreService
from langflow.services.task.service import TaskService
from sqlmodel import Session


def get_socket_service() -> "SocketIOService":
    """Return the registered SocketIO service from the global service manager.

    The enum member is ``SOCKETIO_SERVICE`` (the older ``SOCKET_IO_SERVICE``
    spelling was renamed in this refactor).
    """
    return service_manager.get(ServiceType.SOCKETIO_SERVICE)  # type: ignore


def get_storage_service() -> "StorageService":
Expand Down
85 changes: 82 additions & 3 deletions src/backend/langflow/services/factory.py
@@ -1,12 +1,91 @@
from typing import TYPE_CHECKING
import importlib
import inspect
from typing import TYPE_CHECKING, Type, get_type_hints

from cachetools import LRUCache, cached
from loguru import logger

from langflow.services.schema import ServiceType

if TYPE_CHECKING:
from langflow.services.base import Service


class ServiceFactory:
    """Base class for service factories.

    A factory knows which ``Service`` subclass it builds and which other
    services that class depends on, so the service manager can construct
    dependencies in the right order.
    """

    def __init__(self, service_class):
        self.service_class = service_class
        # Dependencies are inferred from the type hints of this factory's
        # `create` method, resolved against every importable Service class.
        self.dependencies = infer_service_types(self, import_all_services_into_a_dict())

    def create(self, *args, **kwargs) -> "Service":
        # Instantiate and RETURN the service. The previous revision used
        # `raise self.service_class(...)`, which was a typo for `return` and
        # would fail at runtime (services are not exceptions).
        return self.service_class(*args, **kwargs)


def hash_factory(factory: ServiceFactory) -> str:
    """Cache key for a factory: the name of the service class it builds."""
    service_cls = factory.service_class
    return service_cls.__name__


def hash_dict(d: dict) -> str:
    """Cache key for a dict: its string representation."""
    return f"{d}"


def hash_infer_service_types_args(
    factory_class: Type[ServiceFactory], available_services=None
) -> str:
    """Build the LRU-cache key used by ``infer_service_types``.

    Combines the factory's service-class name with the string form of the
    available-services mapping.
    """
    return f"{hash_factory(factory_class)}_{hash_dict(available_services)}"


@cached(cache=LRUCache(maxsize=10), key=hash_infer_service_types_args)
def infer_service_types(
    factory_class: Type[ServiceFactory], available_services=None
) -> "list[ServiceType]":
    """Infer which services a factory's ``create`` method depends on.

    Resolves the (possibly string) type hints of ``create`` against
    ``available_services`` (used as the globals namespace) and maps each
    parameter's class name to a ``ServiceType`` enum member.

    Raises:
        ValueError: if a parameter type has no matching ``ServiceType``.

    Note: despite the name, ``factory_class`` may also be a factory
    *instance* — ``ServiceFactory.__init__`` passes ``self``; attribute
    lookup of ``create`` works either way.
    """
    create_method = factory_class.create
    type_hints = get_type_hints(create_method, globalns=available_services)
    service_types = []
    for param_name, param_type in type_hints.items():
        # Skip the return annotation if present in the hints.
        if param_name == "return":
            continue

        # e.g. "CacheService" -> "CACHE_SERVICE", matching enum member names.
        type_name = param_type.__name__.upper().replace("SERVICE", "_SERVICE")

        try:
            # Attempt to find a matching enum value.
            service_types.append(ServiceType[type_name])
        except KeyError as exc:
            # Chain the KeyError so the original lookup failure is preserved.
            raise ValueError(
                f"No matching ServiceType for parameter type: {param_type.__name__}"
            ) from exc
    return service_types


@cached(cache=LRUCache(maxsize=1))
def import_all_services_into_a_dict():
    """Import every known Service subclass and return ``{class_name: class}``.

    Modules are located by convention at
    ``langflow.services.<name>.service`` for each ``ServiceType`` value
    (``"<name>_service"``). The resulting dict is used as the globals
    namespace when resolving string type hints in factory ``create``
    methods.

    Raises:
        RuntimeError: if any service module fails to import or scan.
    """
    from langflow.services.base import Service

    services = {}
    for service_type in ServiceType:
        try:
            # ServiceType values look like "<name>_service".
            service_name = service_type.value.replace("_service", "")
            module_name = f"langflow.services.{service_name}.service"
            module = importlib.import_module(module_name)
            for name, obj in inspect.getmembers(module, inspect.isclass):
                if issubclass(obj, Service) and obj is not Service:
                    services[name] = obj
                    # NOTE(review): stops at the first Service subclass found
                    # in each module — confirm a module never defines more
                    # than one service class.
                    break
        except Exception as exc:
            logger.exception(exc)
            raise RuntimeError(
                "Could not initialize services. Please check your settings."
            ) from exc
    return services