用PHP写的基于Memcache的Queue实现代码 |
本文标签:Memcache,Queue php类代码: 复制代码 代码如下:
<?php
/**
 * MQ - a minimal FIFO queue layered on top of Memcache (PECL "memcache"
 * extension, procedural API).
 *
 * Layout in the cache:
 *   - QUEUE_REAL_KEY  holds the "rear" counter: index of the next slot to write.
 *   - QUEUE_FRONT_KEY holds the "front" counter: index of the next slot to read.
 *   - Each item is stored under the key "<queue name><index>".
 *
 * Limitations worth knowing before relying on this class:
 *   - Both counters are shared by every queue name, so in practice a single
 *     Memcache instance carries a single queue.
 *   - Indices never wrap; once the rear counter reaches QUEUE_MAX_NUM every
 *     add() fails until the cache is flushed.
 *   - The counters are cached in static properties at setupMq() time and are
 *     only refreshed when this process changes them, so concurrent producers
 *     can collide on the same slot (add() then returns false).
 */
class MQ
{
    /** @var mixed Memcache connection returned by memcache_pconnect(), or false. */
    public static $client;

    /** @var int Local copy of the rear (write) counter. */
    private static $m_real;

    /** @var int Local copy of the front (read) counter. */
    private static $m_front;

    /** @var array Items fetched by the most recent get() call. */
    private static $m_data = array();

    const QUEUE_MAX_NUM = 100000000;
    const QUEUE_FRONT_KEY = '_queue_item_front';
    const QUEUE_REAL_KEY = '_queue_item_real';

    /**
     * Connects to Memcache and loads both queue counters.
     *
     * @param string   $conf Memcache host (or unix socket path).
     * @param int|null $port TCP port; when omitted the extension's default
     *                       port is used, exactly as a one-argument call did.
     *
     * @return mixed The connection handle, or false when the connection failed.
     */
    public static function setupMq($conf, $port = null)
    {
        self::$client = ($port === null)
            ? memcache_pconnect($conf)
            : memcache_pconnect($conf, $port);

        if (self::$client === false) {
            // Nothing to read from; keep the counters in a defined state.
            self::$m_real = 0;
            self::$m_front = 0;

            return false;
        }

        self::$m_real = self::readCounter(self::QUEUE_REAL_KEY);
        self::$m_front = self::readCounter(self::QUEUE_FRONT_KEY);

        return self::$client;
    }

    /**
     * Appends one item to the queue.
     *
     * @param string $queue Queue name, used as the key prefix.
     * @param mixed  $data  Payload. Avoid boolean false: get() cannot tell it
     *                      apart from a missing key.
     *
     * @return bool True when the item was stored, false when the queue is
     *              full or the slot could not be written.
     */
    public static function add($queue, $data)
    {
        if (self::$m_real >= self::QUEUE_MAX_NUM) {
            return false;
        }

        // memcache_add() refuses to overwrite, so a slot already taken by
        // another producer yields false instead of clobbering its item.
        if (!memcache_add(self::$client, $queue . self::$m_real, $data)) {
            return false;
        }

        self::mqRealChange();

        return true;
    }

    /**
     * Removes and returns up to $count items starting at the front counter.
     *
     * The front counter advances by the number of items actually found, not
     * by $count.
     *
     * @param string $key   Queue name, used as the key prefix.
     * @param int    $count Maximum number of slots to inspect.
     *
     * @return array The items dequeued by this call, in queue order.
     */
    public static function get($key, $count)
    {
        // Start from an empty buffer so earlier batches are not returned again.
        self::$m_data = array();

        $num = 0;
        $start = self::$m_front;

        for ($i = $start; $i < $start + $count; $i++) {
            $dataTmp = memcache_get(self::$client, $key . $i);

            // Strict check: payloads such as "0" or "" are valid items.
            if ($dataTmp !== false) {
                self::$m_data[] = $dataTmp;
                memcache_delete(self::$client, $key . $i);
                $num++;
            }
        }

        if ($num > 0) {
            self::mqFrontChange($num);
        }

        return self::$m_data;
    }

    /**
     * Reads a counter key, treating a missing or empty value as 0.
     *
     * @param string $key Counter key.
     *
     * @return int
     */
    private static function readCounter($key)
    {
        $value = memcache_get(self::$client, $key);

        return empty($value) ? 0 : (int) $value;
    }

    /**
     * Advances the rear counter by one and refreshes the local copy.
     */
    private static function mqRealChange()
    {
        // Create the counter first; add is a no-op when the key already exists.
        memcache_add(self::$client, self::QUEUE_REAL_KEY, 0);
        self::$m_real = memcache_increment(self::$client, self::QUEUE_REAL_KEY, 1);
    }

    /**
     * Advances the front counter by $num and refreshes the local copy.
     *
     * @param int $num Number of items that were consumed.
     */
    private static function mqFrontChange($num)
    {
        // Create the counter first; add is a no-op when the key already exists.
        memcache_add(self::$client, self::QUEUE_FRONT_KEY, 0);
        self::$m_front = memcache_increment(self::$client, self::QUEUE_FRONT_KEY, $num);
    }

    /**
     * Flushes the given Memcache connection. This discards everything stored
     * on that server, not only the queue.
     *
     * @param mixed $memcache_obj Connection handle as returned by setupMq().
     */
    public static function mflush($memcache_obj)
    {
        memcache_flush($memcache_obj);
    }

    /**
     * Prints the local counters, the backlog size and the last fetched batch.
     */
    public static function Debug()
    {
        echo 'real:' . self::$m_real . "<br>\r\n";
        echo 'front:' . self::$m_front . "<br>\r\n";
        echo 'wait for process data:' . intval(self::$m_real - self::$m_front);
        echo "<br>\r\n";
        echo '<pre>';
        print_r(self::$m_data);
        echo '</pre>';
    }
}

define('FLUSH_MQ', 0); // CLEAN ALL DATA
define('IS_ADD', 0);   // SET DATA

$mobj = MQ::setupMq('127.0.0.1', 11211);

if (FLUSH_MQ) {
    MQ::mflush($mobj);
} else {
    if (IS_ADD) {
        // Enqueues "1test" .. "6test".
        for ($i = 1; $i <= 6; $i++) {
            MQ::add('user_sync', $i . 'test');
        }
    } else {
        MQ::get('user_sync', 10);
    }
}

MQ::Debug();
?>
使用方法 复制代码 代码如下:
MQ::setupMq('127.0.0.1', 11211);//连接
MQ::add($key, $value);//添加数据到队列
MQ::add($key, $value);//添加数据到队列
MQ::add($key, $value);//添加数据到队列
MQ::add($key, $value);//添加数据到队列
MQ::add($key, $value);//添加数据到队列
MQ::add($key, $value);//添加数据到队列
MQ::get($key, 10);//取出一定数量的数据 |