thinkphp 做分布式服务+读写分离+分库分表(分区)
- 引言 thinkphp* 大道至简
- 一、分库分表
- 分表
- php 分库分表hash算法
- 0、分表的方法(thinkphp)
- 1、ThinkPHP6 业务分表之一:UID 发号器
- 2、ThinkPHP6 业务分表之二:用户
- 其他杂项
- 1亿条数据在PHP中实现Mysql数据库分表100张
引言 thinkphp* 大道至简
一、分库分表
分表
分表分库一般分垂直和水平,垂直是指感觉业务来进行库的拆分,比如专门的用户库或者订单库这样子,但垂直还是无法解决单表数据量过大导致的性能会差的问题(这里可能会和上面矛盾,网上多数指的是 2000W 就会影响,但好像没有多少人谈过他们的表结构情况)。水平分指的是某一个表,里面的数据量非常大,我们按照一定的规则来进行一个拆分分流,比如把用户的数据由一直存放在用户表,变为可能这条数据是在用户1号表或者2号表这样子。规则可能是 范围(range)或者哈希(hash),也不知道我喜欢的取模算不算哈希。分了其实会带来一些问题的复杂性,比如分库那如何确保多库事务性的一致性、分布式锁等等很多,然后还有之前我们的 join 查询,现在可能就无法使用呢。
php 分库分表hash算法
app/common.php 公共方法定义
$userid 也可以用下面的uid 发号器定义,通过哈希来就算出表名称
//哈希分表function get_hash_table($table, $userid) {$str = crc32($userid);if ($str < 0) {$hash = "0" . substr(abs($str), 0, 1);} else {$hash = substr($str, 0, 2);}return $table . "_" . $hash;}
控制器调用计算表名
public function index() {echo $table=get_hash_table('message', '18991').'<br>';echo $table=get_hash_table('message', '18993').'<br>';echo $table=get_hash_table('message', '18994').'<br>';
}
0、分表的方法(thinkphp)
public function getPartitionTableName($data=array()) {// 对数据表进行分区if(isset($data[$this->partition['field']])) {$field = $data[$this->partition['field']];switch($this->partition['type']) {case 'id':// 按照id范围分表$step = $this->partition['expr'];$seq = floor($field / $step)+1;break;case 'year':// 按照年份分表if(!is_numeric($field)) {$field = strtotime($field);}$seq = date('Y',$field)-$this->partition['expr']+1;break;case 'mod':// 按照id的模数分表$seq = ($field % $this->partition['num'])+1;break;case 'md5':// 按照md5的序列分表$seq = (ord(substr(md5($field),0,1)) % $this->partition['num'])+1;break;default :if(function_exists($this->partition['type'])) {// 支持指定函数哈希$fun = $this->partition['type'];$seq = (ord(substr($fun($field),0,1)) % $this->partition['num'])+1;}else{// 按照字段的首字母的值分表$seq = (ord($field{0}) % $this->partition['num'])+1;}}return $this->getTableName().'_'.$seq;}else{// 当设置的分表字段不在查询条件或者数据中// 进行联合查询,必须设定 partition['num']$tableName = array();for($i=0;$i<$this->partition['num'];$i++)$tableName[] = 'SELECT * FROM '.$this->getTableName().'_'.($i+1);$tableName = '( '.implode(" UNION ",$tableName).') AS '.$this->name;return $tableName;}
}
1、ThinkPHP6 业务分表之一:UID 发号器
我们现在假设项目是新成立的,暂时没有一个技术债。目前我们要先规划一个用户表,打算划分 16 个表,按照取模的方式来查询。最先可能我们要考虑如何定义 UID 的问题,不过对我们 PHPer 来说不是什么难事,毕竟我们基本都不用 UUID 来做 UID 的,占用的空间会比较多,不利于索引。
但是不用 UUID ,选择了 INT 来做 UID,那我们要如何确保它的连续性和唯一性呢?业内常用的可能是雪花算法,但我选择自己写一个简易的发号器。
发号器需要加东西,然后取东西,我们可以利用 Redis List 来很好的实现我们需要的先进先出功能。通过一个命令或者说脚本,我们定时往列表中填上自增的 ID,然后在注册流程中来取最前面的。
代码层面
app/common.php 公共方法定义获取redis 的值
if(!function_exists('get_redis')) {function get_redis() {return new \Predis\Client('tcp://IP:端口', ['parameters' => ['password' => '密码',],]);}
}
定义一个发号器函数
app/command/GenerateUID.php
<?php
declare (strict_types = 1);namespace app\command;use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;class GenerateUID extends Command
{protected $redis;/** 发号器列表 键** @var string*/protected $cache_key = 'generate:uid';/** 锁 键** @var string*/protected $lock_key = 'generate:uid_lock';/** 发号器列表最大容量,默认为 500000** @var int*/protected $max_capacity = 5E5;public function __construct(){$this->redis = get_redis();parent::__construct();}protected function configure(){// 指令配置$this->setName('generate:uid')->setDescription('生成 UID');}protected function execute(Input $input, Output $output){$current_capacity = $this->get_list_length();// 计算发号器需要增加的数量$append_capacity = $this->max_capacity - $current_capacity;$uid_max = $this->get_uid_max();$output->writeln('当前最大UID:' . $uid_max);$output->writeln('当前剩余容量:' . $current_capacity);$output->writeln('最大存储容量:' . $this->max_capacity);$output->writeln('需要追加容量:' . $append_capacity);// 如果不需要增加则结束if($append_capacity === 0) {return;}$data = [];for($i = 1; $i <= $append_capacity; $i++) {$data[] = $uid_max + $i;}// 把需要加的数据进行分块,方便快速追加$data = array_chunk($data, 1000);// 加锁$lock = $this->redis->executeRaw(['SET',$this->lock_key,1,'EX',10 * 60,'NX',]);if($lock !== 'OK') {$output->writeln('获取锁失败');return;}try {foreach ($data as $item) {$this->redis->rpush($this->cache_key, $item);}} catch (\Exception $e) {$output->error($e->getMessage());} finally {// 释放锁$this->redis->del($this->lock_key);}}/** 获取发号器列表的长度** @return int*/protected function get_list_length() :int{return $this->redis->llen($this->cache_key);}/** 获取当前最大的 UID** @return int*/protected function get_uid_max() :int{$value = (int) $this->redis->lindex($this->cache_key, -1);if($value) {return $value;}return 0;}
}
config/console.php 定义一个打印函数类
<?php
return [// 指令定义'commands' => [...'generate:uid' => 'app\command\GenerateUID',],
];
执行命令
$ php think generate:uid
当前最大UID:0
当前剩余容量:0
最大存储容量:500000
需要追加容量:500000
2、ThinkPHP6 业务分表之二:用户
上一篇我们将了 UID 的发号器,那这一篇我们将要去实现用户的注册入库和简单的查询。
首先我们要实现数据表的识别。获取表名称
app/common.php
if(!function_exists('table_name')) {/** 获取表名称** @param string $name 表名* @param int|null $uid UID* @return string*/function table_name(string $name, ?int $uid = null) {$support_table = ['users' => 16, // 表名 => 分表数];if(!isset($support_table[$name])) {return $name;}// 如果没有传递 UID,那则需要调用其他方法来获取 UIDif(is_null($uid)) {$uid = (int) 1;}if((int) $uid === 0) {throw new \Exception('UID 值异常');}// 取 UID 和 分表数的模,值转化为小写十六进制return sprintf('%s_%x', $name, $uid % $support_table[$name]);}
}
执行写入操作
app/controller/Auth.php
<?php
namespace app\controller;use app\BaseController;
use think\facade\Db;
use think\helper\Str;class Auth extends BaseController
{/** 注册接口*/public function register(){$redis = get_redis();// 获取最左的值$uid = $redis->lpop('generate:uid');// 为空说明列表已经没内容了if(is_null($uid)) {return '无法获取 UID';}// 获取表名$table_name = table_name('users', $uid);// 插入数据Db::table($table_name)->insert(['id' => $uid,'nickname' => '随机生成' . Str::random(),]);return '注册成功';}
}
其他杂项
1亿条数据在PHP中实现Mysql数据库分表100张
当数据量猛增的时候,大家都会选择库表散列等等方式去优化数据读写速度。笔者做了一个简单的尝试,1亿条数据,分100张表。具体实现过程如下:
首先创建100张表:
$i=0;
while($i<=99){
echo "$newNumber \r\n";
$sql="CREATE TABLE `code_".$i."` (`full_code` char(10) NOT NULL,`create_time` int(10) unsigned NOT NULL,PRIMARY KEY (`full_code`),
) ENGINE=MyISAM DEFAULT CHARSET=utf8";
mysql_query($sql);
$i++;
下面说一下我的分表规则,full_code作为主键,我们对full_code做hash
函数如下:
$table_name=get_hash_table('code',$full_code);
function get_hash_table($table,$code,$s=100){
$hash = sprintf("%u", crc32($code));
echo $hash;
$hash1 = intval(fmod($hash, $s));return $table."_".$hash1;
}
这样插入数据前通过get_hash_table获取数据存放的表名。
最后我们使用merge存储引擎来实现一张完整的code表
1 CREATE TABLE IF NOT EXISTS
code
(
2full_code
char(10) NOT NULL,
3create_time
int(10) unsigned NOT NULL,
4 INDEX(full_code)
5 ) TYPE=MERGE UNION=(code_0,code_1,code_2…) INSERT_METHOD=LAST ;
这样我们通过select * from code就可以得到所有的full_code数据了。