Skip to content

Commit

Permalink
Remove unused asset features
Browse files Browse the repository at this point in the history
  • Loading branch information
thesadru committed Aug 11, 2024
1 parent 7481a2d commit fd85ccf
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 207 deletions.
19 changes: 2 additions & 17 deletions arkprts/assets/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ class Assets(abc.ABC):
def __init__(
self,
*,
default_server: netn.ArknightsServer = "en",
default_server: netn.ArknightsServer | None = None,
json_loads: typing.Callable[[bytes], typing.Any] = json.loads,
) -> None:
self.default_server = default_server
self.default_server = default_server or "en"
self.loaded = False
self.excel_cache = {}
self.json_loads = json_loads
Expand Down Expand Up @@ -75,10 +75,6 @@ async def update_assets(self) -> None:
def get_file(self, path: str, *, server: netn.ArknightsServer | None = None) -> bytes:
"""Get an extracted asset file. If server is None any server is allowed with preference for default server."""

@abc.abstractmethod
async def aget_file(self, path: str, *, server: netn.ArknightsServer | None = None) -> bytes:
"""Get an extracted asset file without requiring load."""

def get_excel(self, name: str, *, server: netn.ArknightsServer | None = None) -> models.DDict:
"""Get a gamedata table file."""
path = f"gamedata/excel/{name}.json"
Expand All @@ -90,17 +86,6 @@ def get_excel(self, name: str, *, server: netn.ArknightsServer | None = None) ->

return models.DDict(data)

async def aget_excel(self, name: str, *, server: netn.ArknightsServer | None = None) -> models.DDict:
"""Get a gamedata table file without requiring load."""
path = f"gamedata/excel/{name}.json"
if data := self.excel_cache.setdefault(server or self.default_server, {}).get(path):
return models.DDict(data)

data = self.json_loads(await self.aget_file(path, server=server))
self.excel_cache[server or self.default_server][path] = data

return models.DDict(data)

def __getitem__(self, name: str) -> models.DDict:
"""Get a gamedata table file."""
return self.get_excel(name)
Expand Down
156 changes: 46 additions & 110 deletions arkprts/assets/bundle.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@
from __future__ import annotations

import asyncio
import concurrent.futures
import fnmatch
import functools
import io
import json
import logging
Expand All @@ -20,7 +18,6 @@
import subprocess
import tempfile
import typing
import warnings
import zipfile

from arkprts import network as netn
Expand Down Expand Up @@ -56,7 +53,7 @@ def unzip_only_file(stream: io.BytesIO | bytes) -> bytes:

def resolve_unity_asset_cache(filename: str, server: netn.ArknightsServer) -> pathlib.Path:
"""Resolve a path to a cached arknights ab file."""
path = pathlib.Path(tempfile.gettempdir()) / "ArknightsUnity" / filename
path = netn.TEMP_DIR / "ArknightsAB" / filename
path.parent.mkdir(parents=True, exist_ok=True)
return path.with_suffix(".ab")

Expand Down Expand Up @@ -117,7 +114,8 @@ def run_flatbuffers(
]
result = subprocess.run(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=False) # noqa: S603, UP022
if result.returncode != 0:
file = pathlib.Path(tempfile.mktemp(".log"))
file = pathlib.Path(tempfile.mktemp(".log", dir=netn.TEMP_DIR / "flatbufferlogs"))
file.parent.mkdir(parents=True, exist_ok=True)
file.write_bytes(result.stdout + b"\n\n\n\n" + result.stderr)
raise ValueError(
f"flatc failed with code {result.returncode}: {file} `{shlex.join(args)}` (random exit code likely means a faulty FBS file was provided)",
Expand All @@ -132,7 +130,7 @@ def resolve_fbs_schema_directory(server: typing.Literal["cn", "yostar"]) -> path
if path:
return pathlib.Path(path)

core_path = pathlib.Path(tempfile.gettempdir()) / "ArknightsFBS"
core_path = netn.APPDATA_DIR / "ArknightsFBS"
core_path.mkdir(parents=True, exist_ok=True)
path = core_path / server / "OpenArknightsFBS" / "FBS"
os.environ[f"FLATBUFFERS_SCHEMA_DIR_{server.upper()}"] = str(path)
Expand Down Expand Up @@ -178,7 +176,7 @@ def decrypt_fbs_file(
if rsa:
data = data[128:]

tempdir = pathlib.Path(tempfile.gettempdir()) / "TempArknightsFBS"
tempdir = netn.TEMP_DIR / "ArknightsFBS"
tempdir.mkdir(parents=True, exist_ok=True)

fbs_path = tempdir / (table_name + ".bytes")
Expand Down Expand Up @@ -235,23 +233,15 @@ def normalize_json(data: bytes, *, indent: int = 4, lenient: bool = True) -> byt
DYNP = r"assets/torappu/dynamicassets/"


def unpack_assets(
def find_ab_assets(
asset: UnityPyAsset,
target_container: str | None = None,
# target_path: str | None = None,
*,
server: netn.ArknightsServer | None = None,
server: netn.ArknightsServer,
normalize: bool = False,
) -> typing.Iterable[tuple[str, bytes]]:
"""Yield relative paths and data for a unity asset."""
for container, obj in asset.container.items():
if target_container and container != target_container:
continue

if obj.type.name == "TextAsset":
if server is None:
raise TypeError("Server required for text decryption")

if match := re.match(DYNP + r"(.+\.txt)", container):
data = obj.read()
yield (match[1], data.script)
Expand Down Expand Up @@ -293,23 +283,28 @@ def unpack_assets(
continue


def guess_asset_path(path: str, hot_update_list: typing.Any) -> typing.Sequence[str]:
"""Return a sequence of all files thought to be needed to be downloaded for an asset to be available."""
# images have to be added later
match = re.match(r"(gamedata/\w+).json", path)
if not match:
return []
def extract_ab(
ab_path: PathLike,
save_directory: PathLike,
*,
server: netn.ArknightsServer,
normalize: bool = False,
) -> typing.Sequence[pathlib.Path]:
"""Extract an AB file and save files. Returns a list of found files."""
ab_path = pathlib.Path(ab_path)
save_directory = pathlib.Path(save_directory)
asset = load_unity_file(ab_path.read_bytes())

filename = match[0]
paths: list[pathlib.Path] = []
for unpacked_rel_path, unpacked_data in find_ab_assets(asset, server=server, normalize=normalize):
savepath = save_directory / server / unpacked_rel_path
savepath.parent.mkdir(exist_ok=True, parents=True)
savepath.write_bytes(unpacked_data)

asset_paths: list[str] = []
for info in hot_update_list["abInfos"]:
# just supporting excel for now
match = re.match(DYNP + filename + r"(?:[a-fA-F0-9]{6})?\.ab", info["name"])
if match:
asset_paths.append(info["name"])
LOGGER.debug("Extracted asset %s from %s for server %s", unpacked_rel_path, ab_path.name, server)
paths.append(savepath)

return asset_paths
return paths


def get_outdated_hashes(hot_update_now: typing.Any, hot_update_before: typing.Any) -> typing.Sequence[str]:
Expand Down Expand Up @@ -348,11 +343,10 @@ def __init__(
except OSError as e:
raise ImportError("Cannot use BundleAssets without a flatc executable") from e

super().__init__(default_server=default_server or "en", json_loads=json_loads)
super().__init__(default_server=default_server or (network and network.default_server), json_loads=json_loads)

temporary_directory = pathlib.Path(tempfile.gettempdir())
self.directory = pathlib.Path(directory or temporary_directory / "ArknightsResources")
self.network = network or netn.NetworkSession(default_server=default_server)
self.directory = pathlib.Path(directory or netn.APPDATA_DIR / "ArknightsResources")
self.network = network or netn.NetworkSession(default_server=self.default_server)

async def _download_asset(self, path: str, *, server: netn.ArknightsServer | None = None) -> bytes:
"""Download a raw zipped unity asset."""
Expand Down Expand Up @@ -381,53 +375,22 @@ def _get_current_hot_update_list(self, server: netn.ArknightsServer) -> typing.A
if not path.exists():
return None

path.parent.mkdir(exist_ok=True, parents=True)
with path.open("r") as file:
return json.load(file)

async def _download_unity_file(
self,
path: str,
*,
save: bool = True,
server: netn.ArknightsServer | None = None,
) -> bytes:
"""Download an asset and return it unzipped."""
) -> pathlib.Path:
"""Download an asset and return its path."""
LOGGER.debug("Downloading and unzipping asset %s for server %s", path, server)
zipped_data = await self._download_asset(path, server=server)
data = unzip_only_file(zipped_data)
if save:
p = resolve_unity_asset_cache(path, server=server or self.default_server)
p.write_bytes(data)

return data

def _parse_and_save(
self,
data: bytes,
*,
target_container: str | None = None,
server: netn.ArknightsServer | None = None,
normalize: bool = False,
) -> typing.Iterable[tuple[str, bytes]]:
"""Download and extract an asset."""
server = server or self.default_server

asset = load_unity_file(data)

fetched_any = False
for fetched_any, (unpacked_rel_path, unpacked_data) in enumerate(
unpack_assets(asset, target_container, server=server, normalize=normalize),
1,
):
savepath = self.directory / server / unpacked_rel_path
savepath.parent.mkdir(exist_ok=True, parents=True)
savepath.write_bytes(unpacked_data)

yield (unpacked_rel_path, unpacked_data)

if not fetched_any:
warnings.warn(f"Unpacking yielded no results (container: {target_container}) ")
cache_path = resolve_unity_asset_cache(path, server=server or self.default_server)
cache_path.write_bytes(data)
return cache_path

async def update_assets(
self,
Expand All @@ -449,34 +412,27 @@ async def update_assets(
return

hot_update_list = await self._get_hot_update_list(server)
requested_names = [info["name"] for info in hot_update_list["abInfos"] if fnmatch.fnmatch(info["name"], allow)]

old_hot_update_list = self._get_current_hot_update_list(server)

requested_names = [info["name"] for info in hot_update_list["abInfos"] if fnmatch.fnmatch(info["name"], allow)]
if old_hot_update_list and not force:
outdated_names = set(get_outdated_hashes(hot_update_list, old_hot_update_list))
requested_names = [name for name in requested_names if name in outdated_names]

if any("gamedata" in name for name in requested_names):
await update_fbs_schema()

datas = await asyncio.gather(*(self._download_unity_file(name, server=server) for name in requested_names))
loop = asyncio.get_event_loop()
# this should be a ProcessPoolExecutor but pickling is a problem in classes
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = [
loop.run_in_executor(
executor,
functools.partial(self._parse_and_save, d, server=server, normalize=normalize),
)
for d in datas
]
for name, f in zip(requested_names, asyncio.as_completed(futures)):
try:
for path, _ in await f:
LOGGER.debug("Extracted asset %s from %s for server %s", path, name, server)
except Exception as e:
LOGGER.exception("Failed to extract asset %s for server %s", name, server, exc_info=e)
# download and extract assets
ab_file_paths = await asyncio.gather(
*(self._download_unity_file(name, server=server) for name in requested_names),
)
for path in ab_file_paths:
try:
extract_ab(path, self.directory, server=server, normalize=normalize)
except Exception as e:
LOGGER.exception("Failed to extract asset %s for server %s", path.name, server, exc_info=e)

# save new hot_update_list
hot_update_list_path = self.directory / server / "hot_update_list.json"
hot_update_list_path.parent.mkdir(parents=True, exist_ok=True)
with hot_update_list_path.open("w") as file:
Expand All @@ -487,23 +443,3 @@ async def update_assets(
def get_file(self, path: str, *, server: netn.ArknightsServer | None = None) -> bytes:
"""Get an extracted asset file. If server is None any server is allowed with preference for default server."""
return (self.directory / (server or self.default_server) / path).read_bytes()

async def aget_file(self, path: str, *, server: netn.ArknightsServer | None = None, save: bool = True) -> bytes:
"""Get an extracted asset file without requiring load."""
server = server or self.default_server
hot_update_list = await self._get_hot_update_list(server)
asset_paths = guess_asset_path(path, hot_update_list)
if not asset_paths:
raise ValueError("No viable asset path found, please load all assets and use get_file.")

for potential_asset_path in asset_paths:
asset = load_unity_file(await self._download_unity_file(potential_asset_path, server=server))
for output_path, data in unpack_assets(asset, path, server=server):
if save:
savepath = self.directory / server / output_path
savepath.parent.mkdir(exist_ok=True, parents=True)
savepath.write_bytes(data)

return data

raise ValueError("File not found, please load all assets and use get_file.")
Loading

0 comments on commit fd85ccf

Please sign in to comment.