-
Notifications
You must be signed in to change notification settings - Fork 19
/
Copy pathpaycheck.py
executable file
·295 lines (253 loc) · 11.6 KB
/
paycheck.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
#!/usr/bin/env python
#
# Copyright (C) 2013 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Command-line tool for checking and applying Chrome OS update payloads."""
from __future__ import absolute_import
from __future__ import print_function
# pylint: disable=import-error
import argparse
import filecmp
import os
import sys
import tempfile
# pylint: disable=redefined-builtin
from six.moves import zip
from update_payload import error
# If a 'lib' directory sits next to this script, add it to the module search
# path (at index 1) before importing update_payload below.
lib_dir = os.path.join(os.path.dirname(__file__), 'lib')
if os.path.isdir(lib_dir):
  sys.path.insert(1, lib_dir)
import update_payload  # pylint: disable=wrong-import-position

# Payload type names accepted by -t/--type.
_TYPE_FULL = 'full'
_TYPE_DELTA = 'delta'
def CheckApplyPayload(args):
  """Tell whether the applied result should be compared to given partitions.

  Args:
    args: Parsed command arguments (the return value of
          ArgumentParser.parse_args).

  Returns:
    True when --dst_part_paths was supplied (i.e. is not None), False
    otherwise.
  """
  expected_parts = args.dst_part_paths
  return expected_parts is not None
def ApplyPayload(args):
  """Tell whether the payload should be applied at all.

  Args:
    args: Parsed command arguments (the return value of
          ArgumentParser.parse_args).

  Returns:
    True when the applied result is to be checked (see CheckApplyPayload) or
    when --out_dst_part_paths was supplied; False otherwise.
  """
  # Checking the applied result implies applying the payload first.
  if CheckApplyPayload(args):
    return True
  return args.out_dst_part_paths is not None
def ParseArguments(argv):
  """Parse and validate command-line arguments.

  In addition to plain parsing, this:
    - enables --check whenever any checking-only option is given;
    - requires --part_names, with a matching number of entries, whenever
      --part_sizes, --src_part_paths, --dst_part_paths or
      --out_dst_part_paths is given;
    - when applying, sets the asserted payload type from the presence of
      --src_part_paths (delta if given, full otherwise) and rejects a
      conflicting --type;
    - rejects apply-only options when no payload is being applied;
    - when checking without --meta-sig, uses '<PAYLOAD>.metadata-signature'
      if such a file exists.

  Invalid combinations are reported through parser.error(), which prints a
  usage message and exits the process.

  Args:
    argv: command-line arguments to parse (excluding the program name)

  Returns:
    Returns the arguments returned by the argument parser.
  """
  parser = argparse.ArgumentParser(
      description=('Applies a Chrome OS update PAYLOAD to src_part_paths '
                   'emitting dst_part_paths, respectively. '
                   'src_part_paths are only needed for delta payloads. '
                   'When no partitions are provided, verifies the payload '
                   'integrity.'),
      epilog=('Note: a payload may verify correctly but fail to apply, and '
              'vice versa; this is by design and can be thought of as static '
              'vs dynamic correctness. A payload that both verifies and '
              'applies correctly should be safe for use by the Chrome OS '
              'Update Engine. Use --check to verify a payload prior to '
              'applying it.'),
      formatter_class=argparse.RawDescriptionHelpFormatter
  )

  check_args = parser.add_argument_group('Checking payload integrity')
  check_args.add_argument('-c', '--check', action='store_true', default=False,
                          help=('force payload integrity check (e.g. before '
                                'applying)'))
  check_args.add_argument('-r', '--report', metavar='FILE',
                          help="dump payload report (`-' for stdout)")
  check_args.add_argument('-t', '--type', dest='assert_type',
                          help='assert the payload type',
                          choices=[_TYPE_FULL, _TYPE_DELTA])
  check_args.add_argument('-z', '--block-size', metavar='NUM', default=0,
                          type=int,
                          help='assert a non-default (4096) payload block size')
  check_args.add_argument('-u', '--allow-unhashed', action='store_true',
                          default=False, help='allow unhashed operations')
  # NOTE(review): the help text promises a space separated list, but no nargs
  # is set, so argparse accepts exactly one value here. Confirm the intended
  # usage before changing it, since adding nargs alters how the following
  # positional PAYLOAD argument is consumed.
  check_args.add_argument('-d', '--disabled_tests', default=(), metavar='',
                          help=('space separated list of tests to disable. '
                                'allowed options include: ' +
                                ', '.join(update_payload.CHECKS_TO_DISABLE)),
                          choices=update_payload.CHECKS_TO_DISABLE)
  check_args.add_argument('-k', '--key', metavar='FILE',
                          help=('override standard key used for signature '
                                'validation'))
  check_args.add_argument('-m', '--meta-sig', metavar='FILE',
                          help='verify metadata against its signature')
  # Parsed as an int (like --block-size) so that a non-numeric value is
  # rejected here with a usage error rather than failing later on conversion.
  check_args.add_argument('-s', '--metadata-size', metavar='NUM', default=0,
                          type=int,
                          help='the metadata size to verify with the one in'
                          ' payload')
  check_args.add_argument('--part_sizes', metavar='NUM', nargs='+', type=int,
                          help='override partition size auto-inference')

  apply_args = parser.add_argument_group('Applying payload')
  # TODO(ahassani): Extend extract-bsdiff to puffdiff too.
  apply_args.add_argument('-x', '--extract-bsdiff', action='store_true',
                          default=False,
                          help=('use temp input/output files with BSDIFF '
                                'operations (not in-place)'))
  apply_args.add_argument('--bspatch-path', metavar='FILE',
                          help='use the specified bspatch binary')
  apply_args.add_argument('--puffpatch-path', metavar='FILE',
                          help='use the specified puffpatch binary')
  apply_args.add_argument('--src_part_paths', metavar='FILE', nargs='+',
                          help='source partition files')
  apply_args.add_argument('--dst_part_paths', metavar='FILE', nargs='+',
                          help='destination partition files')
  apply_args.add_argument('--out_dst_part_paths', metavar='FILE', nargs='+',
                          help='created destination partition files')

  parser.add_argument('payload', metavar='PAYLOAD', help='the payload file')
  parser.add_argument('--part_names', metavar='NAME', nargs='+',
                      help='names of partitions')

  # Parse command-line arguments.
  args = parser.parse_args(argv)

  # There are several options that imply --check.
  args.check = (args.check or args.report or args.assert_type or
                args.block_size or args.allow_unhashed or
                args.disabled_tests or args.meta_sig or args.key or
                args.part_sizes is not None or args.metadata_size)

  # Makes sure the following arguments have the same length as |part_names| if
  # set.
  for arg in ('part_sizes', 'src_part_paths', 'dst_part_paths',
              'out_dst_part_paths'):
    values = getattr(args, arg)
    if values is None:
      # Parameter is not set.
      continue
    # Every per-partition list is keyed by --part_names; without it there is
    # nothing to match against (and len(None) would raise a TypeError).
    if args.part_names is None:
      parser.error('--%s requires --part_names' % arg)
    if len(args.part_names) != len(values):
      parser.error('partitions in --%s do not match --part_names' % arg)

  # Makes sure parameters are coherent with payload type.
  if ApplyPayload(args):
    if args.src_part_paths is not None:
      # Source partitions were provided, so this must be a delta payload.
      if args.assert_type == _TYPE_FULL:
        parser.error('%s payload does not accept source partition arguments'
                     % _TYPE_FULL)
      else:
        args.assert_type = _TYPE_DELTA
    else:
      # No source partitions, so this must be a full payload.
      if args.assert_type == _TYPE_DELTA:
        parser.error('%s payload requires source partitions arguments'
                     % _TYPE_DELTA)
      else:
        args.assert_type = _TYPE_FULL
  else:
    # Not applying payload.
    if args.extract_bsdiff:
      parser.error('--extract-bsdiff can only be used when applying payloads')
    if args.bspatch_path:
      parser.error('--bspatch-path can only be used when applying payloads')
    if args.puffpatch_path:
      parser.error('--puffpatch-path can only be used when applying payloads')

  # By default, look for a metadata-signature file with a name based on the name
  # of the payload we are checking. We only do it if check was triggered.
  if args.check and not args.meta_sig:
    default_meta_sig = args.payload + '.metadata-signature'
    if os.path.isfile(default_meta_sig):
      args.meta_sig = default_meta_sig
      print('Using default metadata signature', args.meta_sig, file=sys.stderr)

  return args
def main(argv):
  """Check and/or apply the payload named on the command line.

  Args:
    argv: the full argument vector; argv[0] (the program name) is skipped.

  Returns:
    0 on success, or 1 after writing the message to stderr when a
    PayloadError is raised while initializing, checking, applying or
    comparing the payload.
  """
  # Parse and validate arguments.
  args = ParseArguments(argv[1:])

  with open(args.payload, 'rb') as payload_file:
    payload = update_payload.Payload(payload_file)
    try:
      # Initialize payload.
      payload.Init()

      # Perform payload integrity checks.
      if args.check:
        report_file = None
        do_close_report_file = False
        metadata_sig_file = None
        try:
          if args.report:
            if args.report == '-':
              report_file = sys.stdout
            else:
              report_file = open(args.report, 'w')
              # Only a file opened here is ours to close; stdout is not.
              do_close_report_file = True

          part_sizes = (args.part_sizes and
                        dict(zip(args.part_names, args.part_sizes)))
          metadata_sig_file = args.meta_sig and open(args.meta_sig, 'rb')
          payload.Check(
              pubkey_file_name=args.key,
              metadata_sig_file=metadata_sig_file,
              metadata_size=int(args.metadata_size),
              report_out_file=report_file,
              assert_type=args.assert_type,
              block_size=int(args.block_size),
              part_sizes=part_sizes,
              allow_unhashed=args.allow_unhashed,
              disabled_tests=args.disabled_tests)
        finally:
          if metadata_sig_file:
            metadata_sig_file.close()
          if do_close_report_file:
            report_file.close()

      # Apply payload.
      if ApplyPayload(args):
        dargs = {'bsdiff_in_place': not args.extract_bsdiff}
        if args.bspatch_path:
          dargs['bspatch_path'] = args.bspatch_path
        if args.puffpatch_path:
          dargs['puffpatch_path'] = args.puffpatch_path
        if args.assert_type == _TYPE_DELTA:
          dargs['old_parts'] = dict(zip(args.part_names, args.src_part_paths))

        out_dst_parts = {}
        file_handles = []
        try:
          if args.out_dst_part_paths is not None:
            for name, path in zip(args.part_names, args.out_dst_part_paths):
              handle = open(path, 'wb+')
              file_handles.append(handle)
              out_dst_parts[name] = handle.name
          else:
            for name in args.part_names:
              handle = tempfile.NamedTemporaryFile()
              file_handles.append(handle)
              out_dst_parts[name] = handle.name

          # The output partitions are handed over by file name.
          payload.Apply(out_dst_parts, **dargs)

          # If destination kernel and rootfs partitions are not given, then
          # this just becomes an apply operation with no check.
          if CheckApplyPayload(args):
            # Prior to comparing, add the unused space past the filesystem
            # boundary in the new target partitions to become the same size as
            # the given partitions. This will truncate to larger size.
            for part_name, out_dst_part, dst_part in zip(args.part_names,
                                                         file_handles,
                                                         args.dst_part_paths):
              out_dst_part.truncate(os.path.getsize(dst_part))

              # Compare resulting partitions with the ones from the target
              # image. shallow=False forces a content comparison; the default
              # may report equality from matching os.stat() signatures alone.
              if not filecmp.cmp(out_dst_part.name, dst_part, shallow=False):
                raise error.PayloadError(
                    'Resulting %s partition corrupted.' % part_name)
        finally:
          # Close the output files even when applying or comparing fails. If
          # args.out_dst_* was not given, then these files are created as temp
          # files and will be deleted upon close().
          for handle in file_handles:
            handle.close()
    except error.PayloadError as e:
      sys.stderr.write('Error: %s\n' % e)
      return 1

  return 0
# Script entry point: run main() on the full argument vector and use its
# return value as the process exit status.
if __name__ == '__main__':
  exit_status = main(sys.argv)
  sys.exit(exit_status)