"""yang.connector module defines a set of classes that connect to Data Model
Interfaces (DMI), in particular, an implementation of Netconf client by
wrapping up ncclient package. Restconf implementation is coming next."""
# metadata
__version__ = '3.0.0'
__author__ = ('Jonathan Yang <yuekyang@cisco.com>',
'Siming Yuan <siyuan@cisco.com',)
__contact__ = 'yang-python@cisco.com'
__copyright__ = 'Cisco Systems, Inc.'
import re
import time
import atexit
import logging
from ncclient import manager
from ncclient import operations
from ncclient import transport
from ncclient.devices.default import DefaultDeviceHandler
from ncclient.operations.errors import TimeoutExpiredError
from pyats.connections import BaseConnection
# try to record usage statistics
# - only internal cisco users will have stats.CesMonitor module
# - below code does nothing for DevNet users - we DO NOT track usage stats
# for PyPI/public/customer users
try:
# new internal cisco-only pkg since devnet release
from ats.cisco.stats import CesMonitor
except Exception:
try:
# legacy pyats version, stats was inside utils module
from ats.utils.stats import CesMonitor
except Exception:
CesMonitor = None
finally:
if CesMonitor is not None:
# CesMonitor exists -> this is an internal cisco user
CesMonitor(action = __name__, application='pyATS Packages').post()
# create a logger for this module
logger = logging.getLogger(__name__)
[docs]class Netconf(manager.Manager, BaseConnection):
'''Netconf
Implementation of NetConf connection to devices (NX-OS, IOS-XR or IOS-XE),
based on pyATS BaseConnection and ncclient.
YAML Example::
devices:
asr22:
type: 'ASR'
tacacs:
login_prompt: "login:"
password_prompt: "Password:"
username: "admin"
passwords:
tacacs: admin
enable: admin
line: admin
connections:
a:
protocol: telnet
ip: "1.2.3.4"
port: 2004
vty:
protocol : telnet
ip : "2.3.4.5"
netconf:
class: yang.connector.Netconf
ip : "2.3.4.5"
port: 830
username: admin
password: admin
Code Example::
>>> from pyats.topology import loader
>>> testbed = loader.load('/users/xxx/xxx/asr22.yaml')
>>> device = testbed.devices['asr22']
>>> device.connect(alias='nc', via='netconf')
>>> device.nc.connected
True
>>> netconf_request = """
... <rpc message-id="101"
... xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
... <get>
... <filter>
... <native xmlns="http://cisco.com/ns/yang/ned/ios">
... <version>
... </version>
... </native>
... </filter>
... </get>
... </rpc>
... """
>>> reply = device.nc.request(netconf_request)
>>> print(reply)
<?xml version="1.0" encoding="UTF-8"?>
<rpc-reply xmlns="urn:ietf:params:xml:ns:netconf:base:1.0"
message-id="101"><data>
<native xmlns="http://cisco.com/ns/yang/ned/ios">
<version>16.3</version></native></data></rpc-reply>
>>> device.nc.disconnect()
>>> device.nc.connected
False
>>>
Attributes
----------
timeout : `int`
Timeout value in seconds which is used by paramiko channel. By
default this value is 30 seconds.
client_capabilities : `object`
Object ncclient.capabilities.Capabilities representing the client's
capabilities.
server_capabilities : `object`
Object ncclient.capabilities.Capabilities representing the server's
capabilities, and it has a list of data models the server supports.
async_mode : `boolean`
Specify whether operations are executed asynchronously (True) or
synchronously (False). The default value is False.
'''
def __init__(self, *args, **kwargs):
'''
__init__ instantiates a single connection instance.
'''
# instanciate BaseConnection
# (could use super...)
BaseConnection.__init__(self, *args, **kwargs)
# default values
defaults = {
'async_mode': False,
'raise_mode': 0,
'timeout': 30,
}
defaults = {k: self.connection_info.get(k, v) for k, v in defaults.items()}
defaults = {k: kwargs.get(k, v) for k, v in defaults.items()}
# shortwire Ncclient device handling portion
# and create just the DeviceHandler
device_handler = DefaultDeviceHandler()
# create the session instance
session = transport.SSHSession(device_handler)
# load known_hosts file (if available)
session.load_known_hosts()
# instanciate ncclient Manager
# (can't use super due to mro change)
manager.Manager.__init__(self, session = session,
device_handler = device_handler,
*args, **defaults)
for item in ['async_mode', 'raise_mode']:
setattr(self, item, defaults[item])
@property
def session(self):
'''session
High-level api: return the SSH session object.
Returns
-------
object
The SSH session that was created by ncclient.transport.SSHSession.
'''
return self._session
[docs] def connect(self):
'''connect
High-level api: opens the NetConf connection and exchanges
capabilities. Since topology YAML file is parsed by BaseConnection,
the following parameters can be specified in your YAML file.
Parameters
----------
host : `string`
Hostname or IP address to connect to.
port : `int`, optional
By default port is 830, but some devices use the default SSH port
of 22 so this may need to be specified.
timeout : `int`, optional
An optional keyed argument to set timeout value in seconds. By
default this value is 30 seconds.
username : `string`
The username to use for SSH authentication.
password : `string`
The password used if using password authentication, or the
passphrase to use for unlocking keys that require it.
key_filename : `string`
a filename where a the private key to be used can be found.
allow_agent : `boolean`
Enables querying SSH agent (if found) for keys. The default value
is True.
hostkey_verify : `boolean`
Enables hostkey verification from ~/.ssh/known_hosts. The default
value is False.
look_for_keys : `boolean`
Enables looking in the usual locations for ssh keys
(e.g. ~/.ssh/id_*). The default value is True.
ssh_config : `string`
Enables parsing of an OpenSSH configuration file, if set to its
path, e.g. ~/.ssh/config or to True. If the value is True,
ncclient uses ~/.ssh/config. The default value is None.
Raises
------
Exception
If the YAML file does not have correct connections section, or
establishing transport to ip:port is failed, ssh authentication is
failed, or other transport failures.
Note
----
There is no return from this method. If something goes wrong, an
exception will be raised.
YAML Example::
devices:
asr22:
type: 'ASR'
tacacs:
login_prompt: "login:"
password_prompt: "Password:"
username: "admin"
passwords:
tacacs: admin
enable: admin
line: admin
connections:
a:
protocol: telnet
ip: "1.2.3.4"
port: 2004
vty:
protocol : telnet
ip : "2.3.4.5"
netconf:
class: yang.connector.Netconf
ip : "2.3.4.5"
port: 830
username: admin
password: admin
Code Example::
>>> from pyats.topology import loader
>>> testbed = loader.load('/users/xxx/xxx/asr22.yaml')
>>> device = testbed.devices['asr22']
>>> device.connect(alias='nc', via='netconf')
>>>
Expected Results::
>>> device.nc.connected
True
>>> for iter in device.nc.server_capabilities:
... print(iter)
...
urn:ietf:params:xml:ns:yang:smiv2:RFC-1215?module=RFC-1215
urn:ietf:params:xml:ns:yang:smiv2:SNMPv2-TC?module=SNMPv2-TC
...
>>>
'''
if self.connected:
return
logger.debug(self.session)
if not self.session.is_alive():
self._session = transport.SSHSession(self._device_handler)
# default values
defaults = {
'host': None,
'port': 830,
'timeout': 30,
'username': None,
'password': None,
'key_filename': None,
'allow_agent': False,
'hostkey_verify': False,
'look_for_keys': False,
'ssh_config': None,
}
defaults.update(self.connection_info)
# remove items
disregards = ['class', 'model', 'protocol', 'async_mode', 'raise_mode']
defaults = {k: v for k, v in defaults.items() if k not in disregards}
# rename ip -> host, cast to str type
if 'ip' in defaults:
defaults['host'] = str(defaults.pop('ip'))
# rename user -> username
if 'user' in defaults:
defaults['username'] = str(defaults.pop('user'))
defaults = {k: getattr(self, k, v) for k, v in defaults.items()}
try:
self.session.connect(**defaults)
except Exception:
if self.session.transport:
self.session.close()
raise
@atexit.register
def cleanup():
if self.session.transport:
self.session.close()
[docs] def disconnect(self):
'''disconnect
High-level api: closes the NetConf connection.
'''
self.session.close()
[docs] def execute(self, operation, *args, **kwargs):
'''execute
High-level api: The fact that most connection classes implement
execute method lead us to add this method here as well.
Supported operations are get, get_config, get_schema, dispatch,
edit_config, copy_config, validate, commit, discard_changes,
delete_config, lock, unlock, close_session, kill_session,
poweroff_machine and reboot_machine. Refer to ncclient document for
more details.
'''
# allow for operation string type
if type(operation) is str:
try:
cls = manager.OPERATIONS[operation]
except KeyError:
raise ValueError('No such operation "%s".\n'
'Supported operations are: %s' %
(operation, list(manager.OPERATIONS.keys())))
else:
cls = operation
return super().execute(cls, *args, **kwargs)
[docs] def request(self, msg, timeout=30):
'''request
High-level api: sends message through NetConf session and returns with
a reply. Exception is thrown out either the reply is in wrong
format or timout. Users can modify timeout value (in seconds) by
passing parameter timeout. Users may want to set a larger timeout when
making a large query.
Parameters
----------
msg : `str`
Any message need to be sent out in XML format. The message can be
in wrong format if it is a negative test case. Because ncclient
tracks same message-id in both rpc and rpc-reply, missing
message-id in your rpc may cause exception when receiving
rpc-reply. Most other wrong format rpc's can be sent without
exception.
timeout : `int`, optional
An optional keyed argument to set timeout value in seconds. Its
default value is 30 seconds.
Returns
-------
str
The reply from the device in string. If something goes wrong, an
exception will be raised.
Raises
------
Exception
If NetConf is not connected, or there is a timeout when receiving
reply.
Code Example::
>>> from pyats.topology import loader
>>> testbed = loader.load('/users/xxx/xxx/asr_20_22.yaml')
>>> device = testbed.devices['asr22']
>>> device.connect(alias='nc', via='netconf')
>>> netconf_request = """
... <rpc message-id="101"
... xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
... <get>
... <filter>
... <native xmlns="http://cisco.com/ns/yang/ned/ios">
... <version>
... </version>
... </native>
... </filter>
... </get>
... </rpc>
... """
>>> reply = device.nc.request(netconf_request)
>>>
Expected Results::
>>> print(reply)
<?xml version="1.0" encoding="UTF-8"?>
<rpc-reply xmlns="urn:ietf:params:xml:ns:netconf:base:1.0"
message-id="101"><data>
<native xmlns="http://cisco.com/ns/yang/ned/ios">
<version>16.3</version></native></data></rpc-reply>
>>>
'''
rpc = RawRPC(session = self.session,
device_handler = self._device_handler,
timeout = timeout,
raise_mode = operations.rpc.RaiseMode.NONE)
# identify message-id
m = re.search(r'message-id="([A-Za-z0-9_\-:# ]*)"', msg)
if m:
rpc._id = m.group(1)
rpc._listener.register(rpc._id, rpc)
logger.debug('Found message-id="%s" in your rpc, which is good.', rpc._id)
else:
logger.warning('Cannot find message-id in your rpc. You may '
'expect an exception when receiving rpc-reply '
'due to missing message-id.')
return rpc._request(msg).xml
def __getattr__(self, method):
# avoid the __getattr__ from Manager class
if method in manager.VENDOR_OPERATIONS or method in manager.OPERATIONS:
return super().__getattr__(method)
else:
raise AttributeError("'%s' object has no attribute '%s'"
% (self.__class__.__name__, method))
[docs]class RawRPC(operations.rpc.RPC):
'''RawRPC
A modified ncclient.operations.rpc.RPC class. This is for internal use
only.
'''
def _request(self, msg):
'''_request
Override method _request in class ncclient.operations.RPC, so it can
handle raw rpc requests in string format without validating your rpc
request syntax. When your rpc-reply is received, in most cases, it
simply returns rpc-reply again in string format, except one scenario:
If message-id is missing or message-id received does not match that in
rpc request, ncclient will raise an OperationError.
'''
logger.debug('Requesting %r' % self.__class__.__name__)
logger.info('Sending rpc...')
logger.info(msg)
time1 = time.time()
self._session.send(msg)
if not self._async:
logger.debug('Sync request, will wait for timeout=%r' %
self._timeout)
self._event.wait(self._timeout)
if self._event.isSet():
time2 = time.time()
reply_time = "{:.3f}".format(time2 - time1)
logger.info('Receiving rpc-reply after {} sec...'.
format(reply_time))
logger.info(self._reply)
return self._reply
else:
logger.info('Timeout. No rpc-reply received.')
raise TimeoutExpiredError('ncclient timed out while waiting '
'for an rpc-reply.')