-
Notifications
You must be signed in to change notification settings - Fork 5
/
eos-esp-generator
executable file
·991 lines (840 loc) · 34.1 KB
/
eos-esp-generator
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
#!/usr/bin/env python3
# Copyright © 2023 Endless OS Foundation, LLC
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or (at
# your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
"""\
Endless OS EFI System Partition (ESP) mount generator
This program is responsible for mounting the EFI System Partition (ESP) on EOS
systems. It's heavily inspired by systemd-gpt-auto-generator with a few policy
changes that are not easily expressed there:
* Both GPT and MBR partition tables are supported.
* The EFI LoaderDevicePartUUID variable is preferred but not required. This
allows for usage with bootloaders such as GRUB that do not implement the boot
loader interface.
* Mounting at /boot is allowed even when the /efi directory exists. This allows
use of a generic OS commit that can be used on systems where /boot data
exists in the ESP or not.
* When the ESP is mounted at /boot, it is made world readable to allow
unprivileged ostree admin operations to succeed.
* When the endless.image.device kernel command line argument is set, the
ESP from that disk is mounted.
The units created with this generator take precedence over
systemd-gpt-auto-generator. Output from the generator can be read with
"journalctl -t eos-esp-generator". When invoked as eos-esp-generator-gather,
the generator will only gather data and store it in a tarball in /run.
"""
from argparse import ArgumentParser, RawDescriptionHelpFormatter
from dataclasses import dataclass
import json
import logging
from logging.handlers import SysLogHandler
import os
from pathlib import Path
import shlex
import subprocess
import sys
import tarfile
import time
from tempfile import TemporaryDirectory
from textwrap import dedent
# Name this script was invoked as. It names the logger, is used in the kmsg
# message prefix and is compared against GATHER_MODE_PROGNAME in main().
progname = os.path.basename(__file__)
logger = logging.getLogger(progname)

# Partition type values compared against the blkid PART_ENTRY_TYPE tag: the
# GPT type is matched on "gpt" partition tables and the MBR type on "dos"
# partition tables.
ESP_GPT_PARTTYPE = 'c12a7328-f81f-11d2-ba4b-00a0c93ec93b'
ESP_MBR_PARTTYPE = '0xef'
# GPT partition type looked for to detect an XBOOTLDR partition on the ESP's
# disk, in which case the ESP is left alone.
XBOOTLDR_GPT_PARTTYPE = 'bc13c2ff-59e6-4262-a352-b275fd6f7172'

# The LoaderDevicePartUUID EFI variable is read from efivarfs using a file
# name of the form <variable name>-<vendor GUID>.
LOADER_EFI_VENDOR = '4a67b082-0a4c-41cf-b6c7-440b29bb8c4f'
LOADER_DEVICE_PART_UUID_NAME = 'LoaderDevicePartUUID'
LOADER_DEVICE_PART_UUID_EFIVAR = f'{LOADER_DEVICE_PART_UUID_NAME}-{LOADER_EFI_VENDOR}'

# Program name used to automatically enable gather mode.
GATHER_MODE_PROGNAME = 'eos-esp-generator-gather'
class EspError(Exception):
    """Error raised by the ESP generator when it cannot continue"""
class KmsgHandler(logging.StreamHandler):
    """Logging handler that writes records to /dev/kmsg

    Every write is prefixed with a syslog style ``<priority>identifier[pid]: ``
    header. Records longer than the kmsg limit are split over several writes.

    :param kmsg: Binary file object to write to. When None, /dev/kmsg is
        opened unbuffered for writing.
    """

    # Python logging level to syslog priority. Levels not listed here are
    # written with LOG_INFO priority by emit().
    PRIORITY_MAP = {
        logging.DEBUG: SysLogHandler.LOG_DEBUG,
        logging.INFO: SysLogHandler.LOG_INFO,
        logging.WARNING: SysLogHandler.LOG_WARNING,
        logging.ERROR: SysLogHandler.LOG_ERR,
        logging.CRITICAL: SysLogHandler.LOG_CRIT,
    }

    def __init__(self, kmsg=None):
        if kmsg is None:
            kmsg = open('/dev/kmsg', 'wb', buffering=0)
        self.kmsg = kmsg
        self.pid = os.getpid()
        super().__init__(stream=self.kmsg)

    def close(self):
        """Flush and close the kmsg stream

        Safe to call more than once: flushing an already closed stream would
        raise ValueError, so that step is skipped on repeated calls. The base
        class close() is always run so the logging module's own bookkeeping
        for the handler is updated, like logging.FileHandler does.
        """
        self.acquire()
        try:
            try:
                if not getattr(self.kmsg, 'closed', False):
                    try:
                        self.flush()
                    finally:
                        self.kmsg.close()
            finally:
                super().close()
        finally:
            self.release()

    def emit(self, record):
        """Format a record and write it to kmsg in chunks"""
        try:
            message = self.format(record).encode('utf-8', errors='replace')

            # kmsg recognizes a syslog style <priority>identifier[pid]: prefix.
            priority = self.PRIORITY_MAP.get(record.levelno, SysLogHandler.LOG_INFO)
            prefix = (
                f'<{priority:d}>{progname}[{self.pid:d}]: '
                .encode('utf-8', errors='replace')
            )

            # kmsg allows a maximum of 1024 bytes per message, so split the
            # message up if needed, without splitting multibyte-encoded
            # characters.
            start = 0
            length = len(message)
            size = 1024 - len(prefix)
            while start < length:
                end = start + size
                # UTF-8 continuation bytes look like 0b10xxxxxx. Back up until
                # the chunk ends on a character boundary.
                while end < length and (message[end] & 0b1100_0000) == 0b1000_0000:
                    end -= 1
                self.kmsg.write(prefix + message[start:end])
                start = end
            self.flush()
        except RecursionError:
            # Same policy as logging.StreamHandler.emit().
            raise
        except Exception:
            # Only real errors are reported through handleError();
            # KeyboardInterrupt and SystemExit are left to propagate.
            self.handleError(record)
def in_generator_env():
    """Report whether this process looks like a systemd generator invocation

    The only thing checked is that the SYSTEMD_SCOPE environment variable is
    present; its value is not inspected.
    """
    return os.environ.get('SYSTEMD_SCOPE') is not None
def run(cmd, *args, **kwargs):
    """Log a command at debug level and execute it with subprocess.run

    Unless the caller overrides them, check and text are enabled and the
    encoding is utf-8. When running in a systemd generator environment,
    stdout additionally defaults to subprocess.DEVNULL and stderr to
    subprocess.PIPE. All other arguments are handed to subprocess.run
    unchanged and its CompletedProcess is returned.
    """
    defaults = {'check': True, 'text': True, 'encoding': 'utf-8'}
    if in_generator_env():
        defaults['stdout'] = subprocess.DEVNULL
        defaults['stderr'] = subprocess.PIPE

    # Caller supplied options always win over the defaults.
    for option, value in defaults.items():
        kwargs.setdefault(option, value)

    logger.debug(f'> {shlex.join(cmd)}')
    return subprocess.run(cmd, *args, **kwargs)
def _get_root_path():
    """Return the root directory as a Path

    The ESPGEN_ROOT_PATH environment variable overrides the root for
    testing. When it is unset or empty, / is used.
    """
    override = os.environ.get('ESPGEN_ROOT_PATH', '')
    if override:
        return Path(override)
    return Path('/')
def get_mount_data():
    """Gather mounted filesystems

    Runs findmnt in JSON list mode and parses its output.

    Returns a list of mounted filesystem dicts with the lowercase findmnt
    column names as keys. An empty list is returned when the output has no
    "filesystems" entry.
    """
    cmd = (
        'findmnt',
        '--real',
        '--json',
        '--list',
        '--canonicalize',
        '--evaluate',
        '--nofsroot',
        '--output',
        'TARGET,SOURCE,MAJ:MIN,FSTYPE,FSROOT,OPTIONS',
    )
    proc = run(cmd, stdout=subprocess.PIPE)

    # Always hand back a list, as documented, even when the key is missing or
    # null, so callers and the saved JSON data see a consistent type.
    data = json.loads(proc.stdout).get('filesystems') or []

    logger.debug('mounts:')
    for entry in data:
        logger.debug(f' {entry}')
    return data
def get_fstab_data():
    """Gather filesystem mount configuration

    Runs findmnt --fstab in JSON list mode and parses its output.

    Returns a list of fstab entry dicts with the lowercase findmnt column
    names as keys. An empty list is returned when the output has no
    "filesystems" entry.
    """
    cmd = (
        'findmnt',
        '--fstab',
        '--json',
        '--list',
        '--canonicalize',
        '--evaluate',
        '--nofsroot',
        '--output',
        'TARGET,SOURCE,MAJ:MIN,FSTYPE,FSROOT,OPTIONS',
    )
    proc = run(cmd, stdout=subprocess.PIPE)

    # Always hand back a list, as documented, even when the key is missing or
    # null, so callers and the saved JSON data see a consistent type.
    data = json.loads(proc.stdout).get('filesystems') or []

    logger.debug('fstab:')
    for entry in data:
        logger.debug(f' {entry}')
    return data
def get_partition_data():
    """Collect blkid probe data for every block device blkid lists

    lsblk --json would be the natural tool, but it only reports udev
    attributes and udev is not running yet when generators execute. blkid is
    therefore probed directly, which is what udev itself does.

    Returns a list of dicts, one per device, mapping blkid tags to values.
    Devices blkid cannot probe (exit code 2) or that produce no tags are
    skipped with a warning. Any other blkid failure is raised as
    subprocess.CalledProcessError.
    """
    listing = run(
        ['blkid', '-d', '-c', '/dev/null', '-o', 'device'],
        stdout=subprocess.PIPE,
    )
    # Drop any empty lines from the device listing.
    partition_devs = list(filter(None, listing.stdout.splitlines()))
    logger.debug(f'partition devices: {partition_devs}')

    partitions = []
    for dev in partition_devs:
        probe_cmd = ['blkid', '--probe', '-d', '-c', '/dev/null', '-o', 'export', dev]
        try:
            probe = run(probe_cmd, stdout=subprocess.PIPE)
        except subprocess.CalledProcessError as err:
            # Exit code 2 means blkid could not gather information about the
            # device. That is not fatal, anything else is.
            if err.returncode != 2:
                raise
            logger.warning(f'Ignoring blkid --probe exit code 2 for {dev}')
            continue

        # Export format is one TAG=value pair per line.
        entry = {}
        for line in probe.stdout.splitlines():
            tag, sep, value = line.partition('=')
            if sep and tag:
                entry[tag] = value

        if not entry:
            logger.warning(f'No blkid probe output for {dev}')
            continue
        partitions.append(entry)

    logger.debug('partitions:')
    for entry in partitions:
        logger.debug(f' {entry}')
    return partitions
def read_efivar(name):
    """Return the value of an EFI variable as bytes

    The variable file is read from sys/firmware/efi/efivars below the root
    path. The leading 4 byte attribute mask is stripped so only the value is
    returned. None is returned when the variable file does not exist or is
    too short to contain the attribute mask.
    """
    varpath = _get_root_path() / 'sys/firmware/efi/efivars' / name
    logger.debug(f'Reading EFI variable {varpath}')
    try:
        raw = varpath.read_bytes()
    except FileNotFoundError:
        logger.debug(f'EFI variable {name} not set')
        return None

    # The first 4 bytes are the 32 bit attribute mask, the rest is the value.
    if len(raw) >= 4:
        return raw[4:]

    logger.warning(f'Invalid EFI variable {name} is less than 4 bytes')
    return None
def read_efivar_utf16_string(name):
    """Read an EFI variable UTF-16 string

    If the EFI variable doesn't exist, None is returned. Any nul
    terminating bytes will be removed. Undecodable data is replaced rather
    than raising.
    """
    value = read_efivar(name)
    if value is None:
        return None
    logger.debug(f'EFI variable {name} contents: {value}')

    # Systemd appends 3 nul bytes for some reason. If there are an odd
    # number of bytes, ignore the last one so there are an appropriate
    # number of utf16 bytes.
    end = len(value) - (len(value) % 2)

    # Ignore any trailing nul byte pairs.
    while end > 0 and value[end - 2:end] == b'\0\0':
        end -= 2

    # EFI strings are little-endian. The plain 'utf-16' codec falls back to
    # the host's native byte order when there is no BOM, so name the byte
    # order explicitly instead of relying on the host.
    text = value[:end].decode('utf-16-le', errors='replace')

    # The 'utf-16' codec used to consume a leading BOM; keep doing that.
    if text.startswith('\ufeff'):
        text = text[1:]
    return text
def parse_kernel_command_line():
    """Parse proc/cmdline below the root path into a dict

    The command line is split with shell-like quoting rules. Each argument
    maps its name to the text after the first = sign. Arguments without an =
    sign map to None so they can be told apart from arguments with an empty
    value. When a name is repeated, the last occurrence wins.
    """
    cmdline_path = _get_root_path() / 'proc/cmdline'
    words = shlex.split(cmdline_path.read_text('utf-8'))

    arguments = {}
    for word in words:
        name, eq, value = word.partition('=')
        arguments[name] = value if eq else None
    return arguments
def systemd_escape_path(path):
    """Return path escaped for use in a systemd unit name

    The escaping is delegated to ``systemd-escape --path``; surrounding
    whitespace is stripped from its output.
    """
    cmd = ('systemd-escape', '--path', str(path))
    return run(cmd, stdout=subprocess.PIPE).stdout.strip()
@dataclass
class EspMount:
    """Parameters for mounting the ESP

    Holds the source device, the target mount point, the filesystem type and
    the umask mount option. write_units() turns the specification into
    systemd units from within a generator.
    """

    source: str
    target: str
    type: str = 'vfat'
    umask: str = '0077'

    def write_units(self, unit_dir):
        """Write the automount and mount units for the ESP into unit_dir

        unit_dir is a pathlib.Path. Besides the two unit files, a relative
        symlink to the automount unit is created in the
        local-fs.target.wants subdirectory, which is created if needed.
        """
        source_escaped = systemd_escape_path(self.source)
        target_escaped = systemd_escape_path(self.target)

        automount_unit = unit_dir / f'{target_escaped}.automount'
        mount_unit = unit_dir / f'{target_escaped}.mount'
        wants_dir = unit_dir / 'local-fs.target.wants'
        local_fs_wants = wants_dir / automount_unit.name

        logger.debug(f'Writing unit {automount_unit}')
        automount_text = dedent(f"""\
            # Automatically generated by {progname}
            [Unit]
            Description=EFI System Partition Automount
            [Automount]
            Where={self.target}
            TimeoutIdleSec=2min
            """)
        automount_unit.write_text(automount_text)

        logger.debug(f'Writing unit {mount_unit}')
        mount_text = dedent(f"""\
            # Automatically generated by {progname}
            [Unit]
            Description=EFI System Partition Automount
            Requires=systemd-fsck@{source_escaped}.service
            After=systemd-fsck@{source_escaped}.service
            After=blockdev@{source_escaped}.target
            [Mount]
            What={self.source}
            Where={self.target}
            Type={self.type}
            Options=umask={self.umask},noauto,rw
            """)
        mount_unit.write_text(mount_text)

        # Hook the automount unit into local-fs.target with a relative link.
        logger.debug(f'Creating {local_fs_wants} symlink')
        wants_dir.mkdir(parents=True, exist_ok=True)
        local_fs_wants.symlink_to(os.path.relpath(automount_unit, wants_dir))
class EspGenerator:
    """Generator for mounting the ESP

    Locates the ESP device and determines the path to mount it at. Call
    get_esp_mount() to retrieve an EspMount instance describing the
    configuration.

    All system data is gathered when the instance is constructed: current
    mounts, fstab entries, blkid partition data, the kernel command line, the
    root partition and the partitions referenced by the endless.image.device
    kernel argument and the LoaderDevicePartUUID EFI variable.

    Partition dicts come straight from blkid and are not guaranteed to carry
    the PART_ENTRY_* tags, so those are always read with dict.get().
    """

    # endless.image.device tag names and the blkid tag each one is matched
    # against.
    _IMAGE_DEVICE_TAGS = {
        'UUID': 'UUID',
        'LABEL': 'LABEL',
        'PARTUUID': 'PART_ENTRY_UUID',
        'PARTLABEL': 'PART_ENTRY_NAME',
    }

    def __init__(self):
        """Gather the system data

        Raises EspError if the root mount or its partition cannot be found.
        """
        self.root = _get_root_path()
        self.mounts = get_mount_data()
        self.fstab = get_fstab_data()
        self.partitions = get_partition_data()
        self.kcmdline = parse_kernel_command_line()
        self._root_part = self._get_root_partition()
        self._endless_image_device, self._endless_image_part = (
            self._get_endless_image_device()
        )
        self._loader_device_uuid, self._loader_device_part = self._get_loader_device()

    def get_esp_mount(self):
        """Get the ESP mount specification

        Returns an EspMount or None.
        """
        # Only mount units when booted with EFI.
        efi_path = self.root / 'sys/firmware/efi'
        if not efi_path.is_dir():
            logger.info('Skipping ESP mounting for non-EFI booted system')
            return None

        # Don't mount units in the initrd.
        if os.getenv('SYSTEMD_IN_INITRD', '0') == '1':
            logger.info('Skipping ESP mounting in initrd')
            return None

        esp_part = self._get_esp_part()
        if not esp_part:
            logger.info('No ESP partition found, skipping mounting')
            return None
        esp_dev = esp_part['DEVNAME']
        logger.info(f'ESP device: {esp_dev}')

        esp_path = self._get_esp_path(esp_part)
        if not esp_path:
            logger.info('No ESP mount path determined, skipping mounting')
            return None
        logger.info(f'ESP mount path: {esp_path}')

        # If the ESP is mounted at /boot, it needs to be world readable
        # since it's also accessed by ostree and some operations are
        # expected to work unprivileged.
        umask = '0022' if esp_path == '/boot' else '0077'

        mount = EspMount(source=esp_dev, target=esp_path, umask=umask)
        logger.info(f'Created ESP mount instance {mount}')
        return mount

    def _gathered_data(self):
        """Return the gathered data keyed by the name it is reported under"""
        return {
            'mounts': self.mounts,
            'fstab': self.fstab,
            'partitions': self.partitions,
            'kcmdline': self.kcmdline,
        }

    def print_data(self):
        """Print gathered device data"""
        for name, data in self._gathered_data().items():
            print('{}:\n{}'.format(name, json.dumps(data, indent=2)))

    def save_data(self):
        """Save gathered data

        Each data set is written as a JSON file in a temporary directory
        under /run, which is then archived as
        /run/espgen-data-<timestamp>.tar.gz. The archive is opened in
        exclusive creation mode, so an existing file is never overwritten.
        """
        with TemporaryDirectory(dir='/run', prefix='espgen-') as savedir:
            logger.debug(f'Writing gathered data to {savedir}')
            for name, data in self._gathered_data().items():
                with open(os.path.join(savedir, f'{name}.json'), 'w') as f:
                    json.dump(data, f, indent=2)
                    f.write('\n')

            now = time.strftime('%Y%m%d%H%M%S')
            data_path = f'/run/espgen-data-{now}.tar.gz'
            logger.info(f'Saving gathered data to {data_path}')
            with tarfile.open(data_path, 'x:gz') as tf:
                tf.add(savedir, 'espgen-data')

    @staticmethod
    def _esp_parttype(scheme):
        """Return the ESP partition type for a partition table scheme

        None is returned for anything other than "gpt" and "dos", including
        a missing scheme.
        """
        if scheme == 'gpt':
            return ESP_GPT_PARTTYPE
        if scheme == 'dos':
            return ESP_MBR_PARTTYPE
        return None

    def _get_esp_part(self):
        """Determine the ESP partition to use

        Returns the partition dict or None when no suitable ESP is found.
        """
        # If LoaderDevicePartUUID is set, that's always used.
        if self._loader_device_uuid:
            return self._get_loader_esp_part()
        return self._find_esp_part()

    def _get_loader_esp_part(self):
        """Validate the LoaderDevicePartUUID partition as the ESP

        The partition is only accepted if it is an ESP and lives on the same
        disk as either the root partition or the endless.image.device
        partition.
        """
        part = self._loader_device_part

        # If the partition wasn't found, bail out.
        if not part:
            return None

        # Make sure it points to an ESP. I'm fairly certain this will only
        # be set on GPT disks, but validate MBR, too.
        loader_device_dev = part.get('DEVNAME')
        loader_device_scheme = part.get('PART_ENTRY_SCHEME')
        esp_parttype = self._esp_parttype(loader_device_scheme)
        if esp_parttype is None:
            logger.warning(
                f'Unexpected partition type "{loader_device_scheme}" for '
                f'LoaderDevicePartUUID device {loader_device_dev} disk'
            )
            return None
        if part.get('PART_ENTRY_TYPE') != esp_parttype:
            logger.info(
                f'Ignoring LoaderDevicePartUUID device {loader_device_dev} '
                'since it is not an EFI system partition'
            )
            return None

        # If it's on the root disk, we're done.
        root_dev = self._root_part['DEVNAME']
        if self._partitions_on_same_disk(part, self._root_part):
            logger.debug(
                f'Using LoaderDevicePartUUID device {loader_device_dev} for ESP '
                f'since it is on the same disk as root device {root_dev}'
            )
            return part

        # If it's on a different disk, it might not be our ESP. Only
        # use it if it's on the endless.image.device disk.
        if not self._endless_image_part:
            return None
        endless_image_dev = self._endless_image_part.get('DEVNAME')
        if not self._partitions_on_same_disk(part, self._endless_image_part):
            logger.info(
                f'Ignoring LoaderDevicePartUUID device {loader_device_dev} since '
                f'it is not on the same disk as root device {root_dev} or '
                f'endless.image.device {endless_image_dev}'
            )
            return None

        logger.debug(
            f'Using LoaderDevicePartUUID device {loader_device_dev} for ESP since '
            f'it is on the same disk as endless.image.device {endless_image_dev}'
        )
        return part

    def _find_esp_part(self):
        """Search for an ESP when LoaderDevicePartUUID is not set

        The first ESP on the endless.image.device disk is used when that
        kernel argument is set, otherwise the first ESP on the root disk.
        """
        # Look for an appropriate ESP partition. If endless.image.device
        # is set, use that disk. Otherwise, use the root partition disk.
        if self._endless_image_device:
            esp_disk_part = self._endless_image_part
        else:
            esp_disk_part = self._root_part
        if not esp_disk_part:
            # If the partition wasn't found, bail out.
            return None

        # Look for the first ESP partition on the disk depending on partition
        # type. A partition without partition table data (no scheme) ends up
        # in the unexpected type branch rather than raising KeyError.
        esp_disk_part_dev = esp_disk_part.get('DEVNAME')
        esp_disk_scheme = esp_disk_part.get('PART_ENTRY_SCHEME')
        esp_parttype = self._esp_parttype(esp_disk_scheme)
        if esp_parttype is None:
            logger.warning(
                f'Unexpected partition type "{esp_disk_scheme}" for '
                f'{esp_disk_part_dev} disk'
            )
            return None

        esp_part = self._get_partition(
            PART_ENTRY_SCHEME=esp_disk_scheme,
            PART_ENTRY_DISK=esp_disk_part.get('PART_ENTRY_DISK'),
            PART_ENTRY_TYPE=esp_parttype,
        )
        if not esp_part:
            table = 'GPT' if esp_disk_scheme == 'gpt' else 'MBR'
            logger.info(
                f'No {table} partition with ESP type {esp_parttype} found '
                f'on {esp_disk_part_dev} disk'
            )
            return None
        esp_part_dev = esp_part['DEVNAME']

        # Don't handle the ESP mount if the disk has an XBOOTLDR partition. We
        # don't currently create an XBOOTLDR partition and supporting them
        # would provide very little benefit. systemd-gpt-auto-generator should
        # mostly DTRT, anyways.
        if self._disk_has_xbootldr(esp_part):
            logger.info(
                f'Ignoring ESP device {esp_part_dev} since the disk has an '
                'XBOOTLDR partition'
            )
            return None

        esp_disk_use = (
            'endless.image.device' if self._endless_image_device else 'root'
        )
        logger.debug(
            f'Using device {esp_part_dev} for ESP since it is on the same disk as '
            f'{esp_disk_use} device {esp_disk_part_dev}'
        )
        return esp_part

    def _get_esp_path(self, esp_part):
        """Determine the path to mount the ESP partition

        Returns '/boot', '/efi' or None when the ESP should not be mounted
        by this generator.
        """
        esp_dev = esp_part['DEVNAME']
        boot_mount = self._get_mount(target='/boot')
        boot_fstab = self._get_fstab(target='/boot')
        efi_mount = self._get_mount(target='/efi')
        efi_fstab = self._get_fstab(target='/efi')

        def _use_if_dir_exists(path):
            rooted_path = self.root / path.lstrip('/')
            if not rooted_path.exists():
                logger.info(f'Would use {path} for mount path, but it does not exist')
                return None
            return path

        # /boot is preferred unless some other boot directory/partition will be
        # mounted there.
        path = '/boot'
        if boot_fstab:
            # If the source is the ESP device, do nothing as the fstab
            # generator will create the mount unit.
            if boot_fstab['source'] == esp_dev:
                logger.info(f'Skipping /boot since it is in fstab using {esp_dev}')
                return None

            # Something else is setup to be mounted at /boot. Prefer /efi.
            logger.debug(
                f'/boot is in fstab using {boot_fstab["source"]}, '
                f'prefer /efi as mount path'
            )
            path = '/efi'

        if path == '/boot' and boot_mount:
            # /boot is already mounted but not via fstab configuration.
            if boot_mount['source'] == esp_dev:
                # If it's mounted using the ESP device, then that probably
                # happened from a mount unit created by a previous run of
                # this generator. Keep using /boot.
                logger.debug(
                    f'/boot is mounted using ESP device {esp_dev}, '
                    'use it as mount path'
                )
            else:
                # Some other device is mounted at /boot. Likely this is the
                # bind mount created by ostree-prepare-root in the
                # initramfs. Regardless, we don't want to mount the ESP
                # over it.
                logger.debug(
                    f'/boot is mounted using {boot_mount["source"]}, '
                    f'prefer /efi as mount path'
                )
                path = '/efi'

        if path == '/boot':
            return _use_if_dir_exists(path)

        # From here on only /efi is being considered. Validate it as a
        # potential mount point.
        if efi_fstab:
            logger.info(
                f'Skipping /efi since it is in fstab using {efi_fstab["source"]}'
            )
            return None

        if efi_mount:
            # /efi is already mounted but not via fstab configuration.
            if efi_mount['source'] == esp_dev:
                # If it's mounted using the ESP device, then that probably
                # happened from a mount unit created by a previous run of this
                # generator. Keep using /efi.
                logger.debug(
                    f'/efi is mounted using ESP device {esp_dev}, '
                    'use it as mount path'
                )
            else:
                # Some other device is mounted at /efi. Don't want to mount the
                # ESP over it.
                logger.info(
                    f'Skipping /efi since it is mounted using {efi_mount["source"]}'
                )
                return None

        return _use_if_dir_exists(path)

    def _get_root_partition(self):
        """Find the partition dict for the device mounted at /

        Raises EspError when there is no / mount or its source device is not
        among the probed partitions.
        """
        root_mount = self._get_mount(target='/')
        if not root_mount:
            raise EspError(f'Could not find / mount in {self.mounts}')

        root_dev = root_mount['source']
        root_part = self._get_partition(DEVNAME=root_dev)
        if not root_part:
            raise EspError(f'Could not find / device {root_dev} in {self.partitions}')

        logger.debug(f'Determined root partition {root_part}')
        return root_part

    @staticmethod
    def _partitions_on_same_disk(a, b):
        """Whether two partition dicts report the same PART_ENTRY_DISK

        A partition without a PART_ENTRY_DISK tag is never considered to be
        on the same disk as anything, including itself.
        """
        disk_a = a.get('PART_ENTRY_DISK')
        disk_b = b.get('PART_ENTRY_DISK')
        return disk_a is not None and disk_a == disk_b

    def _get_endless_image_device(self):
        """Find the partition for the endless.image.device command line option

        Looks for the endless.image.device kernel command line option and tries
        to find the corresponding partition. Returns a tuple of argument and
        partition. Both are None when the argument is unset or empty. The
        partition is None when it can't be located.
        """
        image_device_arg = self.kcmdline.get('endless.image.device')
        if not image_device_arg:
            logger.debug('endless.image.device kernel command line arg not set')
            return None, None
        logger.debug(
            f'Found endless.image.device kernel command line arg is {image_device_arg}'
        )

        tag, eq, value = image_device_arg.partition('=')
        if not eq:
            # No = sign, treat the argument as a device path.
            part = self._get_partition(DEVNAME=image_device_arg)
        else:
            # Presumably a tag like UUID=<uuid>. Make sure it's something we
            # can handle. Tag names are matched case insensitively.
            blkid_tag = self._IMAGE_DEVICE_TAGS.get(tag.upper())
            if blkid_tag is None:
                logger.warning(
                    f'Unrecognized tag "{tag}" in endless.image.device argument '
                    f'{image_device_arg}'
                )
                return image_device_arg, None
            part = self._get_partition(**{blkid_tag: value})

        if not part:
            logger.warning(
                f'Could not locate endless.image.device "{image_device_arg}"'
            )
        return image_device_arg, part

    def _get_loader_device(self):
        """Find the partition for the LoaderDevicePartUUID EFI variable

        Looks for the LoaderDevicePartUUID EFI variable to find the
        corresponding partition. Returns a tuple of value and partition.
        Both are None when the variable is unset or empty. The partition is
        None when it can't be located.
        """
        loader_part_uuid = read_efivar_utf16_string(LOADER_DEVICE_PART_UUID_EFIVAR)
        if not loader_part_uuid:
            return None, None
        logger.debug(
            f'Found EFI var {LOADER_DEVICE_PART_UUID_NAME} is {loader_part_uuid}'
        )

        # The lookup is done with the lowercased value.
        uuid = loader_part_uuid.lower()
        part = self._get_partition(PART_ENTRY_UUID=uuid)
        if not part:
            logger.warning(
                f'Could not locate {LOADER_DEVICE_PART_UUID_NAME} "{loader_part_uuid}"'
            )
        return loader_part_uuid, part

    def _disk_has_xbootldr(self, partition):
        """Whether the partition's disk has an XBOOTLDR partition"""
        xbootldr_part = self._get_partition(
            PART_ENTRY_SCHEME='gpt',
            PART_ENTRY_DISK=partition.get('PART_ENTRY_DISK'),
            PART_ENTRY_TYPE=XBOOTLDR_GPT_PARTTYPE,
        )
        return bool(xbootldr_part)

    @staticmethod
    def _filter_data(data, **kwargs):
        """Filter iterable of dict data

        Returns an iterator of dicts where the dict entries match the keyword
        arguments. An entry missing one of the keys never matches. Raises
        ValueError when no keyword arguments are given.
        """
        if not kwargs:
            raise ValueError('Must provide filter values')

        def _test(entry):
            return all(
                name in entry and entry[name] == value
                for name, value in kwargs.items()
            )

        return filter(_test, data)

    def _get_mount(self, **kwargs):
        """Return the first matching mount dict or None"""
        return next(self._filter_data(self.mounts, **kwargs), None)

    def _get_fstab(self, **kwargs):
        """Return the first matching fstab entry dict or None"""
        return next(self._filter_data(self.fstab, **kwargs), None)

    def _get_partition(self, **kwargs):
        """Return the first matching partition dict or None"""
        return next(self._filter_data(self.partitions, **kwargs), None)
def main():
    """ESP generator main entry point

    Parses the command line, sets up logging (to /dev/kmsg when run as a
    systemd generator, to stderr otherwise) and gathers the system data.
    In gather mode the data is then saved or printed. Otherwise the ESP
    mount units are written to the normal generator directory unless this
    is a dry run or no ESP mount was determined.
    """
    doclines = __doc__.splitlines()
    parser = ArgumentParser(
        description=doclines[0],
        epilog='\n'.join(doclines[2:]),
        formatter_class=RawDescriptionHelpFormatter,
    )

    # The three optional generator output directories.
    for dest, metavar, label in (
        ('normal_dir', 'NORMAL', 'Normal'),
        ('early_dir', 'EARLY', 'Early'),
        ('late_dir', 'LATE', 'Late'),
    ):
        parser.add_argument(
            dest,
            metavar=metavar,
            nargs='?',
            help=f'{label} generator output directory',
        )

    # Boolean mode switches.
    for short_flag, long_flag, text in (
        ('-n', '--dry-run', 'only show what would be done'),
        ('-g', '--gather-data', 'only gather device data'),
        ('-s', '--save-data', 'save gathered data'),
    ):
        parser.add_argument(short_flag, long_flag, action='store_true', help=text)

    # --quiet and --debug both set log_level and exclude each other.
    verbosity = parser.add_mutually_exclusive_group()
    for flag, level, text in (
        ('--quiet', logging.WARNING, 'only log warning messages'),
        ('--debug', logging.DEBUG, 'log debug messages'),
    ):
        verbosity.add_argument(
            flag,
            dest='log_level',
            action='store_const',
            const=level,
            help=text,
        )

    # Detect when being run as a generator.
    run_as_generator = in_generator_env()

    # Debug logging is the default only for a generator run with debug in
    # the kernel command line. Everything else defaults to info.
    want_debug = run_as_generator and 'debug' in parse_kernel_command_line()
    parser.set_defaults(log_level=logging.DEBUG if want_debug else logging.INFO)

    # The special gather program name turns on gathering and saving.
    if progname == GATHER_MODE_PROGNAME:
        parser.set_defaults(gather_data=True, save_data=True)

    if run_as_generator:
        # Generators are run with stdout/stderr connected to /dev/null, so
        # point both file descriptors at /dev/kmsg before logging starts.
        with open('/dev/kmsg', 'wb', buffering=0) as kmsg:
            kmsg_fd = kmsg.fileno()
            std_fds = (sys.stdout.fileno(), sys.stderr.fileno())
            for fd in std_fds:
                os.dup2(kmsg_fd, fd)
        log_handler = KmsgHandler()

        # The name, level and time are already handled, so the format is
        # just the message.
        log_format = '%(message)s'
    else:
        # Use a normal stderr handler.
        log_handler = logging.StreamHandler(sys.stderr)
        log_format = '%(name)s:%(levelname)s:%(message)s'

    args = parser.parse_args()
    logging.basicConfig(level=args.log_level, handlers=[log_handler], format=log_format)

    # Outside of gather mode the normal unit directory is mandatory.
    if not (args.gather_data or args.normal_dir):
        logger.error('Normal generator directory not specified')
        sys.exit(1)

    generator = EspGenerator()

    if args.gather_data:
        # In gather mode, just collect the data and exit.
        report = generator.save_data if args.save_data else generator.print_data
        report()
        return

    esp_mount = generator.get_esp_mount()
    if esp_mount and not args.dry_run:
        # The normal generator directory is used so that we take precedence
        # over systemd-gpt-auto-generator. However, this is the same level as
        # systemd-fstab-generator, so we need to take care to not override
        # fstab entries.
        esp_mount.write_units(Path(args.normal_dir))
if __name__ == '__main__':
    # Print exceptions through logging so it gets associated to this program in
    # kmsg.
    try:
        main()
    except SystemExit:
        # A deliberate exit, e.g. sys.exit(1) in main() or an argparse usage
        # error. Re-raise it so the requested exit status is kept; swallowing
        # it here would make the process end with status 0.
        raise
    except subprocess.CalledProcessError as err:
        logger.exception(f'Executing {err.cmd[0]} failed:\n{err.stderr}')
        sys.exit(err.returncode)
    except:  # noqa: E722
        logger.exception('Generator failed:')
        sys.exit(1)