首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 软件管理 > 软件架构设计 >

Beanstalk 源码分析-涉及到的构造体

2012-09-06 
Beanstalk 源码分析----涉及到的结构体Beanstalkd 一个高性能分布式内存队列系统设计思想高性能离不开异步

Beanstalk 源码分析----涉及到的结构体

Beanstalkd 一个高性能分布式内存队列系统

设计思想

高性能离不开异步,异步离不开队列,而其内部都是Producer-Comsumer模式的原理。

 

核心概念

Beanstalkd设计里面的核心概念:

  • job

    一个需要异步处理的任务,是Beanstalkd中的基本单元,需要放在一个tube中。

    • tube

      一个有名的任务队列,用来存储统一类型的job,是producer和consumer操作的对象。

      • producer

        Job的生产者,通过put命令来将一个job放到一个tube中。

        • consumer

          Job的消费者,通过reserve/release/bury/delete命令来获取job或改变job的状态。

          Beanstalkd中一个job的生命周期如图2所示。一个job有READY, RESERVED, DELAYED, BURIED四种状态。当producer直接put一个job时,job就处于READY状态,等待consumer来处理,如果选择延迟put,job就先到DELAYED状态,等待时间过后才迁移到READY状态。consumer获取了当前READY的job后,该job的状态就迁移到RESERVED,这样其他的consumer就不能再操作该job。当consumer完成该job后,可以选择delete, release或者bury操作;delete之后,job从系统消亡,之后不能再获取;release操作可以重新把该job状态迁移回READY(也可以延迟该状态迁移操作),使其他的consumer可以继续获取和执行该job;有意思的是bury操作,可以把该job休眠,等到需要的时候,再将休眠的job kick回READY状态,也可以delete BURIED状态的job。正是有这些有趣的操作和状态,才可以基于此做出很多意思的应用,比如要实现一个循环队列,就可以将RESERVED状态的job休眠掉,等没有READY状态的job时再将BURIED状态的job一次性kick回READY状态。

          下面我把项目中设计到的说有数据结构写出来。

          一个job就是一个工作单位

          struct job {    Jobrec r; // persistent fields; these get written to the wal    /* bookeeping fields; these are in-memory only */    char pad[6];    tube tube;    job prev, next; /* linked list of jobs */    job ht_next; /* Next job in a hash table list */    size_t heap_index; /* where is this job in its current heap */    File *file;    job  fnext;    job  fprev;    void *reserver;    int walresv;    int walused;    char body[]; // written separately to the wal};

           

          涉及到Jobrec,他的左右就是记录每个job的各种属性值

          struct Jobrec {    uint64 id;    uint32 pri;    int64  delay;    int64  ttr;    int32  body_size;    int64  created_at;    int64  deadline_at;    uint32 reserve_ct;    uint32 timeout_ct;    uint32 release_ct;    uint32 bury_ct;    uint32 kick_ct;    byte   state;};

          每个job设计到file处理

          struct File {    File *next;    uint refs;    int  seq;    int  iswopen; // is open for writing    int  fd;    int  free;    int  resv;    char *path;    Wal  *w;    struct job jlist; // jobs written in this file};
          file文件的一个链表
          struct Wal {    int    filesize;    int    use;    char   *dir;    File   *head;    File   *cur;    File   *tail;    int    nfile;    int    next;    int    resv;  // bytes reserved    int    alive; // bytes in use    int64  nmig;  // migrations    int64  nrec;  // records written ever    int    wantsync;    int64  syncrate;    int64  lastsync;    int    nocomp; // disable binlog compaction?};

           

          消息的一个队列  所有的job将放入这里进行处理

          struct tube {    uint refs;    char name[MAX_TUBE_NAME_LEN];    Heap ready;    Heap delay;    struct ms waiting; /* set of conns */    struct stats stat;    uint using_ct;    uint watching_ct;    int64 pause;    int64 deadline_at;    struct job buried;};


          这个结构体,应该是一个连接数,支持9.5million的书连接连接

          struct Heap {    int     cap;    int     len;    void    **data;    Less    less;    Record  rec;};

           

          下面就是涉及到了网络的一些东西 server设置服务器的东西。每次启动就会初始化wall文件链表

          struct Server {    char *port;    char *addr;    char *user;    Wal    wal;    Socket sock;    Heap   conns;};

          下面是socket需要的一些参数 初始的一些参数

          struct Socket {    int    fd;    Handle f;    void   *x;    int    added;};

          要注意的是HANDLE 回调函数

          typedef void(*Handle)(void*, int rw); // rw can also be 'h' for hangup

           

          这个结构体主要是网络处理job

          struct Conn {    Server *srv;    Socket sock;    char   state;    char   type;    Conn   *next;    tube   use;    int64  tickat;      // time at which to do more work    int    tickpos;     // position in srv->conns    job    soonest_job; // memoization of the soonest job    int    rw;          // currently want: 'r', 'w', or 'h'    int    pending_timeout;    char cmd[LINE_BUF_SIZE]; // this string is NOT NUL-terminated    int  cmd_len;    int  cmd_read;    char *reply;    int  reply_len;    int  reply_sent;    char reply_buf[LINE_BUF_SIZE]; // this string IS NUL-terminated    // How many bytes of in_job->body have been read so far. If in_job is NULL    // while in_job_read is nonzero, we are in bit bucket mode and    // in_job_read's meaning is inverted -- then it counts the bytes that    // remain to be thrown away.    int in_job_read;    job in_job; // a job to be read from the client    job out_job;    int out_job_sent;    struct ms  watch;    struct job reserved_jobs; // linked list header};


           

          //

          struct ms {    size_t used, cap, last;    void **items;    ms_event_fn oninsert, onremove;};enum{    Walver = 7};/////job的六种状态。
          enum // Jobrec.state{    Invalid,    Ready,    Reserved,    Buried,    Delayed,    Copy};

          还有涉及到状态设置

          struct stats {    uint urgent_ct;    uint waiting_ct;    uint buried_ct;    uint reserved_ct;    uint pause_ct;    uint64   total_delete_ct;    uint64   total_jobs_ct;};

           

          主要是涉及到了以上的数据结构,来进行数据处理。

          更多代码分析,慢慢将会介绍

          更多文章,欢迎访问:http://blog.csdn.net/wallwind

热点排行