"""Set and get the time on various Bluetooth clocks.
This project offers a way to easily recognize Bluetooth Low Energy clocks from their
advertisements and has a device-independent API to set and get the time on them.
"""
from __future__ import annotations

import logging
from abc import ABC, abstractmethod
from importlib import import_module
from importlib.metadata import PackageNotFoundError, version  # pragma: no cover
from inspect import isclass
from pathlib import Path
from pkgutil import iter_modules
from time import time
from typing import TYPE_CHECKING, ClassVar

if TYPE_CHECKING:
    from uuid import UUID

from bleak import BleakClient
from bleak.exc import BleakError

if TYPE_CHECKING:
    from bleak.backends.characteristic import BleakGATTCharacteristic
    from bleak.backends.device import BLEDevice
    from bleak.backends.scanner import AdvertisementData

from bluetooth_clocks.exceptions import TimeNotReadableError, UnsupportedDeviceError
# Name of the installed distribution. Change this if the project is renamed
# and the distribution name no longer equals the package name.
dist_name = "bluetooth-clocks"

try:
    # Ask the installed package metadata for the version of the distribution.
    __version__ = version(dist_name)
except PackageNotFoundError:  # pragma: no cover
    # The distribution isn't installed, so there is no metadata to read.
    __version__ = "unknown"
finally:
    # These helpers are only needed here; keep them out of the package namespace.
    del version, PackageNotFoundError
_logger = logging.getLogger(__name__)

SECONDS_IN_HOUR = 60 * 60
"""Number of seconds that make up one hour.

Subclasses of :class:`BluetoothClock` can use this constant.
"""

MICROSECONDS = 10**6
"""Number of microseconds that make up one second.

Subclasses of :class:`BluetoothClock` can use this constant.
"""

_supported_devices: list[type[BluetoothClock]] = []
"""Registry holding every :class:`BluetoothClock` subclass of a supported device."""
[docs]
def supported_devices() -> list[str]:
    """Return the device type names of all registered clock classes.

    The names are the ``DEVICE_TYPE`` values of the classes in the module's
    registry of supported devices, in registry order.

    Returns:
        list[str]: The names of the devices supported by this library.

    Example:
        >>> from bluetooth_clocks import supported_devices
        >>> "ThermoPro TP393" in supported_devices()
        True
    """
    names = [clock_class.DEVICE_TYPE for clock_class in _supported_devices]
    return names
[docs]
class BluetoothClock(ABC):
    """Abstract class that represents the definition of a Bluetooth clock.

    Support for every type of Bluetooth clock is implemented as a separate
    subclass by giving the class variables a value and/or by overriding
    methods or implementing abstract methods of this class.

    Attributes:
        address (str): The Bluetooth address of the device.
        name (str | None): The name of the device, or ``None`` if it doesn't
            have a name.
    """

    DEVICE_TYPE: ClassVar[str]
    """The name of the device type."""

    SERVICE_UUID: ClassVar[UUID]
    """The UUID of the service used to read/write the time."""

    CHAR_UUID: ClassVar[UUID]
    """The UUID of the characteristic used to read/write the time."""

    TIME_GET_FORMAT: ClassVar[str | None]
    """The format string to convert bytes read from the device to a time.

    This is ``None`` if the device doesn't support reading the time.
    """

    TIME_SET_FORMAT: ClassVar[str]
    """The format string to convert a time to bytes written to the device."""

    WRITE_WITH_RESPONSE: ClassVar[bool]
    """``True`` if the bytes to set the time should use write with response."""

    LOCAL_NAME: ClassVar[str | None]
    """The local name used to recognize this type of device.

    This is ``None`` if the local name isn't used to recognize the device.
    """

    LOCAL_NAME_STARTS_WITH: ClassVar[bool | None]
    """Whether the local name should start with `LOCAL_NAME`.

    ``True`` if the start of `LOCAL_NAME` is used to recognize this type of device.
    ``False`` if the local name should exactly match `LOCAL_NAME`.
    This is ``None`` if the local name isn't used to recognize the device.
    """

    def __init__(self, device: BLEDevice) -> None:
        """Create a BluetoothClock object.

        Args:
            device (~bleak.backends.device.BLEDevice): The Bluetooth device. Its
                address and name are copied to this object.
        """
        self.address = device.address
        self.name = device.name

    @classmethod
    def create_from_advertisement(
        cls,
        device: BLEDevice,
        advertisement_data: AdvertisementData,
    ) -> BluetoothClock:
        """Create object of a :class:`BluetoothClock` subclass from advertisement data.

        This is a factory method that you use if you don't know the exact device type
        beforehand. This method automatically recognizes the device type and creates
        an object of the corresponding subclass. The registered device classes are
        tried in registry order and the first one that recognizes the device wins.

        Args:
            device (~bleak.backends.device.BLEDevice): The Bluetooth device.
            advertisement_data (~bleak.backends.scanner.AdvertisementData): The
                advertisement data.

        Raises:
            UnsupportedDeviceError: If none of the registered device classes
                recognizes `device` from `advertisement_data`.

        Returns:
            :class:`BluetoothClock`: An object of the subclass corresponding to
            the recognized device type.
        """
        for device_class in _supported_devices:
            if device_class.recognize(device, advertisement_data):
                return device_class(device)

        raise UnsupportedDeviceError(device, advertisement_data)

    @classmethod
    def is_readable(cls) -> bool:
        """Test whether you can read the time from this device.

        Returns:
            bool: ``True`` if this device supports reading the time, ``False``
            otherwise.

        Example:
            >>> from bluetooth_clocks.devices.xiaomi import LYWSD02
            >>> from bluetooth_clocks.devices.qingping import CGC1
            >>> LYWSD02.is_readable()
            True
            >>> CGC1.is_readable()
            False
        """
        return bool(cls.TIME_GET_FORMAT)

    @classmethod
    def recognize(
        cls,
        device: BLEDevice,  # noqa: ARG003
        advertisement_data: AdvertisementData,
    ) -> bool:
        """Recognize this device type from advertisement data.

        By default this checks whether the advertisement data has a local name
        that is equal to or starts with `LOCAL_NAME`, by calling
        :meth:`recognize_from_local_name`. The `device` argument isn't used by
        this default implementation.

        Override this method in a subclass if the device type should be recognized
        in another way from advertisement data.

        Args:
            device (~bleak.backends.device.BLEDevice): The Bluetooth device.
            advertisement_data (~bleak.backends.scanner.AdvertisementData): The
                advertisement data.

        Returns:
            bool: ``True`` if this subclass of :class:`BluetoothClock` recognizes the
            device, ``False`` otherwise.
        """
        return cls.recognize_from_local_name(advertisement_data.local_name)

    @classmethod
    def recognize_from_service_uuids(cls, service_uuids: list[str] | None) -> bool:
        """Recognize this device type from service UUIDs.

        This is a helper method that subclasses can use to implement their
        :meth:`recognize` method. The UUIDs are compared case-insensitively.

        Args:
            service_uuids (list[str] | None): Service UUIDs of the device, or
                ``None`` if the device doesn't advertise service UUIDs.

        Returns:
            bool: ``True`` if this subclass of :class:`BluetoothClock` recognizes the
            device from the service UUIDs in `service_uuids`, ``False`` otherwise.
        """
        if service_uuids is None:
            # The device doesn't advertise service UUIDs
            return False

        # Hexadecimal digits in a UUID string may be written in either case, so
        # normalize both sides before comparing.
        wanted = str(cls.SERVICE_UUID).lower()
        return any(advertised.lower() == wanted for advertised in service_uuids)

    @classmethod
    def recognize_from_local_name(
        cls,
        local_name: str | None,
    ) -> bool:
        """Recognize the device from an advertised local name.

        This is a helper method that subclasses can use to implement their
        :meth:`recognize` method.

        Args:
            local_name (str | None): The local name of the device, or ``None`` if
                it doesn't advertise its local name.

        Returns:
            bool: ``True`` if this subclass of :class:`BluetoothClock` recognizes the
            device from its local name `local_name`, ``False`` otherwise.
        """
        if local_name is None or cls.LOCAL_NAME is None:
            # The device doesn't advertise a local name
            # or the device type can't be recognized by its local name.
            return False

        if cls.LOCAL_NAME_STARTS_WITH:
            return local_name.startswith(cls.LOCAL_NAME)

        return local_name == cls.LOCAL_NAME

    def get_time_from_bytes(self, time_bytes: bytes) -> float:  # noqa: ARG002
        """Convert bytes read from a device to a timestamp.

        Override this method in a subclass for a device that supports getting the time.
        This base implementation always raises :class:`TimeNotReadableError`.

        Args:
            time_bytes (bytes): The raw bytes read from the device.

        Raises:
            InvalidTimeBytesError: If `time_bytes` don't have the right format.
            TimeNotReadableError: If the device doesn't support getting the time.

        Returns:
            float: The time encoded as a Unix timestamp.

        Example:
            >>> from bluetooth_clocks.devices.xiaomi import LYWSD02
            >>> from bleak.backends.device import BLEDevice
            >>> from datetime import datetime, timezone
            >>> clock = LYWSD02(BLEDevice("E7:2E:00:B1:38:96", "", {}, -67))
            >>> timestamp = clock.get_time_from_bytes(
            ...     bytes([0xdd, 0xbc, 0xb9, 0x63, 0x00]))
            >>> moment = datetime.fromtimestamp(timestamp, tz=timezone.utc)
            >>> print(moment.strftime("%Y-%m-%d %H:%M:%S"))
            2023-01-07 18:41:33
        """
        raise TimeNotReadableError

    @abstractmethod
    def get_bytes_from_time(self, timestamp: float, ampm: bool = False) -> bytes:
        """Generate the bytes to set the time on this device.

        Override this method in a subclass to implement the device's time format.

        Args:
            timestamp (float): The time encoded as a Unix timestamp.
            ampm (bool): ``True`` if the device should show the time with AM/PM,
                ``False`` if it should use 24-hour format. Devices that don't
                support choosing the mode can ignore this argument.

        Returns:
            bytes: The bytes needed to set the time of the device to `timestamp`.

        Example:
            >>> from bluetooth_clocks.devices.thermopro import TP393
            >>> from bleak.backends.device import BLEDevice
            >>> from datetime import datetime
            >>> clock = TP393(BLEDevice("10:76:36:14:2A:3D", "TP393 (2A3D)", {}, -67))
            >>> timestamp = datetime.fromisoformat("2023-01-07 17:32:50").timestamp()
            >>> clock.get_bytes_from_time(timestamp, ampm=True).hex()
            'a517010711203206005a'
        """

    def _get_time_characteristic(
        self,
        client: BleakClient,
    ) -> BleakGATTCharacteristic:
        """Look up the characteristic used to read/write the time.

        Args:
            client (~bleak.BleakClient): A client that is connected to the device.

        Raises:
            ~bleak.exc.BleakError: If the device doesn't have the service
                `SERVICE_UUID`, or that service doesn't have the characteristic
                `CHAR_UUID`.

        Returns:
            ~bleak.backends.characteristic.BleakGATTCharacteristic: The
            characteristic `CHAR_UUID` of the service `SERVICE_UUID`.
        """
        # Both lookups return ``None`` instead of raising when nothing matches, so
        # check explicitly to fail with a clear Bluetooth error instead of an
        # AttributeError on ``None``.
        service = client.services.get_service(self.SERVICE_UUID)
        if service is None:
            msg = f"Service {self.SERVICE_UUID} not found on device {self.address}"
            raise BleakError(msg)

        characteristic = service.get_characteristic(self.CHAR_UUID)
        if characteristic is None:
            msg = (
                f"Characteristic {self.CHAR_UUID} not found in service "
                f"{self.SERVICE_UUID} on device {self.address}"
            )
            raise BleakError(msg)

        return characteristic

    async def get_time(self) -> float:
        """Get the time of the Bluetooth clock.

        Raises:
            TimeNotReadableError: If the device doesn't support getting the time.
            ~bleak.exc.BleakError: If the device doesn't have the service or
                characteristic used to read the time.

        Returns:
            float: The time of the Bluetooth clock.
        """
        # Don't try to connect if the device doesn't support getting the time.
        if not self.is_readable():
            raise TimeNotReadableError

        _logger.info("Connecting to device...")
        async with BleakClient(self.address) as client:
            characteristic = self._get_time_characteristic(client)
            time_bytes = await client.read_gatt_char(characteristic)
            return self.get_time_from_bytes(time_bytes)

    async def set_time(
        self,
        timestamp: float | None = None,
        ampm: bool = False,
    ) -> None:
        """Set the time of the Bluetooth clock.

        Args:
            timestamp (float | None): The timestamp to write to the clock. If
                this is ``None``, the current time is used.
            ampm (bool): ``True`` if the device should show the time with AM/PM,
                ``False`` if it should use 24-hour format. Devices that don't
                support choosing the mode can ignore this argument.

        Raises:
            ~bleak.exc.BleakError: If the device doesn't have the service or
                characteristic used to write the time.
        """
        _logger.info("Connecting to device...")
        async with BleakClient(self.address) as client:
            characteristic = self._get_time_characteristic(client)

            if timestamp is None:
                # Use the current time if the time is not specified. It is taken
                # only after connecting, so the connection delay doesn't make the
                # written time stale.
                timestamp = time()

            await client.write_gatt_char(
                characteristic,
                self.get_bytes_from_time(timestamp, ampm),
                response=self.WRITE_WITH_RESPONSE,
            )
# Import every module in the package `devices` and register the clock classes
# found in them.
package_dir = Path(__file__).resolve().parent / "devices"
for module_info in iter_modules([str(package_dir)]):
    module_name = module_info.name
    # Import the module and iterate through its attributes
    module = import_module(f"{__name__}.devices.{module_name}")
    for attribute_name in dir(module):
        attribute = getattr(module, attribute_name)
        if (
            isclass(attribute)
            # Only real clock classes belong in the registry.
            and issubclass(attribute, BluetoothClock)
            # The base class only annotates DEVICE_TYPE without giving it a value,
            # so this test keeps the base class itself out of the registry.
            and hasattr(attribute, "DEVICE_TYPE")
            # `dir` also lists classes that a device module imported from another
            # device module; don't register those a second time.
            and attribute not in _supported_devices
        ):
            _supported_devices.append(attribute)