174 lines
7.7 KiB
Python
Executable File
174 lines
7.7 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
import dbus
|
|
import dbus.service
|
|
import dbus.mainloop.glib
|
|
from gi.repository import GLib
|
|
import subprocess
|
|
import logging
|
|
|
|
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
|
|
|
AGENT_PATH = "/test/agent"
|
|
CAPABILITY = "NoInputNoOutput"
|
|
|
|
def send_notification_with_actions(title, message, actions, action_handler):
|
|
bus = dbus.SessionBus()
|
|
notification_object = bus.get_object("org.freedesktop.Notifications", "/org/freedesktop/Notifications")
|
|
notify_interface = dbus.Interface(notification_object, "org.freedesktop.Notifications")
|
|
bus.add_signal_receiver(action_handler,
|
|
dbus_interface="org.freedesktop.Notifications",
|
|
signal_name="ActionInvoked")
|
|
notify_interface.Notify("bluetooth_agent", 0, "", title, message, actions, {}, -1)
|
|
|
|
class Agent(dbus.service.Object):
|
|
def __init__(self, bus):
|
|
dbus.service.Object.__init__(self, bus, AGENT_PATH)
|
|
|
|
@dbus.service.method("org.bluez.Agent1", in_signature="", out_signature="")
|
|
def Release(self):
|
|
logging.info("Release")
|
|
|
|
def get_device_name(self, device):
|
|
bus = dbus.SystemBus()
|
|
device_proxy = bus.get_object("org.bluez", device)
|
|
device_properties = dbus.Interface(device_proxy, "org.freedesktop.DBus.Properties")
|
|
return device_properties.Get("org.bluez.Device1", "Name")
|
|
|
|
@dbus.service.method("org.bluez.Agent1", in_signature="o", out_signature="")
|
|
def RequestPinCode(self, device):
|
|
device_name = self.get_device_name(device)
|
|
logging.info(f"RequestPinCode {device_name} ({device})")
|
|
self.request_input("Enter PIN code", f"Enter PIN code for device {device_name}", device, "pin")
|
|
|
|
@dbus.service.method("org.bluez.Agent1", in_signature="o", out_signature="u")
|
|
def RequestPasskey(self, device):
|
|
device_name = self.get_device_name(device)
|
|
logging.info(f"RequestPasskey {device_name} ({device})")
|
|
self.request_input("Enter Passkey", f"Enter passkey for device {device_name}", device, "passkey")
|
|
|
|
@dbus.service.method("org.bluez.Agent1", in_signature="ou", out_signature="")
|
|
def DisplayPasskey(self, device, passkey):
|
|
device_name = self.get_device_name(device)
|
|
logging.info(f"DisplayPasskey {device_name} passkey {passkey}")
|
|
send_notification_with_actions("Bluetooth Pairing Request", f"Passkey for device {device_name} is {passkey}", [], lambda *args: None)
|
|
|
|
@dbus.service.method("org.bluez.Agent1", in_signature="ou", out_signature="")
|
|
def RequestConfirmation(self, device, passkey):
|
|
device_name = self.get_device_name(device)
|
|
logging.info(f"RequestConfirmation {device_name} passkey {passkey}")
|
|
actions = ["confirm", "Confirm", "deny", "Deny"]
|
|
|
|
def action_handler(notification_id, action_key):
|
|
if action_key == "confirm":
|
|
logging.info(f"Confirmed pairing for {device_name}")
|
|
self.send_reply(device)
|
|
elif action_key == "deny":
|
|
logging.info(f"Denied pairing for {device_name}")
|
|
self.send_error(device, "org.bluez.Error.Rejected")
|
|
|
|
send_notification_with_actions("Bluetooth Pairing Request",
|
|
f"Confirm passkey {passkey} for device {device_name}",
|
|
actions, action_handler)
|
|
return
|
|
|
|
@dbus.service.method("org.bluez.Agent1", in_signature="o", out_signature="")
|
|
def RequestAuthorization(self, device):
|
|
device_name = self.get_device_name(device)
|
|
logging.info(f"RequestAuthorization {device_name}")
|
|
actions = ["confirm", "Confirm", "deny", "Deny"]
|
|
|
|
def action_handler(notification_id, action_key):
|
|
if action_key == "confirm":
|
|
logging.info(f"Authorized device {device_name}")
|
|
self.send_reply(device)
|
|
elif action_key == "deny":
|
|
logging.info(f"Denied authorization for {device_name}")
|
|
self.send_error(device, "org.bluez.Error.Rejected")
|
|
|
|
send_notification_with_actions("Bluetooth Service Authorization",
|
|
f"Authorize device {device_name}",
|
|
actions, action_handler)
|
|
return
|
|
|
|
@dbus.service.method("org.bluez.Agent1", in_signature="os", out_signature="")
|
|
def AuthorizeService(self, device, uuid):
|
|
device_name = self.get_device_name(device)
|
|
logging.info(f"AuthorizeService {device_name} uuid {uuid}")
|
|
actions = ["confirm", "Confirm", "deny", "Deny"]
|
|
|
|
def action_handler(notification_id, action_key):
|
|
if action_key == "confirm":
|
|
logging.info(f"Authorized service {uuid} for device {device_name}")
|
|
self.send_reply(device)
|
|
elif action_key == "deny":
|
|
logging.info(f"Denied authorization for service {uuid} on device {device_name}")
|
|
self.send_error(device, "org.bluez.Error.Rejected")
|
|
|
|
send_notification_with_actions("Bluetooth Service Authorization",
|
|
f"Authorize service {uuid} for device {device_name}",
|
|
actions, action_handler)
|
|
return
|
|
|
|
@dbus.service.method("org.bluez.Agent1", in_signature="", out_signature="")
|
|
def Cancel(self):
|
|
logging.info("Cancel")
|
|
|
|
def request_input(self, title, message, device, input_type):
|
|
def action_handler(notification_id, action_key):
|
|
if action_key == "input":
|
|
result = subprocess.run(["zenity", "--entry", "--title", title, "--text", message], capture_output=True, text=True)
|
|
user_input = result.stdout.strip()
|
|
if input_type == "pin":
|
|
self.handle_pin_input(device, user_input)
|
|
elif input_type == "passkey":
|
|
self.handle_passkey_input(device, user_input)
|
|
elif action_key == "cancel":
|
|
self.send_error(device, "org.bluez.Error.Rejected")
|
|
|
|
actions = ["input", "Enter", "cancel", "Cancel"]
|
|
send_notification_with_actions(title, message, actions, action_handler)
|
|
|
|
def handle_pin_input(self, device, pin_code):
|
|
logging.info(f"PIN code entered for {device}: {pin_code}")
|
|
self.send_reply(device)
|
|
|
|
def handle_passkey_input(self, device, passkey):
|
|
logging.info(f"Passkey entered for {device}: {passkey}")
|
|
self.send_reply(device)
|
|
|
|
def send_reply(self, device):
|
|
logging.info(f"Sending reply for {device}")
|
|
bus = dbus.SystemBus()
|
|
agent = bus.get_object("org.bluez", device)
|
|
agent_interface = dbus.Interface(agent, "org.bluez.Device1")
|
|
agent_interface.Pair(reply_handler=self.success_callback, error_handler=self.error_callback)
|
|
|
|
def send_error(self, device, error):
|
|
logging.info(f"Sending error for {device}")
|
|
bus = dbus.SystemBus()
|
|
agent = bus.get_object("org.bluez", device)
|
|
agent_interface = dbus.Interface(agent, "org.bluez.Device1")
|
|
agent_interface.CancelPairing(reply_handler=self.success_callback, error_handler=self.error_callback)
|
|
|
|
def success_callback(self):
|
|
logging.info("Operation succeeded")
|
|
|
|
def error_callback(self, error):
|
|
logging.error(f"Operation failed: {error}")
|
|
|
|
def register_agent():
|
|
bus = dbus.SystemBus()
|
|
manager = dbus.Interface(bus.get_object("org.bluez", "/org/bluez"), "org.bluez.AgentManager1")
|
|
path = AGENT_PATH
|
|
agent = Agent(bus)
|
|
manager.RegisterAgent(path, CAPABILITY)
|
|
manager.RequestDefaultAgent(path)
|
|
logging.info("Agent registered")
|
|
|
|
if __name__ == "__main__":
|
|
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
|
|
register_agent()
|
|
loop = GLib.MainLoop()
|
|
loop.run()
|