From c5b8cc182f292b8dcc27205718c2c8c832664140 Mon Sep 17 00:00:00 2001 From: Adi Roiban Date: Mon, 9 Dec 2013 13:47:51 +0200 Subject: [PATCH] Initial suppor for AIX big archive format. --- arpy.py | 261 +++++++++++++++++++++++++++++++++- test/test_aix_big.py | 331 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 586 insertions(+), 6 deletions(-) create mode 100644 test/test_aix_big.py diff --git a/arpy.py b/arpy.py index eb580f9..84be177 100644 --- a/arpy.py +++ b/arpy.py @@ -50,6 +50,8 @@ random access through seek and tell functions is supported on the archived files """ +import struct + HEADER_BSD = 1 HEADER_GNU = 2 HEADER_GNU_TABLE = 3 @@ -76,8 +78,6 @@ class ArchiveFileHeader(object): def __init__(self, header, offset): """ Creates a new header from binary data starting at a specified offset """ - import struct - name, timestamp, uid, gid, mode, size, magic = struct.unpack( "16s 12s 6s 6s 8s 10s 2s", header) if magic != b"\x60\x0a": @@ -214,7 +214,7 @@ def _seek(self, offset): # reached EOF before target offset return - def __read_file_header(self, offset): + def _read_file_header(self, offset): """ Reads and returns a single new file header """ self._seek(offset) @@ -234,7 +234,7 @@ def __read_file_header(self, offset): if offset == self.next_header_offset: new_offset = file_header.file_offset + file_header.size - self.next_header_offset = Archive.__pad2(new_offset) + self.next_header_offset = Archive._pad2(new_offset) return file_header @@ -285,7 +285,7 @@ def __fix_name(self, header): return 0 @staticmethod - def __pad2(num): + def _pad2(num): """ Returns a 2-aligned offset """ if num % 2 == 0: return num @@ -302,7 +302,7 @@ def read_next_header(self): """ Reads a single new header, returning a its representation, or None at the end of file """ - header = self.__read_file_header(self.next_header_offset) + header = self._read_file_header(self.next_header_offset) if header is not None: self.headers.append(header) if header.type in (HEADER_BSD, HEADER_NORMAL, HEADER_GNU): @@ -330,3 +330,252 @@ def read_all_headers(self): def close(self): """ Closes the archive file descriptor """ self.file.close() + + +class AIXBigArchive(Archive): + """ + Combines several files into one. + + This is the default ar library archive format for the AIX operating system. + + This file format accommodates both 32-bit and 64-bit object files within + the same archive. + + """ + + def __init__(self, filename=None, fileobj=None): + self.headers = [] + self.file = fileobj or open(filename, "rb") + self._detect_seekable() + + self.position = 0 + self.archived_files = {} + + self.global_header = AIXBigGlobalHeader( + self._read(AIXBigGlobalHeader.LENGTH)) + + self.next_header_offset = self.global_header.first_member + + def _read_file_header(self, offset): + """ + Reads and returns a single new file header. + + Also updates next header pointer when this is call as part of an + iteration. + + """ + # We are already at the last member. + if offset == 0: + return None + + self._seek(offset) + + header_content = self._read(AIXBigFileHeader.MINIMUM_LENGTH) + + if len(header_content) == 0: + return None + + file_header = AIXBigFileHeader(header_content, offset) + + content = self._read(file_header.remaining_header_length) + file_header.updateRemainingHeader(content) + + # If we are in the process of iterating file members, + # update the next header. + if offset == self.next_header_offset: + + # If we are last in the list, set to 0. + if offset == self.global_header.last_member: + self.next_header_offset = 0 + else: + self.next_header_offset = file_header.next_member + + return file_header + + +class AIXBigGlobalHeader(object): + """ + Each archive begins with a fixed-length header that contains offsets to + special archive file members. The fixed-length header also contains the + magic number, which identifies the archive file. + + The fixed-length header has the following format: + + #define __AR_BIG__ + #define AIAMAGBIG "\n" /* Magic string */ + #define SAIAMAG 8 /*Length of magic string */ + struct fl_hdr /*Fixed-length header */ + + { + char fl_magic[SAIAMAG]; /* Archive magic string */ + /* Offset to member table -> members */ + char fl_memoff[20]; + /* Offset to global symbol table -> global_symbol */ + char fl_gstoff[20]; + /* Offset global symbol table for 64-bit objects -> global_symbol_64 */ + char fl_gst64off[20]; + /* Offset to first archive member -> first_member */ + char fl_fstmoff[20]; + /* Offset to last archive member -> last_member */ + char fl_lstmoff[20]; + /* Offset to first mem on free list -> first_free_member */ + char fl_freeoff[20]; + } + + Archive magic string is already parsed, so header is passed without the + magic string. + + """ + + LENGTH = 128 + AIAMAGBIG = '\n' + SAIAMAG = 8 + + def __init__(self, content): + self._content = content + self._checkValidType() + header = () + try: + header = struct.unpack("8s 20s 20s 20s 20s 20s 20s", content) + except struct.error: + raise ArchiveFormatError("bad format for global header") + + ( + magic, + self.members, + self.global_symbol, + self.global_symbol_64, + self.first_member, + self.last_member, + self.first_free_member, + ) = header + self.members = int(self.members) + self.global_symbol = int(self.global_symbol) + self.global_symbol_64 = int(self.global_symbol_64) + self.first_member = int(self.first_member) + self.last_member = int(self.last_member) + self.first_free_member = int(self.first_free_member) + + def _checkValidType(self): + """Raise an error if archive had bad type.""" + if len(self._content) < self.LENGTH: + raise ArchiveFormatError("file to short for AIX big format") + + if self._content[:8] != AIXBigGlobalHeader.AIAMAGBIG: + raise ArchiveFormatError("this is not an AIX big format archive") + + +class AIXBigFileHeader(object): + """ + Each archive file member is preceded by a file member header, + which contains the following information about the file member: + + #define AIAFMAG "`\n" /* Header trailer string*/ + struct ar_hdr /* File member header*/ + { + /* File member size - decimal -> size */ + char ar_size[20]; + /* Next member offset-decimal -> next_member */ + char ar_nxtmem[20]; + /* Previous member offset-dec -> previous_member */ + char ar_prvmem[20]; + /* File member date-decimal -> timestamp*/ + char ar_date[12]; + /* File member userid-decimal -> uid */ + char ar_uid[12]; + /* File member group id-decimal -> gid */ + char ar_gid[12]; + /* File member mode-octal -> mode */ + char ar_mode[12]; + /* File member name length-dec -> filename_length */ + char ar_namlen[4]; + union + { + char ar_name[2]; /* Start of member name */ + char ar_fmag[2]; /* AIAFMAG - string to end */ + }; + _ar_name; /* Header and member name */ + }; + + The member header provides support for member names up to 255 characters + long. The ar_namlen field contains the length of the member name. + The character string containing the member name begins at the _ar_name + field. The AIAFMAG string is cosmetic only. + + Each archive member header begins on an even-byte boundary. The total + length of a member header is: + + sizeof (struct ar_hdr) + ar_namlen + The actual data for a file member begins at the first even-byte boundary + beyond the member header and continues for the number of bytes specified + by the ar_size field. The ar command inserts null bytes for padding + where necessary. + + All information in the fixed-length header and archive members is in + printable ASCII format. Numeric information, with the exception of + the ar_mode field, is stored as decimal numbers; + the ar_mode field is stored in octal format. Thus, if the archive file + contains only printable files, you can print the archive. + + """ + AIAFMAG = '`\n' + + MINIMUM_LENGTH = 112 + type = HEADER_NORMAL + + def __init__(self, content, offset): + if len(content) < AIXBigFileHeader.MINIMUM_LENGTH: + raise ArchiveFormatError('file header too short') + + header = () + try: + header = struct.unpack("20s 20s 20s 12s 12s 12s 12s 4s", content) + except struct.error: + raise ArchiveFormatError("bad format for file header") + + ( + self.size, + self.next_member, + self.previous_member, + self.timestamp, + self.uid, + self.gid, + self.mode, + self.filename_length, + ) = header + + self.size = int(self.size) + self.next_member = int(self.next_member) + self.previous_member = int(self.previous_member) + self.timestamp = int(self.timestamp) + self.uid = int(self.uid) + self.gid = int(self.gid) + self.mode = int(self.mode, 8) + self.filename_length = int(self.filename_length) + self._header_offset = offset + + @property + def remaining_header_length(self): + """Length of filename content raw data.""" + # actual_filename + ALIGN_PAD + HEADER_TRAILING_STRING + return Archive._pad2(self.filename_length + len(self.AIAFMAG)) + + @property + def relative_file_offset(self): + """Offset to file content start, relative to header.""" + return self.MINIMUM_LENGTH + self.remaining_header_length + + @property + def file_offset(self): + """Offset to file content start, absolute to file.""" + return self._header_offset + self.relative_file_offset + + def updateRemainingHeader(self, content): + """Update header with the variable length content.""" + if len(content) < self.remaining_header_length: + raise ArchiveFormatError('file header end too short') + + if not content.endswith(self.AIAFMAG): + raise ArchiveFormatError("bad ending for file header") + + self.name = content[:self.filename_length] diff --git a/test/test_aix_big.py b/test/test_aix_big.py new file mode 100644 index 0000000..3bfdfcf --- /dev/null +++ b/test/test_aix_big.py @@ -0,0 +1,331 @@ +"""Tests for archives in AIX Big Format.""" +from cStringIO import StringIO +import arpy +import random +import string +import unittest + + +class AIXBigFormatMixin(object): + """Common code for testing AIX big format archive.""" + + def getFileWithContent(self, content): + """Return a file like object with `content`.""" + return StringIO(content) + + def getGlobalHeaderContent(self, first_member=None, last_member=None): + """Return a valid content for global header with random data.""" + if first_member is None: + first_member = random.randint(128, 10000) + + if last_member is None: + last_member = random.randint(128, 10000) + + content = ( + '\n' + '91268 ' + '91458 ' + '93664 ' + '%-20d' + '%-20d' + '234 ' + ) % (first_member, last_member) + return content + + def getFileHeaderInitialContent(self, + content_length=None, filename_length=None, next_member=None): + """Return a file header initial content with random data.""" + + if content_length is None: + content_length = random.randint(1, 43563) + if filename_length is None: + filename_length = random.randint(1, 255) + if next_member is None: + next_member = random.randint(1, 43563) + + initial_content = ( + '%-20d' + '%-20d' + '123 ' + '1087823288 ' + '300 ' + '301 ' + '640 ' + '%-4d' + ) % (content_length, next_member, filename_length) + return initial_content + + def getRandomString(self, + size=12, chars=string.ascii_uppercase + string.digits): + """Return a string with random data.""" + return ''.join(random.choice(chars) for x in range(size)) + + def getFileEntry(self, name=None, content=None, next_member=None): + """Return raw content for a file entry from the archive.""" + if name is None: + name = self.getRandomString() + + if content is None: + content = self.getRandomString() + + initial_content = self.getFileHeaderInitialContent( + content_length=len(content), + filename_length=len(name), + next_member=next_member, + ) + + pad = '' + if len(name) % 2: + pad = '\0' + + return ( + initial_content + + name + pad + '`\n' + + content) + + +class TestArchiveAIXBigFormat(unittest.TestCase, AIXBigFormatMixin): + """Test for loading archives in AIX big format.""" + + def test_init_bad_format(self): + """Raise an error when file is not AIX big format.""" + content = self.getFileWithContent("some bad content") + + with self.assertRaises(arpy.ArchiveFormatError): + arpy.AIXBigArchive(fileobj=content) + + def test_init_good_format(self): + """It can be initialized with a valid AIX big format file.""" + first_member = 123 + content = self.getFileWithContent( + self.getGlobalHeaderContent(first_member=first_member)) + + archive = arpy.AIXBigArchive(fileobj=content) + + self.assertIsNotNone(archive.global_header) + self.assertEqual(first_member, archive.global_header.first_member) + self.assertEqual(first_member, archive.next_header_offset) + self.assertEqual([], archive.headers) + self.assertEqual({}, archive.archived_files) + # The cursor is now at the end of the header. + self.assertEqual(archive.global_header.LENGTH, archive.position) + + def test_read_next_header_end_of_list(self): + """None is returned when there is no more a next header.""" + content = self.getFileWithContent(self.getGlobalHeaderContent()) + archive = arpy.AIXBigArchive(fileobj=content) + archive.next_header_offset = 0 + + result = archive.read_next_header() + + self.assertIsNone(result) + + def test_read_next_header_start(self): + """ + After archive is initialized, it will read the first member and will + update the next member pointer. + + """ + # Add a gap after global header to test seeking. + gap = 'Xdssd' + first_member = 128 + len(gap) + next_member = 1024 + file_content = 'ABCDE-MARKER' + content = self.getFileWithContent( + self.getGlobalHeaderContent(first_member=first_member) + + gap + + self.getFileEntry(next_member=next_member, content=file_content) + ) + archive = arpy.AIXBigArchive(fileobj=content) + + file_header = archive.read_next_header() + + self.assertEqual(next_member, archive.next_header_offset) + self.assertIn(file_header, archive.headers) + data = archive.archived_files[file_header.name] + self.assertIsNotNone(data) + self.assertEqual(file_content, data.read()) + + def test_read_next_header_end_of_file(self): + """ + Return None when we are at the end of the file. + + """ + content = self.getFileWithContent( + self.getGlobalHeaderContent() + + self.getFileEntry() + ) + archive = arpy.AIXBigArchive(fileobj=content) + archive.next_header_offset = len(content.getvalue()) + + result = archive.read_next_header() + + self.assertIsNone(result) + self.assertEqual([], archive.headers) + + def test_read_next_header_last_header(self): + """ + Set next_member to 0 when we have read the last member. + + """ + first_member = 128 + # Use event size to avoid padding + first_name = self.getRandomString(size=6) + last_name = self.getRandomString() + first_content = self.getRandomString() + next_member = 128 + 112 + len(first_name) + 2 + len(first_content) + content = self.getFileWithContent( + self.getGlobalHeaderContent( + first_member=first_member, last_member=next_member) + + self.getFileEntry( + name=first_name, + next_member=next_member, + content=first_content, + ) + + self.getFileEntry(name=last_name) + ) + archive = arpy.AIXBigArchive(fileobj=content) + # Read first header. + archive.read_next_header() + + # Read last header. + result = archive.read_next_header() + self.assertIsNotNone(result) + self.assertEqual(last_name, result.name) + + # After last header. + result = archive.read_next_header() + self.assertIsNone(result) + + +class TestAIXBigGlobalHeader(unittest.TestCase): + """Tests for global header or AIX big format archive.""" + + def test_short_header(self): + """An error is raised when header is too short.""" + with self.assertRaises(arpy.ArchiveFormatError) as context: + arpy.AIXBigGlobalHeader('\nshort_header') + + self.assertIn('file to short', context.exception.message) + + def test_bad_magic_marker(self): + """An error is raised when header does not starts with a marker.""" + with self.assertRaises(arpy.ArchiveFormatError) as context: + # Create a file which can theoretically accommodate a header. + arpy.AIXBigGlobalHeader('a' * (arpy.AIXBigGlobalHeader.LENGTH + 1)) + + self.assertIn('not an AIX big', context.exception.message) + + def test_parse_content(self): + """Global header content is parsed for all fields.""" + content = ( + '\n' + '91268 ' + '91458 ' + '93664 ' + '136 ' + '43270 ' + '234 ' + ) + global_header = arpy.AIXBigGlobalHeader(content) + + self.assertEqual(91268, global_header.members) + self.assertEqual(91458, global_header.global_symbol) + self.assertEqual(93664, global_header.global_symbol_64) + self.assertEqual(136, global_header.first_member) + self.assertEqual(43270, global_header.last_member) + self.assertEqual(234, global_header.first_free_member) + + +class TestAIXBigFileHeader(unittest.TestCase, AIXBigFormatMixin): + """Test for a file header from an AIX big format archive.""" + + def test_short_header(self): + """An error is raised if header is too short.""" + with self.assertRaises(arpy.ArchiveFormatError) as context: + arpy.AIXBigFileHeader('short-header', None) + + self.assertIn('file header too short', context.exception.message) + + def test_initial_parse(self): + """ + It is initialized with the minimum fixed header size which does not + contains the variable file name. + + """ + offset = random.randint(128, 1023) + initial_content = ( + '1216 ' + '1466 ' + '123 ' + '1087823288 ' + '300 ' + '301 ' + '640 ' + '6 ' + ) + header = arpy.AIXBigFileHeader(initial_content, offset) + + # Check parded data. + self.assertEqual(1216, header.size) + self.assertEqual(1466, header.next_member) + self.assertEqual(123, header.previous_member) + self.assertEqual(1087823288, header.timestamp) + self.assertEqual(300, header.uid) + self.assertEqual(301, header.gid) + self.assertEqual(416, header.mode) # Parsed in octal. + self.assertEqual(6, header.filename_length) + + # Check other members. + self.assertEqual(arpy.HEADER_NORMAL, header.type) + self.assertEqual(6 + 2, header.remaining_header_length) + + self.assertEqual(112 + 6 + 2, header.relative_file_offset) + self.assertEqual(offset + 112 + 6 + 2, header.file_offset) + + def test_remaining_header_length_already_aligned(self): + """No padding is added if header ends at an aligned address.""" + initial_content = self.getFileHeaderInitialContent(filename_length=6) + header = arpy.AIXBigFileHeader(initial_content, None) + + self.assertEqual(6 + 2, header.remaining_header_length) + + def test_remaining_header_length_already_unaligned(self): + """ + A 1 byte padding is added to align the header ending / file content + start at 2 bytes offset. + + """ + initial_content = self.getFileHeaderInitialContent(filename_length=7) + header = arpy.AIXBigFileHeader(initial_content, None) + + self.assertEqual(7 + 1 + 2, header.remaining_header_length) + + def test_updateRemainingHeader_short(self): + """An error is raised if content is too short.""" + initial_content = self.getFileHeaderInitialContent(filename_length=45) + header = arpy.AIXBigFileHeader(initial_content, None) + + with self.assertRaises(arpy.ArchiveFormatError) as context: + header.updateRemainingHeader('short-filename') + + self.assertIn('file header end too short', context.exception.message) + + def test_updateRemainingHeader_bad_end_marker(self): + """An error is raised when the file header end marker is not found.""" + initial_content = self.getFileHeaderInitialContent(filename_length=5) + header = arpy.AIXBigFileHeader(initial_content, None) + + with self.assertRaises(arpy.ArchiveFormatError) as context: + header.updateRemainingHeader('12345\nAB') + + self.assertIn('bad ending for file header', context.exception.message) + + def test_updateRemainingHeader_ok(self): + """Filename is updated from remaining header.""" + initial_content = self.getFileHeaderInitialContent(filename_length=5) + header = arpy.AIXBigFileHeader(initial_content, None) + + header.updateRemainingHeader('12345\0`\n') + + self.assertEqual('12345', header.name)