##
# wmi - a lightweight Python wrapper around Microsoft's WMI interface
#
# Windows Management Instrumentation (WMI) is Microsoft's answer to
# the DMTF's Common Information Model. It allows you to query just
# about any conceivable piece of information from any computer which
# is running the necessary agent and over which you have the
# necessary authority.
#
# The implementation is by means of COM/DCOM and most of the examples
# assume you're running one of Microsoft's scripting technologies.
# Fortunately, Mark Hammond's pywin32 has pretty much all you need
# for a workable Python adaptation. I haven't tried any of the fancier
# stuff like Async calls and so on, so I don't know if they'd work.
#
# Since the COM implementation doesn't give much away to Python
# programmers, I've wrapped it in some lightweight classes with
# some getattr / setattr magic to ease the way. In particular:
#
#
# disks = wmi.WMI ().Win32_LogicalDisk () ## # In addition, you can specify what would become the WHERE clause # as keyword parameters: # #
# fixed_disks = wmi.WMI ().Win32_LogicalDisk (DriveType = 3) ##
# for p in wmi.WMI ().Win32_Process (): # if p.Name.lower () == 'notepad.exe': # p.Terminate (Result=1) ##
# for p in wmi.WMI ().Win32_Process (): # if p.Name.lower () == 'notepad.exe': # for r in p.references (): # print r.Name ##
# process = wmi.WMI ().Win32_Process # process.Create (CommandLine="notepad.exe") ## #
# import wmi
#
# vodev1 = wmi.WMI ("vodev1")
# for disk in vodev1.Win32_LogicalDisk ():
# if disk.DriveType == 3:
# space = 100 * long (disk.FreeSpace) / long (disk.Size)
# print "%s has %d%% free" % (disk.Name, space)
#
#
pp0 = wmi.WMI ().Win32_ParallelPort ()[0]
print pp0.path ().RelPath
"""
try:
return self.ole_object.Path_
except pywintypes.com_error, error_info:
handle_com_error (error_info)
def derivation (self):
    """Return the class derivation of this object as reported by WMI,
    with the most specific class first. eg,

    pp0 = wmi.WMI ().Win32_ParallelPort ()[0]
    print ' <- '.join (pp0.derivation ())

    The value is read from the Derivation_ property of the wrapped
    OLE object; a COM failure is passed on to handle_com_error.
    """
    try:
        lineage = self.ole_object.Derivation_
    except pywintypes.com_error as error_info:
        handle_com_error (error_info)
    else:
        return lineage
def associators (self, wmi_association_class="", wmi_result_class=""):
    """Return a list of the objects associated with this one, each
    wrapped as a _wmi_object. The result can be narrowed either by
    the name of the association class which links the objects
    (wmi_association_class) or by the name of the class of object
    which should come back (wmi_result_class). eg,

    c = wmi.WMI ()
    pp = c.Win32_ParallelPort ()[0]

    for i in pp.associators (wmi_association_class="Win32_PortResource"):
      print i

    for i in pp.associators (wmi_result_class="Win32_PnPEntity"):
      print i

    A COM failure is passed on to handle_com_error.
    """
    try:
        related = self.ole_object.Associators_ (
            strAssocClass=wmi_association_class,
            strResultClass=wmi_result_class
        )
        wrapped = []
        for item in related:
            wrapped.append (_wmi_object (item))
        return wrapped
    except pywintypes.com_error as error_info:
        handle_com_error (error_info)
def references (self, wmi_class=""):
    """Return a list of the associations in which this object takes
    part, each wrapped as a _wmi_object. Passing wmi_class limits the
    result to associations of that (association) class.

    NB Associations are treated specially; although WMI only returns
    the string corresponding to the instance of each associated object,
    this module will automatically convert that to the object itself. eg,

    c = wmi.WMI ()
    sp = c.Win32_SerialPort ()[0]

    for i in sp.references ():
      print i

    for i in sp.references (wmi_class="Win32_SerialPortSetting"):
      print i

    A COM failure is passed on to handle_com_error.
    """
    try:
        found = self.ole_object.References_ (strResultClass=wmi_class)
        wrapped = []
        for reference in found:
            wrapped.append (_wmi_object (reference))
        return wrapped
    except pywintypes.com_error as error_info:
        handle_com_error (error_info)
#
# class _wmi_event
#
class _wmi_event (_wmi_object):
    """A _wmi_object produced by an event firing. On top of the
    wrapped object it carries three extra attributes:

    event_type - "creation", "modification" or "deletion", taken from
                 the class name of the event
    timestamp - the event's TIME_CREATED passed through from_1601
    previous - the event's PreviousInstance

    Each one stays None when no event information is supplied or when
    the event does not offer the corresponding property.
    """
    event_type_re = re.compile ("__Instance(Creation|Modification|Deletion)Event")

    def __init__ (self, event, event_info):
        _wmi_object.__init__ (self, event)
        # Always create the attributes so that readers never have to
        # fall back on the attribute hooks of the base class.
        for name in ("event_type", "timestamp", "previous"):
            _set (self, name, None)
        if not event_info:
            return

        matched = self.event_type_re.match (event_info.Path_.Class)
        _set (self, "event_type", matched.group (1).lower ())
        if hasattr (event_info, "TIME_CREATED"):
            _set (self, "timestamp", from_1601 (event_info.TIME_CREATED))
        if hasattr (event_info, "PreviousInstance"):
            _set (self, "previous", event_info.PreviousInstance)
#
# class _wmi_class
#
class _wmi_class (_wmi_object):
    """Currying class to assist in issuing queries against
    a WMI namespace. The idea is that when someone issues
    an otherwise unknown method against the WMI object, if
    it matches a known WMI class a query object will be
    returned which may then be called with one or more params
    which will form the WHERE clause. eg,

    c = wmi.WMI ()
    c_drive = c.Win32_LogicalDisk (Name='C:')
    """

    def __init__ (self, namespace, wmi_class):
        """Wrap the raw WMI class object wmi_class. When no namespace
        wrapper is passed in, one is built from the class's own moniker
        (without enumerating the classes of that namespace).
        """
        _wmi_object.__init__ (self, wmi_class)
        _set (self, "_class_name", wmi_class.Path_.Class)
        if namespace:
            _set (self, "_namespace", namespace)
        else:
            # The display name is split on ":" into exactly three parts:
            # the protocol, the namespace path and the class name.
            class_moniker = wmi_class.Path_.DisplayName
            winmgmts, namespace_moniker, class_name = class_moniker.split (":")
            namespace = _wmi_namespace (GetObject (winmgmts + ":" + namespace_moniker), False)
            _set (self, "_namespace", namespace)

    def query (self, fields=None, **where_clause):
        """Make it slightly easier to query against the class,
        by calling the namespace's query with the class preset.

        fields - names of the properties to select; every property
                 is selected when this is empty or omitted
        where_clause - keyword arguments, each of which becomes a
                 "name = 'value'" test; the tests are joined by AND

        Returns whatever the namespace's query returns. Raises
        x_wmi_no_namespace if the class has no namespace to query;
        a COM failure is passed on to handle_com_error.
        """
        # A fresh list per call: the list is handed on to the namespace
        # query, so a shared default could leak between calls.
        if fields is None:
            fields = []
        if self._namespace is None:
            raise x_wmi_no_namespace (u"You cannot query directly from a WMI class")

        try:
            field_list = u", ".join (fields) or u"*"
            wql = u"SELECT " + field_list + u" FROM " + self._class_name
            if where_clause:
                wql += u" WHERE " + u" AND ".join ([u"%s = '%s'" % (k, v) for k, v in where_clause.items ()])
            return self._namespace.query (wql, self, fields)
        except pywintypes.com_error as error_info:
            handle_com_error (error_info)

    # Calling the class object is the same as querying it
    __call__ = query

    def watch_for (
        self,
        notification_type=u"operation",
        delay_secs=1,
        **where_clause
    ):
        """Set up a watcher for events on this class by delegating to
        the namespace's watch_for with this class preset. Raises
        x_wmi_no_namespace if the class has no namespace.
        """
        if self._namespace is None:
            raise x_wmi_no_namespace (u"You cannot watch directly from a WMI class")

        return self._namespace.watch_for (
            notification_type=notification_type,
            wmi_class=self,
            delay_secs=delay_secs,
            **where_clause
        )

    def instances (self):
        """Return a list of instances of the WMI class, each wrapped
        as a _wmi_object. A COM failure is passed on to handle_com_error.
        """
        try:
            return [_wmi_object (instance, self) for instance in self.Instances_ ()]
        except pywintypes.com_error as error_info:
            handle_com_error (error_info)

    def new (self, **kwargs):
        """This is the equivalent to the raw-WMI SpawnInstance_
        method. Note that there are relatively few uses for
        this, certainly fewer than you might imagine. Most
        classes which need to create a new *real* instance
        of themselves, eg Win32_Process, offer a .Create
        method. SpawnInstance_ is generally reserved for
        instances which are passed as parameters to such
        .Create methods, a common example being the
        Win32_SecurityDescriptor, passed to Win32_Share.Create
        and other instances which need security.

        The example here is Win32_ProcessStartup, which
        controls the shown/hidden state etc. of a new
        Win32_Process instance.

        import win32con
        import wmi
        c = wmi.WMI ()
        startup = c.Win32_ProcessStartup.new (ShowWindow=win32con.SW_SHOWMINIMIZED)
        pid, retval = c.Win32_Process.Create (
          CommandLine="notepad.exe",
          ProcessStartupInformation=startup
        )

        NB previous versions of this module, used this function
        to create new process. This is *not* a good example
        of its use; it is better handled with something like
        the example above.

        The keyword arguments are applied to the new instance through
        its set method. A COM failure is passed on to handle_com_error.
        """
        try:
            obj = _wmi_object (self.SpawnInstance_ (), self)
            obj.set (**kwargs)
            return obj
        except pywintypes.com_error as error_info:
            handle_com_error (error_info)
#
# class _wmi_result
#
class _wmi_result:
    """Plain data holder for the rows of a targeted WMI query, as
    handed back by fetch_as_classes. Each selected property of the
    raw WMI object becomes an ordinary instance attribute.
    """
    def __init__(self, obj, attributes):
        """Copy property values from the raw WMI object obj.

        attributes - names of the properties to copy; when empty,
                     every property the object offers is copied
        """
        if attributes:
            names = attributes
        else:
            names = [prop.Name for prop in obj.Properties_]
        # Write straight into __dict__, one entry per property name
        for name in names:
            self.__dict__[name] = obj.Properties_ (name).Value
#
# class _wmi_namespace (what WMI () returns for a namespace moniker)
#
class _wmi_namespace:
    """A WMI root of a computer system. The classes attribute holds
    the names of the classes on offer. This means you can explore a
    bit with things like this:

    c = wmi.WMI ()
    for i in c.classes:
      if "user" in i.lower ():
        print i
    """

    def __init__ (self, namespace, find_classes):
        """Wrap the raw OLE namespace object.

        namespace - the OLE object representing the WMI namespace
        find_classes - if true, enumerate the classes of the namespace
                       straight away so their names appear in .classes
        """
        _set (self, "_namespace", namespace)
        #
        # wmi attribute preserved for backwards compatibility
        #
        _set (self, "wmi", namespace)

        # Initialise the "classes" attribute, to avoid infinite recursion in the
        # __getattr__ method (which uses it).
        self.classes = {}
        #
        # Pick up the list of classes under this namespace
        #  so that they can be queried, and used as though
        #  properties of the namespace by means of the __getattr__
        #  hook below.
        # If the namespace does not support SubclassesOf, carry on
        #  regardless
        #
        if find_classes:
            try:
                self.classes.update (self.subclasses_of ())
            except AttributeError:
                pass

    def __repr__ (self):
        return u"<_wmi_namespace: %s>" % self.wmi

    def __str__ (self):
        return repr (self)

    def get (self, moniker):
        """Return the object which the moniker identifies within this
        namespace, wrapped as a _wmi_object. A COM failure is passed
        on to handle_com_error.
        """
        try:
            return _wmi_object (self.wmi.Get (moniker))
        except pywintypes.com_error as error_info:
            handle_com_error (error_info)

    def handle (self):
        """The raw OLE object representing the WMI namespace"""
        return self._namespace

    def subclasses_of (self, root="", regex=r".*"):
        """Return a dictionary whose keys are the names of the classes
        which SubclassesOf (root) yields and whose name matches regex
        from its start. Every value is None: the class objects
        themselves are only fetched on demand by _cached_classes.
        """
        classes = {}
        for c in self._namespace.SubclassesOf (root):
            klass = c.Path_.Class
            if re.match (regex, klass):
                classes[klass] = None
        return classes

    def instances (self, class_name):
        """Return a list of instances of the WMI class. This is
        (probably) equivalent to querying with no qualifiers.

        system.instances ("Win32_LogicalDisk")
        # should be the same as
        system.Win32_LogicalDisk ()
        """
        try:
            return [_wmi_object (obj) for obj in self._namespace.InstancesOf (class_name)]
        except pywintypes.com_error as error_info:
            handle_com_error (error_info)

    def new (self, wmi_class, **kwargs):
        """This is now implemented by a call to _wmi_class.new (qv):
        the class called wmi_class is looked up as an attribute of this
        namespace and its new method is handed the keyword arguments.
        """
        return getattr (self, wmi_class).new (**kwargs)

    new_instance_of = new

    def _where_terms (self, where_clause, prefix=u""):
        """Render keyword filters as "name = 'value'" tests joined by
        AND; prefix is put in front of each name. Values are inserted
        as they stand, without any quoting or escaping.
        """
        return u" AND ".join ([u"%s%s = '%s'" % (prefix, k, v) for k, v in where_clause.items ()])

    def _raw_query (self, wql):
        """Execute a WQL query and return its raw results. Use the flags
        recommended by Microsoft to achieve a read-only, semi-synchronous
        query where the time is taken while looping through. Should really
        be a generator, but ...
        NB Backslashes need to be doubled up.

        A COM failure is re-raised as the exception registered for its
        hresult in WMI_EXCEPTIONS, or as x_wmi if none is registered.
        """
        flags = wbemFlagReturnImmediately | wbemFlagForwardOnly
        wql = wql.replace (u"\\", u"\\\\")
        if _DEBUG: print (u"_raw_query(wql): %s" % wql)
        try:
            return self._namespace.ExecQuery (strQuery=wql, iFlags=flags)
        except pywintypes.com_error as error_info:
            # The error unpacks into its four components; only the
            # hresult is needed to pick the exception to raise.
            hresult, hresult_text, additional, param_in_error = error_info
            raise WMI_EXCEPTIONS.get (hresult, x_wmi (hresult))

    def query (self, wql, instance_of=None, fields=None):
        """Perform an arbitrary query against a WMI object, and return
        a list of _wmi_object representations of the results.

        wql - the query text
        instance_of - passed on to each _wmi_object as its class
        fields - passed on to each _wmi_object; defaults to an empty list
        """
        # A fresh list per call rather than one shared default object
        if fields is None:
            fields = []
        return [ _wmi_object (obj, instance_of, fields) for obj in self._raw_query (wql) ]

    def fetch_as_classes (self, wmi_classname, fields=(), **where_clause):
        """Build and execute a wql query to fetch the specified list of fields from
        the specified wmi_classname + where_clause, then return the results as
        a list of simple class instances with attributes matching fields_list.

        If fields is left empty, select * and pre-load all class attributes for
        each class returned.
        """
        wql = u"SELECT %s FROM %s" % (u", ".join (fields) or u"*", wmi_classname)
        if where_clause:
            wql += u" WHERE " + self._where_terms (where_clause)
        return [_wmi_result (obj, fields) for obj in self._raw_query (wql)]

    def fetch_as_lists (self, wmi_classname, fields, **where_clause):
        """Build and execute a wql query to fetch the specified list of fields from
        the specified wmi_classname + where_clause, then return the results as
        a list of lists whose values correspond to fields.
        """
        wql = u"SELECT %s FROM %s" % (u", ".join (fields), wmi_classname)
        if where_clause:
            wql += u" WHERE " + self._where_terms (where_clause)
        return [
            [obj.Properties_ (field).Value for field in fields]
            for obj in self._raw_query (wql)
        ]

    def watch_for (
        self,
        raw_wql=None,
        notification_type=u"operation",
        wmi_class=None,
        delay_secs=1,
        **where_clause
    ):
        """Set up an event tracker on a WMI event. This function
        returns an wmi_watcher which can be called to get the
        next event. eg,

        c = wmi.WMI ()

        raw_wql = "SELECT * FROM __InstanceCreationEvent WITHIN 2 WHERE TargetInstance ISA 'Win32_Process'"
        watcher = c.watch_for (raw_wql=raw_wql)
        while 1:
          process_created = watcher ()
          print process_created.Name

        # or

        watcher = c.watch_for (
          notification_type="Creation",
          wmi_class="Win32_Process",
          delay_secs=2,
          Name='calc.exe'
        )
        calc_created = watcher ()

        Now supports timeout on the call to watcher, eg:

        import pythoncom
        import wmi
        c = wmi.WMI (privileges=["Security"])
        watcher1 = c.watch_for (
          notification_type="Creation",
          wmi_class="Win32_NTLogEvent",
          Type="error"
        )
        watcher2 = c.watch_for (
          notification_type="Creation",
          wmi_class="Win32_NTLogEvent",
          Type="warning"
        )

        while 1:
          try:
            error_log = watcher1 (500)
          except wmi.x_wmi_timed_out:
            pythoncom.PumpWaitingMessages ()
          else:
            print error_log

          try:
            warning_log = watcher2 (500)
          except wmi.x_wmi_timed_out:
            pythoncom.PumpWaitingMessages ()
          else:
            print warning_log

        wmi_class may be a class name or a _wmi_class object. raw_wql,
        when given, is used as it stands and the notification type,
        delay and keyword filters are not applied. Raises x_wmi if
        neither raw_wql nor wmi_class is supplied; a COM failure is
        passed on to handle_com_error.
        """
        if wmi_class:
            if isinstance (wmi_class, _wmi_class):
                class_name = wmi_class._class_name
            else:
                class_name = wmi_class
                wmi_class = getattr (self, class_name)
            is_extrinsic = u"__ExtrinsicEvent" in wmi_class.derivation ()
        else:
            class_name = is_extrinsic = None

        if raw_wql:
            wql = raw_wql
        elif class_name is None:
            # Without a class there is nothing to watch; building the
            # query regardless would ask WMI about a class called 'None'.
            raise x_wmi (u"watch_for needs either raw_wql or wmi_class")
        elif is_extrinsic:
            # Extrinsic events are selected directly from their own class
            if where_clause:
                where = u" WHERE " + self._where_terms (where_clause)
            else:
                where = u""
            wql = u"SELECT * FROM " + class_name + where
        else:
            # Other classes are watched through the __Instance...Event
            # classes, filtering on the instance which the event carries.
            if where_clause:
                where = u" AND " + self._where_terms (where_clause, u"TargetInstance.")
            else:
                where = u""
            wql = \
                u"SELECT * FROM __Instance%sEvent WITHIN %d WHERE TargetInstance ISA '%s' %s" % \
                (notification_type, delay_secs, class_name, where)

        if _DEBUG: print (wql)

        try:
            return _wmi_watcher (self._namespace.ExecNotificationQuery (wql), is_extrinsic=is_extrinsic)
        except pywintypes.com_error as error_info:
            handle_com_error (error_info)

    def __getattr__ (self, attribute):
        """Offer WMI classes as simple attributes. Pass through any untrapped
        attribute to the underlying OLE object. This means that new or
        unmapped functionality is still available to the module user.

        The name is tried as a class name as given, then with a
        "Win32_" prefix, and finally as an attribute of the raw
        namespace object.
        """
        #
        # Don't try to match against known classes as was previously
        # done since the list may not have been requested
        # (find_classes=False).
        #
        try:
            return self._cached_classes (attribute)
        except pywintypes.com_error:
            try:
                return self._cached_classes ("Win32_" + attribute)
            except pywintypes.com_error:
                return getattr (self._namespace, attribute)

    def _cached_classes (self, class_name):
        """Standard caching helper which keeps track of classes
        already retrieved by name and returns the existing object
        if found. If this is the first retrieval, store it and
        pass it back
        """
        if self.classes.get (class_name) is None:
            self.classes[class_name] = _wmi_class (self, self._namespace.Get (class_name))
        return self.classes[class_name]

    def _getAttributeNames (self):
        """Return list of classes for IPython completion engine"""
        return [str (x) for x in self.classes.keys () if not x.startswith ('__')]
#
# class _wmi_watcher
#
class _wmi_watcher:
    """Callable wrapper around a WMI event source, as returned by
    the namespace's watch_for (qv). Each call waits for the next
    event and returns it as a _wmi_event.
    """

    # Event properties which are themselves WMI objects and are
    # to be wrapped as such when read from the event.
    _event_property_map = dict.fromkeys (("TargetInstance", "PreviousInstance"), _wmi_object)

    def __init__ (self, wmi_event, is_extrinsic):
        self.wmi_event = wmi_event
        self.is_extrinsic = is_extrinsic

    def __call__ (self, timeout_ms=-1):
        """When called, return the instance which caused the event. Supports
        timeout in milliseconds (defaulting to infinite). If the watcher
        times out, x_wmi_timed_out is raised. This makes it easy to support
        watching for multiple objects.

        For an extrinsic event the event object itself is returned;
        otherwise the event's TargetInstance is returned with the
        event attached as its event information.
        """
        try:
            event = self.wmi_event.NextEvent (timeout_ms)
            if self.is_extrinsic:
                return _wmi_event (event, None)
            target = event.Properties_ ("TargetInstance").Value
            event_info = _wmi_object (event, property_map=self._event_property_map)
            return _wmi_event (target, event_info)
        except pywintypes.com_error as error_info:
            hresult_code, hresult_name, additional_info, parameter_in_error = error_info
            if additional_info:
                # The status code is the last of the six items
                # which make up the additional information.
                wcode, source_of_error, error_description, whlp_file, whlp_context, scode = additional_info
                if scode == wbemErrTimedout:
                    raise x_wmi_timed_out
            handle_com_error (error_info)
# Prefix which marks a moniker as a WMI one
PROTOCOL = "winmgmts:"
# Security settings used by connect when the caller supplies none
IMPERSONATION_LEVEL = "impersonate"
AUTHENTICATION_LEVEL = "default"
# The conventional default namespace. connect does not apply it:
# a blank namespace is passed through untouched (see below).
NAMESPACE = "root/cimv2"

def connect (
    computer=".",
    impersonation_level="",
    authentication_level="",
    authority="",
    privileges="",
    moniker="",
    wmi=None,
    namespace="",
    suffix="",
    user="",
    password="",
    find_classes=True,
    debug=False
):
    """The WMI constructor can either take a ready-made moniker or as many
    parts of one as are necessary. Eg,

    c = wmi.WMI (moniker="winmgmts:{impersonationLevel=Delegate}//remote")
    # or
    c = wmi.WMI (computer="remote", privileges=["!RemoteShutdown", "Security"])

    I daren't link to a Microsoft URL; they change so often. Try Googling for
    WMI construct moniker and see what it comes back with.

    For complete control, a named argument "wmi" can be supplied, which
    should be a SWbemServices object, which you create yourself. Eg,

    loc = win32com.client.Dispatch("WbemScripting.SWbemLocator")
    svc = loc.ConnectServer(...)
    c = wmi.WMI(wmi=svc)

    The moniker syntax does not allow specification of a user name, so
    when "user" is given the connection is made through connect_server
    instead; in that case no impersonation level, authentication level,
    privileges or suffix may be specified.

    If the "wmi" parameter is supplied, none of the moniker or
    credential parameters is used.

    The "winmgmts:" prefix of a moniker and the leading "root" of a
    namespace are recognised whatever their case and are added if
    missing. Passing debug sets the module-wide _DEBUG flag.

    Returns a _wmi_namespace, _wmi_class or _wmi_object according to
    what the connection refers to. Raises x_wmi for an inconsistent
    set of parameters; a COM failure is passed on to handle_com_error.
    """
    global _DEBUG
    _DEBUG = debug

    #
    # If namespace is a blank string, leave
    # it unaltered as it might be trying to
    # access the root namespace
    #
    #if namespace is None:
    #  namespace = NAMESPACE

    try:
        if wmi:
            obj = wmi

        elif moniker:
            # The protocol prefix is not case-sensitive ("WinMgmts:" is
            # common in Microsoft's examples), so compare it accordingly
            # rather than prefixing an already complete moniker.
            if not moniker.lower ().startswith (PROTOCOL):
                moniker = PROTOCOL + moniker
            if _DEBUG: print (moniker)
            obj = GetObject (moniker)

        else:
            if namespace:
                parts = re.split (r"[/\\]", namespace)
                # Namespace names are not case-sensitive: don't put a
                # second root in front of "ROOT\cimv2" and the like.
                if parts[0].lower () != 'root':
                    parts.insert (0, "root")
                namespace = "/".join (parts)

            if user:
                if impersonation_level or authentication_level or privileges or suffix:
                    raise x_wmi (u"You can't specify an impersonation, authentication or privilege as well as a username")
                else:
                    obj = connect_server (
                        server=computer,
                        namespace=namespace,
                        user=user,
                        password=password,
                        authority=authority
                    )

            else:
                moniker = construct_moniker (
                    computer=computer,
                    impersonation_level=impersonation_level or IMPERSONATION_LEVEL,
                    authentication_level=authentication_level or AUTHENTICATION_LEVEL,
                    authority=authority,
                    privileges=privileges,
                    namespace=namespace,
                    suffix=suffix
                )
                if _DEBUG: print (moniker)
                obj = GetObject (moniker)

        wmi_type = get_wmi_type (obj)

        if wmi_type == "namespace":
            return _wmi_namespace (obj, find_classes)
        elif wmi_type == "class":
            return _wmi_class (None, obj)
        elif wmi_type == "instance":
            return _wmi_object (obj)
        else:
            raise x_wmi (u"Unknown moniker type")

    except pywintypes.com_error as error_info:
        handle_com_error (error_info)

WMI = connect

def construct_moniker (
    computer=None,
    impersonation_level="Impersonate",
    authentication_level="Default",
    authority=None,
    privileges=None,
    namespace=None,
    suffix=None
):
    """Assemble a WMI moniker string from its parts; every part which
    is empty or None is left out. The result has the shape

    winmgmts:{security settings}!//computer/namespace:suffix

    impersonation_level, authentication_level, authority - security
        settings; the authority is only included when a computer is
        named, as it is invalid on the local machine
    privileges - a sequence of privilege names, rendered as one
        bracketed, comma-separated group among the security settings
    namespace - used as given
    suffix - typically a class name, appended after a colon
    """
    security = []
    if impersonation_level: security.append ("impersonationLevel=%s" % impersonation_level)
    if authentication_level: security.append ("authenticationLevel=%s" % authentication_level)
    #
    # Use of the authority descriptor is invalid on the local machine
    #
    if authority and computer: security.append ("authority=%s" % authority)
    if privileges: security.append ("(%s)" % ", ".join (privileges))

    moniker = [PROTOCOL]
    # The security block ends with "!" and the computer carries its own
    # leading "//", so that each part is well-formed whether or not
    # the other is present.
    if security: moniker.append ("{%s}!" % ",".join (security))
    if computer: moniker.append ("//%s/" % computer)
    if namespace:
        moniker.append (namespace)
    if suffix: moniker.append (":%s" % suffix)
    return "".join (moniker)
def get_wmi_type (obj):
    """Classify a raw WMI object as "namespace", "class" or "instance".

    An object with no Path_ attribute is taken to be a namespace;
    otherwise the IsClass flag of its path decides between a class
    and an instance.
    """
    try:
        path = obj.Path_
    except AttributeError:
        return "namespace"
    if path.IsClass:
        return "class"
    return "instance"
def connect_server (
    server,
    namespace = "",
    user = "",
    password = "",
    locale = "",
    authority = "",
    security_flags = 0,
    named_value_set = None
):
    """Return a remote server running WMI

    server - name of the server
    namespace - namespace to connect to: defaults to whatever's defined as default
    user - username to connect as, either local or domain (dom\\name or user@domain for XP)
    password: leave blank to use current context
    locale: desired locale in form MS_XXXX (eg MS_409 for Am En)
    authority: either "Kerberos:" or an NT domain. Not needed if included in user
    security_flags: if 0, connect will wait forever; if 0x80, connect will timeout at 2 mins
    named_value_set: typically empty, otherwise a context-specific SWbemNamedValueSet

    c = wmi.WMI (wmi=wmi.connect_server (server="remote_machine", user="myname", password="mypassword"))
    """
    connection_args = (
        server,
        namespace,
        user,
        password,
        locale,
        authority,
        security_flags,
        named_value_set
    )
    if _DEBUG:
        # NOTE(review): this trace includes the password in clear text
        for value in connection_args:
            print (value)

    locator = Dispatch ("WbemScripting.SWbemLocator")
    return locator.ConnectServer (*connection_args)

def Registry (
    computer=None,
    impersonation_level="Impersonate",
    authentication_level="Default",
    authority=None,
    privileges=None,
    moniker=None
):
    """Return the StdRegProv object of the root/default namespace,
    wrapped as a _wmi_object. A ready-made moniker is used as given;
    otherwise one is constructed from the remaining parameters.
    A COM failure is passed on to handle_com_error.
    """
    if not moniker:
        moniker_parts = dict (
            computer=computer,
            impersonation_level=impersonation_level,
            authentication_level=authentication_level,
            authority=authority,
            privileges=privileges,
            namespace="root/default",
            suffix="StdRegProv"
        )
        moniker = construct_moniker (**moniker_parts)

    try:
        return _wmi_object (GetObject (moniker))
    except pywintypes.com_error as error_info:
        handle_com_error (error_info)

#
# From a post to python-win32 by Sean
#
def machines_in_domain (domain_name):
    """Return the names of the computer objects which the WinNT
    ADSI provider lists for the domain called domain_name.
    """
    adsi = Dispatch ("ADsNameSpaces")
    nt = adsi.GetObject ("","WinNT:")
    result = nt.OpenDSObject ("WinNT://%s" % domain_name, "", "", 0)
    # Restrict the enumeration to computer objects
    result.Filter = ["computer"]
    return [machine.Name for machine in result]

#
# Typical use test
#
if __name__ == '__main__':
    system = WMI ()
    for my_computer in system.Win32_ComputerSystem ():
        print (u"%s %s" % (u"Disks on", my_computer.Name))
        for disk in system.Win32_LogicalDisk ():
            print (u"%s %s %s" % (disk.Caption, disk.Description, disk.ProviderName or ""))