Gearman分布式任务系统

简介

Gearman是一个轻量级分布式任务系统,它提供了一套程序框架将任务分发到不同的机器或进程,以便更好的处理任务。它允许并行工作、负载均衡处理、多语言间调用。能应用于多种场景,从高可用网站、图片缩放到数据库复制。

Gearman有以下几个优点:

  1. 开源, BSD协议
  2. 多语言支持,提供了多种编程语言API
  3. 灵活,可以快速整合
  4. 快速高效,Server采用C/C++编写,协议简单
  5. 无单点故障
  6. 容易水平扩展
  7. 无消息容量限制

工作原理

Gearman驱动的应用程序由三部分组成:Client,Worker,Job Server

  • Client:创建Job并发送任务到Job Server(生产者
  • Job Server:发现合适的Worker并把Job转发过去(协调者
  • Worker:处理Job Server转发过来的Job, 完成后通过Job Server发送响应到Client(消费者

Gearman提供了Client和Worker API,应用程序通过调用API与Gearman Job Server通讯。你不需要自己处理网络问题或者Job的映射。

在Gearman内部,Client和Worker API通过TCP socket与Job Server通讯。

gearman-stack.png

跨语言调用

Gearman提供了多种编程语言API,支持跨语言调用,假设需要通过PHP调用C语言程序, 可以使用PHP调用Client API发送数据,C语言程序调用Worker API处理数据并返回结果。

分布式

当某一个Job Server宕机后,连接到它的Client和Worker会自动迁移到其他存活的Job Server,你可能不希望运行太多的Job Server,但使用两到三台Job Server进行冗余是不错的选择。

持久化

Gearman Job Server内部的Job 队列是存储在内存里的。这意味着当服务器重启或崩溃的时,处于等待状态的Job会丢失,不会被Worker运行。持久化队列允许后台任务存储到外部的持久化队列中,以便服务器重启或崩溃后Job队列依然可用。持久化队列只对后台任务有效。因为前台任务是与Client绑定的,当Job Server离线后,Client能够检测到,并重新开始这个Job。任务加入到内部Job队列前就会被添加到持久化队列,当任务完成时,任务会从持久化队列中移除。目前Gearman支持Drizzle,MySQL,Postgresql,SQLite3,Memcached等持久化介质。

安装

Gearman安装

wget https://github.com/gearman/gearmand/releases/download/1.1.16/gearmand-1.1.16.tar.gztar -xvzf gearmand-1.1.16.tar.gzcd gearmand-1.1.16./configuremake && make install

Gearman PHP 扩展安装

wget https://pecl.php.net/get/gearman-1.1.2.tgztar -xvzf gearman-1.1.2.tgzcd gearman-1.1.2phpize./configuremake && make install

启动

/usr/local/sbin/gearmand -d

应用

简单任务(反转字符串)

gearman-data-flow.png

Client

<?php
 
$client= new GearmanClient();
//添加Server
$client->addServer('127.0.0.1', 4730);
$data = "Hello World!";
//发送任务
$result = $client->doNormal("reverse", serialize($data));
echo $result;

Worker


<?php
  
$worker = new GearmanWorker();
//添加Server
$worker->addServer('127.0.0.1', 4730);
//注册Worker提供的reverse功能
$worker->addFunction('reverse', function($job){
    //获取Job数据
    $workload = $job->workload();
    $data = unserialize($workload);
    return strrev($data);
});
 
//阻塞,监听Job
while($worker->work()) {
    if ($worker->returnCode() != GEARMAN_SUCCESS) {
        //error
    }
}

并行任务(多次查询)

Client

<?php
 
$client = new GearmanClient();
//添加Server
$client->addServer('127.0.0.1', 4730);
 
//初始化用户信息和最近文章
$userInfo = $post = null;
 
//设置任务完成后的回调 $context 用于标识是哪个任务的回调
$client->setCompleteCallback(function(GearmanTask $task, $context) use (&$userInfo, &$friends, &$posts) {
    switch($context) {
        case 'getUserInfo':
            $userInfo = $task->data();
            break;
        case 'getLatestPost':
            $post = $task->data();
            break;
    }
});
 
//添加并行任务
$client->addTask('getUserInfo', 'joe@joe.com', 'getUserInfo');
$client->addTask('getLatestPost', 'joe@joe.com', 'getLatestPost');
 
//执行并行任务
$client->runTasks();
 
var_dump($userInfo, $post);

Worker

<?php
 
$worker = new GearmanWorker();
//添加Server
$client->addServer('127.0.0.1', 4730);
 
//注册Worker提供的getUserInfo功能
$worker->addFunction('getUserInfo', function(GearmanJob $job){
    //获取用户信息,假设要3s
    sleep(3);
    $username = $job->workload();
    return ['username' => $username, 'age' => 25];
});
  
//注册Worker提供的getLatestPost功能
$worker->addFunction('getLatestPost', function(GearmanJob $job){
    //获取用户信息,假设要3s
    sleep(3);
    $username = $job->workload();
    return ['title' => 'demo post', 'author' => $username];
});
 
while ($worker->work());

注意事项

  • 集群部署时,需要开放相应端口。
  • 使用MySQL持久化时,可能因为wait_timeout过小导致MySQL连接丢失,可以将参数调大,或者换用其他持久化介质。
%1 $ S

发表回复