Files
custum-hyprpanel/services/bluetooth.py

174 lines
7.7 KiB
Python
Executable File

#!/usr/bin/env python3
import dbus
import dbus.service
import dbus.mainloop.glib
from gi.repository import GLib
import subprocess
import logging
# Emit INFO-and-above records, each prefixed with a timestamp and level name.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# D-Bus object path at which the Agent object is exported and which is
# handed to AgentManager1.RegisterAgent / RequestDefaultAgent.
AGENT_PATH = "/test/agent"
# Capability string passed to AgentManager1.RegisterAgent.
# NOTE(review): "NoInputNoOutput" presumably tells BlueZ the agent cannot
# prompt or display anything, yet the agent in this file implements PIN,
# passkey and confirmation prompts -- confirm this is the intended value.
CAPABILITY = "NoInputNoOutput"
def send_notification_with_actions(title, message, actions, action_handler):
    """Show a desktop notification and route its action clicks to a handler.

    The handler is invoked only for actions that belong to the notification
    created by this call, and at most once.  The signal subscriptions are
    dropped as soon as an action is delivered or the notification is closed,
    so repeated calls do not accumulate stale receivers.

    Args:
        title: Notification summary line.
        message: Notification body text.
        actions: Flat list of ``[key, label, key, label, ...]`` pairs as
            expected by ``org.freedesktop.Notifications.Notify``; may be
            empty for a purely informational notification.
        action_handler: Callable invoked as
            ``action_handler(notification_id, action_key)``.

    Returns:
        The notification id assigned by the notification server.

    Raises:
        dbus.DBusException: If the notification service cannot be reached
            or rejects the request.
    """
    bus = dbus.SessionBus()
    notification_object = bus.get_object(
        "org.freedesktop.Notifications", "/org/freedesktop/Notifications"
    )
    notify_interface = dbus.Interface(
        notification_object, "org.freedesktop.Notifications"
    )

    # Filled in once Notify() returns.  Signals are only dispatched from the
    # main loop, i.e. after this function has returned, so the callbacks
    # below always see the final value.
    notification_id = None
    signal_matches = []

    def disconnect():
        # Drop both subscriptions; safe to call more than once.
        while signal_matches:
            signal_matches.pop().remove()

    def on_action(invoked_id, action_key):
        # ActionInvoked is broadcast for every notification on the session
        # bus, including those of other applications: ignore foreign ids.
        if invoked_id != notification_id:
            return
        disconnect()
        action_handler(invoked_id, action_key)

    def on_closed(closed_id, _reason):
        # Dismissed or expired without an action: nothing left to wait for.
        if closed_id == notification_id:
            disconnect()

    signal_matches.append(
        bus.add_signal_receiver(
            on_action,
            dbus_interface="org.freedesktop.Notifications",
            signal_name="ActionInvoked",
        )
    )
    signal_matches.append(
        bus.add_signal_receiver(
            on_closed,
            dbus_interface="org.freedesktop.Notifications",
            signal_name="NotificationClosed",
        )
    )

    try:
        notification_id = notify_interface.Notify(
            "bluetooth_agent", 0, "", title, message, actions, {}, -1
        )
    except dbus.DBusException:
        # Do not leave receivers behind for a notification that never existed.
        disconnect()
        raise
    return notification_id
class Agent(dbus.service.Object):
    """BlueZ ``org.bluez.Agent1`` implementation driven by desktop notifications.

    BlueZ treats a normal return from an agent method as the user's consent
    and an error reply as a refusal.  Every method that needs a decision is
    therefore exported with ``async_callbacks`` so that the D-Bus reply is
    withheld until the user has actually answered the notification.
    """

    def __init__(self, bus):
        """Export the agent on *bus* at ``AGENT_PATH``."""
        # One "claim" callable per unanswered BlueZ request; used by Cancel()
        # to invalidate prompts that BlueZ is no longer waiting for.
        self._pending = set()
        dbus.service.Object.__init__(self, bus, AGENT_PATH)

    # ------------------------------------------------------------------
    # org.bluez.Agent1 methods
    # ------------------------------------------------------------------

    @dbus.service.method("org.bluez.Agent1", in_signature="", out_signature="")
    def Release(self):
        """Called by BlueZ when the agent is unregistered."""
        logging.info("Release")

    @dbus.service.method("org.bluez.Agent1", in_signature="o", out_signature="s",
                         async_callbacks=("reply", "error"))
    def RequestPinCode(self, device, reply, error):
        """Ask the user for a PIN code and return it to BlueZ as a string."""
        device_name = self.get_device_name(device)
        logging.info(f"RequestPinCode {device_name} ({device})")
        accept, reject = self._begin_request(reply, error)
        self.request_input("Enter PIN code", f"Enter PIN code for device {device_name}",
                           device, "pin", accept, reject)

    @dbus.service.method("org.bluez.Agent1", in_signature="o", out_signature="u",
                         async_callbacks=("reply", "error"))
    def RequestPasskey(self, device, reply, error):
        """Ask the user for a numeric passkey and return it to BlueZ as uint32."""
        device_name = self.get_device_name(device)
        logging.info(f"RequestPasskey {device_name} ({device})")
        accept, reject = self._begin_request(reply, error)
        self.request_input("Enter Passkey", f"Enter passkey for device {device_name}",
                           device, "passkey", accept, reject)

    @dbus.service.method("org.bluez.Agent1", in_signature="ouq", out_signature="")
    def DisplayPasskey(self, device, passkey, entered):
        """Show the passkey the user must type on the remote device.

        BlueZ may call this repeatedly with a growing *entered* count while
        the user types; only the first call raises a notification.
        """
        device_name = self.get_device_name(device)
        logging.info(f"DisplayPasskey {device_name} passkey {passkey}")
        if entered:
            return
        # Passkeys are six digits; keep leading zeros so the user sees what
        # the remote device expects.
        self._notify("Bluetooth Pairing Request",
                     f"Passkey for device {device_name} is {int(passkey):06d}",
                     [], lambda *args: None)

    @dbus.service.method("org.bluez.Agent1", in_signature="os", out_signature="")
    def DisplayPinCode(self, device, pincode):
        """Show the PIN code the user must type on the remote device."""
        device_name = self.get_device_name(device)
        logging.info(f"DisplayPinCode {device_name}")
        self._notify("Bluetooth Pairing Request",
                     f"PIN code for device {device_name} is {pincode}",
                     [], lambda *args: None)

    @dbus.service.method("org.bluez.Agent1", in_signature="ou", out_signature="",
                         async_callbacks=("reply", "error"))
    def RequestConfirmation(self, device, passkey, reply, error):
        """Ask the user to confirm that *passkey* matches the remote device."""
        device_name = self.get_device_name(device)
        logging.info(f"RequestConfirmation {device_name} passkey {passkey}")
        self._ask_confirmation(
            "Bluetooth Pairing Request",
            f"Confirm passkey {int(passkey):06d} for device {device_name}",
            f"Confirmed pairing for {device_name}",
            f"Denied pairing for {device_name}",
            reply, error,
        )

    @dbus.service.method("org.bluez.Agent1", in_signature="o", out_signature="",
                         async_callbacks=("reply", "error"))
    def RequestAuthorization(self, device, reply, error):
        """Ask the user to authorize an incoming pairing attempt."""
        device_name = self.get_device_name(device)
        logging.info(f"RequestAuthorization {device_name}")
        self._ask_confirmation(
            "Bluetooth Service Authorization",
            f"Authorize device {device_name}",
            f"Authorized device {device_name}",
            f"Denied authorization for {device_name}",
            reply, error,
        )

    @dbus.service.method("org.bluez.Agent1", in_signature="os", out_signature="",
                         async_callbacks=("reply", "error"))
    def AuthorizeService(self, device, uuid, reply, error):
        """Ask the user to authorize a connection to service *uuid*."""
        device_name = self.get_device_name(device)
        logging.info(f"AuthorizeService {device_name} uuid {uuid}")
        self._ask_confirmation(
            "Bluetooth Service Authorization",
            f"Authorize service {uuid} for device {device_name}",
            f"Authorized service {uuid} for device {device_name}",
            f"Denied authorization for service {uuid} on device {device_name}",
            reply, error,
        )

    @dbus.service.method("org.bluez.Agent1", in_signature="", out_signature="")
    def Cancel(self):
        """Called by BlueZ when an outstanding request was cancelled."""
        logging.info("Cancel")
        # BlueZ no longer expects an answer: mark every open request as
        # settled so a late click on a stale notification sends nothing.
        for claim in list(self._pending):
            claim()

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    def get_device_name(self, device):
        """Return a display name for the device at object path *device*.

        ``Name`` is an optional Device1 property, so fall back to the object
        path instead of failing the whole agent request when it is missing.
        """
        bus = dbus.SystemBus()
        device_proxy = bus.get_object("org.bluez", device)
        device_properties = dbus.Interface(device_proxy, "org.freedesktop.DBus.Properties")
        try:
            return device_properties.Get("org.bluez.Device1", "Name")
        except dbus.DBusException as exc:
            logging.warning(f"Could not read name of {device}: {exc}")
            return str(device)

    def _begin_request(self, reply, error):
        """Wrap the D-Bus reply callbacks of one agent request.

        Returns:
            ``(accept, reject)``: ``accept(*values)`` sends a successful
            reply, ``reject(reason)`` sends ``org.bluez.Error.Rejected``.
            Whichever is called first wins; later calls are ignored so a
            request is never answered twice.
        """
        settled = False

        def claim():
            # True exactly once: for the caller that gets to answer.
            nonlocal settled
            if settled:
                return False
            settled = True
            self._pending.discard(claim)
            return True

        def accept(*values):
            if claim():
                reply(*values)

        def reject(reason="Rejected by user"):
            if claim():
                error(dbus.DBusException(reason, name="org.bluez.Error.Rejected"))

        self._pending.add(claim)
        return accept, reject

    def _notify(self, title, message, actions, action_handler, reject=None):
        """Show a notification; on failure log it and reject the request."""
        try:
            send_notification_with_actions(title, message, actions, action_handler)
        except dbus.DBusException as exc:
            logging.error(f"Could not show notification: {exc}")
            # Without a prompt the user can never consent, so refuse.
            if reject is not None:
                reject("Unable to prompt the user")

    def _ask_confirmation(self, title, message, accepted_log, denied_log, reply, error):
        """Show a Confirm/Deny notification and answer BlueZ with the choice."""
        accept, reject = self._begin_request(reply, error)

        def action_handler(notification_id, action_key):
            if action_key == "confirm":
                logging.info(accepted_log)
                accept()
            elif action_key == "deny":
                logging.info(denied_log)
                reject()

        actions = ["confirm", "Confirm", "deny", "Deny"]
        self._notify(title, message, actions, action_handler, reject)

    def _prompt(self, title, message):
        """Run a zenity entry dialog; return the text, or None if cancelled.

        Note: this blocks the main loop while the dialog is open.
        """
        try:
            result = subprocess.run(
                ["zenity", "--entry", "--title", title, "--text", message],
                capture_output=True, text=True,
            )
        except OSError as exc:
            logging.error(f"Could not run zenity: {exc}")
            return None
        # A non-zero exit status means the dialog was cancelled or failed;
        # that must not be mistaken for an empty but valid answer.
        if result.returncode != 0:
            return None
        return result.stdout.strip()

    def request_input(self, title, message, device, input_type, accept=None, reject=None):
        """Prompt the user for a PIN or passkey via notification + dialog.

        Args:
            title: Notification and dialog title.
            message: Notification and dialog text.
            device: Object path of the device being paired.
            input_type: ``"pin"`` or ``"passkey"``.
            accept: Called with the validated value to send to BlueZ.
            reject: Called (optionally with a reason) to refuse the request.
                Both callbacks default to no-ops.
        """
        if accept is None:
            accept = lambda value: None
        if reject is None:
            reject = lambda reason="": None

        def action_handler(notification_id, action_key):
            if action_key == "input":
                user_input = self._prompt(title, message)
                if user_input is None:
                    reject("Input cancelled")
                    return
                if input_type == "pin":
                    value = self.handle_pin_input(device, user_input)
                elif input_type == "passkey":
                    value = self.handle_passkey_input(device, user_input)
                else:
                    value = None
                if value is None:
                    reject("Invalid input")
                else:
                    accept(value)
            elif action_key == "cancel":
                reject()

        actions = ["input", "Enter", "cancel", "Cancel"]
        self._notify(title, message, actions, action_handler, reject)

    def handle_pin_input(self, device, pin_code):
        """Validate a PIN code; return it, or None if it is not 1-16 characters."""
        pin_code = str(pin_code)
        if not 1 <= len(pin_code) <= 16:
            logging.warning(f"Invalid PIN code entered for {device}")
            return None
        # The PIN itself is a secret and is deliberately not logged.
        logging.info(f"PIN code entered for {device}")
        return pin_code

    def handle_passkey_input(self, device, passkey):
        """Validate a passkey; return it as ``dbus.UInt32``, or None if invalid.

        A valid passkey consists of decimal digits with a value of 0-999999.
        """
        text = str(passkey).strip()
        if not text.isdecimal() or int(text) > 999999:
            logging.warning(f"Invalid passkey entered for {device}")
            return None
        # The passkey itself is a secret and is deliberately not logged.
        logging.info(f"Passkey entered for {device}")
        return dbus.UInt32(int(text))

    def send_reply(self, device):
        """Start pairing with *device* through ``Device1.Pair``.

        Retained for compatibility; agent requests are answered through
        their D-Bus reply callbacks, not through this method.
        """
        logging.info(f"Sending reply for {device}")
        bus = dbus.SystemBus()
        agent = bus.get_object("org.bluez", device)
        agent_interface = dbus.Interface(agent, "org.bluez.Device1")
        agent_interface.Pair(reply_handler=self.success_callback, error_handler=self.error_callback)

    def send_error(self, device, error):
        """Abort pairing with *device* through ``Device1.CancelPairing``.

        Retained for compatibility; *error* is accepted but not used.
        """
        logging.info(f"Sending error for {device}")
        bus = dbus.SystemBus()
        agent = bus.get_object("org.bluez", device)
        agent_interface = dbus.Interface(agent, "org.bluez.Device1")
        agent_interface.CancelPairing(reply_handler=self.success_callback, error_handler=self.error_callback)

    def success_callback(self):
        """Log the successful completion of an asynchronous Device1 call."""
        logging.info("Operation succeeded")

    def error_callback(self, error):
        """Log the failure of an asynchronous Device1 call."""
        logging.error(f"Operation failed: {error}")
def register_agent():
    """Export the pairing agent on the system bus and make it BlueZ's default.

    The agent object is created first so that it is already reachable at
    ``AGENT_PATH`` when BlueZ is told about it.
    """
    system_bus = dbus.SystemBus()
    bluez_root = system_bus.get_object("org.bluez", "/org/bluez")
    agent_manager = dbus.Interface(bluez_root, "org.bluez.AgentManager1")

    agent_path = AGENT_PATH
    _exported_agent = Agent(system_bus)

    agent_manager.RegisterAgent(agent_path, CAPABILITY)
    agent_manager.RequestDefaultAgent(agent_path)
    logging.info("Agent registered")
if __name__ == "__main__":
    # Install the GLib integration as dbus-python's default main loop before
    # any bus connection is opened, then register the agent and serve
    # requests until the process is terminated.
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    register_agent()
    GLib.MainLoop().run()