python tcp网络连接
时间: 2025-05-14 21:56:27 浏览: 16
### Python TCP Socket 网络连接示例
以下是基于 `socket` 库实现的一个简单的 TCP 客户端和服务器的示例。
#### 1. 创建一个基本的 TCP 服务器
服务器会监听指定的 IP 和端口,接收来自客户端的消息并返回响应。
```python
import socket
def start_tcp_server(ip, port):
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 创建套接字对象
server_socket.bind((ip, port)) # 绑定地址和端口
server_socket.listen(5) # 开始监听,最大等待队列长度为5
print(f"[*] Listening on {ip}:{port}")
while True:
client_socket, addr = server_socket.accept() # 接收客户端连接请求
print(f"[+] Accepted connection from {addr[0]}:{addr[1]}")
try:
message = "Welcome to the TCP Server!"
client_socket.send(message.encode()) # 向客户端发送消息
request = client_socket.recv(1024).decode() # 接收客户端数据
print(f"[>] Received: {request}") # 打印收到的数据
finally:
client_socket.close() # 关闭客户端连接
# 调用函数启动服务
start_tcp_server('127.0.0.1', 9999)
```
此代码展示了如何设置一个基础的 TCP 服务器[^3]。它绑定到本地回环地址 (`127.0.0.1`) 并监听端口 `9999`。
---
#### 2. 实现一个简单的 TCP 客户端
客户端尝试连接到上述服务器,并向其发送一条消息。
```python
import socket
def connect_to_tcp_server(ip, port, message):
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 创建套接字对象
client_socket.connect((ip, port)) # 连接到目标服务器
response = client_socket.recv(1024).decode() # 接收服务器欢迎消息
print(f"[<] Response from server: {response}")
client_socket.send(message.encode()) # 发送自定义消息给服务器
print("[*] Message sent successfully.")
client_socket.close()
# 测试调用
connect_to_tcp_server('127.0.0.1', 9999, 'Hello, this is a test!')
```
这段代码演示了一个基本的 TCP 客户端操作流程。它连接至运行中的服务器,打印服务器发来的欢迎信息,并发送自己的测试消息。
---
#### 3. 长连接框架扩展
如果需要支持更复杂的场景(如长时间保持连接),可以参考站内引用的内容[^2]:
- 使用线程管理多个并发连接。
- 添加锁机制以保护共享资源的安全访问。
下面是一个简化版的长连接模型:
```python
import socket
import threading
class LongConnectionClient:
def __init__(self, ip, port):
self.ip = ip
self.port = port
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.thread_lock = threading.Lock()
self.is_running = False
def start_connection(self):
self.sock.connect((self.ip, self.port))
self.is_running = True
thread = threading.Thread(target=self.handle_communication, daemon=True)
thread.start()
def handle_communication(self):
while self.is_running:
with self.thread_lock:
data = self.sock.recv(1024).decode()
if not data:
break
print(f"[<] Received: {data}")
def send_message(self, msg):
with self.thread_lock:
self.sock.send(msg.encode())
def stop_connection(self):
self.is_running = False
self.sock.close()
client = LongConnectionClient('127.0.0.1', 9999)
client.start_connection()
for i in range(5): # 模拟多次发送消息
client.send_message(f"Message #{i}")
input("Press Enter to exit...")
client.stop_connection()
```
该脚本实现了多线程环境下的持续通信功能,适合处理实时交互需求的应用程序。
---
### 总结
以上分别介绍了短连接模式下的一次性传输以及适用于复杂业务逻辑的长连接设计思路。具体实现可根据实际项目调整优化。
阅读全文
相关推荐

















