import os

import dbus


class MountException(Exception):
    """Exception type reserved for mount-related errors."""


class MountManager:
    """Mount image files as loop devices through the UDisks2 D-Bus service.

    The class keeps a process-wide instance in ``instance`` (see ``get``) and
    remembers, per source path, the ``org.freedesktop.UDisks2.Filesystem``
    interface of the loop device created for it.
    """

    # Lazily created shared instance returned by ``get``.
    instance = None

    def __init__(self):
        """Connect to the system bus and look up the UDisks2 manager object."""
        self.bus = dbus.SystemBus()
        self.manager = self.bus.get_object(
            'org.freedesktop.UDisks2',
            '/org/freedesktop/UDisks2/Manager',
        )
        self.manager_interface = dbus.Interface(
            self.manager, 'org.freedesktop.UDisks2.Manager'
        )
        # Maps a source path to the Filesystem interface of its loop device.
        self.interfaces = {}

    @classmethod
    def get(cls):
        """Return the shared ``MountManager``, creating it on first use."""
        if cls.instance is None:
            cls.instance = cls()
        return cls.instance

    def mount(self, source):
        """Attach ``source`` to a loop device, mount it and return the mount point.

        :param source: path of the file to attach; it is opened read-write.
        :return: the first entry of the device's ``MountPoints`` property as
            a string, with trailing NUL characters removed.
        """
        fd = os.open(source, os.O_RDWR)
        try:
            # dbus-python marshals a tuple as a struct, so the option value
            # must be an explicit D-Bus boolean to be read as 'b' by UDisks.
            path = self.manager_interface.LoopSetup(
                fd, {'read-only': dbus.Boolean(False)}
            )
        finally:
            # The descriptor is passed to UDisks over D-Bus; our own copy is
            # not needed once the call has returned (or failed), so close it
            # instead of leaking one descriptor per mount.
            os.close(fd)

        device = self.bus.get_object('org.freedesktop.UDisks2', path)
        filesystem = dbus.Interface(
            device, 'org.freedesktop.UDisks2.Filesystem'
        )
        self.interfaces[source] = filesystem
        filesystem.Mount({'options': 'rw'})

        # Ask UDisks to tear the loop device down once it is no longer in use.
        dbus.Interface(
            device, 'org.freedesktop.UDisks2.Loop'
        ).SetAutoclear(True, [])

        # MountPoints is a list of byte sequences; take the first one and
        # rebuild the path from its individual bytes.
        mount_point = filesystem.proxy_object.Get(
            'org.freedesktop.UDisks2.Filesystem',
            'MountPoints',
            dbus_interface='org.freedesktop.DBus.Properties',
        )[0]
        mount_point = ''.join([str(v) for v in mount_point])
        return mount_point.rstrip(chr(0))

    def dispose_all(self):
        """Release every device created by this manager.

        Call this when the program shuts down to leave the system clean:
        sources that are still mounted are unmounted, the others have their
        loop device deleted. The internal registry is emptied afterwards.
        """
        for s in self.interfaces:
            if self.is_mounted(s):
                self.interfaces[s].Unmount([])
            else:
                dbus.Interface(
                    self.interfaces[s].proxy_object,
                    'org.freedesktop.UDisks2.Loop',
                ).Delete([])
        self.interfaces.clear()

    def is_mounted(self, source):
        """Return True if ``source`` is registered and has a mount point.

        :param source: path previously passed to ``mount``.
        :return: False when ``source`` is unknown to this manager or its
            ``MountPoints`` property is empty, True otherwise.
        """
        return source in self.interfaces and bool(
            self.interfaces[source].proxy_object.Get(
                'org.freedesktop.UDisks2.Filesystem',
                'MountPoints',
                dbus_interface='org.freedesktop.DBus.Properties',
            )
        )

    def unmount(self, source):
        """Unmount ``source`` and forget its interface.

        :param source: path previously passed to ``mount``.
        :raises KeyError: if ``source`` is not registered with this manager.
        """
        self.interfaces[source].Unmount([])
        del self.interfaces[source]