## # wmi - a lightweight Python wrapper around Microsoft's WMI interface # # Windows Management Instrumentation (WMI) is Microsoft's answer to # the DMTF's Common Information Model. It allows you to query just # about any conceivable piece of information from any computer which # is running the necessary agent and over which have you the # necessary authority. # # The implementation is by means of COM/DCOM and most of the examples # assume you're running one of Microsoft's scripting technologies. # Fortunately, Mark Hammond's pywin32 has pretty much all you need # for a workable Python adaptation. I haven't tried any of the fancier # stuff like Async calls and so on, so I don't know if they'd work. # # Since the COM implementation doesn't give much away to Python # programmers, I've wrapped it in some lightweight classes with # some getattr / setattr magic to ease the way. In particular: # # # # Many thanks, obviously to Mark Hammond for creating the win32all # extensions, but also to Alex Martelli and Roger Upole, whose # c.l.py postings pointed me in the right direction. # Thanks especially in release 1.2 to Paul Tiemann for his code # contributions and robust testing. # # (c) Tim Golden - mail at timgolden.me.uk 5th June 2003 # Licensed under the (GPL-compatible) MIT License: # http://www.opensource.org/licenses/mit-license.php # # For change history see CHANGELOG.TXT ## try: True, False except NameError: True = 1 False = 0 try: object except NameError: class object: pass __VERSION__ = "1.3.3" _DEBUG = False import sys import re import datetime from win32com.client import GetObject, Dispatch import pywintypes class ProvideConstants (object): """ A class which, when called on a win32com.client.Dispatch object, provides lazy access to constants defined in the typelib. They can be accessed as attributes of the _constants property. From Thomas Heller on c.l.py """ def __init__(self, comobj): "@param comobj A COM object whose typelib constants are to be exposed" comobj.__dict__["_constants"] = self # Get the typelibrary's typecomp interface self.__typecomp = \ comobj._oleobj_.GetTypeInfo().GetContainingTypeLib()[0].GetTypeComp() def __getattr__(self, name): if name.startswith("__") and name.endswith("__"): raise AttributeError, name result = self.__typecomp.Bind(name) # Bind returns a 2-tuple, first item is TYPEKIND, # the second item has the value if not result[0]: raise AttributeError, name return result[1].value obj = GetObject ("winmgmts:") ProvideConstants (obj) wbemErrInvalidQuery = obj._constants.wbemErrInvalidQuery wbemErrTimedout = obj._constants.wbemErrTimedout wbemFlagReturnImmediately = obj._constants.wbemFlagReturnImmediately wbemFlagForwardOnly = obj._constants.wbemFlagForwardOnly def handle_com_error (error_info): """Convenience wrapper for displaying all manner of COM errors. Raises a x_wmi exception with more useful information attached @param error_info The structure attached to a pywintypes.com_error """ hresult_code, hresult_name, additional_info, parameter_in_error = error_info exception_string = [u"%s - %s" % (hex (hresult_code), hresult_name.decode (sys.stdout.encoding))] if additional_info: wcode, source_of_error, error_description, whlp_file, whlp_context, scode = additional_info exception_string.append (u" Error in: %s" % source_of_error.decode (sys.stdout.encoding)) exception_string.append (u" %s - %s" % (hex (scode), (error_description or u"").decode (sys.stdout.encoding).strip ())) raise x_wmi, u"\n".join (exception_string) BASE = datetime.datetime (1601, 1, 1) def from_1601 (ns100): return BASE + datetime.timedelta (microseconds=int (ns100) / 10) def from_time (year=None, month=None, day=None, hours=None, minutes=None, seconds=None, microseconds=None, timezone=None): """ Convenience wrapper to take a series of date/time elements and return a WMI time of the form yyyymmddHHMMSS.mmmmmm+UUU. All elements may be int, string or omitted altogether. If omitted, they will be replaced in the output string by a series of stars of the appropriate length. @param year The year element of the date/time @param month The month element of the date/time @param day The day element of the date/time @param hours The hours element of the date/time @param minutes The minutes element of the date/time @param seconds The seconds element of the date/time @param microseconds The microseconds element of the date/time @param timezone The timeezone element of the date/time @return A WMI datetime string of the form: yyyymmddHHMMSS.mmmmmm+UUU """ def str_or_stars (i, length): if i is None: return u"*" * length else: return unicode (i).rjust (length, u"0") wmi_time = u"" wmi_time += str_or_stars (year, 4) wmi_time += str_or_stars (month, 2) wmi_time += str_or_stars (day, 2) wmi_time += str_or_stars (hours, 2) wmi_time += str_or_stars (minutes, 2) wmi_time += str_or_stars (seconds, 2) wmi_time += "." wmi_time += str_or_stars (microseconds, 6) wmi_time += str_or_stars (timezone, 4) return wmi_time def to_time (wmi_time): """ Convenience wrapper to take a WMI datetime string of the form yyyymmddHHMMSS.mmmmmm+UUU and return a 9-tuple containing the individual elements, or None where string contains placeholder stars. @param wmi_time The WMI datetime string in yyyymmddHHMMSS.mmmmmm+UUU format @return A 9-tuple of (year, month, day, hours, minutes, seconds, microseconds, timezone) """ def int_or_none (s, start, end): try: return int (s[start:end]) except ValueError: return None year = int_or_none (wmi_time, 0, 4) month = int_or_none (wmi_time, 4, 6) day = int_or_none (wmi_time, 6, 8) hours = int_or_none (wmi_time, 8, 10) minutes = int_or_none (wmi_time, 10, 12) seconds = int_or_none (wmi_time, 12, 14) microseconds = int_or_none (wmi_time, 15, 21) timezone = wmi_time[21:] return year, month, day, hours, minutes, seconds, microseconds, timezone # # Exceptions # class x_wmi (Exception): pass class x_wmi_invalid_query (x_wmi): pass class x_wmi_timed_out (x_wmi): pass class x_wmi_no_namespace (x_wmi): pass WMI_EXCEPTIONS = { wbemErrInvalidQuery : x_wmi_invalid_query, wbemErrTimedout : x_wmi_timed_out } def _set (obj, attribute, value): """ Helper function to add an attribute directly into the instance dictionary, bypassing possible __getattr__ calls @param obj Any python object @param attribute String containing attribute name @param value Any python object """ obj.__dict__[attribute] = value class _wmi_method: """ A currying sort of wrapper around a WMI method name. It abstract's the method's parameters and can be called like a normal Python object passing in the parameter values. Output parameters are returned from the call as a tuple. In addition, the docstring is set up as the method's signature, including an indication as to whether any given parameter is expecting an array, and what special privileges are required to call the method. """ def __init__ (self, ole_object, method_name): """ @param ole_object The WMI class/instance whose method is to be called @param method_name The name of the method to be called """ try: self.ole_object = Dispatch (ole_object) self.method = ole_object.Methods_ (method_name) self.qualifiers = {} for q in self.method.Qualifiers_: self.qualifiers[q.Name] = q.Value self.provenance = u"\n".join (self.qualifiers.get ("MappingStrings", [])) self.in_parameters = self.method.InParameters self.out_parameters = self.method.OutParameters if self.in_parameters is None: self.in_parameter_names = [] else: self.in_parameter_names = [(i.Name, i.IsArray) for i in self.in_parameters.Properties_] if self.out_parameters is None: self.out_parameter_names = [] else: self.out_parameter_names = [(i.Name, i.IsArray) for i in self.out_parameters.Properties_] doc = u"%s (%s) => (%s)" % ( method_name, ", ".join ([name + (u"", u"[]")[is_array] for (name, is_array) in self.in_parameter_names]), ", ".join ([name + (u"", u"[]")[is_array] for (name, is_array) in self.out_parameter_names]) ) privileges = self.qualifiers.get ("Privileges", []) if privileges: doc += u" | Needs: " + ", ".join (privileges) self.__doc__ = doc except pywintypes.com_error, error_info: handle_com_error (error_info) def __call__ (self, *args, **kwargs): """ Execute the call to a WMI method, returning a tuple (even if is of only one value) containing the out and return parameters. """ try: if self.in_parameters: parameter_names = {} for name, is_array in self.in_parameter_names: parameter_names[name] = is_array parameters = self.in_parameters # # Check positional parameters first # for n_arg in range (len (args)): arg = args[n_arg] parameter = parameters.Properties_[n_arg] if parameter.IsArray: try: list (arg) except TypeError: raise TypeError, u"parameter %d must be iterable" % n_arg parameter.Value = arg # # If any keyword param supersedes a positional one, # it'll simply overwrite it. # for k, v in kwargs.items (): is_array = parameter_names.get (k) if is_array is None: raise AttributeError, u"%s is not a valid parameter for %s" % (k, self.__doc__) else: if is_array: try: list (v) except TypeError: raise TypeError, u"%s must be iterable" % k parameters.Properties_ (k).Value = v result = self.ole_object.ExecMethod_ (self.method.Name, self.in_parameters) else: result = self.ole_object.ExecMethod_ (self.method.Name) results = [] for name, is_array in self.out_parameter_names: value = result.Properties_ (name).Value if is_array: # # Thanks to Jonas Bjering for bug report and patch # results.append (list (value or [])) else: results.append (value) return tuple (results) except pywintypes.com_error, error_info: handle_com_error (error_info) def __repr__ (self): return "" % self.__doc__ # # class _wmi_object # class _wmi_object: "A lightweight wrapper round an OLE WMI object" def __init__ (self, ole_object, instance_of=None, fields=[], property_map={}): try: _set (self, "ole_object", ole_object) _set (self, "_instance_of", instance_of) _set (self, "properties", {}) _set (self, "methods", {}) _set (self, "property_map", property_map) if fields: for field in fields: self.properties[field] = None else: for p in ole_object.Properties_: self.properties[p.Name] = None for m in ole_object.Methods_: self.methods[m.Name] = None _set (self, "_properties", self.properties.keys ()) _set (self, "_methods", self.methods.keys ()) _set (self, "qualifiers", {}) for q in self.ole_object.Qualifiers_: self.qualifiers[q.Name] = q.Value _set (self, "is_association", self.qualifiers.has_key ("Association")) except pywintypes.com_error, error_info: handle_com_error (error_info) def __str__ (self): """ For a call to print [object] return the OLE description of the properties / values of the object """ try: return self.ole_object.GetObjectText_ () except pywintypes.com_error, error_info: handle_com_error (error_info) def __repr__ (self): """ Indicate both the fact that this is a wrapped WMI object and the WMI object's own identifying class. """ try: return u"<%s: %s>" % (self.__class__.__name__, str (self.Path_.Path)) except pywintypes.com_error, error_info: handle_com_error (error_info) def _cached_properties (self, attribute): if self.properties[attribute] is None: self.properties[attribute] = self.ole_object.Properties_ (attribute) return self.properties[attribute] def _cached_methods (self, attribute): if self.methods[attribute] is None: self.methods[attribute] = _wmi_method (self.ole_object, attribute) return self.methods[attribute] def __getattr__ (self, attribute): """ Attempt to pass attribute calls to the proxied COM object. If the attribute is recognised as a property, return its value; if it is recognised as a method, return a method wrapper which can then be called with parameters; otherwise pass the lookup on to the underlying object. """ try: if self.properties.has_key (attribute): factory = self.property_map.get (attribute, lambda x: x) value = factory (self._cached_properties (attribute).Value) # # If this is an association, its properties are # actually the paths to the two aspects of the # association, so translate them automatically # into WMI objects. # if self.is_association: return WMI (moniker=value) else: return value elif self.methods.has_key (attribute): return self._cached_methods (attribute) else: return getattr (self.ole_object, attribute) except pywintypes.com_error, error_info: handle_com_error (error_info) def __setattr__ (self, attribute, value): """ If the attribute to be set is valid for the proxied COM object, set that objects's parameter value; if not, raise an exception. """ try: if self.properties.has_key (attribute): self._cached_properties (attribute).Value = value if self.ole_object.Path_.Path: self.ole_object.Put_ () else: raise AttributeError, attribute except pywintypes.com_error, error_info: handle_com_error (error_info) def __eq__ (self, other): """ Use WMI's CompareTo_ to compare this object with another. Don't try to do anything if the other object is not a wmi object. It might be possible to compare this object's unique key with a string or something, but this doesn't seem to be universal enough to merit a special case. """ if isinstance (other, self.__class__): return self.ole_object.CompareTo_ (other.ole_object) else: raise x_wmi, u"Can't compare a WMI object with something else" def _getAttributeNames (self): """Return list of methods/properties for IPython completion""" attribs = [str (x) for x in self.methods.keys ()] attribs.extend ([str (x) for x in self.properties.keys ()]) return attribs def put (self): self.ole_object.Put_ () def set (self, **kwargs): """ Set several properties of the underlying object at one go. This is particularly useful in combination with the new () method below. However, an instance which has been spawned in this way won't have enough information to write pack, so only try if the instance has a path. """ if kwargs: try: for attribute, value in kwargs.items (): if self.properties.has_key (attribute): self._cached_properties (attribute).Value = value else: raise AttributeError, attribute # # Only try to write the attributes # back if the object exists. # if self.ole_object.Path_.Path: self.ole_object.Put_ () except pywintypes.com_error, error_info: handle_com_error (error_info) def path (self): """ Return the WMI URI to this object. Can be used to determine the path relative to the parent namespace. eg,
    pp0 = wmi.WMI ().Win32_ParallelPort ()[0]
    print pp0.path ().RelPath
    
""" try: return self.ole_object.Path_ except pywintypes.com_error, error_info: handle_com_error (error_info) def derivation (self): """Return a tuple representing the object derivation for this object, with the most specific object first. eg, pp0 = wmi.WMI ().Win32_ParallelPort ()[0] print ' <- '.join (pp0.derivation ()) """ try: return self.ole_object.Derivation_ except pywintypes.com_error, error_info: handle_com_error (error_info) def associators (self, wmi_association_class="", wmi_result_class=""): """Return a list of objects related to this one, optionally limited either by association class (ie the name of the class which relates them) or by result class (ie the name of the class which would be retrieved)
c = wmi.WMI ()
pp = c.Win32_ParallelPort ()[0]

for i in pp.associators (wmi_association_class="Win32_PortResource"):
  print i

for i in pp.associators (wmi_result_class="Win32_PnPEntity"):
  print i
    
""" try: return [ _wmi_object (i) for i in \ self.ole_object.Associators_ ( strAssocClass=wmi_association_class, strResultClass=wmi_result_class ) ] except pywintypes.com_error, error_info: handle_com_error (error_info) def references (self, wmi_class=""): """Return a list of associations involving this object, optionally limited by the result class (the name of the association class). NB Associations are treated specially; although WMI only returns the string corresponding to the instance of each associated object, this module will automatically convert that to the object itself.
    c =  wmi.WMI ()
    sp = c.Win32_SerialPort ()[0]

    for i in sp.references ():
      print i

    for i in sp.references (wmi_class="Win32_SerialPortSetting"):
      print i
    
""" try: return [_wmi_object (i) for i in self.ole_object.References_ (strResultClass=wmi_class)] except pywintypes.com_error, error_info: handle_com_error (error_info) # # class _wmi_event # class _wmi_event (_wmi_object): """Slight extension of the _wmi_object class to allow objects which are the result of events firing to return extra information such as the type of event. """ event_type_re = re.compile ("__Instance(Creation|Modification|Deletion)Event") def __init__ (self, event, event_info): _wmi_object.__init__ (self, event) _set (self, "event_type", None) _set (self, "timestamp", None) _set (self, "previous", None) if event_info: event_type = self.event_type_re.match (event_info.Path_.Class).group (1).lower () _set (self, "event_type", event_type) if hasattr (event_info, "TIME_CREATED"): _set (self, "timestamp", from_1601 (event_info.TIME_CREATED)) if hasattr (event_info, "PreviousInstance"): _set (self, "previous", event_info.PreviousInstance) # # class _wmi_class # class _wmi_class (_wmi_object): """Currying class to assist in issuing queries against a WMI namespace. The idea is that when someone issues an otherwise unknown method against the WMI object, if it matches a known WMI class a query object will be returned which may then be called with one or more params which will form the WHERE clause. eg,
  c = wmi.WMI ()
  c_drive = c.Win32_LogicalDisk (Name='C:')
  
""" def __init__ (self, namespace, wmi_class): _wmi_object.__init__ (self, wmi_class) _set (self, "_class_name", wmi_class.Path_.Class) if namespace: _set (self, "_namespace", namespace) else: class_moniker = wmi_class.Path_.DisplayName winmgmts, namespace_moniker, class_name = class_moniker.split (":") namespace = _wmi_namespace (GetObject (winmgmts + ":" + namespace_moniker), False) _set (self, "_namespace", namespace) def query (self, fields=[], **where_clause): """Make it slightly easier to query against the class, by calling the namespace's query with the class preset. Won't work if the class has been instantiated directly. """ if self._namespace is None: raise x_wmi_no_namespace, u"You cannot query directly from a WMI class" try: field_list = u", ".join (fields) or u"*" wql = u"SELECT " + field_list + u" FROM " + self._class_name if where_clause: wql += u" WHERE " + u" AND ". join ([u"%s = '%s'" % (k, v) for k, v in where_clause.items ()]) return self._namespace.query (wql, self, fields) except pywintypes.com_error, error_info: handle_com_error (error_info) __call__ = query def watch_for ( self, notification_type=u"operation", delay_secs=1, **where_clause ): if self._namespace is None: raise x_wmi_no_namespace, u"You cannot watch directly from a WMI class" return self._namespace.watch_for ( notification_type=notification_type, wmi_class=self, delay_secs=delay_secs, **where_clause ) def instances (self): """Return a list of instances of the WMI class """ try: return [_wmi_object (instance, self) for instance in self.Instances_ ()] except pywintypes.com_error, error_info: handle_com_error (error_info) def new (self, **kwargs): """This is the equivalent to the raw-WMI SpawnInstance_ method. Note that there are relatively few uses for this, certainly fewer than you might imagine. Most classes which need to create a new *real* instance of themselves, eg Win32_Process, offer a .Create method. SpawnInstance_ is generally reserved for instances which are passed as parameters to such .Create methods, a common example being the Win32_SecurityDescriptor, passed to Win32_Share.Create and other instances which need security. The example here is Win32_ProcessStartup, which controls the shown/hidden state etc. of a new Win32_Process instance.
    import win32con
    import wmi
    c = wmi.WMI ()
    startup = c.Win32_ProcessStartup.new (ShowWindow=win32con.SW_SHOWMINIMIZED)
    pid, retval = c.Win32_Process.Create (
      CommandLine="notepad.exe",
      ProcessStartupInformation=startup
    )
    
NB previous versions of this module, used this function to create new process. This is *not* a good example of its use; it is better handled with something like the example above. """ try: obj = _wmi_object (self.SpawnInstance_ (), self) obj.set (**kwargs) return obj except pywintypes.com_error, error_info: handle_com_error (error_info) # # class _wmi_result # class _wmi_result: """Simple, data only result for targeted WMI queries which request data only result classes via fetch_as_classes. """ def __init__(self, obj, attributes): if attributes: for attr in attributes: self.__dict__[attr] = obj.Properties_ (attr).Value else: for p in obj.Properties_: attr = p.Name self.__dict__[attr] = obj.Properties_(attr).Value # # class WMI # class _wmi_namespace: """A WMI root of a computer system. The classes attribute holds a list of the classes on offer. This means you can explore a bit with things like this:
  c = wmi.WMI ()
  for i in c.classes:
    if "user" in i.lower ():
      print i
  
""" def __init__ (self, namespace, find_classes): _set (self, "_namespace", namespace) # # wmi attribute preserved for backwards compatibility # _set (self, "wmi", namespace) # Initialise the "classes" attribute, to avoid infinite recursion in the # __getattr__ method (which uses it). self.classes = {} # # Pick up the list of classes under this namespace # so that they can be queried, and used as though # properties of the namespace by means of the __getattr__ # hook below. # If the namespace does not support SubclassesOf, carry on # regardless # if find_classes: try: self.classes.update (self.subclasses_of ()) except AttributeError: pass def __repr__ (self): return u"<_wmi_namespace: %s>" % self.wmi def __str__ (self): return repr (self) def get (self, moniker): try: return _wmi_object (self.wmi.Get (moniker)) except pywintypes.com_error, error_info: handle_com_error (error_info) def handle (self): """The raw OLE object representing the WMI namespace""" return self._namespace def subclasses_of (self, root="", regex=r".*"): classes = {} for c in self._namespace.SubclassesOf (root): klass = c.Path_.Class if re.match (regex, klass): classes[klass] = None return classes def instances (self, class_name): """Return a list of instances of the WMI class. This is (probably) equivalent to querying with no qualifiers.
    system.instances ("Win32_LogicalDisk")
    # should be the same as
    system.Win32_LogicalDisk ()
    
""" try: return [_wmi_object (obj) for obj in self._namespace.InstancesOf (class_name)] except pywintypes.com_error, error_info: handle_com_error (error_info) def new (self, wmi_class, **kwargs): """This is now implemented by a call to _wmi_namespace.new (qv)""" return getattr (self, wmi_class).new (**kwargs) new_instance_of = new def _raw_query (self, wql): """Execute a WQL query and return its raw results. Use the flags recommended by Microsoft to achieve a read-only, semi-synchronous query where the time is taken while looping through. Should really be a generator, but ... NB Backslashes need to be doubled up. """ flags = wbemFlagReturnImmediately | wbemFlagForwardOnly wql = wql.replace (u"\\", u"\\\\") if _DEBUG: print u"_raw_query(wql):", wql try: return self._namespace.ExecQuery (strQuery=wql, iFlags=flags) except pywintypes.com_error, (hresult, hresult_text, additional, param_in_error): raise WMI_EXCEPTIONS.get (hresult, x_wmi (hresult)) def query (self, wql, instance_of=None, fields=[]): """Perform an arbitrary query against a WMI object, and return a list of _wmi_object representations of the results. """ return [ _wmi_object (obj, instance_of, fields) for obj in self._raw_query(wql) ] def fetch_as_classes (self, wmi_classname, fields=(), **where_clause): """Build and execute a wql query to fetch the specified list of fields from the specified wmi_classname + where_clause, then return the results as a list of simple class instances with attributes matching fields_list. If fields is left empty, select * and pre-load all class attributes for each class returned. """ wql = u"SELECT %s FROM %s" % (fields and u", ".join (fields) or u"*", wmi_classname) if where_clause: wql += u" WHERE " + u" AND ".join ([u"%s = '%s'" % (k, v) for k, v in where_clause.items()]) return [_wmi_result (obj, fields) for obj in self._raw_query(wql)] def fetch_as_lists (self, wmi_classname, fields, **where_clause): """Build and execute a wql query to fetch the specified list of fields from the specified wmi_classname + where_clause, then return the results as a list of lists whose values correspond fields_list. """ wql = u"SELECT %s FROM %s" % (u", ".join (fields), wmi_classname) if where_clause: wql += u" WHERE " + u" AND ".join ([u"%s = '%s'" % (k, v) for k, v in where_clause.items()]) results = [] for obj in self._raw_query(wql): results.append ([obj.Properties_ (field).Value for field in fields]) return results def watch_for ( self, raw_wql=None, notification_type=u"operation", wmi_class=None, delay_secs=1, **where_clause ): """Set up an event tracker on a WMI event. This function returns an wmi_watcher which can be called to get the next event. eg,
    c = wmi.WMI ()
    
    raw_wql = "SELECT * FROM __InstanceCreationEvent WITHIN 2 WHERE TargetInstance ISA 'Win32_Process'"
    watcher = c.watch_for (raw_wql=raw_wql)
    while 1:
      process_created = watcher ()
      print process_created.Name

    # or
     
    watcher = c.watch_for (
      notification_type="Creation",
      wmi_class="Win32_Process",
      delay_secs=2,
      Name='calc.exe'
    )
    calc_created = watcher ()
    
Now supports timeout on the call to watcher, eg:
    import pythoncom
    import wmi
    c = wmi.WMI (privileges=["Security"])
    watcher1 = c.watch_for (
      notification_type="Creation",
      wmi_class="Win32_NTLogEvent",
      Type="error"
    )
    watcher2 = c.watch_for (
      notification_type="Creation",
      wmi_class="Win32_NTLogEvent",
      Type="warning"
    )

    while 1:
      try:
        error_log = watcher1 (500)
      except wmi.x_wmi_timed_out:
        pythoncom.PumpWaitingMessages ()
      else:
        print error_log

      try:
        warning_log = watcher2 (500)
      except wmi.x_wmi_timed_out:
        pythoncom.PumpWaitingMessages ()
      else:
        print warning_log
    
""" if wmi_class: if isinstance (wmi_class, _wmi_class): class_name = wmi_class._class_name else: class_name = wmi_class wmi_class = getattr (self, class_name) is_extrinsic = u"__ExtrinsicEvent" in wmi_class.derivation () else: class_name = is_extrinsic = None if raw_wql: wql = raw_wql else: if is_extrinsic: if where_clause: where = u" WHERE " + u" AND ".join ([u"%s = '%s'" % (k, v) for k, v in where_clause.items ()]) else: where = u"" wql = u"SELECT * FROM " + class_name + where else: if where_clause: where = u" AND " + u" AND ".join ([u"TargetInstance.%s = '%s'" % (k, v) for k, v in where_clause.items ()]) else: where = u"" wql = \ u"SELECT * FROM __Instance%sEvent WITHIN %d WHERE TargetInstance ISA '%s' %s" % \ (notification_type, delay_secs, class_name, where) if _DEBUG: print wql try: return _wmi_watcher (self._namespace.ExecNotificationQuery (wql), is_extrinsic=is_extrinsic) except pywintypes.com_error, error_info: handle_com_error (error_info) def __getattr__ (self, attribute): """Offer WMI classes as simple attributes. Pass through any untrapped unattribute to the underlying OLE object. This means that new or unmapped functionality is still available to the module user. """ # # Don't try to match against known classes as was previously # done since the list may not have been requested # (find_classes=False). # try: return self._cached_classes (attribute) except pywintypes.com_error, error_info: try: return self._cached_classes ("Win32_" + attribute) except pywintypes.com_error, error_info: return getattr (self._namespace, attribute) def _cached_classes (self, class_name): """Standard caching helper which keeps track of classes already retrieved by name and returns the existing object if found. If this is the first retrieval, store it and pass it back """ if self.classes.get (class_name) is None: self.classes[class_name] = _wmi_class (self, self._namespace.Get (class_name)) return self.classes[class_name] def _getAttributeNames (self): """Return list of classes for IPython completion engine""" classes = [str (x) for x in self.classes.keys () if not x.startswith ('__')] return classes # # class _wmi_watcher # class _wmi_watcher: """Helper class for WMI.watch_for below (qv)""" _event_property_map = { "TargetInstance" : _wmi_object, "PreviousInstance" : _wmi_object } def __init__ (self, wmi_event, is_extrinsic): self.wmi_event = wmi_event self.is_extrinsic = is_extrinsic def __call__ (self, timeout_ms=-1): """When called, return the instance which caused the event. Supports timeout in milliseconds (defaulting to infinite). If the watcher times out, x_wmi_timed_out is raised. This makes it easy to support watching for multiple objects. """ try: event = self.wmi_event.NextEvent (timeout_ms) if self.is_extrinsic: return _wmi_event (event, None) else: return _wmi_event ( event.Properties_ ("TargetInstance").Value, _wmi_object (event, property_map=self._event_property_map) ) except pywintypes.com_error, error_info: hresult_code, hresult_name, additional_info, parameter_in_error = error_info if additional_info: wcode, source_of_error, error_description, whlp_file, whlp_context, scode = additional_info if scode == wbemErrTimedout: raise x_wmi_timed_out handle_com_error (error_info) PROTOCOL = "winmgmts:" IMPERSONATION_LEVEL = "impersonate" AUTHENTICATION_LEVEL = "default" NAMESPACE = "root/cimv2" def connect ( computer=".", impersonation_level="", authentication_level="", authority="", privileges="", moniker="", wmi=None, namespace="", suffix="", user="", password="", find_classes=True, debug=False ): """The WMI constructor can either take a ready-made moniker or as many parts of one as are necessary. Eg,
   c = wmi.WMI (moniker="winmgmts:{impersonationLevel=Delegate}//remote")

   # or

   c = wmi.WMI (computer="remote", privileges=["!RemoteShutdown", "Security"])
   
I daren't link to a Microsoft URL; they change so often. Try Googling for WMI construct moniker and see what it comes back with. For complete control, a named argument "wmi" can be supplied, which should be a SWbemServices object, which you create yourself. Eg,
   loc = win32com.client.Dispatch("WbemScripting.SWbemLocator")
   svc = loc.ConnectServer(...)
   c = wmi.WMI(wmi=svc)
   
This is the only way of connecting to a remote computer with a different username, as the moniker syntax does not allow specification of a user name. If the "wmi" parameter is supplied, all other parameters are ignored. """ global _DEBUG _DEBUG = debug # # If namespace is a blank string, leave # it unaltered as it might to trying to # access the root namespace # #if namespace is None: # namespace = NAMESPACE try: if wmi: obj = wmi elif moniker: if not moniker.startswith (PROTOCOL): moniker = PROTOCOL + moniker if _DEBUG: print moniker obj = GetObject (moniker) else: if namespace: parts = re.split (r"[/\\]", namespace) if parts[0] != 'root': parts.insert (0, "root") namespace = "/".join (parts) if user: if impersonation_level or authentication_level or privileges or suffix: raise x_wmi, u"You can't specify an impersonation, authentication or privilege as well as a username" else: obj = connect_server ( server=computer, namespace=namespace, user=user, password=password, authority=authority ) else: moniker = construct_moniker ( computer=computer, impersonation_level=impersonation_level or IMPERSONATION_LEVEL, authentication_level=authentication_level or AUTHENTICATION_LEVEL, authority=authority, privileges=privileges, namespace=namespace, suffix=suffix ) if _DEBUG: print moniker obj = GetObject (moniker) wmi_type = get_wmi_type (obj) if wmi_type == "namespace": return _wmi_namespace (obj, find_classes) elif wmi_type == "class": return _wmi_class (None, obj) elif wmi_type == "instance": return _wmi_object (obj) else: raise x_wmi, u"Unknown moniker type" except pywintypes.com_error, error_info: handle_com_error (error_info) WMI = connect def construct_moniker ( computer=None, impersonation_level="Impersonate", authentication_level="Default", authority=None, privileges=None, namespace=None, suffix=None ): security = [] if impersonation_level: security.append ("impersonationLevel=%s" % impersonation_level) if authentication_level: security.append ("authenticationLevel=%s" % authentication_level) # # Use of the authority descriptor is invalid on the local machine # if authority and computer: security.append ("authority=%s" % authority) if privileges: security.append ("(%s)" % ", ".join (privileges)) moniker = [PROTOCOL] if security: moniker.append ("{%s}/" % ",".join (security)) if computer: moniker.append ("/%s/" % computer) if namespace: moniker.append (namespace) if suffix: moniker.append (":%s" % suffix) return "".join (moniker) def get_wmi_type (obj): try: path = obj.Path_ except AttributeError: return "namespace" else: if path.IsClass: return "class" else: return "instance" def connect_server ( server, namespace = "", user = "", password = "", locale = "", authority = "", security_flags = 0, named_value_set = None ): """Return a remote server running WMI server - name of the server namespace - namespace to connect to: defaults to whatever's defined as default user - username to connect as, either local or domain (dom\name or user@domain for XP) password: leave blank to use current context locale: desired locale in form MS_XXXX (eg MS_409 for Am En) authority: either "Kerberos:" or an NT domain. Not needed if included in user security_flags: if 0, connect will wait forever; if 0x80, connect will timeout at 2 mins named_value_set: typically empty, otherwise a context-specific SWbemNamedValueSet
  c = wmi.WMI (wmi=wmi.connect_server (server="remote_machine", user="myname", password="mypassword"))
  
""" if _DEBUG: print server print namespace print user print password print locale print authority print security_flags print named_value_set return Dispatch ("WbemScripting.SWbemLocator").\ ConnectServer ( server, namespace, user, password, locale, authority, security_flags, named_value_set ) def Registry ( computer=None, impersonation_level="Impersonate", authentication_level="Default", authority=None, privileges=None, moniker=None ): if not moniker: moniker = construct_moniker ( computer=computer, impersonation_level=impersonation_level, authentication_level=authentication_level, authority=authority, privileges=privileges, namespace="root/default", suffix="StdRegProv" ) try: return _wmi_object (GetObject (moniker)) except pywintypes.com_error, error_info: handle_com_error (error_info) # # From a post to python-win32 by Sean # def machines_in_domain (domain_name): adsi = Dispatch ("ADsNameSpaces") nt = adsi.GetObject ("","WinNT:") result = nt.OpenDSObject ("WinNT://%s" % domain_name, "", "", 0) result.Filter = ["computer"] domain = [] for machine in result: domain.append (machine.Name) return domain # # Typical use test # if __name__ == '__main__': system = WMI () for my_computer in system.Win32_ComputerSystem (): print u"Disks on", my_computer.Name for disk in system.Win32_LogicalDisk (): print disk.Caption, disk.Description, disk.ProviderName or ""