本文实例讲述了php实现的memcache环形队列类。分享给大家供大家参考。具体如下:
这里介绍了php实现的memcache环形队列类。没咋学过数据结构,因为业务需要,所以只是硬着头皮模拟的! 参考php memcache 队列代码。为使队列随时可入可出,且不受int长度越界危险(单链采取head自增的话不作处理有越界可能),所以索性改写成环形队列。可能还有bug,忘见谅!
<?php
/**
* php memcache 环形队列类
* 原作者 lkk/lianq.net
* 修改 foxhunter
* 因业务需要只保留的队列中的pop和push,修改过期时间为0即永久
*/
class mqueue
{
public static $client;
private $expire; //过期时间,秒,1~2592000,即30天内
private $sleeptime; //等待解锁时间,微秒
private $queuename; //队列名称,唯一值
private $retrynum; //尝试次数
private $maxnum; //最大队列容量
private $canrewrite; //是否可以覆写开关,满出来的内容从头部开始覆盖重写原来的数据
private $head; //下一步要进入的指针位置
private $tail; //下一步要进入的指针位置
private $len; //队列现有长度
const lock_key = '_fox_mq_lock_'; //锁存储标示
const length_key = '_fox_mq_length_'; //队列现长度存储标示
const valu_key = '_fox_mq_val_'; //队列键值存储标示
const head_key = '_fox_mq_head_'; //队列head指针位置标示
const tail_key = '_fox_mq_tail_'; //队列tail指针位置标示
/*
* 构造函数
* 对于同一个$queuename,实例化时必须保障构造函数的参数值一致,否则pop和push会导队列顺序混乱
*/
public function __construct($queuename = '', $maxqueue = 1, $canrewrite = false, $expire = 0, $config = '')
{
if (empty($config)) {
self::$client = memcache_pconnect('127.0.0.1', 11211);
} elseif (is_array($config)) { //array('host'=>'127.0.0.1','port'=>'11211')
self::$client = memcache_pconnect($config['host'], $config['port']);
} elseif (is_string($config)) { //"127.0.0.1:11211"
$tmp = explode(':', $config);
$conf['host'] = isset($tmp[0]) ? $tmp[0] : '127.0.0.1';
$conf['port'] = isset($tmp[1]) ? $tmp[1] : '11211';
self::$client = memcache_pconnect($conf['host'], $conf['port']);
}
if (!self::$client)
return false;
ignore_user_abort(true); //当客户断开连接,允许继续执行
set_time_limit(0); //取消脚本执行延时上限
$this->access = false;
$this->sleeptime = 1000;
$expire = (empty($expire)) ? 0 : (int) $expire + 1;
$this->expire = $expire;
$this->queuename = $queuename;
$this->retrynum = 20000;
$this->maxnum = $maxqueue != null ? $maxqueue : 1;
$this->canrewrite = $canrewrite;
$this->getheadandtail();
if (!isset($this->head) || empty($this->head))
$this->head = 0;
if (!isset($this->tail) || empty($this->tail))
$this->tail = 0;
if (!isset($this->len) || empty($this->len))
$this->len = 0;
}
//获取队列首尾指针信息和长度
private function getheadandtail()
{
$this->head = (int) memcache_get(self::$client, $this->queuename . self::head_key);
$this->tail = (int) memcache_get(self::$client, $this->queuename . self::tail_key);
$this->len = (int) memcache_get(self::$client, $this->queuename . self::length_key);
}
// 利用memcache_add原子性加锁
private function lock()
{
if ($this->access === false) {
$i = 0;
while (!memcache_add(self::$client, $this->queuename . self::lock_key, 1, false, $this->expire)) {
usleep($this->sleeptime);
@$i++;
if ($i > $this->retrynum) { //尝试等待n次
return false;
break;
}
}
return $this->access = true;
}
return false;
}
//更新头部指针指向,指向下一个位置
private function incrhead()
{
//$this->getheadandtail(); //获取最新指针信息 ,由于本方法体均在锁内调用,其锁内已调用了此方法,本行注释
$this->head++; //头部指针下移
if ($this->head >= $this->maxnum) {
$this->head = 0; //边界值修正
}
;
$this->len--; //head的移动由pop触发,所以相当于数量减少
if ($this->len < 0) {
$this->len = 0; //边界值修正
}
;
memcache_set(self::$client, $this->queuename . self::head_key, $this->head, false, $this->expire); //更新
memcache_set(self::$client, $this->queuename . self::length_key, $this->len, false, $this->expire); //更新
}
//更新尾部指针指向,指向下一个位置
private function incrtail()
{
//$this->getheadandtail(); //获取最新指针信息,由于本方法体均在锁内调用,其锁内已调用了此方法,本行注释
$this->tail++; //尾部指针下移
if ($this->tail >= $this->maxnum) {
$this->tail = 0; //边界值修正
}
;
$this->len++; //head的移动由push触发,所以相当于数量增加
if ($this->len >= $this->maxnum) {
$this->len = $this->maxnum; //边界值长度修正
}
;
memcache_set(self::$client, $this->queuename . self::tail_key, $this->tail, false, $this->expire); //更新
memcache_set(self::$client, $this->queuename . self::length_key, $this->len, false, $this->expire); //更新
}
// 解锁
private function unlock()
{
memcache_delete(self::$client, $this->queuename . self::lock_key);
$this->access = false;
}
//判断是否满队列
public function isfull()
{
//外部直接调用的时候由于没有锁所以此处的值是个大概值,并不很准确,但是内部调用由于在前面有lock,所以可信
if ($this->canrewrite)
return false;
return $this->len == $this->maxnum ? true : false;
}
//判断是否为空
public function isempty()
{
//外部直接调用的时候由于没有锁所以此处的值是个大概值,并不很准确,但是内部调用由于在前面有lock,所以可信
return $this->len == 0 ? true : false;
}
public function getlen()
{
//外部直接调用的时候由于没有锁所以此处的值是个大概值,并不很准确,但是内部调用由于在前面有lock,所以可信
return $this->len;
}
/*
* push值
* @param mixed 值
* @return bool
*/
public function push($data = '')
{
$result = false;
if (empty($data))
return $result;
if (!$this->lock()) {
return $result;
}
$this->getheadandtail(); //获取最新指针信息
if ($this->isfull()) { //只有在非覆写下才有full概念
$this->unlock();
return false;
}
if (memcache_set(self::$client, $this->queuename . self::valu_key . $this->tail, $data, memcache_compressed, $this->expire)) {
//当推送后,发现尾部和头部重合(此时指针还未移动),且右边仍有未由head读取的数据,那么移动head指针,避免尾部指针跨越head
if ($this->tail == $this->head && $this->len >= 1) {
$this->incrhead();
}
$this->incrtail(); //移动尾部指针
$result = true;
}
$this->unlock();
return $result;
}
/*
* pop一个值
* @param [length] int 队列长度
* @return array
*/
public function pop($length = 0)
{
if (!is_numeric($length))
return false;
if (!$this->lock())
return false;
$this->getheadandtail();
if (empty($length))
$length = $this->len; //默认读取所有
if ($this->isempty()) {
$this->unlock();
return false;
}
//获取长度超出队列长度后进行修正
if ($length > $this->len)
$length = $this->len;
$data = $this->popkeyarray($length);
$this->unlock();
return $data;
}
/*
* pop某段长度的值
* @param [length] int 队列长度
* @return array
*/
private function popkeyarray($length)
{
$result = array();
if (empty($length))
return $result;
for ($k = 0; $k < $length; $k++) {
$result[] = @memcache_get(self::$client, $this->queuename . self::valu_key . $this->head);
@memcache_delete(self::$client, $this->queuename . self::valu_key . $this->head, 0);
//当提取值后,发现头部和尾部重合(此时指针还未移动),且右边没有数据,即队列中最后一个数据被完全掏空,此时指针停留在本地不移动,队列长度变为0
if ($this->tail == $this->head && $this->len <= 1) {
$this->len = 0;
memcache_set(self::$client, $this->queuename . self::length_key, $this->len, false, $this->expire); //更新
break;
} else {
$this->incrhead(); //首尾未重合,或者重合但是仍有未读取出的数据,均移动head指针到下一处待读取位置
}
}
return $result;
}
/*
* 重置队列
* * @return null
*/
private function reset($all = false)
{
if ($all) {
memcache_delete(self::$client, $this->queuename . self::head_key, 0);
memcache_delete(self::$client, $this->queuename . self::tail_key, 0);
memcache_delete(self::$client, $this->queuename . self::length_key, 0);
} else {
$this->head = $this->tail = $this->len = 0;
memcache_set(self::$client, $this->queuename . self::head_key, 0, false, $this->expire);
memcache_set(self::$client, $this->queuename . self::tail_key, 0, false, $this->expire);
memcache_set(self::$client, $this->queuename . self::length_key, 0, false, $this->expire);
}
}
/*
* 清除所有memcache缓存数据
* @return null
*/
public function memflush()
{
memcache_flush(self::$client);
}
public function clear($all = false)
{
if (!$this->lock())
return false;
$this->getheadandtail();
$head = $this->head;
$length = $this->len;
$curr = 0;
for ($i = 0; $i < $length; $i++) {
$curr = $this->$head + $i;
if ($curr >= $this->maxnum) {
$this->head = $curr = 0;
}
@memcache_delete(self::$client, $this->queuename . self::valu_key . $curr, 0);
}
$this->unlock();
$this->reset($all);
return true;
}
}
希望本文所述对大家的php程序设计有所帮助。
评论列表:
发布于 3天前回复该评论
发布于 3天前回复该评论
发布于 3天前回复该评论
发布于 3天前回复该评论
发布于 2天前回复该评论
发布于 2天前回复该评论
发布于 2天前回复该评论
发布于 2天前回复该评论