Updated notifications to handle actions and added a blue dbus messenger to listen for bluetooth actions.
This commit is contained in:
173
services/bluetooth.py
Executable file
173
services/bluetooth.py
Executable file
@@ -0,0 +1,173 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import dbus
|
||||
import dbus.service
|
||||
import dbus.mainloop.glib
|
||||
from gi.repository import GLib
|
||||
import subprocess
|
||||
import logging
|
||||
|
||||
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
||||
|
||||
AGENT_PATH = "/test/agent"
|
||||
CAPABILITY = "NoInputNoOutput"
|
||||
|
||||
def send_notification_with_actions(title, message, actions, action_handler):
|
||||
bus = dbus.SessionBus()
|
||||
notification_object = bus.get_object("org.freedesktop.Notifications", "/org/freedesktop/Notifications")
|
||||
notify_interface = dbus.Interface(notification_object, "org.freedesktop.Notifications")
|
||||
bus.add_signal_receiver(action_handler,
|
||||
dbus_interface="org.freedesktop.Notifications",
|
||||
signal_name="ActionInvoked")
|
||||
notify_interface.Notify("bluetooth_agent", 0, "", title, message, actions, {}, -1)
|
||||
|
||||
class Agent(dbus.service.Object):
|
||||
def __init__(self, bus):
|
||||
dbus.service.Object.__init__(self, bus, AGENT_PATH)
|
||||
|
||||
@dbus.service.method("org.bluez.Agent1", in_signature="", out_signature="")
|
||||
def Release(self):
|
||||
logging.info("Release")
|
||||
|
||||
def get_device_name(self, device):
|
||||
bus = dbus.SystemBus()
|
||||
device_proxy = bus.get_object("org.bluez", device)
|
||||
device_properties = dbus.Interface(device_proxy, "org.freedesktop.DBus.Properties")
|
||||
return device_properties.Get("org.bluez.Device1", "Name")
|
||||
|
||||
@dbus.service.method("org.bluez.Agent1", in_signature="o", out_signature="")
|
||||
def RequestPinCode(self, device):
|
||||
device_name = self.get_device_name(device)
|
||||
logging.info(f"RequestPinCode {device_name} ({device})")
|
||||
self.request_input("Enter PIN code", f"Enter PIN code for device {device_name}", device, "pin")
|
||||
|
||||
@dbus.service.method("org.bluez.Agent1", in_signature="o", out_signature="u")
|
||||
def RequestPasskey(self, device):
|
||||
device_name = self.get_device_name(device)
|
||||
logging.info(f"RequestPasskey {device_name} ({device})")
|
||||
self.request_input("Enter Passkey", f"Enter passkey for device {device_name}", device, "passkey")
|
||||
|
||||
@dbus.service.method("org.bluez.Agent1", in_signature="ou", out_signature="")
|
||||
def DisplayPasskey(self, device, passkey):
|
||||
device_name = self.get_device_name(device)
|
||||
logging.info(f"DisplayPasskey {device_name} passkey {passkey}")
|
||||
send_notification_with_actions("Bluetooth Pairing Request", f"Passkey for device {device_name} is {passkey}", [], lambda *args: None)
|
||||
|
||||
@dbus.service.method("org.bluez.Agent1", in_signature="ou", out_signature="")
|
||||
def RequestConfirmation(self, device, passkey):
|
||||
device_name = self.get_device_name(device)
|
||||
logging.info(f"RequestConfirmation {device_name} passkey {passkey}")
|
||||
actions = ["confirm", "Confirm", "deny", "Deny"]
|
||||
|
||||
def action_handler(notification_id, action_key):
|
||||
if action_key == "confirm":
|
||||
logging.info(f"Confirmed pairing for {device_name}")
|
||||
self.send_reply(device)
|
||||
elif action_key == "deny":
|
||||
logging.info(f"Denied pairing for {device_name}")
|
||||
self.send_error(device, "org.bluez.Error.Rejected")
|
||||
|
||||
send_notification_with_actions("Bluetooth Pairing Request",
|
||||
f"Confirm passkey {passkey} for device {device_name}",
|
||||
actions, action_handler)
|
||||
return
|
||||
|
||||
@dbus.service.method("org.bluez.Agent1", in_signature="o", out_signature="")
|
||||
def RequestAuthorization(self, device):
|
||||
device_name = self.get_device_name(device)
|
||||
logging.info(f"RequestAuthorization {device_name}")
|
||||
actions = ["confirm", "Confirm", "deny", "Deny"]
|
||||
|
||||
def action_handler(notification_id, action_key):
|
||||
if action_key == "confirm":
|
||||
logging.info(f"Authorized device {device_name}")
|
||||
self.send_reply(device)
|
||||
elif action_key == "deny":
|
||||
logging.info(f"Denied authorization for {device_name}")
|
||||
self.send_error(device, "org.bluez.Error.Rejected")
|
||||
|
||||
send_notification_with_actions("Bluetooth Service Authorization",
|
||||
f"Authorize device {device_name}",
|
||||
actions, action_handler)
|
||||
return
|
||||
|
||||
@dbus.service.method("org.bluez.Agent1", in_signature="os", out_signature="")
|
||||
def AuthorizeService(self, device, uuid):
|
||||
device_name = self.get_device_name(device)
|
||||
logging.info(f"AuthorizeService {device_name} uuid {uuid}")
|
||||
actions = ["confirm", "Confirm", "deny", "Deny"]
|
||||
|
||||
def action_handler(notification_id, action_key):
|
||||
if action_key == "confirm":
|
||||
logging.info(f"Authorized service {uuid} for device {device_name}")
|
||||
self.send_reply(device)
|
||||
elif action_key == "deny":
|
||||
logging.info(f"Denied authorization for service {uuid} on device {device_name}")
|
||||
self.send_error(device, "org.bluez.Error.Rejected")
|
||||
|
||||
send_notification_with_actions("Bluetooth Service Authorization",
|
||||
f"Authorize service {uuid} for device {device_name}",
|
||||
actions, action_handler)
|
||||
return
|
||||
|
||||
@dbus.service.method("org.bluez.Agent1", in_signature="", out_signature="")
|
||||
def Cancel(self):
|
||||
logging.info("Cancel")
|
||||
|
||||
def request_input(self, title, message, device, input_type):
|
||||
def action_handler(notification_id, action_key):
|
||||
if action_key == "input":
|
||||
result = subprocess.run(["zenity", "--entry", "--title", title, "--text", message], capture_output=True, text=True)
|
||||
user_input = result.stdout.strip()
|
||||
if input_type == "pin":
|
||||
self.handle_pin_input(device, user_input)
|
||||
elif input_type == "passkey":
|
||||
self.handle_passkey_input(device, user_input)
|
||||
elif action_key == "cancel":
|
||||
self.send_error(device, "org.bluez.Error.Rejected")
|
||||
|
||||
actions = ["input", "Enter", "cancel", "Cancel"]
|
||||
send_notification_with_actions(title, message, actions, action_handler)
|
||||
|
||||
def handle_pin_input(self, device, pin_code):
|
||||
logging.info(f"PIN code entered for {device}: {pin_code}")
|
||||
self.send_reply(device)
|
||||
|
||||
def handle_passkey_input(self, device, passkey):
|
||||
logging.info(f"Passkey entered for {device}: {passkey}")
|
||||
self.send_reply(device)
|
||||
|
||||
def send_reply(self, device):
|
||||
logging.info(f"Sending reply for {device}")
|
||||
bus = dbus.SystemBus()
|
||||
agent = bus.get_object("org.bluez", device)
|
||||
agent_interface = dbus.Interface(agent, "org.bluez.Device1")
|
||||
agent_interface.Pair(reply_handler=self.success_callback, error_handler=self.error_callback)
|
||||
|
||||
def send_error(self, device, error):
|
||||
logging.info(f"Sending error for {device}")
|
||||
bus = dbus.SystemBus()
|
||||
agent = bus.get_object("org.bluez", device)
|
||||
agent_interface = dbus.Interface(agent, "org.bluez.Device1")
|
||||
agent_interface.CancelPairing(reply_handler=self.success_callback, error_handler=self.error_callback)
|
||||
|
||||
def success_callback(self):
|
||||
logging.info("Operation succeeded")
|
||||
|
||||
def error_callback(self, error):
|
||||
logging.error(f"Operation failed: {error}")
|
||||
|
||||
def register_agent():
|
||||
bus = dbus.SystemBus()
|
||||
manager = dbus.Interface(bus.get_object("org.bluez", "/org/bluez"), "org.bluez.AgentManager1")
|
||||
path = AGENT_PATH
|
||||
agent = Agent(bus)
|
||||
manager.RegisterAgent(path, CAPABILITY)
|
||||
manager.RequestDefaultAgent(path)
|
||||
logging.info("Agent registered")
|
||||
|
||||
if __name__ == "__main__":
|
||||
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
|
||||
register_agent()
|
||||
loop = GLib.MainLoop()
|
||||
loop.run()
|
||||
Reference in New Issue
Block a user