设备基础命令
This commit is contained in:
179
DeviceCommand/Base/TCP.cs
Normal file
179
DeviceCommand/Base/TCP.cs
Normal file
@@ -0,0 +1,179 @@
|
||||
using System;
|
||||
using System.IO;
|
||||
using System.Net.Sockets;
|
||||
using System.Text;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace DeviceCommand.Base
|
||||
{
|
||||
public class Tcp : ITcp
|
||||
{
|
||||
public string IPAddress { get; set; } = "127.0.0.1";
|
||||
public int Port { get; set; } = 502;
|
||||
public int SendTimeout { get; set; } = 3000;
|
||||
public int ReceiveTimeout { get; set; } = 3000;
|
||||
|
||||
private TcpClient _tcpClient;
|
||||
public bool IsConnected => _tcpClient?.Connected ?? false;
|
||||
protected readonly SemaphoreSlim _commLock = new(1, 1);
|
||||
|
||||
public Tcp()
|
||||
{
|
||||
_tcpClient = new TcpClient();
|
||||
}
|
||||
|
||||
public void ConfigureDevice(string ipAddress, int port, int sendTimeout = 3000, int receiveTimeout = 3000)
|
||||
{
|
||||
IPAddress = ipAddress;
|
||||
Port = port;
|
||||
SendTimeout = sendTimeout;
|
||||
ReceiveTimeout = receiveTimeout;
|
||||
}
|
||||
|
||||
public virtual async Task<bool> ConnectAsync(CancellationToken ct = default)
|
||||
{
|
||||
await _commLock.WaitAsync(ct);
|
||||
try
|
||||
{
|
||||
if (_tcpClient.Connected) return true;
|
||||
|
||||
// 修复:释放并彻底清空旧的连接实例,否则复用引发异常
|
||||
_tcpClient.Close();
|
||||
_tcpClient.Dispose();
|
||||
|
||||
_tcpClient = new TcpClient();
|
||||
await _tcpClient.ConnectAsync(IPAddress, Port, ct);
|
||||
return true;
|
||||
}
|
||||
finally
|
||||
{
|
||||
_commLock.Release();
|
||||
}
|
||||
}
|
||||
|
||||
public virtual void Close()
|
||||
{
|
||||
if (_tcpClient.Connected) _tcpClient.Close();
|
||||
}
|
||||
|
||||
private async Task LoglessSendAsync(byte[] buffer, CancellationToken ct)
|
||||
{
|
||||
if (!IsConnected) throw new InvalidOperationException("TCP未连接。");
|
||||
|
||||
NetworkStream stream = _tcpClient.GetStream();
|
||||
using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
|
||||
if (SendTimeout > 0) cts.CancelAfter(SendTimeout);
|
||||
|
||||
await stream.WriteAsync(buffer, 0, buffer.Length, cts.Token);
|
||||
}
|
||||
|
||||
public async Task SendAsync(byte[] buffer, CancellationToken ct = default)
|
||||
{
|
||||
await _commLock.WaitAsync(ct);
|
||||
try
|
||||
{
|
||||
await LoglessSendAsync(buffer, ct);
|
||||
}
|
||||
finally
|
||||
{
|
||||
_commLock.Release();
|
||||
}
|
||||
}
|
||||
|
||||
public async Task SendAsync(string str, CancellationToken ct = default)
|
||||
{
|
||||
await SendAsync(Encoding.UTF8.GetBytes(str), ct);
|
||||
}
|
||||
|
||||
public async Task<byte[]> ReadAsync(int length, CancellationToken ct = default)
|
||||
{
|
||||
await _commLock.WaitAsync(ct);
|
||||
try
|
||||
{
|
||||
if (!IsConnected) throw new InvalidOperationException("TCP未连接。");
|
||||
|
||||
NetworkStream stream = _tcpClient.GetStream();
|
||||
byte[] buffer = new byte[length];
|
||||
int offset = 0;
|
||||
|
||||
using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
|
||||
if (ReceiveTimeout > 0) cts.CancelAfter(ReceiveTimeout);
|
||||
|
||||
while (offset < length)
|
||||
{
|
||||
int read = await stream.ReadAsync(buffer, offset, length - offset, cts.Token);
|
||||
if (read == 0) break;
|
||||
offset += read;
|
||||
}
|
||||
|
||||
return offset == 0 ? Array.Empty<byte>() : buffer[..offset];
|
||||
}
|
||||
finally
|
||||
{
|
||||
_commLock.Release();
|
||||
}
|
||||
}
|
||||
|
||||
private async Task<string> LoglessReadAsync(string delimiter, CancellationToken ct)
|
||||
{
|
||||
if (!IsConnected) throw new InvalidOperationException("TCP未连接。");
|
||||
|
||||
delimiter ??= "\n";
|
||||
var sb = new StringBuilder();
|
||||
byte[] buffer = new byte[1024];
|
||||
NetworkStream stream = _tcpClient.GetStream();
|
||||
|
||||
using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
|
||||
if (ReceiveTimeout > 0) cts.CancelAfter(ReceiveTimeout);
|
||||
|
||||
while (!cts.Token.IsCancellationRequested)
|
||||
{
|
||||
int bytesRead = await stream.ReadAsync(buffer, 0, buffer.Length, cts.Token);
|
||||
if (bytesRead == 0) throw new IOException("远程主机已关闭连接");
|
||||
|
||||
sb.Append(Encoding.UTF8.GetString(buffer, 0, bytesRead));
|
||||
|
||||
int index = sb.ToString().IndexOf(delimiter, StringComparison.Ordinal);
|
||||
if (index >= 0)
|
||||
return sb.ToString(0, index).Trim();
|
||||
}
|
||||
|
||||
throw new TimeoutException("读取数据超时");
|
||||
}
|
||||
|
||||
public async Task<string> ReadAsync(string delimiter = "\n", CancellationToken ct = default)
|
||||
{
|
||||
await _commLock.WaitAsync(ct);
|
||||
try
|
||||
{
|
||||
return await LoglessReadAsync(delimiter, ct);
|
||||
}
|
||||
finally
|
||||
{
|
||||
_commLock.Release();
|
||||
}
|
||||
}
|
||||
|
||||
// 核心优化:确保发送与读取在同一组锁生命周期内
|
||||
public async Task<string> WriteReadAsync(string command, string delimiter = "\n", CancellationToken ct = default)
|
||||
{
|
||||
await _commLock.WaitAsync(ct);
|
||||
try
|
||||
{
|
||||
await LoglessSendAsync(Encoding.UTF8.GetBytes(command), ct);
|
||||
return await LoglessReadAsync(delimiter, ct);
|
||||
}
|
||||
finally
|
||||
{
|
||||
_commLock.Release();
|
||||
}
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
_tcpClient?.Dispose();
|
||||
_commLock?.Dispose();
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user