Skip to content

Commit

Permalink
Saved topics to topics.json
Browse files Browse the repository at this point in the history
  • Loading branch information
elraphty committed Nov 1, 2024
1 parent 4c88156 commit 294274e
Show file tree
Hide file tree
Showing 8 changed files with 226 additions and 884 deletions.
3 changes: 0 additions & 3 deletions README.md

This file was deleted.

8 changes: 3 additions & 5 deletions main.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
from src.create_topics_csv import create_topics_csv
from src.topics_request import get_ops_topics, get_stack_topics
from src.topics_request import TopicsIndex

if __name__ == '__main__':
get_ops_topics()
get_stack_topics()
create_topics_csv()
topics_index = TopicsIndex()
topics_index.get_ops_topics()
40 changes: 0 additions & 40 deletions src/create_topics_csv.py

This file was deleted.

Empty file removed src/create_topics_curriculum.py
Empty file.
Empty file removed src/create_unique_topics.py
Empty file.
188 changes: 56 additions & 132 deletions src/topics_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,147 +8,71 @@
load_dotenv()


def get_ops_topics():
try:
headers = {"User-Agent": "Mozilla/5.0"}
url = "https://bitcoinops.org/topics.json"
response = requests.get(url, headers=headers)
response.raise_for_status() # Raise an HTTPError if the response status is 4xx or 5xx

# Check if the request was successful
if response.status_code == 200:
# Parse the JSON response
data = response.json()

# Filter only the required fields (title, tags, and links)
transformed_data = [
{
"title": item.get("title"),
"tags": item.get("categories"),
"link": item.get("optech_url"),
"synonyms": item.get("aliases")
}
for item in data
]
save_topics_to_json(transformed_data)
else:
print(f"Failed to retrieve data. Status code: {response.status_code}")

except requests.exceptions.RequestException as e:
logger.error(f"Failed to download the repo: {e}")
except json.JSONDecodeError:
print("Failed to parse JSON response.")
except Exception as e:
print(f"An unexpected error occurred: {e}")


def get_stack_topics():
try:
headers = {"User-Agent": "Mozilla/5.0"}
url = "https://api.stackexchange.com/questions?site=bitcoin.stackexchange"
# Fetch the JSON data
response = requests.get(url, headers=headers)
response.raise_for_status() # Raise an HTTPError if the response status is 4xx or 5xx

# Check if the request was successful
if response.status_code == 200:
# Parse the JSON response
data = response.json()

# Filter only the required fields (title, link, and tags)
transformed_data = [
{
"title": item.get("title"),
"link": item.get("link"),
"tags": item.get("tags"),
"synonyms": None,
}
for item in data.get("items", [])
]

save_topics_to_json(transformed_data)
else:
print(f"Failed to retrieve data. Status code: {response.status_code}")

except requests.exceptions.RequestException as e:
print(f"An error occurred while fetching data: {e}")
except json.JSONDecodeError:
print("Failed to parse JSON response.")
except Exception as e:
print(f"An unexpected error occurred: {e}")


def add_new_api_endpoint(url: str, title_key: str, link_key: str, tags_key, synonyms_key: str, topics_data_key: str):
try:
headers = {"User-Agent": "Mozilla/5.0"}
# Fetch the JSON data
response = requests.get(url, headers=headers)
response.raise_for_status() # Raise an HTTPError if the response status is 4xx or 5xx

# Check if the request was successful
if response.status_code == 200:
# Parse the JSON response
data = response.json()

# Filter only the required fields (title, link, and tags)
if(topics_data_key):
transformed_data = [
{
"title": item.get(title_key),
"link": item.get(link_key),
"tags": item.get(tags_key),
"synonyms": item.get(synonyms_key),
}
for item in data.get(topics_data_key, [])
]
else:
class TopicsIndex:
def get_ops_topics(self):
try:
headers = {"User-Agent": "Mozilla/5.0"}

url = "https://bitcoinops.org/topics.json"

response = requests.get(url, headers=headers)
response.raise_for_status() # Raise an HTTPError if the response status is 4xx or 5xx

# Check if the request was successful
if response.status_code == 200:
# Parse the JSON response
data = response.json()

# Filter only the required fields (title, tags, and links)
transformed_data = [
{
"title": item.get(title_key),
"link": item.get(link_key),
"tags": item.get(tags_key),
"synonyms": item.get(synonyms_key),
"title": item.get("title"),
"categories": item.get("categories"),
"link": item.get("optech_url"),
"synonyms": item.get("aliases")
}
for item in data
]

save_topics_to_json(transformed_data)
else:
print(f"Failed to retrieve data. Status code: {response.status_code}")

except requests.exceptions.RequestException as e:
print(f"An error occurred while fetching data: {e}")
except json.JSONDecodeError:
print("Failed to parse JSON response.")
except Exception as e:
print(f"An unexpected error occurred: {e}")


def save_topics_to_json(transformed_data: []):
# Check if the file already exists
if os.path.exists("topics.json"):
# Check if the file is empty
if os.path.getsize("topics.json") > 0:
# Load existing data if file is non-empty
with open("topics.json", "r") as file:
existing_data = json.load(file)
self.save_topics_to_json(transformed_data)
else:
print(f"Failed to retrieve data. Status code: {response.status_code}")

except requests.exceptions.RequestException as e:
logger.error(f"Failed to download the repo: {e}")
except json.JSONDecodeError:
print("Failed to parse JSON response.")
except Exception as e:
print(f"An unexpected error occurred: {e}")

@staticmethod
def save_topics_to_json(transformed_data: []):
# Check if the file already exists
if os.path.exists("topics.json"):
# Check if the file is empty
if os.path.getsize("topics.json") > 0:
# Load existing data if file is non-empty
with open("topics.json", "r") as file:
existing_data = json.load(file)
else:
# Initialize empty list if the file is empty
existing_data = []
else:
# Initialize empty list if the file is empty
existing_data = []
else:
existing_data = []

# Create a set of existing titles for quick duplicate check
existing_titles = {item["title"] for item in existing_data}
# Create a set of existing titles for quick duplicate check
existing_titles = {item["title"] for item in existing_data}

# Filter out items that already exist in topics.json based on title
new_data = [item for item in transformed_data if item["title"] not in existing_titles]

# Append the new data to existing data
existing_data.extend(new_data)

# Save the updated data back to topics.json
with open("topics.json", "w") as file:
json.dump(existing_data, file, indent=4)

# Filter out items that already exist in topics.json based on title
new_data = [item for item in transformed_data if item["title"] not in existing_titles]
print("New data has been added to 'topics.json'")

# Append the new data to existing data
existing_data.extend(new_data)

# Save the updated data back to topics.json
with open("topics.json", "w") as file:
json.dump(existing_data, file, indent=4)

print("New data has been added to 'topics.json'")
Loading

0 comments on commit 294274e

Please sign in to comment.