diff --git a/.github/workflows/docs.yaml b/.github/workflows/docs.yaml new file mode 100644 index 0000000..7fecd28 --- /dev/null +++ b/.github/workflows/docs.yaml @@ -0,0 +1,49 @@ +name: docs + +permissions: + contents: write + +on: + workflow_dispatch: + push: + branches: + - main + paths: + - 'justfile' + - 'website/**' + - 'dev-requirements.txt' + - '.github/workflows/docs.yaml' + +jobs: + deploy: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + with: + submodules: true + fetch-depth: 0 + + - uses: extractions/setup-just@v1 + + - uses: actions/setup-python@v4 + with: + python-version: 3.12 + + - uses: quarto-dev/quarto-actions/setup@v2 + + - name: install requirements + run: | + pip install uv + just setup + + - name: build site + run: | + . .venv/bin/activate + just docs-build + + - name: deploy + uses: peaceiris/actions-gh-pages@v3 + if: github.ref == 'refs/heads/main' + with: + github_token: ${{ secrets.GITHUB_TOKEN }} + publish_dir: website/_output diff --git a/.github/workflows/etl.yaml b/.github/workflows/etl.yaml new file mode 100644 index 0000000..2bbfc5a --- /dev/null +++ b/.github/workflows/etl.yaml @@ -0,0 +1,44 @@ +name: etl + +on: + workflow_dispatch: + schedule: + - cron: "0 9 * * *" + +jobs: + + ingest-etl: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + with: + submodules: true + fetch-depth: 0 + + - uses: google-github-actions/auth@v1 + with: + credentials_json: ${{ secrets.GCLOUD_JSON }} + + - uses: extractions/setup-just@v1 + + - uses: actions/setup-python@v4 + with: + python-version: 3.12 + + - uses: quarto-dev/quarto-actions/setup@v2 + + - name: install requirements + run: | + pip install uv + just setup + + - name: ingest and etl + run: | + . .venv/bin/activate + ia ingest + ia etl + env: + BQ_PROJECT_ID: voltrondata-demo + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + ZULIP_KEY: ${{ secrets.ZULIP_KEY }} + GOAT_TOKEN: ${{ secrets.GOAT_TOKEN }} diff --git a/.gitignore b/.gitignore index 693eddb..441a6cf 100644 --- a/.gitignore +++ b/.gitignore @@ -1,7 +1,14 @@ -.env .venv -venv -data -target -.streamlit/secrets.toml -*.ddb* +.env +datalake + +.quarto +pres.html +pres.ipynb +pres_files + +_output + +dist + +tmp* diff --git a/.streamlit/config.toml b/.streamlit/config.toml deleted file mode 100644 index adc7490..0000000 --- a/.streamlit/config.toml +++ /dev/null @@ -1,16 +0,0 @@ -[theme] - -primaryColor="#c61b6e" -backgroundColor="#330066" -secondaryBackgroundColor="#440077" -textColor="#ffffff" -font="sans serif" - -#[theme] -# -#primaryColor="#d33682" -#backgroundColor="#002b36" -#secondaryBackgroundColor="#586e75" -#textColor="#fafafa" -#font="sans serif" - diff --git a/config.toml b/config.toml deleted file mode 100644 index a07a66d..0000000 --- a/config.toml +++ /dev/null @@ -1,38 +0,0 @@ -[app] -#database = "data/app.ddb" -database = "md:ibis_analytics" - -[eda] -database = "md:ibis_analytics" -#database = "data/data.ddb" - -[ingest.pypi] -packages = [ - "ibis-framework", - "ibis-examples", - "ibis-substrait", - "ibisml", - "ibis-birdbrain", -] -[ingest.github] -repos = [ - "ibis-project/ibis", - "ibis-project/ibis-examples", - "ibis-project/ibis-substrait", - "ibis-project/ibisml", - "ibis-project/ibis-birdbrain", -] -endpoints = [ - "repo", - "stargazers", - "subscribers", - "commits", - "releases", - "forks", - "issues", - "contributors", -] -[ingest.zulip] -url = "https://ibis-project.zulipchat.com" -[ingest.docs] -url = "https://ibis.goatcounter.com" diff --git a/dag/__init__.py b/dag/__init__.py deleted file mode 100644 
index eaa9925..0000000 --- a/dag/__init__.py +++ /dev/null @@ -1,26 +0,0 @@ -# imports -import ibis - -import logging as log - -from dagster import Definitions - -from dag.jobs import jobs -from dag.assets import assets -from dag.resources import resources -from dag import functions as f - -# configure logging -log.basicConfig( - level=log.INFO, -) - -# config -backend = "duckdb" -ibis.set_backend(backend) - -defs = Definitions( - assets=assets, - resources=resources, - jobs=jobs, -) diff --git a/dag/assets/__init__.py b/dag/assets/__init__.py deleted file mode 100644 index 6bd870f..0000000 --- a/dag/assets/__init__.py +++ /dev/null @@ -1,46 +0,0 @@ -# imports -from dagster import load_assets_from_modules - -from dag.assets.extract import docs as extract_docs -from dag.assets.extract import pypi as extract_pypi -from dag.assets.extract import zulip as extract_zulip -from dag.assets.extract import github as extract_github -from dag.assets.extract import backends as extract_backends - -from dag.assets.transform import docs as transform_docs -from dag.assets.transform import pypi as transform_pypi -from dag.assets.transform import zulip as transform_zulip -from dag.assets.transform import github as transform_github -from dag.assets.transform import backends as transform_backends - -from dag.assets.load import docs as load_docs -from dag.assets.load import pypi as load_pypi -from dag.assets.load import zulip as load_zulip -from dag.assets.load import github as load_github -from dag.assets.load import backends as load_backends - -from dag.constants import EXTRACT, TRANSFORM, LOAD - -# load assets -extract_modules = [ - # extract_docs, - extract_pypi, - extract_zulip, - extract_github, - extract_backends, -] -extract_assets = load_assets_from_modules(extract_modules, group_name=EXTRACT) - -transform_modules = [ - # transform_docs, - transform_pypi, - transform_zulip, - transform_github, - transform_backends, -] -transform_assets = load_assets_from_modules(transform_modules, group_name=TRANSFORM) - -load_modules = [load_pypi, load_zulip, load_github, load_backends] # load_docs -load_assets = load_assets_from_modules(load_modules, group_name=LOAD) - -assets = [*extract_assets, *transform_assets, *load_assets] diff --git a/dag/assets/extract/backends.py b/dag/assets/extract/backends.py deleted file mode 100644 index d4745c4..0000000 --- a/dag/assets/extract/backends.py +++ /dev/null @@ -1,20 +0,0 @@ -# imports -import ibis -import dagster - -from dag import functions as f - - -# assets -@dagster.asset -def extract_backends(): - """ - Extract the backend data. - """ - backends = ibis.util.backend_entry_points() - backend_data = { - "backends": [[b.name for b in backends]], - "num_backends": len(backends), - "ingested_at": f.now(), - } - return ibis.memtable(backend_data) diff --git a/dag/assets/extract/docs.py b/dag/assets/extract/docs.py deleted file mode 100644 index f525a10..0000000 --- a/dag/assets/extract/docs.py +++ /dev/null @@ -1,15 +0,0 @@ -# imports -import ibis -import dagster - -from dag import functions as f - - -# assets -@dagster.asset -def extract_docs(): - """ - Extract the ingested docs data. 
- """ - docs = f.clean_data(ibis.read_csv("data/ingest/docs/goatcounter.csv.gz")) - return docs diff --git a/dag/assets/extract/github.py b/dag/assets/extract/github.py deleted file mode 100644 index f825031..0000000 --- a/dag/assets/extract/github.py +++ /dev/null @@ -1,72 +0,0 @@ -# imports -import ibis -import dagster - -from dag import functions as f - - -# assets -@dagster.asset -def extract_stars(): - """ - Extract the ingested GitHub stars data. - """ - stars = f.clean_data( - ibis.read_json("data/ingest/github/ibis-project/ibis/stargazers.*.json") - ) - return stars - - -@dagster.asset -def extract_issues(): - """ - Extract the ingested GitHub issues data. - """ - issues = f.clean_data( - ibis.read_json("data/ingest/github/ibis-project/ibis/issues.*.json") - ) - return issues - - -@dagster.asset -def extract_pulls(): - """ - Extract the ingested GitHub pull requests data. - """ - pulls = f.clean_data( - ibis.read_json("data/ingest/github/ibis-project/ibis/pullRequests.*.json") - ) - return pulls - - -@dagster.asset -def extract_forks(): - """ - Extract the ingested GitHub forks data. - """ - forks = f.clean_data( - ibis.read_json("data/ingest/github/ibis-project/ibis/forks.*.json") - ) - return forks - - -@dagster.asset -def extract_watchers(): - """ - Extract the ingested GitHub watchers data. - """ - watchers = f.clean_data( - ibis.read_json("data/ingest/github/ibis-project/ibis/watchers.*.json") - ) - return watchers - - -@dagster.asset -def extract_commits(): - """ - Extract the ingested GitHub git commits data. - """ - commits = f.clean_data( - ibis.read_json("data/ingest/github/ibis-project/ibis/commits.*.json") - ) - return commits diff --git a/dag/assets/extract/pypi.py b/dag/assets/extract/pypi.py deleted file mode 100644 index 006b000..0000000 --- a/dag/assets/extract/pypi.py +++ /dev/null @@ -1,17 +0,0 @@ -# imports -import ibis -import dagster - -from dag import functions as f - - -# assets -@dagster.asset -def extract_downloads(): - """ - Extract the ingested PyPI downloads data. - """ - downloads = f.clean_data( - ibis.read_parquet("data/ingest/pypi/ibis-framework/*.parquet") - ) - return downloads diff --git a/dag/assets/extract/zulip.py b/dag/assets/extract/zulip.py deleted file mode 100644 index 0861fd0..0000000 --- a/dag/assets/extract/zulip.py +++ /dev/null @@ -1,24 +0,0 @@ -# imports -import ibis -import dagster - -from dag import functions as f - - -# assets -@dagster.asset -def extract_zulip_members(): - """ - Extract the ingested Zulip members data. - """ - members = f.clean_data(ibis.read_json("data/ingest/zulip/members.json")) - return members - - -@dagster.asset -def extract_zulip_messages(): - """ - Extract the ingested Zulip messages data. - """ - messages = f.clean_data(ibis.read_json("data/ingest/zulip/messages.json")) - return messages diff --git a/dag/assets/load/__init__.py b/dag/assets/load/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/dag/assets/load/backends.py b/dag/assets/load/backends.py deleted file mode 100644 index bc781af..0000000 --- a/dag/assets/load/backends.py +++ /dev/null @@ -1,14 +0,0 @@ -# imports -import ibis -import dagster - -from dag import functions as f - - -# assets -@dagster.asset -def load_backends(transform_backends): - """ - Finalize the backend data. 
- """ - return transform_backends diff --git a/dag/assets/load/docs.py b/dag/assets/load/docs.py deleted file mode 100644 index ee1b190..0000000 --- a/dag/assets/load/docs.py +++ /dev/null @@ -1,14 +0,0 @@ -# imports -import ibis -import dagster - -from dag import functions as f - - -# assets -@dagster.asset -def load_docs(transform_docs): - """ - Finalize the docs data. - """ - return transform_docs diff --git a/dag/assets/load/github.py b/dag/assets/load/github.py deleted file mode 100644 index c62709a..0000000 --- a/dag/assets/load/github.py +++ /dev/null @@ -1,42 +0,0 @@ -# imports -import ibis -import dagster - -from dag import functions as f - - -# assets -@dagster.asset -def load_stars(transform_stars): - """Finalize the GitHub stars data.""" - return transform_stars - - -@dagster.asset -def load_issues(transform_issues): - """Finalize the GitHub issues data.""" - return transform_issues - - -@dagster.asset -def load_pulls(transform_pulls): - """Finalize the GitHub pull requests data.""" - return transform_pulls - - -@dagster.asset -def load_forks(transform_forks): - """Finalize the GitHub forks data.""" - return transform_forks - - -@dagster.asset -def load_watchers(transform_watchers): - """Finalize the GitHub watchers data.""" - return transform_watchers - - -@dagster.asset -def load_commits(transform_commits): - """Finalize the GitHub git commits data.""" - return transform_commits diff --git a/dag/assets/load/pypi.py b/dag/assets/load/pypi.py deleted file mode 100644 index a1dcd0a..0000000 --- a/dag/assets/load/pypi.py +++ /dev/null @@ -1,14 +0,0 @@ -# imports -import ibis -import dagster - -from dag import functions as f - - -# assets -@dagster.asset -def load_downloads(transform_downloads): - """ - Finalize the PyPI downloads data. - """ - return transform_downloads diff --git a/dag/assets/load/zulip.py b/dag/assets/load/zulip.py deleted file mode 100644 index e376912..0000000 --- a/dag/assets/load/zulip.py +++ /dev/null @@ -1,22 +0,0 @@ -# imports -import ibis -import dagster - -from dag import functions as f - - -# assets -@dagster.asset -def load_zulip_members(transform_zulip_members): - """ - Finalize the Zulip members data. - """ - return transform_zulip_members - - -@dagster.asset -def load_zulip_messages(transform_zulip_messages): - """ - Finalize the Zulip messages data. - """ - return transform_zulip_messages diff --git a/dag/assets/transform/__init__.py b/dag/assets/transform/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/dag/assets/transform/backends.py b/dag/assets/transform/backends.py deleted file mode 100644 index 1ada6c4..0000000 --- a/dag/assets/transform/backends.py +++ /dev/null @@ -1,14 +0,0 @@ -# imports -import ibis -import dagster - -from dag import functions as f - - -# assets -@dagster.asset -def transform_backends(extract_backends): - """ - Transform the backend data. - """ - return extract_backends diff --git a/dag/assets/transform/docs.py b/dag/assets/transform/docs.py deleted file mode 100644 index 8b4231b..0000000 --- a/dag/assets/transform/docs.py +++ /dev/null @@ -1,15 +0,0 @@ -# imports -import ibis -import dagster - -from dag import functions as f - - -# assets -@dagster.asset -def transform_docs(extract_docs): - """ - Transform the docs data. 
- """ - docs = extract_docs.rename({"path": "2_path", "timestamp": "date"}) - return docs diff --git a/dag/assets/transform/github.py b/dag/assets/transform/github.py deleted file mode 100644 index ab81021..0000000 --- a/dag/assets/transform/github.py +++ /dev/null @@ -1,114 +0,0 @@ -# imports -import ibis -import dagster - -from dag import functions as f - - -# assets -@dagster.asset -def transform_stars(extract_stars): - """Transform the GitHub stars data.""" - stars = f.clean_data(extract_stars.unpack("node")) - stars = stars.order_by(ibis._.starred_at.desc()) - stars = stars.mutate(ibis._.company.fillna("Unknown").name("company")) - stars = stars.mutate(total_stars=ibis._.count().over(rows=(0, None))) - return stars - - -@dagster.asset -def transform_issues(extract_issues): - """Transform the GitHub issues data.""" - issues = f.clean_data(extract_issues.unpack("node").unpack("author")) - issues = issues.order_by(ibis._.created_at.desc()) - issues = issues.mutate((ibis._.closed_at != None).name("is_closed")) - issues = issues.mutate(ibis._.count().over(rows=(0, None)).name("total_issues")) - issue_state = ibis.case().when(issues.is_closed, "closed").else_("open").end() - issues = issues.mutate(issue_state.name("state")) - - # add first issues by login - issues = ( - issues.mutate( - is_first_issue=( - ibis.row_number().over( - ibis.window(group_by="login", order_by=issues["created_at"]) - ) - == 0 - ) - ) - .relocate("is_first_issue", "login", "created_at") - .order_by(issues["created_at"].desc()) - ) - return issues - - -@dagster.asset -def transform_pulls(extract_pulls): - """Transform the GitHub pull requests data.""" - pulls = f.clean_data(extract_pulls.unpack("node").unpack("author")) - pulls = pulls.order_by(ibis._.created_at.desc()) - pulls = pulls.mutate((ibis._.merged_at != None).name("is_merged")) - pulls = pulls.mutate((ibis._.closed_at != None).name("is_closed")) - pulls = pulls.mutate(total_pulls=ibis._.count().over(rows=(0, None))) - # to remove bots - # pulls = pulls.filter( - # ~( - # (ibis._.login == "ibis-squawk-bot") - # | (ibis._.login == "pre-commit-ci") - # | (ibis._.login == "renovate") - # ) - # ) - pull_state = ( - ibis.case() - .when(pulls.is_merged, "merged") - .when(pulls.is_closed, "closed") - .else_("open") - .end() - ) - pulls = pulls.mutate(pull_state.name("state")) - pulls = pulls.mutate( - merged_at=ibis._.merged_at.cast("timestamp") - ) # TODO: temporary fix - - # add first pull by login - pulls = ( - pulls.mutate( - is_first_pull=( - ibis.row_number().over( - ibis.window(group_by="login", order_by=pulls["created_at"]) - ) - == 0 - ) - ) - .relocate("is_first_pull", "login", "created_at") - .order_by(pulls["created_at"].desc()) - ) - return pulls - - -@dagster.asset -def transform_forks(extract_forks): - """Transform the GitHub forks data.""" - forks = f.clean_data(extract_forks.unpack("node").unpack("owner")) - forks = forks.order_by(ibis._.created_at.desc()) - forks = forks.mutate(total_forks=ibis._.count().over(rows=(0, None))) - return forks - - -@dagster.asset -def transform_watchers(extract_watchers): - """Transform the GitHub watchers data.""" - watchers = f.clean_data(extract_watchers.unpack("node")) - watchers = watchers.order_by(ibis._.updated_at.desc()) - watchers = watchers.mutate(total_watchers=ibis._.count().over(rows=(0, None))) - watchers = watchers.order_by(ibis._.updated_at.desc()) - return watchers - - -@dagster.asset -def transform_commits(extract_commits): - """Transform the GitHub git commits data.""" - commits = 
f.clean_data(extract_commits.unpack("node").unpack("author")) - commits = commits.order_by(ibis._.committed_date.desc()) - commits = commits.mutate(total_commits=ibis._.count().over(rows=(0, None))) - return commits diff --git a/dag/assets/transform/pypi.py b/dag/assets/transform/pypi.py deleted file mode 100644 index 3f6c45c..0000000 --- a/dag/assets/transform/pypi.py +++ /dev/null @@ -1,56 +0,0 @@ -# imports -import ibis -import dagster - -from dag import functions as f - - -# assets -@dagster.asset -def transform_downloads(extract_downloads): - """ - Transform the PyPI downloads data. - """ - downloads = f.clean_data( - extract_downloads.drop("project").unpack("file").unpack("details") - ) - downloads = downloads.mutate( - major_minor_patch=f.clean_version(downloads["version"], patch=True), - major_minor=f.clean_version(downloads["version"], patch=False).cast("float"), - ) - downloads = downloads.rename( - { - "version_raw": "version", - "version": "major_minor_patch", - } - ) - downloads = ( - downloads.group_by( - [ - ibis._.timestamp.truncate("D").name("timestamp"), - ibis._.country_code, - ibis._.version, - ibis._.python, - ibis._.system["name"].name("system"), - ] - ) - .agg( - ibis._.count().name("downloads"), - ) - .order_by(ibis._.timestamp.desc()) - .mutate( - ibis._.downloads.sum() - .over( - rows=(0, None), - group_by=["country_code", "version", "python", "system"], - order_by=ibis._.timestamp.desc(), - ) - .name("total_downloads") - ) - .order_by(ibis._.timestamp.desc()) - ) - downloads = downloads.mutate(ibis._["python"].fillna("").name("python_full")) - downloads = downloads.mutate( - f.clean_version(downloads["python_full"], patch=False).name("python") - ) - return downloads diff --git a/dag/assets/transform/zulip.py b/dag/assets/transform/zulip.py deleted file mode 100644 index 58819f6..0000000 --- a/dag/assets/transform/zulip.py +++ /dev/null @@ -1,48 +0,0 @@ -# imports -import ibis -import dagster - -from dag import functions as f - - -# assets -@dagster.asset -def transform_zulip_members(extract_zulip_members): - """ - Transform the Zulip members data. - """ - members = extract_zulip_members.mutate( - date_joined=ibis._.date_joined.cast("timestamp") - ) - members = members.filter(ibis._.is_bot == False) - members = members.order_by(ibis._.date_joined.desc()) - members = members.relocate("full_name", "date_joined", "timezone") - members = members.mutate(total_members=ibis._.count().over(rows=(0, None))) - return members - - -@dagster.asset -def transform_zulip_messages(extract_zulip_messages): - """ - Transform the Zulip messages data. 
- """ - messages = extract_zulip_messages.mutate( - timestamp=ibis._.timestamp.cast("timestamp"), - last_edit_timestamp=ibis._.last_edit_timestamp.cast("timestamp"), - ) - # TODO: either automatically filter out bot messages or do something better here - messages = messages.filter( - ibis._.stream_id != 405931 - ) # filter out the github-issues stream for now - messages = messages.order_by( - ibis._.timestamp.desc(), - ) - messages = messages.relocate( - "sender_full_name", - "display_recipient", - "subject", - "timestamp", - "last_edit_timestamp", - ) - messages = messages.mutate(total_messages=ibis._.count().over(rows=(0, None))) - return messages diff --git a/dag/constants.py b/dag/constants.py deleted file mode 100644 index a42ce31..0000000 --- a/dag/constants.py +++ /dev/null @@ -1,4 +0,0 @@ -# constants -EXTRACT = "extract" -TRANSFORM = "transform" -LOAD = "load" diff --git a/dag/deploy.py b/dag/deploy.py deleted file mode 100644 index d99a464..0000000 --- a/dag/deploy.py +++ /dev/null @@ -1,51 +0,0 @@ -# imports -import os -import toml -import ibis -import fnmatch - -import logging as log - -from datetime import datetime, timedelta, date - -import dag.functions as f - - -def main(): - deploy() - - -def deploy() -> None: - """ - Deploy the data. - """ - # constants - path = "data/system/duckdb" - - # configure logging - log.basicConfig( - level=log.INFO, - ) - - # load config - config = toml.load("config.toml")["app"] - log.info(f"Deploying to {config['database']}...") - - # connect to the database - target = ibis.duckdb.connect(f"{config['database']}") - - for root, dirs, files in os.walk(path): - for file in files: - if fnmatch.fnmatch(file, "load_*.ddb"): - full_path = os.path.join(root, file) - con = ibis.duckdb.connect(full_path) - tablename = file.replace(".ddb", "") - table = con.table(tablename) - tablename = tablename.replace("load_", "") - - log.info(f"\tDeploying {tablename} to {config['database']}...") - target.create_table(tablename, table.to_pyarrow(), overwrite=True) - - -if __name__ == "__main__": - main() diff --git a/dag/functions.py b/dag/functions.py deleted file mode 100644 index 7f802cf..0000000 --- a/dag/functions.py +++ /dev/null @@ -1,37 +0,0 @@ -# imports -import re -import ibis -import datetime - -import ibis.selectors as s - - -# udfs -@ibis.udf.scalar.python -def clean_version(version: str, patch: bool = True) -> str: - pattern = r"(\d+\.\d+\.\d+)" if patch else r"(\d+\.\d+)" - match = re.search(pattern, version) - if match: - return match.group(1) - else: - return version - - -# functions -def now(): - return datetime.datetime.now() - - -def today(): - return now().date() - - -def clean_data(t): - t = t.rename("snake_case") - # t = t.mutate(s.across(s.of_type("timestamp"), lambda x: x.cast("timestamp('')"))) - return t - - -def add_ingested_at(t, ingested_at=now()): - t = t.mutate(ingested_at=ingested_at).relocate("ingested_at") - return t diff --git a/dag/jobs.py b/dag/jobs.py deleted file mode 100644 index d4581a3..0000000 --- a/dag/jobs.py +++ /dev/null @@ -1,13 +0,0 @@ -from dagster import define_asset_job, AssetSelection - -from dag.constants import EXTRACT, TRANSFORM, LOAD - -all_job = define_asset_job("all_assets") -extract_job = define_asset_job( - "extract_assets", selection=AssetSelection.groups(EXTRACT) -) -transform_job = define_asset_job( - "transform_assets", selection=AssetSelection.groups(TRANSFORM) -) -load_job = define_asset_job("load_assets", selection=AssetSelection.groups(LOAD)) -jobs = [all_job, extract_job, transform_job, load_job] 
diff --git a/dag/postprocess.py b/dag/postprocess.py deleted file mode 100644 index 4345216..0000000 --- a/dag/postprocess.py +++ /dev/null @@ -1,62 +0,0 @@ -# imports -import os -import ibis -import shutil -import fnmatch - -import logging as log - -from datetime import datetime, timedelta, date - -## local imports -import functions as f - - -def main(): - postprocess() - - -def postprocess() -> None: - """ - Postprocess the data. - """ - # configure logger - log.basicConfig( - level=log.INFO, - ) - - # backup loaded data as Delta Lake tables and a DuckDB database - source_path = "data/system/duckdb" - target_path = "data/data.ddb" - app_path = "data/app.ddb" - - os.makedirs(source_path, exist_ok=True) - - target = ibis.duckdb.connect(target_path) - - ingested_at = f.now() - - for root, dirs, files in os.walk(source_path): - for file in files: - if fnmatch.fnmatch(file, "load_*.ddb"): - full_path = os.path.join(root, file) - con = ibis.duckdb.connect(full_path) - tablename = file.replace(".ddb", "") - table = con.table(tablename) - tablename = tablename.replace("load_", "") - - log.info(f"Backing up {tablename} to {target_path}...") - target.create_table(tablename, table.to_pyarrow(), overwrite=True) - - # TODO: disabling for now, causing issues, not used anywhere - # log.info(f"Backing up {tablename} to data/backup/{tablename}.delta...") - # table.mutate(ingested_at=ingested_at).to_delta( - # f"data/backup/{tablename}.delta", mode="overwrite" - # ) - - log.info(f"Copying {target_path} to {app_path}...") - shutil.copy(target_path, app_path) - - -if __name__ == "__main__": - main() diff --git a/dag/resources/__init__.py b/dag/resources/__init__.py deleted file mode 100644 index 477ab27..0000000 --- a/dag/resources/__init__.py +++ /dev/null @@ -1,6 +0,0 @@ -# imports -from dag.resources import table_io_managers - -# load resources -# resources = {"io_manager": table_io_managers.DeltaIOManager()} -resources = {"io_manager": table_io_managers.DuckDBIOManager()} diff --git a/dag/resources/table_io_managers.py b/dag/resources/table_io_managers.py deleted file mode 100644 index 8b9f552..0000000 --- a/dag/resources/table_io_managers.py +++ /dev/null @@ -1,92 +0,0 @@ -import os -import ibis - -from dagster import ConfigurableIOManager - - -class ParquetIOManager(ConfigurableIOManager): - """ - Manage tables as parquet files. - """ - - extension: str = "parquet" - base_path: str = os.path.join("data", "system", "parquet") - - def handle_output(self, context, obj): - dirname, filename = self._get_paths(context) - os.makedirs(dirname, exist_ok=True) - output_path = os.path.join(dirname, filename) - obj.to_parquet(output_path) - - def load_input(self, context): - dirname, filename = self._get_paths(context) - input_path = os.path.join(dirname, filename) - return ibis.read_parquet(input_path) - - def _get_paths(self, context): - # group_name = context.step_context.job_def.asset_layer.assets_def_for_asset( - # context.asset_key - # ).group_names_by_key[context.asset_key] - dirname = os.path.join(self.base_path, *context.asset_key.path[:-1]) - filename = f"{context.asset_key.path[-1]}.{self.extension}" - return dirname, filename - - -class DeltaIOManager(ConfigurableIOManager): - """ - Manage tables as delta tables. 
- """ - - extension: str = "delta" - base_path: str = os.path.join("data", "system", "delta") - delta_write_mode: str = "overwrite" - - def handle_output(self, context, obj): - dirname, filename = self._get_paths(context) - os.makedirs(dirname, exist_ok=True) - output_path = os.path.join(dirname, filename) - obj.to_delta(output_path, mode=self.delta_write_mode) - - def load_input(self, context): - dirname, filename = self._get_paths(context) - input_path = os.path.join(dirname, filename) - return ibis.read_delta(input_path) - - def _get_paths(self, context): - # group_name = context.step_context.job_def.asset_layer.assets_def_for_asset( - # context.asset_key - # ).group_names_by_key[context.asset_key] - dirname = os.path.join(self.base_path, *context.asset_key.path) - filename = f"{context.asset_key.path[-1]}.{self.extension}" - return dirname, filename - - -class DuckDBIOManager(ConfigurableIOManager): - """ - Manage tables as duckdb files. - """ - - extension: str = "ddb" - base_path: str = os.path.join("data", "system", "duckdb") - - def handle_output(self, context, obj): - dirname, filename = self._get_paths(context) - os.makedirs(dirname, exist_ok=True) - output_path = os.path.join(dirname, filename) - con = ibis.duckdb.connect(output_path) - con.create_table(context.asset_key.path[-1], obj.to_pyarrow(), overwrite=True) - - def load_input(self, context): - dirname, filename = self._get_paths(context) - input_path = os.path.join(dirname, filename) - con = ibis.duckdb.connect(input_path) - return con.table(context.asset_key.path[-1]) - - def _get_paths(self, context): - # group_name = context.step_context.job_def.asset_layer.assets_def_for_asset( - # context.asset_key - # ).group_names_by_key[context.asset_key] - # group_name = context.assets_def.group_names_by_key[context.asset_key] - dirname = os.path.join(self.base_path, *context.asset_key.path) - filename = f"{context.asset_key.path[-1]}.{self.extension}" - return dirname, filename diff --git a/dashboard.py b/dashboard.py new file mode 100644 index 0000000..8b79f5b --- /dev/null +++ b/dashboard.py @@ -0,0 +1,604 @@ +import ibis +import plotly.express as px + +from shiny import reactive, render +from shinyswatch import theme +from shinywidgets import render_plotly +from shiny.express import input, ui + +from datetime import datetime, timedelta + +from ibis_analytics.metrics import ( + pulls_t, + stars_t, + forks_t, + issues_t, + commits_t, + downloads_t, + docs_t, + zulip_members_t, + zulip_messages_t, +) +from ibis_analytics.config import GH_REPO, PYPI_PACKAGE + + +# dark themes +px.defaults.template = "plotly_dark" +ui.page_opts(theme=theme.darkly) + +# page options +ui.page_opts( + title="Ibis analytics", + fillable=False, + full_width=True, +) + +# add page title and sidebar +with ui.sidebar(open="desktop"): + ui.input_date_range( + "date_range", + "Date range", + start=(datetime.now() - timedelta(days=28)).strftime("%Y-%m-%d"), + end=datetime.now().strftime("%Y-%m-%d"), + ) + ui.input_action_button("last_7d", "Last 7 days") + ui.input_action_button("last_14d", "Last 14 days") + ui.input_action_button("last_28d", "Last 28 days") + ui.input_action_button("last_91d", "Last 91 days") + ui.input_action_button("last_182d", "Last 182 days") + ui.input_action_button("last_365d", "Last 365 days") + ui.input_action_button("last_730d", "Last 730 days") + ui.input_action_button("last_all", "All available data") + + with ui.value_box(full_screen=True): + "Total days in range" + + @render.express + def total_days(): + start_date, end_date = 
date_range() + days = (end_date - start_date).days + f"{days:,}" + + +with ui.nav_panel("GitHub metrics"): + f"GitHub repo: {GH_REPO}" + with ui.layout_columns(): + with ui.value_box(): + "Total stars" + + @render.express + def total_stars(): + val = stars_data().count().to_pyarrow().as_py() + f"{val:,}" + + with ui.value_box(): + "Total pulls" + + @render.express + def total_pulls(): + val = pulls_data().count().to_pyarrow().as_py() + f"{val:,}" + + with ui.value_box(): + "Total issues" + + @render.express + def total_issues(): + val = issues_data().count().to_pyarrow().as_py() + f"{val:,}" + + with ui.value_box(): + "Total forks" + + @render.express + def total_forks(): + val = forks_data().count().to_pyarrow().as_py() + f"{val:,}" + + with ui.value_box(): + "Total commits" + + @render.express + def total_commits(): + val = commits_data().count().to_pyarrow().as_py() + f"{val:,}" + + with ui.layout_columns(): + with ui.card(full_screen=True): + "Total stars" + + @render_plotly + def stars_line(): + t = stars_data().order_by("starred_at") + + c = px.line( + t, + x="starred_at", + y="total_stars", + ) + + return c + + with ui.card(full_screen=True): + "Rolling 28d stars" + + @render_plotly + def stars_roll(): + t = stars_t + + t = ( + t.mutate(starred_at=t["starred_at"].truncate("D")) + .group_by("starred_at") + .agg(stars=ibis._.count()) + ) + t = t.select( + timestamp="starred_at", + rolling_stars=ibis._["stars"] + .sum() + .over( + ibis.window(order_by="starred_at", preceding=28, following=0) + ), + ).order_by("timestamp") + + c = px.line( + t, + x="timestamp", + y="rolling_stars", + range_x=[str(x) for x in date_range()], + ) + + return c + + with ui.card(full_screen=True): + "Stars" + + with ui.card_header(class_="d-flex justify-content-between align-items-center"): + with ui.layout_columns(): + ui.input_select( + "truncate_by_stars", + "Truncate to:", + ["D", "W", "M", "Y"], + selected="D", + ) + ui.input_select( + "group_by_stars", + "Group by:", + [None, "company"], + selected=None, + ) + + @render_plotly + def stars_flex(): + truncate_by = input.truncate_by_stars() + group_by = input.group_by_stars() + + t = stars_data().order_by("starred_at") + t = t.mutate(starred_at=t["starred_at"].truncate(truncate_by)) + t = t.group_by(["starred_at", group_by] if group_by else "starred_at").agg( + stars=ibis._.count() + ) + if group_by: + t = t.mutate(company=t["company"][:16]) + t = t.order_by("starred_at", ibis.desc("stars")) + + c = px.bar( + t, + x="starred_at", + y="stars", + color="company" if group_by else None, + barmode="stack", + ) + + return c + + +with ui.nav_panel("PyPI metrics"): + f"PyPI package: {PYPI_PACKAGE}" + with ui.layout_columns(): + with ui.value_box(full_screen=True): + "Total downloads" + + @render.express + def total_downloads(): + val = downloads_data()["count"].sum().to_pyarrow().as_py() + f"{val:,}" + + with ui.value_box(full_screen=True): + "Total versions" + + @render.express + def total_versions(): + val = ( + downloads_data() + .distinct(on="version")["version"] + .to_pyarrow() + .to_pylist() + ) + f"{len(val):,}" + + with ui.card(full_screen=True): + "Downloads by version" + + @render.data_frame + def downloads_by_version(): + t = downloads_data() + + t = ( + t.mutate( + version=t["version"].split(".")[0], + ) + .filter(~ibis._["version"].startswith("v")) + .group_by("version") + .agg(downloads=ibis._["count"].sum()) + .order_by(ibis.desc("downloads")) + ) + + return render.DataGrid(t.to_polars()) + + with ui.layout_columns(): + with 
ui.card(full_screen=True): + "Rolling 28d downloads" + + @render_plotly + def downloads_roll(): + t = downloads_t + min_date, max_date = input.date_range() + + t = t.mutate( + timestamp=t["date"].cast("timestamp"), + ) + t = t.group_by("timestamp").agg(downloads=ibis._["count"].sum()) + t = ( + t.select( + "timestamp", + rolling_downloads=ibis._["downloads"] + .sum() + .over( + ibis.window( + order_by="timestamp", + preceding=28, + following=0, + ) + ), + ) + .filter(t["timestamp"] >= min_date, t["timestamp"] <= max_date) + .order_by("timestamp") + ) + + c = px.line( + t, + x="timestamp", + y="rolling_downloads", + ) + + return c + + with ui.card(full_screen=True): + "Rolling 28d downloads by version" + + with ui.card_header( + class_="d-flex justify-content-between align-items-center" + ): + with ui.layout_columns(): + ui.input_select( + "version_style", + "Version style", + ["major", "major.minor", "major.minor.patch"], + selected="major", + ) + + @render_plotly + def downloads_by_version_roll(): + version_style = input.version_style() + min_date, max_date = input.date_range() + + t = downloads_t + + t = t.mutate( + version=t["version"].split(".")[0] + if version_style == "major" + else t["version"].split(".")[0] + "." + t["version"].split(".")[1] + if version_style == "major.minor" + else t["version"], + timestamp=t["date"].cast("timestamp"), + ) + t = t.group_by("timestamp", "version").agg( + downloads=ibis._["count"].sum() + ) + t = t.filter(~t["version"].startswith("v")) + t = t.filter(~t["version"].contains("dev")) + t = ( + t.select( + "timestamp", + "version", + rolling_downloads=ibis._["downloads"] + .sum() + .over( + ibis.window( + order_by="timestamp", + group_by="version", + preceding=28, + following=0, + ) + ), + ) + .filter(t["timestamp"] >= min_date, t["timestamp"] <= max_date) + .order_by("timestamp") + ) + + c = px.line( + t, + x="timestamp", + y="rolling_downloads", + color="version", + category_orders={ + "version": reversed( + sorted( + t.distinct(on="version")["version"] + .to_pyarrow() + .to_pylist(), + # smartly convert string to float for sorting + # key=lambda x: int(x), + key=lambda x: tuple(map(float, x.split("."))), + ) + ) + }, + ) + + return c + + with ui.card(full_screen=True): + "Downloads" + + with ui.card_header(class_="d-flex justify-content-between align-items-center"): + with ui.layout_columns(): + ui.input_select( + "group_by_downloads", + "Group by:", + [None, "version", "country_code", "installer", "type"], + selected="version", + ) + + @render_plotly + def downloads_flex(): + group_by = input.group_by_downloads() + + t = downloads_data() + t = t.mutate(timestamp=t["date"].cast("timestamp")) + t = t.filter(~t["version"].startswith("v")) + t = t.mutate(version=t["version"].split(".")[0]) + t = t.group_by(["timestamp", group_by] if group_by else "timestamp").agg( + downloads=ibis._["count"].sum() + ) + t = t.order_by("timestamp", ibis.desc("downloads")) + + c = px.bar( + t, + x="timestamp", + y="downloads", + color=group_by if group_by else None, + barmode="stack", + category_orders={ + "version": reversed( + sorted( + t.distinct(on="version")["version"] + .to_pyarrow() + .to_pylist(), + key=lambda x: int(x), + ) + ) + } + if group_by == "version" + else None, + ) + + return c + + +with ui.nav_panel("Docs metrics"): + with ui.layout_columns(): + with ui.value_box(full_screen=True): + "Total docs" + + @render.express + def total_docs(): + val = docs_t.count().to_pyarrow().as_py() + f"{val:,}" + + with ui.card(full_screen=True): + "Docs" + + 
@render.data_frame + def docs_grid(): + t = docs_t + + return render.DataGrid(t.limit(10_000).to_polars()) + + +with ui.nav_panel("Zulip metrics"): + with ui.layout_columns(): + with ui.value_box(full_screen=True): + "Total messages" + + @render.express + def total_messages(): + val = zulip_messages_t.count().to_pyarrow().as_py() + f"{val:,}" + + with ui.value_box(full_screen=True): + "Total members" + + @render.express + def total_members(): + val = zulip_members_t.count().to_pyarrow().as_py() + f"{val:,}" + + +# reactive calculations and effects +@reactive.calc +def date_range(): + start_date, end_date = input.date_range() + + return start_date, end_date + + +@reactive.calc +def stars_data(stars_t=stars_t): + start_date, end_date = input.date_range() + + t = stars_t.filter( + stars_t["starred_at"] >= start_date, stars_t["starred_at"] <= end_date + ) + + return t + + +@reactive.calc +def pulls_data(pulls_t=pulls_t): + start_date, end_date = input.date_range() + + t = pulls_t.filter( + pulls_t["created_at"] >= start_date, pulls_t["created_at"] <= end_date + ) + + return t + + +@reactive.calc +def forks_data(forks_t=forks_t): + start_date, end_date = input.date_range() + + t = forks_t.filter( + forks_t["created_at"] >= start_date, forks_t["created_at"] <= end_date + ) + + return t + + +@reactive.calc +def downloads_data(downloads_t=downloads_t): + start_date, end_date = input.date_range() + + t = downloads_t.filter( + downloads_t["date"] >= start_date, downloads_t["date"] <= end_date + ) + + return t + + +@reactive.calc +def issues_data(issues_t=issues_t): + start_date, end_date = input.date_range() + + t = issues_t.filter( + issues_t["created_at"] >= start_date, issues_t["created_at"] <= end_date + ) + + return t + + +@reactive.calc +def commits_data(commits_t=commits_t): + start_date, end_date = input.date_range() + + t = commits_t.filter( + commits_t["committed_date"] >= start_date, + commits_t["committed_date"] <= end_date, + ) + + return t + + +@reactive.effect +@reactive.event(input.last_7d) +def _(): + ui.update_date_range( + "date_range", + start=(datetime.now() - timedelta(days=7)).strftime("%Y-%m-%d"), + end=datetime.now().strftime("%Y-%m-%d"), + ) + + +@reactive.effect +@reactive.event(input.last_14d) +def _(): + ui.update_date_range( + "date_range", + start=(datetime.now() - timedelta(days=14)).strftime("%Y-%m-%d"), + end=datetime.now().strftime("%Y-%m-%d"), + ) + + +@reactive.effect +@reactive.event(input.last_28d) +def _(): + ui.update_date_range( + "date_range", + start=(datetime.now() - timedelta(days=28)).strftime("%Y-%m-%d"), + end=datetime.now().strftime("%Y-%m-%d"), + ) + + +@reactive.effect +@reactive.event(input.last_91d) +def _(): + ui.update_date_range( + "date_range", + start=(datetime.now() - timedelta(days=91)).strftime("%Y-%m-%d"), + end=datetime.now().strftime("%Y-%m-%d"), + ) + + +@reactive.effect +@reactive.event(input.last_182d) +def _(): + ui.update_date_range( + "date_range", + start=(datetime.now() - timedelta(days=182)).strftime("%Y-%m-%d"), + end=datetime.now().strftime("%Y-%m-%d"), + ) + + +@reactive.effect +@reactive.event(input.last_365d) +def _(): + ui.update_date_range( + "date_range", + start=(datetime.now() - timedelta(days=365)).strftime("%Y-%m-%d"), + end=datetime.now().strftime("%Y-%m-%d"), + ) + + +@reactive.effect +@reactive.event(input.last_730d) +def _(): + ui.update_date_range( + "date_range", + start=(datetime.now() - timedelta(days=730)).strftime("%Y-%m-%d"), + end=datetime.now().strftime("%Y-%m-%d"), + ) + + +@reactive.effect 
+@reactive.event(input.last_all) +def _(): + # TODO: pretty hacky + min_all_tables = [ + (col, t[col].cast("timestamp").min().to_pyarrow().as_py()) + for t in [stars_t, pulls_t, forks_t, issues_t, commits_t, downloads_t] + for col in t.columns + if ( + str(t[col].type()).startswith("timestamp") + or str(t[col].type()).startswith("date") + ) + # this in particular should be cleaned up in the DAG + and "created_at" not in col + ] + min_all_tables = min([x[1] for x in min_all_tables]) - timedelta(days=1) + max_now = datetime.now() + timedelta(days=1) + + ui.update_date_range( + "date_range", + start=(min_all_tables).strftime("%Y-%m-%d"), + end=max_now.strftime("%Y-%m-%d"), + ) diff --git a/dev-requirements.txt b/dev-requirements.txt new file mode 100644 index 0000000..46f0277 --- /dev/null +++ b/dev-requirements.txt @@ -0,0 +1,9 @@ +ruff +build +twine +ipython +jupyter +nbclient +ipykernel + +-r requirements.txt diff --git a/docs_hack.py b/docs_hack.py deleted file mode 100644 index 304e61e..0000000 --- a/docs_hack.py +++ /dev/null @@ -1,42 +0,0 @@ -"""Hacky manual upload of docs data until rate limiting in CI is resolved.""" - -# imports -import ibis -import toml -import shutil - -import logging as log - -from dotenv import load_dotenv -from dag.ingest import ingest_docs -from dag.assets.extract.docs import extract_docs -from dag.assets.transform.docs import transform_docs -from dag.assets.load.docs import load_docs - -# load .env -load_dotenv() - -# configure log -log.basicConfig( - level=log.INFO, -) - -# load config -config = toml.load("config.toml")["app"] -log.info(f"Deploying to {config['database']}...") - -# connect to the database -target = ibis.duckdb.connect(f"{config['database']}") - -# ensure fresh data -shutil.rmtree("data/system/duckdb", ignore_errors=True) - -# ingest -ingest_docs() - -# ETL -docs = load_docs(transform_docs(extract_docs())) - -# deploy -log.info(f"\tDeploying docs to {config['database']}...") -target.create_table("docs", docs.to_pyarrow(), overwrite=True) diff --git a/eda.py b/eda.py deleted file mode 100644 index 623cc1e..0000000 --- a/eda.py +++ /dev/null @@ -1,50 +0,0 @@ -# imports -import re -import os -import sys -import toml -import ibis -import httpx -import requests - -import logging as log -import plotly.io as pio -import ibis.selectors as s -import plotly.express as px - -from rich import print -from dotenv import load_dotenv -from datetime import datetime, timedelta, date - -## local imports -from dag import functions as f -from dag.assets import extract, load, transform - -# configuration -## logger -log.basicConfig(level=log.INFO) - -## config.toml -config = toml.load("config.toml")["eda"] - -## load .env file -load_dotenv() - -## ibis config -ibis.options.interactive = True -ibis.options.repr.interactive.max_rows = 20 -ibis.options.repr.interactive.max_columns = None - -# variables -NOW = datetime.now() -NOW_7 = NOW - timedelta(days=7) -NOW_30 = NOW - timedelta(days=30) -NOW_90 = NOW - timedelta(days=90) -NOW_180 = NOW - timedelta(days=180) -NOW_365 = NOW - timedelta(days=365) -NOW_10 = NOW - timedelta(days=3650) - -# connect to database -database = config["database"] -log.info(f"database: {database}") -con = ibis.connect(f"duckdb://{database}") diff --git a/eda.qmd b/eda.qmd new file mode 100644 index 0000000..650c45f --- /dev/null +++ b/eda.qmd @@ -0,0 +1,32 @@ +```{python} +import ibis + +import ibis.selectors as s +import plotly.express as px + +from ibis_analytics.etl.extract import ( + zulip_members as extract_zulip_members, + 
zulip_messages as extract_zulip_messages, +) +from ibis_analytics.etl.transform import ( + zulip_members as transform_zulip_members, + zulip_messages as transform_zulip_messages, +) + +ibis.options.interactive = True +ibis.options.repr.interactive.max_rows = 5 +ibis.options.repr.interactive.max_columns = None + +px.defaults.template = "plotly_dark" +``` + +```{python} +t = extract_zulip_messages() +t +``` + +```{python} +t = transform_zulip_messages(t) +t.head() +``` + diff --git a/justfile b/justfile index d16797e..4183f8d 100644 --- a/justfile +++ b/justfile @@ -4,93 +4,62 @@ set dotenv-load # variables -module := "dag" +package := "ibis-analytics" # aliases alias fmt:=format -alias etl:=run -alias open:=open-dash -alias dag-open:=open-dag -alias preview:=app +alias render:=docs-build +alias preview:=docs-preview # list justfile recipes default: just --list -# format -format: - @ruff format . - -# smoke-test -smoke-test: - @ruff format --check . +# build +build: + just clean-dist + @python -m build # setup setup: - @pip install --upgrade -r requirements.txt - @pip install -e . - -# eda -eda: - @ipython -i eda.py - -# ingest -ingest: - @python {{module}}/ingest.py - -# run -run: - @dagster job execute -j all_assets -m {{module}} + @uv venv + @. .venv/bin/activate + @uv pip install --upgrade --resolution=highest -r dev-requirements.txt -# postprocess -postprocess: - @python {{module}}/postprocess.py +# install +install: + @uv pip install -r dev-requirements.txt -# deploy -deploy: - @python {{module}}/deploy.py +# uninstall +uninstall: + @pip uninstall -y {{package}} -# test -test: - @python metrics.py - @python pages/0_github.py - @python pages/1_pypi.py - @python pages/2_zulip.py - @python pages/3_docs.py - @python pages/4_about.py - -# dag -dag: - @dagster dev -m {{module}} - -# streamlit stuff -app: - @streamlit run metrics.py - -# clean -clean: - @rm -r *.ddb* || true - @rm -r data/system || true - @rm -r data/backup || true - @rm data/backup.ddb || true +# format +format: + @ruff format . 
-# open dag -open-dag: - @open http://localhost:3000/asset-groups +# publish-test +release-test: + just build + @twine upload --repository testpypi dist/* -u __token__ -p ${PYPI_TEST_TOKEN} -# open dash -open-dash: - @open https://ibis-analytics.streamlit.app +# publish +release: + just build + @twine upload dist/* -u __token__ -p ${PYPI_TOKEN} -# cicd -cicd: - @gh workflow run cicd.yaml +# clean dist +clean-dist: + @rm -rf dist -# docs-hack: manually upload docs data -docs-hack: - @python docs_hack.py +# docs-build +docs-build: + @quarto render website -# ssh -ssh: - @gcloud compute ssh --zone "us-central1-c" --project "voltrondata-demo" --tunnel-through-iap "ibis-analytics" +# docs-preview +docs-preview: + @quarto preview website +# open +open: + @open https://lostmygithubaccount.github.io/ibis-analytics diff --git a/metrics.py b/metrics.py deleted file mode 100644 index 1ef59c4..0000000 --- a/metrics.py +++ /dev/null @@ -1,438 +0,0 @@ -# imports -import toml -import ibis - -import streamlit as st -import plotly.express as px - -from dotenv import load_dotenv -from datetime import datetime, timedelta - -# options -## load .env -load_dotenv() - -## config.toml -config = toml.load("config.toml")["app"] - -## streamlit config -st.set_page_config(layout="wide") - -## ibis config -con = ibis.connect(f"duckdb://{config['database']}", read_only=True) - -# use precomputed data -# TODO: remove dirty hack -try: - docs = con.table("docs") -except: - docs = None -stars = con.table("stars") -forks = con.table("forks") -pulls = con.table("pulls") -issues = con.table("issues") -backends = con.table("backends") -downloads = con.table("downloads") -members = con.table("zulip_members") -messages = con.table("zulip_messages") - -# display header stuff -with open("readme.md") as f: - readme_code = f.read() - -f""" -{readme_code} See [the about page](/about) for more details on tools and services used. 
- - -""" - -# TODO: remove after docs are working -st.warning( - "Documentation metrics are broken as I fight with rate limiting, check back soon!", - icon="⚠️", -) - -with open("metrics.py") as f: - metrics_code = f.read() - -with st.expander("Show source code", expanded=False): - st.code(metrics_code, line_numbers=True, language="python") - -""" ---- -""" - -""" -## supported backends -""" - - -def fmt_number(value): - return f"{value:,}" - - -current_backends_total = ( - backends.filter(backends.ingested_at == backends.ingested_at.max()) - .num_backends.max() - .to_pandas() -) -current_backends = backends.backends.unnest().name("backends").as_table() - -st.metric("Total", f"{current_backends_total:,}") -st.dataframe(current_backends, use_container_width=True) - -""" -## totals (all time) -""" - -total_stars_all_time = stars.login.nunique().to_pandas() -total_forks_all_time = forks.login.nunique().to_pandas() - -total_closed_issues_all_time = issues.number.nunique( - where=issues.state == "closed" -).to_pandas() - -total_merged_pulls_all_time, total_contributors_all_time = ( - pulls.agg( - total_merged_pulls_all_time=pulls.number.nunique(where=pulls.state == "merged"), - total_contributors_all_time=pulls.login.nunique( - where=pulls.merged_at.notnull() - ), - ) - .to_pandas() - .squeeze() -) - -downloads_all_time = downloads["downloads"].sum().to_pandas() - -total_members_all_time = members.user_id.nunique().to_pandas() -total_messages_all_time = messages.id.nunique().to_pandas() - -# TODO: remove dirty hack -if docs is not None: - total_visits_all_time = docs.count().to_pandas() - total_first_visits_all_time = docs.first_visit.sum().to_pandas() -else: - total_visits_all_time = 0 - total_first_visits_all_time = 0 - -col0, col1, col2, col3, col4 = st.columns(5) -with col0: - st.metric( - label="GitHub stars", - value=fmt_number(total_stars_all_time), - help=f"{total_stars_all_time:,}", - ) - st.metric( - label="PyPI downloads", - value=fmt_number(downloads_all_time), - help=f"{downloads_all_time:,}", - ) -with col1: - st.metric( - label="GitHub contributors", - value=fmt_number(total_contributors_all_time), - help=f"{total_contributors_all_time:,}", - ) - st.metric( - label="GitHub forks", - value=fmt_number(total_forks_all_time), - help=f"{total_forks_all_time:,}", - ) -with col2: - st.metric( - label="GitHub PRs merged", - value=fmt_number(total_merged_pulls_all_time), - help=f"{total_merged_pulls_all_time:,}", - ) - st.metric( - label="GitHub issues closed", - value=fmt_number(total_closed_issues_all_time), - help=f"{total_closed_issues_all_time:,}", - ) -with col3: - st.metric( - label="Zulip members", - value=fmt_number(total_members_all_time), - help=f"{total_members_all_time:,}", - ) - st.metric( - label="Zulip messages", - value=fmt_number(total_messages_all_time), - help=f"{total_messages_all_time:,}", - ) -with col4: - st.metric( - label="Unique docs visits", - value=fmt_number(total_visits_all_time), - help="Currently broken, check back soon!", - # help=f"{total_visits_all_time:,}", - ) - st.metric( - label="Unique docs first visits", - value=fmt_number(total_first_visits_all_time), - help="Currently broken, check back soon!", - # help=f"{total_first_visits_all_time:,}", - ) - -# variables -with st.form(key="app"): - days = st.number_input( - "X days", - min_value=1, - max_value=365, - value=90, - step=30, - format="%d", - ) - update_button = st.form_submit_button(label="update") - - -START = datetime.now() - timedelta(days=days * 2) -STOP = datetime.now() - 
timedelta(days=days) - - -# compute metrics -total_stars, total_stars_prev = ( - stars.agg( - total_stars=stars.login.nunique(where=stars.starred_at >= STOP), - total_stars_prev=stars.login.nunique( - where=stars.starred_at.between(START, STOP) - ), - ) - .to_pandas() - .squeeze() -) - -total_forks, total_forks_prev = ( - forks.agg( - total_forks=forks.login.nunique(where=forks.created_at >= STOP), - total_forks_prev=forks.login.nunique( - where=forks.created_at.between(START, STOP) - ), - ) - .to_pandas() - .squeeze() -) - -( - total_issues, - total_issues_prev, - total_issues_closed, - total_issues_closed_prev, -) = ( - issues.agg( - total_issues=issues.login.nunique(where=issues.created_at >= STOP), - total_issues_prev=issues.login.nunique( - where=issues.created_at.between(START, STOP) - ), - total_issues_closed=issues.number.nunique(where=issues.closed_at >= STOP), - total_issues_closed_prev=issues.number.nunique( - where=issues.closed_at.between(START, STOP) - ), - ) - .to_pandas() - .squeeze() -) - -( - total_pulls, - total_pulls_prev, - total_pulls_merged, - total_pulls_merged_prev, - total_contributors, - total_contributors_prev, -) = ( - pulls.agg( - total_pulls=pulls.number.nunique(where=pulls.created_at >= STOP), - total_pulls_prev=pulls.number.nunique( - where=pulls.created_at.between(START, STOP) - ), - total_pulls_merged=pulls.number.nunique(where=pulls.merged_at >= STOP), - total_pulls_merged_prev=pulls.number.nunique( - where=pulls.merged_at.between(START, STOP) - ), - total_contributors=pulls.login.nunique(where=pulls.merged_at >= STOP), - total_contributors_prev=pulls.login.nunique( - where=pulls.merged_at.between(START, STOP) - ), - ) - .to_pandas() - .squeeze() -) - -total_downloads, total_downloads_prev = ( - downloads.agg( - total_downloads=downloads.downloads.sum(where=downloads.timestamp >= STOP), - total_downloads_prev=downloads.downloads.sum( - where=downloads.timestamp.between(START, STOP) - ), - ) - .to_pandas() - .squeeze() -) - - -new_issue_contributors, new_issue_contributors_prev = ( - issues.agg( - new_issue_contributors=issues.login.nunique( - where=(issues.created_at >= STOP) & (issues.is_first_issue == True) - ), - new_issue_contributors_prev=issues.login.nunique( - where=(issues.created_at.between(START, STOP)) - & (issues.is_first_issue == True) - ), - ) - .to_pandas() - .squeeze() -) -new_pulls_contributors, new_pulls_contributors_prev = ( - pulls.agg( - new_pulls_contributors=pulls.login.nunique( - where=(pulls.created_at >= STOP) & (pulls.is_first_pull == True) - ), - new_pulls_contributors_prev=pulls.login.nunique( - where=(pulls.created_at.between(START, STOP)) - & (pulls.is_first_pull == True) - ), - ) - .to_pandas() - .squeeze() -) - - -def delta(current, previous): - delta = current - previous - pct_change = int(round(100.0 * delta / previous, 0)) - return f"{fmt_number(delta)} ({pct_change:d}%)" - - -f""" -## totals (last {days} days) -""" -col1, col2, col3, col4, col5 = st.columns(5) -with col1: - st.metric( - label="GitHub stars", - value=fmt_number(total_stars), - delta=delta(total_stars, total_stars_prev), - help=f"{total_stars:,}", - ) - st.metric( - label="PyPI downloads", - value=fmt_number(total_downloads), - delta=delta(total_downloads, total_downloads_prev), - help=f"{total_downloads:,}", - ) -with col2: - st.metric( - label="GitHub contributors", - value=fmt_number(total_contributors), - delta=delta(total_contributors, total_contributors_prev), - help=f"{total_contributors:,}", - ) - st.metric( - label="GitHub forks created", - 
value=fmt_number(total_forks), - delta=delta(total_forks, total_forks_prev), - help=f"{total_forks:,}", - ) -with col3: - st.metric( - label="GitHub PRs opened", - value=fmt_number(total_pulls), - delta=delta(total_pulls, total_pulls_prev), - help=f"{total_pulls:,}", - ) - st.metric( - label="GitHub issues opened", - value=fmt_number(total_issues), - delta=delta(total_issues, total_issues_prev), - help=f"{total_issues:,}", - ) -with col4: - st.metric( - label="GitHub PRs merged", - value=fmt_number(total_pulls_merged), - delta=delta(total_pulls_merged, total_pulls_merged_prev), - help=f"{total_pulls_merged:,}", - ) - st.metric( - label="GitHub issues closed", - value=fmt_number(total_issues_closed), - delta=delta(total_issues_closed, total_issues_closed_prev), - help=f"{total_issues_closed:,}", - ) -with col5: - st.metric( - label="New issue contributors", - value=fmt_number(new_issue_contributors), - delta=delta(new_issue_contributors, new_issue_contributors_prev), - help=f"{new_issue_contributors:,}", - ) - st.metric( - label="New PR contributors", - value=fmt_number(new_pulls_contributors), - delta=delta(new_pulls_contributors, new_pulls_contributors_prev), - help=f"{new_pulls_contributors:,}", - ) - -f""" -## data (last {days} days) -""" - -""" -### downloads by system and version -""" -c0 = px.bar( - downloads.group_by([ibis._.system, ibis._.version]) - .agg(downloads=lambda t: t.downloads.sum(where=t.timestamp > STOP)) - .order_by(ibis._.version.desc()), - x="version", - y="downloads", - color="system", - title="downloads by system and version", -) -st.plotly_chart(c0, use_container_width=True) - -""" -### stars by company -""" -st.dataframe( - stars.group_by(ibis._.company) - .agg(stars=lambda t: t.count(where=t.starred_at > STOP)) - .filter(ibis._.stars > 0) - .order_by(ibis._.stars.desc()) - .to_pandas(), - use_container_width=True, -) - -""" -### issues by login -""" -c1 = px.bar( - issues.group_by([ibis._.login, ibis._.state]) - .agg(issues=lambda t: t.count(where=t.created_at > STOP)) - .filter(ibis._.issues > 0) - .order_by(ibis._.issues.desc()), - x="login", - y="issues", - color="state", - title="issues by login", -) -st.plotly_chart(c1, use_container_width=True) - -""" -### PRs by login -""" -c2 = px.bar( - pulls.group_by([ibis._.login, ibis._.state]) - .agg(pulls=lambda t: t.count(where=t.created_at > STOP)) - .filter(ibis._.pulls > 0) - .order_by(ibis._.pulls.desc()), - x="login", - y="pulls", - color="state", - title="PRs by login", -) -st.plotly_chart(c2, use_container_width=True) diff --git a/pages/0_github.py b/pages/0_github.py deleted file mode 100644 index cef80bd..0000000 --- a/pages/0_github.py +++ /dev/null @@ -1,390 +0,0 @@ -# imports -import toml -import ibis - -import streamlit as st -import plotly.io as pio -import plotly.express as px -import ibis.selectors as s - -from dotenv import load_dotenv -from datetime import datetime, timedelta - -# options -## load .env -load_dotenv() - -## config.toml -config = toml.load("config.toml")["app"] - -## streamlit config -st.set_page_config(layout="wide") - -## ibis config -con = ibis.connect(f"duckdb://{config['database']}", read_only=True) - -# use precomputed data -stars = con.table("stars") -forks = con.table("forks") -pulls = con.table("pulls") -issues = con.table("issues") -commits = con.table("commits") -watchers = con.table("watchers") - -# display metrics -""" -# GitHub metrics -""" - -f""" ---- -""" - -with open("pages/0_github.py") as f: - github_code = f.read() - -with st.expander("Show source code", 
expanded=False): - st.code(github_code, line_numbers=True, language="python") - -""" ---- -""" - -total_stars_all_time = stars.select("login").distinct().count().to_pandas() -total_forks_all_time = forks.select("login").distinct().count().to_pandas() -total_closed_issues_all_time = ( - issues.filter(ibis._.state == "closed") - .select("number") - .distinct() - .count() - .to_pandas() -) -total_merged_pulls_all_time = ( - pulls.filter(ibis._.state == "merged") - .select("number") - .distinct() - .count() - .to_pandas() -) -total_contributors_all_time = ( - pulls.filter(ibis._.merged_at != None) - .select("login") - .distinct() - .count() - .to_pandas() -) -total_watchers_all_time = watchers.select("login").distinct().count().to_pandas() -open_issues = ( - issues.filter(ibis._.is_closed != True) - .select("number") - .distinct() - .count() - .to_pandas() -) -open_pulls = ( - pulls.filter(ibis._.state == "open").select("number").distinct().count().to_pandas() -) - -f""" -## open items -""" -st.metric( - label="open issues", - value=f"{open_issues:,}", -) -st.metric( - label="open pull requests", - value=f"{open_pulls:,}", -) - -f""" -## totals (all time) -""" -col0, col1, col2 = st.columns(3) -with col0: - st.metric( - label="stars", - value=f"{total_stars_all_time:,}", - ) - st.metric("contributors", f"{total_contributors_all_time:,}") -with col1: - st.metric("forks", f"{total_forks_all_time:,}") - st.metric("watchers", f"{total_watchers_all_time:,}") -with col2: - st.metric("closed issues", f"{total_closed_issues_all_time:,}") - st.metric("merged pull requests", f"{total_merged_pulls_all_time:,}") - -# variables -with st.form(key="pypi"): - days = st.number_input( - "X days", - min_value=1, - max_value=3650, - value=90, - step=30, - format="%d", - ) - timescale = st.selectbox( - "timescale (plots)", - ["D", "W", "M", "Q", "Y"], - index=0, - ) - update_button = st.form_submit_button(label="update") - -# compute metrics -total_stars = ( - stars.filter(ibis._.starred_at >= datetime.now() - timedelta(days=days)) - .select("login") - .distinct() - .count() - .to_pandas() -) -total_stars_prev = ( - stars.filter(ibis._.starred_at <= datetime.now() - timedelta(days=days)) - .filter(ibis._.starred_at >= datetime.now() - timedelta(days=days * 2)) - .select("login") - .distinct() - .count() - .to_pandas() -) -total_forks = ( - forks.filter(ibis._.created_at >= datetime.now() - timedelta(days=days)) - .select("login") - .distinct() - .count() - .to_pandas() -) -total_forks_prev = ( - forks.filter(ibis._.created_at <= datetime.now() - timedelta(days=days)) - .filter(ibis._.created_at >= datetime.now() - timedelta(days=days * 2)) - .select("login") - .distinct() - .count() - .to_pandas() -) - -closed_issues = ( - issues.filter(ibis._.closed_at >= datetime.now() - timedelta(days=days)) - .select("number") - .distinct() - .count() - .to_pandas() -) -closed_issues_prev = ( - issues.filter(ibis._.closed_at <= datetime.now() - timedelta(days=days)) - .filter(ibis._.closed_at >= datetime.now() - timedelta(days=days * 2)) - .select("number") - .distinct() - .count() - .to_pandas() -) -merged_pulls = ( - pulls.filter(ibis._.merged_at >= datetime.now() - timedelta(days=days)) - .select("number") - .distinct() - .count() - .to_pandas() -) -merged_pulls_prev = ( - pulls.filter(ibis._.merged_at <= datetime.now() - timedelta(days=days)) - .filter(ibis._.merged_at >= datetime.now() - timedelta(days=days * 2)) - .select("number") - .distinct() - .count() - .to_pandas() -) -total_issues_prev = ( - 
issues.filter(ibis._.created_at <= datetime.now() - timedelta(days=days)) - .filter(ibis._.created_at >= datetime.now() - timedelta(days=days * 2)) - .select("number") - .distinct() - .count() - .to_pandas() -) -total_pulls = ( - pulls.filter(ibis._.created_at >= datetime.now() - timedelta(days=days)) - .select("number") - .distinct() - .count() - .to_pandas() -) -total_pulls_prev = ( - pulls.filter(ibis._.created_at <= datetime.now() - timedelta(days=days)) - .filter(ibis._.created_at >= datetime.now() - timedelta(days=days * 2)) - .select("number") - .distinct() - .count() - .to_pandas() -) -total_contributors = ( - pulls.filter(ibis._.merged_at != None) - .filter(ibis._.merged_at >= datetime.now() - timedelta(days=days)) - .select("login") - .distinct() - .count() - .to_pandas() -) -total_contributors_prev = ( - pulls.filter(ibis._.merged_at != None) - .filter(ibis._.merged_at <= datetime.now() - timedelta(days=days)) - .filter(ibis._.merged_at >= datetime.now() - timedelta(days=days * 2)) - .select("login") - .distinct() - .count() - .to_pandas() -) -total_watchers = ( - watchers.filter(ibis._.updated_at >= datetime.now() - timedelta(days=days)) - .select("login") - .distinct() - .count() - .to_pandas() -) -total_watchers_prev = ( - watchers.filter(ibis._.updated_at <= datetime.now() - timedelta(days=days)) - .filter(ibis._.updated_at >= datetime.now() - timedelta(days=days * 2)) - .select("login") - .distinct() - .count() - .to_pandas() -) - -f""" -## totals (last {days} days) -""" -col2, col3, col4 = st.columns(3) -with col2: - st.metric( - label="stars", - value=f"{total_stars:,}", - delta=f"{total_stars - total_stars_prev:,}", - ) - st.metric( - label="contributors", - value=f"{total_contributors:,}", - delta=f"{total_contributors - total_contributors_prev:,}", - ) -with col3: - st.metric( - label="forks", - value=f"{total_forks:,}", - delta=f"{total_forks - total_forks_prev:,}", - ) - st.metric( - label="watchers", - value=f"{total_watchers:,}", - delta=f"{total_watchers - total_watchers_prev:,}", - ) -with col4: - st.metric( - label="closed issues", - value=f"{closed_issues:,}", - delta=f"{closed_issues - closed_issues_prev:,}", - ) - st.metric( - label="merged pull requests", - value=f"{merged_pulls:,}", - delta=f"{merged_pulls - merged_pulls_prev:,}", - ) - -# viz -c0 = px.line( - stars.filter(ibis._.starred_at > datetime.now() - timedelta(days=days)).order_by( - ibis._.starred_at.desc() - ), - x="starred_at", - y="total_stars", - title="cumulative stars", -) -st.plotly_chart(c0, use_container_width=True) - -c1 = px.bar( - stars.filter(ibis._.starred_at > datetime.now() - timedelta(days=days)) - .mutate(ibis._.company[:20].name("company")) - .group_by( - [ibis._.starred_at.truncate(timescale).name("starred_at"), ibis._.company] - ) - .agg(ibis._.count().name("stars")) - .order_by(ibis._.stars.desc()), - x="starred_at", - y="stars", - color="company", - title="stars by company", -) -st.plotly_chart(c1, use_container_width=True) - -# cumulative stars by company -st.dataframe( - stars.filter(ibis._.starred_at > datetime.now() - timedelta(days=days)) - .group_by([ibis._.company]) - .agg(ibis._.count().name("stars")) - .order_by(ibis._.stars.desc()), - use_container_width=True, -) - -c3 = px.line( - forks.filter(ibis._.created_at > datetime.now() - timedelta(days=days)).order_by( - ibis._.created_at.desc() - ), - x="created_at", - y="total_forks", - title="cumulative forks", -) -st.plotly_chart(c3, use_container_width=True) - -c4 = px.line( - pulls.filter(ibis._.created_at > 
datetime.now() - timedelta(days=days)).order_by( - ibis._.created_at.desc() - ), - x="created_at", - y="total_pulls", - title="cumulative pull requests", -) -st.plotly_chart(c4, use_container_width=True) - -c5 = px.line( - issues.filter(ibis._.created_at > datetime.now() - timedelta(days=days)).order_by( - ibis._.created_at.desc() - ), - x="created_at", - y="total_issues", - title="cumulative issues", -) -st.plotly_chart(c5, use_container_width=True) - -c6 = px.line( - commits.filter( - ibis._.committed_date > datetime.now() - timedelta(days=days) - ).order_by(ibis._.committed_date.desc()), - x="committed_date", - y="total_commits", - title="cumulative commits", -) -st.plotly_chart(c6, use_container_width=True) - -# new contributors over time -c7 = px.bar( - issues.filter(ibis._.created_at > datetime.now() - timedelta(days=days)) - .filter(ibis._.is_first_issue == True) - .group_by([ibis._.created_at.truncate(timescale).name("created_at")]) - .agg( - ibis._.login.nunique().name("new_contributors"), - ), - x="created_at", - y="new_contributors", - title="new contributors (issues)", -) -st.plotly_chart(c7, use_container_width=True) - -c8 = px.bar( - pulls.filter(ibis._.created_at > datetime.now() - timedelta(days=days)) - .filter(ibis._.is_first_pull == True) - .group_by([ibis._.created_at.truncate(timescale).name("created_at")]) - .agg( - ibis._.login.nunique().name("new_contributors"), - ), - x="created_at", - y="new_contributors", - title="new contributors (pull requests)", -) -st.plotly_chart(c8, use_container_width=True) diff --git a/pages/1_pypi.py b/pages/1_pypi.py deleted file mode 100644 index 8c8f661..0000000 --- a/pages/1_pypi.py +++ /dev/null @@ -1,236 +0,0 @@ -# imports -import toml -import ibis - -import streamlit as st -import plotly.io as pio -import plotly.express as px -import ibis.selectors as s - -from dotenv import load_dotenv -from datetime import datetime, timedelta - -# options -## load .env -load_dotenv() - -## config.toml -config = toml.load("config.toml")["app"] - -## streamlit config -st.set_page_config(layout="wide") - -## ibis config -con = ibis.connect(f"duckdb://{config['database']}", read_only=True) - -# use precomputed data -downloads = con.table("downloads") - -# display metrics -""" -# PyPI metrics -""" - -f""" ---- -""" - -with open("pages/1_pypi.py") as f: - pypi_code = f.read() - -with st.expander("Show source code", expanded=False): - st.code(pypi_code, line_numbers=True, language="python") - -f""" ---- - -## totals (all time) -""" -downloads_all_time = downloads["downloads"].sum().to_pandas() -st.metric("downloads", f"{downloads_all_time:,}") - -with st.form(key="groupers"): - groupers = st.multiselect( - "groupers", - ["version", "system", "country_code", "python"], - ["version"], - ) - update_button = st.form_submit_button(label="update") - -f""" -downloads grouped by {groupers} -""" -grouped = ( - downloads.group_by(groupers) - .aggregate(ibis._.downloads.sum().name("downloads")) - .order_by(ibis._.downloads.desc()) -) - -st.dataframe(grouped, use_container_width=True) - -# variables -with st.form(key="pypi"): - days = st.number_input( - "X days", - min_value=1, - max_value=3650, - value=90, - step=30, - format="%d", - ) - timescale = st.selectbox( - "timescale (plots)", - ["D", "W", "M", "Q", "Y"], - index=0, - ) - exclude_linux = st.checkbox("exclude linux", value=False) - update_button = st.form_submit_button(label="update") - -if exclude_linux: - downloads = downloads.filter(~ibis._.system.contains("Linux")) - -f""" -## totals (last {days} 
days) -""" -downloads_last_x_days = ( - downloads.filter(ibis._.timestamp >= datetime.now() - timedelta(days=days))[ - "downloads" - ] - .sum() - .to_pandas() -) -st.metric("downloads", f"{downloads_last_x_days:,}") - -f""" -downloads grouped by {groupers} -""" -downloads_last_x_days_by_groupers = ( - downloads.filter(ibis._.timestamp >= datetime.now() - timedelta(days=days)) - .group_by(groupers) - .agg(ibis._.downloads.sum().name("downloads")) - .order_by(ibis._.downloads.desc()) -) -st.dataframe(downloads_last_x_days_by_groupers, use_container_width=True) - -# viz -t = ( - downloads.mutate( - timestamp=downloads["timestamp"].truncate("D"), - ) - .group_by("timestamp") - .agg(downloads=ibis._["downloads"].sum()) - .select( - "timestamp", - downloads=ibis._["downloads"] - .sum() - .over(ibis.window(order_by="timestamp", preceding=28, following=0)), - ) - .filter((ibis._["timestamp"] >= datetime.now() - timedelta(days=days))) - .order_by("timestamp") -) - -c00 = px.line( - t, - x="timestamp", - y="downloads", - title="downloads (28 days rolling)", -) -st.plotly_chart(c00, use_container_width=True) - -t = ( - downloads.mutate( - timestamp=downloads["timestamp"].truncate("D"), - version=downloads["version"].split(".")[0], - ) - .group_by("timestamp", "version") - .agg(downloads=ibis._["downloads"].sum()) - .select( - "timestamp", - "version", - downloads=ibis._["downloads"] - .sum() - .over( - ibis.window( - order_by="timestamp", group_by="version", preceding=28, following=0 - ) - ), - ) - .filter((ibis._["timestamp"] >= datetime.now() - timedelta(days=days))) - .order_by("timestamp") -) - -c00 = px.line( - t, - x="timestamp", - y="downloads", - color="version", - category_orders={ - "version": reversed( - sorted( - t.distinct(on="version")["version"].to_pyarrow().to_pylist(), - key=lambda x: int(x), - ) - ) - }, - title="downloads (28 days rolling) by version", -) -st.plotly_chart(c00, use_container_width=True) - -c0 = px.line( - downloads.filter(ibis._.timestamp >= datetime.now() - timedelta(days=days)) - .group_by(ibis._.timestamp.truncate(timescale).name("timestamp")) - .agg(ibis._.downloads.sum().name("downloads")) - .order_by(ibis._.timestamp.desc()) - .mutate( - ibis._.downloads.sum() - .over(rows=(0, None), order_by=ibis._.timestamp.desc()) - .name("total_downloads") - ) - .order_by(ibis._.timestamp.desc()), - x="timestamp", - y="total_downloads", - # log_y=True, - title="cumulative downloads", -) -st.plotly_chart(c0, use_container_width=True) - -c1 = px.bar( - downloads.filter(ibis._.timestamp >= datetime.now() - timedelta(days=days)) - .group_by( - [ibis._.timestamp.truncate(timescale).name("timestamp"), ibis._[groupers[0]]] - ) - .agg(ibis._.downloads.sum().name("downloads")) - .order_by(ibis._.timestamp.desc()) - .mutate( - ibis._.downloads.sum() - .over(rows=(0, None), group_by=groupers[0], order_by=ibis._.timestamp.desc()) - .name("total_downloads") - ) - .order_by([ibis._.timestamp.desc(), ibis._.downloads.desc()]), - x="timestamp", - y="downloads", - color=groupers[0], - title=f"downloads by {groupers[0]}", -) -st.plotly_chart(c1, use_container_width=True) - -c2 = px.line( - downloads.filter(ibis._.timestamp >= datetime.now() - timedelta(days=days)) - .group_by( - [ibis._.timestamp.truncate(timescale).name("timestamp"), ibis._[groupers[0]]] - ) - .agg(ibis._.downloads.sum().name("downloads")) - .order_by(ibis._.timestamp.desc()) - .mutate( - ibis._.downloads.sum() - .over(rows=(0, None), group_by=groupers[0], order_by=ibis._.timestamp.desc()) - .name("total_downloads") - ) - 
.order_by([ibis._.timestamp.desc(), ibis._.downloads.desc()]), - x="timestamp", - y="total_downloads", - log_y=True, - color=groupers[0], - title=f"cumulative downloads by {groupers[0]}", -) -st.plotly_chart(c2, use_container_width=True) diff --git a/pages/2_zulip.py b/pages/2_zulip.py deleted file mode 100644 index 3212bb6..0000000 --- a/pages/2_zulip.py +++ /dev/null @@ -1,101 +0,0 @@ -# imports -import toml -import ibis - -import streamlit as st -import plotly.io as pio -import plotly.express as px -import ibis.selectors as s - -from dotenv import load_dotenv -from datetime import datetime, timedelta - -# options -## load .env -load_dotenv() - -## config.toml -config = toml.load("config.toml")["app"] - -## streamlit config -st.set_page_config(layout="wide") - -## ibis config -con = ibis.connect(f"duckdb://{config['database']}", read_only=True) - -# use precomputed data -members = con.table("zulip_members") -messages = con.table("zulip_messages") - -# display metrics -""" -# Zulip metrics -""" - - -f""" ---- -""" - -with open("pages/2_zulip.py") as f: - zulip_code = f.read() - -with st.expander("Show source code", expanded=False): - st.code(zulip_code, line_numbers=True, language="python") - -""" ---- -""" - -total_members_all_time = members.select("user_id").nunique().to_pandas() -total_messages_all_time = messages.select("id").nunique().to_pandas() - -f""" -## totals (all time) -""" - -col0, col1 = st.columns(2) - -with col0: - st.metric(label="members", value=total_members_all_time) -with col1: - st.metric(label="messages", value=total_messages_all_time) - -# viz -c0 = px.line( - # members.filter(ibis._.date_joined > datetime.now() - timedelta(days=days)), - members, - x="date_joined", - y="total_members", - title=f"members over time", -) -st.plotly_chart(c0, use_container_width=True) - -c1 = px.line( - # TODO: investigate hack here - # messages.filter(ibis._.timestamp > datetime.now() - timedelta(days=days)).order_by( - # ibis._.timestamp.desc() - # ), - messages.order_by(ibis._.timestamp.desc()), - x="timestamp", - y="total_messages", - title=f"messages over time", -) -st.plotly_chart(c1, use_container_width=True) - -## variables -# with st.form(key="zulip"): -# days = st.number_input( -# "X days", -# min_value=1, -# max_value=3650, -# value=90, -# step=30, -# format="%d", -# ) -# timescale = st.selectbox( -# "timescale", -# options=["days", "weeks", "months", "years"], -# index=2, -# ) -# update_button = st.form_submit_button(label="update") diff --git a/pages/3_docs.py b/pages/3_docs.py deleted file mode 100644 index a81f43c..0000000 --- a/pages/3_docs.py +++ /dev/null @@ -1,147 +0,0 @@ -# imports -import toml -import ibis - -import streamlit as st -import plotly.io as pio -import plotly.express as px -import ibis.selectors as s - -from dotenv import load_dotenv -from datetime import datetime, timedelta - -# options -## load .env -load_dotenv() - -## config.toml -config = toml.load("config.toml")["app"] - -## streamlit config -st.set_page_config(layout="wide") - -# TODO: remove after docs are working -st.warning( - "Documentation metrics are broken as I fight with rate limiting, check back soon!", - icon="⚠️", -) - -## ibis config -con = ibis.connect(f"duckdb://{config['database']}", read_only=True) - -# use precomputed data -docs = con.table("docs") - -# display metrics -""" -# Documentation metrics -""" - -f""" ---- -""" - -with open("pages/3_docs.py") as f: - docs_code = f.read() - -with st.expander("Show source code", expanded=False): - st.code(docs_code, line_numbers=True, 
language="python") - -f""" ---- - -## totals (all time) -""" -total_visits_all_time = docs.count().to_pandas() -total_first_visits_all_time = docs.first_visit.sum().to_pandas() - -st.metric("visits", f"{total_visits_all_time:,}") -st.metric("first visits", f"{total_first_visits_all_time:,}") - -# variables -with st.form(key="docs"): - days = st.number_input( - "X days", - min_value=1, - max_value=3650, - value=90, - step=30, - format="%d", - ) - timescale = st.selectbox( - "timescale (plots)", - ["D", "W", "M", "Q", "Y"], - index=0, - ) - grouper = st.selectbox( - "grouper", - ["location", "browser", "referrer", "bot", "user_agent", "event"], - index=2, - ) - update_button = st.form_submit_button(label="update") - -f""" -## last X days -""" -total_visits = ( - docs.filter(ibis._.timestamp >= datetime.now() - timedelta(days=days)) - .count() - .to_pandas() -) -total_first_visits = ( - docs.filter(ibis._.timestamp >= datetime.now() - timedelta(days=days)) - .first_visit.sum() - .to_pandas() -) - -st.metric("visits", f"{total_visits:,}") -st.metric("first visits", f"{total_first_visits:,}") - -# viz -c0 = px.bar( - docs.filter(ibis._.timestamp >= datetime.now() - timedelta(days=days)) - .group_by([ibis._.timestamp.truncate(timescale).name("timestamp"), ibis._[grouper]]) - .agg([ibis._.count().name("visits"), ibis._.first_visit.sum().name("first_visits")]) - .order_by(ibis._.timestamp.desc()) - .mutate( - (ibis._.first_visits / ibis._.visits).name("percent_first_visit"), - ) - .mutate(ibis._[grouper].cast(str)[:20].name(grouper)) - .order_by([ibis._.timestamp.desc(), ibis._.visits.desc()]), - x="timestamp", - y="visits", - color=grouper, - title="docs visits", -) -st.plotly_chart(c0, use_container_width=True) - -c2 = ( - docs.filter(ibis._.timestamp >= datetime.now() - timedelta(days=days)) - .group_by( - [ - ibis._.timestamp.truncate(timescale).name("timestamp"), - ibis._.path, - ibis._.referrer, - ] - ) - .agg([ibis._.count().name("visits"), ibis._.first_visit.sum().name("first_visits")]) - .order_by(ibis._.timestamp.desc()) - .mutate( - (100 * (ibis._.first_visits / ibis._.visits)).name("percent_first_visit"), - ) - .order_by([ibis._.timestamp.desc(), ibis._.visits.desc()]) -) -st.dataframe(c2, use_container_width=True) - -c1 = px.line( - docs.filter(ibis._.timestamp >= datetime.now() - timedelta(days=days)) - .group_by([ibis._.timestamp.truncate(timescale).name("timestamp")]) - .agg([ibis._.count().name("visits"), ibis._.first_visit.sum().name("first_visits")]) - .order_by(ibis._.timestamp.desc()) - .mutate((100 * (ibis._.first_visits / ibis._.visits)).name("percent_first_visit")) - .order_by(ibis._.timestamp.desc()), - x="timestamp", - y="percent_first_visit", - title="docs first visit %", -) -st.plotly_chart(c1, use_container_width=True) diff --git a/pages/4_about.py b/pages/4_about.py deleted file mode 100644 index d753e3b..0000000 --- a/pages/4_about.py +++ /dev/null @@ -1,67 +0,0 @@ -# imports -import streamlit as st - -## streamlit config -st.set_page_config(layout="wide") - -# display header stuff -with open("readme.md") as f: - readme_code = f.read() - -f""" -{readme_code} -""" - -with open("requirements.txt") as f: - metrics_code = f.read() - -with st.expander("show `requirements.txt`", expanded=False): - st.code(metrics_code, line_numbers=True, language="python") - -with open("config.toml") as f: - config_code = f.read() - -with st.expander("show `config.toml`", expanded=False): - st.code(config_code, line_numbers=True, language="toml") - -with open("justfile") as f: - 
justfile_code = f.read() - -with st.expander("show `justfile`", expanded=False): - st.code(justfile_code, line_numbers=True, language="makefile") - -with open(".github/workflows/cicd.yaml") as f: - cicd_code = f.read() - -with st.expander("show `cicd.yaml`", expanded=False): - st.code(cicd_code, line_numbers=True, language="yaml") - - -col0, col1 = st.columns(2) -with col0: - """ - Built with [Ibis](https://ibis-project.org) and other OSS: - - - [DuckDB](https://duckdb.org) (databases, query engine) - - [Streamlit](https://streamlit.io) (dashboard) - - [Plotly](https://plotly.com/python) (plotting) - - [Dagster](https://dagster.io) (DAG pipeline) - - [justfile](https://github.com/casey/just) (command runner) - - [TOML](https://toml.io) (configuration)""" - -with col1: - """ - And some freemium cloud services: - - - [GitHub](https://github.com/ibis-project/ibis-analytics) (source control, CI/CD, source data) - - [Google BigQuery](https://cloud.google.com/free/docs/free-cloud-features#bigquery) (source data) - - [Zulip](https://zulip.com) (source data) - - [Azure](https://azure.microsoft.com) (VM, storage backups) - - [Streamlit Community Cloud](https://docs.streamlit.io/streamlit-community-cloud) (cloud hosting for dashboard) - - [MotherDuck](https://motherduck.com/) (cloud hosting for production databases)""" - - -""" - -:violet[**Note**]: data is refreshed every day. Check [the CI/CD pipeline for the latest run](https://github.com/ibis-project/ibis-analytics/actions/workflows/cicd.yaml). -""" diff --git a/pyproject.toml b/pyproject.toml index f6483e4..813e580 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -2,7 +2,49 @@ requires = ["hatchling"] build-backend = "hatchling.build" +[tool.hatch.metadata] +allow-direct-references = true + [project] -name = "dag" +name = "ibis-analytics" version = "0.1.0" +authors = [{ name = "Cody", email = "cody@dkdc.dev" }] +description = "Ibis analytics with Ibis" +readme = "readme.md" +requires-python = ">=3.9" +classifiers = [ + "Programming Language :: Python :: 3", + "License :: OSI Approved :: MIT License", + "Operating System :: OS Independent", +] +dependencies = [ + # secret management + 'python-dotenv', + # http + 'zulip', + 'httpx', + 'requests', + # cli + 'typer', + # cloud + 'gcsfs', + # data + 'ibis-framework[duckdb,polars,clickhouse,deltalake]', + # visualization + 'plotly', + 'great-tables', + # dashboards and apps + 'shiny>=1.0.0', + 'shinywidgets>=0.3.3', + 'shinyswatch>=0.7.0', +] + +[project.urls] +"Homepage" = "https://github.com/lostmygithubaccount/ibis-analytics" +"Bug Tracker" = "https://github.com/lostmygithubaccount/ibis-analytics/issues" + +[project.scripts] +ia = "ibis_analytics.cli:app" +[tool.ruff] +extend-include = ["*.ipynb"] diff --git a/readme.md b/readme.md index 61c4982..3b83050 100644 --- a/readme.md +++ b/readme.md @@ -1,7 +1,11 @@ -# ibis-analytics +# Ibis analytics -[![cicd](https://github.com/ibis-project/ibis-analytics/workflows/cicd/badge.svg)](https://github.com/ibis-project/ibis-analytics/actions/workflows/cicd.yaml) +***Ibis analytics with Ibis.*** -This is an end-to-end analytics project for [Ibis](https://ibis-project.org), ingesting and processing >10M rows of data at little to no cost. +## Getting started -[Check out the blog](https://ibis-project.org/posts/ibis-analytics/) or [the dashboard](https://ibis-analytics.streamlit.app/). +[See the documentation](https://lostmygithubaccount.github.io/ibis-analytics). + +## Contributing + +Contributions welcome. 
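The `[project.scripts]` entry above wires the `ia` console script to the Typer app object in `ibis_analytics.cli`, so an editable install (see the new `requirements.txt` below) is all that's needed to get the command on `PATH`. A minimal sketch of that pattern, with a hypothetical `hello` command standing in for the real `ingest`/`run`/`dashboard`/`clean` subcommands:

```python
# sketch of the console-script pattern behind `ia = "ibis_analytics.cli:app"` (illustrative only)
import typer

app = typer.Typer(no_args_is_help=True, add_completion=False)


@app.command()
def hello(name: str = "ibis"):
    """Hypothetical command; the real CLI exposes ingest, run, dashboard, and clean."""
    typer.echo(f"hello, {name}")


if __name__ == "__main__":
    app()  # the `ia` entry point invokes this same Typer object
```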
diff --git a/requirements.txt b/requirements.txt index 9b06092..e714c16 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,21 +1,2 @@ -# python -ruff -build -python-dotenv - -# web clients -httpx -zulip - -# data -duckdb==0.10.2 -ibis-framework[duckdb,bigquery,deltalake] @ git+https://github.com/ibis-project/ibis - -# viz -plotly -streamlit - -# ops -dagster -dagster-webserver - +-e . +#ibis-analytics diff --git a/dag/assets/extract/__init__.py b/src/ibis_analytics/__init__.py similarity index 100% rename from dag/assets/extract/__init__.py rename to src/ibis_analytics/__init__.py diff --git a/src/ibis_analytics/catalog.py b/src/ibis_analytics/catalog.py new file mode 100644 index 0000000..0b02d06 --- /dev/null +++ b/src/ibis_analytics/catalog.py @@ -0,0 +1,62 @@ +# imports +import os +import ibis + +from ibis_analytics.config import CLOUD, BUCKET, DATA_DIR + + +# functions +def delta_table_filename(table_name: str) -> str: + return f"{table_name}.delta" + + +def delta_table_path(table_name: str) -> str: + return os.path.join(DATA_DIR, delta_table_filename(table_name)) + + +def read_table(table_name: str) -> ibis.Table: + if CLOUD: + import gcsfs + + fs = gcsfs.GCSFileSystem(token="anon") + ibis.get_backend().register_filesystem(fs) + + table_path = f"gs://{BUCKET}/{delta_table_path(table_name)}" + else: + table_path = delta_table_path(table_name) + + return ibis.read_delta(table_path) + + +def write_table(t: ibis.Table, table_name: str) -> None: + if CLOUD: + import gcsfs + + fs = gcsfs.GCSFileSystem() + ibis.get_backend().register_filesystem(fs) + + table_path = f"gs://{BUCKET}/{delta_table_path(table_name)}" + else: + table_path = delta_table_path(table_name) + + t.to_delta( + table_path, + mode="overwrite", + partition_by=["extracted_at"], + ) + + +# classes +class Catalog: + def list_tables(self): + return [ + d + for d in os.listdir(DATA_DIR) + if not (d.startswith("_") or d.startswith(".")) + ] + + def table(self, table_name): + return read_table(table_name) + + def write_table(self, t, table_name): + write_table(t, table_name) diff --git a/src/ibis_analytics/cli.py b/src/ibis_analytics/cli.py new file mode 100644 index 0000000..e7489e7 --- /dev/null +++ b/src/ibis_analytics/cli.py @@ -0,0 +1,189 @@ +import os +import typer +import httpx +import subprocess + +from ibis_analytics.etl import main as etl_main +from ibis_analytics.ingest import main as ingest_main +from ibis_analytics.catalog import delta_table_path + +from ibis_analytics.config import ( + DATA_DIR, + RAW_DATA_DIR, + GH_PRS_TABLE, + GH_FORKS_TABLE, + GH_STARS_TABLE, + GH_ISSUES_TABLE, + GH_COMMITS_TABLE, + GH_WATCHERS_TABLE, +) + +TYPER_KWARGS = { + "no_args_is_help": True, + "add_completion": False, + "context_settings": {"help_option_names": ["-h", "--help"]}, +} +app = typer.Typer(help="acc", **TYPER_KWARGS) +clean_app = typer.Typer(help="Clean the data lake.", **TYPER_KWARGS) + +## add subcommands +app.add_typer(clean_app, name="clean") + +## add subcommand aliases +app.add_typer(clean_app, name="c", hidden=True) + + +# helper functions +def check_ingested_data_exists() -> bool: + # check that the ingested data exists + if not os.path.exists(os.path.join(DATA_DIR, RAW_DATA_DIR)): + typer.echo("run `acc ingest` first!") + return False + return True + + +def check_data_lake_exists() -> bool: + # check that the data lake exists + tables = [ + GH_PRS_TABLE, + GH_FORKS_TABLE, + GH_STARS_TABLE, + GH_ISSUES_TABLE, + GH_COMMITS_TABLE, + GH_WATCHERS_TABLE, + ] + for table in tables: + if not 
os.path.exists(delta_table_path(table)): + typer.echo("run `acc run` first!") + return False + return True + + +# commands +@app.command() +def ingest(): + """Ingest source data.""" + # ensure project config exists + try: + ingest_main() + except KeyboardInterrupt: + typer.echo("stopping...") + + except Exception as e: + typer.echo(f"error: {e}") + + +@app.command() +@app.command("etl", hidden=True) +def run( + override: bool = typer.Option( + False, "--override", "-o", help="Override checks", show_default=True + ), +): + """Run ETL.""" + + etl_main() + # ensure data is ingested + # if not override and not check_ingested_data_exists(): + # return + + # try: + # etl_main() + # except KeyboardInterrupt: + # typer.echo("stopping...") + # except Exception as e: + # typer.echo(f"error: {e}") + + +@app.command() +@app.command("dash", hidden=True) +@app.command("metrics", hidden=True) +def dashboard(): + """Open the dashboard.""" + + # ensure data is ingested + if not check_ingested_data_exists(): + return + + # ensure data lake exists + if not check_data_lake_exists(): + return + + if not os.path.exists("dashboard.py"): + url = "https://raw.githubusercontent.com/lostmygithubaccount/ibis-analytics/main/dashboard.py" + + response = httpx.get(url) + if response.status_code != 200: + typer.echo(f"error: {response.text}") + return + dashboard_code = response.text + + typer.echo("creating dashboard.py...") + with open("dashboard.py", "w") as f: + f.write(dashboard_code) + else: + typer.echo("found dashboard.py") + + typer.echo("opening dashboard...") + + cmd = "shiny run dashboard.py -b" + subprocess.call(cmd, shell=True) + + +@clean_app.command("lake") +def clean_lake( + override: bool = typer.Option( + False, "--override", "-o", help="Override checks", show_default=True + ), +): + """Clean the data lake.""" + # ensure the data lake exists + if not override and not check_data_lake_exists(): + return + + tables = [ + GH_PRS_TABLE, + GH_FORKS_TABLE, + GH_STARS_TABLE, + GH_ISSUES_TABLE, + GH_COMMITS_TABLE, + GH_WATCHERS_TABLE, + ] + + for table in tables: + cmd = f"rm -rf {delta_table_path(table)}/" + typer.echo(f"running: {cmd}...") + subprocess.call(cmd, shell=True) + + +@clean_app.command("ingest") +def clean_ingest( + override: bool = typer.Option( + False, "--override", "-o", help="Override checks", show_default=True + ), + confirm: bool = typer.Option( + True, "--confirm", "-c", help="Confirm deletion", show_default=True + ), +): + """Clean the raw data.""" + # ensure the data ingested exists + if not override and not check_ingested_data_exists(): + return + + if confirm: + typer.confirm("Are you sure you want to delete the ingested data?", abort=True) + + cmd = f"rm -rf {os.path.join(DATA_DIR, RAW_DATA_DIR)}/" + typer.echo(f"running: {cmd}...") + subprocess.call(cmd, shell=True) + + +@clean_app.command("all") +def clean_all(): + """Clean all the data.""" + clean_lake() + clean_ingest() + + +if __name__ == "__main__": + typer.run(app) diff --git a/src/ibis_analytics/config.py b/src/ibis_analytics/config.py new file mode 100644 index 0000000..9415499 --- /dev/null +++ b/src/ibis_analytics/config.py @@ -0,0 +1,26 @@ +GH_REPO = "ibis-project/ibis" +PYPI_PACKAGE = "ibis-framework" + +ZULIP_URL = "https://ibis-project.zulipchat.com" +DOCS_URL = "https://ibis.goatcounter.com" + +CLOUD = True +BUCKET = "ibis-analytics" + +DATA_DIR = "datalake" +RAW_DATA_DIR = "_raw" +RAW_DATA_GH_DIR = "github" +RAW_DATA_DOCS_DIR = "docs" +RAW_DATA_PYPI_DIR = "pypi" +RAW_DATA_ZULIP_DIR = "zulip" + +GH_PRS_TABLE = "gh_prs" 
+GH_FORKS_TABLE = "gh_forks" +GH_STARS_TABLE = "gh_stars" +GH_ISSUES_TABLE = "gh_issues" +GH_COMMITS_TABLE = "gh_commits" +GH_WATCHERS_TABLE = "gh_watchers" +DOCS_TABLE = "docs" +PYPI_TABLE = "pypi" +ZULIP_MEMBERS_TABLE = "zulip_members" +ZULIP_MESSAGES_TABLE = "zulip_messages" diff --git a/src/ibis_analytics/etl/__init__.py b/src/ibis_analytics/etl/__init__.py new file mode 100644 index 0000000..0e3e045 --- /dev/null +++ b/src/ibis_analytics/etl/__init__.py @@ -0,0 +1,106 @@ +# imports +from ibis_analytics.config import ( + GH_PRS_TABLE, + GH_FORKS_TABLE, + GH_STARS_TABLE, + GH_ISSUES_TABLE, + GH_COMMITS_TABLE, + GH_WATCHERS_TABLE, + DOCS_TABLE, + ZULIP_MEMBERS_TABLE, + ZULIP_MESSAGES_TABLE, +) +from ibis_analytics.catalog import Catalog +from ibis_analytics.etl.extract import ( + gh_prs as extract_gh_prs, + gh_forks as extract_gh_forks, + gh_stars as extract_gh_stars, + gh_issues as extract_gh_issues, + gh_commits as extract_gh_commits, + gh_watchers as extract_gh_watchers, + docs as extract_docs, + zulip_members as extract_zulip_members, + zulip_messages as extract_zulip_messages, +) +from ibis_analytics.etl.transform import ( + gh_prs as transform_gh_prs, + gh_forks as transform_gh_forks, + gh_stars as transform_gh_stars, + gh_issues as transform_gh_issues, + gh_commits as transform_gh_commits, + gh_watchers as transform_gh_watchers, + docs as transform_docs, + zulip_members as transform_zulip_members, + zulip_messages as transform_zulip_messages, +) + + +# functions +def main(): + # instantiate catalog + catalog = Catalog() + + # extract + extract_gh_prs_t = extract_gh_prs() + extract_gh_forks_t = extract_gh_forks() + extract_gh_stars_t = extract_gh_stars() + extract_gh_issues_t = extract_gh_issues() + extract_gh_commits_t = extract_gh_commits() + extract_gh_watchers_t = extract_gh_watchers() + extract_docs_t = extract_docs() + extract_zulip_members_t = extract_zulip_members() + extract_zulip_messages_t = extract_zulip_messages() + + # data validation + for t in [ + extract_gh_prs_t, + extract_gh_forks_t, + extract_gh_stars_t, + extract_gh_issues_t, + extract_gh_commits_t, + extract_gh_watchers_t, + extract_docs_t, + extract_zulip_members_t, + extract_zulip_messages_t, + ]: + assert ( + t.count().to_pyarrow().as_py() > 0 + ), f"No extracted data for {t.get_name()}" + + # transform + transform_gh_prs_t = transform_gh_prs(extract_gh_prs_t) + transform_gh_forks_t = transform_gh_forks(extract_gh_forks_t) + transform_gh_stars_t = transform_gh_stars(extract_gh_stars_t) + transform_gh_issues_t = transform_gh_issues(extract_gh_issues_t) + transform_gh_commits_t = transform_gh_commits(extract_gh_commits_t) + transform_gh_watchers_t = transform_gh_watchers(extract_gh_watchers_t) + transform_docs_t = transform_docs(extract_docs_t) + transform_zulip_members_t = transform_zulip_members(extract_zulip_members_t) + transform_zulip_messages_t = transform_zulip_messages(extract_zulip_messages_t) + + # data validation + for t in [ + transform_gh_prs_t, + transform_gh_forks_t, + transform_gh_stars_t, + transform_gh_issues_t, + transform_gh_commits_t, + transform_gh_watchers_t, + transform_docs_t, + transform_zulip_members_t, + transform_zulip_messages_t, + ]: + assert ( + t.count().to_pyarrow().as_py() > 0 + ), f"No transformed data for {t.get_name()}" + + # load + catalog.write_table(transform_gh_prs_t, GH_PRS_TABLE) + catalog.write_table(transform_gh_forks_t, GH_FORKS_TABLE) + catalog.write_table(transform_gh_stars_t, GH_STARS_TABLE) + catalog.write_table(transform_gh_issues_t, GH_ISSUES_TABLE) + 
catalog.write_table(transform_gh_commits_t, GH_COMMITS_TABLE) + catalog.write_table(transform_gh_watchers_t, GH_WATCHERS_TABLE) + catalog.write_table(transform_docs_t, DOCS_TABLE) + catalog.write_table(transform_zulip_members_t, ZULIP_MEMBERS_TABLE) + catalog.write_table(transform_zulip_messages_t, ZULIP_MESSAGES_TABLE) diff --git a/src/ibis_analytics/etl/extract.py b/src/ibis_analytics/etl/extract.py new file mode 100644 index 0000000..6603aec --- /dev/null +++ b/src/ibis_analytics/etl/extract.py @@ -0,0 +1,157 @@ +# imports +import os +import ibis + +from datetime import datetime +from ibis_analytics.config import ( + DATA_DIR, + RAW_DATA_DIR, + RAW_DATA_GH_DIR, + RAW_DATA_DOCS_DIR, + RAW_DATA_ZULIP_DIR, +) + +# set extracted_at timestamp +extracted_at = datetime.utcnow().isoformat() + + +# functions +def add_extracted_at(t): + """Add extracted_at column to table.""" + + # add extracted_at column and relocate it to the first position + t = t.mutate(extracted_at=ibis.literal(extracted_at)).relocate("extracted_at") + + return t + + +def constraints(t): + """Check common constraints for extracted tables.""" + + assert t.count().to_pyarrow().as_py() > 0, "table is empty!" + + return t + + +# extract data assets +def gh_commits(): + """Extract GitHub commits data.""" + + # read in raw data + data_glob = os.path.join(DATA_DIR, RAW_DATA_DIR, RAW_DATA_GH_DIR, "commits.*.json") + gh_commits = ibis.read_json(data_glob) + + # add extracted_at column + gh_commits = gh_commits.pipe(add_extracted_at).pipe(constraints) + + return gh_commits + + +def gh_issues(): + """Extract GitHub issues data.""" + + # read in raw data + data_glob = os.path.join(DATA_DIR, RAW_DATA_DIR, RAW_DATA_GH_DIR, "issues.*.json") + gh_issues = ibis.read_json(data_glob) + + # add extracted_at column + gh_issues = gh_issues.pipe(add_extracted_at).pipe(constraints) + + return gh_issues + + +def gh_prs(): + """Extract GitHub pull request (PR) data.""" + + # read in raw data + data_glob = os.path.join( + DATA_DIR, RAW_DATA_DIR, RAW_DATA_GH_DIR, "pullRequests.*.json" + ) + gh_prs = ibis.read_json(data_glob) + + # add extracted_at column + gh_prs = gh_prs.pipe(add_extracted_at).pipe(constraints) + + return gh_prs + + +def gh_forks(): + """Extract GitHub forks data.""" + + # read in raw data + data_glob = os.path.join(DATA_DIR, RAW_DATA_DIR, RAW_DATA_GH_DIR, "forks.*.json") + gh_forks = ibis.read_json(data_glob) + + # add extracted_at column + gh_forks = gh_forks.pipe(add_extracted_at).pipe(constraints) + + return gh_forks + + +def gh_stars(): + """Extract GitHub stargazers data.""" + + # read in raw data + data_glob = os.path.join( + DATA_DIR, RAW_DATA_DIR, RAW_DATA_GH_DIR, "stargazers.*.json" + ) + gh_stars = ibis.read_json(data_glob) + + # add extracted_at column + gh_stars = gh_stars.pipe(add_extracted_at).pipe(constraints) + + return gh_stars + + +def gh_watchers(): + """Extract GitHub watchers data.""" + + # read in raw data + data_glob = os.path.join(DATA_DIR, RAW_DATA_DIR, RAW_DATA_GH_DIR, "watchers.*.json") + gh_watchers = ibis.read_json(data_glob) + + # add extracted_at column + gh_watchers = gh_watchers.pipe(add_extracted_at).pipe(constraints) + + return gh_watchers + + +def docs(): + """Extract documentation data.""" + + # read in raw data + data_glob = os.path.join(DATA_DIR, RAW_DATA_DIR, RAW_DATA_DOCS_DIR, "*.csv.gz") + docs = ibis.read_csv(data_glob) + + # add extracted_at column + docs = docs.pipe(add_extracted_at).pipe(constraints) + + return docs + + +def zulip_members(): + """Extract Zulip members data.""" + + # read 
in raw data + data_glob = os.path.join(DATA_DIR, RAW_DATA_DIR, RAW_DATA_ZULIP_DIR, "members.json") + zulip_members = ibis.read_json(data_glob) + + # add extracted_at column + zulip_members = zulip_members.pipe(add_extracted_at).pipe(constraints) + + return zulip_members + + +def zulip_messages(): + """Extract Zulip messages data.""" + + # read in raw data + data_glob = os.path.join( + DATA_DIR, RAW_DATA_DIR, RAW_DATA_ZULIP_DIR, "messages.json" + ) + zulip_messages = ibis.read_json(data_glob) + + # add extracted_at column + zulip_messages = zulip_messages.pipe(add_extracted_at).pipe(constraints) + + return zulip_messages diff --git a/src/ibis_analytics/etl/load.py b/src/ibis_analytics/etl/load.py new file mode 100644 index 0000000..68c4527 --- /dev/null +++ b/src/ibis_analytics/etl/load.py @@ -0,0 +1,11 @@ +# imports +from ibis_analytics.catalog import Catalog + + +# functions +def load_table(table, table_name): + # instantiate catalog + catalog = Catalog() + + # load + catalog.write_table(table, table_name) diff --git a/src/ibis_analytics/etl/transform.py b/src/ibis_analytics/etl/transform.py new file mode 100644 index 0000000..8ba6122 --- /dev/null +++ b/src/ibis_analytics/etl/transform.py @@ -0,0 +1,223 @@ +# imports +import re +import ibis +import ibis.selectors as s + + +# transform functions +def preprocess(t): + """Common preprocessing steps.""" + + # ensure unique records + t = t.rename("snake_case") + t = t.distinct(on=~s.c("extracted_at"), keep="first").order_by("extracted_at") + + return t + + +def postprocess(t): + """Common postprocessing steps.""" + + # ensure consistent column casing + t = t.rename("snake_case") + + return t + + +# transform data assets +def gh_commits(gh_commits): + """Transform GitHub commits data.""" + + def transform(t): + t = t.unpack("node").unpack("author").rename("snake_case") + t = t.order_by(ibis._["committed_date"].desc()) + t = t.mutate(total_commits=ibis._.count().over(rows=(0, None))) + return t + + gh_commits = gh_commits.pipe(preprocess).pipe(transform).pipe(postprocess) + return gh_commits + + +def gh_issues(gh_issues): + """Transform GitHub issues data.""" + + def transform(t): + issue_state = ( + ibis.case().when(ibis._["is_closed"], "closed").else_("open").end() + ) + + t = t.unpack("node").unpack("author").rename("snake_case") + t = t.order_by(ibis._["created_at"].desc()) + t = t.mutate(is_closed=(ibis._["closed_at"] != None)) + t = t.mutate(total_issues=ibis._.count().over(rows=(0, None))) + t = t.mutate(state=issue_state) + t = ( + t.mutate( + is_first_issue=( + ibis.row_number().over( + ibis.window(group_by="login", order_by=t["created_at"]) + ) + == 0 + ) + ) + .relocate("is_first_issue", "login", "created_at") + .order_by(t["created_at"].desc()) + ) + return t + + gh_issues = gh_issues.pipe(preprocess).pipe(transform).pipe(postprocess) + return gh_issues + + +def gh_prs(gh_prs): + """Transform GitHub pull request (PR) data.""" + + def transform(t): + pull_state = ( + ibis.case() + .when(ibis._["is_merged"], "merged") + .when(ibis._["is_closed"], "closed") + .else_("open") + .end() + ) + + t = t.unpack("node").unpack("author").rename("snake_case") + t = t.order_by(ibis._["created_at"].desc()) + t = t.mutate(is_merged=(ibis._["merged_at"] != None)) + t = t.mutate(is_closed=(ibis._["closed_at"] != None)) + t = t.mutate(total_pulls=ibis._.count().over(rows=(0, None))) + # to remove bots + # t = t.filter( + # ~( + # (ibis._.login == "ibis-squawk-bot") + # | (ibis._.login == "pre-commit-ci") + # | (ibis._.login == "renovate") + # ) + # ) + 
t = t.mutate(state=pull_state) + t = t.mutate( + merged_at=ibis._["merged_at"].cast("timestamp") + ) # TODO: temporary fix + + # add first pull by login + t = ( + t.mutate( + is_first_pull=( + ibis.row_number().over( + ibis.window(group_by="login", order_by=t["created_at"]) + ) + == 0 + ) + ) + .relocate("is_first_pull", "login", "created_at") + .order_by(t["created_at"].desc()) + ) + return t + + gh_prs = gh_prs.pipe(preprocess).pipe(transform).pipe(postprocess) + return gh_prs + + +def gh_forks(gh_forks): + """Transform GitHub forks data.""" + + def transform(t): + t = t.unpack("node").unpack("owner").rename("snake_case") + t = t.order_by(ibis._["created_at"].desc()) + t = t.mutate(total_forks=ibis._.count().over(rows=(0, None))) + return t + + gh_forks = gh_forks.pipe(preprocess).pipe(transform).pipe(postprocess) + return gh_forks + + +def gh_stars(gh_stars): + """Transform GitHub stargazers data.""" + + def transform(t): + t = t.unpack("node").rename("snake_case") + # TODO: fix + t = t.order_by(ibis._["starred_at"].desc()) + t = t.mutate(company=ibis._["company"].fill_null("Unknown")) + t = t.mutate(total_stars=ibis._.count().over(rows=(0, None))) + return t + + gh_stars = gh_stars.pipe(preprocess).pipe(transform).pipe(postprocess) + return gh_stars + + +def gh_watchers(gh_watchers): + """Transform GitHub watchers data.""" + + def transform(t): + t = t.unpack("node").rename("snake_case") + # TODO: fix this + t = t.order_by(ibis._["updated_at"].desc()) + t = t.mutate(total_t=ibis._.count().over(rows=(0, None))) + t = t.order_by(ibis._["updated_at"].desc()) + return t + + gh_watchers = gh_watchers.pipe(preprocess).pipe(transform).pipe(postprocess) + return gh_watchers + + +def docs(t): + """ + Transform the docs data. + """ + + def transform(t): + t = t.rename({"path": "2_path", "timestamp": "date"}) + return t + + docs = t.pipe(preprocess).pipe(transform).pipe(postprocess) + return docs + + +def zulip_members(t): + """ + Transform the Zulip members data. + """ + + def transform(t): + # t = t.mutate(date_joined=ibis._["date_joined"].cast("timestamp")) + t = t.filter(ibis._["is_bot"] == False) + t = t.mutate( + total_members=ibis._.count().over( + rows=(0, None), order_by=ibis.desc("date_joined") + ) + ) + t = t.relocate("full_name", "date_joined", "timezone") + return t + + zulip_members = t.pipe(preprocess).pipe(transform).pipe(postprocess) + return zulip_members + + +def zulip_messages(t): + """ + Transform the Zulip messages data. 
+ """ + + def transform(t): + t = t.mutate( + timestamp=ibis._["timestamp"].cast("timestamp"), + last_edit_timestamp=ibis._["last_edit_timestamp"].cast("timestamp"), + ) + t = t.filter(ibis._["stream_id"] != 405931) + t = t.mutate( + total_messages=ibis._.count().over( + rows=(0, None), order_by=ibis.desc("timestamp") + ) + ) + t = t.relocate( + "sender_full_name", + "display_recipient", + "subject", + "timestamp", + "last_edit_timestamp", + ) + return t + + zulip_messages = t.pipe(preprocess).pipe(transform).pipe(postprocess) + return zulip_messages diff --git a/dag/ingest.py b/src/ibis_analytics/ingest/__init__.py similarity index 66% rename from dag/ingest.py rename to src/ibis_analytics/ingest/__init__.py index 3445c98..127a6d5 100644 --- a/dag/ingest.py +++ b/src/ibis_analytics/ingest/__init__.py @@ -1,21 +1,32 @@ # imports import os +import sys import ibis -import time -import toml import json +import time import httpx import zulip -import inspect +import tomllib as toml import requests import logging as log -from ibis import _ from dotenv import load_dotenv -from datetime import datetime, timedelta, date - -from dag.graphql_queries import ( +from datetime import datetime, timedelta + +from ibis_analytics.config import ( + GH_REPO, + PYPI_PACKAGE, + ZULIP_URL, + DOCS_URL, + DATA_DIR, + RAW_DATA_DIR, + RAW_DATA_GH_DIR, + RAW_DATA_DOCS_DIR, + RAW_DATA_ZULIP_DIR, + RAW_DATA_PYPI_DIR, +) +from ibis_analytics.ingest.graphql_queries import ( issues_query, pulls_query, forks_query, @@ -24,18 +35,22 @@ watchers_query, ) +# configure logger +log.basicConfig(level=log.INFO) + # main function def main(): + """ + Ingest data. + """ # load environment variables load_dotenv() # ingest data - # ingest_docs() - ingest_zulip() - ingest_pypi() - ingest_gh() - # ingest_ci() # TODO: fix permissions, add assets + ingest_gh(gh_repo=GH_REPO) + ingest_zulip(zulip_url=ZULIP_URL) + ingest_docs(docs_url=DOCS_URL) # helper functions @@ -46,12 +61,15 @@ def write_json(data, filename): # ingest functions -def ingest_gh(): +def ingest_gh(gh_repo): """ - Ingest the GitHub data. + Ingest GitHub data. 
""" - # configure logger - log.basicConfig(level=log.INFO) + + def write_json(data, filename): + # write the data to a file + with open(filename, "w") as f: + json.dump(data, f, indent=4) # constants GRAPH_URL = "https://api.github.com/graphql" @@ -59,9 +77,7 @@ def ingest_gh(): # load environment variables GH_TOKEN = os.getenv("GITHUB_TOKEN") - # load config - config = toml.load("config.toml")["ingest"]["github"] - log.info(f"Using repos: {config['repos']}") + assert GH_TOKEN is not None and GH_TOKEN != "", "GITHUB_TOKEN is not set" # construct header headers = { @@ -162,6 +178,7 @@ def fetch_data(client, owner, repo, query_name, query, output_dir, num_items=100 filename = get_filename(query_name, page) output_path = os.path.join(output_dir, filename) log.info(f"\t\tWriting data to {output_path}") + write_json(data, output_path) variables["cursor"] = f"{cursor}" @@ -172,135 +189,35 @@ def fetch_data(client, owner, repo, query_name, query, output_dir, num_items=100 # increment page number page += 1 - except: + except Exception as e: # print error if response - log.error(f"\t\tFailed to fetch data for {owner}/{repo}") + log.error(f"\t\tFailed to fetch data for {owner}/{repo}: {e}") try: log.error(f"\t\t\tResponse: {resp.text}") - except: - pass - + except Exception as e: + log.error(f"\t\t\tFailed to print response: {e}") break # create a requests session with requests.Session() as client: - for repo in config["repos"]: + for repo in [gh_repo]: log.info(f"Fetching data for {repo}...") for query in queries: owner, repo_name = repo.split("/") output_dir = os.path.join( - "data", - "ingest", - "github", - owner, - repo_name, + DATA_DIR, + RAW_DATA_DIR, + RAW_DATA_GH_DIR, ) os.makedirs(output_dir, exist_ok=True) log.info(f"\tFetching data for {owner}/{repo_name} {query}...") fetch_data(client, owner, repo_name, query, queries[query], output_dir) -def ingest_pypi(): - """ - Ingest the PyPI data. - """ - # constants - # set DEFAULT_BACKFILL to the number of days - # since July 19th, 2015 until today - DEFAULT_BACKFILL = (datetime.now() - datetime(2015, 7, 19)).days - BIGQUERY_DATASET = "bigquery-public-data.pypi.file_downloads" - - # configure logger - log.basicConfig(level=log.INFO) - - # load environment variables - project_id = os.getenv("BQ_PROJECT_ID") - log.info(f"Project ID: {project_id}") - - # load config - config = toml.load("config.toml")["ingest"]["pypi"] - log.info(f"Packages: {config['packages']}") - - # configure lookback window - backfill = config["backfill"] if "backfill" in config else DEFAULT_BACKFILL - log.info(f"Backfill: {backfill}") - - # for each package - for package in config["packages"]: - log.info(f"Package: {package}") - # create output directory - output_dir = os.path.join("data", "ingest", "pypi", package) - os.makedirs(output_dir, exist_ok=True) - - # construct query - query = f""" - SELECT * - FROM `{BIGQUERY_DATASET}` - WHERE file.project = '{package}' - AND DATE(timestamp) - BETWEEN DATE_SUB(CURRENT_DATE(), INTERVAL {backfill} DAY) - AND CURRENT_DATE() - """.strip() - query = inspect.cleandoc(query) - - # connect to bigquery and execute query - con = ibis.connect(f"bigquery://{project_id}") - log.info(f"Executing query:\n{query}") - t = con.sql(query) - - # write to parquet - filename = f"file_downloads.parquet" - output_path = os.path.join(output_dir, filename) - log.info(f"Writing to: {output_path}") - t.to_parquet(output_path) - - -def ingest_ci(): - """ - Ingest the CI data. 
- """ - # constants - # set DEFAULT_BACKFILL to the number of days - # since July 19th, 2015 until today - DEFAULT_BACKFILL = (datetime.now() - datetime(2015, 7, 19)).days - - # configure logger - log.basicConfig(level=log.INFO) - - # load environment variables - project_id = os.getenv("BQ_PROJECT_ID") - log.info(f"Project ID: {project_id}") - - # load config - config = toml.load("config.toml")["ingest"]["ci"] - - # configure lookback window - backfill = config["backfill"] if "backfill" in config else DEFAULT_BACKFILL - log.info(f"Backfill: {backfill}") - - # make sure the data directory exists - os.makedirs("data/ingest/ci/ibis", exist_ok=True) - - # connect to databases - con = ibis.connect("duckdb://data/ingest/ci/ibis/raw.ddb") - bq_con = ibis.connect(f"bigquery://{project_id}/workflows") - - # copy over tables - for table in bq_con.list_tables(): - log.info(f"Writing table: {table}") - con.create_table(table, bq_con.table(table).to_pyarrow(), overwrite=True) - - -def ingest_zulip(): +def ingest_zulip(zulip_url, zulip_email: str = "cody@dkdc.dev"): """Ingest the Zulip data.""" - # constants - email = "cody@dkdc.dev" - # load config - config = toml.load("config.toml")["ingest"]["zulip"] - log.info(f"Using url: {config['url']}") - # configure logger log.basicConfig(level=log.INFO) @@ -308,7 +225,7 @@ def ingest_zulip(): zulip_key = os.getenv("ZULIP_KEY") # create the client - client = zulip.Client(email=email, site=config["url"], api_key=zulip_key) + client = zulip.Client(email=zulip_email, site=zulip_url, api_key=zulip_key) # get the users r = client.get_members() @@ -316,12 +233,12 @@ def ingest_zulip(): log.error(f"Failed to get users: {r}") else: members = r["members"] - # make sure the directory exists - os.makedirs("data/ingest/zulip", exist_ok=True) # write the users to a file filename = "members.json" - output_path = os.path.join("data", "ingest", "zulip", filename) + output_dir = os.path.join(DATA_DIR, RAW_DATA_DIR, RAW_DATA_ZULIP_DIR) + os.makedirs(output_dir, exist_ok=True) + output_path = os.path.join(output_dir, filename) log.info(f"Writing members to: {output_path}") write_json(members, output_path) @@ -351,28 +268,23 @@ def ingest_zulip(): messages = r["messages"] all_messages.extend(messages) - # make sure the directory exists - os.makedirs("data/ingest/zulip", exist_ok=True) - # write the messages to a file filename = "messages.json" - output_path = os.path.join("data", "ingest", "zulip", filename) + output_dir = os.path.join(DATA_DIR, RAW_DATA_DIR, RAW_DATA_ZULIP_DIR) + os.makedirs(output_dir, exist_ok=True) + output_path = os.path.join(output_dir, filename) log.info(f"Writing messages to: {output_path}") write_json(all_messages, output_path) -def ingest_docs(): +def ingest_docs(docs_url): """Ingest the docs data.""" - # (not so) constants - endpoint = "/api/v0/export" # configure logger log.basicConfig(level=log.INFO) - # load config - config = toml.load("config.toml")["ingest"]["docs"] - log.info(f"Using url: {config['url']}") - url = config["url"] + url = docs_url + endpoint = "/api/v0/export" # load environment variables goat_token = os.getenv("GOAT_TOKEN") @@ -429,14 +341,57 @@ def ingest_docs(): # write the export to a file try: - os.makedirs("data/ingest/docs", exist_ok=True) - filename = os.path.join("data", "ingest", "docs", "goatcounter.csv.gz") - with open(filename, "wb") as f: + output_dir = os.path.join(DATA_DIR, RAW_DATA_DIR, RAW_DATA_DOCS_DIR) + os.makedirs(output_dir, exist_ok=True) + output_path = os.path.join( + DATA_DIR, RAW_DATA_DIR, 
RAW_DATA_DOCS_DIR, "goatcounter.csv.gz" + ) + with open(output_path, "wb") as f: f.write(r.content) except: log.error(f"Failed to write export to file: {r}") return -if __name__ == "__main__": - main() +# def ingest_pypi(pypi_package): +# """ +# Ingest PyPI data. +# """ +# log.info(f"Fetching data for {pypi_package}...") +# +# # define external source +# host = "clickpy-clickhouse.clickhouse.com" +# port = 443 +# user = "play" +# database = "pypi" +# +# ch_con = ibis.clickhouse.connect( +# host=host, +# port=port, +# user=user, +# database=database, +# ) +# +# # create output directory +# output_dir = os.path.join(DATA_DIR, RAW_DATA_DIR, "pypi") +# os.makedirs(output_dir, exist_ok=True) +# +# # get table and metadata +# t = ch_con.table( +# "pypi_downloads_per_day_by_version_by_installer_by_type_by_country" +# ).filter(ibis._["project"] == pypi_package) +# min_date = t["date"].min().to_pyarrow().as_py() +# max_date = t["date"].max().to_pyarrow().as_py() +# +# # write data to parquet files +# date = min_date +# while date <= max_date: +# old_date = date +# date += timedelta(days=7) +# a = t.filter(t["date"] >= old_date, t["date"] < date) +# +# filename = f"start_date={old_date.strftime('%Y-%m-%d')}.parquet" +# log.info(f"\tWriting data to {os.path.join(output_dir, filename)}...") +# if a.count().to_pyarrow().as_py() > 0: +# a.to_parquet(os.path.join(output_dir, filename)) +# log.info(f"\tData written to {os.path.join(output_dir, filename)}...") diff --git a/dag/graphql_queries.py b/src/ibis_analytics/ingest/graphql_queries.py similarity index 100% rename from dag/graphql_queries.py rename to src/ibis_analytics/ingest/graphql_queries.py diff --git a/src/ibis_analytics/metrics.py b/src/ibis_analytics/metrics.py new file mode 100644 index 0000000..18c5e40 --- /dev/null +++ b/src/ibis_analytics/metrics.py @@ -0,0 +1,49 @@ +# imports +import ibis +from ibis_analytics.catalog import Catalog +import ibis.selectors as s + +from ibis_analytics.config import ( + PYPI_PACKAGE, + GH_PRS_TABLE, + GH_FORKS_TABLE, + GH_STARS_TABLE, + GH_ISSUES_TABLE, + GH_COMMITS_TABLE, + GH_WATCHERS_TABLE, + DOCS_TABLE, + ZULIP_MEMBERS_TABLE, + ZULIP_MESSAGES_TABLE, +) + + +# connect to PyPI data in the ClickHouse Cloud playground +host = "clickpy-clickhouse.clickhouse.com" +port = 443 +user = "play" +database = "pypi" + +ch_con = ibis.clickhouse.connect( + host=host, + port=port, + user=user, + database=database, +) + +# connect to catalog +catalog = Catalog() + +# get source tables +# TODO: use table names from config +pulls_t = catalog.table(GH_PRS_TABLE).cache() +stars_t = catalog.table(GH_STARS_TABLE).cache() +forks_t = catalog.table(GH_FORKS_TABLE).cache() +issues_t = catalog.table(GH_ISSUES_TABLE).cache() +commits_t = catalog.table(GH_COMMITS_TABLE).cache() +watchers_t = catalog.table(GH_WATCHERS_TABLE).cache() +downloads_t = ch_con.table( + "pypi_downloads_per_day_by_version_by_installer_by_type_by_country" +).filter(ibis._["project"] == PYPI_PACKAGE) +docs_t = catalog.table(DOCS_TABLE).cache() +zulip_members_t = catalog.table(ZULIP_MEMBERS_TABLE).cache() +zulip_messages_t = catalog.table(ZULIP_MESSAGES_TABLE).cache() diff --git a/website/.gitignore b/website/.gitignore new file mode 100644 index 0000000..075b254 --- /dev/null +++ b/website/.gitignore @@ -0,0 +1 @@ +/.quarto/ diff --git a/website/_quarto.yml b/website/_quarto.yml new file mode 100644 index 0000000..1d4bcd9 --- /dev/null +++ b/website/_quarto.yml @@ -0,0 +1,31 @@ +project: + type: website + output-dir: _output + +# website +website: + # basics + 
title: "Ibis Analytics" + description: "Ibis analytics with Ibis." + #favicon: logo.png + search: false + + # options + reader-mode: false + twitter-card: true + back-to-top-navigation: true + repo-url: https://github.com/lostmygithubaccount/ibis-analytics + + # footer + page-footer: + border: false + center: "Python analytics accelerator" + right: + - icon: github + href: https://github.com/lostmygithubaccount/ibis-analytics + +# theme +format: + html: + theme: minty + toc: true diff --git a/website/img/layers.png b/website/img/layers.png new file mode 100644 index 0000000..e4f721c Binary files /dev/null and b/website/img/layers.png differ diff --git a/website/img/ui-to-engine.png b/website/img/ui-to-engine.png new file mode 100644 index 0000000..cdeb599 Binary files /dev/null and b/website/img/ui-to-engine.png differ diff --git a/website/index.qmd b/website/index.qmd new file mode 100644 index 0000000..2f36ab1 --- /dev/null +++ b/website/index.qmd @@ -0,0 +1,48 @@ +--- +title: "Ibis analytics" +about: + template: solana + links: + - icon: github + text: github + href: https://github.com/lostmygithubaccount/ibis-analytics +--- + +***Ibis analytics with Ibis.*** + +![Source: [Voltron Data Composable Codex](https://voltrondata.com/codex)](img/layers.png) + +This project uses: + +1. **User interface**: [Ibis](https://github.com/ibis-project/ibis) (Python dataframe code and/or SQL) +2. **Execution engine**: [DuckDB](https://github.com/duckdb/duckdb) (local) and [ClickHouse](https://github.com/clickhouse/clickhouse) (remote) +3. **Data storage**: [Delta Lake](https://github.com/delta-io/delta) tables (local or cloud object storage) + +You can mix and match these components as needed. + +## Setup + +Install [`gh`](https://github.com/cli/cli) and [`just`](https://github.com/casey/just), then: + +```bash +gh repo clone lostmygithubaccount/ibis-analytics +cd ibis-analytics +just setup +. .venv/bin/activate +``` + +## Usage + +Use the CLI: + +```bash +ia +``` + +## Development + +Format your code: + +```bash +just fmt +```