-
Notifications
You must be signed in to change notification settings - Fork 2.8k
/
main.py
137 lines (106 loc) · 3.94 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Optional
from urllib.parse import urlencode
import socketio # type: ignore
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from langflow.api import router
from langflow.initial_setup.setup import create_or_update_starter_projects
from langflow.interface.utils import setup_llm_caching
from langflow.services.plugins.langfuse_plugin import LangfuseInstance
from langflow.services.utils import initialize_services, teardown_services
from langflow.utils.logger import configure
def get_lifespan(fix_migration=False, socketio_server=None):
    """Build the lifespan context manager passed to ``FastAPI(lifespan=...)``.

    Args:
        fix_migration: Forwarded unchanged to ``initialize_services``.
        socketio_server: Forwarded unchanged to ``initialize_services``.

    Returns:
        An async context manager factory that accepts the FastAPI app,
        performs service startup before yielding and service teardown
        afterwards.
    """

    @asynccontextmanager
    async def lifespan(app: FastAPI):
        # Startup phase: everything before the yield.
        initialize_services(
            fix_migration=fix_migration, socketio_server=socketio_server
        )
        setup_llm_caching()
        LangfuseInstance.update()
        create_or_update_starter_projects()
        try:
            yield
        finally:
            # Tear down even when an exception or cancellation is thrown
            # into the generator at the yield; otherwise the services
            # started above would never be released.
            teardown_services()

    return lifespan
def create_app():
    """Assemble the FastAPI application.

    Configures logging, creates the Socket.IO server, installs the CORS and
    query-string middlewares, registers the ``/health`` route, includes the
    API router and finally mounts the Socket.IO server on the app.
    """
    configure()

    sio_server = socketio.AsyncServer(
        async_mode="asgi", cors_allowed_origins="*", logger=True
    )
    app = FastAPI(lifespan=get_lifespan(socketio_server=sio_server))

    app.add_middleware(
        CORSMiddleware,
        allow_origins=["*"],
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )

    @app.middleware("http")
    async def flatten_query_string_lists(request: Request, call_next):
        # Expand comma-separated values into repeated keys
        # ("a=1,2" becomes "a=1&a=2") before the request is routed.
        pairs = [
            (name, piece)
            for name, raw in request.query_params.multi_items()
            for piece in raw.split(",")
        ]
        request.scope["query_string"] = urlencode(pairs, doseq=True).encode("utf-8")
        return await call_next(request)

    # No docstring on purpose: the handler stays exactly as registered.
    @app.get("/health")
    def health():
        return {"status": "ok"}

    app.include_router(router)
    return mount_socketio(app, sio_server)
def mount_socketio(app: FastAPI, socketio_server: socketio.AsyncServer):
    """Mount ``socketio_server`` on ``app`` under ``/sio`` and return ``app``."""
    sio_asgi_app = socketio.ASGIApp(socketio_server, socketio_path="")
    app.mount("/sio", sio_asgi_app)
    return app
def setup_static_files(app: FastAPI, static_files_dir: Path):
    """
    Mount the static files directory at "/" and register a 404 handler
    that answers with the directory's ``index.html``.
    Args:
        app (FastAPI): FastAPI app.
        static_files_dir (Path): Path to the static files directory.
    """
    static_app = StaticFiles(directory=static_files_dir, html=True)
    app.mount("/", static_app, name="static")

    @app.exception_handler(404)
    async def custom_404_handler(request, __):
        # Respond to 404s with index.html; a missing index.html is an error.
        path = static_files_dir / "index.html"
        if path.exists():
            return FileResponse(path)
        raise RuntimeError(f"File at path {path} does not exist.")
def get_static_files_dir():
    """Return the ``frontend`` directory located next to this module's file."""
    return Path(__file__).parent / "frontend"
def setup_app(
    static_files_dir: Optional[Path] = None, backend_only: bool = False
) -> FastAPI:
    """Create the app and, unless ``backend_only`` is set, attach the static files.

    When ``static_files_dir`` is not given, the directory returned by
    ``get_static_files_dir()`` is used. A ``RuntimeError`` is raised if the
    static files are needed but the directory does not exist.
    """
    # Fall back to the default location when no directory was supplied.
    static_files_dir = static_files_dir or get_static_files_dir()
    serve_frontend = not backend_only

    if serve_frontend and not (static_files_dir and static_files_dir.exists()):
        raise RuntimeError(f"Static files directory {static_files_dir} does not exist.")

    app = create_app()
    if serve_frontend and static_files_dir is not None:
        setup_static_files(app, static_files_dir)
    return app
if __name__ == "__main__":
    # Script entry point: serve the app locally on 127.0.0.1:7860 with
    # auto-reload and debug logging.
    import uvicorn

    from langflow.__main__ import get_number_of_workers

    configure()
    uvicorn.run(
        "langflow.main:create_app",
        # create_app is a zero-argument factory, not an ASGI app instance;
        # declare it so uvicorn calls it deliberately instead of having to
        # auto-detect the factory (which it reports with a warning).
        factory=True,
        host="127.0.0.1",
        port=7860,
        # NOTE(review): uvicorn documents --workers as not valid together
        # with --reload; confirm whether this value is meant to take effect.
        workers=get_number_of_workers(),
        log_level="debug",
        reload=True,
    )