Source code for bluetooth_clocks.devices.pvvx

"""Bluetooth clock support for devices running the PVVX firmware."""
from __future__ import annotations

import asyncio
import logging
import struct
from time import localtime
from typing import TYPE_CHECKING
from uuid import UUID

if TYPE_CHECKING:
    from bleak.backends.device import BLEDevice
    from bleak.backends.scanner import AdvertisementData

from bleak import BleakClient, BleakGATTCharacteristic

from bluetooth_clocks import BluetoothClock
from bluetooth_clocks.exceptions import InvalidTimeBytesError

_logger = logging.getLogger(__name__)


[docs] class PVVX(BluetoothClock): """Bluetooth clock support for devices running the PVVX firmware.""" DEVICE_TYPE = "PVVX" SERVICE_UUID = UUID("00001f10-0000-1000-8000-00805f9b34fb") CHAR_UUID = UUID("00001f1f-0000-1000-8000-00805f9b34fb") TIME_GET_FORMAT = "<BLL" """The format string to convert bytes read from the device to a time.""" TIME_SET_FORMAT = "<BL" """The format string to convert a time to bytes written to the PVVX device.""" WRITE_WITH_RESPONSE = False """Writing the time to the PVVX device needs write without response.""" SERVICE_DATA_UUID = UUID("0000181a-0000-1000-8000-00805f9b34fb") """UUID of the service data the PVVX device is advertising.""" PVVX_GET_SET_TIME_COMMAND = 0x23 """Command for PVVX firmware to Get/Set Time.""" def __init__(self, device: BLEDevice) -> None: """Create a PVVX object. Args: device (BLEDevice): The Bluetooth device. """ super().__init__(device) self.notified_time = 0.0
[docs] @classmethod def recognize( cls, device: BLEDevice, advertisement_data: AdvertisementData, ) -> bool: # ARG003 """Recognize the PVVX device from advertisement data. This checks whether the advertisement has service data with service UUID 0x181a (PVVX custom format). Args: device (~bleak.backends.device.BLEDevice): The Bluetooth device. advertisement_data (AdvertisementData): The advertisement data. Returns: bool: ``True`` if the device is recognized as a PVVX device, ``False`` otherwise. """ return str(cls.SERVICE_DATA_UUID) in advertisement_data.service_data
[docs] async def get_time(self) -> float: """Get the time of the PVVX device. Returns: float: The time of the Bluetooth clock. """ _logger.info("Connecting to device...") async with BleakClient(self.address) as client: service = client.services.get_service(self.SERVICE_UUID) characteristic = service.get_characteristic(self.CHAR_UUID) # Define callback function for notifications and subscribe def time_callback(sender: BleakGATTCharacteristic, data: bytearray) -> None: """Convert bytes read from a notification to a timestamp.""" self.notified_time = self.get_time_from_bytes(data) await client.start_notify(characteristic, time_callback) # Write Get/Set Time command await client.write_gatt_char( characteristic, [self.PVVX_GET_SET_TIME_COMMAND], response=self.WRITE_WITH_RESPONSE, ) # Wait for a few seconds so a notification is received. await asyncio.sleep(5) return self.notified_time
[docs] def get_time_from_bytes(self, time_bytes: bytes) -> float: """Convert bytes read from the PVVX device to a timestamp. Args: time_bytes (bytes): The raw bytes read from the device. Raises: InvalidTimeBytesError: If `time_bytes` don't have the right format. Returns: float: The time encoded as a Unix timestamp. """ try: _, time_time, _ = struct.unpack(self.TIME_GET_FORMAT, time_bytes) except struct.error as exception: raise InvalidTimeBytesError(time_bytes) from exception return float(time_time)
[docs] def get_bytes_from_time( self, timestamp: float, ampm: bool = False, ) -> bytes: """Generate the bytes to set the time on the PVVX device. Args: timestamp (float): The time encoded as a Unix timestamp. ampm (bool): ``True`` if the device should show the time with AM/PM, ``False`` if it should use 24-hour format. The PVVX device ignores this argument, as it doesn't support this option. Returns: bytes: The bytes needed to set the time of the device to `timestamp`. """ # Convert timestamp to little-endian unsigned long integer # And add header byte for Get/Set Time command. return struct.pack( self.TIME_SET_FORMAT, self.PVVX_GET_SET_TIME_COMMAND, int(timestamp + localtime(timestamp).tm_gmtoff), )