苏飞论坛

 找回密码
 马上注册

QQ登录

只需一步,快速开始

分布式系统框架(V2.0) 轻松承载百亿数据,千万流量!讨论专区 - 源码下载 - 官方教程

HttpHelper爬虫框架(V2.7-含.netcore) HttpHelper官方出品,爬虫框架讨论区 - 源码下载 - 在线测试和代码生成

HttpHelper爬虫类(V2.0) 开源的爬虫类,支持多种模式和属性 源码 - 代码生成器 - 讨论区 - 教程- 例子

查看: 26482|回复: 26

[功能帮助类] C# NetHelper网络通信编程类[异步]

[复制链接]
发表于 2013-6-26 17:02:36 | 显示全部楼层 |阅读模式

之前飞哥发了一个同步的版本,我来个异步的吧.{:soso_e113:}(只写了tcp,udp懒得写了.)
http://www.sufeinet.com/thread-3678-1-1.html
注释都写了.过程也理清楚了.完全当学习的东西看吧..

  1. <P>/*
  2. 说明:Socket辅助类
  3. 作者:Ro4ters
  4. 编写时间: 26-06-2013
  5. 主要功能:
  6.    接收发送数据.(异步)</P>
  7. <P>*/
  8.        class SocketHelper
  9.     {
  10.         public delegate void Uplistbox(string txt);
  11.         public Uplistbox uplb;  //更新listbox委托变量
  12.         protected static readonly ILog logger = LogManager.GetLogger(typeof(TCPSocketServer2));
  13.         private ManualResetEvent allDone = new ManualResetEvent(false); //线程控制
  14.         protected Hashtable m_workerSocketHt = Hashtable.Synchronized(new Hashtable());//保存客户端socket哈希表
  15.         public AsyncCallback asyncCallBack;  //声明异步完成时调用的方法.
  16.         private Socket m_mainSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);//主通信Socket
  17.         public bool isExit = false;//全局控制SOCKET启动和关闭
  18.         private long m_clientCount = 0L;//记录客户端数量
  19.         /// <summary>
  20.         /// 发送消息
  21.         /// </summary>
  22.         /// <param name="buffer">需要发送的字节</param>
  23.         public void Send(byte[] buffer)
  24.         {
  25.             try
  26.             {
  27.                 //目标地址
  28.                 IPEndPoint ip = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 1234);
  29.                 //发送通信socket
  30.                 Socket sk = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
  31.                 sk.Connect(ip);//连接
  32.                 sk.Send(buffer);//发送
  33.                 SocketUser su = new SocketUser();
  34.                 su.CurrentSocket = sk;
  35.                 su.DataBuffer = buffer;
  36.                 //异步接收
  37.                 asyncCallBack = new AsyncCallback(OnDataReceived);
  38.                 sk.BeginReceive(su.DataBuffer, 0, su.DataBuffer.Length, SocketFlags.None, asyncCallBack, su);</P>
  39. <P>            }
  40.             catch (Exception ex)
  41.             {
  42.                 string msg = ex.ToString() + " " + DateTime.Now.ToString();
  43.                 uplb(msg);
  44.                 logger.Debug(msg);
  45.             }
  46.         }
  47.         /// <summary>
  48.         /// 发送消息[重载]
  49.         /// </summary>
  50.         /// <param name="SocketUser">需要发送的SocketUser对象</param>
  51.         /// <param name="buffer">需要发送的字节</param>
  52.         public void Send(SocketUser su, byte[] buffer)
  53.         {
  54.             try
  55.             {
  56.                 //目标地址
  57.                 IPEndPoint ip = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 1234);
  58.                 //发送通信socket
  59.                 su.CurrentSocket.Connect(ip);//连接
  60.                 su.CurrentSocket.Send(buffer);//发送
  61.                 su.DataBuffer = buffer;
  62.                 //异步接收
  63.                 asyncCallBack = new AsyncCallback(OnDataReceived);
  64.                 su.CurrentSocket.BeginReceive(su.DataBuffer, 0, su.DataBuffer.Length, SocketFlags.None, asyncCallBack, su);</P>
  65. <P>            }
  66.             catch (Exception ex)
  67.             {
  68.                 string msg = ex.ToString() + " " + DateTime.Now.ToString();
  69.                 uplb(msg);
  70.                 logger.Debug(msg);
  71.             }
  72.         }
  73.         /// <summary>
  74.         /// 建立监听连接
  75.         /// </summary>
  76.         /// <param name="port">监听的端口号</param>
  77.         private void AcceptConnection(object port)
  78.         {
  79.             try
  80.             {</P>
  81. <P>                IPEndPoint ipLocal = new IPEndPoint(IPAddress.Any, (int)port);
  82.                 //绑定地址
  83.                 m_mainSocket.Bind(ipLocal);
  84.                 //开始监听
  85.                 m_mainSocket.Listen(10000);
  86.                 while (isExit == false)
  87.                 {
  88.                     try
  89.                     {
  90.                         allDone.Reset(); //线程阻塞
  91.                         logger.Debug("TcpListener启动成功: " + ipLocal.Address.ToString() + "," + ipLocal.Port.ToString());
  92.                         //接收异步连接并创建新的socket对象.
  93.                         m_mainSocket.BeginAccept(new AsyncCallback(OnClientConnect), m_mainSocket);
  94.                         allDone.WaitOne();
  95.                     }
  96.                     catch (SocketException sf)
  97.                     {
  98.                         logger.Error("客户连接请求异常" + sf.ToString());
  99.                     }
  100.                     catch (Exception f)
  101.                     {
  102.                         logger.Error("客户连接请求异常" + "错误信息:" + f.Message);
  103.                         break;
  104.                     }</P>
  105. <P>                }
  106.             }
  107.             catch (SocketException se)
  108.             {
  109.                 logger.Error("TcpListener启动失败" + se.ToString() + "错误号:" + se.ErrorCode.ToString());</P>
  110. <P>            }
  111.             catch (Exception e)
  112.             {
  113.                 logger.Error("TcpListener错误" + "错误信息:" + e.Message);
  114.             }</P>
  115. <P>        }
  116.         /// <summary>
  117.         /// 开始监听
  118.         /// </summary>
  119.         /// <param name="Port">监听的端口</param>
  120.         /// <param name="buffer">发送的字节</param>
  121.         public bool StartListening(int Port, params byte[] buffer)
  122.         {
  123.             if (Port == 0)
  124.             {
  125.                 logger.Debug("TcpListener未启动,请指定监听的端口号.");
  126.                 return false;
  127.             }</P>
  128. <P>            // 创建监听socket
  129.             Thread AcceptTh = new Thread(new ThreadStart(delegate
  130.             {
  131.                 AcceptConnection(Port);
  132.             }));
  133.             AcceptTh.Start();
  134.             Thread sendth = new Thread(new ThreadStart(delegate
  135.             {
  136.                 Send(buffer);
  137.             }));
  138.             sendth.Start();
  139.             logger.Debug("启动TCP监听,端口: " + Port);
  140.             return true;
  141.         }</P>
  142. <P>//下面方法在二楼
  143.       }
  144. }</P>
  145. <P> </P>
复制代码
Have Fun; {:soso_e121:}
26-06-2013


1. 开通SVIP会员,免费下载本站所有源码,不限次数据,不限时间
2. 加官方QQ群,加官方微信群获取更多资源和帮助
3. 找站长苏飞做网站、商城、CRM、小程序、App、爬虫相关、项目外包等点这里
 楼主| 发表于 2013-6-26 17:02:50 | 显示全部楼层
  1.   /// <summary>
  2.         /// 释放Socket
  3.         /// </summary>
  4.         /// <param name="user"></param>
  5.         public void ClientDisConnect(SocketUser user)
  6.         {
  7.             try
  8.             {
  9.                 logger.Error("数据中心:" + user.ClientName + "," + user.LoginIP + " 断开连接");
  10.                 if (user != null)
  11.                 {
  12.                     if (user.CurrentSocket != null)
  13.                     {
  14.                         //释放连接
  15.                         user.CurrentSocket.Shutdown(SocketShutdown.Both);
  16.                         user.CurrentSocket.Close();
  17.                         user.CurrentSocket = null;
  18.                     }
  19.                     user = null;

  20.                 }
  21.             }
  22.             catch (Exception e)
  23.             {
  24.                 logger.Error("关闭TCP连接错误:" + e.ToString());
  25.             }
  26.         }
  27.         /// <summary>
  28.         /// 异步接收传入连接
  29.         /// </summary>
  30.         /// <param name="asyn"></param>
  31.         public void OnClientConnect(IAsyncResult asyn)
  32.         {
  33.             try
  34.             {
  35.                 allDone.Set();
  36.                 if (m_mainSocket == null)
  37.                 {
  38.                     logger.Debug("TCP监听服务已关闭");
  39.                     return;
  40.                 }
  41.                 Socket workerSocket = (Socket)asyn.AsyncState;
  42.                 Socket Temuser = m_mainSocket.EndAccept(asyn);//异步接收
  43.                 Interlocked.Increment(ref m_clientCount);  //连接数量
  44.                 SocketUser user = new SocketUser(Temuser, m_clientCount);
  45.                 WaitForData(user); //等待线程数据

  46.             }
  47.             catch (Exception ex)
  48.             {
  49.                 System.Windows.Forms.MessageBox.Show(ex.ToString());
  50.                 return;
  51.             }

  52.         }



  53.         /// <summary>
  54.         /// 线程等待数据
  55.         /// </summary>
  56.         /// <param name="soc"></param>
  57.         /// <param name="clientNumber"></param>
  58.         public void WaitForData(SocketUser theUser)
  59.         {
  60.             try
  61.             {
  62.                 asyncCallBack = new AsyncCallback(OnDataReceived); //实例化委托.
  63.                 if (theUser != null)
  64.                 {
  65.                     //异步接收
  66.                     theUser.CurrentSocket.BeginReceive(theUser.DataBuffer, 0, theUser.DataBuffer.Length, SocketFlags.None, asyncCallBack, theUser);
  67.                 }

  68.             }
  69.             catch (SocketException se)
  70.             {
  71.                 logger.Error("数据处理异常1" + se.ToString());
  72.             }
  73.         }
  74.         byte[] buffer = new byte[1024 * 1024];
  75.         /// <summary>
  76.         /// 接收数据(异步回调)
  77.         /// </summary>
  78.         /// <param name="asyn">异步返回的连接状态</param>
  79.         public void OnDataReceived(IAsyncResult asyn)
  80.         {
  81.             SocketUser theUser = (SocketUser)asyn.AsyncState;
  82.             try
  83.             {
  84.                 if (theUser.CurrentSocket == null)
  85.                 {
  86.                     logger.Error("socket is null ");
  87.                     return;
  88.                 }
  89.                 int iRx = theUser.CurrentSocket.EndReceive(asyn); //接收到的消息长度
  90.                 if (iRx == 0)
  91.                 {
  92.                     logger.Error("没有接收的数据,断开tcp连接");
  93.                     throw new SocketException(10054);
  94.                 }
  95.                 //提取有效字节数据
  96.                 byte[] data = new byte[iRx];
  97.                 byte[] buff = theUser.DataBuffer;
  98.                 for (int i = 0; i < iRx; i++)
  99.                 {
  100.                     data[i] = buff[i];
  101.                 }

  102.                 //数据处理
  103.                 ProcessReceiveData(theUser, data);

  104.                 //继续等待数据
  105.                 if (isExit == false)
  106.                 {
  107.                     WaitForData(theUser);
  108.                 }
  109.             }
  110.             catch (ObjectDisposedException ode)
  111.             {
  112.                 logger.Error("数据处理异常2" + ode.ToString());
  113.             }
  114.             catch (SocketException se)
  115.             {
  116.                 if (se.ErrorCode == 10054)
  117.                 {
  118.                     logger.Error("Tcp连接被关闭 " + se.Message);
  119.                     ClientDisConnect(theUser);
  120.                 }
  121.                 else
  122.                 {
  123.                     logger.Error("数据处理异常3" + se.ToString());
  124.                 }
  125.             }
  126.         }
  127.         /// <summary>
  128.         /// 处理数据.
  129.         /// </summary>
  130.         /// <param name="socketuser">接收消息的SocketUser</param>
  131.         /// <param name="buffer">接收的消息</param>
  132.         private void ProcessReceiveData(SocketUser socketuser, byte[] buffer)
  133.         {
  134.             string msg = System.Text.Encoding.ASCII.GetString(buffer);
  135.             uplb(msg);
  136.         }
  137.    
复制代码
 楼主| 发表于 2013-6-26 17:06:20 | 显示全部楼层
还是发个全的吧.....

SocketAsyncHelper.zip

4.58 KB, 下载次数: 663, 下载积分: 金钱 -1

发表于 2013-6-26 17:12:24 | 显示全部楼层
异步的我的库里也有,只是还不稳定,没开放,异步不同于同步的那么稳定,我目前还没有研究透彻,我建议你能个例子,大量数据测试一下,异步的功能和同步的就是多了个线程和委托控件,最关键的是稳定性。
其他 的我感觉都是次要的。
楼主这个看着行,不知道实际怎么样。有时间给能个例子上来测试下吧
 楼主| 发表于 2013-6-26 19:04:07 | 显示全部楼层
站长苏飞 发表于 2013-6-26 17:12
异步的我的库里也有,只是还不稳定,没开放,异步不同于同步的那么稳定,我目前还没有研究透彻,我建议你能 ...

实际大批量数据效果还有待测试,我服务器上代码用的是异步,每天2000+客户端同时连接(5-15秒发送一条数据)...暂时没发现太大的问题,可能数据有点小的原因吧.
发表于 2013-6-26 20:37:50 | 显示全部楼层
ro4ters 发表于 2013-6-26 19:04
实际大批量数据效果还有待测试,我服务器上代码用的是异步,每天2000+客户端同时连接(5-15秒发送一条数据). ...

那也不错了,还有一个就是并发,你这个量,并发的可能不大,不错了,挺好
发表于 2014-4-22 21:58:47 | 显示全部楼层
想学习这个  但现在 还在到基础 。。
发表于 2014-6-4 14:34:41 | 显示全部楼层
强烈支持楼主ing……
发表于 2014-6-6 16:55:46 | 显示全部楼层
受教了,学习中……
发表于 2014-6-9 15:40:27 | 显示全部楼层
刚开始学习这个东西 能不能给个实例呀 ,, 万分感谢 ,, ,,,zll88108@126.com
您需要登录后才可以回帖 登录 | 马上注册

本版积分规则

QQ|手机版|小黑屋|手机版|联系我们|关于我们|广告合作|苏飞论坛 ( 豫ICP备18043678号-2)

GMT+8, 2025-1-23 07:05

© 2014-2021

快速回复 返回顶部 返回列表