7
回答
Golang的channel处理阻塞,能否帮忙看下何因?
利用AWS快速构建适用于生产的无服务器应用程序,免费试用12个月>>>   

@astaxie 你好,想跟你请教个问题:

我尝试使用golang+redis建立一个分发程序,即从redis源队列(List结构)中取一个数据单元,然后轮询分发或轮询复制分发到多个REDIS LIST中,但在执行情况中,发现阻塞,不知何因。具体代码如下。

配置文件如下:

{
	"BindHost": ":9876",
	"Timeout": 5,
	"MaxProcs": 4,
	"Dup": false,
	"RedisProxy": "10.88.30.98:6379",
	"QueueName": "taxi.queue.test.gps",
	"Items": [
	{
		"RedisProxy": "10.88.30.98:6379",
		"QueueName": "taxi.queue.test.output.1",
		"TouchName": "taxi.touch.test.output.1",
		"Valid": true
	},
	{
		"RedisProxy": "10.88.30.98:6379",
		"QueueName": "taxi.queue.test.output.2",
		"TouchName": "taxi.touch.test.output.2",
		"Valid": true
	}
]}
其中BindHost参数未使用。

Items项目列表为目标。

思路是:从源redis LIST中取出一个数据单元,轮询分发或复制分发到Items中的各个项中。


package main

import (
	"encoding/json"
	"github.com/astaxie/goredis"
	"io/ioutil"
	"log"
	"os"
	"runtime"
	"time"
)

type Config struct {
	BindHost   string      `json:"BindHost"`
	Timeout    int         `json:"Timeout"`
	MaxProcs   int         `json:"MaxProcs"`
	Dup        bool        `json:"Dup"`
	RedisProxy string      `json:"RedisProxy"`
	QueueName  string      `json:"QueueName"`
	Items      []WatchItem `json:"Items"`
}

type WatchItem struct {
	RedisProxy string `json:"RedisProxy"`
	RedisConn  goredis.Client
	QueueName  string `json:"QueueName"`
	TouchName  string `json:"TouchName"`
	Valid      bool   `json:"Valid"`
}

// 全局配置项
var cfg Config

func main() {
	// 全局配置处理
	if len(os.Args) < 2 {
		log.Println("Usage: godistr <ConfigFile>\n")
		return
	}

	cfgbuf, err := ioutil.ReadFile(os.Args[1])
	if err != nil {
		log.Println("Read config file failed:", err)
		return
	}

	err = json.Unmarshal(cfgbuf, &cfg)
	if err != nil {
		log.Println("Parse config failed:", err)
		return
	}

	runtime.GOMAXPROCS(cfg.MaxProcs) // 协程并行处理数量

	src_redis := goredis.Client{Addr: cfg.RedisProxy} // 源REDIS连接初始化
	log.Println("src_redis created:", src_redis)
	for _, item := range cfg.Items { // 目标REDIS初始化
		if item.Valid {
			item.RedisConn = goredis.Client{Addr: item.RedisProxy}
			log.Println("dst_redis created:", item.RedisConn)
		}
	}

	// 数据分发处理
	src_chan, err_chan := Reader(src_redis)
	if cfg.Dup {
		log.Println("dup!")
		Dup(src_chan, err_chan)
	} else {
		log.Println("distr!")
		Distr(src_chan, err_chan)
	}

	// 错误信息输出
	for {
		log.Println(<-err_chan)
	}
}

// 读取
func Reader(redisvr goredis.Client) (chan []byte, chan string) {
	channel := make(chan []byte)     // 数据通道
	err_channel := make(chan string) // 错误信息通道

	go func() {
		for {
			reply, err := redisvr.Rpop(cfg.QueueName)
			if err == nil {
				channel <- reply
			} else {
				err_channel <- err.Error()
			}
		}
	}()
	return channel, err_channel
}

// 分发
func Distr(src_chan chan []byte, err_chan chan string) {
	for _, item := range cfg.Items {
		if item.Valid {
			go func() {
				for {
					select {
					case v := <-src_chan:
						item.RedisConn.Lpush(item.QueueName, v)
						item.RedisConn.Incr(item.TouchName)
					case <-time.After(time.Duration(cfg.Timeout) * time.Second):
						err_chan <- "Distr Timeout"
					}
				}
			}()
		}
	}
}

// 复制分发
func Dup(src_chan chan []byte, err_chan chan string) {
	go func() {
		for {
			select {
			case v := <-src_chan:
				for _, item := range cfg.Items {
					if item.Valid {
						item.RedisConn.Lpush(item.QueueName, v)
						item.RedisConn.Incr(item.TouchName)
					} else {
						err_chan <- "Dup item not valid!"
					}
				}
			case <-time.After(time.Duration(cfg.Timeout) * time.Second):
				err_chan <- "Dup Timeout"
			}
		}
	}()
}


Go
举报
老黄
发帖于5年前 7回/1K+阅
共有7个答案 最后回答: 3年前

你这个channel用的相当奇葩啊,channel不是这样用的啊,看着很不习惯啊

1.这一句:

runtime.GOMAXPROCS(cfg.MaxProcs)

一般是这样的runtime.GOMAXPROCS(runtime.CPUNUM())

2.你应该控制一下channel,首先申明一个全局的buffer channel,然后起一个goroutine读取的数据放在这个chan里面,然后起一个goroutine一个进程select监控数据,执行push操作(这个里面可以使用超时控制)。

@astaxie

我把代码进行了调整,channel定义为全局,只是轮询分发,但发现最终只形成一个目标队列。

代码如下:

package main

import (
	"encoding/json"
	"github.com/astaxie/goredis"
	"io/ioutil"
	"log"
	"os"
	"runtime"
	"time"
)

type Config struct {
	BindHost   string      `json:"BindHost"`
	Timeout    int         `json:"Timeout"`
	RedisProxy string      `json:"RedisProxy"`
	QueueName  string      `json:"QueueName"`
	Items      []WatchItem `json:"Items"`
}

type WatchItem struct {
	RedisProxy string `json:"RedisProxy"`
	QueueName  string `json:"QueueName"`
	TouchName  string `json:"TouchName"`
	Valid      bool   `json:"Valid"`
}

type ValidItem struct {
	RedisConn goredis.Client
	QueueName string
	TouchName string
}

// 全局配置项
var cfg Config
var channel chan []byte     // 数据通道
var err_channel chan string // 错误信息通道

func main() {
	// 全局配置处理
	if len(os.Args) < 2 {
		log.Println("Usage: godistr <ConfigFile>\n")
		return
	}

	cfgbuf, err := ioutil.ReadFile(os.Args[1])
	if err != nil {
		log.Println("Read config file failed:", err)
		return
	}

	err = json.Unmarshal(cfgbuf, &cfg)
	if err != nil {
		log.Println("Parse config failed:", err)
		return
	}

	runtime.GOMAXPROCS(runtime.NumCPU())              // 协程并行处理数量
	src_redis := goredis.Client{Addr: cfg.RedisProxy} // 源REDIS连接初始化

	// 
	// 如果直接使用cfg.Items循环访问item子项,速度很慢,不知何因?
	// 生成配置ITEMS中合法项数组
	var dst_items []ValidItem
	for _, item := range cfg.Items {
		if item.Valid {
			dst_items = append(dst_items, ValidItem{goredis.Client{Addr: item.RedisProxy}, item.QueueName, item.TouchName})
		}
	}

	channel = make(chan []byte)     // 数据通道
	err_channel = make(chan string) // 错误信息通道

	log.Println("start!")
	go Reader(src_redis)
	go Distr(dst_items)

	// 错误信息输出
	for {
		log.Println(<-err_channel)
		return
	}
}

// 读取
func Reader(redisvr goredis.Client) {
	for {
		reply, err := redisvr.Rpop(cfg.QueueName)
		if err == nil {
			channel <- reply
		} else {
			err_channel <- err.Error()
		}
	}
}

// 分发
// - channel可以多个goroutine访问吗
func Distr(dst_items []ValidItem) {
	for _, item := range dst_items {
		go func() {
			for {
				select {
				case v := <-channel:
					item.RedisConn.Lpush(item.QueueName, v)
					item.RedisConn.Incr(item.TouchName)
				case <-time.After(time.Duration(cfg.Timeout) * time.Second):
					err_channel <- "Distr Timeout"
				}
			}
		}()
	}
}
配置文件如下:

{
	"Timeout": 5,
	"RedisProxy": "10.88.30.98:6379",
	"QueueName": "taxi.queue.test.gps",
	"Items": [
	{
		"RedisProxy": "10.88.30.98:6379",
		"QueueName": "taxi.queue.test.output.1",
		"TouchName": "taxi.touch.test.output.1",
		"Valid": true
	},
	{
		"RedisProxy": "10.88.30.98:6379",
		"QueueName": "taxi.queue.test.output.2",
		"TouchName": "taxi.touch.test.output.2",
		"Valid": true
	}
]}

你这个逻辑错了啊,应该是在select里面的channel逻辑里面对items进行循环,因为channel的数值你只能读取一次啊

这里的想法是:派生两个goroutine去读取同一个channel,而这两个goroutine对不同的目标REDIS队列进行LPUSH。

同一个channel可以这样操作吗?

发现这个REDIS的GO接口包很厉害,当REDIS关闭再启动时,能自动重新连接到REDIS服务器,GO程序运行没有影响。

一个channel难道真的只可以两个goroutine进行操作?我把代码改了一下,一个Reader产生个channel,然后传递channel给一个Distr进行处理。channel数据与配置文件中的Items的项目相同。

代码如下:

package main

import (
	"encoding/json"
	"fmt"
	"github.com/astaxie/goredis"
	"io/ioutil"
	"log"
	"os"
	"runtime"
	"time"
)

type Config struct {
	Timeout    int         `json:"Timeout"`
	RedisProxy string      `json:"RedisProxy"`
	QueueName  string      `json:"QueueName"`
	Items      []WatchItem `json:"Items"`
}

type WatchItem struct {
	RedisProxy string `json:"RedisProxy"`
	QueueName  string `json:"QueueName"`
	TouchName  string `json:"TouchName"`
	Valid      bool   `json:"Valid"`
}

type ValidItem struct {
	RedisConn goredis.Client
	QueueName string
	TouchName string
}

// 全局配置项
var Cfg Config
var QuitChannel chan string

func main() {
	// 全局配置处理
	if len(os.Args) < 2 {
		log.Println("Usage: godistr <ConfigFile>\n")
		return
	}

	cfgbuf, err := ioutil.ReadFile(os.Args[1])
	if err != nil {
		log.Println("Read config file failed:", err)
		return
	}

	err = json.Unmarshal(cfgbuf, &Cfg)
	if err != nil {
		log.Println("Parse config failed:", err)
		return
	}

	runtime.GOMAXPROCS(runtime.NumCPU())
	QuitChannel = make(chan string)                   // 触发退出通道
	src_redis := goredis.Client{Addr: Cfg.RedisProxy} // 源REDIS连接初始化

	// 
	// 如果直接使用cfg.Items循环访问item子项,速度很慢,不知何因?
	// 生成配置ITEMS中合法项数组
	var dst_items []ValidItem
	for _, item := range Cfg.Items {
		if item.Valid {
			dst_items = append(dst_items, ValidItem{goredis.Client{Addr: item.RedisProxy}, item.QueueName, item.TouchName})
		}
	}

	// 数据分发处理
	// 
	// Redis:List -->Reader goroutine 1 ->channel 1 ->Distr 1 --> Redis:List (按目标队列名合并或拆分)
	//            -->Reader goroutine 2 ->channel 2 ->Distr 2 --> Redis:List 
	// 
	log.Println("start!")

	var channel chan []byte
	grp_channel := make([]chan []byte, 0)
	for _, item := range dst_items {
		channel = Reader(src_redis)
		grp_channel = append(grp_channel, channel)
		Distr(item, channel)
		log.Println("item:", item.QueueName, " created.", " channel:", channel)
	}

	// 退出
	log.Println(<-QuitChannel)
}

// 读取
func Reader(redisvr goredis.Client) chan []byte {
	channel := make(chan []byte, 100) // 数据通道
	go func() {
		defer func() {
			QuitChannel <- fmt.Sprintf("Reader: %s closed.", channel)
		}()
		for {
			reply, err := redisvr.Rpop(Cfg.QueueName)
			if err == nil {
				channel <- reply
			} else {
				log.Println(err.Error())
			}
		}
	}()
	return channel
}

// 分发
func Distr(item ValidItem, src_chan chan []byte) {
	go func() {
		defer func() {
			QuitChannel <- fmt.Sprintf("Distr: %s closed.", src_chan)
		}()
		for {
			select {
			case v := <-src_chan:
				err := item.RedisConn.Lpush(item.QueueName, v)
				if err != nil {
					log.Println(err.Error())
				}
				item.RedisConn.Incr(item.TouchName)
			case <-time.After(time.Duration(Cfg.Timeout) * time.Second):
				log.Println("Distr Timeout")
			}
		}
	}()
}
配置文件如下:这个配置是将一台REDIS中队列中的数据,通过五个channel进行传送,最终合并到另一台REDIS中。
{
        "Timeout": 5,
        "RedisProxy": "10.88.30.98:6379",
        "QueueName": "queue.gps.test",
        "Items": [
        {
                "RedisProxy": "127.0.0.1:6379",
                "QueueName": "queue.output",
                "TouchName": "touch.output",
                "Valid": true
        },
        {
                "RedisProxy": "127.0.0.1:6379",
                "QueueName": "queue.output",
                "TouchName": "touch.output",
                "Valid": true
        },
        {
                "RedisProxy": "127.0.0.1:6379",
                "QueueName": "queue.output",
                "TouchName": "touch.output",
                "Valid": true
        },
        {
                "RedisProxy": "127.0.0.1:6379",
                "QueueName": "queue.output",
                "TouchName": "touch.output",
                "Valid": true
        },
        {
                "RedisProxy": "127.0.0.1:6379",
                "QueueName": "queue.output",
                "TouchName": "touch.output",
                "Valid": true
        }
]}


顶部