You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

使用Bleak的Python蓝牙扫描器出现重复扫描问题求助

使用Bleak的Python蓝牙扫描器出现重复扫描问题求助

各位好,我现在基于Python的Bleak库开发了一个BLE扫描工具,核心目的是做车辆识别。目前遇到一个头疼的问题:同一个BLE设备会被重复检测到——不管是我的笔记本,还是其他测试设备,都存在这种重复扫描的情况。

我的需求其实很明确,就是要准确记录设备的出现、消失情况,但现在的重复数据严重干扰了后续的车辆识别逻辑。我翻了Bleak的官方文档和相关讨论,没找到已知的会导致这种重复扫描的副作用,所以不确定是我代码逻辑有疏漏,还是BLE扫描本身的特性导致的,想请大家帮忙排查一下。

先简单说下我的代码逻辑:

  1. 启动时让用户选择扫描器编号(1或2),用来区分不同的扫描设备
  2. 用几个字典分别记录:设备的发现次数、首次发现时间、末次发现时间、两次发现的最大间隔
  3. 扫描过程中会实时把设备的MAC地址、RSSI等数据写入带时间戳的TXT文件
  4. 支持按Enter键手动停止扫描

下面是完整的代码:

# -*- coding: utf-8 -*-
""" Source: https://bleak.readthedocs.io/en/latest/api/scanner.html """
import asyncio
from bleak import BleakScanner
import time

# identifier code for the scanning device
device_id = ""
while device_id not in ("1", "2"):
    device_id = input("Enter a 1 or a 2 for either scanner 1 or scanner 2:")

# running dictionary of discovered devices with discovery count
discovered = {}
# when the device was first discoverd
first_discover = {}
# when the device was last discovered
last_discover = {}
# max gap between discovery
max_discover_gap = {}

maxcount = 0
maxtime = 0

#Dynamic File Names
timestamp_str = time.strftime("%Y-%m-%d_%H-%M")
TXT_FILE = "scanner_" + device_id + "_" + "ble_data_" + timestamp_str + ".txt"

async def main():
    stop_event = asyncio.Event()

    # initializing scanning recording file
    # TODO: does not currently allow the program to run twice
    # How we would handle the event where the program is interrupted and
    # has to run again must be decided
    f = open(TXT_FILE, "w", encoding="utf-8")
    try:
        f.write("[Device]\n" + device_id + "\n[BTLECAPT]\n")
        f.flush()

        # Asyncronous event detection for stopping the program
        async def wait_for_enter():
            print("Press Enter to stop scanning...")
            # do more research on async event loops
            await asyncio.get_event_loop().run_in_executor(None, input)
            #stops the current loop
            stop_event.set()

        def callback(device, advertising_data):
            global maxcount, maxtime
            # todo: do something with incoming data
            print(device.address, advertising_data.rssi)
            discover_time = time.time()

            # If undiscovered initialize discovery
            if device.address not in discovered:
                # begins discovery count
                discovered[device.address] = 1
                # Setting the first instance it was discovered
                first_discover[device.address] = discover_time
                # setting up the largest discovery gap between devices
                max_discover_gap[device.address] = 0
            else:
                # increments discovery
                discovered[device.address] = discovered[device.address] + 1
                # calculate gap in discovery time
                gap = discover_time - last_discover[device.address]
                # if gap is greater than discover gap then update value
                if gap > max_discover_gap[device.address]:
                    max_discover_gap[device.address] = gap

            # set last discovered to new discover time
            last_discover[device.address] = discover_time

            time_diff = last_discover[device.address] - first_discover[device.address]

            # writes scanned data into our file
            # creating and adding the scan info to record file
            f.write(str(round(discover_time,2))+ ","+ device.address[:8] + "," + device.address + "," + str(advertising_data.rssi) + "\n")
            f.flush()

        # allows wait_for_enter to run concurrently to other subroutines
        asyncio.create_task(wait_for_enter())

        # call bleakscanner and bleakscanner delivers the device, advertising_data into the callback function
        async with BleakScanner(callback) as scanner:
            # Important! Wait for an event to trigger stop, otherwise scanner
            # will stop immediately.
            await stop_event.wait()

        # writing an ending line to scanned data
        f.write("[BTLECAPT]\n")
        f.flush()
    finally:
        f.close()

asyncio.run(main())

想请教大家几个问题:

  1. 这种同一个设备被重复扫描到的情况,是BLE被动扫描的正常特性吗?如果是,有没有标准的过滤方法?
  2. 我的代码逻辑里有没有疏漏,导致了不必要的重复计数?比如在设备发现的判断、状态更新上有没有问题?
  3. 针对车辆识别的需求,我应该怎么优化扫描逻辑?比如应该以“首次发现”作为设备出现的标志,还是需要结合间隔时间判断设备是否持续存在?

麻烦各位帮忙看看,谢谢啦!

火山引擎 最新活动