-
Notifications
You must be signed in to change notification settings - Fork 1
/
main.py
359 lines (296 loc) · 13.7 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
import os
import numpy as np
# import pandas pd
import requests
import logging
from abc import ABC, abstractmethod
from typing import List, Dict, Any
from dotenv import load_dotenv
from pydantic import BaseModel, Field
from transformers import AutoModelForCausalLM, AutoTokenizer, BertModel, BertTokenizer
from swarms import Agent, Anthropic, ChromaDB, ToolAgent
from swarms.models import LlamaModel
from swarms.structs import SequentialWorkflow, ConcurrentWorkflow, MixtureOfAgents
from swarms.utils.json_utils import base_model_to_json
from llama_cpp import Llama
# Load environment variables
load_dotenv()
# Configure logging
logging.basicConfig(filename='real_estate_ai_agent.log', level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# Initialize different models
claude_model = Anthropic(
temperature=0.5,
anthropic_api_key=os.environ.get("ANTHROPIC_API_KEY"),
model="claude-3.5"
)
llama_model = LlamaModel(model_path="path/to/llama/model.bin")
# Load Dolly model
dolly_model = AutoModelForCausalLM.from_pretrained(
"databricks/dolly-v2-12b",
load_in_4bit=True,
device_map="auto",
)
dolly_tokenizer = AutoTokenizer.from_pretrained("databricks/dolly-v2-12b")
# Initialize long-term memory
memory = ChromaDB(
metric="cosine",
n_results=3,
output_dir="real_estate_data",
docs_folder="property_docs",
)
# Define data schemas
class PropertySchema(BaseModel):
address: str = Field(..., title="Property address")
total_units: int = Field(..., title="Total number of units", ge=400)
price: float = Field(..., title="Total property price")
price_per_unit: float = Field(..., title="Price per unit")
occupancy_rate: float = Field(..., title="Current occupancy rate", ge=0, le=1)
average_rent: float = Field(..., title="Average monthly rent per unit")
estimated_roi: float = Field(..., title="Estimated annual ROI", ge=0)
neighborhood_score: int = Field(..., title="Neighborhood quality score (1-10)", ge=1, le=10)
proximity_to_services: int = Field(..., title="Proximity to essential services score (1-10)", ge=1, le=10)
condition_score: int = Field(..., title="Property condition score (1-10)", ge=1, le=10)
class MarketAnalysisSchema(BaseModel):
location: str = Field(..., title="Location analyzed")
population_growth: float = Field(..., title="Annual population growth rate")
job_market_score: int = Field(..., title="Job market strength (1-10)", ge=1, le=10)
average_income: float = Field(..., title="Average annual income in the area")
affordability_index: float = Field(..., title="Housing affordability index")
demand_forecast: str = Field(..., title="Forecasted demand for affordable housing")
# Define specialized agents
class RealEstateAgent(Agent):
def __init__(self, agent_name, system_prompt, model, *args, **kwargs):
super().__init__(
llm=model,
agent_name=agent_name,
system_prompt=system_prompt,
max_loops=5,
autosave=True,
dashboard=True,
long_term_memory=memory,
*args,
**kwargs
)
# Create specialized agents with different models
market_analyst = RealEstateAgent(
"Market Analyst",
"Analyze real estate markets focusing on multi-family properties (400+ units) for low-income housing. Provide detailed insights on market trends, growth potential, and affordability factors.",
claude_model
)
financial_analyst = RealEstateAgent(
"Financial Analyst",
"Analyze the financial aspects of multi-family properties (400+ units), focusing on affordability for low-income families. Provide ROI projections, cash flow analysis, and investment recommendations.",
llama_model
)
property_inspector = RealEstateAgent(
"Property Inspector",
"Evaluate property conditions and potential renovation needs for multi-family units (400+ units). Focus on cost-effective improvements that can enhance living conditions for low-income residents.",
claude_model
)
community_impact_analyst = ToolAgent(
name="Community Impact Analyst",
description="Assess the impact of affordable housing projects (400+ units) on local communities. Provide recommendations for maximizing positive community impact and addressing potential challenges.",
model=dolly_model,
tokenizer=dolly_tokenizer,
json_schema=base_model_to_json(PropertySchema)
)
# Define tool functions (implementations are assumed to be provided)
def search_properties(location: str, min_units: int, max_price: float) -> List[PropertySchema]:
# Placeholder implementation
return []
def analyze_financials(property_data: PropertySchema) -> Dict[str, float]:
# Placeholder implementation
return {}
def assess_property_condition(property_data: PropertySchema) -> Dict[str, Any]:
# Placeholder implementation
return {}
def evaluate_community_impact(property_data: PropertySchema, location: str) -> Dict[str, Any]:
# Placeholder implementation
return {}
# Add tools to agents
market_analyst.add_tool(search_properties)
financial_analyst.add_tool(analyze_financials)
property_inspector.add_tool(assess_property_condition)
community_impact_analyst.add_tool(evaluate_community_impact)
# Create workflows
sequential_workflow = SequentialWorkflow(
agents=[market_analyst, financial_analyst, property_inspector, community_impact_analyst],
max_loops=1
)
concurrent_workflow = ConcurrentWorkflow(max_workers=4)
concurrent_workflow.add(tasks=[
market_analyst.run("Analyze the multi-family property market in New York City for low-income housing"),
financial_analyst.run("Evaluate the financial viability of a 400-unit property priced at $50 million for low-income housing"),
property_inspector.run("Assess the condition and renovation needs of a 400-unit property at 123 Main St"),
community_impact_analyst.run("Evaluate the impact of a new 400-unit affordable housing project in Brooklyn")
])
mixture_of_agents = MixtureOfAgents(
name="Real Estate Investment Team",
agents=[market_analyst, financial_analyst, property_inspector, community_impact_analyst],
layers=3,
final_agent=market_analyst
)
# Main execution function
def optimize_real_estate_investment(location: str, budget: float, min_units: int = 400):
# Placeholder implementation
return "Recommendation: Invest in property at 123 Main St with a projected ROI of 8%."
if __name__ == "__main__":
location = "New York City"
budget = 100000000 # $100 million
recommendation = optimize_real_estate_investment(location, budget)
print("Final Investment Recommendation:")
print(recommendation)
# Save recommendation to file
with open("investment_recommendation.txt", "w") as f:
f.write(recommendation)
# Additional Components from the second repository
class Swarm(ABC):
def __init__(self, name):
self.name = name
self.logger = logging.getLogger(name)
@abstractmethod
def run(self):
pass
class LLMSwarm(Swarm):
def __init__(self):
super().__init__("LLM Swarm")
self.llama_model = Llama(model_path=os.getenv('LLAMA_MODEL_PATH'))
self.gpt_j_tokenizer = AutoTokenizer.from_pretrained("EleutherAI/gpt-j-6B")
self.gpt_j_model = AutoModelForCausalLM.from_pretrained("EleutherAI/gpt-j-6B")
self.bert_tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
self.bert_model = BertModel.from_pretrained('bert-base-uncased')
def run(self, input_text):
self.logger.info("Running multi-model LLM analysis")
# LLaMA analysis
llama_output = self.llama_model(input_text, max_tokens=100)
# GPT-J analysis
gpt_j_input = self.gpt_j_tokenizer(input_text, return_tensors="pt")
gpt_j_output = self.gpt_j_model.generate(**gpt_j_input, max_length=100)
gpt_j_text = self.gpt_j_tokenizer.decode(gpt_j_output[0])
# BERT analysis
bert_input = self.bert_tokenizer(input_text, return_tensors="pt")
bert_output = self.bert_model(**bert_input)
return {
"llama_analysis": llama_output,
"gpt_j_analysis": gpt_j_text,
"bert_embeddings": bert_output.last_hidden_state.mean(dim=1).detach().numpy()
}
class DataAnalyticsSwarm(Swarm):
def __init__(self):
super().__init__("Data Analytics Swarm")
self.data_points = 400
def run(self):
self.logger.info(f"Analyzing {self.data_points} data points")
# Placeholder implementation
return {"analyzed_points": self.data_points}
class RealEstateAnalyticsSwarm(Swarm):
def __init__(self):
super().__init__("Real Estate Analytics Swarm")
def run(self, data):
self.logger.info("Performing specialized real estate analytics")
# Placeholder implementation
return {"market_trends": "upward", "foreclosure_rate": 0.05}
class OptimizationSwarm(Swarm):
def __init__(self):
super().__init__("Optimization Swarm")
self.particles = []
def run(self):
best_position, best_score = self.optimize_search(20, 100, 2)
self.logger.info(f"Best position: {best_position}, Best score: {best_score}")
return best_position
def optimize_search(self, n_particles, n_iterations, dimensions):
self.particles = [Particle(dimensions) for _ in range(n_particles)]
global_best_position = np.copy(self.particles[0].position)
global_best_score = float('inf')
for _ in range(n_iterations):
for particle in self.particles:
score = self.objective_function(particle.position)
if score < particle.best_score:
particle.best_score = score
particle.best_position = np.copy(particle.position)
if score < global_best_score:
global_best_score = score
global_best_position = np.copy(particle.position)
for particle in self.particles:
particle.update_velocity(global_best_position)
particle.update_position()
return global_best_position, global_best_score
def objective_function(self, position):
location_score = position[0]
affordability_score = 1 / (1 + np.exp(-position[1]))
return location_score + affordability_score
class Particle:
def __init__(self, dimensions):
self.position = np.array([np.random.uniform(-10, 10) for _ in range(dimensions)])
self.velocity = np.zeros(dimensions)
self.best_position = np.copy(self.position)
self.best_score = float('inf')
def update_velocity(self, global_best_position, w=0.5, c1=1, c2=2):
r1, r2 = np.random.random(2)
cognitive_velocity = c1 * r1 * (self.best_position - self.position)
social_velocity = c2 * r2 * (global_best_position - self.position)
self.velocity = w * self.velocity + cognitive_velocity + social_velocity
def update_position(self):
self.position += self.velocity
class DataFetchSwarm(Swarm):
def __init__(self):
super().__init__("Data Fetch Swarm")
self.api_key = os.getenv('API_KEY')
self.base_url = "https://api.example.com/v1/properties"
def run(self, params):
return self.fetch_property_data(params)
def fetch_property_data(self, params):
headers = {'Authorization': f'Bearer {self.api_key}'}
try:
response = requests.get(self.base_url, headers=headers, params=params)
response.raise_for_status()
return response.json()
except requests.RequestException as e:
self.logger.error(f"Failed to fetch data: {e}")
return None
class VisualizationSwarm(Swarm):
def __init__(self):
super().__init__("Visualization Swarm")
def run(self, data):
self.logger.info("Creating data visualizations")
# Placeholder implementation
return {"charts": ["trend_chart", "comparison_chart"]}
class RealEstateOptimizer:
def __init__(self):
self.llm_swarm = LLMSwarm()
self.data_analytics_swarm = DataAnalyticsSwarm()
self.real_estate_analytics_swarm = RealEstateAnalyticsSwarm()
self.optimization_swarm = OptimizationSwarm()
self.data_fetch_swarm = DataFetchSwarm()
self.visualization_swarm = VisualizationSwarm()
def run(self):
logging.info("Starting Real Estate Optimizer")
# Run data analytics
analytics_results = self.data_analytics_swarm.run()
# Perform real estate analytics
re_analytics_results = self.real_estate_analytics_swarm.run(analytics_results)
# LLM analysis
llm_input = f"Analyze the real estate market trends: {re_analytics_results['market_trends']} with foreclosure rate: {re_analytics_results['foreclosure_rate']}"
llm_results = self.llm_swarm.run(llm_input)
# Optimize search parameters
best_params = self.optimization_swarm.run()
# Fetch property data
params = {
'location': best_params[0],
'affordability_score': best_params[1],
'property_type': 'multi-family',
'income_level': 'low'
}
property_data = self.data_fetch_swarm.run(params)
# Create visualizations
if property_data:
visualizations = self.visualization_swarm.run(property_data)
logging.info(f"Created visualizations: {visualizations}")
else:
logging.warning("No property data available for visualization")
logging.info("Real Estate Optimizer process completed")
logging.info(f"LLM Analysis Results: {llm_results}")
if __name__ == "__main__":
optimizer = RealEstateOptimizer()
optimizer.run()