Files
@ b168d439d9a0
Branch filter:
Location: linux-tools/kcdemu/cdemu/proxy.py
b168d439d9a0
5.2 KiB
text/x-python
initial commit. Version 1.0
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 | import dbus, dbus.service, dbus.exceptions
from event.sync import EventDispatcher
OP_DPM_EMU = 'dpm-emulation'
OP_TR_EMU = 'tr-emulation'
OP_BS_EMU = 'bad-sector-emulation'
OP_DEV_ID = 'device-id'
class CDEmuDaemonProxy(EventDispatcher):
_name = 'net.sf.cdemu.CDEmuDaemon'
_object_path = '/Daemon'
def __init__(self, use_system=False):
"""
Low-level abstraction for communitating with cdemu daemon
:param autostart_daemon: Attempt to start daemon if it is not running
:param use_system: Not recommended. Consider this option deprecated. Left as a compatibility option
"""
super().__init__(
'daemon_started',
'daemon_stopped',
'device_status_changed',
'device_option_changed',
'device_mapping_ready',
'device_added',
'device_removed'
)
self._is_running = False
self.use_system = use_system
if self.use_system:
self.bus = dbus.SystemBus()
else:
self.bus = dbus.SessionBus()
self.bus.add_signal_receiver(self.from_daemon, dbus_interface='net.sf.cdemu.CDEmuDaemon',
member_keyword='signal', message_keyword='params')
self.bus.add_signal_receiver(self.from_dbus, dbus_interface='org.freedesktop.DBus',
member_keyword='signal', message_keyword='params')
self.obj = None
self.interface = None
self.connect()
@property
def is_running(self):
return self._is_running
def connect(self, retry=True):
self.obj = self.bus.get_object(self._name, self._object_path)
self.interface = dbus.Interface(self.obj, dbus_interface='net.sf.cdemu.CDEmuDaemon')
self.ping_daemon(retry)
def ping_daemon(self, retry=True):
try:
self.obj.Ping(dbus_interface='org.freedesktop.DBus.Peer')
self._is_running = True
self.dispatch('daemon_started')
except dbus.exceptions.DBusException as e:
if e.get_dbus_name() == 'org.freedesktop.DBus.Error.ServiceUnknown' and retry:
self.connect(False)
def from_dbus(self, *args, signal, params):
print('from_dbus', '|'.join([str(a) for a in args]), signal, params)
if signal == 'NameOwnerChanged':
if args[0] == self._name:
if args[2]:
self._is_running = True
self.dispatch('daemon_started')
else:
self._is_running = False
self.dispatch('daemon_stopped')
def from_daemon(self, *args, signal, params):
print('from_daemon', '|'.join([str(a) for a in args]), signal, params)
if signal == "DeviceStatusChanged":
self.dispatch('device_status_changed', *args)
elif signal == "DeviceOptionChanged":
self.dispatch('device_option_changed', *args)
elif signal == "DeviceMappingReady":
self.dispatch('device_mapping_ready', *args)
elif signal == "DeviceAdded":
self.dispatch('device_added', *args)
elif signal == "DeviceRemoved":
self.dispatch('device_removed', *args)
def get_daemon_version(self):
return self.interface.GetDaemonVersion()
def get_library_version(self):
return self.interface.GetLibraryVersion()
def get_daemon_interface_version2(self):
return self.interface.GetDaemonInterfaceVersion2()
def enum_daemon_debug_masks(self):
return self.interface.EnumDaemonDebugMasks()
def enum_library_debug_masks(self):
return self.interface.EnumLibraryDebugMasks()
def enum_supported_parsers(self):
return self.interface.EnumSupportedParsers()
def enum_supported_writers(self):
return self.interface.EnumSupportedWriters()
def enum_supported_filter_streams(self):
return self.interface.EnumSupportedFilterStreams()
def enum_writer_parameters(self, writer_id):
return self.interface.EnumWriterParameters('(s)', writer_id)
def get_number_of_devices(self):
return self.interface.GetNumberOfDevices()
def device_get_mapping(self, device_number):
return self.interface.DeviceGetMapping(device_number)
def device_get_status(self, device_number):
return self.interface.DeviceGetStatus(device_number)
def device_load(self, device_number, filenames, parameters={}):
return self.interface.DeviceLoad(device_number, filenames, parameters)
def device_create_blank(self, device_number, filename, parameters):
return self.interface.DeviceCreateBlank(device_number, filename, parameters)
def device_unload(self, device_number):
return self.interface.DeviceUnload(device_number)
def device_get_option(self, device_number, option_name):
return self.interface.DeviceGetOption(device_number, option_name)
def device_set_option(self, device_number, option_name, option_value):
return self.interface.DeviceSetOption(device_number, option_name, option_value)
def add_device(self):
return self.interface.AddDevice()
def remove_device(self):
return self.interface.RemoveDevice()
|