Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add sha256 validation #129

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 87 additions & 15 deletions apt-sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,51 @@ def mkdir_with_dot_tmp(folder: Path)->Tuple[Path, Path]:
shutil.rmtree(str(tmpdir))
tmpdir.mkdir(parents=True, exist_ok=True)
return (folder, tmpdir)

def parse_packages_info(content: str, package_info = None, is_new = True):

if package_info is None:
package_info = {}

for pkg in content.split('\n\n'):
if len(pkg) < 10: # ignore blanks
continue
try:
pkg_filename = pattern_package_name.search(pkg).group(1)
pkg_size = int(pattern_package_size.search(pkg).group(1))
pkg_checksum = pattern_package_sha256.search(pkg).group(1)
if pkg_filename not in package_info:
jack9603301 marked this conversation as resolved.
Show resolved Hide resolved
if is_new:
pkg_info = {
'size': pkg_size,
'sha256': {
'new': pkg_checksum,
'old': None
}
}
else:
pkg_info = {
'size': None,
'sha256': {
'new': None,
'old': pkg_checksum
}
}
else:
pkg_info = package_info[pkg_filename]
if is_new:
pkg_info['size'] = pkg_size
pkg_info['sha256']['new'] = pkg_checksum
else:
pkg_info['sha256']['old'] = pkg_checksum
package_info.update({
pkg_filename: pkg_info
})
except:
print("Failed to parse one package description", flush=True)
traceback.print_exc()
return package_info
return package_info

def move_files_in(src: Path, dst: Path):
empty = True
Expand All @@ -110,6 +155,9 @@ def move_files_in(src: Path, dst: Path):
print(f"{src} is empty")

def apt_mirror(base_url: str, dist: str, repo: str, arch: str, dest_base_dir: Path, deb_set: Dict[str, int])->int:

package_info = {}

if not dest_base_dir.is_dir():
print("Destination directory is empty, cannot continue")
return 1
Expand All @@ -134,6 +182,7 @@ def apt_mirror(base_url: str, dist: str, repo: str, arch: str, dest_base_dir: Pa
pkgidx_dir,pkgidx_tmp_dir = mkdir_with_dot_tmp(comp_dir / arch_dir)
with open(release_file, "r") as fd:
pkgidx_content=None
pkgidx_content_old = None
cnt_start=False
for line in fd:
if cnt_start:
Expand All @@ -146,6 +195,8 @@ def apt_mirror(base_url: str, dist: str, repo: str, arch: str, dest_base_dir: Pa
filename.startswith(f"Contents-{arch}"):
fn = Path(filename)
pkgidx_file = dist_dir / fn.parent / ".tmp" / fn.name
if pkgidx_file.stem == 'Packages':
pkgidx_file_old = Path(f'{dist_dir}/{filename}')
else:
print(f"Ignore the file {filename}")
continue
Expand Down Expand Up @@ -176,12 +227,38 @@ def apt_mirror(base_url: str, dist: str, repo: str, arch: str, dest_base_dir: Pa
pkgidx_content = content.decode('utf-8')
else:
print("unsupported format")
continue

package_info = parse_packages_info(pkgidx_content, package_info = package_info, is_new = True)

if not os.path.exists(pkgidx_file_old):
continue

with pkgidx_file_old.open('rb') as t: content = t.read()
jack9603301 marked this conversation as resolved.
Show resolved Hide resolved
if pkgidx_content_old is None and pkgidx_file_old.stem == 'Packages':
print(f"getting packages index content from old {pkgidx_file_old.name}", flush=True)
suffix = pkgidx_file.suffix
if suffix == '.xz':
pkgidx_content_old = lzma.decompress(content).decode('utf-8')
elif suffix == '.bz2':
pkgidx_content_old = bz2.decompress(content).decode('utf-8')
elif suffix == '.gz':
pkgidx_content_old = gzip.decompress(content).decode('utf-8')
elif suffix == '':
pkgidx_content_old = content.decode('utf-8')
else:
print("unsupported format")
continue

package_info = parse_packages_info(pkgidx_content_old, package_info = package_info, is_new = False)


# Currently only support SHA-256 checksum, because
# "Clients may not use the MD5Sum and SHA1 fields for security purposes, and must require a SHA256 or a SHA512 field."
# from https://wiki.debian.org/DebianRepository/Format#A.22Release.22_files
if line.startswith('SHA256:'):
cnt_start = True

if not cnt_start:
print("Cannot find SHA-256 checksum")
return 1
Expand Down Expand Up @@ -216,18 +293,13 @@ def collect_tmp_dir():
err = 0
deb_count = 0
deb_size = 0
for pkg in pkgidx_content.split('\n\n'):
if len(pkg) < 10: # ignore blanks
continue
try:
pkg_filename = pattern_package_name.search(pkg).group(1)
pkg_size = int(pattern_package_size.search(pkg).group(1))
pkg_checksum = pattern_package_sha256.search(pkg).group(1)
except:
print("Failed to parse one package description", flush=True)
traceback.print_exc()
err = 1
for pkg_filename, pkg_info in package_info.items():
pkg_size = pkg_info['size']
jack9603301 marked this conversation as resolved.
Show resolved Hide resolved
pkg_checksum = pkg_info['sha256']

if pkg_checksum['new'] is None and pkg_size is None:
continue

deb_count += 1
deb_size += pkg_size

Expand All @@ -237,8 +309,8 @@ def collect_tmp_dir():
dest_dir.mkdir(parents=True, exist_ok=True)
if dest_filename.suffix == '.deb':
deb_set[str(dest_filename.relative_to(dest_base_dir))] = pkg_size
if dest_filename.is_file() and dest_filename.stat().st_size == pkg_size:
print(f"Skipping {pkg_filename}, size {pkg_size}")
if dest_filename.is_file() and dest_filename.stat().st_size == pkg_size and pkg_checksum['old'] == pkg_checksum['new']:
print(f"Skipping {pkg_filename}, size {pkg_size}, sha256 {pkg_checksum['new']}")
continue

pkg_url=f"{base_url}/{pkg_filename}"
Expand All @@ -253,8 +325,8 @@ def collect_tmp_dir():
with dest_tmp_filename.open("rb") as f:
for block in iter(lambda: f.read(1024**2), b""):
sha.update(block)
if sha.hexdigest() != pkg_checksum:
print(f"Invalid checksum of {dest_filename}, expected {pkg_checksum}")
if sha.hexdigest() != pkg_checksum['new']:
print(f"Invalid checksum of {dest_filename}, expected {pkg_checksum['new']}")
dest_tmp_filename.unlink()
continue
dest_tmp_filename.rename(dest_filename)
Expand Down