Source code for container.dhcpd

"""
ISC DHCP Server
===============
"""
import tarfile
import time
from io import BytesIO

from ipaddress import IPv4Network, IPv4Address

from container import Docker


[docs]class Dhcpd(Docker): def __init__(self, name, **kwargs): """ Docker container running a DHCP Daemon Attributes: conf: String storing dhcpd.conf settings. """ Docker.__init__(self, name, dimage="dhcpd", **kwargs) self.conf = "" # type: str self.dhcp_pid = None
[docs] def document(self, doc): doc.add_heading('ISC DHCP server', level=2) doc.add_paragraph( 'ISC DHCP server, used in scenarios to enable DHCP starvation or rougue server attacks. To perform these attacks Yersinia is used, available on provided Kali Docker images.') doc.add_paragraph("Configuration: \t%s" % self.conf) super(Dhcpd, self).document(doc)
[docs] def config(self, **kwargs): """ Extends Node.config. Creates a file containing the settings from 'conf' and writes them to /etc/dhcp inside the container. TODO: If running and a config change is made restart the server TODO: Store and mount the config file rather than copying it https://stackoverflow.com/questions/42248198/how-to-mount-a-single-file-in-a-volume """ super(Dhcpd, self).config(**kwargs) conf = self.conf.encode('utf-8') # Encode conf as tar for use with put_archive tarstream = BytesIO() tar = tarfile.TarFile(fileobj=tarstream, mode='w') tarinfo = tarfile.TarInfo(name='dhcpd.conf') tarinfo.size = len(conf) tarinfo.mtime = time.time() tar.addfile(tarinfo, BytesIO(conf)) tar.close() tarstream.seek(0) # Use docker-py's put_archive command to copy our config to the container self.dcli.put_archive(self.dc, "/etc/dhcp", tarstream) # Run dhcpd on the link's interface NOTE: -d was used previously for debugging self.dhcp_pid = self.cmd("/usr/sbin/dhcpd --no-pid %s" % ' '.join(str(intf.name) for idx, intf in self.intfs.items()))
[docs] def add_global(self, option): # type: (str) -> None """ Add global parameter to 'conf'. Create these with functions within dhcpd.globals. :param option: String returned from one of the methods inside 'dhcpd.globals' class. """ self.conf += option
[docs] def add_subnet(self, ip_network, option_list=None, addr_range=None): # type: (IPv4Network, [str], (IPv4Address, IPv4Address)) -> None """ Adds a subnet configuration with the supplied parameters to 'conf'. :param ip_network: Network for subnet address and netmask. :param addr_range: Range of IP addresses the DHCP server will assign. :param option_list: List of options created by functions within 'dhcpd.options' or 'dhcpd.globals'. """ # If no address range was supplied, set range to cover every host in the subnet if option_list is None: option_list = [] if addr_range is None: addr_range = (ip_network.network_address + 1, ip_network.broadcast_address - 1) # Add address range string to options option_list += ["range %s %s;" % addr_range] # Generate subnet configuration and append to config self.conf += "subnet %s netmask %s {\n%s\n}" % ( ip_network.network_address, ip_network.netmask, '\n'.join(option_list))
# Full list of parameters available at: http://www.ipamworldwide.com/ipam/dhcp-server-params.html # noinspection PyClassHasNoInit
[docs]class globals: """ Global options as per http://www.ipamworldwide.com/ipam/dhcp-server-params.html. NOTE: not all options are implemented. """
[docs] @staticmethod def authoritative(): return "authoritative;"
[docs] @staticmethod def local_port(port): # type: (int) -> str if port not in range(1, 65536): raise IndexError("TCP Port must be between 1 and 65535 inclusive.") return "local-port %s;" % str(port)
[docs] @staticmethod def local_address(address): # type: (IPv4Address) -> str return "local-address %s;" % str(address)
[docs] @staticmethod def default_lease_time(time): # type: (int) -> str return "default-lease-time %s;" % str(time)
[docs] @staticmethod def min_lease_time(time): # type: (int) -> str return "min-lease-time %s;" % str(time)
[docs] @staticmethod def max_lease_time(time): # type: (int) -> str return "max-lease-time %s;" % str(time)
# noinspection PyClassHasNoInit
[docs]class options: """ Per network options as per http://www.ipamworldwide.com/ipam/isc-dhcpv4-options.html NOTE: not all options are implemented. """ from typing import List, Tuple
[docs] @staticmethod def subnet_mask(ip_network): # type: (IPv4Network) -> str return "option subnet-mask %s;" % str(ip_network.netmask)
[docs] @staticmethod def broadcast_address(ip_network): # type: (IPv4Network) -> str return "option broadcast-address %s;" % str(ip_network.broadcast_address)
@staticmethod def _server_list(name, *addresses): """ Internal helper function for options which are lists of addresses. """ return "option %s %s;" % (name, ' '.join([str(address) for address in addresses]))
[docs] @staticmethod def routers(*addresses): # type: (List[IPv4Address]) -> str return options._server_list("routers", addresses)
[docs] @staticmethod def time_servers(*addresses): # type: (List[IPv4Address]) -> str return options._server_list("time-servers", addresses)
[docs] @staticmethod def domain_name(name): # type: (str) -> str return "option domain-name %s;" % name
[docs] @staticmethod def domain_name_servers(*addresses): # type: (List[IPv4Address]) -> str return options._server_list("domain-name-servers", addresses)
[docs] @staticmethod def log_servers(*addresses): # type: (List[IPv4Address]) -> str return options._server_list("log-servers", addresses)
[docs] @staticmethod def cookie_servers(*addresses): # type: (List[IPv4Address]) -> str return options._server_list("cookie-servers", addresses)
[docs] @staticmethod def ntp_servers(*addresses): # type: (List[IPv4Address]) -> str return options._server_list("ntp-servers", addresses)
[docs] @staticmethod def router_discovery(should_discover): # type: (bool) -> str return "option routers %s;" % str(should_discover).lower()
[docs] @staticmethod def static_routes(*addresses): # type: ([Tuple[IPv4Address, IPv4Address]]) -> str return "option domain_name_servers %s;" % ','.join(["%s, %s" % (addr1, addr2) for addr1, addr2 in addresses])
[docs]def example(): from mininet.clean import cleanup from mininet.cli import CLI from mininet.log import info, setLogLevel from mininet.net import Containernet if __name__ == "__main__": setLogLevel('info') info('*** Running Cleanup\n') cleanup() net = Containernet(controller=None) info('*** Adding host\n') h1 = net.addHost('h1', ip='10.10.10.2/24') info('*** Adding switch\n') s1 = net.addSwitch('s1') info('*** Adding dhcpd\n') dhcpd = net.addDocker('dhcpd', cls=Dhcpd, dhcp_switch=s1, ip='10.10.10.1/24') # type: Dhcpd dhcpd.add_subnet(IPv4Network(u'10.10.10.0/24')) info('*** Creating links\n') net.addLink(h1, s1) info('*** Starting network\n') net.start() info('*** Running CLI\n') CLI(net) info('*** Stopping network') net.stop()
if __name__ == "__main__": example()