socket编程

这篇文章的目的是从零开始逐步写出一个功能完善的http服务。 使用 python 3.12 。 选择 python 是因为 c/c++ 处理字符串很麻烦; java 搭建环境很麻烦; c# 是微软系统,笔者不喜欢; php 对多线程多进程的支持并不好; go 对新手并不友好,而且也不够流行; javascript 似乎没有问题,以后有空也写一个用 javascript 实现的版本。

版本1 只能处理一个请求

import socket

def main():
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 创建一个 TCP 套接字
    server.bind(("0.0.0.0", 8012)) # 绑定 IP 地址和端口号
    server.listen(1)
    client_socket, client_address = server.accept()
    client_socket.send(bytes("hello world", encoding="utf-8"))
    client_socket.close()

if __name__ == "__main__":
    main()

版本2 可以处理多个请求,但每次只能处理一个请求 可以使用多次,但无法用 ctrl+c 退出

import socket

def main():
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("0.0.0.0", 8012))
    server.listen(5)
    while True:
        client_socket, client_address = server.accept()
        client_socket.send(bytes("hello world", encoding="utf-8"))
        client_socket.close()

if __name__ == "__main__":
    main()

版本3 可以使用多次,可以用 ctrl+c 退出

import socket
import signal
import sys
import threading
import time

# 定义一个信号处理函数
def signal_handler_quit(signum, frame):
    sys.exit(0)

def main():
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("0.0.0.0", 8012))
    server.listen(5)
    while True:
        client_socket, client_address = server.accept()
        client_socket.send(bytes("hello world", encoding="utf-8"))
        client_socket.close()

if __name__ == "__main__":
    signal.signal(signal.SIGINT, signal_handler_quit) # 注册信号处理函数,处理 SIGINT 信号
    signal.signal(signal.SIGTERM, signal_handler_quit) # 注册信号处理函数,处理 SIGTERM 信号

    work_thread = threading.Thread(target=main) # 创建一个子线程对象
    work_thread.daemon = True # 将子线程设置为守护线程
    work_thread.start() # 启动子线程

    while True: # 主线程的循环
        time.sleep(1)

版本4 在上一个版本的基础上加了一个 sleep 函数,用于观察一次只能处理一个连接的问题

import socket
import signal
import sys
import threading
import time

# 定义一个信号处理函数
def signal_handler_quit(signum, frame):
    sys.exit(0)

def main():
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("0.0.0.0", 8012))
    server.listen(5)
    while True:
        client_socket, client_address = server.accept()
        client_socket.send(bytes("hello world", encoding="utf-8"))
        time.sleep(5)
        client_socket.close()

if __name__ == "__main__":
    signal.signal(signal.SIGINT, signal_handler_quit) # 注册信号处理函数,处理 SIGINT 信号
    signal.signal(signal.SIGTERM, signal_handler_quit) # 注册信号处理函数,处理 SIGTERM 信号

    work_thread = threading.Thread(target=main) # 创建一个子线程对象
    work_thread.daemon = True # 将子线程设置为守护线程
    work_thread.start() # 启动子线程

    while True: # 主线程的循环
        time.sleep(1)

版本5 使用多线程实现的,可以同时处理多个请求的版本

import socket
import signal
import sys
import threading
import time

# 定义一个信号处理函数
def signal_handler_quit(signum, frame):
    sys.exit(0)

def work(client_socket):
    client_socket.send(bytes("hello world", encoding="utf-8"))
    time.sleep(5)
    client_socket.close()

def main():
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("0.0.0.0", 8012))
    server.listen(5)
    while True:
        client_socket, client_address = server.accept()
        work_thread = threading.Thread(target=work, args=[client_socket]) # 创建一个子线程对象
        work_thread.daemon = True # 将子线程设置为守护线程
        work_thread.start() # 启动子线程

if __name__ == "__main__":
    signal.signal(signal.SIGINT, signal_handler_quit) # 注册信号处理函数,处理 SIGINT 信号
    signal.signal(signal.SIGTERM, signal_handler_quit) # 注册信号处理函数,处理 SIGTERM 信号

    work_thread = threading.Thread(target=main) # 创建一个子线程对象
    work_thread.daemon = True # 将子线程设置为守护线程
    work_thread.start() # 启动子线程

    while True: # 主线程的循环
        time.sleep(1)

版本5 基础上的 discard

import socket
import signal
import sys
import threading
import time
import datetime
import struct

# 定义一个信号处理函数
def signal_handler_quit(signum, frame):
    sys.exit(0)

def work(client_socket):
    while True:
        data = client_socket.recv(1024)
        if not data:
            break  # 如果没有数据,退出循环
    client_socket.close()

def main():
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("127.0.0.1", 8012))
    server.listen(5)
    while True:
        client_socket, client_address = server.accept()
        work_thread = threading.Thread(target=work, args=[client_socket]) # 创建一个子线程对象
        work_thread.daemon = True # 将子线程设置为守护线程
        work_thread.start() # 启动子线程

if __name__ == "__main__":
    signal.signal(signal.SIGINT, signal_handler_quit) # 注册信号处理函数,处理 SIGINT 信号
    signal.signal(signal.SIGTERM, signal_handler_quit) # 注册信号处理函数,处理 SIGTERM 信号

    work_thread = threading.Thread(target=main) # 创建一个子线程对象
    work_thread.daemon = True # 将子线程设置为守护线程
    work_thread.start() # 启动子线程

    while True: # 主线程的循环
        time.sleep(1)

版本5 基础上的 time

import socket
import signal
import sys
import threading
import time
import datetime
import struct

# 定义一个信号处理函数
def signal_handler_quit(signum, frame):
    sys.exit(0)

def work(client_socket):
    client_socket.send(struct.pack("!I", int(time.time()) + 2209017600))
    client_socket.close()

def main():
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("127.0.0.1", 8012))
    server.listen(5)
    while True:
        client_socket, client_address = server.accept()
        work_thread = threading.Thread(target=work, args=[client_socket]) # 创建一个子线程对象
        work_thread.daemon = True # 将子线程设置为守护线程
        work_thread.start() # 启动子线程

if __name__ == "__main__":
    signal.signal(signal.SIGINT, signal_handler_quit) # 注册信号处理函数,处理 SIGINT 信号
    signal.signal(signal.SIGTERM, signal_handler_quit) # 注册信号处理函数,处理 SIGTERM 信号

    work_thread = threading.Thread(target=main) # 创建一个子线程对象
    work_thread.daemon = True # 将子线程设置为守护线程
    work_thread.start() # 启动子线程

    while True: # 主线程的循环
        time.sleep(1)

版本5 基础上的 daytime

import socket
import signal
import sys
import threading
import time
import datetime

# 定义一个信号处理函数
def signal_handler_quit(signum, frame):
    sys.exit(0)

def work(client_socket):
    client_socket.send(bytes(time.strftime("%a, %d %b %Y %H:%M:%S %z"), encoding="utf-8"))
    client_socket.close()

def main():
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("127.0.0.1", 8012))
    server.listen(5)
    while True:
        client_socket, client_address = server.accept()
        work_thread = threading.Thread(target=work, args=[client_socket]) # 创建一个子线程对象
        work_thread.daemon = True # 将子线程设置为守护线程
        work_thread.start() # 启动子线程

if __name__ == "__main__":
    signal.signal(signal.SIGINT, signal_handler_quit) # 注册信号处理函数,处理 SIGINT 信号
    signal.signal(signal.SIGTERM, signal_handler_quit) # 注册信号处理函数,处理 SIGTERM 信号

    work_thread = threading.Thread(target=main) # 创建一个子线程对象
    work_thread.daemon = True # 将子线程设置为守护线程
    work_thread.start() # 启动子线程

    while True: # 主线程的循环
        time.sleep(1)

版本5 基础上的 chargen

import socket
import signal
import sys
import threading
import time
import datetime
import string
import random

# 定义一个信号处理函数
def signal_handler_quit(signum, frame):
    sys.exit(0)

def work(client_socket):
    while True:
        for i in range(72):
            # 生成随机字符
            char = random.choice(string.ascii_letters + string.digits + string.punctuation)
            client_socket.send(bytes(str(char.encode('utf-8'), encoding="utf-8"), encoding="utf-8"))
        client_socket.send(bytes("\n", encoding="utf-8"))
        time.sleep(1)

def main():
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("127.0.0.1", 8012))
    server.listen(5)
    while True:
        client_socket, client_address = server.accept()
        work_thread = threading.Thread(target=work, args=[client_socket]) # 创建一个子线程对象
        work_thread.daemon = True # 将子线程设置为守护线程
        work_thread.start() # 启动子线程

if __name__ == "__main__":
    signal.signal(signal.SIGINT, signal_handler_quit) # 注册信号处理函数,处理 SIGINT 信号
    signal.signal(signal.SIGTERM, signal_handler_quit) # 注册信号处理函数,处理 SIGTERM 信号

    work_thread = threading.Thread(target=main) # 创建一个子线程对象
    work_thread.daemon = True # 将子线程设置为守护线程
    work_thread.start() # 启动子线程

    while True: # 主线程的循环
        time.sleep(1)

版本5 基础上的 echo

import socket
import signal
import sys
import threading
import time
import datetime

# 定义一个信号处理函数
def signal_handler_quit(signum, frame):
    sys.exit(0)

def work(client_socket):
    client_socket.send(bytes("hello world\n", encoding="utf-8"))
    client_socket.send(bytes(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), encoding="utf-8"))
    while True:
        # 接收数据
        data = client_socket.recv(1024)
        # print(data)
        if not data:
            break  # 如果没有数据,退出循环
        # 发送回客户端
        # time.sleep(10)
        client_socket.sendall(data)
    # client_socket.send(bytes(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), encoding="utf-8"))
    # # time.sleep(1)
    # client_socket.close()

def main():
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("127.0.0.1", 8012))
    server.listen(5)
    while True:
        client_socket, client_address = server.accept()
        work_thread = threading.Thread(target=work, args=[client_socket]) # 创建一个子线程对象
        work_thread.daemon = True # 将子线程设置为守护线程
        work_thread.start() # 启动子线程

if __name__ == "__main__":
    signal.signal(signal.SIGINT, signal_handler_quit) # 注册信号处理函数,处理 SIGINT 信号
    signal.signal(signal.SIGTERM, signal_handler_quit) # 注册信号处理函数,处理 SIGTERM 信号

    work_thread = threading.Thread(target=main) # 创建一个子线程对象
    work_thread.daemon = True # 将子线程设置为守护线程
    work_thread.start() # 启动子线程

    while True: # 主线程的循环
        time.sleep(1)

五个协议基本上都是只修改 work 函数 echo daytime time discard chargen

版本6 静态http

import socket
import signal
import sys
import threading
import time
import datetime
import os
from pathlib import Path
import mimetypes

SERVER_VERSION = '1.0'
SERVER_NAME = 'plusplus123'

def parser_request_header(http_request):
    lines = http_request.strip().split('\r\n')
    # 解析请求行(第一行)
    request_line = lines[0]
    parts = request_line.split(' ', 2)  # 最多分割成3部分
    line = {
        'method': parts[0] if len(parts) > 0 else '',
        'path': parts[1] if len(parts) > 1 else '',
        'httpversion': parts[2] if len(parts) > 2 else ''
    }
    
    # 解析请求头(从第二行开始,直到遇到空行)
    header = {}
    for i in range(1, len(lines)):
        if lines[i].strip() == '':  # 遇到空行,忽略
            continue
        if ':' in lines[i]:
            key, value = lines[i].split(':', 1)  # 只分割第一个冒号
            header[key.strip()] = value.strip()

    return {
        'line': line,
        'header': header
    }

def read_staticfile(fullPath):
    # 获取文件大小
    file_size = os.path.getsize(fullPath)
    # 获取MIME类型
    mime_type, _ = mimetypes.guess_type(fullPath)
    if mime_type is None:
        mime_type = 'application/octet-stream'

    if mime_type.startswith("text"):
        file_encoding = detect_encoding(fullPath)
        if file_encoding is not None:
            mime_type += '; charset=' + file_encoding

    p = Path(fullPath)
    filecontent = p.read_bytes()

    return {
        'content': filecontent,
        'content_length': file_size,
        'mime_type': mime_type
    }

def detect_encoding(file_path):
    try:
        with open(file_path, 'rb') as file:
            raw_data = file.read(100)
        # 常见编码列表
        encodings = ['UTF-8','gbk','gb18030','gb2312','iso8859-1','UTF-16BE','UTF-16LE','UTF-32BE','UTF-32LE','big5','big5hkscs']
        detected_encoding = None
        for encoding in encodings:
            try:
                decoded_text = raw_data.decode(encoding)
                detected_encoding = encoding.lower()
                break
            except UnicodeDecodeError:
                continue
        if detected_encoding:
            return detected_encoding
        else:
            return None
    except Exception as e:
        return None

def list_directory(dir_path):
    items = os.listdir(dir_path)
    items.sort()

    li = ''
    if dir_path != './':
        li += '<li><a href="../">../</a></li>'
    for item in items:
        item_path = os.path.join(dir_path, item)
        if os.path.isdir(item_path):
            li += f'<li><a href="{item}/">{item}/</a></li>'
        else:
            li += f'<li><a href="{item}">{item}</a></li>'

    html = f"""
    <!DOCTYPE html>
    <html>
    <head>
        <title>Directory {dir_path}</title>
    </head>
    <body>
    <h1>Directory {dir_path}</h1>
    <ul>{li}</ul>
    </body>
    </html>
    """

    html_bytes = html.encode('utf-8')
    content_length = len(html_bytes)

    return {
        'content': html_bytes,
        'content_length': content_length,
        'mime_type': 'text/html; charset=utf-8'
    }

def get_statusText(statusCode):
    statusText = ''
    if statusCode == 200:
        statusText = 'OK'
    elif statusCode == 400:
        statusText = 'Bad Request'
    elif statusCode == 403:
        statusText = 'Forbidden'
    elif statusCode == 404:
        statusText = 'Not Found'
    elif statusCode == 500:
        statusText = 'Server Error'
    return statusText    

def builtin_response(statusCode):

    statusText = get_statusText(statusCode)

    html = f'<html><head><title>{statusCode} {statusText}</title></head><body><h1>{statusCode} {statusText}</h1></body></html>'
    html_bytes = html.encode('utf-8')
    content_length = len(html_bytes)

    return {
        'content': html_bytes,
        'content_length': content_length,
        'mime_type': 'text/html; charset=utf-8'
    }

def build_response(responseBoday, statusCode = 200, responseHeader = {}):

    statusText = get_statusText(statusCode)
    responseHeader['Server'] = SERVER_NAME + '/' + SERVER_VERSION
    responseHeader['Connection'] = 'close'

    response = bytes(f'HTTP/1.1 {statusCode} {statusText}' + '\r\n' + '\r\n'.join([f'{key}: {value}' for key, value in responseHeader.items()]) + '\r\n\r\n', encoding="utf-8") + responseBoday
    return response

def work(client_socket, server):

    # 接收数据
    data = client_socket.recv(8192)
    # print(data)
    if not data:
        client_socket.close()
        return

    result = data.split(b'\r\n\r\n', 1)
    print(result)
    result_len = len(result)
    if result_len == 1:
        request_header = parser_request(result[0].decode('utf-8'))
    elif result_len > 1:
        request_header = parser_request_header(result[0].decode('utf-8'))
        request_body = result[1]
    else:
        request = parser_request(result[0])
        # 400
        return

    if request_header['line']['method'].upper() == 'GET':
        urlpath = request_header['line']['path']
        rootdir = '.'
        defaultFile = 'index.html'
        if urlpath == '':
            fullPath = '/' + defaultFile
        fullPath = rootdir + urlpath
        myfile = Path(fullPath)
        if fullPath.endswith('/'): # 目录
            if myfile.is_dir(): # 指定的目录存在
                defaultFile = fullPath + defaultFile
                myfile = Path(defaultFile)
                if myfile.is_file(): # 尝试访问默认文件
                    statusCode = 200
                    response = read_staticfile(defaultFile)
                else:
                    # 返回目录列表
                    statusCode = 200
                    response = list_directory(fullPath)
            else: # 目录不存在
                # 404
                statusCode = 404
                response = builtin_response(statusCode)
                return
        else: # 文件
            if myfile.is_file(): # 指定的文件存在
                statusCode = 200
                response = read_staticfile(fullPath)
            else:
                # 404
                statusCode = 404
                response = builtin_response(statusCode)
    else:
        # 400
        statusCode = 400
        response = builtin_response(statusCode)

    response = build_response(response['content'], statusCode, {
        'Content-Type': response['mime_type'],
        'Content-Length': response['content_length'],
    })
    client_socket.sendall(response)
    time.sleep(1)
    client_socket.close()
    return

def main():
    global stop_thread
    try:
        # socket.AF_INET 表示使用 ipv4
        # socket.SOCK_STREAM 表示使用 tcp
        server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        server.bind(("127.0.0.1", 8012))
        server.listen(5)
    except Exception as e:
        stop_thread = True
        print(f"服务器错误: {e}")
    while True:
        if stop_thread == True:
            break
        client_socket, client_address = server.accept()
        work_thread = threading.Thread(target=work, args=[client_socket, server]) # 创建一个子线程对象
        work_thread.daemon = True # 将子线程设置为守护线程
        work_thread.start() # 启动子线程
    if stop_thread == True:
        server.shutdown(socket.SHUT_RDWR)
        server.close()

stop_thread = False
# 定义一个信号处理函数
def signal_handler_quit(signum, frame):
    global stop_thread
    stop_thread = True
    sys.exit(0)

if __name__ == "__main__":

    signal.signal(signal.SIGINT, signal_handler_quit) # 注册信号处理函数,处理 SIGINT 信号
    signal.signal(signal.SIGTERM, signal_handler_quit) # 注册信号处理函数,处理 SIGTERM 信号

    work_thread = threading.Thread(target=main) # 创建一个子线程对象
    work_thread.daemon = True # 将子线程设置为守护线程
    work_thread.start() # 启动子线程

    while True: # 主线程的循环
        time.sleep(1)
        if stop_thread == True:
            sys.exit(0)

版本7 使用面向对象写法的 静态http

版本7 有配置的静态http ip port rootdir mime 这个可以用 mimetypes ,不需要配置 才三个参数,直接用命令行就可以了吧 默认页 目录页 日志(写入文件 命令行输出)

版本7 有配置的静态http+cgi

版本7 有配置的静态http+cgi+http代理

版本7 有配置的静态http+cgi+http代理+socks5代理

既然要实现 http代理 那么 https 和 ws 和 wss 的代理应该也差不多吧 暂时先忽略 fastcgi 和 wsgi

python的命令行参数 https://docs.python.org/zh-cn/3/library/argparse.html https://docs.python.org/zh-cn/3/library/getopt.html

python的配置文件 https://docs.python.org/zh-cn/3/library/configparser.html https://docs.python.org/zh-cn/3/library/json.html

好像只有 windows 下才需要特别处理信号,linux是可以直接用 ctrl+c 退出的 https://www.fournoas.com/posts/handling-signal-in-python-on-different-platforms/ https://www.fournoas.com/posts/why-does-ctrl-c-not-kill-python-process-in-windows-console/