-
Notifications
You must be signed in to change notification settings - Fork 80
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #66 from GaLaXy102/master
Create an interface to this ~~script~~ library
- Loading branch information
Showing
3 changed files
with
197 additions
and
104 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
from .bluetooth_battery import BatteryStateQuerier, BatteryQueryError | ||
from bluetooth import BluetoothError |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,119 +1,163 @@ | ||
#!/usr/bin/env python3 | ||
|
||
""" | ||
A python script to get battery level from Bluetooth headsets | ||
A python library to get battery level from Bluetooth headsets | ||
""" | ||
|
||
# License: GPL-3.0 | ||
# Author: @TheWeirdDev | ||
# Author: @TheWeirdDev, @GaLaXy102 | ||
# 29 Sept 2019 | ||
|
||
import sys | ||
import argparse | ||
import bluetooth | ||
from typing import Optional, Union, List, Dict | ||
|
||
|
||
class BatteryQueryError(bluetooth.BluetoothError): | ||
pass | ||
|
||
|
||
class SocketDataIterator: | ||
def __init__(self, sock: bluetooth.BluetoothSocket, chunk_size: int = 128): | ||
""" | ||
Create an Iterator over the given Socket | ||
chunk_size defines the amount of data in Bytes to be read per iteration | ||
""" | ||
self._sock = sock | ||
self._chunk_size = chunk_size | ||
|
||
def __next__(self): | ||
""" | ||
Receive chunks | ||
""" | ||
return self._sock.recv(self._chunk_size) | ||
|
||
|
||
class RFCOMMSocket(bluetooth.BluetoothSocket): | ||
|
||
def __init__(self, proto=bluetooth.RFCOMM, _sock=None): | ||
super().__init__(proto, _sock) | ||
|
||
def __iter__(self): | ||
""" | ||
Iterate over incoming chunks of 128 Bytes | ||
""" | ||
return SocketDataIterator(self) | ||
|
||
@staticmethod | ||
def find_rfcomm_port(device_mac) -> int: | ||
""" | ||
Find the RFCOMM port number for a given bluetooth device | ||
""" | ||
uuid = "0000111e-0000-1000-8000-00805f9b34fb" | ||
services: List[Dict] = bluetooth.find_service(address=device_mac, uuid=uuid) | ||
|
||
for service in services: | ||
if "protocol" in service.keys() and service["protocol"] == "RFCOMM": | ||
return service["port"] | ||
# Raise Interface error when the required service is not offered my the end device | ||
raise bluetooth.BluetoothError("Couldn't find the RFCOMM port number. Perhaps the device is offline?") | ||
|
||
def send(self, data): | ||
""" | ||
This function sends a message through a bluetooth socket with added line separators | ||
""" | ||
return super().send(b"\r\n" + data + b"\r\n") | ||
|
||
|
||
class BatteryStateQuerier: | ||
|
||
def __init__(self, bluetooth_mac: str, bluetooth_port: Optional[Union[str, int]] = None): | ||
""" | ||
Prepare a query for the end devices' battery state | ||
bluetooth_mac is the MAC of the end device, e.g. 11:22:33:44:55:66 | ||
bluetooth_port is the Port of the RFCOMM/SPP service of the end device. | ||
It will be determined automatically if not given. | ||
The actual query can be performed using the int() and str() method. | ||
""" | ||
self._bt_settings = bluetooth_mac, int(bluetooth_port or RFCOMMSocket.find_rfcomm_port(bluetooth_mac)) | ||
|
||
def __int__(self): | ||
""" | ||
Perform a reading and get the result as int between 0 and 100 | ||
""" | ||
return self._perform_query() | ||
|
||
def __str__(self): | ||
""" | ||
Perform a reading and get the result as str between 0% and 100% | ||
""" | ||
return "{:.0%}".format(self._perform_query() / 100) | ||
|
||
def _perform_query(self) -> int: | ||
""" | ||
Will try to get and print the battery level of supported devices | ||
""" | ||
result = None | ||
sock = RFCOMMSocket() | ||
sock.connect(self._bt_settings) | ||
# Iterate received packets until there is no more or a result was found | ||
for line in sock: | ||
if b"BRSF" in line: | ||
sock.send(b"+BRSF: 1024") | ||
sock.send(b"OK") | ||
elif b"CIND=" in line: | ||
sock.send(b"+CIND: (\"battchg\",(0-5))") | ||
sock.send(b"OK") | ||
elif b"CIND?" in line: | ||
sock.send(b"+CIND: 5") | ||
sock.send(b"OK") | ||
elif b"BIND=?" in line: | ||
# Announce that we support the battery level HF indicator | ||
# https://www.bluetooth.com/specifications/assigned-numbers/hands-free-profile/ | ||
sock.send(b"+BIND: (2)") | ||
sock.send(b"OK") | ||
elif b"BIND?" in line: | ||
# Enable battery level HF indicator | ||
sock.send(b"+BIND: 2,1") | ||
sock.send(b"OK") | ||
elif b"XAPL=" in line: | ||
sock.send(b"+XAPL=iPhone,7") | ||
sock.send(b"OK") | ||
elif b"IPHONEACCEV" in line: | ||
parts = line.strip().split(b',')[1:] | ||
if len(parts) > 1 and (len(parts) % 2) == 0: | ||
parts = iter(parts) | ||
params = dict(zip(parts, parts)) | ||
if b'1' in params: | ||
result = (int(params[b'1']) + 1) * 10 | ||
break | ||
elif b"BIEV=" in line: | ||
params = line.strip().split(b"=")[1].split(b",") | ||
if params[0] == b"2": | ||
result = int(params[1]) | ||
break | ||
elif b"XEVENT=BATTERY" in line: | ||
params = line.strip().split(b"=")[1].split(b",") | ||
result = int(params[1]) / int(params[2]) * 100 | ||
break | ||
else: | ||
sock.send(b"OK") | ||
sock.close() | ||
# Check whether the result was found, otherwise raise an Error | ||
if result is None: | ||
raise BatteryQueryError("Could not query the battery state.") | ||
return result | ||
|
||
|
||
def send(sock, message): | ||
""" | ||
This function sends a message through a bluetooth socket | ||
""" | ||
sock.send(b"\r\n" + message + b"\r\n") | ||
|
||
|
||
def get_at_command(sock, line, device): | ||
""" | ||
Will try to get and print the battery level of supported devices | ||
""" | ||
blevel = -1 | ||
|
||
if b"BRSF" in line: | ||
send(sock, b"+BRSF: 1024") | ||
send(sock, b"OK") | ||
elif b"CIND=" in line: | ||
send(sock, b"+CIND: (\"battchg\",(0-5))") | ||
send(sock, b"OK") | ||
elif b"CIND?" in line: | ||
send(sock, b"+CIND: 5") | ||
send(sock, b"OK") | ||
elif b"BIND=?" in line: | ||
# Announce that we support the battery level HF indicator | ||
# https://www.bluetooth.com/specifications/assigned-numbers/hands-free-profile/ | ||
send(sock, b"+BIND: (2)") | ||
send(sock, b"OK") | ||
elif b"BIND?" in line: | ||
# Enable battery level HF indicator | ||
send(sock, b"+BIND: 2,1") | ||
send(sock, b"OK") | ||
elif b"XAPL=" in line: | ||
send(sock, b"+XAPL=iPhone,7") | ||
send(sock, b"OK") | ||
elif b"IPHONEACCEV" in line: | ||
parts = line.strip().split(b',')[1:] | ||
if len(parts) > 1 and (len(parts) % 2) == 0: | ||
parts = iter(parts) | ||
params = dict(zip(parts, parts)) | ||
if b'1' in params: | ||
blevel = (int(params[b'1']) + 1) * 10 | ||
elif b"BIEV=" in line: | ||
params = line.strip().split(b"=")[1].split(b",") | ||
if params[0] == b"2": | ||
blevel = int(params[1]) | ||
elif b"XEVENT=BATTERY" in line: | ||
params = line.strip().split(b"=")[1].split(b",") | ||
blevel = int(params[1]) / int(params[2]) * 100 | ||
else: | ||
send(sock, b"OK") | ||
|
||
if blevel != -1: | ||
print(f"Battery level for {device} is {blevel}%") | ||
return False | ||
|
||
return True | ||
|
||
|
||
def find_rfcomm_port(device): | ||
""" | ||
Find the RFCOMM port number for a given bluetooth device | ||
""" | ||
uuid = "0000111e-0000-1000-8000-00805f9b34fb" | ||
proto = bluetooth.find_service(address=device, uuid=uuid) | ||
if len(proto) == 0: | ||
print("Couldn't find the RFCOMM port number") | ||
return 4 | ||
|
||
for pr in proto: | ||
if 'protocol' in pr and pr['protocol'] == 'RFCOMM': | ||
port = pr['port'] | ||
return port | ||
return 4 | ||
|
||
|
||
def main(): | ||
if __name__ == "__main__": | ||
""" | ||
The starting point of the program. For each device address in the argument | ||
list a bluetooth socket will be opened and the battery level will be read | ||
and printed to stdout | ||
""" | ||
if len(sys.argv) < 2: | ||
print("Usage: bluetooth_battery.py BT_MAC_ADDRESS_1.PORT ...") | ||
print(" Port number is optional") | ||
sys.exit() | ||
else: | ||
for device in sys.argv[1:]: | ||
i = device.find('.') | ||
if i == -1: | ||
port = find_rfcomm_port(device) | ||
else: | ||
port = int(device[i+1:]) | ||
device = device[:i] | ||
try: | ||
sock = bluetooth.BluetoothSocket(bluetooth.RFCOMM) | ||
sock.connect((device, port)) | ||
while get_at_command(sock, sock.recv(128), device): | ||
pass | ||
sock.close() | ||
except OSError as err: | ||
print(f"{device} is offline", err) | ||
|
||
|
||
if __name__ == "__main__": | ||
main() | ||
parser = argparse.ArgumentParser(description="Get battery level from Bluetooth headsets") | ||
parser.add_argument("devices", metavar="DEVICE_MAC[.PORT]", type=str, nargs="+", | ||
help="(MAC address of target)[.SPP Port]") | ||
args = parser.parse_args() | ||
for device in args.devices: | ||
query = BatteryStateQuerier(*device.split(".")) | ||
print("Battery level for {} is {}".format(device, str(query))) |