diff options
author | Android Build Coastguard Worker <android-build-coastguard-worker@google.com> | 2024-06-09 07:07:42 +0000 |
---|---|---|
committer | Android Build Coastguard Worker <android-build-coastguard-worker@google.com> | 2024-06-09 07:07:42 +0000 |
commit | 9c07ea5f2140fd83922710dd4f63f4af80993c82 (patch) | |
tree | 8f9a2633e467978877b0ac2f935da1c14c62d47c | |
parent | d62bd98cee9fc1f8efaab191bd28234f50159546 (diff) | |
parent | 5d34f7ab34134d22f37d819d54f9f57f34b4ce62 (diff) | |
download | bumble-busytown-mac-infra-release.tar.gz |
Snap for 11933214 from 5d34f7ab34134d22f37d819d54f9f57f34b4ce62 to busytown-mac-infra-releasebusytown-mac-infra-release
Change-Id: I26cb63474bf93ef9785c303a2799f509e99de678
96 files changed, 4781 insertions, 1018 deletions
diff --git a/.github/workflows/code-check.yml b/.github/workflows/code-check.yml index d65c650..37fb816 100644 --- a/.github/workflows/code-check.yml +++ b/.github/workflows/code-check.yml @@ -16,7 +16,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - python-version: ["3.8", "3.9", "3.10", "3.11"] + python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"] fail-fast: false steps: @@ -33,7 +33,7 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip - python -m pip install ".[build,test,development]" + python -m pip install ".[build,test,development,pandora]" - name: Check run: | invoke project.pre-commit diff --git a/.github/workflows/python-avatar.yml b/.github/workflows/python-avatar.yml index eb1b270..a7403a3 100644 --- a/.github/workflows/python-avatar.yml +++ b/.github/workflows/python-avatar.yml @@ -32,7 +32,7 @@ jobs: - name: Install run: | python -m pip install --upgrade pip - python -m pip install .[avatar] + python -m pip install .[avatar,pandora] - name: Rootcanal run: nohup python -m rootcanal > rootcanal.log & - name: Test diff --git a/.github/workflows/python-build-test.yml b/.github/workflows/python-build-test.yml index f1a8105..230779f 100644 --- a/.github/workflows/python-build-test.yml +++ b/.github/workflows/python-build-test.yml @@ -16,7 +16,7 @@ jobs: strategy: matrix: os: ['ubuntu-latest', 'macos-latest', 'windows-latest'] - python-version: ["3.8", "3.9", "3.10", "3.11"] + python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"] fail-fast: false steps: @@ -46,7 +46,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - python-version: [ "3.8", "3.9", "3.10", "3.11" ] + python-version: [ "3.8", "3.9", "3.10", "3.11", "3.12" ] rust-version: [ "1.76.0", "stable" ] fail-fast: false steps: @@ -6,6 +6,8 @@ dist/ docs/mkdocs/site test-results.xml __pycache__ +# Vim +.*.sw* # generated by setuptools_scm bumble/_version.py .vscode/launch.json diff --git a/.vscode/settings.json b/.vscode/settings.json index b535ada..777c47b 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,6 +1,7 @@ { "cSpell.words": [ "Abortable", + "aiohttp", "altsetting", "ansiblue", "ansicyan", @@ -9,6 +10,7 @@ "ansired", "ansiyellow", "appendleft", + "ascs", "ASHA", "asyncio", "ATRAC", @@ -43,6 +45,7 @@ "keyup", "levelname", "libc", + "liblc", "libusb", "MITM", "MSBC", @@ -78,6 +81,7 @@ "unmuted", "usbmodem", "vhci", + "wasmtime", "websockets", "xcursor", "ycursor" diff --git a/apps/bench.py b/apps/bench.py index 83625f0..f0e8b58 100644 --- a/apps/bench.py +++ b/apps/bench.py @@ -509,9 +509,11 @@ class Ping: packet = struct.pack( '>bbI', PacketType.SEQUENCE, - PACKET_FLAG_LAST - if self.current_packet_index == self.tx_packet_count - 1 - else 0, + ( + PACKET_FLAG_LAST + if self.current_packet_index == self.tx_packet_count - 1 + else 0 + ), self.current_packet_index, ) + bytes(self.tx_packet_size - 6) logging.info(color(f'Sending packet {self.current_packet_index}', 'yellow')) @@ -897,14 +899,26 @@ class L2capServer(StreamedPacketIO): # RfcommClient # ----------------------------------------------------------------------------- class RfcommClient(StreamedPacketIO): - def __init__(self, device, channel, uuid, l2cap_mtu, max_frame_size, window_size): + def __init__( + self, + device, + channel, + uuid, + l2cap_mtu, + max_frame_size, + initial_credits, + max_credits, + credits_threshold, + ): super().__init__() self.device = device self.channel = channel self.uuid = uuid self.l2cap_mtu = l2cap_mtu self.max_frame_size = max_frame_size - self.window_size = window_size + self.initial_credits = initial_credits + self.max_credits = max_credits + self.credits_threshold = credits_threshold self.rfcomm_session = None self.ready = asyncio.Event() @@ -938,12 +952,17 @@ class RfcommClient(StreamedPacketIO): logging.info(color(f'### Opening session for channel {channel}...', 'yellow')) try: dlc_options = {} - if self.max_frame_size: + if self.max_frame_size is not None: dlc_options['max_frame_size'] = self.max_frame_size - if self.window_size: - dlc_options['window_size'] = self.window_size + if self.initial_credits is not None: + dlc_options['initial_credits'] = self.initial_credits rfcomm_session = await rfcomm_mux.open_dlc(channel, **dlc_options) logging.info(color(f'### Session open: {rfcomm_session}', 'yellow')) + if self.max_credits is not None: + rfcomm_session.rx_max_credits = self.max_credits + if self.credits_threshold is not None: + rfcomm_session.rx_credits_threshold = self.credits_threshold + except bumble.core.ConnectionError as error: logging.info(color(f'!!! Session open failed: {error}', 'red')) await rfcomm_mux.disconnect() @@ -967,8 +986,19 @@ class RfcommClient(StreamedPacketIO): # RfcommServer # ----------------------------------------------------------------------------- class RfcommServer(StreamedPacketIO): - def __init__(self, device, channel, l2cap_mtu): + def __init__( + self, + device, + channel, + l2cap_mtu, + max_frame_size, + initial_credits, + max_credits, + credits_threshold, + ): super().__init__() + self.max_credits = max_credits + self.credits_threshold = credits_threshold self.dlc = None self.ready = asyncio.Event() @@ -979,7 +1009,12 @@ class RfcommServer(StreamedPacketIO): rfcomm_server = bumble.rfcomm.Server(device, **server_options) # Listen for incoming DLC connections - channel_number = rfcomm_server.listen(self.on_dlc, channel) + dlc_options = {} + if max_frame_size is not None: + dlc_options['max_frame_size'] = max_frame_size + if initial_credits is not None: + dlc_options['initial_credits'] = initial_credits + channel_number = rfcomm_server.listen(self.on_dlc, channel, **dlc_options) # Setup the SDP to advertise this channel device.sdp_service_records = make_sdp_records(channel_number) @@ -1002,6 +1037,10 @@ class RfcommServer(StreamedPacketIO): dlc.sink = self.on_packet self.io_sink = dlc.write self.dlc = dlc + if self.max_credits is not None: + dlc.rx_max_credits = self.max_credits + if self.credits_threshold is not None: + dlc.rx_credits_threshold = self.credits_threshold async def drain(self): assert self.dlc @@ -1062,9 +1101,9 @@ class Central(Connection.Listener): if self.phy not in (None, HCI_LE_1M_PHY): # Add an connections parameters entry for this PHY. - self.connection_parameter_preferences[ - self.phy - ] = connection_parameter_preferences + self.connection_parameter_preferences[self.phy] = ( + connection_parameter_preferences + ) else: self.connection_parameter_preferences = None @@ -1232,6 +1271,7 @@ class Peripheral(Device.Listener, Connection.Listener): 'cyan', ) ) + await self.connected.wait() logging.info(color('### Connected', 'cyan')) @@ -1318,7 +1358,9 @@ def create_mode_factory(ctx, default_mode): uuid=ctx.obj['rfcomm_uuid'], l2cap_mtu=ctx.obj['rfcomm_l2cap_mtu'], max_frame_size=ctx.obj['rfcomm_max_frame_size'], - window_size=ctx.obj['rfcomm_window_size'], + initial_credits=ctx.obj['rfcomm_initial_credits'], + max_credits=ctx.obj['rfcomm_max_credits'], + credits_threshold=ctx.obj['rfcomm_credits_threshold'], ) if mode == 'rfcomm-server': @@ -1326,6 +1368,10 @@ def create_mode_factory(ctx, default_mode): device, channel=ctx.obj['rfcomm_channel'], l2cap_mtu=ctx.obj['rfcomm_l2cap_mtu'], + max_frame_size=ctx.obj['rfcomm_max_frame_size'], + initial_credits=ctx.obj['rfcomm_initial_credits'], + max_credits=ctx.obj['rfcomm_max_credits'], + credits_threshold=ctx.obj['rfcomm_credits_threshold'], ) raise ValueError('invalid mode') @@ -1424,9 +1470,19 @@ def create_role_factory(ctx, default_role): help='RFComm maximum frame size', ) @click.option( - '--rfcomm-window-size', + '--rfcomm-initial-credits', + type=int, + help='RFComm initial credits', +) +@click.option( + '--rfcomm-max-credits', + type=int, + help='RFComm max credits', +) +@click.option( + '--rfcomm-credits-threshold', type=int, - help='RFComm window size', + help='RFComm credits threshold', ) @click.option( '--l2cap-psm', @@ -1527,7 +1583,9 @@ def bench( rfcomm_uuid, rfcomm_l2cap_mtu, rfcomm_max_frame_size, - rfcomm_window_size, + rfcomm_initial_credits, + rfcomm_max_credits, + rfcomm_credits_threshold, l2cap_psm, l2cap_mtu, l2cap_mps, @@ -1542,7 +1600,9 @@ def bench( ctx.obj['rfcomm_uuid'] = rfcomm_uuid ctx.obj['rfcomm_l2cap_mtu'] = rfcomm_l2cap_mtu ctx.obj['rfcomm_max_frame_size'] = rfcomm_max_frame_size - ctx.obj['rfcomm_window_size'] = rfcomm_window_size + ctx.obj['rfcomm_initial_credits'] = rfcomm_initial_credits + ctx.obj['rfcomm_max_credits'] = rfcomm_max_credits + ctx.obj['rfcomm_credits_threshold'] = rfcomm_credits_threshold ctx.obj['l2cap_psm'] = l2cap_psm ctx.obj['l2cap_mtu'] = l2cap_mtu ctx.obj['l2cap_mps'] = l2cap_mps @@ -1591,8 +1651,8 @@ def central( mode_factory = create_mode_factory(ctx, 'gatt-client') classic = ctx.obj['classic'] - asyncio.run( - Central( + async def run_central(): + await Central( transport, peripheral_address, classic, @@ -1604,7 +1664,8 @@ def central( encrypt or authenticate, ctx.obj['extended_data_length'], ).run() - ) + + asyncio.run(run_central()) @bench.command() @@ -1615,15 +1676,16 @@ def peripheral(ctx, transport): role_factory = create_role_factory(ctx, 'receiver') mode_factory = create_mode_factory(ctx, 'gatt-server') - asyncio.run( - Peripheral( + async def run_peripheral(): + await Peripheral( transport, ctx.obj['classic'], ctx.obj['extended_data_length'], role_factory, mode_factory, ).run() - ) + + asyncio.run(run_peripheral()) def main(): diff --git a/apps/lea_unicast/app.py b/apps/lea_unicast/app.py new file mode 100644 index 0000000..ae3b442 --- /dev/null +++ b/apps/lea_unicast/app.py @@ -0,0 +1,577 @@ +# Copyright 2021-2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# ----------------------------------------------------------------------------- +# Imports +# ----------------------------------------------------------------------------- +from __future__ import annotations +import asyncio +import datetime +import enum +import functools +from importlib import resources +import json +import os +import logging +import pathlib +from typing import Optional, List, cast +import weakref +import struct + +import ctypes +import wasmtime +import wasmtime.loader +import liblc3 # type: ignore +import logging + +import click +import aiohttp.web + +import bumble +from bumble.core import AdvertisingData +from bumble.colors import color +from bumble.device import Device, DeviceConfiguration, AdvertisingParameters +from bumble.transport import open_transport +from bumble.profiles import bap +from bumble.hci import Address, CodecID, CodingFormat, HCI_IsoDataPacket + +# ----------------------------------------------------------------------------- +# Logging +# ----------------------------------------------------------------------------- +logger = logging.getLogger(__name__) + +# ----------------------------------------------------------------------------- +# Constants +# ----------------------------------------------------------------------------- +DEFAULT_UI_PORT = 7654 + + +def _sink_pac_record() -> bap.PacRecord: + return bap.PacRecord( + coding_format=CodingFormat(CodecID.LC3), + codec_specific_capabilities=bap.CodecSpecificCapabilities( + supported_sampling_frequencies=( + bap.SupportedSamplingFrequency.FREQ_8000 + | bap.SupportedSamplingFrequency.FREQ_16000 + | bap.SupportedSamplingFrequency.FREQ_24000 + | bap.SupportedSamplingFrequency.FREQ_32000 + | bap.SupportedSamplingFrequency.FREQ_48000 + ), + supported_frame_durations=( + bap.SupportedFrameDuration.DURATION_10000_US_SUPPORTED + ), + supported_audio_channel_count=[1, 2], + min_octets_per_codec_frame=26, + max_octets_per_codec_frame=240, + supported_max_codec_frames_per_sdu=2, + ), + ) + + +def _source_pac_record() -> bap.PacRecord: + return bap.PacRecord( + coding_format=CodingFormat(CodecID.LC3), + codec_specific_capabilities=bap.CodecSpecificCapabilities( + supported_sampling_frequencies=( + bap.SupportedSamplingFrequency.FREQ_8000 + | bap.SupportedSamplingFrequency.FREQ_16000 + | bap.SupportedSamplingFrequency.FREQ_24000 + | bap.SupportedSamplingFrequency.FREQ_32000 + | bap.SupportedSamplingFrequency.FREQ_48000 + ), + supported_frame_durations=( + bap.SupportedFrameDuration.DURATION_10000_US_SUPPORTED + ), + supported_audio_channel_count=[1], + min_octets_per_codec_frame=30, + max_octets_per_codec_frame=100, + supported_max_codec_frames_per_sdu=1, + ), + ) + + +# ----------------------------------------------------------------------------- +# WASM - liblc3 +# ----------------------------------------------------------------------------- +store = wasmtime.loader.store +_memory = cast(wasmtime.Memory, liblc3.memory) +STACK_POINTER = _memory.data_len(store) +_memory.grow(store, 1) +# Mapping wasmtime memory to linear address +memory = (ctypes.c_ubyte * _memory.data_len(store)).from_address( + ctypes.addressof(_memory.data_ptr(store).contents) # type: ignore +) + + +class Liblc3PcmFormat(enum.IntEnum): + S16 = 0 + S24 = 1 + S24_3LE = 2 + FLOAT = 3 + + +MAX_DECODER_SIZE = liblc3.lc3_decoder_size(10000, 48000) +MAX_ENCODER_SIZE = liblc3.lc3_encoder_size(10000, 48000) + +DECODER_STACK_POINTER = STACK_POINTER +ENCODER_STACK_POINTER = DECODER_STACK_POINTER + MAX_DECODER_SIZE * 2 +DECODE_BUFFER_STACK_POINTER = ENCODER_STACK_POINTER + MAX_ENCODER_SIZE * 2 +ENCODE_BUFFER_STACK_POINTER = DECODE_BUFFER_STACK_POINTER + 8192 +DEFAULT_PCM_SAMPLE_RATE = 48000 +DEFAULT_PCM_FORMAT = Liblc3PcmFormat.S16 +DEFAULT_PCM_BYTES_PER_SAMPLE = 2 + + +encoders: List[int] = [] +decoders: List[int] = [] + + +def setup_encoders( + sample_rate_hz: int, frame_duration_us: int, num_channels: int +) -> None: + logger.info( + f"setup_encoders {sample_rate_hz}Hz {frame_duration_us}us {num_channels}channels" + ) + encoders[:num_channels] = [ + liblc3.lc3_setup_encoder( + frame_duration_us, + sample_rate_hz, + DEFAULT_PCM_SAMPLE_RATE, # Input sample rate + ENCODER_STACK_POINTER + MAX_ENCODER_SIZE * i, + ) + for i in range(num_channels) + ] + + +def setup_decoders( + sample_rate_hz: int, frame_duration_us: int, num_channels: int +) -> None: + logger.info( + f"setup_decoders {sample_rate_hz}Hz {frame_duration_us}us {num_channels}channels" + ) + decoders[:num_channels] = [ + liblc3.lc3_setup_decoder( + frame_duration_us, + sample_rate_hz, + DEFAULT_PCM_SAMPLE_RATE, # Output sample rate + DECODER_STACK_POINTER + MAX_DECODER_SIZE * i, + ) + for i in range(num_channels) + ] + + +def decode( + frame_duration_us: int, + num_channels: int, + input_bytes: bytes, +) -> bytes: + if not input_bytes: + return b'' + + input_buffer_offset = DECODE_BUFFER_STACK_POINTER + input_buffer_size = len(input_bytes) + input_bytes_per_frame = input_buffer_size // num_channels + + # Copy into wasm + memory[input_buffer_offset : input_buffer_offset + input_buffer_size] = input_bytes # type: ignore + + output_buffer_offset = input_buffer_offset + input_buffer_size + output_buffer_size = ( + liblc3.lc3_frame_samples(frame_duration_us, DEFAULT_PCM_SAMPLE_RATE) + * DEFAULT_PCM_BYTES_PER_SAMPLE + * num_channels + ) + + for i in range(num_channels): + res = liblc3.lc3_decode( + decoders[i], + input_buffer_offset + input_bytes_per_frame * i, + input_bytes_per_frame, + DEFAULT_PCM_FORMAT, + output_buffer_offset + i * DEFAULT_PCM_BYTES_PER_SAMPLE, + num_channels, # Stride + ) + + if res != 0: + logging.error(f"Parsing failed, res={res}") + + # Extract decoded data from the output buffer + return bytes( + memory[output_buffer_offset : output_buffer_offset + output_buffer_size] + ) + + +def encode( + sdu_length: int, + num_channels: int, + stride: int, + input_bytes: bytes, +) -> bytes: + if not input_bytes: + return b'' + + input_buffer_offset = ENCODE_BUFFER_STACK_POINTER + input_buffer_size = len(input_bytes) + + # Copy into wasm + memory[input_buffer_offset : input_buffer_offset + input_buffer_size] = input_bytes # type: ignore + + output_buffer_offset = input_buffer_offset + input_buffer_size + output_buffer_size = sdu_length + output_frame_size = output_buffer_size // num_channels + + for i in range(num_channels): + res = liblc3.lc3_encode( + encoders[i], + DEFAULT_PCM_FORMAT, + input_buffer_offset + DEFAULT_PCM_BYTES_PER_SAMPLE * i, + stride, + output_frame_size, + output_buffer_offset + output_frame_size * i, + ) + + if res != 0: + logging.error(f"Parsing failed, res={res}") + + # Extract decoded data from the output buffer + return bytes( + memory[output_buffer_offset : output_buffer_offset + output_buffer_size] + ) + + +async def lc3_source_task( + filename: str, + sdu_length: int, + frame_duration_us: int, + device: Device, + cis_handle: int, +) -> None: + with open(filename, 'rb') as f: + header = f.read(44) + assert header[8:12] == b'WAVE' + + pcm_num_channel, pcm_sample_rate, _byte_rate, _block_align, bits_per_sample = ( + struct.unpack("<HIIHH", header[22:36]) + ) + assert pcm_sample_rate == DEFAULT_PCM_SAMPLE_RATE + assert bits_per_sample == DEFAULT_PCM_BYTES_PER_SAMPLE * 8 + + frame_bytes = ( + liblc3.lc3_frame_samples(frame_duration_us, DEFAULT_PCM_SAMPLE_RATE) + * DEFAULT_PCM_BYTES_PER_SAMPLE + ) + packet_sequence_number = 0 + + while True: + next_round = datetime.datetime.now() + datetime.timedelta( + microseconds=frame_duration_us + ) + pcm_data = f.read(frame_bytes) + sdu = encode(sdu_length, pcm_num_channel, pcm_num_channel, pcm_data) + + iso_packet = HCI_IsoDataPacket( + connection_handle=cis_handle, + data_total_length=sdu_length + 4, + packet_sequence_number=packet_sequence_number, + pb_flag=0b10, + packet_status_flag=0, + iso_sdu_length=sdu_length, + iso_sdu_fragment=sdu, + ) + device.host.send_hci_packet(iso_packet) + packet_sequence_number += 1 + sleep_time = next_round - datetime.datetime.now() + await asyncio.sleep(sleep_time.total_seconds()) + + +# ----------------------------------------------------------------------------- +class UiServer: + speaker: weakref.ReferenceType[Speaker] + port: int + + def __init__(self, speaker: Speaker, port: int) -> None: + self.speaker = weakref.ref(speaker) + self.port = port + self.channel_socket = None + + async def start_http(self) -> None: + """Start the UI HTTP server.""" + + app = aiohttp.web.Application() + app.add_routes( + [ + aiohttp.web.get('/', self.get_static), + aiohttp.web.get('/index.html', self.get_static), + aiohttp.web.get('/channel', self.get_channel), + ] + ) + + runner = aiohttp.web.AppRunner(app) + await runner.setup() + site = aiohttp.web.TCPSite(runner, 'localhost', self.port) + print('UI HTTP server at ' + color(f'http://127.0.0.1:{self.port}', 'green')) + await site.start() + + async def get_static(self, request): + path = request.path + if path == '/': + path = '/index.html' + if path.endswith('.html'): + content_type = 'text/html' + elif path.endswith('.js'): + content_type = 'text/javascript' + elif path.endswith('.css'): + content_type = 'text/css' + elif path.endswith('.svg'): + content_type = 'image/svg+xml' + else: + content_type = 'text/plain' + text = ( + resources.files("bumble.apps.lea_unicast") + .joinpath(pathlib.Path(path).relative_to('/')) + .read_text(encoding="utf-8") + ) + return aiohttp.web.Response(text=text, content_type=content_type) + + async def get_channel(self, request): + ws = aiohttp.web.WebSocketResponse() + await ws.prepare(request) + + # Process messages until the socket is closed. + self.channel_socket = ws + async for message in ws: + if message.type == aiohttp.WSMsgType.TEXT: + logger.debug(f'<<< received message: {message.data}') + await self.on_message(message.data) + elif message.type == aiohttp.WSMsgType.ERROR: + logger.debug( + f'channel connection closed with exception {ws.exception()}' + ) + + self.channel_socket = None + logger.debug('--- channel connection closed') + + return ws + + async def on_message(self, message_str: str): + # Parse the message as JSON + message = json.loads(message_str) + + # Dispatch the message + message_type = message['type'] + message_params = message.get('params', {}) + handler = getattr(self, f'on_{message_type}_message') + if handler: + await handler(**message_params) + + async def on_hello_message(self): + await self.send_message( + 'hello', + bumble_version=bumble.__version__, + codec=self.speaker().codec, + streamState=self.speaker().stream_state.name, + ) + if connection := self.speaker().connection: + await self.send_message( + 'connection', + peer_address=connection.peer_address.to_string(False), + peer_name=connection.peer_name, + ) + + async def send_message(self, message_type: str, **kwargs) -> None: + if self.channel_socket is None: + return + + message = {'type': message_type, 'params': kwargs} + await self.channel_socket.send_json(message) + + async def send_audio(self, data: bytes) -> None: + if self.channel_socket is None: + return + + try: + await self.channel_socket.send_bytes(data) + except Exception as error: + logger.warning(f'exception while sending audio packet: {error}') + + +# ----------------------------------------------------------------------------- +class Speaker: + + def __init__( + self, + device_config_path: Optional[str], + ui_port: int, + transport: str, + lc3_input_file_path: str, + ): + self.device_config_path = device_config_path + self.transport = transport + self.lc3_input_file_path = lc3_input_file_path + + # Create an HTTP server for the UI + self.ui_server = UiServer(speaker=self, port=ui_port) + + async def run(self) -> None: + await self.ui_server.start_http() + + async with await open_transport(self.transport) as hci_transport: + # Create a device + if self.device_config_path: + device_config = DeviceConfiguration.from_file(self.device_config_path) + else: + device_config = DeviceConfiguration( + name="Bumble LE Headphone", + class_of_device=0x244418, + keystore="JsonKeyStore", + advertising_interval_min=25, + advertising_interval_max=25, + address=Address('F1:F2:F3:F4:F5:F6'), + ) + + device_config.le_enabled = True + device_config.cis_enabled = True + self.device = Device.from_config_with_hci( + device_config, hci_transport.source, hci_transport.sink + ) + + self.device.add_service( + bap.PublishedAudioCapabilitiesService( + supported_source_context=bap.ContextType(0xFFFF), + available_source_context=bap.ContextType(0xFFFF), + supported_sink_context=bap.ContextType(0xFFFF), # All context types + available_sink_context=bap.ContextType(0xFFFF), # All context types + sink_audio_locations=( + bap.AudioLocation.FRONT_LEFT | bap.AudioLocation.FRONT_RIGHT + ), + sink_pac=[_sink_pac_record()], + source_audio_locations=bap.AudioLocation.FRONT_LEFT, + source_pac=[_source_pac_record()], + ) + ) + + ascs = bap.AudioStreamControlService( + self.device, sink_ase_id=[1], source_ase_id=[2] + ) + self.device.add_service(ascs) + + advertising_data = bytes( + AdvertisingData( + [ + ( + AdvertisingData.COMPLETE_LOCAL_NAME, + bytes(device_config.name, 'utf-8'), + ), + ( + AdvertisingData.FLAGS, + bytes([AdvertisingData.LE_GENERAL_DISCOVERABLE_MODE_FLAG]), + ), + ( + AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS, + bytes(bap.PublishedAudioCapabilitiesService.UUID), + ), + ] + ) + ) + bytes(bap.UnicastServerAdvertisingData()) + + def on_pdu(pdu: HCI_IsoDataPacket, ase: bap.AseStateMachine): + codec_config = ase.codec_specific_configuration + assert isinstance(codec_config, bap.CodecSpecificConfiguration) + pcm = decode( + codec_config.frame_duration.us, + codec_config.audio_channel_allocation.channel_count, + pdu.iso_sdu_fragment, + ) + self.device.abort_on('disconnection', self.ui_server.send_audio(pcm)) + + def on_ase_state_change(ase: bap.AseStateMachine) -> None: + if ase.state == bap.AseStateMachine.State.STREAMING: + codec_config = ase.codec_specific_configuration + assert isinstance(codec_config, bap.CodecSpecificConfiguration) + assert ase.cis_link + if ase.role == bap.AudioRole.SOURCE: + ase.cis_link.abort_on( + 'disconnection', + lc3_source_task( + filename=self.lc3_input_file_path, + sdu_length=( + codec_config.codec_frames_per_sdu + * codec_config.octets_per_codec_frame + ), + frame_duration_us=codec_config.frame_duration.us, + device=self.device, + cis_handle=ase.cis_link.handle, + ), + ) + else: + ase.cis_link.sink = functools.partial(on_pdu, ase=ase) + elif ase.state == bap.AseStateMachine.State.CODEC_CONFIGURED: + codec_config = ase.codec_specific_configuration + assert isinstance(codec_config, bap.CodecSpecificConfiguration) + if ase.role == bap.AudioRole.SOURCE: + setup_encoders( + codec_config.sampling_frequency.hz, + codec_config.frame_duration.us, + codec_config.audio_channel_allocation.channel_count, + ) + else: + setup_decoders( + codec_config.sampling_frequency.hz, + codec_config.frame_duration.us, + codec_config.audio_channel_allocation.channel_count, + ) + + for ase in ascs.ase_state_machines.values(): + ase.on('state_change', functools.partial(on_ase_state_change, ase=ase)) + + await self.device.power_on() + await self.device.create_advertising_set( + advertising_data=advertising_data, + auto_restart=True, + advertising_parameters=AdvertisingParameters( + primary_advertising_interval_min=100, + primary_advertising_interval_max=100, + ), + ) + + await hci_transport.source.terminated + + +@click.command() +@click.option( + '--ui-port', + 'ui_port', + metavar='HTTP_PORT', + default=DEFAULT_UI_PORT, + show_default=True, + help='HTTP port for the UI server', +) +@click.option('--device-config', metavar='FILENAME', help='Device configuration file') +@click.argument('transport') +@click.argument('lc3_file') +def speaker(ui_port: int, device_config: str, transport: str, lc3_file: str) -> None: + """Run the speaker.""" + + asyncio.run(Speaker(device_config, ui_port, transport, lc3_file).run()) + + +# ----------------------------------------------------------------------------- +def main(): + logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'WARNING').upper()) + speaker() + + +# ----------------------------------------------------------------------------- +if __name__ == "__main__": + main() # pylint: disable=no-value-for-parameter diff --git a/apps/lea_unicast/index.html b/apps/lea_unicast/index.html new file mode 100644 index 0000000..fb1e61c --- /dev/null +++ b/apps/lea_unicast/index.html @@ -0,0 +1,68 @@ +<html data-bs-theme="dark"> + +<head> + <link href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.2/dist/css/bootstrap.min.css" rel="stylesheet" + integrity="sha384-T3c6CoIi6uLrA9TneNEoa7RxnatzjcDSCmG1MXxSR1GAsXEV/Dwwykc2MPK8M2HN" crossorigin="anonymous"> + <script src="https://unpkg.com/pcm-player"></script> +</head> + +<body> + <nav class="navbar navbar-dark bg-primary"> + <div class="container"> + <span class="navbar-brand mb-0 h1">Bumble Unicast Server</span> + </div> + </nav> + <br> + + <div class="container"> + <button type="button" class="btn btn-danger" id="connect-audio" onclick="connectAudio()">Connect Audio</button> + <button class="btn btn-primary" type="button" disabled> + <span class="spinner-border spinner-border-sm" id="ws-status-spinner" aria-hidden="true"></span> + <span role="status" id="ws-status">WebSocket Connecting...</span> + </button> + </div> + + + <script> + let player = null; + const wsStatus = document.getElementById("ws-status"); + const wsStatusSpinner = document.getElementById("ws-status-spinner"); + + const socket = new WebSocket('ws://127.0.0.1:7654/channel'); + socket.binaryType = "arraybuffer"; + socket.onmessage = function (message) { + if (typeof message.data === 'string' || message.data instanceof String) { + console.log(`channel MESSAGE: ${message.data}`); + } else { + console.log(typeof (message.data)) + // BINARY audio data. + if (player == null) return; + player.feed(message.data); + } + }; + + socket.onopen = (message) => { + wsStatusSpinner.remove(); + wsStatus.textContent = "WebSocket Connected"; + } + + socket.onclose = (message) => { + wsStatus.textContent = "WebSocket Disconnected"; + } + + function connectAudio() { + player = new PCMPlayer({ + inputCodec: 'Int16', + channels: 2, + sampleRate: 48000, + flushTime: 10, + }); + const button = document.getElementById("connect-audio") + button.disabled = true; + button.textContent = "Audio Connected"; + } + </script> + </div> +</body> + +</html>
\ No newline at end of file diff --git a/apps/lea_unicast/liblc3.wasm b/apps/lea_unicast/liblc3.wasm Binary files differnew file mode 100755 index 0000000..e905105 --- /dev/null +++ b/apps/lea_unicast/liblc3.wasm diff --git a/apps/rfcomm_bridge.py b/apps/rfcomm_bridge.py new file mode 100644 index 0000000..728e7cf --- /dev/null +++ b/apps/rfcomm_bridge.py @@ -0,0 +1,511 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# ----------------------------------------------------------------------------- +# Imports +# ----------------------------------------------------------------------------- +import asyncio +import logging +import os +import time +from typing import Optional + +import click + +from bumble.colors import color +from bumble.device import Device, DeviceConfiguration, Connection +from bumble import core +from bumble import hci +from bumble import rfcomm +from bumble import transport +from bumble import utils + + +# ----------------------------------------------------------------------------- +# Constants +# ----------------------------------------------------------------------------- +DEFAULT_RFCOMM_UUID = "E6D55659-C8B4-4B85-96BB-B1143AF6D3AE" +DEFAULT_MTU = 4096 +DEFAULT_CLIENT_TCP_PORT = 9544 +DEFAULT_SERVER_TCP_PORT = 9545 + +TRACE_MAX_SIZE = 48 + + +# ----------------------------------------------------------------------------- +class Tracer: + """ + Trace data buffers transmitted from one endpoint to another, with stats. + """ + + def __init__(self, channel_name: str) -> None: + self.channel_name = channel_name + self.last_ts: float = 0.0 + + def trace_data(self, data: bytes) -> None: + now = time.time() + elapsed_s = now - self.last_ts if self.last_ts else 0 + elapsed_ms = int(elapsed_s * 1000) + instant_throughput_kbps = ((len(data) / elapsed_s) / 1000) if elapsed_s else 0.0 + + hex_str = data[:TRACE_MAX_SIZE].hex() + ( + "..." if len(data) > TRACE_MAX_SIZE else "" + ) + print( + f"[{self.channel_name}] {len(data):4} bytes " + f"(+{elapsed_ms:4}ms, {instant_throughput_kbps: 7.2f}kB/s) " + f" {hex_str}" + ) + + self.last_ts = now + + +# ----------------------------------------------------------------------------- +class ServerBridge: + """ + RFCOMM server bridge: waits for a peer to connect an RFCOMM channel. + The RFCOMM channel may be associated with a UUID published in an SDP service + description, or simply be on a system-assigned channel number. + When the connection is made, the bridge connects a TCP socket to a remote host and + bridges the data in both directions, with flow control. + When the RFCOMM channel is closed, the bridge disconnects the TCP socket + and waits for a new channel to be connected. + """ + + READ_CHUNK_SIZE = 4096 + + def __init__( + self, channel: int, uuid: str, trace: bool, tcp_host: str, tcp_port: int + ) -> None: + self.device: Optional[Device] = None + self.channel = channel + self.uuid = uuid + self.tcp_host = tcp_host + self.tcp_port = tcp_port + self.rfcomm_channel: Optional[rfcomm.DLC] = None + self.tcp_tracer: Optional[Tracer] + self.rfcomm_tracer: Optional[Tracer] + + if trace: + self.tcp_tracer = Tracer(color("RFCOMM->TCP", "cyan")) + self.rfcomm_tracer = Tracer(color("TCP->RFCOMM", "magenta")) + else: + self.rfcomm_tracer = None + self.tcp_tracer = None + + async def start(self, device: Device) -> None: + self.device = device + + # Create and register a server + rfcomm_server = rfcomm.Server(self.device) + + # Listen for incoming DLC connections + self.channel = rfcomm_server.listen(self.on_rfcomm_channel, self.channel) + + # Setup the SDP to advertise this channel + service_record_handle = 0x00010001 + self.device.sdp_service_records = { + service_record_handle: rfcomm.make_service_sdp_records( + service_record_handle, self.channel, core.UUID(self.uuid) + ) + } + + # We're ready for a connection + self.device.on("connection", self.on_connection) + await self.set_available(True) + + print( + color( + ( + f"### Listening for RFCOMM connection on {device.public_address}, " + f"channel {self.channel}" + ), + "yellow", + ) + ) + + async def set_available(self, available: bool): + # Become discoverable and connectable + assert self.device + await self.device.set_connectable(available) + await self.device.set_discoverable(available) + + def on_connection(self, connection): + print(color(f"@@@ Bluetooth connection: {connection}", "blue")) + connection.on("disconnection", self.on_disconnection) + + # Don't accept new connections until we're disconnected + utils.AsyncRunner.spawn(self.set_available(False)) + + def on_disconnection(self, reason: int): + print( + color("@@@ Bluetooth disconnection:", "red"), + hci.HCI_Constant.error_name(reason), + ) + + # We're ready for a new connection + utils.AsyncRunner.spawn(self.set_available(True)) + + # Called when an RFCOMM channel is established + @utils.AsyncRunner.run_in_task() + async def on_rfcomm_channel(self, rfcomm_channel): + print(color("*** RFCOMM channel:", "cyan"), rfcomm_channel) + + # Connect to the TCP server + print( + color( + f"### Connecting to TCP {self.tcp_host}:{self.tcp_port}", + "yellow", + ) + ) + try: + reader, writer = await asyncio.open_connection(self.tcp_host, self.tcp_port) + except OSError: + print(color("!!! Connection failed", "red")) + await rfcomm_channel.disconnect() + return + + # Pipe data from RFCOMM to TCP + def on_rfcomm_channel_closed(): + print(color("*** RFCOMM channel closed", "cyan")) + writer.close() + + def write_rfcomm_data(data): + if self.rfcomm_tracer: + self.rfcomm_tracer.trace_data(data) + + writer.write(data) + + rfcomm_channel.sink = write_rfcomm_data + rfcomm_channel.on("close", on_rfcomm_channel_closed) + + # Pipe data from TCP to RFCOMM + while True: + try: + data = await reader.read(self.READ_CHUNK_SIZE) + + if len(data) == 0: + print(color("### TCP end of stream", "yellow")) + if rfcomm_channel.state == rfcomm.DLC.State.CONNECTED: + await rfcomm_channel.disconnect() + return + + if self.tcp_tracer: + self.tcp_tracer.trace_data(data) + + rfcomm_channel.write(data) + await rfcomm_channel.drain() + except Exception as error: + print(f"!!! Exception: {error}") + break + + writer.close() + await writer.wait_closed() + print(color("~~~ Bye bye", "magenta")) + + +# ----------------------------------------------------------------------------- +class ClientBridge: + """ + RFCOMM client bridge: connects to a BR/EDR device, then waits for an inbound + TCP connection on a specified port number. When a TCP client connects, an + RFCOMM connection to the device is established, and the data is bridged in both + directions, with flow control. + When the TCP connection is closed by the client, the RFCOMM channel is + disconnected, but the connection to the device remains, ready for a new TCP client + to connect. + """ + + READ_CHUNK_SIZE = 4096 + + def __init__( + self, + channel: int, + uuid: str, + trace: bool, + address: str, + tcp_host: str, + tcp_port: int, + encrypt: bool, + ): + self.channel = channel + self.uuid = uuid + self.trace = trace + self.address = address + self.tcp_host = tcp_host + self.tcp_port = tcp_port + self.encrypt = encrypt + self.device: Optional[Device] = None + self.connection: Optional[Connection] = None + self.rfcomm_client: Optional[rfcomm.Client] + self.rfcomm_mux: Optional[rfcomm.Multiplexer] + self.tcp_connected: bool = False + + self.tcp_tracer: Optional[Tracer] + self.rfcomm_tracer: Optional[Tracer] + + if trace: + self.tcp_tracer = Tracer(color("RFCOMM->TCP", "cyan")) + self.rfcomm_tracer = Tracer(color("TCP->RFCOMM", "magenta")) + else: + self.rfcomm_tracer = None + self.tcp_tracer = None + + async def connect(self) -> None: + if self.connection: + return + + print(color(f"@@@ Connecting to Bluetooth {self.address}", "blue")) + assert self.device + self.connection = await self.device.connect( + self.address, transport=core.BT_BR_EDR_TRANSPORT + ) + print(color(f"@@@ Bluetooth connection: {self.connection}", "blue")) + self.connection.on("disconnection", self.on_disconnection) + + if self.encrypt: + print(color("@@@ Encrypting Bluetooth connection", "blue")) + await self.connection.encrypt() + print(color("@@@ Bluetooth connection encrypted", "blue")) + + self.rfcomm_client = rfcomm.Client(self.connection) + try: + self.rfcomm_mux = await self.rfcomm_client.start() + except BaseException as e: + print(color("!!! Failed to setup RFCOMM connection", "red"), e) + raise + + async def start(self, device: Device) -> None: + self.device = device + await device.set_connectable(False) + await device.set_discoverable(False) + + # Called when a TCP connection is established + async def on_tcp_connection(reader, writer): + print(color("<<< TCP connection", "magenta")) + if self.tcp_connected: + print( + color("!!! TCP connection already active, rejecting new one", "red") + ) + writer.close() + return + self.tcp_connected = True + + try: + await self.pipe(reader, writer) + except BaseException as error: + print(color("!!! Exception while piping data:", "red"), error) + return + finally: + writer.close() + await writer.wait_closed() + self.tcp_connected = False + + await asyncio.start_server( + on_tcp_connection, + host=self.tcp_host if self.tcp_host != "_" else None, + port=self.tcp_port, + ) + print( + color( + f"### Listening for TCP connections on port {self.tcp_port}", "magenta" + ) + ) + + async def pipe( + self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter + ) -> None: + # Resolve the channel number from the UUID if needed + if self.channel == 0: + await self.connect() + assert self.connection + channel = await rfcomm.find_rfcomm_channel_with_uuid( + self.connection, self.uuid + ) + if channel: + print(color(f"### Found RFCOMM channel {channel}", "yellow")) + else: + print(color(f"!!! RFCOMM channel with UUID {self.uuid} not found")) + return + else: + channel = self.channel + + # Connect a new RFCOMM channel + await self.connect() + assert self.rfcomm_mux + print(color(f"*** Opening RFCOMM channel {channel}", "green")) + try: + rfcomm_channel = await self.rfcomm_mux.open_dlc(channel) + print(color(f"*** RFCOMM channel open: {rfcomm_channel}", "green")) + except Exception as error: + print(color(f"!!! RFCOMM open failed: {error}", "red")) + return + + # Pipe data from RFCOMM to TCP + def on_rfcomm_channel_closed(): + print(color("*** RFCOMM channel closed", "green")) + + def write_rfcomm_data(data): + if self.trace: + self.rfcomm_tracer.trace_data(data) + + writer.write(data) + + rfcomm_channel.on("close", on_rfcomm_channel_closed) + rfcomm_channel.sink = write_rfcomm_data + + # Pipe data from TCP to RFCOMM + while True: + try: + data = await reader.read(self.READ_CHUNK_SIZE) + + if len(data) == 0: + print(color("### TCP end of stream", "yellow")) + if rfcomm_channel.state == rfcomm.DLC.State.CONNECTED: + await rfcomm_channel.disconnect() + self.tcp_connected = False + return + + if self.tcp_tracer: + self.tcp_tracer.trace_data(data) + + rfcomm_channel.write(data) + await rfcomm_channel.drain() + except Exception as error: + print(f"!!! Exception: {error}") + break + + print(color("~~~ Bye bye", "magenta")) + + def on_disconnection(self, reason: int) -> None: + print( + color("@@@ Bluetooth disconnection:", "red"), + hci.HCI_Constant.error_name(reason), + ) + self.connection = None + + +# ----------------------------------------------------------------------------- +async def run(device_config, hci_transport, bridge): + print("<<< connecting to HCI...") + async with await transport.open_transport_or_link(hci_transport) as ( + hci_source, + hci_sink, + ): + print("<<< connected") + + if device_config: + device = Device.from_config_file_with_hci( + device_config, hci_source, hci_sink + ) + else: + device = Device.from_config_with_hci( + DeviceConfiguration(), hci_source, hci_sink + ) + device.classic_enabled = True + + # Let's go + await device.power_on() + try: + await bridge.start(device) + + # Wait until the transport terminates + await hci_source.wait_for_termination() + except core.ConnectionError as error: + print(color(f"!!! Bluetooth connection failed: {error}", "red")) + except Exception as error: + print(f"Exception while running bridge: {error}") + + +# ----------------------------------------------------------------------------- +@click.group() +@click.pass_context +@click.option( + "--device-config", + metavar="CONFIG_FILE", + help="Device configuration file", +) +@click.option( + "--hci-transport", metavar="TRANSPORT_NAME", help="HCI transport", required=True +) +@click.option("--trace", is_flag=True, help="Trace bridged data to stdout") +@click.option( + "--channel", + metavar="CHANNEL_NUMER", + help="RFCOMM channel number", + type=int, + default=0, +) +@click.option( + "--uuid", + metavar="UUID", + help="UUID for the RFCOMM channel", + default=DEFAULT_RFCOMM_UUID, +) +def cli( + context, + device_config, + hci_transport, + trace, + channel, + uuid, +): + context.ensure_object(dict) + context.obj["device_config"] = device_config + context.obj["hci_transport"] = hci_transport + context.obj["trace"] = trace + context.obj["channel"] = channel + context.obj["uuid"] = uuid + + +# ----------------------------------------------------------------------------- +@cli.command() +@click.pass_context +@click.option("--tcp-host", help="TCP host", default="localhost") +@click.option("--tcp-port", help="TCP port", default=DEFAULT_SERVER_TCP_PORT) +def server(context, tcp_host, tcp_port): + bridge = ServerBridge( + context.obj["channel"], + context.obj["uuid"], + context.obj["trace"], + tcp_host, + tcp_port, + ) + asyncio.run(run(context.obj["device_config"], context.obj["hci_transport"], bridge)) + + +# ----------------------------------------------------------------------------- +@cli.command() +@click.pass_context +@click.argument("bluetooth-address") +@click.option("--tcp-host", help="TCP host", default="_") +@click.option("--tcp-port", help="TCP port", default=DEFAULT_CLIENT_TCP_PORT) +@click.option("--encrypt", is_flag=True, help="Encrypt the connection") +def client(context, bluetooth_address, tcp_host, tcp_port, encrypt): + bridge = ClientBridge( + context.obj["channel"], + context.obj["uuid"], + context.obj["trace"], + bluetooth_address, + tcp_host, + tcp_port, + encrypt, + ) + asyncio.run(run(context.obj["device_config"], context.obj["hci_transport"], bridge)) + + +# ----------------------------------------------------------------------------- +logging.basicConfig(level=os.environ.get("BUMBLE_LOGLEVEL", "WARNING").upper()) +if __name__ == "__main__": + cli(obj={}) # pylint: disable=no-value-for-parameter diff --git a/apps/speaker/speaker.py b/apps/speaker/speaker.py index 84e05a0..fc2230a 100644 --- a/apps/speaker/speaker.py +++ b/apps/speaker/speaker.py @@ -76,6 +76,7 @@ logger = logging.getLogger(__name__) # ----------------------------------------------------------------------------- DEFAULT_UI_PORT = 7654 + # ----------------------------------------------------------------------------- class AudioExtractor: @staticmethod diff --git a/apps/unbond.py b/apps/unbond.py index 5ffd746..01cb9e5 100644 --- a/apps/unbond.py +++ b/apps/unbond.py @@ -24,6 +24,7 @@ from bumble.device import Device from bumble.keys import JsonKeyStore from bumble.transport import open_transport + # ----------------------------------------------------------------------------- async def unbond_with_keystore(keystore, address): if address is None: diff --git a/bumble/a2dp.py b/bumble/a2dp.py index 653a042..cac14e9 100644 --- a/bumble/a2dp.py +++ b/bumble/a2dp.py @@ -652,7 +652,9 @@ class SbcPacketSource: # Prepare for next packets sequence_number += 1 + sequence_number &= 0xFFFF timestamp += sum((frame.sample_count for frame in frames)) + timestamp &= 0xFFFFFFFF frames = [frame] frames_size = len(frame.payload) else: diff --git a/bumble/att.py b/bumble/att.py index 2bec4ea..0fce3ce 100644 --- a/bumble/att.py +++ b/bumble/att.py @@ -655,7 +655,7 @@ class ATT_Write_Command(ATT_PDU): @ATT_PDU.subclass( [ ('attribute_handle', HANDLE_FIELD_SPEC), - ('attribute_value', '*') + ('attribute_value', '*'), # ('authentication_signature', 'TODO') ] ) diff --git a/bumble/avdtp.py b/bumble/avdtp.py index f785109..713f7b7 100644 --- a/bumble/avdtp.py +++ b/bumble/avdtp.py @@ -325,8 +325,8 @@ class MediaPacket: self.padding = padding self.extension = extension self.marker = marker - self.sequence_number = sequence_number - self.timestamp = timestamp + self.sequence_number = sequence_number & 0xFFFF + self.timestamp = timestamp & 0xFFFFFFFF self.ssrc = ssrc self.csrc_list = csrc_list self.payload_type = payload_type @@ -341,7 +341,12 @@ class MediaPacket: | len(self.csrc_list), self.marker << 7 | self.payload_type, ] - ) + struct.pack('>HII', self.sequence_number, self.timestamp, self.ssrc) + ) + struct.pack( + '>HII', + self.sequence_number, + self.timestamp, + self.ssrc, + ) for csrc in self.csrc_list: header += struct.pack('>I', csrc) return header + self.payload @@ -1545,9 +1550,10 @@ class Protocol(EventEmitter): assert False # Should never reach this - async def get_capabilities( - self, seid: int - ) -> Union[Get_Capabilities_Response, Get_All_Capabilities_Response,]: + async def get_capabilities(self, seid: int) -> Union[ + Get_Capabilities_Response, + Get_All_Capabilities_Response, + ]: if self.version > (1, 2): return await self.send_command(Get_All_Capabilities_Command(seid)) diff --git a/bumble/avrcp.py b/bumble/avrcp.py index fec2b2c..11f4eff 100644 --- a/bumble/avrcp.py +++ b/bumble/avrcp.py @@ -1745,9 +1745,11 @@ class Protocol(pyee.EventEmitter): avc.CommandFrame.CommandType.CONTROL, avc.Frame.SubunitType.PANEL, 0, - avc.PassThroughFrame.StateFlag.PRESSED - if pressed - else avc.PassThroughFrame.StateFlag.RELEASED, + ( + avc.PassThroughFrame.StateFlag.PRESSED + if pressed + else avc.PassThroughFrame.StateFlag.RELEASED + ), key, b'', ) diff --git a/bumble/controller.py b/bumble/controller.py index eb20292..f4cbe95 100644 --- a/bumble/controller.py +++ b/bumble/controller.py @@ -134,15 +134,15 @@ class Controller: self.hci_sink = None self.link = link - self.central_connections: Dict[ - Address, Connection - ] = {} # Connections where this controller is the central - self.peripheral_connections: Dict[ - Address, Connection - ] = {} # Connections where this controller is the peripheral - self.classic_connections: Dict[ - Address, Connection - ] = {} # Connections in BR/EDR + self.central_connections: Dict[Address, Connection] = ( + {} + ) # Connections where this controller is the central + self.peripheral_connections: Dict[Address, Connection] = ( + {} + ) # Connections where this controller is the peripheral + self.classic_connections: Dict[Address, Connection] = ( + {} + ) # Connections in BR/EDR self.central_cis_links: Dict[int, CisLink] = {} # CIS links by handle self.peripheral_cis_links: Dict[int, CisLink] = {} # CIS links by handle diff --git a/bumble/device.py b/bumble/device.py index 48f9d58..f9e6b9d 100644 --- a/bumble/device.py +++ b/bumble/device.py @@ -17,12 +17,19 @@ # ----------------------------------------------------------------------------- from __future__ import annotations from enum import IntEnum +import copy import functools import json import asyncio import logging import secrets -from contextlib import asynccontextmanager, AsyncExitStack, closing +import sys +from contextlib import ( + asynccontextmanager, + AsyncExitStack, + closing, + AbstractAsyncContextManager, +) from dataclasses import dataclass, field from collections.abc import Iterable from typing import ( @@ -40,6 +47,7 @@ from typing import ( overload, TYPE_CHECKING, ) +from typing_extensions import Self from pyee import EventEmitter @@ -276,12 +284,12 @@ class Advertisement: data_bytes: bytes = b'' # Constants - TX_POWER_NOT_AVAILABLE: ClassVar[ - int - ] = HCI_LE_Extended_Advertising_Report_Event.TX_POWER_INFORMATION_NOT_AVAILABLE - RSSI_NOT_AVAILABLE: ClassVar[ - int - ] = HCI_LE_Extended_Advertising_Report_Event.RSSI_NOT_AVAILABLE + TX_POWER_NOT_AVAILABLE: ClassVar[int] = ( + HCI_LE_Extended_Advertising_Report_Event.TX_POWER_INFORMATION_NOT_AVAILABLE + ) + RSSI_NOT_AVAILABLE: ClassVar[int] = ( + HCI_LE_Extended_Advertising_Report_Event.RSSI_NOT_AVAILABLE + ) def __post_init__(self) -> None: self.data = AdvertisingData.from_bytes(self.data_bytes) @@ -558,7 +566,9 @@ class AdvertisingParameters: ) primary_advertising_interval_min: int = DEVICE_DEFAULT_ADVERTISING_INTERVAL primary_advertising_interval_max: int = DEVICE_DEFAULT_ADVERTISING_INTERVAL - primary_advertising_channel_map: HCI_LE_Set_Extended_Advertising_Parameters_Command.ChannelMap = ( + primary_advertising_channel_map: ( + HCI_LE_Set_Extended_Advertising_Parameters_Command.ChannelMap + ) = ( AdvertisingChannelMap.CHANNEL_37 | AdvertisingChannelMap.CHANNEL_38 | AdvertisingChannelMap.CHANNEL_39 @@ -957,8 +967,9 @@ class ScoLink(CompositeEventEmitter): acl_connection: Connection handle: int link_type: int + sink: Optional[Callable[[HCI_SynchronousDataPacket], Any]] = None - def __post_init__(self): + def __post_init__(self) -> None: super().__init__() async def disconnect( @@ -980,8 +991,9 @@ class CisLink(CompositeEventEmitter): cis_id: int # CIS ID assigned by Central device cig_id: int # CIG ID assigned by Central device state: State = State.PENDING + sink: Optional[Callable[[HCI_IsoDataPacket], Any]] = None - def __post_init__(self): + def __post_init__(self) -> None: super().__init__() async def disconnect( @@ -1138,14 +1150,12 @@ class Connection(CompositeEventEmitter): @overload async def create_l2cap_channel( self, spec: l2cap.ClassicChannelSpec - ) -> l2cap.ClassicChannel: - ... + ) -> l2cap.ClassicChannel: ... @overload async def create_l2cap_channel( self, spec: l2cap.LeCreditBasedChannelSpec - ) -> l2cap.LeCreditBasedChannel: - ... + ) -> l2cap.LeCreditBasedChannel: ... async def create_l2cap_channel( self, spec: Union[l2cap.ClassicChannelSpec, l2cap.LeCreditBasedChannelSpec] @@ -1252,75 +1262,47 @@ class Connection(CompositeEventEmitter): # ----------------------------------------------------------------------------- +@dataclass class DeviceConfiguration: - def __init__(self) -> None: - # Setup defaults - self.name = DEVICE_DEFAULT_NAME - self.address = Address(DEVICE_DEFAULT_ADDRESS) - self.class_of_device = DEVICE_DEFAULT_CLASS_OF_DEVICE - self.scan_response_data = DEVICE_DEFAULT_SCAN_RESPONSE_DATA - self.advertising_interval_min = DEVICE_DEFAULT_ADVERTISING_INTERVAL - self.advertising_interval_max = DEVICE_DEFAULT_ADVERTISING_INTERVAL - self.le_enabled = True - # LE host enable 2nd parameter - self.le_simultaneous_enabled = False - self.classic_enabled = False - self.classic_sc_enabled = True - self.classic_ssp_enabled = True - self.classic_smp_enabled = True - self.classic_accept_any = True - self.connectable = True - self.discoverable = True - self.advertising_data = bytes( - AdvertisingData( - [(AdvertisingData.COMPLETE_LOCAL_NAME, bytes(self.name, 'utf-8'))] - ) + # Setup defaults + name: str = DEVICE_DEFAULT_NAME + address: Address = Address(DEVICE_DEFAULT_ADDRESS) + class_of_device: int = DEVICE_DEFAULT_CLASS_OF_DEVICE + scan_response_data: bytes = DEVICE_DEFAULT_SCAN_RESPONSE_DATA + advertising_interval_min: int = DEVICE_DEFAULT_ADVERTISING_INTERVAL + advertising_interval_max: int = DEVICE_DEFAULT_ADVERTISING_INTERVAL + le_enabled: bool = True + # LE host enable 2nd parameter + le_simultaneous_enabled: bool = False + classic_enabled: bool = False + classic_sc_enabled: bool = True + classic_ssp_enabled: bool = True + classic_smp_enabled: bool = True + classic_accept_any: bool = True + connectable: bool = True + discoverable: bool = True + advertising_data: bytes = bytes( + AdvertisingData( + [(AdvertisingData.COMPLETE_LOCAL_NAME, bytes(DEVICE_DEFAULT_NAME, 'utf-8'))] ) - self.irk = bytes(16) # This really must be changed for any level of security - self.keystore = None + ) + irk: bytes = bytes(16) # This really must be changed for any level of security + keystore: Optional[str] = None + address_resolution_offload: bool = False + cis_enabled: bool = False + + def __post_init__(self) -> None: self.gatt_services: List[Dict[str, Any]] = [] - self.address_resolution_offload = False - self.cis_enabled = False def load_from_dict(self, config: Dict[str, Any]) -> None: + config = copy.deepcopy(config) + # Load simple properties - self.name = config.get('name', self.name) - if address := config.get('address', None): + if address := config.pop('address', None): self.address = Address(address) - self.class_of_device = config.get('class_of_device', self.class_of_device) - self.advertising_interval_min = config.get( - 'advertising_interval', self.advertising_interval_min - ) - self.advertising_interval_max = self.advertising_interval_min - self.keystore = config.get('keystore') - self.le_enabled = config.get('le_enabled', self.le_enabled) - self.le_simultaneous_enabled = config.get( - 'le_simultaneous_enabled', self.le_simultaneous_enabled - ) - self.classic_enabled = config.get('classic_enabled', self.classic_enabled) - self.classic_sc_enabled = config.get( - 'classic_sc_enabled', self.classic_sc_enabled - ) - self.classic_ssp_enabled = config.get( - 'classic_ssp_enabled', self.classic_ssp_enabled - ) - self.classic_smp_enabled = config.get( - 'classic_smp_enabled', self.classic_smp_enabled - ) - self.classic_accept_any = config.get( - 'classic_accept_any', self.classic_accept_any - ) - self.connectable = config.get('connectable', self.connectable) - self.discoverable = config.get('discoverable', self.discoverable) - self.gatt_services = config.get('gatt_services', self.gatt_services) - self.address_resolution_offload = config.get( - 'address_resolution_offload', self.address_resolution_offload - ) - self.cis_enabled = config.get('cis_enabled', self.cis_enabled) # Load or synthesize an IRK - irk = config.get('irk') - if irk: + if irk := config.pop('irk', None): self.irk = bytes.fromhex(irk) elif self.address != Address(DEVICE_DEFAULT_ADDRESS): # Construct an IRK from the address bytes @@ -1332,21 +1314,53 @@ class DeviceConfiguration: # Fallback - when both IRK and address are not set, randomly generate an IRK. self.irk = secrets.token_bytes(16) + if (name := config.pop('name', None)) is not None: + self.name = name + # Load advertising data - advertising_data = config.get('advertising_data') - if advertising_data: + if advertising_data := config.pop('advertising_data', None): self.advertising_data = bytes.fromhex(advertising_data) - elif config.get('name') is not None: + elif name is not None: self.advertising_data = bytes( AdvertisingData( [(AdvertisingData.COMPLETE_LOCAL_NAME, bytes(self.name, 'utf-8'))] ) ) - def load_from_file(self, filename): + # Load advertising interval (for backward compatibility) + if advertising_interval := config.pop('advertising_interval', None): + self.advertising_interval_min = advertising_interval + self.advertising_interval_max = advertising_interval + if ( + 'advertising_interval_max' in config + or 'advertising_interval_min' in config + ): + logger.warning( + 'Trying to set both advertising_interval and ' + 'advertising_interval_min/max, advertising_interval will be' + 'ignored.' + ) + + # Load data in primitive types. + for key, value in config.items(): + setattr(self, key, value) + + def load_from_file(self, filename: str) -> None: with open(filename, 'r', encoding='utf-8') as file: self.load_from_dict(json.load(file)) + @classmethod + def from_file(cls: Type[Self], filename: str) -> Self: + config = cls() + config.load_from_file(filename) + return config + + @classmethod + def from_dict(cls: Type[Self], config: Dict[str, Any]) -> Self: + device_config = cls() + device_config.load_from_dict(config) + return device_config + # ----------------------------------------------------------------------------- # Decorators used with the following Device class @@ -1470,8 +1484,7 @@ class Device(CompositeEventEmitter): @classmethod def from_config_file(cls, filename: str) -> Device: - config = DeviceConfiguration() - config.load_from_file(filename) + config = DeviceConfiguration.from_file(filename) return cls(config=config) @classmethod @@ -1488,8 +1501,7 @@ class Device(CompositeEventEmitter): def from_config_file_with_hci( cls, filename: str, hci_source: TransportSource, hci_sink: TransportSink ) -> Device: - config = DeviceConfiguration() - config.load_from_file(filename) + config = DeviceConfiguration.from_file(filename) return cls.from_config_with_hci(config, hci_source, hci_sink) def __init__( @@ -1529,6 +1541,12 @@ class Device(CompositeEventEmitter): Address.ANY: [] } # Futures, by BD address OR [Futures] for Address.ANY + # In Python <= 3.9 + Rust Runtime, asyncio.Lock cannot be properly initiated. + if sys.version_info >= (3, 10): + self._cis_lock = asyncio.Lock() + else: + self._cis_lock = AsyncExitStack() + # Own address type cache self.connect_own_address_type = None @@ -1723,16 +1741,14 @@ class Device(CompositeEventEmitter): self, connection: Connection, spec: l2cap.ClassicChannelSpec, - ) -> l2cap.ClassicChannel: - ... + ) -> l2cap.ClassicChannel: ... @overload async def create_l2cap_channel( self, connection: Connection, spec: l2cap.LeCreditBasedChannelSpec, - ) -> l2cap.LeCreditBasedChannel: - ... + ) -> l2cap.LeCreditBasedChannel: ... async def create_l2cap_channel( self, @@ -1753,16 +1769,14 @@ class Device(CompositeEventEmitter): self, spec: l2cap.ClassicChannelSpec, handler: Optional[Callable[[l2cap.ClassicChannel], Any]] = None, - ) -> l2cap.ClassicChannelServer: - ... + ) -> l2cap.ClassicChannelServer: ... @overload def create_l2cap_server( self, spec: l2cap.LeCreditBasedChannelSpec, handler: Optional[Callable[[l2cap.LeCreditBasedChannel], Any]] = None, - ) -> l2cap.LeCreditBasedChannelServer: - ... + ) -> l2cap.LeCreditBasedChannelServer: ... def create_l2cap_server( self, @@ -2188,7 +2202,7 @@ class Device(CompositeEventEmitter): # controller. await self.send_command( HCI_LE_Remove_Advertising_Set_Command( - advertising_handle=advertising_data + advertising_handle=advertising_handle ), check_result=False, ) @@ -3289,17 +3303,19 @@ class Device(CompositeEventEmitter): handler = self.on( 'remote_name', - lambda address, remote_name: pending_name.set_result(remote_name) - if address == peer_address - else None, + lambda address, remote_name: ( + pending_name.set_result(remote_name) + if address == peer_address + else None + ), ) failure_handler = self.on( 'remote_name_failure', - lambda address, error_code: pending_name.set_exception( - HCI_Error(error_code) - ) - if address == peer_address - else None, + lambda address, error_code: ( + pending_name.set_exception(HCI_Error(error_code)) + if address == peer_address + else None + ), ) try: @@ -3404,49 +3420,71 @@ class Device(CompositeEventEmitter): for cis_handle, _ in cis_acl_pairs } - @watcher.on(self, 'cis_establishment') def on_cis_establishment(cis_link: CisLink) -> None: if pending_future := pending_cis_establishments.get(cis_link.handle): pending_future.set_result(cis_link) - result = await self.send_command( + def on_cis_establishment_failure(cis_handle: int, status: int) -> None: + if pending_future := pending_cis_establishments.get(cis_handle): + pending_future.set_exception(HCI_Error(status)) + + watcher.on(self, 'cis_establishment', on_cis_establishment) + watcher.on(self, 'cis_establishment_failure', on_cis_establishment_failure) + await self.send_command( HCI_LE_Create_CIS_Command( cis_connection_handle=[p[0] for p in cis_acl_pairs], acl_connection_handle=[p[1] for p in cis_acl_pairs], ), + check_result=True, ) - if result.status != HCI_COMMAND_STATUS_PENDING: - logger.warning( - 'HCI_LE_Create_CIS_Command failed: ' - f'{HCI_Constant.error_name(result.status)}' - ) - raise HCI_StatusError(result) return await asyncio.gather(*pending_cis_establishments.values()) # [LE only] @experimental('Only for testing.') async def accept_cis_request(self, handle: int) -> CisLink: - result = await self.send_command( - HCI_LE_Accept_CIS_Request_Command(connection_handle=handle), - ) - if result.status != HCI_COMMAND_STATUS_PENDING: - logger.warning( - 'HCI_LE_Accept_CIS_Request_Command failed: ' - f'{HCI_Constant.error_name(result.status)}' - ) - raise HCI_StatusError(result) + """[LE Only] Accepts an incoming CIS request. - pending_cis_establishment = asyncio.get_running_loop().create_future() + When the specified CIS handle is already created, this method returns the + existed CIS link object immediately. - with closing(EventWatcher()) as watcher: + Args: + handle: CIS handle to accept. - @watcher.on(self, 'cis_establishment') - def on_cis_establishment(cis_link: CisLink) -> None: - if cis_link.handle == handle: - pending_cis_establishment.set_result(cis_link) + Returns: + CIS link object on the given handle. + """ + if not (cis_link := self.cis_links.get(handle)): + raise InvalidStateError(f'No pending CIS request of handle {handle}') + + # There might be multiple ASE sharing a CIS channel. + # If one of them has accepted the request, the others should just leverage it. + async with self._cis_lock: + if cis_link.state == CisLink.State.ESTABLISHED: + return cis_link + + with closing(EventWatcher()) as watcher: + pending_establishment = asyncio.get_running_loop().create_future() + + def on_establishment() -> None: + pending_establishment.set_result(None) + + def on_establishment_failure(status: int) -> None: + pending_establishment.set_exception(HCI_Error(status)) + + watcher.on(cis_link, 'establishment', on_establishment) + watcher.on(cis_link, 'establishment_failure', on_establishment_failure) + + await self.send_command( + HCI_LE_Accept_CIS_Request_Command(connection_handle=handle), + check_result=True, + ) + + await pending_establishment + return cis_link - return await pending_cis_establishment + # Mypy believes this is reachable when context is an ExitStack. + raise InvalidStateError('Unreachable') # [LE only] @experimental('Only for testing.') @@ -3455,15 +3493,10 @@ class Device(CompositeEventEmitter): handle: int, reason: int = HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR, ) -> None: - result = await self.send_command( + await self.send_command( HCI_LE_Reject_CIS_Request_Command(connection_handle=handle, reason=reason), + check_result=True, ) - if result.status != HCI_COMMAND_STATUS_PENDING: - logger.warning( - 'HCI_LE_Reject_CIS_Request_Command failed: ' - f'{HCI_Constant.error_name(result.status)}' - ) - raise HCI_StatusError(result) async def get_remote_le_features(self, connection: Connection) -> LeFeatureMask: """[LE Only] Reads remote LE supported features. @@ -3475,19 +3508,25 @@ class Device(CompositeEventEmitter): LE features supported by the remote device. """ with closing(EventWatcher()) as watcher: - read_feature_future: asyncio.Future[ - LeFeatureMask - ] = asyncio.get_running_loop().create_future() + read_feature_future: asyncio.Future[LeFeatureMask] = ( + asyncio.get_running_loop().create_future() + ) def on_le_remote_features(handle: int, features: int): if handle == connection.handle: read_feature_future.set_result(LeFeatureMask(features)) + def on_failure(handle: int, status: int): + if handle == connection.handle: + read_feature_future.set_exception(HCI_Error(status)) + watcher.on(self.host, 'le_remote_features', on_le_remote_features) + watcher.on(self.host, 'le_remote_features_failure', on_failure) await self.send_command( HCI_LE_Read_Remote_Features_Command( connection_handle=connection.handle ), + check_result=True, ) return await read_feature_future @@ -3662,7 +3701,6 @@ class Device(CompositeEventEmitter): # We were connected via a legacy advertisement. if self.legacy_advertiser: own_address_type = self.legacy_advertiser.own_address_type - self.legacy_advertiser = None else: # This should not happen, but just in case, pick a default. logger.warning("connection without an advertiser") @@ -3693,15 +3731,14 @@ class Device(CompositeEventEmitter): ) self.connections[connection_handle] = connection - if ( - role == HCI_PERIPHERAL_ROLE - and self.legacy_advertiser - and self.legacy_advertiser.auto_restart - ): - connection.once( - 'disconnection', - lambda _: self.abort_on('flush', self.legacy_advertiser.start()), - ) + if role == HCI_PERIPHERAL_ROLE and self.legacy_advertiser: + if self.legacy_advertiser.auto_restart: + connection.once( + 'disconnection', + lambda _: self.abort_on('flush', self.legacy_advertiser.start()), + ) + else: + self.legacy_advertiser = None if role == HCI_CENTRAL_ROLE or not self.supports_le_extended_advertising: # We can emit now, we have all the info we need @@ -4109,8 +4146,8 @@ class Device(CompositeEventEmitter): @host_event_handler @experimental('Only for testing') def on_sco_packet(self, sco_handle: int, packet: HCI_SynchronousDataPacket) -> None: - if sco_link := self.sco_links.get(sco_handle): - sco_link.emit('pdu', packet) + if (sco_link := self.sco_links.get(sco_handle)) and sco_link.sink: + sco_link.sink(packet) # [LE only] @host_event_handler @@ -4166,15 +4203,15 @@ class Device(CompositeEventEmitter): def on_cis_establishment_failure(self, cis_handle: int, status: int) -> None: logger.debug(f'*** CIS Establishment Failure: cis=[0x{cis_handle:04X}] ***') if cis_link := self.cis_links.pop(cis_handle): - cis_link.emit('establishment_failure') + cis_link.emit('establishment_failure', status) self.emit('cis_establishment_failure', cis_handle, status) # [LE only] @host_event_handler @experimental('Only for testing') def on_iso_packet(self, handle: int, packet: HCI_IsoDataPacket) -> None: - if cis_link := self.cis_links.get(handle): - cis_link.emit('pdu', packet) + if (cis_link := self.cis_links.get(handle)) and cis_link.sink: + cis_link.sink(packet) @host_event_handler @with_connection_from_handle diff --git a/bumble/gap.py b/bumble/gap.py index 29df89f..c07a30d 100644 --- a/bumble/gap.py +++ b/bumble/gap.py @@ -36,6 +36,7 @@ logger = logging.getLogger(__name__) # Classes # ----------------------------------------------------------------------------- + # ----------------------------------------------------------------------------- class GenericAccessService(Service): def __init__(self, device_name, appearance=(0, 0)): diff --git a/bumble/gatt.py b/bumble/gatt.py index 71c01f4..896cec0 100644 --- a/bumble/gatt.py +++ b/bumble/gatt.py @@ -342,9 +342,11 @@ class Service(Attribute): uuid = UUID(uuid) super().__init__( - GATT_PRIMARY_SERVICE_ATTRIBUTE_TYPE - if primary - else GATT_SECONDARY_SERVICE_ATTRIBUTE_TYPE, + ( + GATT_PRIMARY_SERVICE_ATTRIBUTE_TYPE + if primary + else GATT_SECONDARY_SERVICE_ATTRIBUTE_TYPE + ), Attribute.READABLE, uuid.to_pdu_bytes(), ) @@ -560,9 +562,9 @@ class CharacteristicAdapter: def __init__(self, characteristic: Union[Characteristic, AttributeProxy]): self.wrapped_characteristic = characteristic - self.subscribers: Dict[ - Callable, Callable - ] = {} # Map from subscriber to proxy subscriber + self.subscribers: Dict[Callable, Callable] = ( + {} + ) # Map from subscriber to proxy subscriber if isinstance(characteristic, Characteristic): self.read_value = self.read_encoded_value diff --git a/bumble/gatt_client.py b/bumble/gatt_client.py index 2079a65..c71aabd 100644 --- a/bumble/gatt_client.py +++ b/bumble/gatt_client.py @@ -91,6 +91,22 @@ logger = logging.getLogger(__name__) # ----------------------------------------------------------------------------- +# Utils +# ----------------------------------------------------------------------------- + + +def show_services(services: Iterable[ServiceProxy]) -> None: + for service in services: + print(color(str(service), 'cyan')) + + for characteristic in service.characteristics: + print(color(' ' + str(characteristic), 'magenta')) + + for descriptor in characteristic.descriptors: + print(color(' ' + str(descriptor), 'green')) + + +# ----------------------------------------------------------------------------- # Proxies # ----------------------------------------------------------------------------- class AttributeProxy(EventEmitter): @@ -352,9 +368,7 @@ class Client: if c.uuid == uuid ] - def get_attribute_grouping( - self, attribute_handle: int - ) -> Optional[ + def get_attribute_grouping(self, attribute_handle: int) -> Optional[ Union[ ServiceProxy, Tuple[ServiceProxy, CharacteristicProxy], diff --git a/bumble/gatt_server.py b/bumble/gatt_server.py index 3be4185..be2b88e 100644 --- a/bumble/gatt_server.py +++ b/bumble/gatt_server.py @@ -445,9 +445,9 @@ class Server(EventEmitter): assert self.pending_confirmations[connection.handle] is None # Create a future value to hold the eventual response - pending_confirmation = self.pending_confirmations[ - connection.handle - ] = asyncio.get_running_loop().create_future() + pending_confirmation = self.pending_confirmations[connection.handle] = ( + asyncio.get_running_loop().create_future() + ) try: self.send_gatt_pdu(connection.handle, indication.to_bytes()) diff --git a/bumble/hci.py b/bumble/hci.py index 013a2d3..9ef40bf 100644 --- a/bumble/hci.py +++ b/bumble/hci.py @@ -23,7 +23,7 @@ import functools import logging import secrets import struct -from typing import Any, Callable, Dict, Iterable, List, Optional, Type, Union +from typing import Any, Callable, Dict, Iterable, List, Optional, Type, Union, ClassVar from bumble import crypto from .colors import color @@ -2003,7 +2003,7 @@ class HCI_Packet: Abstract Base class for HCI packets ''' - hci_packet_type: int + hci_packet_type: ClassVar[int] @staticmethod def from_bytes(packet: bytes) -> HCI_Packet: @@ -4249,9 +4249,11 @@ class HCI_LE_Set_Extended_Scan_Parameters_Command(HCI_Command): fields.append( ( f'{scanning_phy_str}.scan_type: ', - 'PASSIVE' - if self.scan_types[i] == self.PASSIVE_SCANNING - else 'ACTIVE', + ( + 'PASSIVE' + if self.scan_types[i] == self.PASSIVE_SCANNING + else 'ACTIVE' + ), ) ) fields.append( @@ -5010,9 +5012,9 @@ class HCI_LE_Advertising_Report_Event(HCI_LE_Meta_Event): return f'{color(self.subevent_name(self.subevent_code), "magenta")}:\n{reports}' -HCI_LE_Meta_Event.subevent_classes[ - HCI_LE_ADVERTISING_REPORT_EVENT -] = HCI_LE_Advertising_Report_Event +HCI_LE_Meta_Event.subevent_classes[HCI_LE_ADVERTISING_REPORT_EVENT] = ( + HCI_LE_Advertising_Report_Event +) # ----------------------------------------------------------------------------- @@ -5264,9 +5266,9 @@ class HCI_LE_Extended_Advertising_Report_Event(HCI_LE_Meta_Event): return f'{color(self.subevent_name(self.subevent_code), "magenta")}:\n{reports}' -HCI_LE_Meta_Event.subevent_classes[ - HCI_LE_EXTENDED_ADVERTISING_REPORT_EVENT -] = HCI_LE_Extended_Advertising_Report_Event +HCI_LE_Meta_Event.subevent_classes[HCI_LE_EXTENDED_ADVERTISING_REPORT_EVENT] = ( + HCI_LE_Extended_Advertising_Report_Event +) # ----------------------------------------------------------------------------- @@ -6190,12 +6192,23 @@ class HCI_SynchronousDataPacket(HCI_Packet): # ----------------------------------------------------------------------------- +@dataclasses.dataclass class HCI_IsoDataPacket(HCI_Packet): ''' See Bluetooth spec @ 5.4.5 HCI ISO Data Packets ''' - hci_packet_type = HCI_ISO_DATA_PACKET + hci_packet_type: ClassVar[int] = HCI_ISO_DATA_PACKET + + connection_handle: int + data_total_length: int + iso_sdu_fragment: bytes + pb_flag: int + ts_flag: int = 0 + time_stamp: Optional[int] = None + packet_sequence_number: Optional[int] = None + iso_sdu_length: Optional[int] = None + packet_status_flag: Optional[int] = None @staticmethod def from_bytes(packet: bytes) -> HCI_IsoDataPacket: @@ -6239,28 +6252,6 @@ class HCI_IsoDataPacket(HCI_Packet): iso_sdu_fragment=iso_sdu_fragment, ) - def __init__( - self, - connection_handle: int, - pb_flag: int, - ts_flag: int, - data_total_length: int, - time_stamp: Optional[int], - packet_sequence_number: Optional[int], - iso_sdu_length: Optional[int], - packet_status_flag: Optional[int], - iso_sdu_fragment: bytes, - ) -> None: - self.connection_handle = connection_handle - self.pb_flag = pb_flag - self.ts_flag = ts_flag - self.data_total_length = data_total_length - self.time_stamp = time_stamp - self.packet_sequence_number = packet_sequence_number - self.iso_sdu_length = iso_sdu_length - self.packet_status_flag = packet_status_flag - self.iso_sdu_fragment = iso_sdu_fragment - def __bytes__(self) -> bytes: return self.to_bytes() diff --git a/bumble/hfp.py b/bumble/hfp.py index 27bb097..69dab26 100644 --- a/bumble/hfp.py +++ b/bumble/hfp.py @@ -15,6 +15,9 @@ # ----------------------------------------------------------------------------- # Imports # ----------------------------------------------------------------------------- +from __future__ import annotations + +import collections import collections.abc import logging import asyncio @@ -22,15 +25,32 @@ import dataclasses import enum import traceback import pyee -from typing import Dict, List, Union, Set, Any, Optional, TYPE_CHECKING +import re +from typing import ( + Dict, + List, + Union, + Set, + Any, + Optional, + Type, + Tuple, + ClassVar, + Iterable, + TYPE_CHECKING, +) +from typing_extensions import Self from bumble import at +from bumble import device from bumble import rfcomm +from bumble import sdp from bumble.colors import color from bumble.core import ( ProtocolError, BT_GENERIC_AUDIO_SERVICE, BT_HANDSFREE_SERVICE, + BT_HANDSFREE_AUDIO_GATEWAY_SERVICE, BT_L2CAP_PROTOCOL_ID, BT_RFCOMM_PROTOCOL_ID, ) @@ -39,15 +59,6 @@ from bumble.hci import ( CodingFormat, CodecID, ) -from bumble.sdp import ( - DataElement, - ServiceAttribute, - SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID, - SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID, - SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, - SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, - SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID, -) # ----------------------------------------------------------------------------- @@ -193,17 +204,22 @@ class HfIndicator(enum.IntEnum): BATTERY_LEVEL = 0x02 # Battery level feature -class CallHoldOperation(enum.IntEnum): +class CallHoldOperation(enum.Enum): """ Call Hold supported operations (normative). AT Commands Reference Guide, 3.5.2.3.12 +CHLD - Call Holding Services. """ - RELEASE_ALL_HELD_CALLS = 0 # Release all held calls - RELEASE_ALL_ACTIVE_CALLS = 1 # Release all active calls, accept other - HOLD_ALL_ACTIVE_CALLS = 2 # Place all active calls on hold, accept other - ADD_HELD_CALL = 3 # Adds a held call to conversation + RELEASE_ALL_HELD_CALLS = "0" # Release all held calls + RELEASE_ALL_ACTIVE_CALLS = "1" # Release all active calls, accept other + RELEASE_SPECIFIC_CALL = "1x" # Release a specific call X + HOLD_ALL_ACTIVE_CALLS = "2" # Place all active calls on hold, accept other + HOLD_ALL_CALLS_EXCEPT = "2x" # Place all active calls except call X + ADD_HELD_CALL = "3" # Adds a held call to conversation + CONNECT_TWO_CALLS = ( + "4" # Connects the two calls and disconnects the subscriber from both calls + ) class ResponseHoldStatus(enum.IntEnum): @@ -324,16 +340,103 @@ class CallInfo: status: CallInfoStatus mode: CallInfoMode multi_party: CallInfoMultiParty - number: Optional[int] = None + number: Optional[str] = None type: Optional[int] = None +@dataclasses.dataclass +class CallLineIdentification: + """ + Calling Line Identification notification. + + TS 127 007 - V6.8.0, 7.6 Calling line identification presentation +CLIP, but only + number, type and alpha are meaningful in HFP. + + Attributes: + number: String type phone number of format specified by `type`. + type: Type of address octet in integer format (refer TS 24.008 [8] subclause + 10.5.4.7). + subaddr: String type subaddress of format specified by `satype`. + satype: Type of subaddress octet in integer format (refer TS 24.008 [8] + subclause 10.5.4.8). + alpha: Optional string type alphanumeric representation of number corresponding + to the entry found in phonebook; used character set should be the one selected + with command Select TE Character Set +CSCS. + cli_validity: 0 CLI valid, 1 CLI has been withheld by the originator, 2 CLI is + not available due to interworking problems or limitations of originating + network. + """ + + number: str + type: int + subaddr: Optional[str] = None + satype: Optional[int] = None + alpha: Optional[str] = None + cli_validity: Optional[int] = None + + @classmethod + def parse_from(cls: Type[Self], parameters: List[bytes]) -> Self: + return cls( + number=parameters[0].decode(), + type=int(parameters[1]), + subaddr=parameters[2].decode() if len(parameters) >= 3 else None, + satype=( + int(parameters[3]) if len(parameters) >= 4 and parameters[3] else None + ), + alpha=parameters[4].decode() if len(parameters) >= 5 else None, + cli_validity=( + int(parameters[5]) if len(parameters) >= 6 and parameters[5] else None + ), + ) + + def to_clip_string(self) -> str: + return ','.join( + str(arg) if arg else '' + for arg in [ + self.number, + self.type, + self.subaddr, + self.satype, + self.alpha, + self.cli_validity, + ] + ) + + +class VoiceRecognitionState(enum.IntEnum): + """ + vrec values provided in AT+BVRA command. + + Hands-Free Profile v1.8, 4.34.2, AT Capabilities Re-Used from GSM 07.07 and 3GPP 27.007. + """ + + DISABLE = 0 + ENABLE = 1 + # (Enhanced Voice Recognition Status only) HF is ready to accept audio. + ENHANCED_READY = 2 + + +class CmeError(enum.IntEnum): + """ + CME ERROR codes (partial listed). + + TS 127 007 - V6.8.0, 9.2.1 General errors + """ + + PHONE_FAILURE = 0 + OPERATION_NOT_ALLOWED = 3 + OPERATION_NOT_SUPPORTED = 4 + MEMORY_FULL = 20 + INVALID_INDEX = 21 + NOT_FOUND = 22 + + # ----------------------------------------------------------------------------- # Hands-Free Control Interoperability Requirements # ----------------------------------------------------------------------------- # Response codes. -RESPONSE_CODES = [ +RESPONSE_CODES = { "+APLSIRI", "+BAC", "+BCC", @@ -364,10 +467,10 @@ RESPONSE_CODES = [ "+XAPL", "A", "D", -] +} # Unsolicited responses and statuses. -UNSOLICITED_CODES = [ +UNSOLICITED_CODES = { "+APLSIRI", "+BCS", "+BIND", @@ -385,10 +488,10 @@ UNSOLICITED_CODES = [ "NO ANSWER", "NO CARRIER", "RING", -] +} # Status codes -STATUS_CODES = [ +STATUS_CODES = { "+CME ERROR", "BLACKLISTED", "BUSY", @@ -397,16 +500,25 @@ STATUS_CODES = [ "NO ANSWER", "NO CARRIER", "OK", -] +} @dataclasses.dataclass -class Configuration: +class HfConfiguration: supported_hf_features: List[HfFeature] supported_hf_indicators: List[HfIndicator] supported_audio_codecs: List[AudioCodec] +@dataclasses.dataclass +class AgConfiguration: + supported_ag_features: Iterable[AgFeature] + supported_ag_indicators: collections.abc.Sequence[AgIndicatorState] + supported_hf_indicators: Iterable[HfIndicator] + supported_ag_call_hold_operations: Iterable[CallHoldOperation] + supported_audio_codecs: Iterable[AudioCodec] + + class AtResponseType(enum.Enum): """ Indicates if a response is expected from an AT command, and if multiple responses are accepted. @@ -417,31 +529,165 @@ class AtResponseType(enum.Enum): MULTIPLE = 2 +@dataclasses.dataclass class AtResponse: code: str parameters: list - def __init__(self, response: bytearray): - code_and_parameters = response.split(b':') + @classmethod + def parse_from(cls: Type[Self], buffer: bytearray) -> Self: + code_and_parameters = buffer.split(b':') parameters = ( code_and_parameters[1] if len(code_and_parameters) > 1 else bytearray() ) - self.code = code_and_parameters[0].decode() - self.parameters = at.parse_parameters(parameters) + return cls( + code=code_and_parameters[0].decode(), + parameters=at.parse_parameters(parameters), + ) + + +@dataclasses.dataclass +class AtCommand: + class SubCode(str, enum.Enum): + NONE = '' + SET = '=' + TEST = '=?' + READ = '?' + + code: str + sub_code: SubCode + parameters: list + + _PARSE_PATTERN: ClassVar[re.Pattern] = re.compile( + r'AT\+(?P<code>[A-Z]+)(?P<sub_code>=\?|=|\?)?(?P<parameters>.*)' + ) + + @classmethod + def parse_from(cls: Type[Self], buffer: bytearray) -> Self: + if not (match := cls._PARSE_PATTERN.fullmatch(buffer.decode())): + if buffer.startswith(b'ATA'): + return cls(code='A', sub_code=AtCommand.SubCode.NONE, parameters=[]) + if buffer.startswith(b'ATD'): + return cls( + code='D', sub_code=AtCommand.SubCode.NONE, parameters=[buffer[3:]] + ) + raise HfpProtocolError('Invalid command') + + parameters = [] + if parameters_text := match.group('parameters'): + parameters = at.parse_parameters(parameters_text.encode()) + + return cls( + code=match.group('code'), + sub_code=AtCommand.SubCode(match.group('sub_code') or ''), + parameters=parameters, + ) @dataclasses.dataclass class AgIndicatorState: - description: str - index: int + """State wrapper of AG indicator. + + Attributes: + indicator: Indicator of this indicator state. + supported_values: Supported values of this indicator. + current_status: Current status of this indicator. + index: (HF only) Index of this indicator. + enabled: (AG only) Whether this indicator is enabled to report. + on_test_text: Text message reported in AT+CIND=? of this indicator. + """ + + indicator: AgIndicator supported_values: Set[int] current_status: int + index: Optional[int] = None + enabled: bool = True + + @property + def on_test_text(self) -> str: + min_value = min(self.supported_values) + max_value = max(self.supported_values) + if len(self.supported_values) == (max_value - min_value + 1): + supported_values_text = f'({min_value}-{max_value})' + else: + supported_values_text = ( + f'({",".join(str(v) for v in self.supported_values)})' + ) + return f'(\"{self.indicator.value}\",{supported_values_text})' + + @classmethod + def call(cls: Type[Self]) -> Self: + """Default call indicator state.""" + return cls( + indicator=AgIndicator.CALL, supported_values={0, 1}, current_status=0 + ) + + @classmethod + def callsetup(cls: Type[Self]) -> Self: + """Default callsetup indicator state.""" + return cls( + indicator=AgIndicator.CALL_SETUP, + supported_values={0, 1, 2, 3}, + current_status=0, + ) + + @classmethod + def callheld(cls: Type[Self]) -> Self: + """Default call indicator state.""" + return cls( + indicator=AgIndicator.CALL_HELD, + supported_values={0, 1, 2}, + current_status=0, + ) + + @classmethod + def service(cls: Type[Self]) -> Self: + """Default service indicator state.""" + return cls( + indicator=AgIndicator.SERVICE, supported_values={0, 1}, current_status=0 + ) + + @classmethod + def signal(cls: Type[Self]) -> Self: + """Default signal indicator state.""" + return cls( + indicator=AgIndicator.SIGNAL, + supported_values={0, 1, 2, 3, 4, 5}, + current_status=0, + ) + + @classmethod + def roam(cls: Type[Self]) -> Self: + """Default roam indicator state.""" + return cls( + indicator=AgIndicator.CALL, supported_values={0, 1}, current_status=0 + ) + + @classmethod + def battchg(cls: Type[Self]) -> Self: + """Default battery charge indicator state.""" + return cls( + indicator=AgIndicator.BATTERY_CHARGE, + supported_values={0, 1, 2, 3, 4, 5}, + current_status=0, + ) @dataclasses.dataclass class HfIndicatorState: + """State wrapper of HF indicator. + + Attributes: + indicator: Indicator of this indicator state. + supported: Whether this indicator is supported. + enabled: Whether this indicator is enabled. + current_status: Current (last-reported) status value of this indicaotr. + """ + + indicator: HfIndicator supported: bool = False enabled: bool = False + current_status: int = 0 class HfProtocol(pyee.EventEmitter): @@ -457,8 +703,26 @@ class HfProtocol(pyee.EventEmitter): ag_indicator: When AG update their indicators, notify the new state. Args: ag_indicator: AgIndicator + speaker_volume: Emitted when AG update speaker volume autonomously. + Args: + volume: Int + microphone_volume: Emitted when AG update microphone volume autonomously. + Args: + volume: Int + microphone_volume: Emitted when AG sends a ringtone request. + Args: + None + cli_notification: Emitted when notify the call metadata on line. + Args: + cli_notification: CallLineIdentification + voice_recognition: Emitted when AG starts voice recognition autonomously. + Args: + vrec: VoiceRecognitionState """ + class HfLoopTermination(HfpProtocolError): + """Termination signal for run() loop.""" + supported_hf_features: int supported_audio_codecs: List[AudioCodec] @@ -472,14 +736,18 @@ class HfProtocol(pyee.EventEmitter): command_lock: asyncio.Lock if TYPE_CHECKING: response_queue: asyncio.Queue[AtResponse] - unsolicited_queue: asyncio.Queue[AtResponse] + unsolicited_queue: asyncio.Queue[Optional[AtResponse]] else: response_queue: asyncio.Queue unsolicited_queue: asyncio.Queue read_buffer: bytearray active_codec: AudioCodec - def __init__(self, dlc: rfcomm.DLC, configuration: Configuration) -> None: + def __init__( + self, + dlc: rfcomm.DLC, + configuration: HfConfiguration, + ) -> None: super().__init__() # Configure internal state. @@ -489,13 +757,14 @@ class HfProtocol(pyee.EventEmitter): self.unsolicited_queue = asyncio.Queue() self.read_buffer = bytearray() self.active_codec = AudioCodec.CVSD + self._slc_initialized = False # Build local features. self.supported_hf_features = sum(configuration.supported_hf_features) self.supported_audio_codecs = configuration.supported_audio_codecs self.hf_indicators = { - indicator: HfIndicatorState() + indicator: HfIndicatorState(indicator=indicator) for indicator in configuration.supported_hf_indicators } @@ -506,6 +775,10 @@ class HfProtocol(pyee.EventEmitter): # Bind the AT reader to the RFCOMM channel. self.dlc.sink = self._read_at + # Stop the run() loop when L2CAP is closed. + self.dlc.multiplexer.l2cap_channel.on( + 'close', lambda: self.unsolicited_queue.put_nowait(None) + ) def supports_hf_feature(self, feature: HfFeature) -> bool: return (self.supported_hf_features & feature) != 0 @@ -530,7 +803,7 @@ class HfProtocol(pyee.EventEmitter): # Isolate the AT response code and parameters. raw_response = self.read_buffer[header + 2 : trailer] - response = AtResponse(raw_response) + response = AtResponse.parse_from(raw_response) logger.debug(f"<<< {raw_response.decode()}") # Consume the response bytes. @@ -616,7 +889,7 @@ class HfProtocol(pyee.EventEmitter): # If both the HF and AG do support the Codec Negotiation feature # then the HF shall send the AT+BAC=<HF available codecs> command to # the AG to notify the AG of the available codecs in the HF. - codecs = [str(c) for c in self.supported_audio_codecs] + codecs = [str(c.value) for c in self.supported_audio_codecs] await self.execute_command(f"AT+BAC={','.join(codecs)}") # 4.2.1.3 AG Indicators @@ -634,7 +907,7 @@ class HfProtocol(pyee.EventEmitter): self.ag_indicators = [] for index, indicator in enumerate(response.parameters): - description = indicator[0].decode() + description = AgIndicator(indicator[0].decode()) supported_values = [] for value in indicator[1]: value = value.split(b'-') @@ -664,7 +937,7 @@ class HfProtocol(pyee.EventEmitter): if self.supports_hf_feature( HfFeature.THREE_WAY_CALLING - ) and self.supports_ag_feature(HfFeature.THREE_WAY_CALLING): + ) and self.supports_ag_feature(AgFeature.THREE_WAY_CALLING): # After the HF has enabled the “Indicators status update” function in # the AG, and if the “Call waiting and 3-way calling” bit was set in the # supported features bitmap by both the HF and the AG, the HF shall @@ -677,9 +950,8 @@ class HfProtocol(pyee.EventEmitter): ) self.supported_ag_call_hold_operations = [ - CallHoldOperation(int(operation)) + CallHoldOperation(operation.decode()) for operation in response.parameters[0] - if not b'x' in operation ] # 4.2.1.4 HF Indicators @@ -692,7 +964,7 @@ class HfProtocol(pyee.EventEmitter): # shall send the AT+BIND=<HF supported HF indicators> command to the AG # to notify the AG of the supported indicators’ assigned numbers in the # HF. The AG shall respond with OK - indicators = [str(i) for i in self.hf_indicators.keys()] + indicators = [str(i.value) for i in self.hf_indicators] await self.execute_command(f"AT+BIND={','.join(indicators)}") # After having provided the AG with the HF indicators it supports, @@ -728,6 +1000,7 @@ class HfProtocol(pyee.EventEmitter): self.hf_indicators[indicator].enabled = True logger.info("SLC setup completed") + self._slc_initialized = True async def setup_audio_connection(self): """4.11.2 Audio Connection Setup by HF.""" @@ -808,28 +1081,46 @@ class HfProtocol(pyee.EventEmitter): mode=CallInfoMode(int(response.parameters[3])), multi_party=CallInfoMultiParty(int(response.parameters[4])), ) + if len(response.parameters) >= 6: + call_info.number = response.parameters[5].decode() if len(response.parameters) >= 7: - call_info.number = int(response.parameters[5]) call_info.type = int(response.parameters[6]) calls.append(call_info) return calls async def update_ag_indicator(self, index: int, value: int): - self.ag_indicators[index].current_status = value - self.emit('ag_indicator', self.ag_indicators[index]) - logger.info( - f"AG indicator updated: {self.ag_indicators[index].description}, {value}" - ) + # CIEV is in 1-index, while ag_indicators is in 0-index. + ag_indicator = self.ag_indicators[index - 1] + ag_indicator.current_status = value + self.emit('ag_indicator', ag_indicator) + logger.info(f"AG indicator updated: {ag_indicator.indicator}, {value}") async def handle_unsolicited(self): """Handle unsolicited result codes sent by the audio gateway.""" result = await self.unsolicited_queue.get() + if not result: + raise HfProtocol.HfLoopTermination() if result.code == "+BCS": await self.setup_codec_connection(int(result.parameters[0])) elif result.code == "+CIEV": await self.update_ag_indicator( int(result.parameters[0]), int(result.parameters[1]) ) + elif result.code == "+VGS": + self.emit('speaker_volume', int(result.parameters[0])) + elif result.code == "+VGM": + self.emit('microphone_volume', int(result.parameters[0])) + elif result.code == "RING": + self.emit('ring') + elif result.code == "+CLIP": + self.emit( + 'cli_notification', CallLineIdentification.parse_from(result.parameters) + ) + elif result.code == "+BVRA": + # TODO: Support Enhanced Voice Recognition. + self.emit( + 'voice_recognition', VoiceRecognitionState(int(result.parameters[0])) + ) else: logging.info(f"unhandled unsolicited response {result.code}") @@ -841,14 +1132,479 @@ class HfProtocol(pyee.EventEmitter): """ try: - await self.initiate_slc() + if not self._slc_initialized: + await self.initiate_slc() while True: await self.handle_unsolicited() + except HfProtocol.HfLoopTermination: + logger.info('Loop terminated') except Exception: logger.error("HFP-HF protocol failed with the following error:") logger.error(traceback.format_exc()) +class AgProtocol(pyee.EventEmitter): + """ + Implementation for the Audio-Gateway side of the Hands-Free profile. + + Reference specification Hands-Free Profile v1.8. + + Emitted events: + slc_complete: Emit when SLC procedure is completed. + codec_negotiation: When codec is renegotiated, notify the new codec. + Args: + active_codec: AudioCodec + hf_indicator: When HF update their indicators, notify the new state. + Args: + hf_indicator: HfIndicatorState + codec_connection_request: Emit when HF sends AT+BCC to request codec connection. + answer: Emit when HF sends ATA to answer phone call. + hang_up: Emit when HF sends AT+CHUP to hang up phone call. + dial: Emit when HF sends ATD to dial phone call. + voice_recognition: Emit when HF requests voice recognition state. + Args: + vrec: VoiceRecognitionState + call_hold: Emit when HF requests call hold operation. + Args: + operation: CallHoldOperation + call_index: Optional[int] + speaker_volume: Emitted when AG update speaker volume autonomously. + Args: + volume: Int + microphone_volume: Emitted when AG update microphone volume autonomously. + Args: + volume: Int + """ + + supported_hf_features: int + supported_hf_indicators: Set[HfIndicator] + supported_audio_codecs: List[AudioCodec] + + supported_ag_features: int + supported_ag_call_hold_operations: List[CallHoldOperation] + + ag_indicators: List[AgIndicatorState] + hf_indicators: collections.OrderedDict[HfIndicator, HfIndicatorState] + + dlc: rfcomm.DLC + + read_buffer: bytearray + active_codec: AudioCodec + calls: List[CallInfo] + + indicator_report_enabled: bool + inband_ringtone_enabled: bool + cme_error_enabled: bool + cli_notification_enabled: bool + call_waiting_enabled: bool + _remained_slc_setup_features: Set[HfFeature] + + def __init__(self, dlc: rfcomm.DLC, configuration: AgConfiguration) -> None: + super().__init__() + + # Configure internal state. + self.dlc = dlc + self.read_buffer = bytearray() + self.active_codec = AudioCodec.CVSD + self.calls = [] + + # Build local features. + self.supported_ag_features = sum(configuration.supported_ag_features) + self.supported_ag_call_hold_operations = list( + configuration.supported_ag_call_hold_operations + ) + self.ag_indicators = list(configuration.supported_ag_indicators) + self.supported_hf_indicators = set(configuration.supported_hf_indicators) + self.inband_ringtone_enabled = True + self._remained_slc_setup_features = set() + + # Clear remote features. + self.supported_hf_features = 0 + self.supported_audio_codecs = [] + self.indicator_report_enabled = False + self.cme_error_enabled = False + self.cli_notification_enabled = False + self.call_waiting_enabled = False + + self.hf_indicators = collections.OrderedDict() + + # Bind the AT reader to the RFCOMM channel. + self.dlc.sink = self._read_at + + def supports_hf_feature(self, feature: HfFeature) -> bool: + return (self.supported_hf_features & feature) != 0 + + def supports_ag_feature(self, feature: AgFeature) -> bool: + return (self.supported_ag_features & feature) != 0 + + def _read_at(self, data: bytes): + """ + Reads AT messages from the RFCOMM channel. + """ + # Append to the read buffer. + self.read_buffer.extend(data) + + # Locate the trailer. + trailer = self.read_buffer.find(b'\r') + if trailer == -1: + return + + # Isolate the AT response code and parameters. + raw_command = self.read_buffer[:trailer] + command = AtCommand.parse_from(raw_command) + logger.debug(f"<<< {raw_command.decode()}") + + # Consume the response bytes. + self.read_buffer = self.read_buffer[trailer + 1 :] + + if command.sub_code == AtCommand.SubCode.TEST: + handler_name = f'_on_{command.code.lower()}_test' + elif command.sub_code == AtCommand.SubCode.READ: + handler_name = f'_on_{command.code.lower()}_read' + else: + handler_name = f'_on_{command.code.lower()}' + + if handler := getattr(self, handler_name, None): + handler(*command.parameters) + else: + logger.warning('Handler %s not found', handler_name) + self.send_response('ERROR') + + def send_response(self, response: str) -> None: + """Sends an AT response.""" + self.dlc.write(f'\r\n{response}\r\n') + + def send_cme_error(self, error_code: CmeError) -> None: + """Sends an CME ERROR response. + + If CME Error is not enabled by HF, sends ERROR instead. + """ + if self.cme_error_enabled: + self.send_response(f'+CME ERROR: {error_code.value}') + else: + self.send_error() + + def send_ok(self) -> None: + """Sends an OK response.""" + self.send_response('OK') + + def send_error(self) -> None: + """Sends an ERROR response.""" + self.send_response('ERROR') + + def set_inband_ringtone_enabled(self, enabled: bool) -> None: + """Enables or disables in-band ringtone.""" + + self.inband_ringtone_enabled = enabled + self.send_response(f'+BSIR: {1 if enabled else 0}') + + def set_speaker_volume(self, level: int) -> None: + """Reports speaker volume.""" + + self.send_response(f'+VGS: {level}') + + def set_microphone_volume(self, level: int) -> None: + """Reports microphone volume.""" + + self.send_response(f'+VGM: {level}') + + def send_ring(self) -> None: + """Sends RING command to trigger ringtone on HF.""" + + self.send_response('RING') + + def update_ag_indicator(self, indicator: AgIndicator, value: int) -> None: + """Updates AG indicator. + + Args: + indicator: Name of the indicator. + value: new value of the indicator. + """ + + search_result = next( + ( + (index, state) + for index, state in enumerate(self.ag_indicators) + if state.indicator == indicator + ), + None, + ) + if not search_result: + raise KeyError(f'{indicator} is not supported.') + + index, indicator_state = search_result + if not self.indicator_report_enabled: + logger.warning('AG indicator report is disabled') + if not indicator_state.enabled: + logger.warning(f'AG indicator {indicator} is disabled') + + indicator_state.current_status = value + self.send_response(f'+CIEV: {index+1},{value}') + + async def negotiate_codec(self, codec: AudioCodec) -> None: + """Starts codec negotiation.""" + + if not self.supports_ag_feature(AgFeature.CODEC_NEGOTIATION): + logger.warning('Local does not support Codec Negotiation') + if not self.supports_hf_feature(HfFeature.CODEC_NEGOTIATION): + logger.warning('Peer does not support Codec Negotiation') + if codec not in self.supported_audio_codecs: + logger.warning(f'{codec} is not supported by peer') + + at_bcs_future = asyncio.get_running_loop().create_future() + self.once('codec_negotiation', at_bcs_future.set_result) + self.send_response(f'+BCS: {codec.value}') + if (new_codec := await at_bcs_future) != codec: + raise HfpProtocolError(f'Expect codec: {codec}, but get {new_codec}') + + def send_cli_notification(self, cli: CallLineIdentification) -> None: + """Sends +CLIP CLI notification.""" + + if not self.cli_notification_enabled: + logger.warning('Try to send CLIP while CLI notification is not enabled') + + self.send_response(f'+CLIP: {cli.to_clip_string()}') + + def _check_remained_slc_commands(self) -> None: + if not self._remained_slc_setup_features: + self.emit('slc_complete') + + def _on_brsf(self, hf_features: bytes) -> None: + self.supported_hf_features = int(hf_features) + self.send_response(f'+BRSF: {self.supported_ag_features}') + self.send_ok() + + if self.supports_hf_feature( + HfFeature.HF_INDICATORS + ) and self.supports_ag_feature(AgFeature.HF_INDICATORS): + self._remained_slc_setup_features.add(HfFeature.HF_INDICATORS) + + if self.supports_hf_feature( + HfFeature.THREE_WAY_CALLING + ) and self.supports_ag_feature(AgFeature.THREE_WAY_CALLING): + self._remained_slc_setup_features.add(HfFeature.THREE_WAY_CALLING) + + def _on_bac(self, *args) -> None: + self.supported_audio_codecs = [AudioCodec(int(value)) for value in args] + self.send_ok() + + def _on_bcs(self, codec: bytes) -> None: + self.active_codec = AudioCodec(int(codec)) + self.send_ok() + self.emit('codec_negotiation', self.active_codec) + + def _on_bvra(self, vrec: bytes) -> None: + self.send_ok() + self.emit('voice_recognition', VoiceRecognitionState(int(vrec))) + + def _on_chld(self, operation_code: bytes) -> None: + call_index: Optional[int] = None + if len(operation_code) > 1: + call_index = int(operation_code[1:]) + operation_code = operation_code[:1] + b'x' + try: + operation = CallHoldOperation(operation_code.decode()) + except: + logger.error(f'Invalid operation: {operation_code.decode()}') + self.send_cme_error(CmeError.OPERATION_NOT_SUPPORTED) + return + + if operation not in self.supported_ag_call_hold_operations: + logger.error(f'Unsupported operation: {operation_code.decode()}') + self.send_cme_error(CmeError.OPERATION_NOT_SUPPORTED) + + if call_index is not None and not any( + call.index == call_index for call in self.calls + ): + logger.error(f'No matching call {call_index}') + self.send_cme_error(CmeError.INVALID_INDEX) + + # Real three-way calls have more complicated situations, but this is not a popular issue - let users to handle the remaining :) + + self.send_ok() + self.emit('call_hold', operation, call_index) + + def _on_chld_test(self) -> None: + if not self.supports_ag_feature(AgFeature.THREE_WAY_CALLING): + self.send_error() + return + + self.send_response( + '+CHLD: ({})'.format( + ','.join( + operation.value + for operation in self.supported_ag_call_hold_operations + ) + ) + ) + self.send_ok() + self._remained_slc_setup_features.remove(HfFeature.THREE_WAY_CALLING) + self._check_remained_slc_commands() + + def _on_cind_test(self) -> None: + if not self.ag_indicators: + self.send_cme_error(CmeError.NOT_FOUND) + return + + indicator_list_str = ",".join( + indicator.on_test_text for indicator in self.ag_indicators + ) + self.send_response(f'+CIND: {indicator_list_str}') + self.send_ok() + + def _on_cind_read(self) -> None: + if not self.ag_indicators: + self.send_cme_error(CmeError.NOT_FOUND) + return + + indicator_list_str = ",".join( + str(indicator.current_status) for indicator in self.ag_indicators + ) + self.send_response(f'+CIND: {indicator_list_str}') + self.send_ok() + + self._check_remained_slc_commands() + + def _on_cmer( + self, + mode: bytes, + keypad: Optional[bytes] = None, + display: Optional[bytes] = None, + indicator: bytes = b'', + ) -> None: + if ( + int(mode) != 3 + or (keypad and int(keypad)) + or (display and int(display)) + or int(indicator) not in (0, 1) + ): + logger.error( + f'Unexpected values: mode={mode!r}, keypad={keypad!r}, ' + f'display={display!r}, indicator={indicator!r}' + ) + self.send_cme_error(CmeError.INVALID_INDEX) + + self.indicator_report_enabled = bool(int(indicator)) + self.send_ok() + + def _on_cmee(self, enabled: bytes) -> None: + self.cme_error_enabled = bool(int(enabled)) + self.send_ok() + + def _on_ccwa(self, enabled: bytes) -> None: + self.call_waiting_enabled = bool(int(enabled)) + self.send_ok() + + def _on_bind(self, *args) -> None: + if not self.supports_ag_feature(AgFeature.HF_INDICATORS): + self.send_error() + return + + peer_supported_indicators = set( + HfIndicator(int(indicator)) for indicator in args + ) + self.hf_indicators = collections.OrderedDict( + { + indicator: HfIndicatorState(indicator=indicator) + for indicator in self.supported_hf_indicators.intersection( + peer_supported_indicators + ) + } + ) + self.send_ok() + + def _on_bind_test(self) -> None: + if not self.supports_ag_feature(AgFeature.HF_INDICATORS): + self.send_error() + return + + hf_indicator_list_str = ",".join( + str(indicator.value) for indicator in self.supported_hf_indicators + ) + self.send_response(f'+BIND: ({hf_indicator_list_str})') + self.send_ok() + + def _on_bind_read(self) -> None: + if not self.supports_ag_feature(AgFeature.HF_INDICATORS): + self.send_error() + return + + for indicator in self.hf_indicators: + self.send_response(f'+BIND: {indicator.value},1') + + self.send_ok() + + self._remained_slc_setup_features.remove(HfFeature.HF_INDICATORS) + self._check_remained_slc_commands() + + def _on_biev(self, index_bytes: bytes, value_bytes: bytes) -> None: + if not self.supports_ag_feature(AgFeature.HF_INDICATORS): + self.send_error() + return + + index = HfIndicator(int(index_bytes)) + if index not in self.hf_indicators: + self.send_error() + return + + self.hf_indicators[index].current_status = int(value_bytes) + self.emit('hf_indicator', self.hf_indicators[index]) + self.send_ok() + + def _on_bia(self, *args) -> None: + for enabled, state in zip(args, self.ag_indicators): + state.enabled = bool(int(enabled)) + self.send_ok() + + def _on_bcc(self) -> None: + self.emit('codec_connection_request') + self.send_ok() + + def _on_a(self) -> None: + """ATA handler.""" + self.emit('answer') + self.send_ok() + + def _on_d(self, number: bytes) -> None: + """ATD handler.""" + self.emit('dial', number.decode()) + self.send_ok() + + def _on_chup(self) -> None: + self.emit('hang_up') + self.send_ok() + + def _on_clcc(self) -> None: + for call in self.calls: + number_text = f',\"{call.number}\"' if call.number is not None else '' + type_text = f',{call.type}' if call.type is not None else '' + response = ( + f'+CLCC: {call.index}' + f',{call.direction.value}' + f',{call.status.value}' + f',{call.mode.value}' + f',{call.multi_party.value}' + f'{number_text}' + f'{type_text}' + ) + self.send_response(response) + self.send_ok() + + def _on_clip(self, enabled: bytes) -> None: + if not self.supports_hf_feature(HfFeature.CLI_PRESENTATION_CAPABILITY): + logger.error('Remote doesn not support CLI but sends AT+CLIP') + self.cli_notification_enabled = True if enabled == b'1' else False + self.send_ok() + + def _on_vgs(self, level: bytes) -> None: + self.emit('speaker_volume', int(level)) + self.send_ok() + + def _on_vgm(self, level: bytes) -> None: + self.emit('microphone_volume', int(level)) + self.send_ok() + + # ----------------------------------------------------------------------------- # Normative SDP definitions # ----------------------------------------------------------------------------- @@ -902,9 +1658,12 @@ class AgSdpFeature(enum.IntFlag): VOICE_RECOGNITION_TEST = 0x80 -def sdp_records( - service_record_handle: int, rfcomm_channel: int, configuration: Configuration -) -> List[ServiceAttribute]: +def make_hf_sdp_records( + service_record_handle: int, + rfcomm_channel: int, + configuration: HfConfiguration, + version: ProfileVersion = ProfileVersion.V1_8, +) -> List[sdp.ServiceAttribute]: """ Generates the SDP record for HFP Hands-Free support. @@ -936,53 +1695,234 @@ def sdp_records( hf_supported_features |= HfSdpFeature.WIDE_BAND return [ - ServiceAttribute( - SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID, - DataElement.unsigned_integer_32(service_record_handle), + sdp.ServiceAttribute( + sdp.SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID, + sdp.DataElement.unsigned_integer_32(service_record_handle), + ), + sdp.ServiceAttribute( + sdp.SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID, + sdp.DataElement.sequence( + [ + sdp.DataElement.uuid(BT_HANDSFREE_SERVICE), + sdp.DataElement.uuid(BT_GENERIC_AUDIO_SERVICE), + ] + ), + ), + sdp.ServiceAttribute( + sdp.SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, + sdp.DataElement.sequence( + [ + sdp.DataElement.sequence( + [sdp.DataElement.uuid(BT_L2CAP_PROTOCOL_ID)] + ), + sdp.DataElement.sequence( + [ + sdp.DataElement.uuid(BT_RFCOMM_PROTOCOL_ID), + sdp.DataElement.unsigned_integer_8(rfcomm_channel), + ] + ), + ] + ), + ), + sdp.ServiceAttribute( + sdp.SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, + sdp.DataElement.sequence( + [ + sdp.DataElement.sequence( + [ + sdp.DataElement.uuid(BT_HANDSFREE_SERVICE), + sdp.DataElement.unsigned_integer_16(version), + ] + ) + ] + ), + ), + sdp.ServiceAttribute( + sdp.SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID, + sdp.DataElement.unsigned_integer_16(hf_supported_features), + ), + ] + + +def make_ag_sdp_records( + service_record_handle: int, + rfcomm_channel: int, + configuration: AgConfiguration, + version: ProfileVersion = ProfileVersion.V1_8, +) -> List[sdp.ServiceAttribute]: + """ + Generates the SDP record for HFP Audio-Gateway support. + + The record exposes the features supported in the input configuration, + and the allocated RFCOMM channel. + """ + + ag_supported_features = 0 + + if AgFeature.EC_NR in configuration.supported_ag_features: + ag_supported_features |= AgSdpFeature.EC_NR + if AgFeature.THREE_WAY_CALLING in configuration.supported_ag_features: + ag_supported_features |= AgSdpFeature.THREE_WAY_CALLING + if ( + AgFeature.ENHANCED_VOICE_RECOGNITION_STATUS + in configuration.supported_ag_features + ): + ag_supported_features |= AgSdpFeature.ENHANCED_VOICE_RECOGNITION_STATUS + if AgFeature.VOICE_RECOGNITION_TEST in configuration.supported_ag_features: + ag_supported_features |= AgSdpFeature.VOICE_RECOGNITION_TEST + if AgFeature.IN_BAND_RING_TONE_CAPABILITY in configuration.supported_ag_features: + ag_supported_features |= AgSdpFeature.IN_BAND_RING_TONE_CAPABILITY + if AgFeature.VOICE_RECOGNITION_FUNCTION in configuration.supported_ag_features: + ag_supported_features |= AgSdpFeature.VOICE_RECOGNITION_FUNCTION + if AudioCodec.MSBC in configuration.supported_audio_codecs: + ag_supported_features |= AgSdpFeature.WIDE_BAND + + return [ + sdp.ServiceAttribute( + sdp.SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID, + sdp.DataElement.unsigned_integer_32(service_record_handle), ), - ServiceAttribute( - SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID, - DataElement.sequence( + sdp.ServiceAttribute( + sdp.SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID, + sdp.DataElement.sequence( [ - DataElement.uuid(BT_HANDSFREE_SERVICE), - DataElement.uuid(BT_GENERIC_AUDIO_SERVICE), + sdp.DataElement.uuid(BT_HANDSFREE_AUDIO_GATEWAY_SERVICE), + sdp.DataElement.uuid(BT_GENERIC_AUDIO_SERVICE), ] ), ), - ServiceAttribute( - SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, - DataElement.sequence( + sdp.ServiceAttribute( + sdp.SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, + sdp.DataElement.sequence( [ - DataElement.sequence([DataElement.uuid(BT_L2CAP_PROTOCOL_ID)]), - DataElement.sequence( + sdp.DataElement.sequence( + [sdp.DataElement.uuid(BT_L2CAP_PROTOCOL_ID)] + ), + sdp.DataElement.sequence( [ - DataElement.uuid(BT_RFCOMM_PROTOCOL_ID), - DataElement.unsigned_integer_8(rfcomm_channel), + sdp.DataElement.uuid(BT_RFCOMM_PROTOCOL_ID), + sdp.DataElement.unsigned_integer_8(rfcomm_channel), ] ), ] ), ), - ServiceAttribute( - SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, - DataElement.sequence( + sdp.ServiceAttribute( + sdp.SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, + sdp.DataElement.sequence( [ - DataElement.sequence( + sdp.DataElement.sequence( [ - DataElement.uuid(BT_HANDSFREE_SERVICE), - DataElement.unsigned_integer_16(ProfileVersion.V1_8), + sdp.DataElement.uuid(BT_HANDSFREE_AUDIO_GATEWAY_SERVICE), + sdp.DataElement.unsigned_integer_16(version), ] ) ] ), ), - ServiceAttribute( - SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID, - DataElement.unsigned_integer_16(hf_supported_features), + sdp.ServiceAttribute( + sdp.SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID, + sdp.DataElement.unsigned_integer_16(ag_supported_features), ), ] +async def find_hf_sdp_record( + connection: device.Connection, +) -> Optional[Tuple[int, ProfileVersion, HfSdpFeature]]: + """Searches a Hands-Free SDP record from remote device. + + Args: + connection: ACL connection to make SDP search. + + Returns: + Tuple of (<RFCOMM channel>, <Profile Version>, <HF SDP features>) + """ + async with sdp.Client(connection) as sdp_client: + search_result = await sdp_client.search_attributes( + uuids=[BT_HANDSFREE_SERVICE], + attribute_ids=[ + sdp.SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, + sdp.SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, + sdp.SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID, + sdp.SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID, + ], + ) + for attribute_lists in search_result: + channel: Optional[int] = None + version: Optional[ProfileVersion] = None + features: Optional[HfSdpFeature] = None + for attribute in attribute_lists: + # The layout is [[L2CAP_PROTOCOL], [RFCOMM_PROTOCOL, RFCOMM_CHANNEL]]. + if attribute.id == sdp.SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID: + protocol_descriptor_list = attribute.value.value + channel = protocol_descriptor_list[1].value[1].value + elif ( + attribute.id + == sdp.SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID + ): + profile_descriptor_list = attribute.value.value + version = ProfileVersion(profile_descriptor_list[0].value[1].value) + elif attribute.id == sdp.SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID: + features = HfSdpFeature(attribute.value.value) + elif attribute.id == sdp.SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID: + class_id_list = attribute.value.value + uuid = class_id_list[0].value + # AG record may also contain HF UUID in its profile descriptor list. + # If found, skip this record. + if uuid == BT_HANDSFREE_AUDIO_GATEWAY_SERVICE: + channel, version, features = (None, None, None) + break + + if channel is not None and version is not None and features is not None: + return (channel, version, features) + return None + + +async def find_ag_sdp_record( + connection: device.Connection, +) -> Optional[Tuple[int, ProfileVersion, AgSdpFeature]]: + """Searches an Audio-Gateway SDP record from remote device. + + Args: + connection: ACL connection to make SDP search. + + Returns: + Tuple of (<RFCOMM channel>, <Profile Version>, <AG SDP features>) + """ + async with sdp.Client(connection) as sdp_client: + search_result = await sdp_client.search_attributes( + uuids=[BT_HANDSFREE_AUDIO_GATEWAY_SERVICE], + attribute_ids=[ + sdp.SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, + sdp.SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, + sdp.SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID, + ], + ) + for attribute_lists in search_result: + channel: Optional[int] = None + version: Optional[ProfileVersion] = None + features: Optional[AgSdpFeature] = None + for attribute in attribute_lists: + # The layout is [[L2CAP_PROTOCOL], [RFCOMM_PROTOCOL, RFCOMM_CHANNEL]]. + if attribute.id == sdp.SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID: + protocol_descriptor_list = attribute.value.value + channel = protocol_descriptor_list[1].value[1].value + elif ( + attribute.id + == sdp.SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID + ): + profile_descriptor_list = attribute.value.value + version = ProfileVersion(profile_descriptor_list[0].value[1].value) + elif attribute.id == sdp.SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID: + features = AgSdpFeature(attribute.value.value) + if not channel or not version or features is None: + logger.warning(f"Bad result {attribute_lists}.") + return None + return (channel, version, features) + return None + + # ----------------------------------------------------------------------------- # ESCO Codec Default Parameters # ----------------------------------------------------------------------------- @@ -1006,7 +1946,9 @@ class EscoParameters: transmit_coding_format: CodingFormat receive_coding_format: CodingFormat packet_type: HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType - retransmission_effort: HCI_Enhanced_Setup_Synchronous_Connection_Command.RetransmissionEffort + retransmission_effort: ( + HCI_Enhanced_Setup_Synchronous_Connection_Command.RetransmissionEffort + ) max_latency: int # Common @@ -1014,12 +1956,12 @@ class EscoParameters: output_coding_format: CodingFormat = CodingFormat(CodecID.LINEAR_PCM) input_coded_data_size: int = 16 output_coded_data_size: int = 16 - input_pcm_data_format: HCI_Enhanced_Setup_Synchronous_Connection_Command.PcmDataFormat = ( - HCI_Enhanced_Setup_Synchronous_Connection_Command.PcmDataFormat.TWOS_COMPLEMENT - ) - output_pcm_data_format: HCI_Enhanced_Setup_Synchronous_Connection_Command.PcmDataFormat = ( - HCI_Enhanced_Setup_Synchronous_Connection_Command.PcmDataFormat.TWOS_COMPLEMENT - ) + input_pcm_data_format: ( + HCI_Enhanced_Setup_Synchronous_Connection_Command.PcmDataFormat + ) = HCI_Enhanced_Setup_Synchronous_Connection_Command.PcmDataFormat.TWOS_COMPLEMENT + output_pcm_data_format: ( + HCI_Enhanced_Setup_Synchronous_Connection_Command.PcmDataFormat + ) = HCI_Enhanced_Setup_Synchronous_Connection_Command.PcmDataFormat.TWOS_COMPLEMENT input_pcm_sample_payload_msb_position: int = 0 output_pcm_sample_payload_msb_position: int = 0 input_data_path: HCI_Enhanced_Setup_Synchronous_Connection_Command.DataPath = ( diff --git a/bumble/hid.py b/bumble/hid.py index fc5c807..1b4aa00 100644 --- a/bumble/hid.py +++ b/bumble/hid.py @@ -48,6 +48,7 @@ HID_INTERRUPT_PSM = 0x0013 class Message: message_type: MessageType + # Report types class ReportType(enum.IntEnum): OTHER_REPORT = 0x00 diff --git a/bumble/host.py b/bumble/host.py index fd0a247..64b6668 100644 --- a/bumble/host.py +++ b/bumble/host.py @@ -184,7 +184,7 @@ class Host(AbortableEventEmitter): self.long_term_key_provider = None self.link_key_provider = None self.pairing_io_capability_provider = None # Classic only - self.snooper = None + self.snooper: Optional[Snooper] = None # Connect to the source and sink if specified if controller_source: @@ -530,7 +530,9 @@ class Host(AbortableEventEmitter): # Check the return parameters if required if check_result: - if isinstance(response.return_parameters, int): + if isinstance(response, hci.HCI_Command_Status_Event): + status = response.status + elif isinstance(response.return_parameters, int): status = response.return_parameters elif isinstance(response.return_parameters, bytes): # return parameters first field is a one byte status code @@ -719,14 +721,16 @@ class Host(AbortableEventEmitter): for connection_handle, num_completed_packets in zip( event.connection_handles, event.num_completed_packets ): - if not (connection := self.connections.get(connection_handle)): + if connection := self.connections.get(connection_handle): + connection.acl_packet_queue.on_packets_completed(num_completed_packets) + elif not ( + self.cis_links.get(connection_handle) + or self.sco_links.get(connection_handle) + ): logger.warning( 'received packet completion event for unknown handle ' f'0x{connection_handle:04X}' ) - continue - - connection.acl_packet_queue.on_packets_completed(num_completed_packets) # Classic only def on_hci_connection_request_event(self, event): diff --git a/bumble/keys.py b/bumble/keys.py index 198d5c4..facaa37 100644 --- a/bumble/keys.py +++ b/bumble/keys.py @@ -25,7 +25,8 @@ import asyncio import logging import os import json -from typing import TYPE_CHECKING, Dict, List, Optional, Tuple +from typing import TYPE_CHECKING, Dict, List, Optional, Tuple, Type +from typing_extensions import Self from .colors import color from .hci import Address @@ -128,10 +129,10 @@ class PairingKeys: def print(self, prefix=''): keys_dict = self.to_dict() - for (container_property, value) in keys_dict.items(): + for container_property, value in keys_dict.items(): if isinstance(value, dict): print(f'{prefix}{color(container_property, "cyan")}:') - for (key_property, key_value) in value.items(): + for key_property, key_value in value.items(): print(f'{prefix} {color(key_property, "green")}: {key_value}') else: print(f'{prefix}{color(container_property, "cyan")}: {value}') @@ -158,7 +159,7 @@ class KeyStore: async def get_resolving_keys(self): all_keys = await self.get_all() resolving_keys = [] - for (name, keys) in all_keys: + for name, keys in all_keys: if keys.irk is not None: if keys.address_type is None: address_type = Address.RANDOM_DEVICE_ADDRESS @@ -171,7 +172,7 @@ class KeyStore: async def print(self, prefix=''): entries = await self.get_all() separator = '' - for (name, keys) in entries: + for name, keys in entries: print(separator + prefix + color(name, 'yellow')) keys.print(prefix=prefix + ' ') separator = '\n' @@ -253,8 +254,10 @@ class JsonKeyStore(KeyStore): logger.debug(f'JSON keystore: {self.filename}') - @staticmethod - def from_device(device: Device, filename=None) -> Optional[JsonKeyStore]: + @classmethod + def from_device( + cls: Type[Self], device: Device, filename: Optional[str] = None + ) -> Self: if not filename: # Extract the filename from the config if there is one if device.config.keystore is not None: @@ -270,7 +273,7 @@ class JsonKeyStore(KeyStore): else: namespace = JsonKeyStore.DEFAULT_NAMESPACE - return JsonKeyStore(namespace, filename) + return cls(namespace, filename) async def load(self): # Try to open the file, without failing. If the file does not exist, it diff --git a/bumble/l2cap.py b/bumble/l2cap.py index cec14b8..b4f0121 100644 --- a/bumble/l2cap.py +++ b/bumble/l2cap.py @@ -70,6 +70,7 @@ L2CAP_LE_SIGNALING_CID = 0x05 L2CAP_MIN_LE_MTU = 23 L2CAP_MIN_BR_EDR_MTU = 48 +L2CAP_MAX_BR_EDR_MTU = 65535 L2CAP_DEFAULT_MTU = 2048 # Default value for the MTU we are willing to accept @@ -832,7 +833,9 @@ class ClassicChannel(EventEmitter): # Wait for the connection to succeed or fail try: - return await self.connection_result + return await self.connection.abort_on( + 'disconnection', self.connection_result + ) finally: self.connection_result = None @@ -2225,7 +2228,7 @@ class ChannelManager: # Connect try: await channel.connect() - except Exception as e: + except BaseException as e: del connection_channels[source_cid] raise e diff --git a/bumble/pandora/host.py b/bumble/pandora/host.py index e54d2d5..4904274 100644 --- a/bumble/pandora/host.py +++ b/bumble/pandora/host.py @@ -287,9 +287,9 @@ class HostService(HostServicer): self.log.debug(f"WaitDisconnection: {connection_handle}") if connection := self.device.lookup_connection(connection_handle): - disconnection_future: asyncio.Future[ - None - ] = asyncio.get_running_loop().create_future() + disconnection_future: asyncio.Future[None] = ( + asyncio.get_running_loop().create_future() + ) def on_disconnection(_: None) -> None: disconnection_future.set_result(None) @@ -370,9 +370,9 @@ class HostService(HostServicer): scan_response_data=scan_response_data, ) - pending_connection: asyncio.Future[ - bumble.device.Connection - ] = asyncio.get_running_loop().create_future() + pending_connection: asyncio.Future[bumble.device.Connection] = ( + asyncio.get_running_loop().create_future() + ) if request.connectable: @@ -516,9 +516,9 @@ class HostService(HostServicer): await asyncio.sleep(1) continue - pending_connection: asyncio.Future[ - bumble.device.Connection - ] = asyncio.get_running_loop().create_future() + pending_connection: asyncio.Future[bumble.device.Connection] = ( + asyncio.get_running_loop().create_future() + ) self.log.debug('Wait for LE connection...') connection = await pending_connection @@ -563,12 +563,14 @@ class HostService(HostServicer): legacy=request.legacy, active=not request.passive, own_address_type=request.own_address_type, - scan_interval=int(request.interval) - if request.interval - else DEVICE_DEFAULT_SCAN_INTERVAL, - scan_window=int(request.window) - if request.window - else DEVICE_DEFAULT_SCAN_WINDOW, + scan_interval=( + int(request.interval) + if request.interval + else DEVICE_DEFAULT_SCAN_INTERVAL + ), + scan_window=( + int(request.window) if request.window else DEVICE_DEFAULT_SCAN_WINDOW + ), scanning_phys=scanning_phys, ) @@ -782,9 +784,11 @@ class HostService(HostServicer): *struct.pack('<H', dt.peripheral_connection_interval_min), *struct.pack( '<H', - dt.peripheral_connection_interval_max - if dt.peripheral_connection_interval_max - else dt.peripheral_connection_interval_min, + ( + dt.peripheral_connection_interval_max + if dt.peripheral_connection_interval_max + else dt.peripheral_connection_interval_min + ), ), ] ), diff --git a/bumble/pandora/security.py b/bumble/pandora/security.py index b36fb18..2cbb78a 100644 --- a/bumble/pandora/security.py +++ b/bumble/pandora/security.py @@ -383,9 +383,9 @@ class SecurityService(SecurityServicer): connection.transport ] == request.level_variant() - wait_for_security: asyncio.Future[ - str - ] = asyncio.get_running_loop().create_future() + wait_for_security: asyncio.Future[str] = ( + asyncio.get_running_loop().create_future() + ) authenticate_task: Optional[asyncio.Future[None]] = None pair_task: Optional[asyncio.Future[None]] = None diff --git a/bumble/profiles/bap.py b/bumble/profiles/bap.py index dd57f01..c0123b1 100644 --- a/bumble/profiles/bap.py +++ b/bumble/profiles/bap.py @@ -24,8 +24,9 @@ import enum import struct import functools import logging -from typing import Optional, List, Union, Type, Dict, Any, Tuple, cast +from typing import Optional, List, Union, Type, Dict, Any, Tuple +from bumble import core from bumble import colors from bumble import device from bumble import hci @@ -77,6 +78,10 @@ class AudioLocation(enum.IntFlag): LEFT_SURROUND = 0x04000000 RIGHT_SURROUND = 0x08000000 + @property + def channel_count(self) -> int: + return bin(self.value).count('1') + class AudioInputType(enum.IntEnum): '''Bluetooth Assigned Numbers, Section 6.12.2 - Audio Input Type''' @@ -217,6 +222,13 @@ class FrameDuration(enum.IntEnum): DURATION_7500_US = 0x00 DURATION_10000_US = 0x01 + @property + def us(self) -> int: + return { + FrameDuration.DURATION_7500_US: 7500, + FrameDuration.DURATION_10000_US: 10000, + }[self] + class SupportedFrameDuration(enum.IntFlag): '''Bluetooth Assigned Numbers, Section 6.12.4.2 - Frame Duration''' @@ -228,6 +240,14 @@ class SupportedFrameDuration(enum.IntFlag): DURATION_10000_US_PREFERRED = 0b0010 +class AnnouncementType(enum.IntEnum): + '''Basic Audio Profile, 3.5.3. Additional Audio Stream Control Service requirements''' + + # fmt: off + GENERAL = 0x00 + TARGETED = 0x01 + + # ----------------------------------------------------------------------------- # ASE Operations # ----------------------------------------------------------------------------- @@ -453,6 +473,34 @@ class AudioRole(enum.IntEnum): SOURCE = hci.HCI_LE_Setup_ISO_Data_Path_Command.Direction.HOST_TO_CONTROLLER +@dataclasses.dataclass +class UnicastServerAdvertisingData: + """Advertising Data for ASCS.""" + + announcement_type: AnnouncementType = AnnouncementType.TARGETED + available_audio_contexts: ContextType = ContextType.MEDIA + metadata: bytes = b'' + + def __bytes__(self) -> bytes: + return bytes( + core.AdvertisingData( + [ + ( + core.AdvertisingData.SERVICE_DATA_16_BIT_UUID, + struct.pack( + '<2sBIB', + gatt.GATT_AUDIO_STREAM_CONTROL_SERVICE.to_bytes(), + self.announcement_type, + self.available_audio_contexts, + len(self.metadata), + ) + + self.metadata, + ) + ] + ) + ) + + # ----------------------------------------------------------------------------- # Utils # ----------------------------------------------------------------------------- @@ -497,7 +545,7 @@ class CodecSpecificCapabilities: supported_sampling_frequencies: SupportedSamplingFrequency supported_frame_durations: SupportedFrameDuration - supported_audio_channel_counts: Sequence[int] + supported_audio_channel_count: Sequence[int] min_octets_per_codec_frame: int max_octets_per_codec_frame: int supported_max_codec_frames_per_sdu: int @@ -506,7 +554,7 @@ class CodecSpecificCapabilities: def from_bytes(cls, data: bytes) -> CodecSpecificCapabilities: offset = 0 # Allowed default values. - supported_audio_channel_counts = [1] + supported_audio_channel_count = [1] supported_max_codec_frames_per_sdu = 1 while offset < len(data): length, type = struct.unpack_from('BB', data, offset) @@ -519,7 +567,7 @@ class CodecSpecificCapabilities: elif type == CodecSpecificCapabilities.Type.FRAME_DURATION: supported_frame_durations = SupportedFrameDuration(value) elif type == CodecSpecificCapabilities.Type.AUDIO_CHANNEL_COUNT: - supported_audio_channel_counts = bits_to_channel_counts(value) + supported_audio_channel_count = bits_to_channel_counts(value) elif type == CodecSpecificCapabilities.Type.OCTETS_PER_FRAME: min_octets_per_sample = value & 0xFFFF max_octets_per_sample = value >> 16 @@ -530,7 +578,7 @@ class CodecSpecificCapabilities: return CodecSpecificCapabilities( supported_sampling_frequencies=supported_sampling_frequencies, supported_frame_durations=supported_frame_durations, - supported_audio_channel_counts=supported_audio_channel_counts, + supported_audio_channel_count=supported_audio_channel_count, min_octets_per_codec_frame=min_octets_per_sample, max_octets_per_codec_frame=max_octets_per_sample, supported_max_codec_frames_per_sdu=supported_max_codec_frames_per_sdu, @@ -547,7 +595,7 @@ class CodecSpecificCapabilities: self.supported_frame_durations, 2, CodecSpecificCapabilities.Type.AUDIO_CHANNEL_COUNT, - channel_counts_to_bits(self.supported_audio_channel_counts), + channel_counts_to_bits(self.supported_audio_channel_count), 5, CodecSpecificCapabilities.Type.OCTETS_PER_FRAME, self.min_octets_per_codec_frame, @@ -833,15 +881,22 @@ class AseStateMachine(gatt.Characteristic): cig_id: int, cis_id: int, ) -> None: - if cis_id == self.cis_id and self.state == self.State.ENABLING: + if ( + cig_id == self.cig_id + and cis_id == self.cis_id + and self.state == self.State.ENABLING + ): acl_connection.abort_on( 'flush', self.service.device.accept_cis_request(cis_handle) ) def on_cis_establishment(self, cis_link: device.CisLink) -> None: - if cis_link.cis_id == self.cis_id and self.state == self.State.ENABLING: - self.state = self.State.STREAMING - self.cis_link = cis_link + if ( + cis_link.cig_id == self.cig_id + and cis_link.cis_id == self.cis_id + and self.state == self.State.ENABLING + ): + cis_link.on('disconnection', self.on_cis_disconnection) async def post_cis_established(): await self.service.device.send_command( @@ -854,9 +909,15 @@ class AseStateMachine(gatt.Characteristic): codec_configuration=b'', ) ) + if self.role == AudioRole.SINK: + self.state = self.State.STREAMING await self.service.device.notify_subscribers(self, self.value) cis_link.acl_connection.abort_on('flush', post_cis_established()) + self.cis_link = cis_link + + def on_cis_disconnection(self, _reason) -> None: + self.cis_link = None def on_config_codec( self, @@ -954,11 +1015,17 @@ class AseStateMachine(gatt.Characteristic): AseResponseCode.INVALID_ASE_STATE_MACHINE_TRANSITION, AseReasonCode.NONE, ) - self.state = self.State.DISABLING + if self.role == AudioRole.SINK: + self.state = self.State.QOS_CONFIGURED + else: + self.state = self.State.DISABLING return (AseResponseCode.SUCCESS, AseReasonCode.NONE) def on_receiver_stop_ready(self) -> Tuple[AseResponseCode, AseReasonCode]: - if self.state != AseStateMachine.State.DISABLING: + if ( + self.role != AudioRole.SOURCE + or self.state != AseStateMachine.State.DISABLING + ): return ( AseResponseCode.INVALID_ASE_STATE_MACHINE_TRANSITION, AseReasonCode.NONE, @@ -1009,6 +1076,7 @@ class AseStateMachine(gatt.Characteristic): def state(self, new_state: State) -> None: logger.debug(f'{self} state change -> {colors.color(new_state.name, "cyan")}') self._state = new_state + self.emit('state_change') @property def value(self): @@ -1081,6 +1149,7 @@ class AudioStreamControlService(gatt.TemplateService): ase_state_machines: Dict[int, AseStateMachine] ase_control_point: gatt.Characteristic + _active_client: Optional[device.Connection] = None def __init__( self, @@ -1118,7 +1187,16 @@ class AudioStreamControlService(gatt.TemplateService): else: return (ase_id, AseResponseCode.INVALID_ASE_ID, AseReasonCode.NONE) + def _on_client_disconnected(self, _reason: int) -> None: + for ase in self.ase_state_machines.values(): + ase.state = AseStateMachine.State.IDLE + self._active_client = None + def on_write_ase_control_point(self, connection, data): + if not self._active_client and connection: + self._active_client = connection + connection.once('disconnection', self._on_client_disconnected) + operation = ASE_Operation.from_bytes(data) responses = [] logger.debug(f'*** ASCS Write {operation} ***') diff --git a/bumble/profiles/device_information_service.py b/bumble/profiles/device_information_service.py index 09bfd6c..ecb1c0f 100644 --- a/bumble/profiles/device_information_service.py +++ b/bumble/profiles/device_information_service.py @@ -19,8 +19,8 @@ import struct from typing import Optional, Tuple -from ..gatt_client import ProfileServiceProxy -from ..gatt import ( +from bumble.gatt_client import ServiceProxy, ProfileServiceProxy, CharacteristicProxy +from bumble.gatt import ( GATT_DEVICE_INFORMATION_SERVICE, GATT_FIRMWARE_REVISION_STRING_CHARACTERISTIC, GATT_HARDWARE_REVISION_STRING_CHARACTERISTIC, @@ -59,7 +59,7 @@ class DeviceInformationService(TemplateService): firmware_revision: Optional[str] = None, software_revision: Optional[str] = None, system_id: Optional[Tuple[int, int]] = None, # (OUI, Manufacturer ID) - ieee_regulatory_certification_data_list: Optional[bytes] = None + ieee_regulatory_certification_data_list: Optional[bytes] = None, # TODO: pnp_id ): characteristics = [ @@ -104,10 +104,19 @@ class DeviceInformationService(TemplateService): class DeviceInformationServiceProxy(ProfileServiceProxy): SERVICE_CLASS = DeviceInformationService - def __init__(self, service_proxy): + manufacturer_name: Optional[UTF8CharacteristicAdapter] + model_number: Optional[UTF8CharacteristicAdapter] + serial_number: Optional[UTF8CharacteristicAdapter] + hardware_revision: Optional[UTF8CharacteristicAdapter] + firmware_revision: Optional[UTF8CharacteristicAdapter] + software_revision: Optional[UTF8CharacteristicAdapter] + system_id: Optional[DelegatedCharacteristicAdapter] + ieee_regulatory_certification_data_list: Optional[CharacteristicProxy] + + def __init__(self, service_proxy: ServiceProxy): self.service_proxy = service_proxy - for (field, uuid) in ( + for field, uuid in ( ('manufacturer_name', GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC), ('model_number', GATT_MODEL_NUMBER_STRING_CHARACTERISTIC), ('serial_number', GATT_SERIAL_NUMBER_STRING_CHARACTERISTIC), diff --git a/bumble/rfcomm.py b/bumble/rfcomm.py index 6ca0f50..2d8a627 100644 --- a/bumble/rfcomm.py +++ b/bumble/rfcomm.py @@ -19,6 +19,7 @@ from __future__ import annotations import logging import asyncio +import collections import dataclasses import enum from typing import Callable, Dict, List, Optional, Tuple, Union, TYPE_CHECKING @@ -54,6 +55,7 @@ logger = logging.getLogger(__name__) # fmt: off RFCOMM_PSM = 0x0003 +DEFAULT_RX_QUEUE_SIZE = 32 class FrameType(enum.IntEnum): SABM = 0x2F # Control field [1,1,1,1,_,1,0,0] LSB-first @@ -104,9 +106,11 @@ CRC_TABLE = bytes([ 0XBA, 0X2B, 0X59, 0XC8, 0XBD, 0X2C, 0X5E, 0XCF ]) -RFCOMM_DEFAULT_L2CAP_MTU = 2048 -RFCOMM_DEFAULT_WINDOW_SIZE = 7 -RFCOMM_DEFAULT_MAX_FRAME_SIZE = 2000 +RFCOMM_DEFAULT_L2CAP_MTU = 2048 +RFCOMM_DEFAULT_INITIAL_CREDITS = 7 +RFCOMM_DEFAULT_MAX_CREDITS = 32 +RFCOMM_DEFAULT_CREDIT_THRESHOLD = RFCOMM_DEFAULT_MAX_CREDITS // 2 +RFCOMM_DEFAULT_MAX_FRAME_SIZE = 2000 RFCOMM_DYNAMIC_CHANNEL_NUMBER_START = 1 RFCOMM_DYNAMIC_CHANNEL_NUMBER_END = 30 @@ -363,12 +367,12 @@ class RFCOMM_MCC_PN: ack_timer: int max_frame_size: int max_retransmissions: int - window_size: int + initial_credits: int def __post_init__(self) -> None: - if self.window_size < 1 or self.window_size > 7: + if self.initial_credits < 1 or self.initial_credits > 7: logger.warning( - f'Error Recovery Window size {self.window_size} is out of range [1, 7].' + f'Initial credits {self.initial_credits} is out of range [1, 7].' ) @staticmethod @@ -380,7 +384,7 @@ class RFCOMM_MCC_PN: ack_timer=data[3], max_frame_size=data[4] | data[5] << 8, max_retransmissions=data[6], - window_size=data[7] & 0x07, + initial_credits=data[7] & 0x07, ) def __bytes__(self) -> bytes: @@ -394,7 +398,7 @@ class RFCOMM_MCC_PN: (self.max_frame_size >> 8) & 0xFF, self.max_retransmissions & 0xFF, # Only 3 bits are meaningful. - self.window_size & 0x07, + self.initial_credits & 0x07, ] ) @@ -444,39 +448,58 @@ class DLC(EventEmitter): DISCONNECTED = 0x04 RESET = 0x05 - connection_result: Optional[asyncio.Future] - sink: Optional[Callable[[bytes], None]] - def __init__( self, multiplexer: Multiplexer, dlci: int, - max_frame_size: int, - window_size: int, + tx_max_frame_size: int, + tx_initial_credits: int, + rx_max_frame_size: int, + rx_initial_credits: int, ) -> None: super().__init__() self.multiplexer = multiplexer self.dlci = dlci - self.max_frame_size = max_frame_size - self.window_size = window_size - self.rx_credits = window_size - self.rx_threshold = window_size // 2 - self.tx_credits = window_size + self.rx_max_frame_size = rx_max_frame_size + self.rx_initial_credits = rx_initial_credits + self.rx_max_credits = RFCOMM_DEFAULT_MAX_CREDITS + self.rx_credits = rx_initial_credits + self.rx_credits_threshold = RFCOMM_DEFAULT_CREDIT_THRESHOLD + self.tx_max_frame_size = tx_max_frame_size + self.tx_credits = tx_initial_credits self.tx_buffer = b'' self.state = DLC.State.INIT self.role = multiplexer.role self.c_r = 1 if self.role == Multiplexer.Role.INITIATOR else 0 - self.sink = None - self.connection_result = None + self.connection_result: Optional[asyncio.Future] = None + self.disconnection_result: Optional[asyncio.Future] = None self.drained = asyncio.Event() self.drained.set() + # Queued packets when sink is not set. + self._enqueued_rx_packets: collections.deque[bytes] = collections.deque( + maxlen=DEFAULT_RX_QUEUE_SIZE + ) + self._sink: Optional[Callable[[bytes], None]] = None # Compute the MTU max_overhead = 4 + 1 # header with 2-byte length + fcs self.mtu = min( - max_frame_size, self.multiplexer.l2cap_channel.peer_mtu - max_overhead + tx_max_frame_size, self.multiplexer.l2cap_channel.peer_mtu - max_overhead ) + @property + def sink(self) -> Optional[Callable[[bytes], None]]: + return self._sink + + @sink.setter + def sink(self, sink: Optional[Callable[[bytes], None]]) -> None: + self._sink = sink + # Dump queued packets to sink + if sink: + for packet in self._enqueued_rx_packets: + sink(packet) # pylint: disable=not-callable + self._enqueued_rx_packets.clear() + def change_state(self, new_state: State) -> None: logger.debug(f'{self} state change -> {color(new_state.name, "magenta")}') self.state = new_state @@ -507,20 +530,35 @@ class DLC(EventEmitter): self.emit('open') def on_ua_frame(self, _frame: RFCOMM_Frame) -> None: - if self.state != DLC.State.CONNECTING: + if self.state == DLC.State.CONNECTING: + # Exchange the modem status with the peer + msc = RFCOMM_MCC_MSC(dlci=self.dlci, fc=0, rtc=1, rtr=1, ic=0, dv=1) + mcc = RFCOMM_Frame.make_mcc(mcc_type=MccType.MSC, c_r=1, data=bytes(msc)) + logger.debug(f'>>> MCC MSC Command: {msc}') + self.send_frame(RFCOMM_Frame.uih(c_r=self.c_r, dlci=0, information=mcc)) + + self.change_state(DLC.State.CONNECTED) + if self.connection_result: + self.connection_result.set_result(None) + self.connection_result = None + self.multiplexer.on_dlc_open_complete(self) + elif self.state == DLC.State.DISCONNECTING: + self.change_state(DLC.State.DISCONNECTED) + if self.disconnection_result: + self.disconnection_result.set_result(None) + self.disconnection_result = None + self.multiplexer.on_dlc_disconnection(self) + self.emit('close') + else: logger.warning( - color('!!! received SABM when not in CONNECTING state', 'red') + color( + ( + '!!! received UA frame when not in ' + 'CONNECTING or DISCONNECTING state' + ), + 'red', + ) ) - return - - # Exchange the modem status with the peer - msc = RFCOMM_MCC_MSC(dlci=self.dlci, fc=0, rtc=1, rtr=1, ic=0, dv=1) - mcc = RFCOMM_Frame.make_mcc(mcc_type=MccType.MSC, c_r=1, data=bytes(msc)) - logger.debug(f'>>> MCC MSC Command: {msc}') - self.send_frame(RFCOMM_Frame.uih(c_r=self.c_r, dlci=0, information=mcc)) - - self.change_state(DLC.State.CONNECTED) - self.multiplexer.on_dlc_open_complete(self) def on_dm_frame(self, frame: RFCOMM_Frame) -> None: # TODO: handle all states @@ -549,8 +587,15 @@ class DLC(EventEmitter): f'rx_credits={self.rx_credits}: {data.hex()}' ) if data: - if self.sink: - self.sink(data) # pylint: disable=not-callable + if self._sink: + self._sink(data) # pylint: disable=not-callable + else: + self._enqueued_rx_packets.append(data) + if ( + self._enqueued_rx_packets.maxlen + and len(self._enqueued_rx_packets) >= self._enqueued_rx_packets.maxlen + ): + logger.warning(f'DLC [{self.dlci}] received packet queue is full') # Update the credits if self.rx_credits > 0: @@ -584,6 +629,19 @@ class DLC(EventEmitter): self.connection_result = asyncio.get_running_loop().create_future() self.send_frame(RFCOMM_Frame.sabm(c_r=self.c_r, dlci=self.dlci)) + async def disconnect(self) -> None: + if self.state != DLC.State.CONNECTED: + raise InvalidStateError('invalid state') + + self.disconnection_result = asyncio.get_running_loop().create_future() + self.change_state(DLC.State.DISCONNECTING) + self.send_frame( + RFCOMM_Frame.disc( + c_r=1 if self.role == Multiplexer.Role.INITIATOR else 0, dlci=self.dlci + ) + ) + await self.disconnection_result + def accept(self) -> None: if self.state != DLC.State.INIT: raise InvalidStateError('invalid state') @@ -593,9 +651,9 @@ class DLC(EventEmitter): cl=0xE0, priority=7, ack_timer=0, - max_frame_size=self.max_frame_size, + max_frame_size=self.rx_max_frame_size, max_retransmissions=0, - window_size=self.window_size, + initial_credits=self.rx_initial_credits, ) mcc = RFCOMM_Frame.make_mcc(mcc_type=MccType.PN, c_r=0, data=bytes(pn)) logger.debug(f'>>> PN Response: {pn}') @@ -603,8 +661,8 @@ class DLC(EventEmitter): self.change_state(DLC.State.CONNECTING) def rx_credits_needed(self) -> int: - if self.rx_credits <= self.rx_threshold: - return self.window_size - self.rx_credits + if self.rx_credits <= self.rx_credits_threshold: + return self.rx_max_credits - self.rx_credits return 0 @@ -664,6 +722,17 @@ class DLC(EventEmitter): async def drain(self) -> None: await self.drained.wait() + def abort(self) -> None: + logger.debug(f'aborting DLC: {self}') + if self.connection_result: + self.connection_result.cancel() + self.connection_result = None + if self.disconnection_result: + self.disconnection_result.cancel() + self.disconnection_result = None + self.change_state(DLC.State.RESET) + self.emit('close') + def __str__(self) -> str: return f'DLC(dlci={self.dlci},state={self.state.name})' @@ -686,7 +755,7 @@ class Multiplexer(EventEmitter): connection_result: Optional[asyncio.Future] disconnection_result: Optional[asyncio.Future] open_result: Optional[asyncio.Future] - acceptor: Optional[Callable[[int], bool]] + acceptor: Optional[Callable[[int], Optional[Tuple[int, int]]]] dlcs: Dict[int, DLC] def __init__(self, l2cap_channel: l2cap.ClassicChannel, role: Role) -> None: @@ -698,11 +767,15 @@ class Multiplexer(EventEmitter): self.connection_result = None self.disconnection_result = None self.open_result = None + self.open_pn: Optional[RFCOMM_MCC_PN] = None + self.open_rx_max_credits = 0 self.acceptor = None # Become a sink for the L2CAP channel l2cap_channel.sink = self.on_pdu + l2cap_channel.on('close', self.on_l2cap_channel_close) + def change_state(self, new_state: State) -> None: logger.debug(f'{self} state change -> {color(new_state.name, "cyan")}') self.state = new_state @@ -766,6 +839,7 @@ class Multiplexer(EventEmitter): 'rfcomm', ) ) + self.open_result = None else: logger.warning(f'unexpected state for DM: {self}') @@ -803,9 +877,16 @@ class Multiplexer(EventEmitter): else: if self.acceptor: channel_number = pn.dlci >> 1 - if self.acceptor(channel_number): + if dlc_params := self.acceptor(channel_number): # Create a new DLC - dlc = DLC(self, pn.dlci, pn.max_frame_size, pn.window_size) + dlc = DLC( + self, + dlci=pn.dlci, + tx_max_frame_size=pn.max_frame_size, + tx_initial_credits=pn.initial_credits, + rx_max_frame_size=dlc_params[0], + rx_initial_credits=dlc_params[1], + ) self.dlcs[pn.dlci] = dlc # Re-emit the handshake completion event @@ -823,8 +904,17 @@ class Multiplexer(EventEmitter): # Response logger.debug(f'>>> PN Response: {pn}') if self.state == Multiplexer.State.OPENING: - dlc = DLC(self, pn.dlci, pn.max_frame_size, pn.window_size) + assert self.open_pn + dlc = DLC( + self, + dlci=pn.dlci, + tx_max_frame_size=pn.max_frame_size, + tx_initial_credits=pn.initial_credits, + rx_max_frame_size=self.open_pn.max_frame_size, + rx_initial_credits=self.open_pn.initial_credits, + ) self.dlcs[pn.dlci] = dlc + self.open_pn = None dlc.connect() else: logger.warning('ignoring PN response') @@ -862,7 +952,7 @@ class Multiplexer(EventEmitter): self, channel: int, max_frame_size: int = RFCOMM_DEFAULT_MAX_FRAME_SIZE, - window_size: int = RFCOMM_DEFAULT_WINDOW_SIZE, + initial_credits: int = RFCOMM_DEFAULT_INITIAL_CREDITS, ) -> DLC: if self.state != Multiplexer.State.CONNECTED: if self.state == Multiplexer.State.OPENING: @@ -870,17 +960,19 @@ class Multiplexer(EventEmitter): raise InvalidStateError('not connected') - pn = RFCOMM_MCC_PN( + self.open_pn = RFCOMM_MCC_PN( dlci=channel << 1, cl=0xF0, priority=7, ack_timer=0, max_frame_size=max_frame_size, max_retransmissions=0, - window_size=window_size, + initial_credits=initial_credits, ) - mcc = RFCOMM_Frame.make_mcc(mcc_type=MccType.PN, c_r=1, data=bytes(pn)) - logger.debug(f'>>> Sending MCC: {pn}') + mcc = RFCOMM_Frame.make_mcc( + mcc_type=MccType.PN, c_r=1, data=bytes(self.open_pn) + ) + logger.debug(f'>>> Sending MCC: {self.open_pn}') self.open_result = asyncio.get_running_loop().create_future() self.change_state(Multiplexer.State.OPENING) self.send_frame( @@ -890,15 +982,31 @@ class Multiplexer(EventEmitter): information=mcc, ) ) - result = await self.open_result - self.open_result = None - return result + return await self.open_result def on_dlc_open_complete(self, dlc: DLC) -> None: logger.debug(f'DLC [{dlc.dlci}] open complete') + self.change_state(Multiplexer.State.CONNECTED) + if self.open_result: self.open_result.set_result(dlc) + self.open_result = None + + def on_dlc_disconnection(self, dlc: DLC) -> None: + logger.debug(f'DLC [{dlc.dlci}] disconnection') + self.dlcs.pop(dlc.dlci, None) + + def on_l2cap_channel_close(self) -> None: + logger.debug('L2CAP channel closed, cleaning up') + if self.open_result: + self.open_result.cancel() + self.open_result = None + if self.disconnection_result: + self.disconnection_result.cancel() + self.disconnection_result = None + for dlc in self.dlcs.values(): + dlc.abort() def __str__(self) -> str: return f'Multiplexer(state={self.state.name})' @@ -957,15 +1065,13 @@ class Client: # ----------------------------------------------------------------------------- class Server(EventEmitter): - acceptors: Dict[int, Callable[[DLC], None]] - def __init__( self, device: Device, l2cap_mtu: int = RFCOMM_DEFAULT_L2CAP_MTU ) -> None: super().__init__() self.device = device - self.multiplexer = None - self.acceptors = {} + self.acceptors: Dict[int, Callable[[DLC], None]] = {} + self.dlc_configs: Dict[int, Tuple[int, int]] = {} # Register ourselves with the L2CAP channel manager self.l2cap_server = device.create_l2cap_server( @@ -973,7 +1079,13 @@ class Server(EventEmitter): handler=self.on_connection, ) - def listen(self, acceptor: Callable[[DLC], None], channel: int = 0) -> int: + def listen( + self, + acceptor: Callable[[DLC], None], + channel: int = 0, + max_frame_size: int = RFCOMM_DEFAULT_MAX_FRAME_SIZE, + initial_credits: int = RFCOMM_DEFAULT_INITIAL_CREDITS, + ) -> int: if channel: if channel in self.acceptors: # Busy @@ -993,6 +1105,8 @@ class Server(EventEmitter): return 0 self.acceptors[channel] = acceptor + self.dlc_configs[channel] = (max_frame_size, initial_credits) + return channel def on_connection(self, l2cap_channel: l2cap.ClassicChannel) -> None: @@ -1010,15 +1124,14 @@ class Server(EventEmitter): # Notify self.emit('start', multiplexer) - def accept_dlc(self, channel_number: int) -> bool: - return channel_number in self.acceptors + def accept_dlc(self, channel_number: int) -> Optional[Tuple[int, int]]: + return self.dlc_configs.get(channel_number) def on_dlc(self, dlc: DLC) -> None: logger.debug(f'@@@ new DLC connected: {dlc}') # Let the acceptor know - acceptor = self.acceptors.get(dlc.dlci >> 1) - if acceptor: + if acceptor := self.acceptors.get(dlc.dlci >> 1): acceptor(dlc) def __enter__(self) -> Self: diff --git a/bumble/sdp.py b/bumble/sdp.py index 6423790..543c322 100644 --- a/bumble/sdp.py +++ b/bumble/sdp.py @@ -825,11 +825,13 @@ class Client: ) attribute_id_list = DataElement.sequence( [ - DataElement.unsigned_integer( - attribute_id[0], value_size=attribute_id[1] + ( + DataElement.unsigned_integer( + attribute_id[0], value_size=attribute_id[1] + ) + if isinstance(attribute_id, tuple) + else DataElement.unsigned_integer_16(attribute_id) ) - if isinstance(attribute_id, tuple) - else DataElement.unsigned_integer_16(attribute_id) for attribute_id in attribute_ids ] ) @@ -881,11 +883,13 @@ class Client: attribute_id_list = DataElement.sequence( [ - DataElement.unsigned_integer( - attribute_id[0], value_size=attribute_id[1] + ( + DataElement.unsigned_integer( + attribute_id[0], value_size=attribute_id[1] + ) + if isinstance(attribute_id, tuple) + else DataElement.unsigned_integer_16(attribute_id) ) - if isinstance(attribute_id, tuple) - else DataElement.unsigned_integer_16(attribute_id) for attribute_id in attribute_ids ] ) @@ -993,7 +997,7 @@ class Server: try: handler(sdp_pdu) except Exception as error: - logger.warning(f'{color("!!! Exception in handler:", "red")} {error}') + logger.exception(f'{color("!!! Exception in handler:", "red")} {error}') self.send_response( SDP_ErrorResponse( transaction_id=sdp_pdu.transaction_id, diff --git a/bumble/smp.py b/bumble/smp.py index 73fd439..3a88a31 100644 --- a/bumble/smp.py +++ b/bumble/smp.py @@ -737,9 +737,9 @@ class Session: # Create a future that can be used to wait for the session to complete if self.is_initiator: - self.pairing_result: Optional[ - asyncio.Future[None] - ] = asyncio.get_running_loop().create_future() + self.pairing_result: Optional[asyncio.Future[None]] = ( + asyncio.get_running_loop().create_future() + ) else: self.pairing_result = None diff --git a/bumble/transport/common.py b/bumble/transport/common.py index ef35c9f..ffbf7b0 100644 --- a/bumble/transport/common.py +++ b/bumble/transport/common.py @@ -59,15 +59,13 @@ class TransportLostError(Exception): # Typing Protocols # ----------------------------------------------------------------------------- class TransportSink(Protocol): - def on_packet(self, packet: bytes) -> None: - ... + def on_packet(self, packet: bytes) -> None: ... class TransportSource(Protocol): terminated: asyncio.Future[None] - def set_packet_sink(self, sink: TransportSink) -> None: - ... + def set_packet_sink(self, sink: TransportSink) -> None: ... # ----------------------------------------------------------------------------- @@ -427,6 +425,10 @@ class SnoopingTransport(Transport): class Source: sink: TransportSink + @property + def metadata(self) -> dict[str, Any]: + return getattr(self.source, 'metadata', {}) + def __init__(self, source: TransportSource, snooper: Snooper): self.source = source self.snooper = snooper diff --git a/bumble/transport/pyusb.py b/bumble/transport/pyusb.py index 61ce17e..68a1dfd 100644 --- a/bumble/transport/pyusb.py +++ b/bumble/transport/pyusb.py @@ -23,12 +23,25 @@ import time import usb.core import usb.util +from typing import Optional +from usb.core import Device as UsbDevice +from usb.core import USBError +from usb.util import CTRL_TYPE_CLASS, CTRL_RECIPIENT_OTHER +from usb.legacy import REQ_SET_FEATURE, REQ_CLEAR_FEATURE, CLASS_HUB + from .common import Transport, ParserSource from .. import hci from ..colors import color # ----------------------------------------------------------------------------- +# Constant +# ----------------------------------------------------------------------------- +USB_PORT_FEATURE_POWER = 8 +POWER_CYCLE_DELAY = 1 +RESET_DELAY = 3 + +# ----------------------------------------------------------------------------- # Logging # ----------------------------------------------------------------------------- logger = logging.getLogger(__name__) @@ -214,6 +227,10 @@ async def open_pyusb_transport(spec: str) -> Transport: usb_find = libusb_package.find # Find the device according to the spec moniker + power_cycle = False + if spec.startswith('!'): + power_cycle = True + spec = spec[1:] if ':' in spec: vendor_id, product_id = spec.split(':') device = usb_find(idVendor=int(vendor_id, 16), idProduct=int(product_id, 16)) @@ -245,6 +262,14 @@ async def open_pyusb_transport(spec: str) -> Transport: raise ValueError('device not found') logger.debug(f'USB Device: {device}') + # Power Cycle the device + if power_cycle: + try: + device = await _power_cycle(device) # type: ignore + except Exception as e: + logging.debug(e) + logging.info(f"Unable to power cycle {hex(device.idVendor)} {hex(device.idProduct)}") # type: ignore + # Collect the metadata device_metadata = {'vendor_id': device.idVendor, 'product_id': device.idProduct} @@ -308,3 +333,73 @@ async def open_pyusb_transport(spec: str) -> Transport: packet_sink.start() return UsbTransport(device, packet_source, packet_sink) + + +async def _power_cycle(device: UsbDevice) -> UsbDevice: + """ + For devices connected to compatible USB hubs: Performs a power cycle on a given USB device. + This involves temporarily disabling its port on the hub and then re-enabling it. + """ + device_path = f'{device.bus}-{".".join(map(str, device.port_numbers))}' # type: ignore + hub = _find_hub_by_device_path(device_path) + + if hub: + try: + device_port = device.port_numbers[-1] # type: ignore + _set_port_status(hub, device_port, False) + await asyncio.sleep(POWER_CYCLE_DELAY) + _set_port_status(hub, device_port, True) + await asyncio.sleep(RESET_DELAY) + + # Device needs to be find again otherwise it will appear as disconnected + return usb.core.find(idVendor=device.idVendor, idProduct=device.idProduct) # type: ignore + except USBError as e: + logger.error(f"Adjustment needed: Please revise the udev rule for device {hex(device.idVendor)}:{hex(device.idProduct)} for proper recognition.") # type: ignore + logger.error(e) + + return device + + +def _set_port_status(device: UsbDevice, port: int, on: bool): + """Sets the power status of a specific port on a USB hub.""" + device.ctrl_transfer( + bmRequestType=CTRL_TYPE_CLASS | CTRL_RECIPIENT_OTHER, + bRequest=REQ_SET_FEATURE if on else REQ_CLEAR_FEATURE, + wIndex=port, + wValue=USB_PORT_FEATURE_POWER, + ) + + +def _find_device_by_path(sys_path: str) -> Optional[UsbDevice]: + """Finds a USB device based on its system path.""" + bus_num, *port_parts = sys_path.split('-') + ports = [int(port) for port in port_parts[0].split('.')] + devices = usb.core.find(find_all=True, bus=int(bus_num)) + if devices: + for device in devices: + if device.bus == int(bus_num) and list(device.port_numbers) == ports: # type: ignore + return device + + return None + + +def _find_hub_by_device_path(sys_path: str) -> Optional[UsbDevice]: + """Finds the USB hub associated with a specific device path.""" + hub_sys_path = sys_path.rsplit('.', 1)[0] + hub_device = _find_device_by_path(hub_sys_path) + + if hub_device is None: + return None + else: + return hub_device if _is_hub(hub_device) else None + + +def _is_hub(device: UsbDevice) -> bool: + """Checks if a USB device is a hub""" + if device.bDeviceClass == CLASS_HUB: # type: ignore + return True + for config in device: + for interface in config: + if interface.bInterfaceClass == CLASS_HUB: # type: ignore + return True + return False diff --git a/bumble/transport/tcp_server.py b/bumble/transport/tcp_server.py index 77d0304..0a648fd 100644 --- a/bumble/transport/tcp_server.py +++ b/bumble/transport/tcp_server.py @@ -18,6 +18,7 @@ from __future__ import annotations import asyncio import logging +import socket from .common import Transport, StreamPacketSource @@ -28,6 +29,13 @@ logger = logging.getLogger(__name__) # ----------------------------------------------------------------------------- + + +# A pass-through function to ease mock testing. +async def _create_server(*args, **kw_args): + await asyncio.get_running_loop().create_server(*args, **kw_args) + + async def open_tcp_server_transport(spec: str) -> Transport: ''' Open a TCP server transport. @@ -38,7 +46,22 @@ async def open_tcp_server_transport(spec: str) -> Transport: Example: _:9001 ''' + local_host, local_port = spec.split(':') + return await _open_tcp_server_transport_impl( + host=local_host if local_host != '_' else None, port=int(local_port) + ) + +async def open_tcp_server_transport_with_socket(sock: socket.socket) -> Transport: + ''' + Open a TCP server transport with an existing socket. + + One reason to use this variant is to let python pick an unused port. + ''' + return await _open_tcp_server_transport_impl(sock=sock) + + +async def _open_tcp_server_transport_impl(**kwargs) -> Transport: class TcpServerTransport(Transport): async def close(self): await super().close() @@ -77,13 +100,10 @@ async def open_tcp_server_transport(spec: str) -> Transport: else: logger.debug('no client, dropping packet') - local_host, local_port = spec.split(':') packet_source = StreamPacketSource() packet_sink = TcpServerPacketSink() - await asyncio.get_running_loop().create_server( - lambda: TcpServerProtocol(packet_source, packet_sink), - host=local_host if local_host != '_' else None, - port=int(local_port), + await _create_server( + lambda: TcpServerProtocol(packet_source, packet_sink), **kwargs ) return TcpServerTransport(packet_source, packet_sink) diff --git a/bumble/transport/usb.py b/bumble/transport/usb.py index 6479016..69e9649 100644 --- a/bumble/transport/usb.py +++ b/bumble/transport/usb.py @@ -449,7 +449,7 @@ async def open_usb_transport(spec: str) -> Transport: # Look for the first interface with the right class and endpoints def find_endpoints(device): # pylint: disable-next=too-many-nested-blocks - for (configuration_index, configuration) in enumerate(device): + for configuration_index, configuration in enumerate(device): interface = None for interface in configuration: setting = None diff --git a/bumble/utils.py b/bumble/utils.py index e6aae4d..4c9407f 100644 --- a/bumble/utils.py +++ b/bumble/utils.py @@ -117,12 +117,12 @@ class EventWatcher: self.handlers = [] @overload - def on(self, emitter: EventEmitter, event: str) -> Callable[[_Handler], _Handler]: - ... + def on( + self, emitter: EventEmitter, event: str + ) -> Callable[[_Handler], _Handler]: ... @overload - def on(self, emitter: EventEmitter, event: str, handler: _Handler) -> _Handler: - ... + def on(self, emitter: EventEmitter, event: str, handler: _Handler) -> _Handler: ... def on( self, emitter: EventEmitter, event: str, handler: Optional[_Handler] = None @@ -144,12 +144,14 @@ class EventWatcher: return wrapper if handler is None else wrapper(handler) @overload - def once(self, emitter: EventEmitter, event: str) -> Callable[[_Handler], _Handler]: - ... + def once( + self, emitter: EventEmitter, event: str + ) -> Callable[[_Handler], _Handler]: ... @overload - def once(self, emitter: EventEmitter, event: str, handler: _Handler) -> _Handler: - ... + def once( + self, emitter: EventEmitter, event: str, handler: _Handler + ) -> _Handler: ... def once( self, emitter: EventEmitter, event: str, handler: Optional[_Handler] = None diff --git a/examples/async_runner.py b/examples/async_runner.py index 9e71899..b29a80f 100644 --- a/examples/async_runner.py +++ b/examples/async_runner.py @@ -25,6 +25,7 @@ from bumble.utils import AsyncRunner my_work_queue1 = AsyncRunner.WorkQueue() my_work_queue2 = AsyncRunner.WorkQueue(create_task=False) + # ----------------------------------------------------------------------------- @AsyncRunner.run_in_task() async def func1(x, y): @@ -60,7 +61,7 @@ async def func4(x, y): # ----------------------------------------------------------------------------- -async def main(): +async def main() -> None: print("MAIN: start, loop=", asyncio.get_running_loop()) print("MAIN: invoke func1") func1(1, 2) diff --git a/examples/battery_client.py b/examples/battery_client.py index 3cf11b4..e9105db 100644 --- a/examples/battery_client.py +++ b/examples/battery_client.py @@ -21,23 +21,29 @@ import os import logging from bumble.colors import color from bumble.device import Device +from bumble.hci import Address from bumble.transport import open_transport from bumble.profiles.battery_service import BatteryServiceProxy # ----------------------------------------------------------------------------- -async def main(): +async def main() -> None: if len(sys.argv) != 3: print('Usage: battery_client.py <transport-spec> <bluetooth-address>') print('example: battery_client.py usb:0 E1:CA:72:48:C4:E8') return print('<<< connecting to HCI...') - async with await open_transport(sys.argv[1]) as (hci_source, hci_sink): + async with await open_transport(sys.argv[1]) as hci_transport: print('<<< connected') # Create and start a device - device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink) + device = Device.with_hci( + 'Bumble', + Address('F0:F1:F2:F3:F4:F5'), + hci_transport.source, + hci_transport.sink, + ) await device.power_on() # Connect to the peer diff --git a/examples/battery_server.py b/examples/battery_server.py index b7f941f..f509954 100644 --- a/examples/battery_server.py +++ b/examples/battery_server.py @@ -29,14 +29,16 @@ from bumble.profiles.battery_service import BatteryService # ----------------------------------------------------------------------------- -async def main(): +async def main() -> None: if len(sys.argv) != 3: print('Usage: python battery_server.py <device-config> <transport-spec>') print('example: python battery_server.py device1.json usb:0') return - async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): - device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) + async with await open_transport_or_link(sys.argv[2]) as hci_transport: + device = Device.from_config_file_with_hci( + sys.argv[1], hci_transport.source, hci_transport.sink + ) # Add a Battery Service to the GATT sever battery_service = BatteryService(lambda _: random.randint(0, 100)) diff --git a/examples/device_information_client.py b/examples/device_information_client.py index 416aa2f..f6d51ba 100644 --- a/examples/device_information_client.py +++ b/examples/device_information_client.py @@ -21,12 +21,13 @@ import os import logging from bumble.colors import color from bumble.device import Device, Peer +from bumble.hci import Address from bumble.profiles.device_information_service import DeviceInformationServiceProxy from bumble.transport import open_transport # ----------------------------------------------------------------------------- -async def main(): +async def main() -> None: if len(sys.argv) != 3: print( 'Usage: device_information_client.py <transport-spec> <bluetooth-address>' @@ -35,11 +36,16 @@ async def main(): return print('<<< connecting to HCI...') - async with await open_transport(sys.argv[1]) as (hci_source, hci_sink): + async with await open_transport(sys.argv[1]) as hci_transport: print('<<< connected') # Create and start a device - device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink) + device = Device.with_hci( + 'Bumble', + Address('F0:F1:F2:F3:F4:F5'), + hci_transport.source, + hci_transport.sink, + ) await device.power_on() # Connect to the peer diff --git a/examples/device_information_server.py b/examples/device_information_server.py index d437cae..92474bc 100644 --- a/examples/device_information_server.py +++ b/examples/device_information_server.py @@ -28,14 +28,16 @@ from bumble.profiles.device_information_service import DeviceInformationService # ----------------------------------------------------------------------------- -async def main(): +async def main() -> None: if len(sys.argv) != 3: print('Usage: python device_info_server.py <device-config> <transport-spec>') print('example: python device_info_server.py device1.json usb:0') return - async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): - device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) + async with await open_transport_or_link(sys.argv[2]) as hci_transport: + device = Device.from_config_file_with_hci( + sys.argv[1], hci_transport.source, hci_transport.sink + ) # Add a Device Information Service to the GATT sever device_information_service = DeviceInformationService( @@ -64,7 +66,7 @@ async def main(): # Go! await device.power_on() await device.start_advertising(auto_restart=True) - await hci_source.wait_for_termination() + await hci_transport.source.wait_for_termination() # ----------------------------------------------------------------------------- diff --git a/examples/heart_rate_client.py b/examples/heart_rate_client.py index ecfcffb..ea7bc36 100644 --- a/examples/heart_rate_client.py +++ b/examples/heart_rate_client.py @@ -21,23 +21,29 @@ import os import logging from bumble.colors import color from bumble.device import Device +from bumble.hci import Address from bumble.transport import open_transport from bumble.profiles.heart_rate_service import HeartRateServiceProxy # ----------------------------------------------------------------------------- -async def main(): +async def main() -> None: if len(sys.argv) != 3: print('Usage: heart_rate_client.py <transport-spec> <bluetooth-address>') print('example: heart_rate_client.py usb:0 E1:CA:72:48:C4:E8') return print('<<< connecting to HCI...') - async with await open_transport(sys.argv[1]) as (hci_source, hci_sink): + async with await open_transport(sys.argv[1]) as hci_transport: print('<<< connected') # Create and start a device - device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink) + device = Device.with_hci( + 'Bumble', + Address('F0:F1:F2:F3:F4:F5'), + hci_transport.source, + hci_transport.sink, + ) await device.power_on() # Connect to the peer diff --git a/examples/heart_rate_server.py b/examples/heart_rate_server.py index fad809f..e40d5db 100644 --- a/examples/heart_rate_server.py +++ b/examples/heart_rate_server.py @@ -33,14 +33,16 @@ from bumble.utils import AsyncRunner # ----------------------------------------------------------------------------- -async def main(): +async def main() -> None: if len(sys.argv) != 3: print('Usage: python heart_rate_server.py <device-config> <transport-spec>') print('example: python heart_rate_server.py device1.json usb:0') return - async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): - device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) + async with await open_transport_or_link(sys.argv[2]) as hci_transport: + device = Device.from_config_file_with_hci( + sys.argv[1], hci_transport.source, hci_transport.sink + ) # Keep track of accumulated expended energy energy_start_time = time.time() diff --git a/examples/hfp_gateway.html b/examples/hfp_gateway.html new file mode 100644 index 0000000..1559c43 --- /dev/null +++ b/examples/hfp_gateway.html @@ -0,0 +1,350 @@ +<html data-bs-theme="dark"> + +<head> + <link href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.2/dist/css/bootstrap.min.css" rel="stylesheet" + integrity="sha384-T3c6CoIi6uLrA9TneNEoa7RxnatzjcDSCmG1MXxSR1GAsXEV/Dwwykc2MPK8M2HN" crossorigin="anonymous"> + <script src="https://unpkg.com/pcm-player"></script> +</head> + +<body> + <nav class="navbar navbar-dark bg-primary"> + <div class="container"> + <span class="navbar-brand mb-0 h1">Bumble HFP Audio Gateway</span> + </div> + </nav> + <br> + + <div class="container"> + + <label class="form-label">Send AT Response</label> + <div class="input-group mb-3"> + <input type="text" class="form-control" placeholder="AT Response" aria-label="AT response" id="at_response"> + <button class="btn btn-primary" type="button" + onclick="send_at_response(document.getElementById('at_response').value)">Send</button> + </div> + + <div class="row"> + <div class="col-3"> + <label class="form-label">Speaker Volume</label> + <div class="input-group mb-3 col-auto"> + <input type="text" class="form-control" placeholder="0 - 15" aria-label="Speaker Volume" + id="speaker_volume"> + <button class="btn btn-primary" type="button" + onclick="send_at_response(`+VGS: ${document.getElementById('speaker_volume').value}`)">Set</button> + </div> + </div> + <div class="col-3"> + <label class="form-label">Mic Volume</label> + <div class="input-group mb-3 col-auto"> + <input type="text" class="form-control" placeholder="0 - 15" aria-label="Mic Volume" + id="mic_volume"> + <button class="btn btn-primary" type="button" + onclick="send_at_response(`+VGM: ${document.getElementById('mic_volume').value}`)">Set</button> + </div> + </div> + <div class="col-3"> + <label class="form-label">Browser Gain</label> + <input type="range" class="form-range" id="browser-gain" min="0" max="2" value="1" step="0.1" onchange="setGain()"> + </div> + </div> + + <div class="row"> + <div class="col-auto"> + <div class="input-group mb-3"> + <span class="input-group-text">Codec</span> + + <select class="form-select" id="codec"> + <option selected value="1">CVSD</option> + <option value="2">MSBC</option> + </select> + </div> + </div> + + <div class="col-auto"> + <button class="btn btn-primary" onclick="negotiate_codec()">Negotiate Codec</button> + </div> + <div class="col-auto"> + <button class="btn btn-primary" onclick="connect_sco()">Connect SCO</button> + </div> + <div class="col-auto"> + <button class="btn btn-primary" onclick="disconnect_sco()">Disconnect SCO</button> + </div> + <div class="col-auto"> + <button class="btn btn-danger" onclick="connectAudio()">Connect Audio</button> + </div> + </div> + + <hr> + + <div class="row"> + <h4>AG Indicators</h2> + <div class="col-3"> + <label class="form-label">call</label> + <div class="input-group mb-3 col-auto"> + <select class="form-select" id="call"> + <option selected value="0">Inactive</option> + <option value="1">Active</option> + </select> + <button class="btn btn-primary" type="button" onclick="update_ag_indicator('call')">Set</button> + </div> + </div> + <div class="col-3"> + <label class="form-label">callsetup</label> + <div class="input-group mb-3 col-auto"> + <select class="form-select" id="callsetup"> + <option selected value="0">Idle</option> + <option value="1">Incoming</option> + <option value="2">Outgoing</option> + <option value="3">Remote Alerted</option> + </select> + <button class="btn btn-primary" type="button" + onclick="update_ag_indicator('callsetup')">Set</button> + </div> + </div> + <div class="col-3"> + <label class="form-label">callheld</label> + <div class="input-group mb-3 col-auto"> + <select class="form-select" id="callsetup"> + <option selected value="0">0</option> + <option value="1">1</option> + <option value="2">2</option> + </select> + <button class="btn btn-primary" type="button" + onclick="update_ag_indicator('callheld')">Set</button> + </div> + </div> + <div class="col-3"> + <label class="form-label">signal</label> + <div class="input-group mb-3 col-auto"> + <select class="form-select" id="signal"> + <option selected value="0">0</option> + <option value="1">1</option> + <option value="2">2</option> + <option value="3">3</option> + <option value="4">4</option> + <option value="5">5</option> + </select> + <button class="btn btn-primary" type="button" + onclick="update_ag_indicator('signal')">Set</button> + </div> + </div> + <div class="col-3"> + <label class="form-label">roam</label> + <div class="input-group mb-3 col-auto"> + <select class="form-select" id="roam"> + <option selected value="0">0</option> + <option value="1">1</option> + </select> + <button class="btn btn-primary" type="button" onclick="update_ag_indicator('roam')">Set</button> + </div> + </div> + <div class="col-3"> + <label class="form-label">battchg</label> + <div class="input-group mb-3 col-auto"> + <select class="form-select" id="battchg"> + <option selected value="0">0</option> + <option value="1">1</option> + <option value="2">2</option> + <option value="3">3</option> + <option value="4">4</option> + <option value="5">5</option> + </select> + <button class="btn btn-primary" type="button" + onclick="update_ag_indicator('battchg')">Set</button> + </div> + </div> + <div class="col-3"> + <label class="form-label">service</label> + <div class="input-group mb-3 col-auto"> + <select class="form-select" id="service"> + <option selected value="0">0</option> + <option value="1">1</option> + </select> + <button class="btn btn-primary" type="button" + onclick="update_ag_indicator('service')">Set</button> + </div> + </div> + </div> + + <hr> + + <button class="btn btn-primary" onclick="send_at_response('+BVRA: 1')">Start Voice Assistant</button> + <button class="btn btn-primary" onclick="send_at_response('+BVRA: 0')">Stop Voice Assistant</button> + + <hr> + + + <h4>Calls</h4> + <div id="call-lists"> + <template id="call-template"> + <div class="row call-row"> + <div class="input-group mb-3"> + <label class="input-group-text">Index</label> + <input class="form-control call-index" value="1"> + + <label class="input-group-text">Number</label> + <input class="form-control call-number"> + + <label class="input-group-text">Direction</label> + <select class="form-select call-direction"> + <option selected value="0">Originated</option> + <option value="1">Terminated</option> + </select> + + <label class="input-group-text">Status</label> + <select class="form-select call-status"> + <option value="0">ACTIVE</option> + <option value="1">HELD</option> + <option value="2">DIALING</option> + <option value="3">ALERTING</option> + <option value="4">INCOMING</option> + <option value="5">WAITING</option> + </select> + <button class="btn btn-primary call-remover">❌</button> + </div> + </div> + </template> + </div> + + <button class="btn btn-primary" onclick="add_call()">➕ Add Call</button> + <button class="btn btn-primary" onclick="update_calls()">🗘 Update Calls</button> + + <hr> + + <div id="socketStateContainer" class="bg-body-tertiary p-3 rounded-2"> + <h3>Log</h3> + <code id="log" style="white-space: pre-line;"></code> + </div> + </div> + + + <script> + let atResponseInput = document.getElementById("at_response") + let gainInput = document.getElementById('browser-gain') + let log = document.getElementById("log") + let socket = new WebSocket('ws://localhost:8888'); + let sampleRate = 0; + let player; + + socket.binaryType = "arraybuffer"; + socket.onopen = _ => { + log.textContent += 'SOCKET OPEN\n' + } + socket.onclose = _ => { + log.textContent += 'SOCKET CLOSED\n' + } + socket.onerror = (error) => { + log.textContent += 'SOCKET ERROR\n' + console.log(`ERROR: ${error}`) + } + socket.onmessage = function (message) { + if (typeof message.data === 'string' || message.data instanceof String) { + log.textContent += `<-- ${event.data}\n` + const jsonMessage = JSON.parse(event.data) + + if (jsonMessage.type == 'speaker_volume') { + document.getElementById('speaker_volume').value = jsonMessage.level; + } else if (jsonMessage.type == 'microphone_volume') { + document.getElementById('microphone_volume').value = jsonMessage.level; + } else if (jsonMessage.type == 'sco_state_change') { + sampleRate = jsonMessage.sample_rate; + console.log(sampleRate); + if (player != null) { + player = new PCMPlayer({ + inputCodec: 'Int16', + channels: 1, + sampleRate: sampleRate, + flushTime: 7.5, + }); + player.volume(gainInput.value); + } + } + } else { + // BINARY audio data. + if (player == null) return; + player.feed(message.data); + } + }; + + function send(message) { + if (socket && socket.readyState == WebSocket.OPEN) { + let jsonMessage = JSON.stringify(message) + log.textContent += `--> ${jsonMessage}\n` + socket.send(jsonMessage) + } else { + log.textContent += 'NOT CONNECTED\n' + } + } + + function send_at_response(response) { + send({ type: 'at_response', response: response }) + } + + function update_ag_indicator(indicator) { + const value = document.getElementById(indicator).value + send({ type: 'ag_indicator', indicator: indicator, value: value }) + } + + function connect_sco() { + send({ type: 'connect_sco' }) + } + + function negotiate_codec() { + const codec = document.getElementById('codec').value + send({ type: 'negotiate_codec', codec: codec }) + } + + function disconnect_sco() { + send({ type: 'disconnect_sco' }) + } + + function add_call() { + let callLists = document.getElementById('call-lists'); + let template = document.getElementById('call-template'); + + let newNode = document.importNode(template.content, true); + newNode.querySelector('.call-remover').onclick = function (event) { + event.target.closest('.call-row').remove(); + } + callLists.appendChild(newNode); + } + + function update_calls() { + let callLists = document.getElementById('call-lists'); + send({ + type: 'update_calls', + calls: Array.from( + callLists.querySelectorAll('.call-row')).map( + function (element) { + return { + index: element.querySelector('.call-index').value, + number: element.querySelector('.call-number').value, + direction: element.querySelector('.call-direction').value, + status: element.querySelector('.call-status').value, + } + } + ), + } + ) + } + + function connectAudio() { + player = new PCMPlayer({ + inputCodec: 'Int16', + channels: 1, + sampleRate: sampleRate, + flushTime: 7.5, + }); + player.volume(gainInput.value); + } + + function setGain() { + if (player != null) { + player.volume(gainInput.value); + } + } + </script> + </div> +</body> + +</html>
\ No newline at end of file diff --git a/examples/hfp_gateway.json b/examples/hfp_gateway.json index 5e3d72b..67bb278 100644 --- a/examples/hfp_gateway.json +++ b/examples/hfp_gateway.json @@ -1,4 +1,5 @@ { "name": "Bumble Phone", - "class_of_device": 6291980 + "class_of_device": 6291980, + "keystore": "JsonKeyStore" } diff --git a/examples/hfp_handsfree.html b/examples/hfp_handsfree.html index a86fc4a..30be04e 100644 --- a/examples/hfp_handsfree.html +++ b/examples/hfp_handsfree.html @@ -1,79 +1,132 @@ -<html> - <head> - <style> -* { - font-family: sans-serif; -} - -label { - display: block; -} - -input, label { - margin: .4rem 0; -} - </style> - </head> - <body> - Server Port <input id="port" type="text" value="8989"></input> <button onclick="connect()">Connect</button><br> - AT Command <input type="text" id="at_command" required size="10"> <button onclick="send_at_command()">Send</button><br> - Dial Phone Number <input type="text" id="dial_number" required size="10"> <button onclick="dial()">Dial</button><br> - <button onclick="answer()">Answer</button> - <button onclick="hangup()">Hang Up</button> - <button onclick="start_voice_assistant()">Start Voice Assistant</button> - <button onclick="stop_voice_assistant()">Stop Voice Assistant</button> +<html data-bs-theme="dark"> + +<head> + <link href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.2/dist/css/bootstrap.min.css" rel="stylesheet" + integrity="sha384-T3c6CoIi6uLrA9TneNEoa7RxnatzjcDSCmG1MXxSR1GAsXEV/Dwwykc2MPK8M2HN" crossorigin="anonymous"> +</head> + +<body> + <nav class="navbar navbar-dark bg-primary"> + <div class="container"> + <span class="navbar-brand mb-0 h1">Bumble Handsfree</span> + </div> + </nav> + <br> + + <div class="container"> + + <label class="form-label">Server Port</label> + <div class="input-group mb-3"> + <input type="text" class="form-control" aria-label="Port Number" value="8989" id="port"> + <button class="btn btn-primary" type="button" onclick="connect()">Connect</button> + </div> + + <label class="form-label">Dial Phone Number</label> + <div class="input-group mb-3"> + <input type="text" class="form-control" placeholder="Phone Number" aria-label="Phone Number" + id="dial_number"> + <button class="btn btn-primary" type="button" + onclick="send_at_command(`ATD${dialNumberInput.value}`)">Dial</button> + </div> + + <label class="form-label">Send AT Command</label> + <div class="input-group mb-3"> + <input type="text" class="form-control" placeholder="AT Command" aria-label="AT command" id="at_command"> + <button class="btn btn-primary" type="button" + onclick="send_at_command(document.getElementById('at_command').value)">Send</button> + </div> + + <div class="row"> + <div class="col-auto"> + <label class="form-label">Battery Level</label> + <div class="input-group mb-3"> + <input type="text" class="form-control" placeholder="0 - 100" aria-label="Battery Level" + id="battery_level"> + <button class="btn btn-primary" type="button" + onclick="send_at_command(`AT+BIEV=2,${document.getElementById('battery_level').value}`)">Set</button> + </div> + </div> + <div class="col-auto"> + <label class="form-label">Speaker Volume</label> + <div class="input-group mb-3 col-auto"> + <input type="text" class="form-control" placeholder="0 - 15" aria-label="Speaker Volume" + id="speaker_volume"> + <button class="btn btn-primary" type="button" + onclick="send_at_command(`AT+VGS=${document.getElementById('speaker_volume').value}`)">Set</button> + </div> + </div> + <div class="col-auto"> + <label class="form-label">Mic Volume</label> + <div class="input-group mb-3 col-auto"> + <input type="text" class="form-control" placeholder="0 - 15" aria-label="Mic Volume" + id="mic_volume"> + <button class="btn btn-primary" type="button" + onclick="send_at_command(`AT+VGM=${document.getElementById('mic_volume').value}`)">Set</button> + </div> + </div> + </div> + + <button class="btn btn-primary" onclick="send_at_command('ATA')">Answer</button> + <button class="btn btn-primary" onclick="send_at_command('AT+CHUP')">Hang Up</button> + <button class="btn btn-primary" onclick="send_at_command('AT+BLDN')">Redial</button> + <button class="btn btn-primary" onclick="send({ type: 'query_call'})">Get Call Status</button> + + <br><br> + + <button class="btn btn-primary" onclick="send_at_command('AT+BVRA=1')">Start Voice Assistant</button> + <button class="btn btn-primary" onclick="send_at_command('AT+BVRA=0')">Stop Voice Assistant</button> + <hr> - <div id="socketState"></div> - <script> + + <div id="socketStateContainer" class="bg-body-tertiary p-3 rounded-2"> + <h3>Log</h3> + <code id="log" style="white-space: pre-line;"></code> + </div> + </div> + + + <script> let portInput = document.getElementById("port") let atCommandInput = document.getElementById("at_command") - let dialNumberInput = document.getElementById("dial_number") - let socketState = document.getElementById("socketState") + let log = document.getElementById("log") let socket function connect() { socket = new WebSocket(`ws://localhost:${portInput.value}`); socket.onopen = _ => { - socketState.innerText = 'OPEN' + log.textContent += 'OPEN\n' } socket.onclose = _ => { - socketState.innerText = 'CLOSED' + log.textContent += 'CLOSED\n' } socket.onerror = (error) => { - socketState.innerText = 'ERROR' + log.textContent += 'ERROR\n' console.log(`ERROR: ${error}`) } + socket.onmessage = (event) => { + log.textContent += `<-- ${event.data}\n` + let volume_state = JSON.parse(event.data) + volumeSetting.value = volume_state.volume_setting + changeCounter.value = volume_state.change_counter + muted.checked = volume_state.muted ? true : false + } } function send(message) { if (socket && socket.readyState == WebSocket.OPEN) { - socket.send(JSON.stringify(message)) + let jsonMessage = JSON.stringify(message) + log.textContent += `--> ${jsonMessage}\n` + socket.send(jsonMessage) + } else { + log.textContent += 'NOT CONNECTED\n' } } - function send_at_command() { - send({ type:'at_command', command: atCommandInput.value }) - } - - function answer() { - send({ type:'at_command', command: 'ATA' }) - } - - function hangup() { - send({ type:'at_command', command: 'AT+CHUP' }) - } - - function dial() { - send({ type:'at_command', command: `ATD${dialNumberInput.value}` }) + function send_at_command(command) { + send({ type: 'at_command', 'command': command }) } + </script> + </div> +</body> - function start_voice_assistant() { - send(({ type:'at_command', command: 'AT+BVRA=1' })) - } - - function stop_voice_assistant() { - send(({ type:'at_command', command: 'AT+BVRA=0' })) - } -</script> - </body> -</html> +</html>
\ No newline at end of file diff --git a/examples/keyboard.py b/examples/keyboard.py index 314a805..f2afe18 100644 --- a/examples/keyboard.py +++ b/examples/keyboard.py @@ -416,7 +416,7 @@ async def keyboard_device(device, command): # ----------------------------------------------------------------------------- -async def main(): +async def main() -> None: if len(sys.argv) < 4: print( 'Usage: python keyboard.py <device-config> <transport-spec> <command>' @@ -434,9 +434,11 @@ async def main(): ) return - async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): + async with await open_transport_or_link(sys.argv[2]) as hci_transport: # Create a device to manage the host - device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) + device = Device.from_config_file_with_hci( + sys.argv[1], hci_transport.source, hci_transport.sink + ) command = sys.argv[3] if command == 'connect': diff --git a/examples/run_a2dp_info.py b/examples/run_a2dp_info.py index 3a35695..e05c87e 100644 --- a/examples/run_a2dp_info.py +++ b/examples/run_a2dp_info.py @@ -139,18 +139,20 @@ async def find_a2dp_service(connection): # ----------------------------------------------------------------------------- -async def main(): +async def main() -> None: if len(sys.argv) < 4: print('Usage: run_a2dp_info.py <device-config> <transport-spec> <bt-addr>') print('example: run_a2dp_info.py classic1.json usb:0 14:7D:DA:4E:53:A8') return print('<<< connecting to HCI...') - async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): + async with await open_transport_or_link(sys.argv[2]) as hci_transport: print('<<< connected') # Create a device - device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) + device = Device.from_config_file_with_hci( + sys.argv[1], hci_transport.source, hci_transport.sink + ) device.classic_enabled = True # Start the controller @@ -187,7 +189,7 @@ async def main(): client = await AVDTP_Protocol.connect(connection, avdtp_version) # Discover all endpoints on the remote device - endpoints = await client.discover_remote_endpoints() + endpoints = list(await client.discover_remote_endpoints()) print(f'@@@ Found {len(endpoints)} endpoints') for endpoint in endpoints: print('@@@', endpoint) diff --git a/examples/run_a2dp_sink.py b/examples/run_a2dp_sink.py index 61bdce3..ca1af84 100644 --- a/examples/run_a2dp_sink.py +++ b/examples/run_a2dp_sink.py @@ -19,6 +19,7 @@ import asyncio import sys import os import logging +from typing import Any, Dict from bumble.device import Device from bumble.transport import open_transport_or_link @@ -41,7 +42,7 @@ from bumble.a2dp import ( SbcMediaCodecInformation, ) -Context = {'output': None} +Context: Dict[Any, Any] = {'output': None} # ----------------------------------------------------------------------------- @@ -104,7 +105,7 @@ def on_rtp_packet(packet): # ----------------------------------------------------------------------------- -async def main(): +async def main() -> None: if len(sys.argv) < 4: print( 'Usage: run_a2dp_sink.py <device-config> <transport-spec> <sbc-file> ' @@ -114,14 +115,16 @@ async def main(): return print('<<< connecting to HCI...') - async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): + async with await open_transport_or_link(sys.argv[2]) as hci_transport: print('<<< connected') with open(sys.argv[3], 'wb') as sbc_file: Context['output'] = sbc_file # Create a device - device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) + device = Device.from_config_file_with_hci( + sys.argv[1], hci_transport.source, hci_transport.sink + ) device.classic_enabled = True # Setup the SDP to expose the sink service @@ -162,7 +165,7 @@ async def main(): await device.set_discoverable(True) await device.set_connectable(True) - await hci_source.wait_for_termination() + await hci_transport.source.wait_for_termination() # ----------------------------------------------------------------------------- diff --git a/examples/run_a2dp_source.py b/examples/run_a2dp_source.py index 4645229..a1f955b 100644 --- a/examples/run_a2dp_source.py +++ b/examples/run_a2dp_source.py @@ -114,7 +114,7 @@ async def stream_packets(read_function, protocol): # ----------------------------------------------------------------------------- -async def main(): +async def main() -> None: if len(sys.argv) < 4: print( 'Usage: run_a2dp_source.py <device-config> <transport-spec> <sbc-file> ' @@ -126,11 +126,13 @@ async def main(): return print('<<< connecting to HCI...') - async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): + async with await open_transport_or_link(sys.argv[2]) as hci_transport: print('<<< connected') # Create a device - device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) + device = Device.from_config_file_with_hci( + sys.argv[1], hci_transport.source, hci_transport.sink + ) device.classic_enabled = True # Setup the SDP to expose the SRC service @@ -186,7 +188,7 @@ async def main(): await device.set_discoverable(True) await device.set_connectable(True) - await hci_source.wait_for_termination() + await hci_transport.source.wait_for_termination() # ----------------------------------------------------------------------------- diff --git a/examples/run_advertiser.py b/examples/run_advertiser.py index fb59426..4c67d10 100644 --- a/examples/run_advertiser.py +++ b/examples/run_advertiser.py @@ -28,7 +28,7 @@ from bumble.transport import open_transport_or_link # ----------------------------------------------------------------------------- -async def main(): +async def main() -> None: if len(sys.argv) < 3: print( 'Usage: run_advertiser.py <config-file> <transport-spec> [type] [address]' @@ -50,10 +50,12 @@ async def main(): target = None print('<<< connecting to HCI...') - async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): + async with await open_transport_or_link(sys.argv[2]) as hci_transport: print('<<< connected') - device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) + device = Device.from_config_file_with_hci( + sys.argv[1], hci_transport.source, hci_transport.sink + ) if advertising_type.is_scannable: device.scan_response_data = bytes( @@ -66,7 +68,7 @@ async def main(): await device.power_on() await device.start_advertising(advertising_type=advertising_type, target=target) - await hci_source.wait_for_termination() + await hci_transport.source.wait_for_termination() # ----------------------------------------------------------------------------- diff --git a/examples/run_asha_sink.py b/examples/run_asha_sink.py index 2d6f0d5..105eb75 100644 --- a/examples/run_asha_sink.py +++ b/examples/run_asha_sink.py @@ -49,7 +49,7 @@ ASHA_LE_PSM_OUT_CHARACTERISTIC = UUID( # ----------------------------------------------------------------------------- -async def main(): +async def main() -> None: if len(sys.argv) != 4: print( 'Usage: python run_asha_sink.py <device-config> <transport-spec> ' @@ -60,8 +60,10 @@ async def main(): audio_out = open(sys.argv[3], 'wb') - async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): - device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) + async with await open_transport_or_link(sys.argv[2]) as hci_transport: + device = Device.from_config_file_with_hci( + sys.argv[1], hci_transport.source, hci_transport.sink + ) # Handler for audio control commands def on_audio_control_point_write(_connection, value): @@ -197,7 +199,7 @@ async def main(): await device.power_on() await device.start_advertising(auto_restart=True) - await hci_source.wait_for_termination() + await hci_transport.source.wait_for_termination() # ----------------------------------------------------------------------------- diff --git a/examples/run_avrcp.py b/examples/run_avrcp.py index 4bb4143..793e000 100644 --- a/examples/run_avrcp.py +++ b/examples/run_avrcp.py @@ -331,7 +331,7 @@ class Delegate(avrcp.Delegate): # ----------------------------------------------------------------------------- -async def main(): +async def main() -> None: if len(sys.argv) < 3: print( 'Usage: run_avrcp_controller.py <device-config> <transport-spec> ' @@ -341,11 +341,13 @@ async def main(): return print('<<< connecting to HCI...') - async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): + async with await open_transport_or_link(sys.argv[2]) as hci_transport: print('<<< connected') # Create a device - device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) + device = Device.from_config_file_with_hci( + sys.argv[1], hci_transport.source, hci_transport.sink + ) device.classic_enabled = True # Setup the SDP to expose the sink service diff --git a/examples/run_classic_connect.py b/examples/run_classic_connect.py index 0acaedd..362e6b8 100644 --- a/examples/run_classic_connect.py +++ b/examples/run_classic_connect.py @@ -32,7 +32,7 @@ from bumble.sdp import ( # ----------------------------------------------------------------------------- -async def main(): +async def main() -> None: if len(sys.argv) < 3: print( 'Usage: run_classic_connect.py <device-config> <transport-spec> ' @@ -42,11 +42,13 @@ async def main(): return print('<<< connecting to HCI...') - async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): + async with await open_transport_or_link(sys.argv[2]) as hci_transport: print('<<< connected') # Create a device - device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) + device = Device.from_config_file_with_hci( + sys.argv[1], hci_transport.source, hci_transport.sink + ) device.classic_enabled = True device.le_enabled = False await device.power_on() diff --git a/examples/run_classic_discoverable.py b/examples/run_classic_discoverable.py index 076a9ec..52f47fc 100644 --- a/examples/run_classic_discoverable.py +++ b/examples/run_classic_discoverable.py @@ -91,18 +91,20 @@ SDP_SERVICE_RECORDS = { # ----------------------------------------------------------------------------- -async def main(): +async def main() -> None: if len(sys.argv) < 3: print('Usage: run_classic_discoverable.py <device-config> <transport-spec>') print('example: run_classic_discoverable.py classic1.json usb:04b4:f901') return print('<<< connecting to HCI...') - async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): + async with await open_transport_or_link(sys.argv[2]) as hci_transport: print('<<< connected') # Create a device - device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) + device = Device.from_config_file_with_hci( + sys.argv[1], hci_transport.source, hci_transport.sink + ) device.classic_enabled = True device.sdp_service_records = SDP_SERVICE_RECORDS await device.power_on() @@ -111,7 +113,7 @@ async def main(): await device.set_discoverable(True) await device.set_connectable(True) - await hci_source.wait_for_termination() + await hci_transport.source.wait_for_termination() # ----------------------------------------------------------------------------- diff --git a/examples/run_classic_discovery.py b/examples/run_classic_discovery.py index 569c8b3..af35bb7 100644 --- a/examples/run_classic_discovery.py +++ b/examples/run_classic_discovery.py @@ -20,8 +20,8 @@ import sys import os import logging from bumble.colors import color - from bumble.device import Device +from bumble.hci import Address from bumble.transport import open_transport_or_link from bumble.core import DeviceClass @@ -53,22 +53,27 @@ class DiscoveryListener(Device.Listener): # ----------------------------------------------------------------------------- -async def main(): +async def main() -> None: if len(sys.argv) != 2: print('Usage: run_classic_discovery.py <transport-spec>') print('example: run_classic_discovery.py usb:04b4:f901') return print('<<< connecting to HCI...') - async with await open_transport_or_link(sys.argv[1]) as (hci_source, hci_sink): + async with await open_transport_or_link(sys.argv[1]) as hci_transport: print('<<< connected') - device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink) + device = Device.with_hci( + 'Bumble', + Address('F0:F1:F2:F3:F4:F5'), + hci_transport.source, + hci_transport.sink, + ) device.listener = DiscoveryListener() await device.power_on() await device.start_discovery() - await hci_source.wait_for_termination() + await hci_transport.source.wait_for_termination() # ----------------------------------------------------------------------------- diff --git a/examples/run_connect_and_encrypt.py b/examples/run_connect_and_encrypt.py index b541a0e..3b9d180 100644 --- a/examples/run_connect_and_encrypt.py +++ b/examples/run_connect_and_encrypt.py @@ -25,7 +25,7 @@ from bumble.transport import open_transport_or_link # ----------------------------------------------------------------------------- -async def main(): +async def main() -> None: if len(sys.argv) < 3: print( 'Usage: run_connect_and_encrypt.py <device-config> <transport-spec> ' @@ -37,11 +37,13 @@ async def main(): return print('<<< connecting to HCI...') - async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): + async with await open_transport_or_link(sys.argv[2]) as hci_transport: print('<<< connected') # Create a device - device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) + device = Device.from_config_file_with_hci( + sys.argv[1], hci_transport.source, hci_transport.sink + ) await device.power_on() # Connect to the peer @@ -56,7 +58,7 @@ async def main(): print(f'!!! Encryption failed: {error}') return - await hci_source.wait_for_termination() + await hci_transport.source.wait_for_termination() # ----------------------------------------------------------------------------- diff --git a/examples/run_controller.py b/examples/run_controller.py index 596ac8b..05dedfc 100644 --- a/examples/run_controller.py +++ b/examples/run_controller.py @@ -36,7 +36,7 @@ from bumble.transport import open_transport_or_link # ----------------------------------------------------------------------------- -async def main(): +async def main() -> None: if len(sys.argv) != 4: print( 'Usage: run_controller.py <controller-address> <device-config> ' @@ -49,7 +49,7 @@ async def main(): return print('>>> connecting to HCI...') - async with await open_transport_or_link(sys.argv[3]) as (hci_source, hci_sink): + async with await open_transport_or_link(sys.argv[3]) as hci_transport: print('>>> connected') # Create a local link @@ -57,7 +57,10 @@ async def main(): # Create a first controller using the packet source/sink as its host interface controller1 = Controller( - 'C1', host_source=hci_source, host_sink=hci_sink, link=link + 'C1', + host_source=hci_transport.source, + host_sink=hci_transport.sink, + link=link, ) controller1.random_address = sys.argv[1] @@ -98,7 +101,7 @@ async def main(): await device.start_advertising() await device.start_scanning() - await hci_source.wait_for_termination() + await hci_transport.source.wait_for_termination() # ----------------------------------------------------------------------------- diff --git a/examples/run_controller_with_scanner.py b/examples/run_controller_with_scanner.py index 9603cff..9e935a9 100644 --- a/examples/run_controller_with_scanner.py +++ b/examples/run_controller_with_scanner.py @@ -20,9 +20,9 @@ import asyncio import sys import os from bumble.colors import color - from bumble.device import Device from bumble.controller import Controller +from bumble.hci import Address from bumble.link import LocalLink from bumble.transport import open_transport_or_link @@ -45,14 +45,14 @@ class ScannerListener(Device.Listener): # ----------------------------------------------------------------------------- -async def main(): +async def main() -> None: if len(sys.argv) != 2: print('Usage: run_controller.py <transport-spec>') print('example: run_controller_with_scanner.py serial:/dev/pts/14,1000000') return print('>>> connecting to HCI...') - async with await open_transport_or_link(sys.argv[1]) as (hci_source, hci_sink): + async with await open_transport_or_link(sys.argv[1]) as hci_transport: print('>>> connected') # Create a local link @@ -60,22 +60,25 @@ async def main(): # Create a first controller using the packet source/sink as its host interface controller1 = Controller( - 'C1', host_source=hci_source, host_sink=hci_sink, link=link + 'C1', + host_source=hci_transport.source, + host_sink=hci_transport.sink, + link=link, + public_address='E0:E1:E2:E3:E4:E5', ) - controller1.address = 'E0:E1:E2:E3:E4:E5' # Create a second controller using the same link controller2 = Controller('C2', link=link) # Create a device with a scanner listener device = Device.with_hci( - 'Bumble', 'F0:F1:F2:F3:F4:F5', controller2, controller2 + 'Bumble', Address('F0:F1:F2:F3:F4:F5'), controller2, controller2 ) device.listener = ScannerListener() await device.power_on() await device.start_scanning() - await hci_source.wait_for_termination() + await hci_transport.source.wait_for_termination() # ----------------------------------------------------------------------------- diff --git a/examples/run_device_with_snooper.py b/examples/run_device_with_snooper.py index 69a187f..87307e8 100644 --- a/examples/run_device_with_snooper.py +++ b/examples/run_device_with_snooper.py @@ -20,30 +20,36 @@ import sys import os import logging from bumble.colors import color - +from bumble.hci import Address from bumble.device import Device from bumble.transport import open_transport_or_link from bumble.snoop import BtSnooper + # ----------------------------------------------------------------------------- -async def main(): +async def main() -> None: if len(sys.argv) != 3: print('Usage: run_device_with_snooper.py <transport-spec> <snoop-file>') print('example: run_device_with_snooper.py usb:0 btsnoop.log') return print('<<< connecting to HCI...') - async with await open_transport_or_link(sys.argv[1]) as (hci_source, hci_sink): + async with await open_transport_or_link(sys.argv[1]) as hci_transport: print('<<< connected') - device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink) + device = Device.with_hci( + 'Bumble', + Address('F0:F1:F2:F3:F4:F5'), + hci_transport.source, + hci_transport.sink, + ) with open(sys.argv[2], "wb") as snoop_file: device.host.snooper = BtSnooper(snoop_file) await device.power_on() await device.start_scanning() - await hci_source.wait_for_termination() + await hci_transport.source.wait_for_termination() # ----------------------------------------------------------------------------- diff --git a/examples/run_gatt_client.py b/examples/run_gatt_client.py index dcf8a1b..4548159 100644 --- a/examples/run_gatt_client.py +++ b/examples/run_gatt_client.py @@ -69,7 +69,7 @@ class Listener(Device.Listener): # ----------------------------------------------------------------------------- -async def main(): +async def main() -> None: if len(sys.argv) < 3: print( 'Usage: run_gatt_client.py <device-config> <transport-spec> ' @@ -79,11 +79,13 @@ async def main(): return print('<<< connecting to HCI...') - async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): + async with await open_transport_or_link(sys.argv[2]) as hci_transport: print('<<< connected') # Create a device to manage the host, with a custom listener - device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) + device = Device.from_config_file_with_hci( + sys.argv[1], hci_transport.source, hci_transport.sink + ) device.listener = Listener(device) await device.power_on() diff --git a/examples/run_gatt_client_and_server.py b/examples/run_gatt_client_and_server.py index 609fe18..e25d14c 100644 --- a/examples/run_gatt_client_and_server.py +++ b/examples/run_gatt_client_and_server.py @@ -19,21 +19,21 @@ import asyncio import os import logging from bumble.colors import color - from bumble.core import ProtocolError from bumble.controller import Controller from bumble.device import Device, Peer +from bumble.hci import Address from bumble.host import Host from bumble.link import LocalLink from bumble.gatt import ( Service, Characteristic, Descriptor, - show_services, GATT_CHARACTERISTIC_USER_DESCRIPTION_DESCRIPTOR, GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC, GATT_DEVICE_INFORMATION_SERVICE, ) +from bumble.gatt_client import show_services # ----------------------------------------------------------------------------- @@ -43,7 +43,7 @@ class ServerListener(Device.Listener): # ----------------------------------------------------------------------------- -async def main(): +async def main() -> None: # Create a local link link = LocalLink() @@ -51,14 +51,18 @@ async def main(): client_controller = Controller("client controller", link=link) client_host = Host() client_host.controller = client_controller - client_device = Device("client", address='F0:F1:F2:F3:F4:F5', host=client_host) + client_device = Device( + "client", address=Address('F0:F1:F2:F3:F4:F5'), host=client_host + ) await client_device.power_on() # Setup a stack for the server server_controller = Controller("server controller", link=link) server_host = Host() server_host.controller = server_controller - server_device = Device("server", address='F6:F7:F8:F9:FA:FB', host=server_host) + server_device = Device( + "server", address=Address('F6:F7:F8:F9:FA:FB'), host=server_host + ) server_device.listener = ServerListener() await server_device.power_on() diff --git a/examples/run_gatt_server.py b/examples/run_gatt_server.py index 46d42a2..874115c 100644 --- a/examples/run_gatt_server.py +++ b/examples/run_gatt_server.py @@ -71,7 +71,7 @@ def my_custom_write_with_error(connection, value): # ----------------------------------------------------------------------------- -async def main(): +async def main() -> None: if len(sys.argv) < 3: print( 'Usage: run_gatt_server.py <device-config> <transport-spec> ' @@ -81,11 +81,13 @@ async def main(): return print('<<< connecting to HCI...') - async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): + async with await open_transport_or_link(sys.argv[2]) as hci_transport: print('<<< connected') # Create a device to manage the host - device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) + device = Device.from_config_file_with_hci( + sys.argv[1], hci_transport.source, hci_transport.sink + ) device.listener = Listener(device) # Add a few entries to the device's GATT server @@ -146,7 +148,7 @@ async def main(): else: await device.start_advertising(auto_restart=True) - await hci_source.wait_for_termination() + await hci_transport.source.wait_for_termination() # ----------------------------------------------------------------------------- diff --git a/examples/run_hfp_gateway.py b/examples/run_hfp_gateway.py index c3b392d..851f97c 100644 --- a/examples/run_hfp_gateway.py +++ b/examples/run_hfp_gateway.py @@ -16,240 +16,270 @@ # Imports # ----------------------------------------------------------------------------- import asyncio +import json import sys import os +import io import logging +import websockets -from bumble.colors import color +from typing import Optional import bumble.core -from bumble.device import Device +from bumble.device import Device, ScoLink from bumble.transport import open_transport_or_link from bumble.core import ( - BT_HANDSFREE_SERVICE, - BT_RFCOMM_PROTOCOL_ID, BT_BR_EDR_TRANSPORT, ) -from bumble import rfcomm, hfp -from bumble.hci import HCI_SynchronousDataPacket -from bumble.sdp import ( - Client as SDP_Client, - DataElement, - ServiceAttribute, - SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, - SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID, - SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, -) +from bumble import hci, rfcomm, hfp logger = logging.getLogger(__name__) +ws: Optional[websockets.WebSocketServerProtocol] = None +ag_protocol: Optional[hfp.AgProtocol] = None +source_file: Optional[io.BufferedReader] = None -# ----------------------------------------------------------------------------- -# pylint: disable-next=too-many-nested-blocks -async def list_rfcomm_channels(device, connection): - # Connect to the SDP Server - sdp_client = SDP_Client(connection) - await sdp_client.connect() - - # Search for services that support the Handsfree Profile - search_result = await sdp_client.search_attributes( - [BT_HANDSFREE_SERVICE], - [ - SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, - SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, - SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID, + +def _default_configuration() -> hfp.AgConfiguration: + return hfp.AgConfiguration( + supported_ag_features=[ + hfp.AgFeature.HF_INDICATORS, + hfp.AgFeature.IN_BAND_RING_TONE_CAPABILITY, + hfp.AgFeature.REJECT_CALL, + hfp.AgFeature.CODEC_NEGOTIATION, + hfp.AgFeature.ESCO_S4_SETTINGS_SUPPORTED, + hfp.AgFeature.ENHANCED_CALL_STATUS, ], + supported_ag_indicators=[ + hfp.AgIndicatorState.call(), + hfp.AgIndicatorState.callsetup(), + hfp.AgIndicatorState.callheld(), + hfp.AgIndicatorState.service(), + hfp.AgIndicatorState.signal(), + hfp.AgIndicatorState.roam(), + hfp.AgIndicatorState.battchg(), + ], + supported_hf_indicators=[ + hfp.HfIndicator.ENHANCED_SAFETY, + hfp.HfIndicator.BATTERY_LEVEL, + ], + supported_ag_call_hold_operations=[], + supported_audio_codecs=[hfp.AudioCodec.CVSD, hfp.AudioCodec.MSBC], ) - print(color('==================================', 'blue')) - print(color('Handsfree Services:', 'yellow')) - rfcomm_channels = [] - # pylint: disable-next=too-many-nested-blocks - for attribute_list in search_result: - # Look for the RFCOMM Channel number - protocol_descriptor_list = ServiceAttribute.find_attribute_in_list( - attribute_list, SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID + + +def send_message(type: str, **kwargs) -> None: + if ws: + asyncio.create_task(ws.send(json.dumps({'type': type, **kwargs}))) + + +def on_speaker_volume(level: int): + send_message(type='speaker_volume', level=level) + + +def on_microphone_volume(level: int): + send_message(type='microphone_volume', level=level) + + +def on_sco_state_change(codec: int): + if codec == hfp.AudioCodec.CVSD: + sample_rate = 8000 + elif codec == hfp.AudioCodec.MSBC: + sample_rate = 16000 + else: + sample_rate = 0 + + send_message(type='sco_state_change', sample_rate=sample_rate) + + +def on_sco_packet(packet: hci.HCI_SynchronousDataPacket): + if ws: + asyncio.create_task(ws.send(packet.data)) + if source_file and (pcm_data := source_file.read(packet.data_total_length)): + assert ag_protocol + host = ag_protocol.dlc.multiplexer.l2cap_channel.connection.device.host + host.send_hci_packet( + hci.HCI_SynchronousDataPacket( + connection_handle=packet.connection_handle, + packet_status=0, + data_total_length=len(pcm_data), + data=pcm_data, + ) ) - if protocol_descriptor_list: - for protocol_descriptor in protocol_descriptor_list.value: - if len(protocol_descriptor.value) >= 2: - if protocol_descriptor.value[0].value == BT_RFCOMM_PROTOCOL_ID: - print(color('SERVICE:', 'green')) - print( - color(' RFCOMM Channel:', 'cyan'), - protocol_descriptor.value[1].value, - ) - rfcomm_channels.append(protocol_descriptor.value[1].value) - - # List profiles - bluetooth_profile_descriptor_list = ( - ServiceAttribute.find_attribute_in_list( - attribute_list, - SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, - ) - ) - if bluetooth_profile_descriptor_list: - if bluetooth_profile_descriptor_list.value: - if ( - bluetooth_profile_descriptor_list.value[0].type - == DataElement.SEQUENCE - ): - bluetooth_profile_descriptors = ( - bluetooth_profile_descriptor_list.value - ) - else: - # Sometimes, instead of a list of lists, we just - # find a list. Fix that - bluetooth_profile_descriptors = [ - bluetooth_profile_descriptor_list - ] - - print(color(' Profiles:', 'green')) - for ( - bluetooth_profile_descriptor - ) in bluetooth_profile_descriptors: - version_major = ( - bluetooth_profile_descriptor.value[1].value >> 8 - ) - version_minor = ( - bluetooth_profile_descriptor.value[1].value - & 0xFF - ) - print( - ' ' - f'{bluetooth_profile_descriptor.value[0].value}' - f' - version {version_major}.{version_minor}' - ) - - # List service classes - service_class_id_list = ServiceAttribute.find_attribute_in_list( - attribute_list, SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID - ) - if service_class_id_list: - if service_class_id_list.value: - print(color(' Service Classes:', 'green')) - for service_class_id in service_class_id_list.value: - print(' ', service_class_id.value) - - await sdp_client.disconnect() - return rfcomm_channels + + +def on_hfp_state_change(connected: bool): + send_message(type='hfp_state_change', connected=connected) + + +async def ws_server(ws_client: websockets.WebSocketServerProtocol, path: str): + del path + global ws + ws = ws_client + + async for message in ws_client: + if not ag_protocol: + continue + + json_message = json.loads(message) + message_type = json_message['type'] + connection = ag_protocol.dlc.multiplexer.l2cap_channel.connection + device = connection.device + + try: + if message_type == 'at_response': + ag_protocol.send_response(json_message['response']) + elif message_type == 'ag_indicator': + ag_protocol.update_ag_indicator( + hfp.AgIndicator(json_message['indicator']), + int(json_message['value']), + ) + elif message_type == 'negotiate_codec': + codec = hfp.AudioCodec(int(json_message['codec'])) + await ag_protocol.negotiate_codec(codec) + elif message_type == 'connect_sco': + if ag_protocol.active_codec == hfp.AudioCodec.CVSD: + esco_param = hfp.ESCO_PARAMETERS[ + hfp.DefaultCodecParameters.ESCO_CVSD_S4 + ] + elif ag_protocol.active_codec == hfp.AudioCodec.MSBC: + esco_param = hfp.ESCO_PARAMETERS[ + hfp.DefaultCodecParameters.ESCO_MSBC_T2 + ] + else: + raise ValueError(f'Unsupported codec {codec}') + + await device.send_command( + hci.HCI_Enhanced_Setup_Synchronous_Connection_Command( + connection_handle=connection.handle, **esco_param.asdict() + ) + ) + elif message_type == 'disconnect_sco': + # Copy the values to avoid iteration error. + for sco_link in list(device.sco_links.values()): + await sco_link.disconnect() + elif message_type == 'update_calls': + ag_protocol.calls = [ + hfp.CallInfo( + index=int(call['index']), + direction=hfp.CallInfoDirection(int(call['direction'])), + status=hfp.CallInfoStatus(int(call['status'])), + number=call['number'], + multi_party=hfp.CallInfoMultiParty.NOT_IN_CONFERENCE, + mode=hfp.CallInfoMode.VOICE, + ) + for call in json_message['calls'] + ] + + except Exception as e: + send_message(type='error', message=e) # ----------------------------------------------------------------------------- -async def main(): - if len(sys.argv) < 4: +async def main() -> None: + if len(sys.argv) < 3: print( 'Usage: run_hfp_gateway.py <device-config> <transport-spec> ' - '<bluetooth-address>' + '[bluetooth-address] [wav-file-for-source]' ) print( - ' specifying a channel number, or "discover" to list all RFCOMM channels' + 'example: run_hfp_gateway.py hfp_gateway.json usb:0 E1:CA:72:48:C4:E8 sample.wav' ) - print('example: run_hfp_gateway.py hfp_gateway.json usb:0 E1:CA:72:48:C4:E8') return print('<<< connecting to HCI...') - async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): + async with await open_transport_or_link(sys.argv[2]) as hci_transport: print('<<< connected') # Create a device - device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) + device = Device.from_config_file_with_hci( + sys.argv[1], hci_transport.source, hci_transport.sink + ) device.classic_enabled = True await device.power_on() - # Connect to a peer - target_address = sys.argv[3] - print(f'=== Connecting to {target_address}...') - connection = await device.connect(target_address, transport=BT_BR_EDR_TRANSPORT) - print(f'=== Connected to {connection.peer_address}!') - - # Get a list of all the Handsfree services (should only be 1) - channels = await list_rfcomm_channels(device, connection) - if len(channels) == 0: - print('!!! no service found') - return - - # Pick the first one - channel = channels[0] - - # Request authentication - print('*** Authenticating...') - await connection.authenticate() - print('*** Authenticated') - - # Enable encryption - print('*** Enabling encryption...') - await connection.encrypt() - print('*** Encryption on') - - # Create a client and start it - print('@@@ Starting to RFCOMM client...') - rfcomm_client = rfcomm.Client(connection) - rfcomm_mux = await rfcomm_client.start() - print('@@@ Started') - - print(f'### Opening session for channel {channel}...') - try: - session = await rfcomm_mux.open_dlc(channel) - print('### Session open', session) - except bumble.core.ConnectionError as error: - print(f'### Session open failed: {error}') - await rfcomm_mux.disconnect() - print('@@@ Disconnected from RFCOMM server') - return - - def on_sco(connection_handle: int, packet: HCI_SynchronousDataPacket): - # Reset packet and loopback - packet.packet_status = 0 - device.host.send_hci_packet(packet) - - device.host.on('sco_packet', on_sco) - - # Protocol loop (just for testing at this point) - protocol = hfp.HfpProtocol(session) - while True: - line = await protocol.next_line() - - if line.startswith('AT+BRSF='): - protocol.send_response_line('+BRSF: 30') - protocol.send_response_line('OK') - elif line.startswith('AT+CIND=?'): - protocol.send_response_line( - '+CIND: ("call",(0,1)),("callsetup",(0-3)),("service",(0-1)),' - '("signal",(0-5)),("roam",(0,1)),("battchg",(0-5)),' - '("callheld",(0-2))' - ) - protocol.send_response_line('OK') - elif line.startswith('AT+CIND?'): - protocol.send_response_line('+CIND: 0,0,1,4,1,5,0') - protocol.send_response_line('OK') - elif line.startswith('AT+CMER='): - protocol.send_response_line('OK') - elif line.startswith('AT+CHLD=?'): - protocol.send_response_line('+CHLD: 0') - protocol.send_response_line('OK') - elif line.startswith('AT+BTRH?'): - protocol.send_response_line('+BTRH: 0') - protocol.send_response_line('OK') - elif line.startswith('AT+CLIP='): - protocol.send_response_line('OK') - elif line.startswith('AT+VGS='): - protocol.send_response_line('OK') - elif line.startswith('AT+BIA='): - protocol.send_response_line('OK') - elif line.startswith('AT+BVRA='): - protocol.send_response_line( - '+BVRA: 1,1,12AA,1,1,"Message 1 from Janina"' - ) - elif line.startswith('AT+XEVENT='): - protocol.send_response_line('OK') - elif line.startswith('AT+XAPL='): - protocol.send_response_line('OK') - else: - print(color('UNSUPPORTED AT COMMAND', 'red')) - protocol.send_response_line('ERROR') - - await hci_source.wait_for_termination() + rfcomm_server = rfcomm.Server(device) + configuration = _default_configuration() + + def on_dlc(dlc: rfcomm.DLC): + global ag_protocol + ag_protocol = hfp.AgProtocol(dlc, configuration) + ag_protocol.on('speaker_volume', on_speaker_volume) + ag_protocol.on('microphone_volume', on_microphone_volume) + on_hfp_state_change(True) + dlc.multiplexer.l2cap_channel.on( + 'close', lambda: on_hfp_state_change(False) + ) + + channel = rfcomm_server.listen(on_dlc) + device.sdp_service_records = { + 1: hfp.make_ag_sdp_records(1, channel, configuration) + } + + def on_sco_connection(sco_link: ScoLink): + assert ag_protocol + on_sco_state_change(ag_protocol.active_codec) + sco_link.on('disconnection', lambda _: on_sco_state_change(0)) + sco_link.sink = on_sco_packet + + device.on('sco_connection', on_sco_connection) + if len(sys.argv) >= 4: + # Connect to a peer + target_address = sys.argv[3] + print(f'=== Connecting to {target_address}...') + connection = await device.connect( + target_address, transport=BT_BR_EDR_TRANSPORT + ) + print(f'=== Connected to {connection.peer_address}!') + + # Get a list of all the Handsfree services (should only be 1) + if not (hfp_record := await hfp.find_hf_sdp_record(connection)): + print('!!! no service found') + return + + # Pick the first one + channel, version, hf_sdp_features = hfp_record + print(f'HF version: {version}') + print(f'HF features: {hf_sdp_features}') + + # Request authentication + print('*** Authenticating...') + await connection.authenticate() + print('*** Authenticated') + + # Enable encryption + print('*** Enabling encryption...') + await connection.encrypt() + print('*** Encryption on') + + # Create a client and start it + print('@@@ Starting to RFCOMM client...') + rfcomm_client = rfcomm.Client(connection) + rfcomm_mux = await rfcomm_client.start() + print('@@@ Started') + + print(f'### Opening session for channel {channel}...') + try: + session = await rfcomm_mux.open_dlc(channel) + print('### Session open', session) + except bumble.core.ConnectionError as error: + print(f'### Session open failed: {error}') + await rfcomm_mux.disconnect() + print('@@@ Disconnected from RFCOMM server') + return + + on_dlc(session) + + await websockets.serve(ws_server, port=8888) + + if len(sys.argv) >= 5: + global source_file + source_file = open(sys.argv[4], 'rb') + # Skip header + source_file.seek(44) + + await hci_transport.source.terminated # ----------------------------------------------------------------------------- diff --git a/examples/run_hfp_handsfree.py b/examples/run_hfp_handsfree.py index f4e445e..5433284 100644 --- a/examples/run_hfp_handsfree.py +++ b/examples/run_hfp_handsfree.py @@ -16,6 +16,7 @@ # Imports # ----------------------------------------------------------------------------- import asyncio +import contextlib import sys import os import logging @@ -31,39 +32,16 @@ from bumble.transport import open_transport_or_link from bumble import hfp from bumble.hfp import HfProtocol - -# ----------------------------------------------------------------------------- -class UiServer: - protocol: Optional[HfProtocol] = None - - async def start(self): - """Start a Websocket server to receive events from a web page.""" - - async def serve(websocket, _path): - while True: - try: - message = await websocket.recv() - print('Received: ', str(message)) - - parsed = json.loads(message) - message_type = parsed['type'] - if message_type == 'at_command': - if self.protocol is not None: - await self.protocol.execute_command(parsed['command']) - - except websockets.exceptions.ConnectionClosedOK: - pass - - # pylint: disable=no-member - await websockets.serve(serve, 'localhost', 8989) +ws: Optional[websockets.WebSocketServerProtocol] = None +hf_protocol: Optional[HfProtocol] = None # ----------------------------------------------------------------------------- -def on_dlc(dlc: rfcomm.DLC, configuration: hfp.Configuration): +def on_dlc(dlc: rfcomm.DLC, configuration: hfp.HfConfiguration): print('*** DLC connected', dlc) - protocol = HfProtocol(dlc, configuration) - UiServer.protocol = protocol - asyncio.create_task(protocol.run()) + global hf_protocol + hf_protocol = HfProtocol(dlc, configuration) + asyncio.create_task(hf_protocol.run()) def on_sco_request(connection: Connection, link_type: int, protocol: HfProtocol): if connection == protocol.dlc.multiplexer.l2cap_channel.connection: @@ -88,7 +66,7 @@ def on_dlc(dlc: rfcomm.DLC, configuration: hfp.Configuration): ), ) - handler = functools.partial(on_sco_request, protocol=protocol) + handler = functools.partial(on_sco_request, protocol=hf_protocol) dlc.multiplexer.l2cap_channel.connection.device.on('sco_request', handler) dlc.multiplexer.l2cap_channel.once( 'close', @@ -97,21 +75,28 @@ def on_dlc(dlc: rfcomm.DLC, configuration: hfp.Configuration): ), ) + def on_ag_indicator(indicator): + global ws + if ws: + asyncio.create_task(ws.send(str(indicator))) + + hf_protocol.on('ag_indicator', on_ag_indicator) + # ----------------------------------------------------------------------------- -async def main(): +async def main() -> None: if len(sys.argv) < 3: print('Usage: run_classic_hfp.py <device-config> <transport-spec>') print('example: run_classic_hfp.py classic2.json usb:04b4:f901') return print('<<< connecting to HCI...') - async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): + async with await open_transport_or_link(sys.argv[2]) as hci_transport: print('<<< connected') # Hands-Free profile configuration. # TODO: load configuration from file. - configuration = hfp.Configuration( + configuration = hfp.HfConfiguration( supported_hf_features=[ hfp.HfFeature.THREE_WAY_CALLING, hfp.HfFeature.REMOTE_VOLUME_CONTROL, @@ -131,7 +116,9 @@ async def main(): ) # Create a device - device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) + device = Device.from_config_file_with_hci( + sys.argv[1], hci_transport.source, hci_transport.sink + ) device.classic_enabled = True # Create and register a server @@ -143,7 +130,9 @@ async def main(): # Advertise the HFP RFComm channel in the SDP device.sdp_service_records = { - 0x00010001: hfp.sdp_records(0x00010001, channel_number, configuration) + 0x00010001: hfp.make_hf_sdp_records( + 0x00010001, channel_number, configuration + ) } # Let's go! @@ -154,10 +143,32 @@ async def main(): await device.set_connectable(True) # Start the UI websocket server to offer a few buttons and input boxes - ui_server = UiServer() - await ui_server.start() + async def serve(websocket: websockets.WebSocketServerProtocol, _path): + global ws + ws = websocket + async for message in websocket: + with contextlib.suppress(websockets.exceptions.ConnectionClosedOK): + print('Received: ', str(message)) + + parsed = json.loads(message) + message_type = parsed['type'] + if message_type == 'at_command': + if hf_protocol is not None: + response = str( + await hf_protocol.execute_command( + parsed['command'], + response_type=hfp.AtResponseType.MULTIPLE, + ) + ) + await websocket.send(response) + elif message_type == 'query_call': + if hf_protocol: + response = str(await hf_protocol.query_current_calls()) + await websocket.send(response) + + await websockets.serve(serve, 'localhost', 8989) - await hci_source.wait_for_termination() + await hci_transport.source.wait_for_termination() # ----------------------------------------------------------------------------- diff --git a/examples/run_hid_device.py b/examples/run_hid_device.py index 9aebfc2..2287be0 100644 --- a/examples/run_hid_device.py +++ b/examples/run_hid_device.py @@ -229,6 +229,7 @@ HID_REPORT_MAP = bytes( # Text String, 50 Octet Report Descriptor # Default protocol mode set to report protocol protocol_mode = Message.ProtocolMode.REPORT_PROTOCOL + # ----------------------------------------------------------------------------- def sdp_records(): service_record_handle = 0x00010002 @@ -427,6 +428,7 @@ class DeviceData: # Device's live data - Mouse and Keyboard will be stored in this deviceData = DeviceData() + # ----------------------------------------------------------------------------- async def keyboard_device(hid_device): @@ -487,7 +489,7 @@ async def keyboard_device(hid_device): # ----------------------------------------------------------------------------- -async def main(): +async def main() -> None: if len(sys.argv) < 3: print( 'Usage: python run_hid_device.py <device-config> <transport-spec> <command>' @@ -599,11 +601,13 @@ async def main(): asyncio.create_task(handle_virtual_cable_unplug()) print('<<< connecting to HCI...') - async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): + async with await open_transport_or_link(sys.argv[2]) as hci_transport: print('<<< connected') # Create a device - device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) + device = Device.from_config_file_with_hci( + sys.argv[1], hci_transport.source, hci_transport.sink + ) device.classic_enabled = True # Create and register HID device @@ -740,7 +744,7 @@ async def main(): print("Executing in Web mode") await keyboard_device(hid_device) - await hci_source.wait_for_termination() + await hci_transport.source.wait_for_termination() # ----------------------------------------------------------------------------- diff --git a/examples/run_hid_host.py b/examples/run_hid_host.py index 7519b4e..cc17cc1 100644 --- a/examples/run_hid_host.py +++ b/examples/run_hid_host.py @@ -275,7 +275,7 @@ async def get_stream_reader(pipe) -> asyncio.StreamReader: # ----------------------------------------------------------------------------- -async def main(): +async def main() -> None: if len(sys.argv) < 4: print( 'Usage: run_hid_host.py <device-config> <transport-spec> ' @@ -324,11 +324,13 @@ async def main(): asyncio.create_task(handle_virtual_cable_unplug()) print('<<< connecting to HCI...') - async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): + async with await open_transport_or_link(sys.argv[2]) as hci_transport: print('<<< CONNECTED') # Create a device - device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) + device = Device.from_config_file_with_hci( + sys.argv[1], hci_transport.source, hci_transport.sink + ) device.classic_enabled = True # Create HID host and start it @@ -557,7 +559,7 @@ async def main(): # Interrupt Channel await hid_host.connect_interrupt_channel() - await hci_source.wait_for_termination() + await hci_transport.source.wait_for_termination() # ----------------------------------------------------------------------------- diff --git a/examples/run_notifier.py b/examples/run_notifier.py index 5f6def3..869e716 100644 --- a/examples/run_notifier.py +++ b/examples/run_notifier.py @@ -57,18 +57,20 @@ def on_my_characteristic_subscription(peer, enabled): # ----------------------------------------------------------------------------- -async def main(): +async def main() -> None: if len(sys.argv) < 3: print('Usage: run_notifier.py <device-config> <transport-spec>') print('example: run_notifier.py device1.json usb:0') return print('<<< connecting to HCI...') - async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): + async with await open_transport_or_link(sys.argv[2]) as hci_transport: print('<<< connected') # Create a device to manage the host - device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) + device = Device.from_config_file_with_hci( + sys.argv[1], hci_transport.source, hci_transport.sink + ) device.listener = Listener(device) # Add a few entries to the device's GATT server diff --git a/examples/run_rfcomm_client.py b/examples/run_rfcomm_client.py index 39ee776..9232dc9 100644 --- a/examples/run_rfcomm_client.py +++ b/examples/run_rfcomm_client.py @@ -165,7 +165,7 @@ async def tcp_server(tcp_port, rfcomm_session): # ----------------------------------------------------------------------------- -async def main(): +async def main() -> None: if len(sys.argv) < 5: print( 'Usage: run_rfcomm_client.py <device-config> <transport-spec> ' @@ -178,11 +178,13 @@ async def main(): return print('<<< connecting to HCI...') - async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): + async with await open_transport_or_link(sys.argv[2]) as hci_transport: print('<<< connected') # Create a device - device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) + device = Device.from_config_file_with_hci( + sys.argv[1], hci_transport.source, hci_transport.sink + ) device.classic_enabled = True await device.power_on() @@ -192,8 +194,8 @@ async def main(): connection = await device.connect(target_address, transport=BT_BR_EDR_TRANSPORT) print(f'=== Connected to {connection.peer_address}!') - channel = sys.argv[4] - if channel == 'discover': + channel_str = sys.argv[4] + if channel_str == 'discover': await list_rfcomm_channels(connection) return @@ -213,7 +215,7 @@ async def main(): rfcomm_mux = await rfcomm_client.start() print('@@@ Started') - channel = int(channel) + channel = int(channel_str) print(f'### Opening session for channel {channel}...') try: session = await rfcomm_mux.open_dlc(channel) @@ -229,7 +231,7 @@ async def main(): tcp_port = int(sys.argv[5]) asyncio.create_task(tcp_server(tcp_port, session)) - await hci_source.wait_for_termination() + await hci_transport.source.wait_for_termination() # ----------------------------------------------------------------------------- diff --git a/examples/run_rfcomm_server.py b/examples/run_rfcomm_server.py index 41915a4..14bc7eb 100644 --- a/examples/run_rfcomm_server.py +++ b/examples/run_rfcomm_server.py @@ -107,7 +107,7 @@ class TcpServer: # ----------------------------------------------------------------------------- -async def main(): +async def main() -> None: if len(sys.argv) < 4: print( 'Usage: run_rfcomm_server.py <device-config> <transport-spec> ' @@ -124,11 +124,13 @@ async def main(): uuid = 'E6D55659-C8B4-4B85-96BB-B1143AF6D3AE' print('<<< connecting to HCI...') - async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink): + async with await open_transport_or_link(sys.argv[2]) as hci_transport: print('<<< connected') # Create a device - device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink) + device = Device.from_config_file_with_hci( + sys.argv[1], hci_transport.source, hci_transport.sink + ) device.classic_enabled = True # Create a TCP server @@ -153,7 +155,7 @@ async def main(): await device.set_discoverable(True) await device.set_connectable(True) - await hci_source.wait_for_termination() + await hci_transport.source.wait_for_termination() # ----------------------------------------------------------------------------- diff --git a/examples/run_scanner.py b/examples/run_scanner.py index 4a094b9..5748b48 100644 --- a/examples/run_scanner.py +++ b/examples/run_scanner.py @@ -20,27 +20,31 @@ import sys import os import logging from bumble.colors import color - +from bumble.hci import Address from bumble.device import Device from bumble.transport import open_transport_or_link # ----------------------------------------------------------------------------- -async def main(): +async def main() -> None: if len(sys.argv) < 2: print('Usage: run_scanner.py <transport-spec> [filter]') print('example: run_scanner.py usb:0') return print('<<< connecting to HCI...') - async with await open_transport_or_link(sys.argv[1]) as (hci_source, hci_sink): + async with await open_transport_or_link(sys.argv[1]) as hci_transport: print('<<< connected') filter_duplicates = len(sys.argv) == 3 and sys.argv[2] == 'filter' - device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink) + device = Device.with_hci( + 'Bumble', + Address('F0:F1:F2:F3:F4:F5'), + hci_transport.source, + hci_transport.sink, + ) - @device.on('advertisement') - def _(advertisement): + def on_adv(advertisement): address_type_string = ('PUBLIC', 'RANDOM', 'PUBLIC_ID', 'RANDOM_ID')[ advertisement.address.address_type ] @@ -67,10 +71,11 @@ async def main(): f'{advertisement.data.to_string(separator)}' ) + device.on('advertisement', on_adv) await device.power_on() await device.start_scanning(filter_duplicates=filter_duplicates) - await hci_source.wait_for_termination() + await hci_transport.source.wait_for_termination() # ----------------------------------------------------------------------------- diff --git a/examples/run_unicast_server.py b/examples/run_unicast_server.py index 4fac1d6..95ae551 100644 --- a/examples/run_unicast_server.py +++ b/examples/run_unicast_server.py @@ -16,20 +16,28 @@ # Imports # ----------------------------------------------------------------------------- import asyncio +import datetime +import functools import logging import sys import os +import io import struct import secrets + +from typing import Dict + from bumble.core import AdvertisingData -from bumble.device import Device, CisLink, AdvertisingParameters +from bumble.device import Device from bumble.hci import ( CodecID, CodingFormat, - OwnAddressType, HCI_IsoDataPacket, ) from bumble.profiles.bap import ( + AseStateMachine, + UnicastServerAdvertisingData, + CodecSpecificConfiguration, CodecSpecificCapabilities, ContextType, AudioLocation, @@ -45,6 +53,32 @@ from bumble.profiles.csip import CoordinatedSetIdentificationService, SirkType from bumble.transport import open_transport_or_link +def _sink_pac_record() -> PacRecord: + return PacRecord( + coding_format=CodingFormat(CodecID.LC3), + codec_specific_capabilities=CodecSpecificCapabilities( + supported_sampling_frequencies=( + SupportedSamplingFrequency.FREQ_8000 + | SupportedSamplingFrequency.FREQ_16000 + | SupportedSamplingFrequency.FREQ_24000 + | SupportedSamplingFrequency.FREQ_32000 + | SupportedSamplingFrequency.FREQ_48000 + ), + supported_frame_durations=( + SupportedFrameDuration.DURATION_7500_US_SUPPORTED + | SupportedFrameDuration.DURATION_10000_US_SUPPORTED + ), + supported_audio_channel_count=[1, 2], + min_octets_per_codec_frame=26, + max_octets_per_codec_frame=240, + supported_max_codec_frames_per_sdu=2, + ), + ) + + +file_outputs: Dict[AseStateMachine, io.BufferedWriter] = {} + + # ----------------------------------------------------------------------------- async def main() -> None: if len(sys.argv) < 3: @@ -71,49 +105,17 @@ async def main() -> None: PublishedAudioCapabilitiesService( supported_source_context=ContextType.PROHIBITED, available_source_context=ContextType.PROHIBITED, - supported_sink_context=ContextType.MEDIA, - available_sink_context=ContextType.MEDIA, + supported_sink_context=ContextType(0xFF), # All context types + available_sink_context=ContextType(0xFF), # All context types sink_audio_locations=( AudioLocation.FRONT_LEFT | AudioLocation.FRONT_RIGHT ), - sink_pac=[ - # Codec Capability Setting 16_2 - PacRecord( - coding_format=CodingFormat(CodecID.LC3), - codec_specific_capabilities=CodecSpecificCapabilities( - supported_sampling_frequencies=( - SupportedSamplingFrequency.FREQ_16000 - ), - supported_frame_durations=( - SupportedFrameDuration.DURATION_10000_US_SUPPORTED - ), - supported_audio_channel_counts=[1], - min_octets_per_codec_frame=40, - max_octets_per_codec_frame=40, - supported_max_codec_frames_per_sdu=1, - ), - ), - # Codec Capability Setting 24_2 - PacRecord( - coding_format=CodingFormat(CodecID.LC3), - codec_specific_capabilities=CodecSpecificCapabilities( - supported_sampling_frequencies=( - SupportedSamplingFrequency.FREQ_48000 - ), - supported_frame_durations=( - SupportedFrameDuration.DURATION_10000_US_SUPPORTED - ), - supported_audio_channel_counts=[1], - min_octets_per_codec_frame=120, - max_octets_per_codec_frame=120, - supported_max_codec_frames_per_sdu=1, - ), - ), - ], + sink_pac=[_sink_pac_record()], ) ) - device.add_service(AudioStreamControlService(device, sink_ase_id=[1, 2])) + ascs = AudioStreamControlService(device, sink_ase_id=[1], source_ase_id=[2]) + device.add_service(ascs) advertising_data = ( bytes( @@ -141,45 +143,59 @@ async def main() -> None: ) ) + csis.get_advertising_data() - ) - subprocess = await asyncio.create_subprocess_shell( - f'dlc3 | ffplay pipe:0', - stdin=asyncio.subprocess.PIPE, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - ) - - stdin = subprocess.stdin - assert stdin - - # Write a fake LC3 header to dlc3. - stdin.write( - bytes([0x1C, 0xCC]) # Header. - + struct.pack( - '<HHHHHHI', - 18, # Header length. - 48000 // 100, # Sampling Rate(/100Hz). - 0, # Bitrate(unused). - 1, # Channels. - 10000 // 10, # Frame duration(/10us). - 0, # RFU. - 0x0FFFFFFF, # Frame counts. - ) + + bytes(UnicastServerAdvertisingData()) ) - def on_pdu(pdu: HCI_IsoDataPacket): + def on_pdu(ase: AseStateMachine, pdu: HCI_IsoDataPacket): # LC3 format: |frame_length(2)| + |frame(length)|. + sdu = b'' if pdu.iso_sdu_length: - stdin.write(struct.pack('<H', pdu.iso_sdu_length)) - stdin.write(pdu.iso_sdu_fragment) - - def on_cis(cis_link: CisLink): - cis_link.on('pdu', on_pdu) - - device.once('cis_establishment', on_cis) + sdu = struct.pack('<H', pdu.iso_sdu_length) + sdu += pdu.iso_sdu_fragment + file_outputs[ase].write(sdu) + + def on_ase_state_change( + state: AseStateMachine.State, + ase: AseStateMachine, + ) -> None: + if state != AseStateMachine.State.STREAMING: + if file_output := file_outputs.pop(ase): + file_output.close() + else: + file_output = open(f'{datetime.datetime.now().isoformat()}.lc3', 'wb') + codec_configuration = ase.codec_specific_configuration + assert isinstance(codec_configuration, CodecSpecificConfiguration) + # Write a LC3 header. + file_output.write( + bytes([0x1C, 0xCC]) # Header. + + struct.pack( + '<HHHHHHI', + 18, # Header length. + codec_configuration.sampling_frequency.hz + // 100, # Sampling Rate(/100Hz). + 0, # Bitrate(unused). + bin(codec_configuration.audio_channel_allocation).count( + '1' + ), # Channels. + codec_configuration.frame_duration.us + // 10, # Frame duration(/10us). + 0, # RFU. + 0x0FFFFFFF, # Frame counts. + ) + ) + file_outputs[ase] = file_output + assert ase.cis_link + ase.cis_link.sink = functools.partial(on_pdu, ase) + + for ase in ascs.ase_state_machines.values(): + ase.on( + 'state_change', + functools.partial(on_ase_state_change, ase=ase), + ) - advertising_set = await device.create_advertising_set( + await device.create_advertising_set( advertising_data=advertising_data, + auto_restart=True, ) await hci_transport.source.terminated diff --git a/examples/run_vcp_renderer.py b/examples/run_vcp_renderer.py index b695956..0cffbae 100644 --- a/examples/run_vcp_renderer.py +++ b/examples/run_vcp_renderer.py @@ -31,6 +31,7 @@ from bumble.hci import ( OwnAddressType, ) from bumble.profiles.bap import ( + UnicastServerAdvertisingData, CodecSpecificCapabilities, ContextType, AudioLocation, @@ -101,7 +102,7 @@ async def main() -> None: supported_frame_durations=( SupportedFrameDuration.DURATION_10000_US_SUPPORTED ), - supported_audio_channel_counts=[1], + supported_audio_channel_count=[1], min_octets_per_codec_frame=120, max_octets_per_codec_frame=120, supported_max_codec_frames_per_sdu=1, @@ -151,6 +152,7 @@ async def main() -> None: ) ) + csis.get_advertising_data() + + bytes(UnicastServerAdvertisingData()) ) await device.create_advertising_set( diff --git a/extras/android/BtBench/app/src/main/java/com/github/google/bumble/btbench/SocketClient.kt b/extras/android/BtBench/app/src/main/java/com/github/google/bumble/btbench/SocketClient.kt index bd5b7f4..46a014a 100644 --- a/extras/android/BtBench/app/src/main/java/com/github/google/bumble/btbench/SocketClient.kt +++ b/extras/android/BtBench/app/src/main/java/com/github/google/bumble/btbench/SocketClient.kt @@ -56,13 +56,19 @@ class SocketClient(private val viewModel: AppViewModel, private val socket: Blue thread { socketDataSource.receive() + socket.close() + sender.abort() } Log.info("Startup delay: $DEFAULT_STARTUP_DELAY") Thread.sleep(DEFAULT_STARTUP_DELAY.toLong()); Log.info("Starting to send") - sender.run() + try { + sender.run() + } catch (error: IOException) { + Log.info("run ended abruptly") + } cleanup() } } diff --git a/rust/CHANGELOG.md b/rust/CHANGELOG.md index 2cfed4e..ce575a2 100644 --- a/rust/CHANGELOG.md +++ b/rust/CHANGELOG.md @@ -1,7 +1,10 @@ -# Next +# 0.2.0 - Code-gen company ID table +- Unstable support for extended advertisements +- CLI tools for downloading Realtek firmware +- PDL-generated types for HCI commands # 0.1.0 -- Initial release
\ No newline at end of file +- Initial release diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 3339339..5305f9f 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -182,7 +182,7 @@ dependencies = [ [[package]] name = "bumble" -version = "0.1.0" +version = "0.2.0" dependencies = [ "anyhow", "bytes", diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 8106114..73c9ac3 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "bumble" description = "Rust API for the Bumble Bluetooth stack" -version = "0.1.0" +version = "0.2.0" edition = "2021" license = "Apache-2.0" homepage = "https://google.github.io/bumble/index.html" diff --git a/rust/README.md b/rust/README.md index 15a19b9..e08ef25 100644 --- a/rust/README.md +++ b/rust/README.md @@ -37,6 +37,11 @@ PYTHONPATH=..:[virtualenv site-packages] \ cargo run --features bumble-tools --bin bumble -- --help ``` +Notable subcommands: + +- `firmware realtek download`: download Realtek firmware for various chipsets so that it can be automatically loaded when needed +- `usb probe`: show USB devices, highlighting the ones usable for Bluetooth + # Development Run the tests: @@ -63,4 +68,4 @@ To regenerate the assigned number tables based on the Python codebase: ``` PYTHONPATH=.. cargo run --bin gen-assigned-numbers --features dev-tools -```
\ No newline at end of file +``` diff --git a/rust/src/wrapper/controller.rs b/rust/src/wrapper/controller.rs index 4f19dd6..cec10db 100644 --- a/rust/src/wrapper/controller.rs +++ b/rust/src/wrapper/controller.rs @@ -35,7 +35,7 @@ impl Controller { /// module specifies the defaults. Must be called from a thread with a Python event loop, which /// should be true on `tokio::main` and `async_std::main`. /// - /// For more info, see https://awestlake87.github.io/pyo3-asyncio/master/doc/pyo3_asyncio/#event-loop-references-and-contextvars. + /// For more info, see <https://awestlake87.github.io/pyo3-asyncio/master/doc/pyo3_asyncio/#event-loop-references-and-contextvars>. pub async fn new( name: &str, host_source: Option<TransportSource>, diff --git a/rust/src/wrapper/hci.rs b/rust/src/wrapper/hci.rs index 533fe21..52480c8 100644 --- a/rust/src/wrapper/hci.rs +++ b/rust/src/wrapper/hci.rs @@ -149,7 +149,7 @@ impl ToPyObject for Address { /// An error meaning that the u64 value did not represent a valid BT address. #[derive(Debug)] -pub struct InvalidAddress(u64); +pub struct InvalidAddress(#[allow(unused)] u64); impl TryInto<packets::Address> for Address { type Error = ConversionError<InvalidAddress>; diff --git a/rust/src/wrapper/l2cap.rs b/rust/src/wrapper/l2cap.rs index 5e0752e..06fbc52 100644 --- a/rust/src/wrapper/l2cap.rs +++ b/rust/src/wrapper/l2cap.rs @@ -71,7 +71,7 @@ impl LeConnectionOrientedChannel { /// Must be called from a thread with a Python event loop, which should be true on /// `tokio::main` and `async_std::main`. /// - /// For more info, see https://awestlake87.github.io/pyo3-asyncio/master/doc/pyo3_asyncio/#event-loop-references-and-contextvars. + /// For more info, see <https://awestlake87.github.io/pyo3-asyncio/master/doc/pyo3_asyncio/#event-loop-references-and-contextvars>. pub async fn disconnect(&mut self) -> PyResult<()> { Python::with_gil(|py| { self.0 @@ -33,18 +33,17 @@ include_package_data = True install_requires = aiohttp ~= 3.8; platform_system!='Emscripten' appdirs >= 1.4; platform_system!='Emscripten' - bt-test-interfaces >= 0.0.2; platform_system!='Emscripten' - click == 8.1.3; platform_system!='Emscripten' + click >= 8.1.3; platform_system!='Emscripten' cryptography == 39; platform_system!='Emscripten' # Pyodide bundles a version of cryptography that is built for wasm, which may not match the # versions available on PyPI. Relax the version requirement since it's better than being # completely unable to import the package in case of version mismatch. cryptography >= 39.0; platform_system=='Emscripten' - grpcio == 1.57.0; platform_system!='Emscripten' + grpcio >= 1.62.1; platform_system!='Emscripten' humanize >= 4.6.0; platform_system!='Emscripten' libusb1 >= 2.0.1; platform_system!='Emscripten' libusb-package == 1.0.26.1; platform_system!='Emscripten' - platformdirs == 3.10.0; platform_system!='Emscripten' + platformdirs >= 3.10.0; platform_system!='Emscripten' prompt_toolkit >= 3.0.16; platform_system!='Emscripten' prettytable >= 3.6.0; platform_system!='Emscripten' protobuf >= 3.12.4; platform_system!='Emscripten' @@ -63,6 +62,7 @@ console_scripts = bumble-gatt-dump = bumble.apps.gatt_dump:main bumble-hci-bridge = bumble.apps.hci_bridge:main bumble-l2cap-bridge = bumble.apps.l2cap_bridge:main + bumble-rfcomm-bridge = bumble.apps.rfcomm_bridge:main bumble-pair = bumble.apps.pair:main bumble-scan = bumble.apps.scan:main bumble-show = bumble.apps.show:main @@ -82,24 +82,27 @@ console_scripts = build = build >= 0.7 test = - pytest >= 8.0 - pytest-asyncio == 0.21.1 + pytest >= 8.2 + pytest-asyncio >= 0.23.5 pytest-html >= 3.2.0 coverage >= 6.4 development = - black == 22.10 - grpcio-tools >= 1.57.0 + black == 24.3 + grpcio-tools >= 1.62.1 invoke >= 1.7.3 - mypy == 1.8.0 + mypy == 1.10.0 nox >= 2022 - pylint == 2.15.8 + pylint == 3.1.0 pyyaml >= 6.0 types-appdirs >= 1.4.3 types-invoke >= 1.7.3 types-protobuf >= 4.21.0 + wasmtime == 20.0.0 avatar = - pandora-avatar == 0.0.8 - rootcanal == 1.9.0 ; python_version>='3.10' + pandora-avatar == 0.0.9 + rootcanal == 1.10.0 ; python_version>='3.10' +pandora = + bt-test-interfaces >= 0.0.6 documentation = mkdocs >= 1.4.0 mkdocs-material >= 8.5.6 diff --git a/tests/bap_test.py b/tests/bap_test.py index bc223c1..0b6db1a 100644 --- a/tests/bap_test.py +++ b/tests/bap_test.py @@ -72,7 +72,7 @@ def test_codec_specific_capabilities() -> None: cap = CodecSpecificCapabilities( supported_sampling_frequencies=SAMPLE_FREQUENCY, supported_frame_durations=FRAME_SURATION, - supported_audio_channel_counts=AUDIO_CHANNEL_COUNTS, + supported_audio_channel_count=AUDIO_CHANNEL_COUNTS, min_octets_per_codec_frame=40, max_octets_per_codec_frame=40, supported_max_codec_frames_per_sdu=1, @@ -88,7 +88,7 @@ def test_pac_record() -> None: cap = CodecSpecificCapabilities( supported_sampling_frequencies=SAMPLE_FREQUENCY, supported_frame_durations=FRAME_SURATION, - supported_audio_channel_counts=AUDIO_CHANNEL_COUNTS, + supported_audio_channel_count=AUDIO_CHANNEL_COUNTS, min_octets_per_codec_frame=40, max_octets_per_codec_frame=40, supported_max_codec_frames_per_sdu=1, @@ -216,7 +216,7 @@ async def test_pacs(): supported_frame_durations=( SupportedFrameDuration.DURATION_10000_US_SUPPORTED ), - supported_audio_channel_counts=[1], + supported_audio_channel_count=[1], min_octets_per_codec_frame=40, max_octets_per_codec_frame=40, supported_max_codec_frames_per_sdu=1, @@ -232,7 +232,7 @@ async def test_pacs(): supported_frame_durations=( SupportedFrameDuration.DURATION_10000_US_SUPPORTED ), - supported_audio_channel_counts=[1], + supported_audio_channel_count=[1], min_octets_per_codec_frame=60, max_octets_per_codec_frame=60, supported_max_codec_frames_per_sdu=1, diff --git a/tests/core_test.py b/tests/core_test.py index 6c9d0c3..11afb1c 100644 --- a/tests/core_test.py +++ b/tests/core_test.py @@ -17,6 +17,7 @@ # ----------------------------------------------------------------------------- from bumble.core import AdvertisingData, UUID, get_dict_key_by_value + # ----------------------------------------------------------------------------- def test_ad_data(): data = bytes([2, AdvertisingData.TX_POWER_LEVEL, 123]) diff --git a/tests/device_test.py b/tests/device_test.py index 5d87282..ac0c96b 100644 --- a/tests/device_test.py +++ b/tests/device_test.py @@ -16,6 +16,7 @@ # Imports # ----------------------------------------------------------------------------- import asyncio +import functools import logging import os from types import LambdaType @@ -35,12 +36,14 @@ from bumble.hci import ( HCI_COMMAND_STATUS_PENDING, HCI_CREATE_CONNECTION_COMMAND, HCI_SUCCESS, + HCI_CONNECTION_FAILED_TO_BE_ESTABLISHED_ERROR, Address, OwnAddressType, HCI_Command_Complete_Event, HCI_Command_Status_Event, HCI_Connection_Complete_Event, HCI_Connection_Request_Event, + HCI_Error, HCI_Packet, ) from bumble.gatt import ( @@ -52,6 +55,10 @@ from bumble.gatt import ( from .test_utils import TwoDevices, async_barrier +# ----------------------------------------------------------------------------- +# Constants +# ----------------------------------------------------------------------------- +_TIMEOUT = 0.1 # ----------------------------------------------------------------------------- # Logging @@ -214,6 +221,12 @@ async def test_device_connect_parallel(): d1.host.set_packet_sink(Sink(d1_flow())) d2.host.set_packet_sink(Sink(d2_flow())) + d1_accept_task = asyncio.create_task(d1.accept(peer_address=d0.public_address)) + d2_accept_task = asyncio.create_task(d2.accept()) + + # Ensure that the accept tasks have started. + await async_barrier() + [c01, c02, a10, a20] = await asyncio.gather( *[ asyncio.create_task( @@ -222,8 +235,8 @@ async def test_device_connect_parallel(): asyncio.create_task( d0.connect(d2.public_address, transport=BT_BR_EDR_TRANSPORT) ), - asyncio.create_task(d1.accept(peer_address=d0.public_address)), - asyncio.create_task(d2.accept()), + d1_accept_task, + d2_accept_task, ] ) @@ -387,6 +400,29 @@ async def test_get_remote_le_features(): # ----------------------------------------------------------------------------- @pytest.mark.asyncio +async def test_get_remote_le_features_failed(): + devices = TwoDevices() + await devices.setup_connection() + + def on_hci_le_read_remote_features_complete_event(event): + devices[0].host.emit( + 'le_remote_features_failure', + event.connection_handle, + HCI_CONNECTION_FAILED_TO_BE_ESTABLISHED_ERROR, + ) + + devices[0].host.on_hci_le_read_remote_features_complete_event = ( + on_hci_le_read_remote_features_complete_event + ) + + with pytest.raises(HCI_Error): + await asyncio.wait_for( + devices.connections[0].get_remote_le_features(), _TIMEOUT + ) + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio async def test_cis(): devices = TwoDevices() await devices.setup_connection() @@ -434,6 +470,65 @@ async def test_cis(): # ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_cis_setup_failure(): + devices = TwoDevices() + await devices.setup_connection() + + cis_requests = asyncio.Queue() + + def on_cis_request( + acl_connection: Connection, + cis_handle: int, + cig_id: int, + cis_id: int, + ): + del acl_connection, cig_id, cis_id + cis_requests.put_nowait(cis_handle) + + devices[1].on('cis_request', on_cis_request) + + cis_handles = await devices[0].setup_cig( + cig_id=1, + cis_id=[2], + sdu_interval=(0, 0), + framing=0, + max_sdu=(0, 0), + retransmission_number=0, + max_transport_latency=(0, 0), + ) + assert len(cis_handles) == 1 + + cis_create_task = asyncio.create_task( + devices[0].create_cis( + [ + (cis_handles[0], devices.connections[0].handle), + ] + ) + ) + + def on_hci_le_cis_established_event(host, event): + host.emit( + 'cis_establishment_failure', + event.connection_handle, + HCI_CONNECTION_FAILED_TO_BE_ESTABLISHED_ERROR, + ) + + for device in devices: + device.host.on_hci_le_cis_established_event = functools.partial( + on_hci_le_cis_established_event, device.host + ) + + cis_request = await asyncio.wait_for(cis_requests.get(), _TIMEOUT) + + with pytest.raises(HCI_Error): + await asyncio.wait_for(devices[1].accept_cis_request(cis_request), _TIMEOUT) + + with pytest.raises(HCI_Error): + await asyncio.wait_for(cis_create_task, _TIMEOUT) + + +# ----------------------------------------------------------------------------- def test_gatt_services_with_gas(): device = Device(host=Host(None, None)) diff --git a/tests/hfp_test.py b/tests/hfp_test.py index dc28180..83b0d35 100644 --- a/tests/hfp_test.py +++ b/tests/hfp_test.py @@ -19,8 +19,9 @@ import asyncio import logging import os import pytest +import pytest_asyncio -from typing import Tuple +from typing import Tuple, Optional from .test_utils import TwoDevices from bumble import core @@ -36,9 +37,93 @@ logger = logging.getLogger(__name__) # ----------------------------------------------------------------------------- +def _default_hf_configuration() -> hfp.HfConfiguration: + return hfp.HfConfiguration( + supported_hf_features=[ + hfp.HfFeature.CODEC_NEGOTIATION, + hfp.HfFeature.ESCO_S4_SETTINGS_SUPPORTED, + hfp.HfFeature.HF_INDICATORS, + hfp.HfFeature.ENHANCED_CALL_STATUS, + hfp.HfFeature.THREE_WAY_CALLING, + hfp.HfFeature.CLI_PRESENTATION_CAPABILITY, + ], + supported_hf_indicators=[ + hfp.HfIndicator.ENHANCED_SAFETY, + hfp.HfIndicator.BATTERY_LEVEL, + ], + supported_audio_codecs=[ + hfp.AudioCodec.CVSD, + hfp.AudioCodec.MSBC, + ], + ) + + +# ----------------------------------------------------------------------------- +def _default_hf_sdp_features() -> hfp.HfSdpFeature: + return ( + hfp.HfSdpFeature.WIDE_BAND + | hfp.HfSdpFeature.THREE_WAY_CALLING + | hfp.HfSdpFeature.CLI_PRESENTATION_CAPABILITY + ) + + +# ----------------------------------------------------------------------------- +def _default_ag_configuration() -> hfp.AgConfiguration: + return hfp.AgConfiguration( + supported_ag_features=[ + hfp.AgFeature.HF_INDICATORS, + hfp.AgFeature.IN_BAND_RING_TONE_CAPABILITY, + hfp.AgFeature.REJECT_CALL, + hfp.AgFeature.CODEC_NEGOTIATION, + hfp.AgFeature.ESCO_S4_SETTINGS_SUPPORTED, + hfp.AgFeature.ENHANCED_CALL_STATUS, + hfp.AgFeature.THREE_WAY_CALLING, + ], + supported_ag_indicators=[ + hfp.AgIndicatorState.call(), + hfp.AgIndicatorState.service(), + hfp.AgIndicatorState.callsetup(), + hfp.AgIndicatorState.callsetup(), + hfp.AgIndicatorState.signal(), + hfp.AgIndicatorState.roam(), + hfp.AgIndicatorState.battchg(), + ], + supported_hf_indicators=[ + hfp.HfIndicator.ENHANCED_SAFETY, + hfp.HfIndicator.BATTERY_LEVEL, + ], + supported_ag_call_hold_operations=[ + hfp.CallHoldOperation.ADD_HELD_CALL, + hfp.CallHoldOperation.HOLD_ALL_ACTIVE_CALLS, + hfp.CallHoldOperation.HOLD_ALL_CALLS_EXCEPT, + hfp.CallHoldOperation.RELEASE_ALL_ACTIVE_CALLS, + hfp.CallHoldOperation.RELEASE_ALL_HELD_CALLS, + hfp.CallHoldOperation.RELEASE_SPECIFIC_CALL, + hfp.CallHoldOperation.CONNECT_TWO_CALLS, + ], + supported_audio_codecs=[hfp.AudioCodec.CVSD, hfp.AudioCodec.MSBC], + ) + + +# ----------------------------------------------------------------------------- +def _default_ag_sdp_features() -> hfp.AgSdpFeature: + return ( + hfp.AgSdpFeature.WIDE_BAND + | hfp.AgSdpFeature.IN_BAND_RING_TONE_CAPABILITY + | hfp.AgSdpFeature.THREE_WAY_CALLING + ) + + +# ----------------------------------------------------------------------------- async def make_hfp_connections( - hf_config: hfp.Configuration, -) -> Tuple[hfp.HfProtocol, hfp.HfpProtocol]: + hf_config: Optional[hfp.HfConfiguration] = None, + ag_config: Optional[hfp.AgConfiguration] = None, +): + if not hf_config: + hf_config = _default_hf_configuration() + if not ag_config: + ag_config = _default_ag_configuration() + # Setup devices devices = TwoDevices() await devices.setup_connection() @@ -55,38 +140,371 @@ async def make_hfp_connections( # Setup HFP connection hf = hfp.HfProtocol(client_dlc, hf_config) - ag = hfp.HfpProtocol(server_dlc) - return hf, ag + ag = hfp.AgProtocol(server_dlc, ag_config) + + await hf.initiate_slc() + return (hf, ag) # ----------------------------------------------------------------------------- +@pytest_asyncio.fixture +async def hfp_connections(): + hf, ag = await make_hfp_connections() + hf_loop_task = asyncio.create_task(hf.run()) + + try: + yield (hf, ag) + finally: + # Close the coroutine. + hf.unsolicited_queue.put_nowait(None) + await hf_loop_task +# ----------------------------------------------------------------------------- @pytest.mark.asyncio -async def test_slc(): - hf_config = hfp.Configuration( - supported_hf_features=[], supported_hf_indicators=[], supported_audio_codecs=[] - ) - hf, ag = await make_hfp_connections(hf_config) - - async def ag_loop(): - while line := await ag.next_line(): - if line.startswith('AT+BRSF'): - ag.send_response_line('+BRSF: 0') - elif line.startswith('AT+CIND=?'): - ag.send_response_line( - '+CIND: ("call",(0,1)),("callsetup",(0-3)),("service",(0-1)),' - '("signal",(0-5)),("roam",(0,1)),("battchg",(0-5)),' - '("callheld",(0-2))' +async def test_slc_with_minimal_features(): + hf, ag = await make_hfp_connections( + hfp.HfConfiguration( + supported_audio_codecs=[], + supported_hf_features=[], + supported_hf_indicators=[], + ), + hfp.AgConfiguration( + supported_ag_call_hold_operations=[], + supported_ag_features=[], + supported_ag_indicators=[ + hfp.AgIndicatorState( + indicator=hfp.AgIndicator.CALL, + supported_values={0, 1}, + current_status=0, ) - elif line.startswith('AT+CIND?'): - ag.send_response_line('+CIND: 0,0,1,4,1,5,0') - ag.send_response_line('OK') + ], + supported_hf_indicators=[], + supported_audio_codecs=[], + ), + ) - ag_task = asyncio.create_task(ag_loop()) + assert hf.supported_ag_features == ag.supported_ag_features + assert hf.supported_hf_features == ag.supported_hf_features + assert hf.supported_ag_call_hold_operations == ag.supported_ag_call_hold_operations + for a, b in zip(hf.ag_indicators, ag.ag_indicators): + assert a.indicator == b.indicator + assert a.current_status == b.current_status - await hf.initiate_slc() - ag_task.cancel() + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_slc(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]): + hf, ag = hfp_connections + + assert hf.supported_ag_features == ag.supported_ag_features + assert hf.supported_hf_features == ag.supported_hf_features + assert hf.supported_ag_call_hold_operations == ag.supported_ag_call_hold_operations + for a, b in zip(hf.ag_indicators, ag.ag_indicators): + assert a.indicator == b.indicator + assert a.current_status == b.current_status + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_ag_indicator(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]): + hf, ag = hfp_connections + + future = asyncio.get_running_loop().create_future() + hf.on('ag_indicator', future.set_result) + + ag.update_ag_indicator(hfp.AgIndicator.CALL, 1) + + indicator: hfp.AgIndicatorState = await future + assert indicator.current_status == 1 + assert indicator.indicator == hfp.AgIndicator.CALL + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_hf_indicator(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]): + hf, ag = hfp_connections + + future = asyncio.get_running_loop().create_future() + ag.on('hf_indicator', future.set_result) + + await hf.execute_command('AT+BIEV=2,100') + + indicator: hfp.HfIndicatorState = await future + assert indicator.current_status == 100 + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_codec_negotiation( + hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol] +): + hf, ag = hfp_connections + + futures = [ + asyncio.get_running_loop().create_future(), + asyncio.get_running_loop().create_future(), + ] + hf.on('codec_negotiation', futures[0].set_result) + ag.on('codec_negotiation', futures[1].set_result) + await ag.negotiate_codec(hfp.AudioCodec.MSBC) + + assert await futures[0] == await futures[1] + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_dial(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]): + hf, ag = hfp_connections + NUMBER = 'ATD123456789' + + future = asyncio.get_running_loop().create_future() + ag.on('dial', future.set_result) + await hf.execute_command(f'ATD{NUMBER}') + + number: str = await future + assert number == NUMBER + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_answer(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]): + hf, ag = hfp_connections + + future = asyncio.get_running_loop().create_future() + ag.on('answer', lambda: future.set_result(None)) + await hf.answer_incoming_call() + + await future + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_reject_incoming_call( + hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol] +): + hf, ag = hfp_connections + + future = asyncio.get_running_loop().create_future() + ag.on('hang_up', lambda: future.set_result(None)) + await hf.reject_incoming_call() + + await future + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_terminate_call(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]): + hf, ag = hfp_connections + + future = asyncio.get_running_loop().create_future() + ag.on('hang_up', lambda: future.set_result(None)) + await hf.terminate_call() + + await future + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_query_calls_without_calls( + hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol] +): + hf, ag = hfp_connections + + assert await hf.query_current_calls() == [] + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_query_calls_with_calls( + hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol] +): + hf, ag = hfp_connections + ag.calls.append( + hfp.CallInfo( + index=1, + direction=hfp.CallInfoDirection.MOBILE_ORIGINATED_CALL, + status=hfp.CallInfoStatus.ACTIVE, + mode=hfp.CallInfoMode.VOICE, + multi_party=hfp.CallInfoMultiParty.NOT_IN_CONFERENCE, + number='123456789', + ) + ) + + assert await hf.query_current_calls() == ag.calls + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +@pytest.mark.parametrize( + "operation,", + ( + hfp.CallHoldOperation.RELEASE_ALL_HELD_CALLS, + hfp.CallHoldOperation.RELEASE_ALL_ACTIVE_CALLS, + hfp.CallHoldOperation.HOLD_ALL_ACTIVE_CALLS, + hfp.CallHoldOperation.ADD_HELD_CALL, + hfp.CallHoldOperation.CONNECT_TWO_CALLS, + ), +) +async def test_hold_call_without_call_index( + hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol], + operation: hfp.CallHoldOperation, +): + hf, ag = hfp_connections + call_hold_future = asyncio.get_running_loop().create_future() + ag.on("call_hold", lambda op, index: call_hold_future.set_result((op, index))) + + await hf.execute_command(f"AT+CHLD={operation.value}") + + assert (await call_hold_future) == (operation, None) + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +@pytest.mark.parametrize( + "operation,", + ( + hfp.CallHoldOperation.RELEASE_SPECIFIC_CALL, + hfp.CallHoldOperation.HOLD_ALL_CALLS_EXCEPT, + ), +) +async def test_hold_call_with_call_index( + hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol], + operation: hfp.CallHoldOperation, +): + hf, ag = hfp_connections + call_hold_future = asyncio.get_running_loop().create_future() + ag.on("call_hold", lambda op, index: call_hold_future.set_result((op, index))) + ag.calls.append( + hfp.CallInfo( + index=1, + direction=hfp.CallInfoDirection.MOBILE_ORIGINATED_CALL, + status=hfp.CallInfoStatus.ACTIVE, + mode=hfp.CallInfoMode.VOICE, + multi_party=hfp.CallInfoMultiParty.NOT_IN_CONFERENCE, + number='123456789', + ) + ) + + await hf.execute_command(f"AT+CHLD={operation.value.replace('x', '1')}") + + assert (await call_hold_future) == (operation, 1) + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_ring(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]): + hf, ag = hfp_connections + ring_future = asyncio.get_running_loop().create_future() + hf.on("ring", lambda: ring_future.set_result(None)) + + ag.send_ring() + + await ring_future + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_speaker_volume(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]): + hf, ag = hfp_connections + speaker_volume_future = asyncio.get_running_loop().create_future() + hf.on("speaker_volume", speaker_volume_future.set_result) + + ag.set_speaker_volume(10) + + assert await speaker_volume_future == 10 + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_microphone_volume( + hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol] +): + hf, ag = hfp_connections + microphone_volume_future = asyncio.get_running_loop().create_future() + hf.on("microphone_volume", microphone_volume_future.set_result) + + ag.set_microphone_volume(10) + + assert await microphone_volume_future == 10 + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_cli_notification(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]): + hf, ag = hfp_connections + cli_notification_future = asyncio.get_running_loop().create_future() + hf.on("cli_notification", cli_notification_future.set_result) + + ag.send_cli_notification( + hfp.CallLineIdentification(number="\"123456789\"", type=129, alpha="\"Bumble\"") + ) + + assert await cli_notification_future == hfp.CallLineIdentification( + number="123456789", type=129, alpha="Bumble", subaddr="", satype=None + ) + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_voice_recognition_from_hf( + hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol] +): + hf, ag = hfp_connections + voice_recognition_future = asyncio.get_running_loop().create_future() + ag.on("voice_recognition", voice_recognition_future.set_result) + + await hf.execute_command("AT+BVRA=1") + + assert await voice_recognition_future == hfp.VoiceRecognitionState.ENABLE + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_voice_recognition_from_ag( + hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol] +): + hf, ag = hfp_connections + voice_recognition_future = asyncio.get_running_loop().create_future() + hf.on("voice_recognition", voice_recognition_future.set_result) + + ag.send_response("+BVRA: 1") + + assert await voice_recognition_future == hfp.VoiceRecognitionState.ENABLE + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_hf_sdp_record(): + devices = TwoDevices() + await devices.setup_connection() + + devices[0].sdp_service_records[1] = hfp.make_hf_sdp_records( + 1, 2, _default_hf_configuration(), hfp.ProfileVersion.V1_8 + ) + + assert await hfp.find_hf_sdp_record(devices.connections[1]) == ( + 2, + hfp.ProfileVersion.V1_8, + _default_hf_sdp_features(), + ) + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_ag_sdp_record(): + devices = TwoDevices() + await devices.setup_connection() + + devices[0].sdp_service_records[1] = hfp.make_ag_sdp_records( + 1, 2, _default_ag_configuration(), hfp.ProfileVersion.V1_8 + ) + + assert await hfp.find_ag_sdp_record(devices.connections[1]) == ( + 2, + hfp.ProfileVersion.V1_8, + _default_ag_sdp_features(), + ) # ----------------------------------------------------------------------------- diff --git a/tests/rfcomm_test.py b/tests/rfcomm_test.py index 4ce4d11..5146c8b 100644 --- a/tests/rfcomm_test.py +++ b/tests/rfcomm_test.py @@ -32,6 +32,8 @@ from bumble.rfcomm import ( RFCOMM_PSM, ) +_TIMEOUT = 0.1 + # ----------------------------------------------------------------------------- def basic_frame_check(x): @@ -60,7 +62,7 @@ def test_frames(): # ----------------------------------------------------------------------------- @pytest.mark.asyncio -async def test_basic_connection() -> None: +async def test_connection_and_disconnection() -> None: devices = test_utils.TwoDevices() await devices.setup_connection() @@ -81,6 +83,34 @@ async def test_basic_connection() -> None: dlcs[1].write(b'Lorem ipsum dolor sit amet') assert await queues[0].get() == b'Lorem ipsum dolor sit amet' + closed = asyncio.Event() + dlcs[1].on('close', closed.set) + await dlcs[1].disconnect() + await closed.wait() + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_receive_pdu_before_open_dlc_returns() -> None: + devices = await test_utils.TwoDevices.create_with_connection() + DATA = b'123' + + accept_future: asyncio.Future[DLC] = asyncio.get_running_loop().create_future() + channel = Server(devices[0]).listen(acceptor=accept_future.set_result) + + assert devices.connections[1] + multiplexer = await Client(devices.connections[1]).start() + open_dlc_task = asyncio.create_task(multiplexer.open_dlc(channel)) + + dlc_responder = await accept_future + dlc_responder.write(DATA) + + dlc_initiator = await open_dlc_task + dlc_initiator_queue = asyncio.Queue() # type: ignore[var-annotated] + dlc_initiator.sink = dlc_initiator_queue.put_nowait + + assert await asyncio.wait_for(dlc_initiator_queue.get(), timeout=_TIMEOUT) == DATA + # ----------------------------------------------------------------------------- @pytest.mark.asyncio diff --git a/tests/self_test.py b/tests/self_test.py index 259de02..5c68ea0 100644 --- a/tests/self_test.py +++ b/tests/self_test.py @@ -316,13 +316,13 @@ async def _test_self_smp_with_configs(pairing_config1, pairing_config2): # Set up the pairing configs if pairing_config1: - two_devices.devices[ - 0 - ].pairing_config_factory = lambda connection: pairing_config1 + two_devices.devices[0].pairing_config_factory = ( + lambda connection: pairing_config1 + ) if pairing_config2: - two_devices.devices[ - 1 - ].pairing_config_factory = lambda connection: pairing_config2 + two_devices.devices[1].pairing_config_factory = ( + lambda connection: pairing_config2 + ) # Pair await two_devices.devices[0].pair(connection) diff --git a/tests/test_utils.py b/tests/test_utils.py index d193d6e..1f0b4f3 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -16,7 +16,8 @@ # Imports # ----------------------------------------------------------------------------- import asyncio -from typing import List, Optional +from typing import List, Optional, Type +from typing_extensions import Self from bumble.controller import Controller from bumble.link import LocalLink @@ -81,6 +82,12 @@ class TwoDevices: def __getitem__(self, index: int) -> Device: return self.devices[index] + @classmethod + async def create_with_connection(cls: Type[Self]) -> Self: + devices = cls() + await devices.setup_connection() + return devices + # ----------------------------------------------------------------------------- async def async_barrier(): diff --git a/tests/transport_tcp_server_test.py b/tests/transport_tcp_server_test.py new file mode 100644 index 0000000..a5f015d --- /dev/null +++ b/tests/transport_tcp_server_test.py @@ -0,0 +1,64 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +import os +import pytest +import socket +import unittest +from unittest.mock import ANY, patch + +from bumble.transport.tcp_server import ( + open_tcp_server_transport, + open_tcp_server_transport_with_socket, +) + + +class OpenTcpServerTransportTests(unittest.TestCase): + def setUp(self): + self.patcher = patch('bumble.transport.tcp_server._create_server') + self.mock_create_server = self.patcher.start() + + def tearDown(self): + self.patcher.stop() + + def test_open_with_spec(self): + asyncio.run(open_tcp_server_transport('localhost:32100')) + self.mock_create_server.assert_awaited_once_with( + ANY, host='localhost', port=32100 + ) + + def test_open_with_port_only_spec(self): + asyncio.run(open_tcp_server_transport('_:32100')) + self.mock_create_server.assert_awaited_once_with(ANY, host=None, port=32100) + + def test_open_with_socket(self): + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + asyncio.run(open_tcp_server_transport_with_socket(sock=sock)) + self.mock_create_server.assert_awaited_once_with(ANY, sock=sock) + + +@pytest.mark.skipif( + not os.environ.get('PYTEST_NOSKIP', 0), + reason='''\ +Not hermetic. Should only run manually with + $ PYTEST_NOSKIP=1 pytest tests +''', +) +def test_open_with_real_socket(): + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + sock.bind(('localhost', 0)) + port = sock.getsockname()[1] + assert port != 0 + asyncio.run(open_tcp_server_transport_with_socket(sock=sock)) diff --git a/tools/rtk_fw_download.py b/tools/rtk_fw_download.py index 89c49b2..74c783b 100644 --- a/tools/rtk_fw_download.py +++ b/tools/rtk_fw_download.py @@ -49,6 +49,7 @@ LINUX_FROM_SCRATCH_SOURCE = ( False, ) + # ----------------------------------------------------------------------------- # Functions # ----------------------------------------------------------------------------- @@ -111,7 +112,7 @@ def main(output_dir, source, single, force, parse): for driver_info in rtk.Driver.DRIVER_INFOS ] - for (fw_name, config_name, config_needed) in images: + for fw_name, config_name, config_needed in images: print(color("---", "yellow")) fw_image_out = output_dir / fw_name if not force and fw_image_out.exists(): diff --git a/web/bumble.js b/web/bumble.js index cb807eb..c554bc2 100644 --- a/web/bumble.js +++ b/web/bumble.js @@ -24,6 +24,11 @@ class PacketSource { } class PacketSink { + constructor() { + this.queue = []; + this.isProcessing = false; + } + on_packet(packet) { if (!this.writer) { return; @@ -31,11 +36,24 @@ class PacketSink { const buffer = packet.toJs({create_proxies : false}); packet.destroy(); //console.log(`HCI[host->controller]: ${bufferToHex(buffer)}`); - // TODO: create an async queue here instead of blindly calling write without awaiting - this.writer(buffer); + this.queue.push(buffer); + this.processQueue(); + } + + async processQueue() { + if (this.isProcessing) { + return; + } + this.isProcessing = true; + while (this.queue.length > 0) { + const buffer = this.queue.shift(); + await this.writer(buffer); + } + this.isProcessing = false; } } + class LogEvent extends Event { constructor(message) { super('log'); @@ -185,4 +203,4 @@ export async function setupSimpleApp(appUrl, bumbleControls, log) { bumbleControls.onBumbleLoaded(); return app; -}
\ No newline at end of file +} |