bundled wstools-0.3
[p2pool.git] / wstools / tests / test_wsdl.py
1 #!/usr/bin/env python
2
3 ############################################################################
4 # Joshua R. Boverhof, David W. Robertson, LBNL
5 # See LBNLCopyright for copyright notice!
6 ###########################################################################
7
8 import sys, unittest
9 import ConfigParser
10 import os
11 from wstools.Utility import DOM
12 from wstools.WSDLTools import WSDLReader
13 from wstools.TimeoutSocket import TimeoutError
14
15 from wstools import tests
16 cwd = os.path.dirname(tests.__file__)
17
18 class WSDLToolsTestCase(unittest.TestCase):
19
20     def __init__(self, methodName='runTest'):
21         unittest.TestCase.__init__(self, methodName)
22
23     def setUp(self):
24         self.path = nameGenerator.next()
25         print self.path
26         sys.stdout.flush()
27
28     def __str__(self):
29         teststr = unittest.TestCase.__str__(self)
30         if hasattr(self, "path"):
31             return "%s: %s" % (teststr, self.path )
32         else:
33             return "%s" % (teststr)
34
35     def checkWSDLCollection(self, tag_name, component, key='name'):
36         if self.wsdl is None:
37             return
38         definition = self.wsdl.document.documentElement
39         version = DOM.WSDLUriToVersion(definition.namespaceURI)
40         nspname = DOM.GetWSDLUri(version)
41         for node in DOM.getElements(definition, tag_name, nspname):
42             name = DOM.getAttr(node, key)
43             comp = component[name]
44             self.failUnlessEqual(eval('comp.%s' %key), name)
45
46     def checkXSDCollection(self, tag_name, component, node, key='name'):
47         for cnode in DOM.getElements(node, tag_name):
48             name = DOM.getAttr(cnode, key)
49             component[name] 
50
51     def test_all(self):
52         try:
53             if self.path[:7] == 'http://':
54                 self.wsdl = WSDLReader().loadFromURL(self.path)
55             else:
56                 self.wsdl = WSDLReader().loadFromFile(self.path)
57
58         except TimeoutError:
59             print "connection timed out"
60             sys.stdout.flush()
61             return
62         except:
63             self.path = self.path + ": load failed, unable to start"
64             raise
65
66         try:
67             self.checkWSDLCollection('service', self.wsdl.services)
68         except:
69             self.path = self.path + ": wsdl.services"
70             raise
71
72         try:
73             self.checkWSDLCollection('message', self.wsdl.messages)
74         except:
75             self.path = self.path + ": wsdl.messages"
76             raise
77
78         try:
79             self.checkWSDLCollection('portType', self.wsdl.portTypes)
80         except:
81             self.path = self.path + ": wsdl.portTypes"
82             raise
83
84         try:
85             self.checkWSDLCollection('binding', self.wsdl.bindings)
86         except:
87             self.path = self.path + ": wsdl.bindings"
88             raise
89
90         try:
91             self.checkWSDLCollection('import', self.wsdl.imports, key='namespace')
92         except:
93             self.path = self.path + ": wsdl.imports"
94             raise
95
96         try:
97             for key in self.wsdl.types.keys(): 
98                 schema = self.wsdl.types[key]
99                 self.failUnlessEqual(key, schema.getTargetNamespace())
100
101             definition = self.wsdl.document.documentElement
102             version = DOM.WSDLUriToVersion(definition.namespaceURI)
103             nspname = DOM.GetWSDLUri(version)
104             for node in DOM.getElements(definition, 'types', nspname):
105                 for snode in DOM.getElements(node, 'schema'):
106                     tns = DOM.findTargetNS(snode)
107                     schema = self.wsdl.types[tns]
108                     self.schemaAttributesDeclarations(schema, snode)
109                     self.schemaAttributeGroupDeclarations(schema, snode)
110                     self.schemaElementDeclarations(schema, snode)
111                     self.schemaTypeDefinitions(schema, snode)
112         except:
113             self.path = self.path + ": wsdl.types"
114             raise
115
116         if self.wsdl.extensions:
117             print 'No check for WSDLTools(%s) Extensions:' %(self.wsdl.name)
118             for ext in self.wsdl.extensions: print '\t', ext
119
120     def schemaAttributesDeclarations(self, schema, node):
121         self.checkXSDCollection('attribute', schema.attr_decl, node)
122
123     def schemaAttributeGroupDeclarations(self, schema, node):
124         self.checkXSDCollection('group', schema.attr_groups, node)
125
126     def schemaElementDeclarations(self, schema, node):
127         self.checkXSDCollection('element', schema.elements, node)
128
129     def schemaTypeDefinitions(self, schema, node):
130         self.checkXSDCollection('complexType', schema.types, node)
131         self.checkXSDCollection('simpleType', schema.types, node)
132
133
134 def setUpOptions(section):
135     cp = ConfigParser.ConfigParser()
136     cp.read(cwd+'/config.txt')
137     if not cp.sections():
138         print 'fatal error:  configuration file config.txt not present'
139         sys.exit(0)
140     if not cp.has_section(section):
141         print '%s section not present in configuration file, exiting' % section
142         sys.exit(0)
143     return cp, len(cp.options(section))
144
145 def getOption(cp, section):
146     for name, value in cp.items(section):
147         yield value
148     
149 def makeTestSuite(section='services_by_file'):
150     global nameGenerator
151
152     cp, numTests = setUpOptions(section)
153     nameGenerator = getOption(cp, section)
154     suite = unittest.TestSuite()
155     for i in range(0, numTests):
156         suite.addTest(unittest.makeSuite(WSDLToolsTestCase, 'test_'))
157     return suite
158
159
160 def main():
161     unittest.main(defaultTest="makeTestSuite")
162                   
163
164 if __name__ == "__main__" : main()