100分,一个优化挑战问题,请大家帮忙,解决一下多线程的问题,谢谢.
小弟现在做一个信息的系统,现在设计出来的信息的发送程序是单线程的.一天如果发送的信息量超过10万就会很慢,但现在平台的需求是一天有几十万,所以想改成多线程的,但本人对多线程又不熟悉,还请大家多帮忙.大概思路是先使用一个线程从数据库内取数据出来,然后用一个或多个(最好是多个)线程负责专门的发送,然后再用一个线程负责把发送好的结果往数据库内更新,请大家帮帮忙,用c#帮我写一下例子参与,谢谢.
[解决办法]
你这个问题太大,不好描述,
我的建议,你用一个或者两个队列(可以用LinkedList类来实现)作为线程的交换数据,取数据线程,不一定从数据库读取1000条,你可以用DataReader来读取所有的数据,每读到一条数据就生成一个实体类实例并放入发送队列中,实体类包含要发送的内容和结果,
发送线程可以预先启动,可以比读取线程先启动,发送线程内部是一个死循环,用信号量控制读取发送队列,一旦有数据进入队列就恢复运行,否则就阻塞等待队列中有内容,一旦队列中有数据放入,发送线程的死循环就从队列中取出实体类实例,执行发送操作,完成后把结果写入实体类实例,然后把实例放入另一个队列,比如保存队列中中,
然后保存线程也是预先启动好,也是个死循环,不断读取保存队列,用信号量控制,有实体类进入保存队列保存线程就恢复运行,然后把结果写入数据库,也可以批量写入以提高效率,
[解决办法]
除了Thread类之外,你可以看信号量Semaphore 类,这个类用来控制队列的读写很完美,可以让读取线程、发送线程和保存线程的速度一致,
[解决办法]
关注......期待......
[解决办法]
每个线程中处理10个,不用再次创建10个线程,单个线程去处理10个足以,只要你保证读取数据线程和处理线程要同步,即读完了,处理,处理完了,再次读取
[解决办法]
1天10万看起来不算多,但要知道每条数据的信息量是多少,如果每条都很大呢?
先检查瓶颈吧,到底是在网络还是在内部运算(包括取数据)。
网络是用udp还是tcp,对方有没有接收到是怎么判断的,这些都可能成为瓶颈。
[解决办法]
随手写的!这是代码,你测试看下是否正确
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Data;
using System.Data.SqlClient;
using System.Collections;
using System.Threading;
namespace ConsoleApplication1
{
class Program
{
static SqlConnection con = new SqlConnection("Data Source=IP;Initial Catalog=数据库;User ID=sa;Password=;");
static SqlDataAdapter da;
static List<cescasio> GetListces = new List<cescasio>();//数据
static List<cescasio> SetListces = new List<cescasio>();//更新数据
static bool state = false;//是否创建线程
/// <summary> 获取数据 </summary>
static void GetData()
{
while (true) //采集数据
{
if (GetListces.Count == 0)//如果没有数据了采集数据
{
state = false;//停止
DataTable Dt = new DataTable();
con.Open();
da = new SqlDataAdapter("select top 1000 from 数据表", con);//查询语句
da.Fill(Dt);
con.Close();
for (int i = 0; i < Dt.Rows.Count; i++)
{
cescasio cess = new cescasio();
cess.xingming = Dt.Rows[i]["xingming"].ToString();
cess.ip = Dt.Rows[i]["id"].ToString();//主键
cess.id = Dt.Rows[i]["ip"].ToString();
GetListces.Add(cess);
}
state = true;//采集完毕 开始发送数据
}
Thread.Sleep(1000);//一秒一次自己控制
}
}
/// <summary>更新数据 </summary>
static void Update()
{
while (true)
{
if (SetListces.Count > 0)
{
//.....更新数据
//这个自己写吧
}
Thread.Sleep(10000);//十秒执行一次更新数据你自己控制
}
}
/// <summary>创建线程 </summary>
static void Create_Th()
{
while (true) //采集数据
{
if (state && GetListces.Count > 0)//是否创建
{
for (int i = 0; i <= GetListces.Count; i++)
{
List<cescasio> Create_Thlist = new List<cescasio>();
Create_Thlist.Add(GetListces[i]);//添加
GetListces.RemoveAt(i);//移除
if (Create_Thlist.Count == 100)
{
Thread th = new Thread(FaSong);
th.Start(Create_Thlist);
}
else if (i == GetListces.Count - 1)
{
Thread th = new Thread(FaSong);
th.Start(Create_Thlist);
}
}
}
Thread.Sleep(1000);//一秒一次自己控制
}
}
/// <summary>发送数据</summary>
static void FaSong(object Data_list_o)
{
while (true)
{
List<cescasio> Data_list = (List<cescasio>)Data_list_o;
for (int i = 0; i < Data_list.Count; i++)
{
//......在这里写发送数据
Console.WriteLine(Data_list[i].ip);//发送数据
//.............
SetListces.Add(Data_list[i]);//放入已发送的数据
}
Thread.Sleep(10);
}
}
static void Main(string[] args)
{
Thread thGetData = new Thread(GetData);//获取数据
Thread thUpdate = new Thread(Update);//更新数据
Thread thCreate_Th = new Thread(Create_Th);//创建线程发送数据
thGetData.Start();
thUpdate.Start();
thCreate_Th.Start();
}
}
/// <summary>
/// 结构体
/// </summary>
class cescasio
{
/// <summary>File_Name</summary>
public string xingming { get; set; }
public string ip { get; set; }
public string id { get; set; }
}
}
[解决办法]
逻辑上有个错误,修改下:你测试下,有问题回复我
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Data;
using System.Data.SqlClient;
using System.Collections;
using System.Threading;
namespace ConsoleApplication1
{
class Program
{
static SqlConnection con = new SqlConnection("Data Source=IP;Initial Catalog=数据库;User ID=sa;Password=;");
static SqlDataAdapter da;
static List<cescasio> GetListces = new List<cescasio>();//数据
static List<cescasio> SetListces = new List<cescasio>();//更新数据
static bool state = false;//是否创建线程
static bool state_Get = false;//是否创建线程
static int cont = 0;
/// <summary> 获取数据 </summary>
static void GetData()
{
while (true) //采集数据
{
if (GetListces.Count == 0 && state_Get)//如果没有数据了采集数据
{
state = false;//停止
DataTable Dt = new DataTable();
con.Open();
da = new SqlDataAdapter("select top 1000 from 数据表", con);//查询语句
da.Fill(Dt);
con.Close();
for (int i = 0; i < Dt.Rows.Count; i++)
{
cescasio cess = new cescasio();
cess.xingming = Dt.Rows[i]["xingming"].ToString();
cess.ip = Dt.Rows[i]["id"].ToString();//主键
cess.id = Dt.Rows[i]["ip"].ToString();
GetListces.Add(cess);
}
cont = GetListces.Count;
state = true;//采集完毕 开始发送数据
state_Get = false;//等待更新后再继续采集
}
Thread.Sleep(1000);//一秒一次自己控制
}
}
/// <summary>更新数据 </summary>
static void Update()
{
while (true)
{
if (SetListces.Count == cont)
{
//.....更新数据
//这个自己写吧
}
state_Get = true;//开始下次采集
Thread.Sleep(100);//十秒执行一次更新数据你自己控制
}
}
/// <summary>创建线程 </summary>
static void Create_Th()
{
while (true) //采集数据
{
if (state && GetListces.Count > 0)//是否创建
{
List<cescasio> Create_Thlist = new List<cescasio>();
for (int i = 0; i <= GetListces.Count; i++)
{
Create_Thlist.Add(GetListces[i]);//添加
GetListces.RemoveAt(i);//移除
if (Create_Thlist.Count == 100)
{
Thread th = new Thread(FaSong);
th.Start(Create_Thlist);
Create_Thlist.Clear();
}
else if (i == GetListces.Count - 1)
{
Thread th = new Thread(FaSong);
th.Start(Create_Thlist);
Create_Thlist.Clear();
}
}
}
Thread.Sleep(1000);//一秒一次自己控制
}
}
/// <summary>发送数据</summary>
static void FaSong(object Data_list_o)
{
while (true)
{
List<cescasio> Data_list = (List<cescasio>)Data_list_o;
for (int i = 0; i < Data_list.Count; i++)
{
try
{
//......在这里写发送数据
Console.WriteLine(Data_list[i].ip);//发送数据
//.............
SetListces.Add(Data_list[i]);//放入已发送的数据
}
catch
{
//发送失败
cont = cont - 1;
}
}
Thread.Sleep(10);
}
}
static void Main(string[] args)
{
Thread thGetData = new Thread(GetData);//获取数据
Thread thUpdate = new Thread(Update);//更新数据
Thread thCreate_Th = new Thread(Create_Th);//创建线程发送数据
thGetData.Start();
thUpdate.Start();
thCreate_Th.Start();
}
}
/// <summary>
/// 结构体
/// </summary>
class cescasio
{
/// <summary>File_Name</summary>
public string xingming { get; set; }
public string ip { get; set; }
public string id { get; set; }
}
}
[解决办法]
最终代码,修改了下,
using System;using System.Collections.Generic;using System.Linq;using System.Text;using System.Data;using System.Data.SqlClient;using System.Collections;using System.Threading;namespace ConsoleApplication1{ class Program { static SqlConnection con = new SqlConnection("Data Source=IP;Initial Catalog=数据库;User ID=sa;Password=;"); static SqlDataAdapter da; static List<cescasio> GetListces = new List<cescasio>();//数据 static List<cescasio> SetListces = new List<cescasio>();//更新数据 static bool state = false;//是否创建线程 static bool state_Get = false;//是否创建线程 static int cont = 0; /// <summary> 获取数据 </summary> static void GetData() { while (true) //采集数据 { if (GetListces.Count == 0 && state_Get)//如果没有数据了采集数据 { Console.WriteLine("开始采集数据...."); DataTable Dt = new DataTable(); con.Open(); da = new SqlDataAdapter("select top 1000 from 数据表", con);//查询语句 da.Fill(Dt); con.Close(); for (int i = 0; i < Dt.Rows.Count; i++) { cescasio cess = new cescasio(); cess.xingming = Dt.Rows[i]["xingming"].ToString(); cess.ip = Dt.Rows[i]["id"].ToString();//主键 cess.id = Dt.Rows[i]["ip"].ToString(); GetListces.Add(cess); } Console.WriteLine("采集完毕..."); cont = GetListces.Count; state = true;//采集完毕 开始发送数据 state_Get = false;//等待更新后再继续采集 } Thread.Sleep(1000);//一秒一次自己控制 } } /// <summary>更新数据 </summary> static void Update() { while (true) { if (SetListces.Count == cont) { for (int i = 0; i < SetListces.Count; i++) { //......在这里写发送数据 Console.WriteLine(SetListces[i].ip);//发送数据 SetListces.RemoveAt(i); } } state_Get = true;//开始下次采集 Thread.Sleep(10);//十秒执行一次更新数据你自己控制 } } /// <summary>创建线程 </summary> static void Create_Th() { while (true) //采集数据 { if (state && GetListces.Count > 0)//是否创建 { Console.WriteLine("开始创建进程"); List<cescasio> Create_Thlist = new List<cescasio>(); for (int i = 0; i <= GetListces.Count; i++) { Create_Thlist.Add(GetListces[i]);//添加 GetListces.RemoveAt(i);//移除 if (Create_Thlist.Count == 100) { Thread th = new Thread(FaSong); th.Start(Create_Thlist); Create_Thlist.Clear(); } else if (i == GetListces.Count - 1) { Thread th = new Thread(FaSong); th.Start(Create_Thlist); Create_Thlist.Clear(); } } state = false;//停止创建等待下次创建 Console.WriteLine("创建完毕"); } Thread.Sleep(10);//一秒一次自己控制 } } /// <summary>发送数据</summary> static void FaSong(object Data_list_o) { while (true) { List<cescasio> Data_list = (List<cescasio>)Data_list_o; for (int i = 0; i < Data_list.Count; i++) { try { //......在这里写发送数据 Console.WriteLine(Data_list[i].ip);//发送数据 //............. SetListces.Add(Data_list[i]);//放入已发送的数据 } catch { //发送失败 cont = cont - 1; } } Thread.Sleep(10); } } static void Main(string[] args) { Thread thGetData = new Thread(GetData);//获取数据 Thread thUpdate = new Thread(Update);//更新数据 Thread thCreate_Th = new Thread(Create_Th);//创建线程发送数据 thGetData.Start(); thUpdate.Start(); thCreate_Th.Start(); } } /// <summary> /// 结构体 /// </summary> class cescasio { /// <summary>File_Name</summary> public string xingming { get; set; } public string ip { get; set; } public string id { get; set; } }}
[解决办法]
昏 你直接把程序给我 我直接给你改 郁闷了 浪费钱
[解决办法]
楼上这位仁兄好热情~~~~~
[解决办法]