From 2209184601f5c62a77db8e527a9a76b9270429a8 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Tue, 5 Mar 2024 09:32:21 -0300 Subject: [PATCH 1/3] Update dependencies for OpenTelemetry --- poetry.lock | 61 +++++++++++++++++++++++++++++++++++++++++++++++++- pyproject.toml | 6 +++++ 2 files changed, 66 insertions(+), 1 deletion(-) diff --git a/poetry.lock b/poetry.lock index 2ec2d889e2..c53930bb02 100644 --- a/poetry.lock +++ b/poetry.lock @@ -4943,6 +4943,21 @@ files = [ deprecated = ">=1.2.6" importlib-metadata = ">=6.0,<7.0" +[[package]] +name = "opentelemetry-exporter-otlp" +version = "1.23.0" +description = "OpenTelemetry Collector Exporters" +optional = false +python-versions = ">=3.8" +files = [ + {file = "opentelemetry_exporter_otlp-1.23.0-py3-none-any.whl", hash = "sha256:92371fdc8d7803465a45801fe30cd8c522ef355a385b0a1d5346d32f77511ea2"}, + {file = "opentelemetry_exporter_otlp-1.23.0.tar.gz", hash = "sha256:4af8798f9bc3bddb92fcbb5b4aa9d0e955d962aa1d9bceaab08891c355a9f907"}, +] + +[package.dependencies] +opentelemetry-exporter-otlp-proto-grpc = "1.23.0" +opentelemetry-exporter-otlp-proto-http = "1.23.0" + [[package]] name = "opentelemetry-exporter-otlp-proto-common" version = "1.23.0" @@ -4980,6 +4995,29 @@ opentelemetry-sdk = ">=1.23.0,<1.24.0" [package.extras] test = ["pytest-grpc"] +[[package]] +name = "opentelemetry-exporter-otlp-proto-http" +version = "1.23.0" +description = "OpenTelemetry Collector Protobuf over HTTP Exporter" +optional = false +python-versions = ">=3.8" +files = [ + {file = "opentelemetry_exporter_otlp_proto_http-1.23.0-py3-none-any.whl", hash = "sha256:ad853b58681df8efcb2cfc93be2b5fd86351c99ff4ab47dc917da384b8650d91"}, + {file = "opentelemetry_exporter_otlp_proto_http-1.23.0.tar.gz", hash = "sha256:088eac2320f4a604e2d9ff71aced71fdae601ac6457005fb0303d6bbbf44e6ca"}, +] + +[package.dependencies] +deprecated = ">=1.2.6" +googleapis-common-protos = ">=1.52,<2.0" +opentelemetry-api = ">=1.15,<2.0" +opentelemetry-exporter-otlp-proto-common = "1.23.0" +opentelemetry-proto = "1.23.0" +opentelemetry-sdk = ">=1.23.0,<1.24.0" +requests = ">=2.7,<3.0" + +[package.extras] +test = ["responses (>=0.22.0,<0.25)"] + [[package]] name = "opentelemetry-instrumentation" version = "0.44b0" @@ -5040,6 +5078,27 @@ opentelemetry-util-http = "0.44b0" instruments = ["fastapi (>=0.58,<1.0)"] test = ["httpx (>=0.22,<1.0)", "opentelemetry-instrumentation-fastapi[instruments]", "opentelemetry-test-utils (==0.44b0)", "requests (>=2.23,<3.0)"] +[[package]] +name = "opentelemetry-instrumentation-httpx" +version = "0.44b0" +description = "OpenTelemetry HTTPX Instrumentation" +optional = false +python-versions = ">=3.8" +files = [ + {file = "opentelemetry_instrumentation_httpx-0.44b0-py3-none-any.whl", hash = "sha256:a4f1121b6212b018e719ef6a9a2f8317c329edd01a61452b7250f574f7d95a91"}, + {file = "opentelemetry_instrumentation_httpx-0.44b0.tar.gz", hash = "sha256:6cc81c4182f54dfb0d15774e3e48bb90d3ed44e9ad8bf5eef2a64a7197f945d8"}, +] + +[package.dependencies] +opentelemetry-api = ">=1.12,<2.0" +opentelemetry-instrumentation = "0.44b0" +opentelemetry-semantic-conventions = "0.44b0" +opentelemetry-util-http = "0.44b0" + +[package.extras] +instruments = ["httpx (>=0.18.0)"] +test = ["opentelemetry-instrumentation-httpx[instruments]", "opentelemetry-sdk (>=1.12,<2.0)", "opentelemetry-test-utils (==0.44b0)"] + [[package]] name = "opentelemetry-proto" version = "1.23.0" @@ -9388,4 +9447,4 @@ local = ["ctransformers", "llama-cpp-python", "sentence-transformers"] [metadata] lock-version = 
"2.0" python-versions = ">=3.9,<3.12" -content-hash = "524743a03769e472c83325641463bddb09f51dfc6ad50f8454c204bba8f484b5" +content-hash = "d941419d3946450bac60d834576e6a13f40a51fb758d3520cf14874c134f3e32" diff --git a/pyproject.toml b/pyproject.toml index c0b41cec76..2ace6f25e5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -106,6 +106,12 @@ python-socketio = "^5.11.0" llama-index = "^0.10.13" langchain-openai = "^0.0.6" unstructured = { extras = ["md"], version = "^0.12.4" } +opentelemetry-api = "^1.23.0" +opentelemetry-sdk = "^1.23.0" +opentelemetry-exporter-otlp = "^1.23.0" +opentelemetry-instrumentation-fastapi = "^0.44b0" +opentelemetry-instrumentation-httpx = "^0.44b0" +opentelemetry-instrumentation-asgi = "^0.44b0" [tool.poetry.group.dev.dependencies] pytest-asyncio = "^0.23.1" From 4e8c5f79c261eecfc8160aeee11af2e2a6e88a2d Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Tue, 5 Mar 2024 11:00:14 -0300 Subject: [PATCH 2/3] Update service dependency logic and add first version of telemetry service --- src/backend/langflow/main.py | 14 ++- src/backend/langflow/services/cache/base.py | 2 +- .../langflow/services/cache/factory.py | 8 +- .../langflow/services/cache/service.py | 33 +++++-- src/backend/langflow/services/deps.py | 9 +- src/backend/langflow/services/factory.py | 85 ++++++++++++++++- src/backend/langflow/services/manager.py | 69 +++++--------- .../langflow/services/monitor/factory.py | 1 + src/backend/langflow/services/schema.py | 6 +- .../langflow/services/session/factory.py | 7 +- .../langflow/services/session/service.py | 4 +- .../langflow/services/socket/factory.py | 9 +- .../langflow/services/socket/service.py | 6 +- .../langflow/services/storage/factory.py | 13 ++- .../langflow/services/telemetry/__init__.py | 0 .../langflow/services/telemetry/factory.py | 17 ++++ .../langflow/services/telemetry/service.py | 94 +++++++++++++++++++ .../langflow/services/telemetry/setup.py | 29 ++++++ .../langflow/services/telemetry/telemetry.py | 80 ++++++++++++++++ src/backend/langflow/services/utils.py | 85 +++++++---------- 20 files changed, 431 insertions(+), 140 deletions(-) create mode 100644 src/backend/langflow/services/telemetry/__init__.py create mode 100644 src/backend/langflow/services/telemetry/factory.py create mode 100644 src/backend/langflow/services/telemetry/service.py create mode 100644 src/backend/langflow/services/telemetry/setup.py create mode 100644 src/backend/langflow/services/telemetry/telemetry.py diff --git a/src/backend/langflow/main.py b/src/backend/langflow/main.py index 4b4c19a254..c37b28efcb 100644 --- a/src/backend/langflow/main.py +++ b/src/backend/langflow/main.py @@ -8,6 +8,7 @@ from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import FileResponse from fastapi.staticfiles import StaticFiles + from langflow.api import router from langflow.interface.utils import setup_llm_caching from langflow.services.plugins.langfuse_plugin import LangfuseInstance @@ -18,7 +19,9 @@ def get_lifespan(fix_migration=False, socketio_server=None): @asynccontextmanager async def lifespan(app: FastAPI): - initialize_services(fix_migration=fix_migration, socketio_server=socketio_server) + initialize_services( + fix_migration=fix_migration, socketio_server=socketio_server + ) setup_llm_caching() LangfuseInstance.update() yield @@ -31,7 +34,9 @@ def create_app(): """Create the FastAPI app and include the router.""" configure() - socketio_server = socketio.AsyncServer(async_mode="asgi", cors_allowed_origins="*", logger=True) + 
socketio_server = socketio.AsyncServer( + async_mode="asgi", cors_allowed_origins="*", logger=True + ) lifespan = get_lifespan(socketio_server=socketio_server) app = FastAPI(lifespan=lifespan) origins = ["*"] @@ -98,7 +103,9 @@ def get_static_files_dir(): return frontend_path / "frontend" -def setup_app(static_files_dir: Optional[Path] = None, backend_only: bool = False) -> FastAPI: +def setup_app( + static_files_dir: Optional[Path] = None, backend_only: bool = False +) -> FastAPI: """Setup the FastAPI app.""" # get the directory of the current file if not static_files_dir: @@ -114,6 +121,7 @@ def setup_app(static_files_dir: Optional[Path] = None, backend_only: bool = Fals if __name__ == "__main__": import uvicorn + from langflow.__main__ import get_number_of_workers configure() diff --git a/src/backend/langflow/services/cache/base.py b/src/backend/langflow/services/cache/base.py index 3b34e12f6f..0eeaf04023 100644 --- a/src/backend/langflow/services/cache/base.py +++ b/src/backend/langflow/services/cache/base.py @@ -3,7 +3,7 @@ from langflow.services.base import Service -class BaseCacheService(Service): +class CacheService(Service): """ Abstract base class for a cache. """ diff --git a/src/backend/langflow/services/cache/factory.py b/src/backend/langflow/services/cache/factory.py index 145f4e6533..9cc3dee36a 100644 --- a/src/backend/langflow/services/cache/factory.py +++ b/src/backend/langflow/services/cache/factory.py @@ -1,6 +1,6 @@ from typing import TYPE_CHECKING -from langflow.services.cache.service import BaseCacheService, InMemoryCache, RedisCache +from langflow.services.cache.service import CacheService, InMemoryCache, RedisCache from langflow.services.factory import ServiceFactory from langflow.utils.logger import logger @@ -10,7 +10,7 @@ class CacheServiceFactory(ServiceFactory): def __init__(self): - super().__init__(BaseCacheService) + super().__init__(CacheService) def create(self, settings_service: "SettingsService"): # Here you would have logic to create and configure a CacheService @@ -28,7 +28,9 @@ def create(self, settings_service: "SettingsService"): if redis_cache.is_connected(): logger.debug("Redis cache is connected") return redis_cache - logger.warning("Redis cache is not connected, falling back to in-memory cache") + logger.warning( + "Redis cache is not connected, falling back to in-memory cache" + ) return InMemoryCache() elif settings_service.settings.CACHE_TYPE == "memory": diff --git a/src/backend/langflow/services/cache/service.py b/src/backend/langflow/services/cache/service.py index ced3458514..c4ec1e644e 100644 --- a/src/backend/langflow/services/cache/service.py +++ b/src/backend/langflow/services/cache/service.py @@ -5,12 +5,10 @@ from loguru import logger -from langflow.services.base import Service -from langflow.services.cache.base import BaseCacheService +from langflow.services.cache.base import CacheService -class InMemoryCache(BaseCacheService, Service): - +class InMemoryCache(CacheService): """ A simple in-memory cache using an OrderedDict. @@ -67,7 +65,10 @@ def _get_without_lock(self, key): Retrieve an item from the cache without acquiring the lock. 
""" if item := self._cache.get(key): - if self.expiration_time is None or time.time() - item["time"] < self.expiration_time: + if ( + self.expiration_time is None + or time.time() - item["time"] < self.expiration_time + ): # Move the key to the end to make it recently used self._cache.move_to_end(key) # Check if the value is pickled @@ -114,7 +115,11 @@ def upsert(self, key, value): """ with self._lock: existing_value = self._get_without_lock(key) - if existing_value is not None and isinstance(existing_value, dict) and isinstance(value, dict): + if ( + existing_value is not None + and isinstance(existing_value, dict) + and isinstance(value, dict) + ): existing_value.update(value) value = existing_value @@ -180,7 +185,7 @@ def __repr__(self): return f"InMemoryCache(max_size={self.max_size}, expiration_time={self.expiration_time})" -class RedisCache(BaseCacheService, Service): +class RedisCache(CacheService): """ A Redis-based cache implementation. @@ -203,7 +208,9 @@ class RedisCache(BaseCacheService, Service): b = cache["b"] """ - def __init__(self, host="localhost", port=6379, db=0, url=None, expiration_time=60 * 60): + def __init__( + self, host="localhost", port=6379, db=0, url=None, expiration_time=60 * 60 + ): """ Initialize a new RedisCache instance. @@ -271,7 +278,9 @@ def set(self, key, value): if not result: raise ValueError("RedisCache could not set the value.") except TypeError as exc: - raise TypeError("RedisCache only accepts values that can be pickled. ") from exc + raise TypeError( + "RedisCache only accepts values that can be pickled. " + ) from exc def upsert(self, key, value): """ @@ -283,7 +292,11 @@ def upsert(self, key, value): value: The value to insert or update. """ existing_value = self.get(key) - if existing_value is not None and isinstance(existing_value, dict) and isinstance(value, dict): + if ( + existing_value is not None + and isinstance(existing_value, dict) + and isinstance(value, dict) + ): existing_value.update(value) value = existing_value diff --git a/src/backend/langflow/services/deps.py b/src/backend/langflow/services/deps.py index 19f3dcbf0f..c2eee9a4ab 100644 --- a/src/backend/langflow/services/deps.py +++ b/src/backend/langflow/services/deps.py @@ -3,7 +3,9 @@ from langflow.services import ServiceType, service_manager if TYPE_CHECKING: - from langflow.services.cache.service import BaseCacheService + from sqlmodel import Session + + from langflow.services.cache.service import CacheService from langflow.services.chat.service import ChatService from langflow.services.credentials.service import CredentialService from langflow.services.database.service import DatabaseService @@ -15,11 +17,10 @@ from langflow.services.storage.service import StorageService from langflow.services.store.service import StoreService from langflow.services.task.service import TaskService - from sqlmodel import Session def get_socket_service() -> "SocketIOService": - return service_manager.get(ServiceType.SOCKET_IO_SERVICE) # type: ignore + return service_manager.get(ServiceType.SOCKETIO_SERVICE) # type: ignore def get_storage_service() -> "StorageService": @@ -54,7 +55,7 @@ def get_session() -> Generator["Session", None, None]: yield from db_service.get_session() -def get_cache_service() -> "BaseCacheService": +def get_cache_service() -> "CacheService": return service_manager.get(ServiceType.CACHE_SERVICE) # type: ignore diff --git a/src/backend/langflow/services/factory.py b/src/backend/langflow/services/factory.py index 874d7374c8..360d07eb24 100644 --- 
a/src/backend/langflow/services/factory.py +++ b/src/backend/langflow/services/factory.py @@ -1,12 +1,91 @@ -from typing import TYPE_CHECKING +import importlib +import inspect +from typing import TYPE_CHECKING, Type, get_type_hints + +from cachetools import LRUCache, cached +from loguru import logger + +from langflow.services.schema import ServiceType if TYPE_CHECKING: from langflow.services.base import Service class ServiceFactory: - def __init__(self, service_class): + def __init__( + self, + service_class, + ): self.service_class = service_class + self.dependencies = infer_service_types(self, import_all_services_into_a_dict()) def create(self, *args, **kwargs) -> "Service": - raise NotImplementedError + return self.service_class(*args, **kwargs) + + +def hash_factory(factory: ServiceFactory) -> str: + return factory.service_class.__name__ + + +def hash_dict(d: dict) -> str: + return str(d) + + +def hash_infer_service_types_args( + factory_class: Type[ServiceFactory], available_services=None +) -> str: + factory_hash = hash_factory(factory_class) + services_hash = hash_dict(available_services) + return f"{factory_hash}_{services_hash}" + + +@cached(cache=LRUCache(maxsize=10), key=hash_infer_service_types_args) +def infer_service_types( + factory_class: Type[ServiceFactory], available_services=None +) -> list["ServiceType"]: + create_method = factory_class.create + type_hints = get_type_hints(create_method, globalns=available_services) + service_types = [] + for param_name, param_type in type_hints.items(): + # Skip the return type if it's included in type hints + if param_name == "return": + continue + + # Convert the type to the expected enum format directly without appending "_SERVICE" + type_name = param_type.__name__.upper().replace("SERVICE", "_SERVICE") + + try: + # Attempt to find a matching enum value + service_type = ServiceType[type_name] + service_types.append(service_type) + except KeyError: + raise ValueError( + f"No matching ServiceType for parameter type: {param_type.__name__}" + ) + return service_types + + +@cached(cache=LRUCache(maxsize=1)) +def import_all_services_into_a_dict(): + # Services are all in langflow.services.{service_name}.service + # and are subclass of Service + # We want to import all of them and put them in a dict + # to use as globals + from langflow.services.base import Service + + services = {} + for service_type in ServiceType: + try: + service_name = ServiceType(service_type).value.replace("_service", "") + module_name = f"langflow.services.{service_name}.service" + module = importlib.import_module(module_name) + for name, obj in inspect.getmembers(module, inspect.isclass): + if issubclass(obj, Service) and obj is not Service: + services[name] = obj + break + except Exception as exc: + logger.exception(exc) + raise RuntimeError( + f"Could not initialize services. Please check your settings."
+ ) from exc + return services diff --git a/src/backend/langflow/services/manager.py b/src/backend/langflow/services/manager.py index 0adeefd294..470efe7e41 100644 --- a/src/backend/langflow/services/manager.py +++ b/src/backend/langflow/services/manager.py @@ -1,11 +1,11 @@ -from typing import TYPE_CHECKING, Dict, List, Optional +from typing import TYPE_CHECKING, Dict -from langflow.services.schema import ServiceType from loguru import logger if TYPE_CHECKING: from langflow.services.base import Service from langflow.services.factory import ServiceFactory + from langflow.services.schema import ServiceType class ServiceManager: @@ -16,23 +16,19 @@ class ServiceManager: def __init__(self): self.services: Dict[str, "Service"] = {} self.factories = {} - self.dependencies = {} def register_factory( self, service_factory: "ServiceFactory", - dependencies: Optional[List[ServiceType]] = None, ): """ Registers a new factory with dependencies. """ - if dependencies is None: - dependencies = [] + service_name = service_factory.service_class.name self.factories[service_name] = service_factory - self.dependencies[service_name] = dependencies - def get(self, service_name: ServiceType) -> "Service": + def get(self, service_name: "ServiceType") -> "Service": """ Get (or create) a service by its name. """ @@ -41,7 +37,7 @@ def get(self, service_name: ServiceType) -> "Service": return self.services[service_name] - def _create_service(self, service_name: ServiceType): + def _create_service(self, service_name: "ServiceType"): """ Create a new service given its name, handling dependencies. """ @@ -49,25 +45,32 @@ def _create_service(self, service_name: ServiceType): self._validate_service_creation(service_name) # Create dependencies first - for dependency in self.dependencies.get(service_name, []): + factory = self.factories.get(service_name) + for dependency in factory.dependencies: if dependency not in self.services: self._create_service(dependency) # Collect the dependent services - dependent_services = {dep.value: self.services[dep] for dep in self.dependencies.get(service_name, [])} + dependent_services = { + dep.value: self.services[dep] for dep in factory.dependencies + } # Create the actual service - self.services[service_name] = self.factories[service_name].create(**dependent_services) + self.services[service_name] = self.factories[service_name].create( + **dependent_services + ) self.services[service_name].set_ready() - def _validate_service_creation(self, service_name: ServiceType): + def _validate_service_creation(self, service_name: "ServiceType"): """ Validate whether the service can be created. """ if service_name not in self.factories: - raise ValueError(f"No factory registered for the service class '{service_name.name}'") + raise ValueError( + f"No factory registered for the service class '{service_name.name}'" + ) - def update(self, service_name: ServiceType): + def update(self, service_name: "ServiceType"): """ Update a service by its name. """ @@ -90,36 +93,11 @@ def teardown(self): logger.exception(exc) self.services = {} self.factories = {} - self.dependencies = {} service_manager = ServiceManager() -def reinitialize_services(): - """ - Reinitialize all the services needed. 
- """ - - service_manager.update(ServiceType.SETTINGS_SERVICE) - service_manager.update(ServiceType.DATABASE_SERVICE) - service_manager.update(ServiceType.CACHE_SERVICE) - service_manager.update(ServiceType.CHAT_SERVICE) - service_manager.update(ServiceType.SESSION_SERVICE) - service_manager.update(ServiceType.AUTH_SERVICE) - service_manager.update(ServiceType.TASK_SERVICE) - - # Test cache connection - service_manager.get(ServiceType.CACHE_SERVICE) - # Test database connection - service_manager.get(ServiceType.DATABASE_SERVICE) - - # Test cache connection - service_manager.get(ServiceType.CACHE_SERVICE) - # Test database connection - service_manager.get(ServiceType.DATABASE_SERVICE) - - def initialize_settings_service(): """ Initialize the settings manager. @@ -134,13 +112,12 @@ def initialize_session_service(): Initialize the session manager. """ from langflow.services.cache import factory as cache_factory - from langflow.services.session import factory as session_service_factory # type: ignore + from langflow.services.session import ( + factory as session_service_factory, + ) # type: ignore initialize_settings_service() - service_manager.register_factory(cache_factory.CacheServiceFactory(), dependencies=[ServiceType.SETTINGS_SERVICE]) + service_manager.register_factory(cache_factory.CacheServiceFactory()) - service_manager.register_factory( - session_service_factory.SessionServiceFactory(), - dependencies=[ServiceType.CACHE_SERVICE], - ) + service_manager.register_factory(session_service_factory.SessionServiceFactory()) diff --git a/src/backend/langflow/services/monitor/factory.py b/src/backend/langflow/services/monitor/factory.py index d9e6caf72e..58bf0b278c 100644 --- a/src/backend/langflow/services/monitor/factory.py +++ b/src/backend/langflow/services/monitor/factory.py @@ -1,5 +1,6 @@ from langflow.services.factory import ServiceFactory from langflow.services.monitor.service import MonitorService +from langflow.services.schema import ServiceType class MonitorServiceFactory(ServiceFactory): diff --git a/src/backend/langflow/services/schema.py b/src/backend/langflow/services/schema.py index 7e78fc178d..b5104767a4 100644 --- a/src/backend/langflow/services/schema.py +++ b/src/backend/langflow/services/schema.py @@ -14,9 +14,9 @@ class ServiceType(str, Enum): CHAT_SERVICE = "chat_service" SESSION_SERVICE = "session_service" TASK_SERVICE = "task_service" - PLUGIN_SERVICE = "plugin_service" + PLUGINS_SERVICE = "plugins_service" STORE_SERVICE = "store_service" - CREDENTIAL_SERVICE = "credential_service" + CREDENTIALS_SERVICE = "credentials_service" STORAGE_SERVICE = "storage_service" MONITOR_SERVICE = "monitor_service" - SOCKET_IO_SERVICE = "socket_io_service" + SOCKETIO_SERVICE = "socket_service" diff --git a/src/backend/langflow/services/session/factory.py b/src/backend/langflow/services/session/factory.py index beb0bd6bd5..d55bd5b468 100644 --- a/src/backend/langflow/services/session/factory.py +++ b/src/backend/langflow/services/session/factory.py @@ -1,14 +1,15 @@ from typing import TYPE_CHECKING -from langflow.services.session.service import SessionService + from langflow.services.factory import ServiceFactory +from langflow.services.session.service import SessionService if TYPE_CHECKING: - from langflow.services.cache.service import BaseCacheService + from langflow.services.cache.service import CacheService class SessionServiceFactory(ServiceFactory): def __init__(self): super().__init__(SessionService) - def create(self, cache_service: "BaseCacheService"): + def create(self, 
cache_service: "CacheService"): return SessionService(cache_service) diff --git a/src/backend/langflow/services/session/service.py b/src/backend/langflow/services/session/service.py index 68fec04308..2360deb354 100644 --- a/src/backend/langflow/services/session/service.py +++ b/src/backend/langflow/services/session/service.py @@ -5,14 +5,14 @@ from langflow.services.session.utils import compute_dict_hash, session_id_generator if TYPE_CHECKING: - from langflow.services.cache.base import BaseCacheService + from langflow.services.cache.base import CacheService class SessionService(Service): name = "session_service" def __init__(self, cache_service): - self.cache_service: "BaseCacheService" = cache_service + self.cache_service: "CacheService" = cache_service async def load_session(self, key, flow_id: str, data_graph: Optional[dict] = None): # Check if the data is cached diff --git a/src/backend/langflow/services/socket/factory.py b/src/backend/langflow/services/socket/factory.py index dddb3c4fec..6d37a638c6 100644 --- a/src/backend/langflow/services/socket/factory.py +++ b/src/backend/langflow/services/socket/factory.py @@ -1,15 +1,18 @@ from typing import TYPE_CHECKING from langflow.services.factory import ServiceFactory +from langflow.services.schema import ServiceType from langflow.services.socket.service import SocketIOService if TYPE_CHECKING: - from langflow.services.cache.service import BaseCacheService + from langflow.services.cache.service import CacheService class SocketIOFactory(ServiceFactory): def __init__(self): - super().__init__(service_class=SocketIOService) + super().__init__( + service_class=SocketIOService, + ) - def create(self, cache_service: "BaseCacheService"): + def create(self, cache_service: "CacheService"): return SocketIOService(cache_service) diff --git a/src/backend/langflow/services/socket/service.py b/src/backend/langflow/services/socket/service.py index fd2c236d55..7aa6aef69d 100644 --- a/src/backend/langflow/services/socket/service.py +++ b/src/backend/langflow/services/socket/service.py @@ -8,13 +8,13 @@ from langflow.services.socket.utils import build_vertex, get_vertices if TYPE_CHECKING: - from langflow.services.cache.service import BaseCacheService + from langflow.services.cache.service import CacheService class SocketIOService(Service): - name = "socket_io_service" + name = "socket_service" - def __init__(self, cache_service: "BaseCacheService"): + def __init__(self, cache_service: "CacheService"): self.cache_service = cache_service def init(self, sio: socketio.AsyncServer): diff --git a/src/backend/langflow/services/storage/factory.py b/src/backend/langflow/services/storage/factory.py index be407d7510..ffa4ed6cf9 100644 --- a/src/backend/langflow/services/storage/factory.py +++ b/src/backend/langflow/services/storage/factory.py @@ -1,6 +1,7 @@ from typing import TYPE_CHECKING from langflow.services.factory import ServiceFactory +from langflow.services.schema import ServiceType from langflow.services.storage.service import StorageService from loguru import logger @@ -11,9 +12,13 @@ class StorageServiceFactory(ServiceFactory): def __init__(self): - super().__init__(StorageService) + super().__init__( + StorageService, + ) - def create(self, session_service: "SessionService", settings_service: "SettingsService"): + def create( + self, session_service: "SessionService", settings_service: "SettingsService" + ): storage_type = settings_service.settings.STORAGE_TYPE if storage_type.lower() == "local": from .local import LocalStorageService @@ -24,7 +29,9 @@ 
def create(self, session_service: "SessionService", settings_service: "SettingsS return S3StorageService(session_service, settings_service) else: - logger.warning(f"Storage type {storage_type} not supported. Using local storage.") + logger.warning( + f"Storage type {storage_type} not supported. Using local storage." + ) from .local import LocalStorageService return LocalStorageService(session_service, settings_service) diff --git a/src/backend/langflow/services/telemetry/__init__.py b/src/backend/langflow/services/telemetry/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/src/backend/langflow/services/telemetry/factory.py b/src/backend/langflow/services/telemetry/factory.py new file mode 100644 index 0000000000..3a2ed6dd50 --- /dev/null +++ b/src/backend/langflow/services/telemetry/factory.py @@ -0,0 +1,17 @@ +from typing import TYPE_CHECKING + +from langflow.services.factory import ServiceFactory +from langflow.services.telemetry.service import TelemetryService + +if TYPE_CHECKING: + from langflow.services.settings.service import SettingsService + + +class TelemetryServiceFactory(ServiceFactory): + def __init__(self): + super().__init__( + TelemetryService, + ) + + def create(self, settings_service: "SettingsService"): + return super().create(settings_service=settings_service) diff --git a/src/backend/langflow/services/telemetry/service.py b/src/backend/langflow/services/telemetry/service.py new file mode 100644 index 0000000000..3e15cadefd --- /dev/null +++ b/src/backend/langflow/services/telemetry/service.py @@ -0,0 +1,94 @@ +import os +import platform +from typing import TYPE_CHECKING, Any, Dict + +from opentelemetry import trace +from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter +from opentelemetry.sdk.resources import SERVICE_NAME, Resource +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor +from opentelemetry.trace import Status, StatusCode + +from langflow.services.base import Service + +if TYPE_CHECKING: + from langflow.services.settings.service import SettingsService + + +def setup_tracing(app: FastAPI, telemetry_service: TelemetryService): + # Configure the tracer to export traces + trace.set_tracer_provider(telemetry_service.provider) + tracer_provider = trace.get_tracer_provider() + + # Configure the OTLP exporter + otlp_exporter = telemetry_service.provider.get_span_processor().exporter + + # Add the OTLP exporter to the tracer provider + span_processor = BatchSpanProcessor(otlp_exporter) + tracer_provider.add_span_processor(span_processor) + + # Instrument FastAPI app + FastAPIInstrumentor.instrument_app(app) + + +class TelemetryService(Service): + name = "telemetry_service" + + def __init__(self, settings_service: "SettingsService"): + super().__init__(service_name="telemetry_service") + self.settings_service = settings_service + settings = self.settings_service.settings + self.init( + service_name=settings.SERVICE_NAME, + telemetry_endpoint=settings.TELEMETRY_ENDPOINT, + ) + self.set_ready() + + def init( + self, + service_name: str, + telemetry_endpoint: str = "http://telemetry.example.com:4318", + ): + self.ready = False + try: + self.resource = Resource(attributes={SERVICE_NAME: service_name}) + self.provider = TracerProvider(resource=self.resource) + processor = BatchSpanProcessor( + OTLPSpanExporter(endpoint=f"{telemetry_endpoint}/v1/traces", timeout=15) + ) + self.provider.add_span_processor(processor) + trace.set_tracer_provider(self.provider) + 
self.ready = True + except Exception as e: + print(f"Failed to initialize telemetry: {e}") + + def teardown(self): + raise NotImplementedError + + def record_event(self, event_name: str, attributes: Dict[str, Any]): + """Records a generic event with specified attributes.""" + if self.ready: + tracer = trace.get_tracer("generic.telemetry") + with tracer.start_as_current_span(event_name) as span: + for key, value in attributes.items(): + self._add_attribute(span, key, value) + span.set_status(Status(StatusCode.OK)) + + def _add_attribute(self, span, key: str, value: Any): + """Safely adds an attribute to a span.""" + try: + span.set_attribute(key, value) + except Exception as e: + print(f"Failed to add attribute {key}: {e}") + + @staticmethod + def gather_system_info() -> Dict[str, Any]: + """Collects generic system information.""" + return { + "python_version": platform.python_version(), + "platform": platform.platform(), + "platform_release": platform.release(), + "platform_system": platform.system(), + "platform_version": platform.version(), + "cpus": os.cpu_count(), + } diff --git a/src/backend/langflow/services/telemetry/setup.py b/src/backend/langflow/services/telemetry/setup.py new file mode 100644 index 0000000000..4b1c72e717 --- /dev/null +++ b/src/backend/langflow/services/telemetry/setup.py @@ -0,0 +1,29 @@ +from fastapi import FastAPI +from langflow.services.telemetry.service import TelemetryService +from opentelemetry import trace +from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter +from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor +from opentelemetry.sdk.resources import SERVICE_NAME, Resource +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor +from langflow.service.deps import get_telemetry_service +from langflow.services.deps import get_settings_service + + +def setup_tracing( + app: FastAPI, +): + # Configure the tracer to export traces + telemetry_service = get_telemetry_service() + trace.set_tracer_provider(telemetry_service.provider) + tracer_provider = trace.get_tracer_provider() + + # Configure the OTLP exporter + otlp_exporter = telemetry_service.provider.get_span_processor().exporter + + # Add the OTLP exporter to the tracer provider + span_processor = BatchSpanProcessor(otlp_exporter) + tracer_provider.add_span_processor(span_processor) + + # Instrument FastAPI app + FastAPIInstrumentor.instrument_app(app) diff --git a/src/backend/langflow/services/telemetry/telemetry.py b/src/backend/langflow/services/telemetry/telemetry.py new file mode 100644 index 0000000000..77aacc29e0 --- /dev/null +++ b/src/backend/langflow/services/telemetry/telemetry.py @@ -0,0 +1,80 @@ +import os +import platform +from typing import Any, Dict + +from opentelemetry import trace +from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter +from opentelemetry.sdk.resources import SERVICE_NAME, Resource +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor +from opentelemetry.trace import Status, StatusCode + + +class GenericTelemetry: + """A class to handle anonymous telemetry for a generic package or application. + + The data being collected is for development purposes, and all data is anonymous. + + Users can customize the data points collected according to their needs. 
+ """ + + def __init__( + self, + service_name: str, + telemetry_endpoint: str = "http://telemetry.example.com:4318", + ): + self.ready = False + try: + self.resource = Resource(attributes={SERVICE_NAME: service_name}) + self.provider = TracerProvider(resource=self.resource) + processor = BatchSpanProcessor( + OTLPSpanExporter(endpoint=f"{telemetry_endpoint}/v1/traces", timeout=15) + ) + self.provider.add_span_processor(processor) + trace.set_tracer_provider(self.provider) + self.ready = True + except Exception as e: + print(f"Failed to initialize telemetry: {e}") + + def record_event(self, event_name: str, attributes: Dict[str, Any]): + """Records a generic event with specified attributes.""" + if self.ready: + tracer = trace.get_tracer("generic.telemetry") + with tracer.start_as_current_span(event_name) as span: + for key, value in attributes.items(): + self._add_attribute(span, key, value) + span.set_status(Status(StatusCode.OK)) + + def _add_attribute(self, span, key: str, value: Any): + """Safely adds an attribute to a span.""" + try: + span.set_attribute(key, value) + except Exception as e: + print(f"Failed to add attribute {key}: {e}") + + @staticmethod + def gather_system_info() -> Dict[str, Any]: + """Collects generic system information.""" + return { + "python_version": platform.python_version(), + "platform": platform.platform(), + "platform_release": platform.release(), + "platform_system": platform.system(), + "platform_version": platform.version(), + "cpus": os.cpu_count(), + } + + +# Example usage +telemetry = GenericTelemetry(service_name="MyAPI") + +# Record a generic event +telemetry.record_event( + "API Request", + { + "endpoint": "/api/data", + "method": "GET", + "status_code": 200, + **telemetry.gather_system_info(), + }, +) diff --git a/src/backend/langflow/services/utils.py b/src/backend/langflow/services/utils.py index 34f1a042d2..f7004d4a9b 100644 --- a/src/backend/langflow/services/utils.py +++ b/src/backend/langflow/services/utils.py @@ -1,8 +1,12 @@ +import importlib +import inspect + from loguru import logger from sqlmodel import Session, select from langflow.services.auth.utils import create_super_user, verify_password from langflow.services.database.utils import initialize_database +from langflow.services.factory import ServiceFactory from langflow.services.manager import service_manager from langflow.services.schema import ServiceType from langflow.services.settings.constants import ( @@ -14,56 +18,32 @@ from .deps import get_db_service, get_session, get_settings_service -def get_factories_and_deps(): - from langflow.services.auth import factory as auth_factory - from langflow.services.cache import factory as cache_factory - from langflow.services.chat import factory as chat_factory - from langflow.services.credentials import factory as credentials_factory - from langflow.services.database import factory as database_factory - from langflow.services.monitor import factory as monitor_factory - from langflow.services.plugins import factory as plugins_factory - from langflow.services.session import ( - factory as session_service_factory, - ) # type: ignore - from langflow.services.settings import factory as settings_factory - from langflow.services.socket import factory as socket_factory - from langflow.services.storage import factory as storage_factory - from langflow.services.store import factory as store_factory - from langflow.services.task import factory as task_factory - - return [ - (settings_factory.SettingsServiceFactory(), []), - ( - 
auth_factory.AuthServiceFactory(), - [ServiceType.SETTINGS_SERVICE], - ), - ( - database_factory.DatabaseServiceFactory(), - [ServiceType.SETTINGS_SERVICE], - ), - ( - cache_factory.CacheServiceFactory(), - [ServiceType.SETTINGS_SERVICE], - ), - (chat_factory.ChatServiceFactory(), []), - (task_factory.TaskServiceFactory(), []), - ( - session_service_factory.SessionServiceFactory(), - [ServiceType.CACHE_SERVICE], - ), - (plugins_factory.PluginServiceFactory(), [ServiceType.SETTINGS_SERVICE]), - (store_factory.StoreServiceFactory(), [ServiceType.SETTINGS_SERVICE]), - ( - credentials_factory.CredentialServiceFactory(), - [ServiceType.SETTINGS_SERVICE], - ), - ( - storage_factory.StorageServiceFactory(), - [ServiceType.SESSION_SERVICE, ServiceType.SETTINGS_SERVICE], - ), - (monitor_factory.MonitorServiceFactory(), [ServiceType.SETTINGS_SERVICE]), - (socket_factory.SocketIOFactory(), [ServiceType.CACHE_SERVICE]), +def get_factories(): + service_names = [ + ServiceType(service_type).value.replace("_service", "") + for service_type in ServiceType ] + base_module = "langflow.services" + factories = [] + + for name in service_names: + try: + module_name = f"{base_module}.{name}.factory" + module = importlib.import_module(module_name) + + # Find all classes in the module that are subclasses of ServiceFactory + for name, obj in inspect.getmembers(module, inspect.isclass): + if issubclass(obj, ServiceFactory) and obj is not ServiceFactory: + factories.append(obj()) + break + + except Exception as exc: + logger.exception(exc) + raise RuntimeError( + f"Could not initialize services. Please check your settings." + ) from exc + + return factories def get_or_create_super_user(session: Session, username, password, is_default): @@ -211,12 +191,11 @@ def initialize_session_service(): initialize_settings_service() service_manager.register_factory( - cache_factory.CacheServiceFactory(), dependencies=[ServiceType.SETTINGS_SERVICE] + cache_factory.CacheServiceFactory(), ) service_manager.register_factory( session_service_factory.SessionServiceFactory(), - dependencies=[ServiceType.CACHE_SERVICE], ) @@ -224,9 +203,9 @@ def initialize_services(fix_migration: bool = False, socketio_server=None): """ Initialize all the services needed. 
""" - for factory, dependencies in get_factories_and_deps(): + for factory in get_factories(): try: - service_manager.register_factory(factory, dependencies=dependencies) + service_manager.register_factory(factory) except Exception as exc: logger.exception(exc) raise RuntimeError( From 260c5d8fcc559fad315a9c8531b555f428cec44e Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Mon, 25 Mar 2024 09:49:44 -0300 Subject: [PATCH 3/3] Remove telemetry service and related code --- .../langflow/services/telemetry/__init__.py | 0 .../langflow/services/telemetry/factory.py | 17 ---- .../langflow/services/telemetry/service.py | 94 ------------------- .../langflow/services/telemetry/setup.py | 29 ------ .../langflow/services/telemetry/telemetry.py | 80 ---------------- 5 files changed, 220 deletions(-) delete mode 100644 src/backend/langflow/services/telemetry/__init__.py delete mode 100644 src/backend/langflow/services/telemetry/factory.py delete mode 100644 src/backend/langflow/services/telemetry/service.py delete mode 100644 src/backend/langflow/services/telemetry/setup.py delete mode 100644 src/backend/langflow/services/telemetry/telemetry.py diff --git a/src/backend/langflow/services/telemetry/__init__.py b/src/backend/langflow/services/telemetry/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/src/backend/langflow/services/telemetry/factory.py b/src/backend/langflow/services/telemetry/factory.py deleted file mode 100644 index 3a2ed6dd50..0000000000 --- a/src/backend/langflow/services/telemetry/factory.py +++ /dev/null @@ -1,17 +0,0 @@ -from typing import TYPE_CHECKING - -from langflow.services.factory import ServiceFactory -from langflow.services.telemetry.service import TelemetryService - -if TYPE_CHECKING: - from langflow.services.settings.service import SettingsService - - -class TelemetryServiceFactory(ServiceFactory): - def __init__(self): - super().__init__( - TelemetryService, - ) - - def create(self, settings_service: "SettingsService"): - return super().create(settings_service=settings_service) diff --git a/src/backend/langflow/services/telemetry/service.py b/src/backend/langflow/services/telemetry/service.py deleted file mode 100644 index 3e15cadefd..0000000000 --- a/src/backend/langflow/services/telemetry/service.py +++ /dev/null @@ -1,94 +0,0 @@ -import os -import platform -from typing import TYPE_CHECKING, Any, Dict - -from opentelemetry import trace -from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter -from opentelemetry.sdk.resources import SERVICE_NAME, Resource -from opentelemetry.sdk.trace import TracerProvider -from opentelemetry.sdk.trace.export import BatchSpanProcessor -from opentelemetry.trace import Status, StatusCode - -from langflow.services.base import Service - -if TYPE_CHECKING: - from langflow.services.settings.service import SettingsService - - -def setup_tracing(app: FastAPI, telemetry_service: TelemetryService): - # Configure the tracer to export traces - trace.set_tracer_provider(telemetry_service.provider) - tracer_provider = trace.get_tracer_provider() - - # Configure the OTLP exporter - otlp_exporter = telemetry_service.provider.get_span_processor().exporter - - # Add the OTLP exporter to the tracer provider - span_processor = BatchSpanProcessor(otlp_exporter) - tracer_provider.add_span_processor(span_processor) - - # Instrument FastAPI app - FastAPIInstrumentor.instrument_app(app) - - -class TelemetryService(Service): - name = "telemetry_service" - - def __init__(self, 
settings_service: "SettingsService"): - super().__init__(service_name="telemetry_service") - self.settings_service = settings_service - settings = self.settings_service.settings - self.init( - service_name=settings.SERVICE_NAME, - telemetry_endpoint=settings.TELEMETRY_ENDPOINT, - ) - self.set_ready() - - def init( - self, - service_name: str, - telemetry_endpoint: str = "http://telemetry.example.com:4318", - ): - self.ready = False - try: - self.resource = Resource(attributes={SERVICE_NAME: service_name}) - self.provider = TracerProvider(resource=self.resource) - processor = BatchSpanProcessor( - OTLPSpanExporter(endpoint=f"{telemetry_endpoint}/v1/traces", timeout=15) - ) - self.provider.add_span_processor(processor) - trace.set_tracer_provider(self.provider) - self.ready = True - except Exception as e: - print(f"Failed to initialize telemetry: {e}") - - def teardown(self): - raise NotImplementedError - - def record_event(self, event_name: str, attributes: Dict[str, Any]): - """Records a generic event with specified attributes.""" - if self.ready: - tracer = trace.get_tracer("generic.telemetry") - with tracer.start_as_current_span(event_name) as span: - for key, value in attributes.items(): - self._add_attribute(span, key, value) - span.set_status(Status(StatusCode.OK)) - - def _add_attribute(self, span, key: str, value: Any): - """Safely adds an attribute to a span.""" - try: - span.set_attribute(key, value) - except Exception as e: - print(f"Failed to add attribute {key}: {e}") - - @staticmethod - def gather_system_info() -> Dict[str, Any]: - """Collects generic system information.""" - return { - "python_version": platform.python_version(), - "platform": platform.platform(), - "platform_release": platform.release(), - "platform_system": platform.system(), - "platform_version": platform.version(), - "cpus": os.cpu_count(), - } diff --git a/src/backend/langflow/services/telemetry/setup.py b/src/backend/langflow/services/telemetry/setup.py deleted file mode 100644 index 4b1c72e717..0000000000 --- a/src/backend/langflow/services/telemetry/setup.py +++ /dev/null @@ -1,29 +0,0 @@ -from fastapi import FastAPI -from langflow.services.telemetry.service import TelemetryService -from opentelemetry import trace -from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter -from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor -from opentelemetry.sdk.resources import SERVICE_NAME, Resource -from opentelemetry.sdk.trace import TracerProvider -from opentelemetry.sdk.trace.export import BatchSpanProcessor -from langflow.service.deps import get_telemetry_service -from langflow.services.deps import get_settings_service - - -def setup_tracing( - app: FastAPI, -): - # Configure the tracer to export traces - telemetry_service = get_telemetry_service() - trace.set_tracer_provider(telemetry_service.provider) - tracer_provider = trace.get_tracer_provider() - - # Configure the OTLP exporter - otlp_exporter = telemetry_service.provider.get_span_processor().exporter - - # Add the OTLP exporter to the tracer provider - span_processor = BatchSpanProcessor(otlp_exporter) - tracer_provider.add_span_processor(span_processor) - - # Instrument FastAPI app - FastAPIInstrumentor.instrument_app(app) diff --git a/src/backend/langflow/services/telemetry/telemetry.py b/src/backend/langflow/services/telemetry/telemetry.py deleted file mode 100644 index 77aacc29e0..0000000000 --- a/src/backend/langflow/services/telemetry/telemetry.py +++ /dev/null @@ -1,80 +0,0 @@ -import os -import 
platform -from typing import Any, Dict - -from opentelemetry import trace -from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter -from opentelemetry.sdk.resources import SERVICE_NAME, Resource -from opentelemetry.sdk.trace import TracerProvider -from opentelemetry.sdk.trace.export import BatchSpanProcessor -from opentelemetry.trace import Status, StatusCode - - -class GenericTelemetry: - """A class to handle anonymous telemetry for a generic package or application. - - The data being collected is for development purposes, and all data is anonymous. - - Users can customize the data points collected according to their needs. - """ - - def __init__( - self, - service_name: str, - telemetry_endpoint: str = "http://telemetry.example.com:4318", - ): - self.ready = False - try: - self.resource = Resource(attributes={SERVICE_NAME: service_name}) - self.provider = TracerProvider(resource=self.resource) - processor = BatchSpanProcessor( - OTLPSpanExporter(endpoint=f"{telemetry_endpoint}/v1/traces", timeout=15) - ) - self.provider.add_span_processor(processor) - trace.set_tracer_provider(self.provider) - self.ready = True - except Exception as e: - print(f"Failed to initialize telemetry: {e}") - - def record_event(self, event_name: str, attributes: Dict[str, Any]): - """Records a generic event with specified attributes.""" - if self.ready: - tracer = trace.get_tracer("generic.telemetry") - with tracer.start_as_current_span(event_name) as span: - for key, value in attributes.items(): - self._add_attribute(span, key, value) - span.set_status(Status(StatusCode.OK)) - - def _add_attribute(self, span, key: str, value: Any): - """Safely adds an attribute to a span.""" - try: - span.set_attribute(key, value) - except Exception as e: - print(f"Failed to add attribute {key}: {e}") - - @staticmethod - def gather_system_info() -> Dict[str, Any]: - """Collects generic system information.""" - return { - "python_version": platform.python_version(), - "platform": platform.platform(), - "platform_release": platform.release(), - "platform_system": platform.system(), - "platform_version": platform.version(), - "cpus": os.cpu_count(), - } - - -# Example usage -telemetry = GenericTelemetry(service_name="MyAPI") - -# Record a generic event -telemetry.record_event( - "API Request", - { - "endpoint": "/api/data", - "method": "GET", - "status_code": 200, - **telemetry.gather_system_info(), - }, -)
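The piece of PATCH 2/3 that survives the series is the reworked ServiceFactory: instead of passing an explicit dependencies=[...] list to register_factory, the manager now reads the type hints of each factory's create method and maps them to ServiceType members (see infer_service_types in services/factory.py and the slimmed-down register_factory in services/manager.py). The following is a minimal, self-contained sketch of that mechanism; the CacheService/SessionService classes and the trimmed ServiceType enum are stand-ins for illustration, not the real langflow modules.

from enum import Enum
from typing import get_type_hints


class ServiceType(str, Enum):
    # Trimmed stand-in for langflow.services.schema.ServiceType
    CACHE_SERVICE = "cache_service"
    SESSION_SERVICE = "session_service"


class CacheService:
    ...


class SessionService:
    ...


class SessionServiceFactory:
    # Mirrors SessionServiceFactory.create from the patch: the annotated
    # parameters of create are the only dependency declaration needed.
    def create(self, cache_service: CacheService) -> SessionService:
        return SessionService()


def infer_dependencies(factory) -> list:
    """Map the annotations of factory.create to ServiceType members, the
    same way infer_service_types does in services/factory.py."""
    deps = []
    for param_name, param_type in get_type_hints(factory.create).items():
        if param_name == "return":
            continue
        # CacheService -> "CACHESERVICE" -> "CACHE_SERVICE"
        type_name = param_type.__name__.upper().replace("SERVICE", "_SERVICE")
        deps.append(ServiceType[type_name])
    return deps


print(infer_dependencies(SessionServiceFactory()))
# [<ServiceType.CACHE_SERVICE: 'cache_service'>]

This is why register_factory in services/manager.py no longer takes a dependencies argument, and why get_factories_and_deps in services/utils.py could be replaced by the factory-discovering get_factories.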