#!/usr/bin/python # # Livetest # Tests for functionality of basic ipsec features. # # Copyright (C) 2009 Daniel Snider # # This program is free software; you can redistribute it and/or modify it # under the terms of the GNU General Public License as published by the # Free Software Foundation; either version 2 of the License, or (at your # option) any later version. See . # # This program is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License # for more details. import os import sys import commands import cgitb import httplib import urllib import subprocess import datetime import time import signal import datetime import getopt natflag = 0 retryflag = 0 nocolour = 0 returnhelp = 0 verbose = 0 ADDRWEB = "" #livetest server ADDRSEC = "" #livetest libreswan server configsetup = "config setup" nat_traversal = "nat_traversal=yes" def outmsg(msg): sys.stdout.write(msg) sys.stdout.flush() return len(msg) #function to print output def output(mod, state): if nocolour: s = "\t[" + state + "]\n" sys.stdout.write (s.expandtabs(70 - mod - len (state))) if state == "FAIL": sys.exit(1) else: s = "\t[" sys.stdout.write(s.expandtabs(70-mod-len(state))) if (state == "OK") | (state == "YES") | (state == "NO"): sys.stdout.write ('\033[92m') elif (state == "FAIL") | (state == "FAILED"): sys.stdout.write ('\033[91m') else: sys.stdout.write ('\033[93m') sys.stdout.write (state) sys.stdout.write ('\033[0m') sys.stdout.write ("]\n") if state == "FAIL": sys.exit(1) #runs commands and stops after timeout seconds def timeout_command(command, timeout): #call shell-command and either return its output or kill it #if it doesn't normally exit within timeout seconds and return None cmd = command.split(" ") start = datetime.datetime.now() process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) while process.poll() is None: time.sleep(0.2) now = datetime.datetime.now() if (now - start).seconds> timeout: os.kill(process.pid, signal.SIGKILL) os.waitpid(-1, os.WNOHANG) return None return process.wait() #Check pluto. It should not be running def checkPluto(): (status,output) = commands.getstatusoutput("ipsec setup status") #return values: #0 - ipsec running #256 - could be started or stopped #512 - subsystem locked #768 - ipsec stopped if "can not load config" in output: if nocolour: print "\nError: " + output else: print "\n\033[91mError\033[0m: " + output sys.exit(1) elif status == 0: if returnhelp: print "\n\nTo stop pluto use the command: \"ipsec setup stop\"\n" return "FAIL" elif status == 256: outp = commands.getoutput("ipsec ikeping") if "v4 bind: Address already in use" in outp: if returnhelp: print "\n\nTo stop pluto use the command: \"ipsec setup stop\"\n" return "FAIL" else: return "OK" elif status == 512: return "LOCKED" elif status == 768: return "OK" else: return "UNKNOWN" #Check for installed libreswan def checkInstall(): (status,output) = commands.getstatusoutput("ipsec --version") if status == 0: return "OK" else: return "FAIL" #check for SElinux enforce def checkSElinux(): if os.path.exists("/selinux/enforce"): return "FAILED" else: return "OK" #check for ipforwarding def checkForwarding(): file = open("/proc/sys/net/ipv4/ip_forward", 'r') contents = file.read() if "0" in contents: if returnhelp: print "\n\nTry \"echo \"1\" > /proc/sys/net/ipv4/ip_forward\" to fix this problem\n" return "WARNING" elif "1" in contents: return "OK" else: return "FAIL" #check that send and accept icmp redirects are disabled if netkey is detected def check_icmp_redirect(option): if "1" in open("/proc/sys/net/ipv4/conf/default/" + option + "_redirects").read(): if returnhelp: print "\n\nTry \"for f in /proc/sys/net/ipv4/conf/*/"+option+"_redirects; do echo 0 > $f; done\" to fix this problem\n" return "WARNING" else: return "OK" #check for IPsec suppor in kernel def checkKern(): (status,output) = commands.getstatusoutput("ipsec --version") if "no kernel code" in output: if (commands.getstatusoutput("modprobe af_key")[0] != 0) & (commands.getstatusoutput("modprobe ipsec")[0] != 0): return "FAIL" else: return "OK" else: return "OK" #Check that ip command is present def checkCommandIP(): (status,output) = commands.getstatusoutput("which ip") if status == 0: return "OK" else: return "FAIL" #Check that ip command is present def checkCommandIPtables(): (status,output) = commands.getstatusoutput("which iptables") if status == 0: return "OK" else: return "FAIL" #Check for internet connectivity def checkNet(): (status,output) = commands.getstatusoutput("ping " + ADDRWEB + " -c 1") if status == 0: return "OK" else: return "FAIL" def getAppearentIP(): conn = httplib.HTTPConnection(ADDRWEB, 80) conn.request("GET", "/cgi-bin/returnip.py") r1 = conn.getresponse() appearent = r1.read() conn.close() return appearent.strip("\n") #definitively get local ip from default route def getLocalIP(): interfaces = commands.getstatusoutput("ip ro list") split_if = interfaces[1].split("\n") for line in split_if: if "default" in line: default = line.split(" ")[4] ifconfig = commands.getstatusoutput("ifconfig") split_ifconfig = ifconfig[1].split("\n\n") for line in split_ifconfig: if default in line: return (line.split(":")[7]).split(" ")[0] #Compare local IP to percieved remote IP def checkNAT(): global natflag #compare local ips to the appearent one if getLocalIP().find(getAppearentIP()) == -1: natflag = 1 #you are behind natt return "YES" else: natflag = 0 #you are not behind natt return "NO" #check to make sure NAT address is within RFC1918 address space def checkLocalIP(): local = getLocalIP() if "192.168." not in local: if "172.10" not in local: if "10." not in local: return "FAILED" return "OK" #open ipsec.conf def openConf(): (status, dir) = commands.getstatusoutput("ipsec --confdir") f = open("/etc/ipsec.conf", 'r') if f: return "OK" else: return "FAIL" #check ipsec.conf def checkConf(s): f = open("/etc/ipsec.conf", 'r') for line in f: if ("#" + s) in line: return "FAIL" elif s in line: return "OK" return "FAIL" #port check host originating def checkPort(port, addr): global retryflag, verbose cmd = "ipsec ikeping --ikeport " + str(port) + " " + addr + "/" + str(port) (status,output) = commands.getstatusoutput("ipsec ikeping --ikeport " + str(port) + " " + addr + "/" + str(port)) if verbose: print "\nPrinting command: " + cmd print "Printing output: " + output + "\n" if (status == 0) | (status > 60000): retryflag = 0 return "OK" else: retryflag = 1 return "FAILED" #start libreswan def startPluto(): status = timeout_command("ipsec setup start", 10) if status == 0: return "OK" elif "None" in str(status): return "LAG" else: return "FAIL" #try to set up tunnel def whack(command, timeout): global retryflag retryflag = 0 if timeout == 0: (status,output) = commands.getstatusoutput(command) if status == 0: return "OK" else: return "FAILED" else: status = timeout_command(command, timeout) if status == 0: return "OK" elif "None" in str(status): retryflag = 1 return "FAILED" #more logic could be here for why a command stalled else: retryflag = 1 return "FAILED" #ping threw tunnel def livetestPing(): global retryflag retryflag = 0 #commands.getstatusoutput("ping " + ADDRSEC + " -c 1") (status,output) = commands.getstatusoutput("ping " + ADDRSEC + " -c 1") if status == 0: return "OK" else: retryflag = 1 return "FAILED" #check secure log for tunnel errors def checkLog(): #this is currently operating system dependent global retryflag retryflag = 0 log_path = "/var/log/auth.log" (status,output) = commands.getstatusoutput("tail " + log_path) if "differs from size specified in ISAKMP HDR" in output: return "expected packet size differs" elif "malformed payload notifies" in output: return "possible key error" elif "INVALID_KEY_INFORMATION" in output: return "key error" elif "INVALID_ID_INFORMATION" in output: return "ID error" elif "unknown exchange" in output: return "unknown exchange" elif "Quick Mode message is for a non-existent" in output: return "pfs not supported?" else: return "not-sure" def checkMTU(timeout): #Exception to raise on a timeout class TimeExceededError(Exception): pass #connection execution def mtu(): conn = httplib.HTTPConnection(ADDRSEC, 80) conn.request("GET", "/icons/f.png") r1 = conn.getresponse() appearent = r1.read(10) conn.close() #an alarm singal handler def alarmHandler(signum, frame): raise TimeExceededError, "Your command ran too long" #set the alarm signal handler, and set the alarm to 'timeout' seconds signal.signal(signal.SIGALRM, alarmHandler) signal.alarm(timeout) try: mtu() signal.alarm(0) return "OK" except TimeExceededError: return "FAILED" def stopPluto(): (stats,outpt) = commands.getstatusoutput("ipsec auto --delete livetest") (status,output) = commands.getstatusoutput("ipsec setup stop") if status == 0: return "OK" else: return "FAILED" #barfanalyse def barfAnalyze(): conn = httplib.HTTPConnection(ADDRWEB, 80) conn.request("GET", "/cgi-bin/barfanalyze.py") r = conn.getresponse() sys.stdout.write(r.read()) sys.stdout.flush() conn.close() #barfupload def barfUpload(): answer = raw_input("Would you like to additional checks done? It requires your ipsec barf info to be sent to Xelerance. But it's instant. Y/N: ") if answer in ["Y", "y"]: barf = commands.getoutput("ipsec barf") file_path = "/home/daniel/livetest/client/old/barf.txt" data = urllib.urlencode({"file" : barf}) f = urllib.urlopen("http://livetest.libreswan.org/cgi-bin/barfload.py", data) barfAnalyze() print "To view dubugging information on your livetest tunnel as well as your ipsec barf go to /http://livetest.libreswan.org/cgi-bin/debug_info.py" def main(): ADDRWEB = "" #livetest server ADDRSEC = "" #livetest libreswan server no_net = 0 quick = 0 configsetup = "config setup" nat_traversal = "nat_traversal=yes" global retryflag #check privileges if os.getuid() != 0: sys.exit("You are not superuser") try: opts, args = getopt.getopt(sys.argv[1:], 'nvrbiq' ,['nocolour','verbose','returnhelp','barfupload','nointernets','quick']) for o, a in opts: if (o == "--nocolour") | (o == "-n"): global nocolour nocolour = 1 if (o == "--returnhelp") | (o == "-r"): global returnhelp returnhelp = 1 if (o == "--verbose") | (o == "-v"): global verbose verbose = 1 if (o == "--barfupload") | (o == "-b"): barfUpload() sys.exit(0) if (o == "--nointernets") | (o == "-i"): no_net = 1 if (o == "--quick") | (o == "-q"): quick = 1 except getopt.GetoptError: print "usage:" print " -v, --verbose \tprints more connection info".expandtabs(25) print " -r, --returnhelp \tprints some suggestions to help fix warnings or failures".expandtabs(25) print " -n, --nocolour \tprints output without colour".expandtabs(25) print " -b, --barfupload \tuploads and analyzes your ipsec barf".expandtabs(25) print " -i, --nointernets \twill run commands that don't require internet".expandtabs(25) print " -q, --quick \tomits port tests".expandtabs(25) print " -h, --help \tprints this help".expandtabs(25) sys.exit(2) #check for network connectivity if not no_net: interfaces = commands.getstatusoutput("ip ro list") if interfaces[1] == "": sys.exit("You do not have a network connection. Try ipsec livetest -i") try: print "Checking your system to see if IPsec will work:" output (outmsg("Checking that Libreswan is installed"), checkInstall()) output (outmsg("Pluto must be stopped for this program"), checkPluto()) output (outmsg("Checking for SElinux"), checkSElinux()) output (outmsg("Check IP forwarding"), checkForwarding()) if os.path.exists("/proc/net/pfkey"): output (outmsg("NETKEY detected, testing for disabled ICMP send_redirects"), check_icmp_redirect("send")) output (outmsg("NETKEY detected, testing for disabled ICMP accept_redirects"), check_icmp_redirect("accept")) output (outmsg("Checking for ipsec support in kernel"), checkKern()) output (outmsg("Checking for 'ip' command"), checkCommandIP()) output (outmsg("Checking for 'iptables' command"), checkCommandIPtables()) try: if not no_net: output (outmsg("Checking Internet connectivity"), checkNet()) except SystemExit: try: output (outmsg("Retry: Checking Internet connectivity"), checkNet()) except SystemExit: if commands.getstatusoutput("ping www.google.com -c 1")[0] == 0: print "Livetest's test facility is currenlty offline. Sorry for the inconvienence, please try again later." sys.exit(1) else: sys.exit(1) if not no_net: getAppearentIP() if not no_net: output (outmsg("Are you behind NAT?"), checkNAT()) if natflag: output (outmsg ("Checking your local IP"), checkLocalIP()) output (outmsg("Opening ipsec.conf"), openConf()) output (outmsg("Searching /etc/ipsec.conf for \"" + configsetup + "\""), checkConf(configsetup)) if natflag: output(outmsg ("Searching /etc/ipsec.conf for \"" + nat_traversal + "\""), checkConf(nat_traversal)) if not no_net and not quick: output (outmsg("Testing port 500"), checkPort(500,ADDRSEC)) if retryflag: output (outmsg("Retry: Checking port 500"), checkPort(500,ADDRSEC)) if not no_net and not quick: output (outmsg("Testing port 4500"), checkPort(4500, ADDRWEB)) if retryflag: output (outmsg("Retry: Checking port 4500"), checkPort(4500, ADDRWEB)) if not no_net and not quick: output (outmsg("Testing port 5000"), checkPort(5000, ADDRSEC)) if retryflag: output (outmsg("Retry: Checking port 5000"), checkPort(5000, ADDRSEC)) if not no_net: output (outmsg("Starting Libreswan"), startPluto()) if natflag: conn = "ipsec whack --name livetest --id @client --host " + getLocalIP() + " --forceencaps --to --id @lab --host --tunnel --encrypt --psk --pfs" output (outmsg("Adding \"livetest-nat\" connection"), whack(conn, 0)) if verbose: print conn elif not no_net: conn = "ipsec whack --name livetest --id @client --host " + getLocalIP() + " --to --id @lab --host --tunnel --encrypt --psk --pfs" output (outmsg("Adding \"livetest\" connection"), whack(conn, 0)) if verbose: print conn if not no_net: output (outmsg("Loading \"livetest\" preshared key"), whack("ipsec whack --listen", 0)) if not no_net: output (outmsg("Starting \"livetest\" connection (this may take sometime)"), whack("ipsec whack --initiate --name livetest", 60)) if retryflag: output (outmsg("Checking log about tunnel"), checkLog()) commands.getoutput("ipsec setup stop") sys.exit(1) if not no_net: output (outmsg("Pinging threw connection"), livetestPing()) if not no_net and retryflag: output (outmsg("Retry: Pinging threw connection"), livetestPing()) try: if not no_net: output (outmsg("Running MTU check"), checkMTU(10)) except TimeExceededError: pass if not no_net: output (outmsg("Stopping Libreswan"), stopPluto()) if not no_net: barfUpload() except KeyboardInterrupt: commands.getoutput("ipsec setup stop") print sys.exit(1) main()