Multi-threaded Port Scanner

One of the core functions performed during network reconnaissance is identifying open ports on a target host.

Many tools can scan for open ports, but the fundamental idea behind a TCP port scanner is simple: attempt to establish a socket connection. If the connection succeeds, the port is open. If it fails, either no service is listening there or the host (or a firewall in front of it) is rejecting or filtering packets for that port.

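If the number of ports to be scanned is relatively small, it’s easy enough to test them manually or with a simple script. A script is a great step towards automation, but one that checks ports one at a time doesn’t scale very well, because each connection attempt has to finish or time out before the next one starts.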
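
To make the scaling problem concrete, here is a minimal sketch of such a one-port-at-a-time script. The function name scan_sequential and the half-second timeout are my own choices for illustration, not part of the program described below.

```python
import socket


def scan_sequential(host: str, ports: range, timeout: float = 0.5) -> list[int]:
    """Try each port in turn and return the ones that accept a TCP connection."""
    open_ports = []
    for port in ports:
        # the with block closes the socket whether or not connect() succeeds
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as con:
            con.settimeout(timeout)
            try:
                con.connect((host, port))
                open_ports.append(port)
            except OSError:
                # refused, timed out, or unreachable: treat as not open
                pass
    return open_ports
```

Every unresponsive port costs the full timeout before the loop can move on, so the total run time grows linearly with the number of ports.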

That’s where multithreading comes in.

Multithreading allows a process to execute certain steps concurrently or in parallel. In the case of port scanning, using multiple threads allows multiple ports to be scanned at the same time, drastically reducing the time necessary to scan the full range of desired ports. Port scanning suits threads well because each thread spends most of its time waiting on the network rather than using the CPU.
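
A back-of-the-envelope estimate shows how large the difference can be. The sketch below assumes the worst case, where every connection attempt runs into a timeout, and uses a hypothetical one-second timeout; the helper worst_case_seconds is made up for this illustration.

```python
import math


def worst_case_seconds(num_ports: int, num_threads: int, timeout: float) -> float:
    """Upper bound on scan time if every connection attempt hits the timeout."""
    # each thread works through at most ceil(num_ports / num_threads) ports in turn
    return math.ceil(num_ports / num_threads) * timeout


print(worst_case_seconds(65535, 1, 1.0))   # 65535.0 seconds, roughly 18 hours
print(worst_case_seconds(65535, 24, 1.0))  # 2731.0 seconds, roughly 46 minutes
```

Real scans usually finish faster than this bound, since open and closed ports tend to answer well before the timeout.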

I wrote a Python program that does exactly this. Let’s walk through the code to understand what’s happening.

First, we import the standard-library modules for networking, argument parsing, IP address validation, threading, queuing and timing.

import socket
import argparse
import ipaddress
import threading
from queue import Queue
import time

Next, we create a queue that holds the ports still waiting to be scanned. Queue is thread-safe, so multiple threads can pull work from it without being handed the same port twice.

q = Queue()
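
If the queue methods are unfamiliar, this small standalone sketch shows how put(), get(), task_done() and join() fit together. The names jobs, seen and worker are placeholders for illustration only, and a single worker is used so the output order is predictable.

```python
import threading
from queue import Queue

jobs = Queue()
seen = []  # filled in by the single worker thread


def worker() -> None:
    while True:
        item = jobs.get()   # blocks until an item is available
        seen.append(item)
        jobs.task_done()    # exactly one task_done() for every get()


threading.Thread(target=worker, daemon=True).start()

for port in (22, 80, 443):
    jobs.put(port)

jobs.join()                 # returns once every queued item is marked done
print(seen)                 # [22, 80, 443]
```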

A mutex lock ensures that only one thread at a time runs a given block of code. Here it keeps the output of different threads from interleaving when they print their results.

THREAD_LOCK = threading.Lock()
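
A lock matters even more once threads start modifying shared data. As a small sketch, suppose we wanted to collect open ports in a shared list as well as print them. The names results_lock, open_ports and record_open are invented for this example and do not appear in the program.

```python
import threading

results_lock = threading.Lock()
open_ports = []  # shared between all threads


def record_open(port: int) -> None:
    # only one thread at a time may run the body of this with block
    with results_lock:
        open_ports.append(port)
        print("Port %d is open" % port)


threads = [threading.Thread(target=record_open, args=(p,)) for p in (22, 80, 443)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(sorted(open_ports))  # [22, 80, 443]
```

The order of the individual "is open" lines can vary between runs, but each line is printed whole because the lock is held while printing.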

The port_scan function attempts to establish a connection using the provided host and port parameters. If it cannot connect, a socket error exception is raised, which we deliberately ignore because a failed connection simply means there is nothing to report.

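def port_scan(host: str, port: int) -> None:
    """ Report the port as open if a TCP connection succeeds """
    try:
        # the with block closes the socket even if connect() fails
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as con:
            # give up on unresponsive (filtered) ports after one second
            con.settimeout(1.0)
            con.connect((host, port))

        # hold the lock while printing so output lines do not interleave
        with THREAD_LOCK:
            print('Port %d is open' % port)
    except socket.error:
        # closed or filtered port: nothing to report
        pass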
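
The port_scan function above treats every failure the same way. If you want to tell a closed port from a filtered one, a rough approach is to look at which exception was raised. The sketch below is one possible way to do that, not part of the original program; the function name port_state, the one-second timeout, and the three labels are my own assumptions.

```python
import socket


def port_state(host: str, port: int, timeout: float = 1.0) -> str:
    """Return 'open', 'closed', or 'filtered' for a single TCP port."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as con:
        con.settimeout(timeout)
        try:
            con.connect((host, port))
            return "open"
        except ConnectionRefusedError:
            # the host actively refused the connection: nothing is listening
            return "closed"
        except OSError:
            # timeouts, unreachable hosts and similar are lumped together here
            return "filtered"
```

This classification is only a heuristic, since firewalls can also be configured to actively refuse connections instead of silently dropping them.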

The scanner function is the loop that each thread created by start_threads() runs. It takes a host and port pair off the queue, scans the port, then lets the queue know that this particular pair has been processed.

def scanner() -> None:
    """ Execute this thread's task """
    while True:
        # retrieve target from the queue
        target = q.get()

        # target is stored as a list of two items, host and port
        host, port = target

        # run the scan on this socket
        port_scan(host, port)

        # inform the queue that this queued task has been processed
        q.task_done()

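This function is passed to argparse as the type for the host argument. It checks that the value is a valid IP address and restricts it to private address space. Note that is_private is broader than RFC 1918: it is also true for loopback and link-local addresses, among other reserved ranges. If the value is not an IP address at all, ipaddress.ip_address() raises a ValueError, which argparse reports as an invalid argument.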

def valid_ip_addr(target: str) -> str:
    """ Validate target to scan is a valid private IP address """
    if not ipaddress.ip_address(target).is_private:
        raise argparse.ArgumentTypeError("Target is limited to private IP address space (RFC 1918)")
    return target
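
If you want to limit targets strictly to the three RFC 1918 blocks, one option is to test membership in those networks explicitly. This is a sketch of that idea rather than the check the program uses; RFC1918_NETWORKS and is_rfc1918 are hypothetical names.

```python
import ipaddress

# the three address blocks reserved by RFC 1918
RFC1918_NETWORKS = [
    ipaddress.ip_network("10.0.0.0/8"),
    ipaddress.ip_network("172.16.0.0/12"),
    ipaddress.ip_network("192.168.0.0/16"),
]


def is_rfc1918(target: str) -> bool:
    """Return True only if target lies inside one of the RFC 1918 blocks."""
    addr = ipaddress.ip_address(target)  # raises ValueError for malformed input
    return any(addr in net for net in RFC1918_NETWORKS)


print(ipaddress.ip_address("127.0.0.1").is_private)  # True
print(is_rfc1918("127.0.0.1"))                       # False
print(is_rfc1918("192.168.1.10"))                    # True
```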

This function creates the requested number of threads that will be used to scan ports. Each thread is given scanner() as its target function.

Marking the threads as daemon threads means they do not keep the program alive: when the main thread exits, any daemon threads still running are stopped with it. That suits this scanner, because each worker loops forever picking up new ports from the queue, and the program should end as soon as the main thread sees that the queue has been fully processed.

def start_threads(num_threads: int) -> None:
    """ Creates threads for executing scans concurrently """

    for index in range(num_threads):
        # target is the function that each individual thread runs
        thread = threading.Thread(name='scanner_' + str(index), target=scanner)
        thread.daemon = True
        thread.start()
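
Daemon threads are a convenient shortcut, but they are stopped abruptly at exit. A common alternative is to send each worker a sentinel value that tells it to finish, then join the threads. The sketch below illustrates that pattern with generic work items; STOP, worker and run_workers are names I made up, and the program in this post does not work this way.

```python
import threading
from queue import Queue

STOP = None  # hypothetical sentinel value telling a worker to exit


def worker(jobs: Queue, done: list, lock: threading.Lock) -> None:
    while True:
        item = jobs.get()
        if item is STOP:
            jobs.task_done()
            break               # leave the loop so the thread can finish
        with lock:
            done.append(item)   # stand-in for the real work
        jobs.task_done()


def run_workers(items: list, num_threads: int = 4) -> list:
    jobs, done, lock = Queue(), [], threading.Lock()
    threads = [threading.Thread(target=worker, args=(jobs, done, lock))
               for _ in range(num_threads)]
    for t in threads:
        t.start()
    for item in items:
        jobs.put(item)
    for _ in threads:
        jobs.put(STOP)          # one sentinel per worker
    for t in threads:
        t.join()                # returns once every worker has exited
    return sorted(done)


print(run_workers([443, 22, 80]))  # [22, 80, 443]
```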

This is the main entry point. It parses the command-line arguments, starts the threads, populates the queue with the specified port range, and tracks execution time.

def main() -> None:
    """ Parent thread that spawns worker threads to scan multiple ports """
    argp = argparse.ArgumentParser(description='Perform a basic port scan')

    # add available arguments
    argp.add_argument(
        '-s', '--start',
        required=True,
        type=int,
        default=None,
        help='Specify the start of the port range')
    argp.add_argument(
        '-e', '--end',
        type=int,
        default=None,
        help='Specify the end of the port range')
    argp.add_argument(
        '-t', '--threads',
        type=int,
        default=24,
        help='Specify the number of threads to spawn')
    argp.add_argument(
        'HOST',
        type=valid_ip_addr,
        default=None,
        help='Specify the host to scan')
    args = argp.parse_args()

    # give args a friendly name
    host = args.HOST
    num_threads = args.threads

    start_port = args.start
    # when --end is omitted, scan only the start port
    last_port = args.start if args.end is None else args.end

    # argp.error() prints the message and exits, so no raise is needed
    if not 1 <= start_port <= last_port <= 65535:
        argp.error("ports must satisfy 1 <= start <= end <= 65535")

    # range() excludes its stop value, so add one to include the last port
    end_port = last_port + 1

    # track start time for performance display
    start_time = time.time()

    # spawn a number of threads to scan a queue of ports
    start_threads(num_threads)

    # queue up the ports to be scanned
    for port in range(start_port, end_port):
        # use a list to store host:port on a single queue entry
        q.put([host, port])

    # hold until all ports have been scanned
    q.join()

    # display performance data
    print("Run Time: %0.2f seconds" % (time.time() - start_time))

if __name__ == '__main__':
    main()
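
For comparison, the standard library’s concurrent.futures module can manage the thread pool and the work distribution for you. The following is a compact sketch of the same idea under that approach, not a drop-in replacement for the program above; is_open, scan and the one-second timeout are assumptions of mine.

```python
import socket
from concurrent.futures import ThreadPoolExecutor


def is_open(host: str, port: int, timeout: float = 1.0) -> bool:
    try:
        # create_connection builds the socket, applies the timeout and connects
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def scan(host: str, ports: range, max_workers: int = 24) -> list[int]:
    """Return the open ports in ascending order of the input range."""
    port_list = list(ports)
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map() yields results in the same order as the input ports
        results = pool.map(lambda p: is_open(host, p), port_list)
        return [port for port, ok in zip(port_list, results) if ok]
```

Calling scan("192.168.1.10", range(1, 1025)) would check the first 1024 ports of that (example) host and return the open ones as a list instead of printing them.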

Source code available via GitHub.