首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 网站开发 > Web Service >

WCF中,怎么实现这样一种排队机制呢

2012-12-19 
WCF中,如何实现这样一种排队机制呢?本帖最后由 Live_eviL 于 2012-06-27 11:10:38 编辑问题描述:类似于银

WCF中,如何实现这样一种排队机制呢?
本帖最后由 Live_eviL 于 2012-06-27 11:10:38 编辑 问题描述:
类似于银行的取款的服务,
银行提供服务,但在其内部,按照业务或是用户类型的不同,银行会提供不同的窗口,每个窗口的处理能力不一样,如:
A窗口:处理一般用户的小额的存取业务,能同时处理10人;
B窗口:处理大宗的存取业务,能同时处理4人;
C窗口:处理VIP客户的存取业务,能同时处理5人;
……
具体的窗口个数不定,有可能会根据情况新开或减少窗口。
窗口能处理能力也可能有所变化。
同时,有这样一种要求,VIP的客户如果做小额的存取,如果C窗口已满,可以选择到A窗口操作;
而一些特定的操作,只能在特定的窗口完成,如大宗的存取,只能在B窗口完成,或者VIP用户的某些特定操作,也只能在C窗口完成。

现在最头痛的问题是:

银行没有提供排队的操作,如果用户一拥而入,那么后来的用户,只要是超出了处理能力,一律踢出去,从而得不到服务。
所以,需要要提供一个服务,来实现排队的操作。

再对应到具体业务,说得更仔细一点:
银行服务:指的是服务提供商,通过类似于
http://192.168.1.254:8080/?cmd=test&user=abc&pwd=123456的形式,获取信息。其内部会通过user参数,确定用户类型,而通过cmd参数,确定其操作,从而选择不同的线路进行服务;
窗口:指的是内部的不同线路;
业务规则:选择不同线路的规则,前面的比喻应该是比较清楚了;
处理能力:指的是并发数,并发数之所以受限,是因为服务商对每种用户类型或是操作提供的连接数不一样;
问题:指的是服务商没有提供排队功能,超出连接数时,一律返回错误;
排队服务:需要自己用wcf实现的排队服务;

嗯,不知道有没有把问题说清楚。


个人的一些想法,
1、解决方案选择:

考虑到安全性和其他方面的一些因素,所以选用了wcf提供一个中间服务,每个客户端请求进来之后,根据其用户类型和操作,给他一个对应的访问用户名和密码,向服务商发送请求,得到信息后,返回客户端。


2、排队机制:
总的来说有两种方法:
一种是一旦用户请求,立刻把他排在相应的队伍后面,但这样有和上面规则不符合的地方,如VIP客户的小额存款,由于他既可以排在A窗口,也可以排在C窗口,在他跨进银行的这一刻,其实是无法断定他到底排在哪个窗口的,当然,考虑到处理还是比较快,可以让他多等一会;
更好的另一种选择是,类似于银行的叫号机,每处理完一个业务后,再叫下一个,就是在有空位的时候,再去决定请求由哪个线路处理;

3、现在个人能想到的解决方法:

利用wcf的限流(Thtottle),它可以自动实现排队机制,每个线路开启一个服务,但这样的缺点是显而易见的。
     a、无法动态增加线路,一旦线路有增减,需要手工操作,甚至牵涉到客户端的改动;
     b、排队方式还是第一种;
     当然,对于客户端变动这一块,似乎可以用wcf中路由的功能得到解决,但手工添加服务,似乎有些无法忍受。

4、如果我上面的分析没有问题的话,那么我的问题是,如何在wcf中实现自己的限流功能呢?
因为wcf虽然提供了限流,但已近固定死了,无法做任何手脚。也就需要自己来解决了。
按照wcf限流的理论来讲,过程是这样:
     a、宿主接受请求;
     b、消息被传到信道栈顶部;
     c、消息传送到ChannelDispatcher对象;
     d、传送至EndpointDispatcher对象;
     在传送请求至EndpointDispatcher对象之前,ChannelDispatcher对象可以检查服务当前的负载。如果该请求会导致服务超出许可的负载那么选择延迟该请求。在这种情况下,请求将被阻塞并且保存在内部的队列中直到服务的负载得以释放。
    
     那么,我该在哪个环节动手?(按照结构图,限制行为是包含在服务运行时中)
     如何检查负载?(用性能计数器?)
     如何延迟请求并阻塞?
     当负载释放时,又该如何操作呢?

     以上,接触wcf时间不长,一些特性也不熟悉,包括对业务的理解方面,大家多多讨论和提出宝贵意见。各位达人们,不吝赐教。
[最优解释]
 this.calls.Acquire(channel);
    }
 
    bool PrivateAcquireSessionListenerHandler(ListenerHandler listener)
    {
        if ((this.sessions != null) && (listener.Channel != null) && (listener.Channel.Throttle == null))
        {
            listener.Channel.Throttle = this;
            return this.sessions.Acquire(listener);
        }
        else
        {
            return true;
        }
    }
 
    bool PrivateAcquireSession(ISessionThrottleNotification source)
    {
        return (this.sessions == null 
------其他解决方案--------------------


关于限流的一些内容,可以查看这个页面:http://www.cnblogs.com/artech/archive/2010/04/22/1718188.html
的第7篇。
另外,C#中FlowThrottle的实现参考代码:



namespace System.ServiceModel.Dispatcher
{
    using System;
    using System.Diagnostics;
    using System.ServiceModel.Channels;
    using System.ServiceModel.Diagnostics;
    using System.Collections.Generic;
    using System.Threading;
 
    sealed class FlowThrottle
    {
        int capacity;
        int count;
        object mutex;
        WaitCallback release;
        Queue<object> waiters;
        String propertyName;
        String configName;
  
        internal FlowThrottle(WaitCallback release, int capacity, String propertyName, String configName)
        {
            if (capacity <= 0)
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new InvalidOperationException(SR.GetString(SR.SFxThrottleLimitMustBeGreaterThanZero0)));
  
            this.count = 0;
            this.capacity = capacity;
            this.mutex = new object();
            this.release = release;
            this.waiters = new Queue<object>();
            this.propertyName = propertyName;
            this.configName = configName;
        }
  
        internal int Capacity
        {
            get { return this.capacity; }
            set
            {
                if (value <= 0)
                    throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new InvalidOperationException(SR.GetString(SR.SFxThrottleLimitMustBeGreaterThanZero0)));
                this.capacity = value;
            }
        }
 
        internal bool Acquire(object o)
        {


            lock (this.mutex)
            {
                if (this.count < this.capacity)
                {
                    this.count++;
                    return true;
                }
                else
                {
                    if (this.waiters.Count == 0)
                    {
                        if (DiagnosticUtility.ShouldTraceInformation)
                        {
                            DiagnosticUtility.DiagnosticTrace.TraceEvent(
                                TraceEventType.Information,
                                TraceCode.ServiceThrottleLimitReached,
                                SR.GetString(SR.TraceCodeServiceThrottleLimitReached,
                                             this.propertyName, this.capacity, this.configName));
                        }
                    }
  
                    this.waiters.Enqueue(o);
                    return false;
                }
            }
        }
  
        internal void Release()
        {
            object next = null;
 
            lock (this.mutex)


            {
                if (this.waiters.Count > 0)
                {
                    next = this.waiters.Dequeue();
                    if (this.waiters.Count == 0)
                        this.waiters.TrimExcess();
                }
                else
                {
                    this.count--;
                }
            }
  
            if (next != null)
                this.release(next);
        }
    }




[其他解释]
以及,ServiceThrottle的参考实现:


namespace System.ServiceModel.Dispatcher
{
using System;
using System.ServiceModel;
using System.Collections.Generic;
using System.Globalization;
using System.Threading;
using System.Runtime.Serialization;
 
interface ISessionThrottleNotification
{
    void ThrottleAcquired();
}
 
public sealed class ServiceThrottle
{
    internal const int DefaultMaxConcurrentCalls = 16;
    internal const int DefaultMaxConcurrentSessions = 10;
  
    FlowThrottle calls;
    FlowThrottle sessions;
    QuotaThrottle dynamic;
    FlowThrottle instanceContexts;
  
    ServiceHostBase host;
    bool isActive;
    object thisLock = new object();
  
    internal ServiceThrottle(ServiceHostBase host)
    {
        if (!((host != null)))
        {
            DiagnosticUtility.DebugAssert("ServiceThrottle.ServiceThrottle: (host != null)");
            throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("host");
        }
        this.host = host;
        this.MaxConcurrentCalls = ServiceThrottle.DefaultMaxConcurrentCalls;


        this.MaxConcurrentSessions = ServiceThrottle.DefaultMaxConcurrentSessions;
  
        this.isActive = true;
    }
  
    FlowThrottle Calls
    {
        get
        {
            lock (this.ThisLock)
            {
                if (this.calls == null)
                {
                    this.calls = new FlowThrottle(this.GotCall, ServiceThrottle.DefaultMaxConcurrentCalls,
                                                    ServiceThrottle.MaxConcurrentCallsPropertyName, ServiceThrottle.MaxConcurrentCallsConfigName);
                }
                return this.calls;
            }
        }
    }
  
    FlowThrottle Sessions
    {
        get
        {
            lock (this.ThisLock)
            {
                if (this.sessions == null)
                {
                    this.sessions = new FlowThrottle(this.GotSession, ServiceThrottle.DefaultMaxConcurrentSessions,
                                                        ServiceThrottle.MaxConcurrentSessionsPropertyName, ServiceThrottle.MaxConcurrentSessionsConfigName);
                }
                return this.sessions;
            }
        }
    }
 
    QuotaThrottle Dynamic
    {
        get
        {
            lock (this.ThisLock)
            {


                if (this.dynamic == null)
                {
                    this.dynamic = new QuotaThrottle(this.GotDynamic, new object());
                    this.dynamic.Owner = "ServiceHost";
                }
                this.UpdateIsActive();
                return this.dynamic;
            }
        }
    }
  
    internal int ManualFlowControlLimit
    {
        get { return this.Dynamic.Limit; }
        set { this.Dynamic.SetLimit(value); }
    }
 
    const string MaxConcurrentCallsPropertyName = "MaxConcurrentCalls";
    const string MaxConcurrentCallsConfigName = "maxConcurrentCalls";
    public int MaxConcurrentCalls
    {
        get { return this.Calls.Capacity; }
        set
        {
            this.ThrowIfClosedOrOpened(MaxConcurrentCallsPropertyName);
            this.Calls.Capacity = value;
            this.UpdateIsActive();
        }
    }
 
    const string MaxConcurrentSessionsPropertyName = "MaxConcurrentSessions";
    const string MaxConcurrentSessionsConfigName = "maxConcurrentSessions";
    public int MaxConcurrentSessions
    {
        get { return this.Sessions.Capacity; }
        set
        {
            this.ThrowIfClosedOrOpened(MaxConcurrentSessionsPropertyName);
            this.Sessions.Capacity = value;
            this.UpdateIsActive();
        }
    }
  
    const string MaxConcurrentInstancesPropertyName = "MaxConcurrentInstances";
    const string MaxConcurrentInstancesConfigName = "maxConcurrentInstances";
    public int MaxConcurrentInstances
    {
        get { return this.InstanceContexts.Capacity; }
        set


        {
            this.ThrowIfClosedOrOpened(MaxConcurrentInstancesPropertyName);
            this.InstanceContexts.Capacity = value;
            this.UpdateIsActive();
        }
    }
  
    FlowThrottle InstanceContexts
    {
        get
        {
            lock (this.ThisLock)
            {
                if (this.instanceContexts == null)
                {
                    this.instanceContexts = new FlowThrottle(this.GotInstanceContext, Int32.MaxValue,
                                                                ServiceThrottle.MaxConcurrentInstancesPropertyName, ServiceThrottle.MaxConcurrentInstancesConfigName);
                }
                return this.instanceContexts;
            }
        }
    }
 
    internal bool IsActive
    {
        get { return this.isActive; }
    }
  
    internal object ThisLock
    {
        get { return this.thisLock; }
    }
 
    bool PrivateAcquireCall(ChannelHandler channel)
    {
        return (this.calls == null) 

热点排行