import os
import dbus
class MountException(Exception):
pass
class MountManager:
instance = None
def __init__(self):
self.bus = dbus.SystemBus()
self.manager = self.bus.get_object('org.freedesktop.UDisks2', '/org/freedesktop/UDisks2/Manager')
self.manager_interface = dbus.Interface(self.manager, 'org.freedesktop.UDisks2.Manager')
self.interfaces = {}
@classmethod
def get(cls):
if not cls.instance:
cls.instance = cls()
return cls.instance
def mount(self, source):
f = os.open(source, os.O_RDWR)
path = self.manager_interface.LoopSetup(f, {
'read-only': ('b', False)
})
device = self.bus.get_object('org.freedesktop.UDisks2', path)
self.interfaces[source] = dbus.Interface(device, 'org.freedesktop.UDisks2.Filesystem')
self.interfaces[source].Mount({'options': 'rw'})
dbus.Interface(device, 'org.freedesktop.UDisks2.Loop').SetAutoclear(True, [])
mount_point = self.interfaces[source].proxy_object.Get(
'org.freedesktop.UDisks2.Filesystem', 'MountPoints', dbus_interface='org.freedesktop.DBus.Properties'
)[0]
mount_point = ''.join([str(v) for v in mount_point])
return mount_point.rstrip(chr(0))
def dispose_all(self):
"""
Run this on program closing if you care about your os state to be clean and beautiful
:return:
"""
for s in self.interfaces:
if self.is_mounted(s):
self.interfaces[s].Unmount([])
else:
dbus.Interface(self.interfaces[s].proxy_object, 'org.freedesktop.UDisks2.Loop').Delete([])
self.interfaces.clear()
def is_mounted(self, source):
return source in self.interfaces and bool(
self.interfaces[source].proxy_object.Get(
'org.freedesktop.UDisks2.Filesystem', 'MountPoints', dbus_interface='org.freedesktop.DBus.Properties'
)
)
def unmount(self, source):
self.interfaces[source].Unmount([])
del self.interfaces[source]