RabbitMQ Golang教程(二)
RabbitMQ Golang教程(二)
任务队列
什么是任务队列 ?
把要执行的任务放在队列中。使用较多的任务队列有machiney、Celery、goWorker、YTask。每一个任务队列都有自己的特点,这里就不细讲了。 我们将创建一个工作队列,该队列将用于在多个工人之间分配耗时的任务。
工作队列(又称任务队列)的主要思想是避免立即执行某些资源密集型任务并且不得不等待这些任务完成。相反,我们安排任务异步地同时或在当前任务之后完成。我们将任务封装为消息并将其发送到队列,在后台运行的工作进程将取出消息并最终执行任务。当你运行多个工作进程时,任务将在他们之间共享。
这个概念在Web应用中特别有用,因为在Web应用中不可能在较短的HTTP请求窗口内处理复杂的任务,(例如:注册时发送邮件或短信验证码等场景)。
首先,需要编写发送端的程序。该程序会将任务安排到我们的工作队列中,因此将其命名为new_task.go
package mainimport ( "fmt" "github.com/streadway/amqp" "log" "os" "strings")func main() { // 1. 尝试连接RabbitMQ,建立连接 // 该连接抽象了套接字连接,并为我们处理协议版本协商和认证等。 conn,err := amqp.Dial("amqp://admin:admin@xx.xxx.xxx.xxx:xxx/") if err != nil { fmt.Printf("connect to RabbitMQ failed, err:%v\n", err) return } defer conn.Close() // 2. 接下来,我们创建一个通道,大多数API都是用过该通道操作的。 ch,err := conn.Channel() if err != nil { fmt.Printf("open a channel failed, err:%v\n", err) return } defer ch.Close() // 3. 要发送,我们必须声明要发送到的队列。 q, err := ch.QueueDeclare( "task_queue", // name true, // 持久的 false, // delete when unused false, // 独有的 false, // no-wait nil, // arguments ) if err != nil { fmt.Printf("declare a queue failed, err:%v\n", err) return } // 4. 然后我们可以将消息发布到声明的队列 body := bodyFrom(os.Args) err = ch.Publish( "", // exchange q.Name, // routing key false, // 立即 false, // 强制 amqp.Publishing{ DeliveryMode: amqp.Persistent, // 持久 ContentType: "text/plain", Body: []byte(body), }) if err != nil { fmt.Printf("publish a message failed, err:%v\n", err) return } log.Printf(" [x] Sent %s", body)}// bodyFrom 从命令行获取将要发送的消息内容func bodyFrom(args []string) string { var s string if (len(args) < 2) || os.Args[1] == "" { s = "hello" } else { s = strings.Join(args[1:], " ") } return s}
然后,需要编写接收端程序,它需要为消息正文中出现的每个.伪造一秒钟的工作。它将从队列中弹出消息并执行任务,因此将其称为worker.go
package mainimport ( "bytes" "fmt" "github.com/streadway/amqp" "log" "time")func main() { conn,err := amqp.Dial("amqp://admin:admin@49.234.192.212:5672/") if err != nil { fmt.Printf("connect to RabbitMQ failed, err:%v\n", err) return } defer conn.Close() ch,err := conn.Channel() if err != nil { fmt.Printf("open a channel failed, err:%v\n", err) return } defer ch.Close() // 声明一个queue q, err := ch.QueueDeclare( "task_queue", // name true, // 声明为持久队列 false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) err = ch.Qos( 1, // prefetch count 0, // prefetch size false, // global ) if err != nil { fmt.Printf("ch.Qos() failed, err:%v\n", err) return } // 立即返回一个Delivery的通道 msgs, err := ch.Consume( q.Name, // queue "", // consumer false, // 注意这里传false,关闭自动消息确认 false, // exclusive false, // no-local false, // no-wait nil, // args ) if err != nil { fmt.Printf("ch.Consume failed, err:%v\n", err) return } // 开启循环不断地消费消息 forever := make(chan bool) go func() { for d := range msgs { log.Printf("Received a message: %s", d.Body) dotCount := bytes.Count(d.Body, []byte(".")) t := time.Duration(dotCount) time.Sleep(t * time.Second) log.Printf("Done") d.Ack(false) // 手动传递消息确认 } }() log.Printf(" [*] Waiting for messages. To exit press CTRL+C") <-forever}
执行结果
单个执行
打开两个终端,分别执行new_task.go和worker.go了。
go run worker.go
go run new_task.go
查看是否链接成功
查看队列是否存在
通过命令查看
rabbitmqctl list_queues
循环调度
使用任务队列的优点之一是能够轻松并行化工作。
首先,尝试同时运行两个worker.go脚本。它们都将从队列中获取消息。
需要打开三个控制台。其中两个将运行worker.go脚本。这些控制台将成为我们的两个消费者——C1和C2。

查看是否链接成功
查看队列是否存在
通过命令查看
rabbitmqctl list_queues
赞 (0)