cómo escuchar N canales? (statement de selección dinámica)

para comenzar un bucle sin fin de ejecutar dos goroutines, puedo usar el siguiente código:

después de recibir el mensaje, comenzará una nueva rutina y continuará para siempre.

c1 := make(chan string) c2 := make(chan string) go DoShit(c1, 5) go DoShit(c2, 2) for ; true; { select { case msg1 := <-c1: fmt.Println("received ", msg1) go DoShit(c1, 1) case msg2 := <-c2: fmt.Println("received ", msg2) go DoShit(c2, 9) } } 

Ahora me gustaría tener el mismo comportamiento para N rutinas, pero ¿cómo se verá la statement seleccionada en ese caso?

Este es el bit de código con el que he comenzado, pero estoy confundido sobre cómo codificar la sentencia select

 numChans := 2 //I keep the channels in this slice, and want to "loop" over them in the select statemnt var chans = [] chan string{} for i:=0;i<numChans;i++{ tmp := make(chan string); chans = append(chans, tmp); go DoShit(tmp, i + 1) //How shall the select statment be coded for this case? for ; true; { select { case msg1 := <-c1: fmt.Println("received ", msg1) go DoShit(c1, 1) case msg2 := <-c2: fmt.Println("received ", msg2) go DoShit(c2, 9) } } 

Puede hacer esto usando la función Select del paquete reflect:

func Select(cases []SelectCase) (chosen int, recv Value, recvOK bool)

Seleccionar ejecuta una operación de selección descrita por la lista de casos. Al igual que la instrucción de selección Ir, bloquea hasta que al menos uno de los casos pueda continuar, realiza una elección pseudoaleatoria uniforme y luego ejecuta ese caso. Devuelve el índice del caso elegido y, si ese caso fue una operación de recepción, el valor recibido y un booleano que indica si el valor corresponde a un envío en el canal (a diferencia de un valor cero recibido porque el canal está cerrado).

Usted SelectCase una serie de estructuras SelectCase que identifican el canal para seleccionar, la dirección de la operación y un valor para enviar en el caso de una operación de envío.

Entonces podrías hacer algo como esto:

 cases := make([]reflect.SelectCase, len(chans)) for i, ch := range chans { cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)} } chosen, value, ok := reflect.Select(cases) # ok will be true if the channel has not been closed. ch := chans[chosen] msg := value.String() 

Puede experimentar con un ejemplo más desarrollado aquí: http://play.golang.org/p/8zwvSk4kjx

Puede lograr esto al envolver cada canal en una rutina que “reenvía” mensajes a un canal “agregado” compartido. Por ejemplo:

 agg := make(chan string) for _, ch := range chans { go func(c chan string) { for msg := range c { agg <- msg } }(ch) } select { case msg <- agg: fmt.Println("received ", msg) } 

Si necesita saber de qué canal proviene el mensaje, puede envolverlo en una estructura con cualquier información adicional antes de reenviarlo al canal agregado.

En mi prueba (limitada), este método supera con creces el uso del paquete de reflexión:

 $ go test dynamic_select_test.go -test.bench=. ... BenchmarkReflectSelect 1 5265109013 ns/op BenchmarkGoSelect 20 81911344 ns/op ok command-line-arguments 9.463s 

Código de referencia aquí

Ampliar algunos comentarios sobre respuestas anteriores y proporcionar una comparación más clara aquí es un ejemplo de ambos enfoques presentados hasta ahora con la misma entrada, una porción de canales para leer y una función para llamar a cada valor que también necesita saber cuál canal de donde vino el valor.

Hay tres diferencias principales entre los enfoques:

  • Complejidad. Aunque puede ser parcialmente una preferencia del lector, encuentro que el enfoque del canal es más idiomático, directo y legible.

  • Actuación. En mi sistema Xeon amd64, los goroutines + channels realizan la solución de reflexión en aproximadamente dos órdenes de magnitud (en general, la reflexión en Go es a menudo más lenta y solo debe usarse cuando sea absolutamente necesario). Por supuesto, si hay un retraso significativo en la función que procesa los resultados o en la escritura de valores en los canales de entrada, esta diferencia de rendimiento puede volverse fácilmente insignificante.

  • Semántica de locking / almacenamiento en búfer. La importancia de esto depende del caso de uso. En la mayoría de los casos, no importará o el ligero almacenamiento adicional en la solución de fusión de goroutine puede ser útil para el rendimiento. Sin embargo, si es deseable tener la semántica de que solo un escritor está desbloqueado y su valor se maneja completamente antes de desbloquear a cualquier otro escritor, eso solo se puede lograr con la solución de reflection.

Tenga en cuenta que ambos enfoques se pueden simplificar si no se requiere el “id” del canal de envío o si los canales de origen nunca se cerrarán.

Canal de fusión de Goroutine:

 // Process1 calls `fn` for each value received from any of the `chans` // channels. The arguments to `fn` are the index of the channel the // value came from and the string value. Process1 returns once all the // channels are closed. func Process1(chans []<-chan string, fn func(int, string)) { // Setup type item struct { int // index of which channel this came from string // the actual string item } merged := make(chan item) var wg sync.WaitGroup wg.Add(len(chans)) for i, c := range chans { go func(i int, c <-chan string) { // Reads and buffers a single item from `c` before // we even know if we can write to `merged`. // // Go doesn't provide a way to do something like: // merged <- (<-c) // atomically, where we delay the read from `c` // until we can write to `merged`. The read from // `c` will always happen first (blocking as // required) and then we block on `merged` (with // either the above or the below syntax making // no difference). for s := range c { merged <- item{i, s} } // If/when this input channel is closed we just stop // writing to the merged channel and via the WaitGroup // let it be known there is one fewer channel active. wg.Done() }(i, c) } // One extra goroutine to watch for all the merging goroutines to // be finished and then close the merged channel. go func() { wg.Wait() close(merged) }() // "select-like" loop for i := range merged { // Process each value fn(i.int, i.string) } } 

Selección de reflexión:

 // Process2 is identical to Process1 except that it uses the reflect // package to select and read from the input channels which guarantees // there is only one value "in-flight" (ie when `fn` is called only // a single send on a single channel will have succeeded, the rest will // be blocked). It is approximately two orders of magnitude slower than // Process1 (which is still insignificant if their is a significant // delay between incoming values or if `fn` runs for a significant // time). func Process2(chans []<-chan string, fn func(int, string)) { // Setup cases := make([]reflect.SelectCase, len(chans)) // `ids` maps the index within cases to the original `chans` index. ids := make([]int, len(chans)) for i, c := range chans { cases[i] = reflect.SelectCase{ Dir: reflect.SelectRecv, Chan: reflect.ValueOf(c), } ids[i] = i } // Select loop for len(cases) > 0 { // A difference here from the merging goroutines is // that `v` is the only value "in-flight" that any of // the workers have sent. All other workers are blocked // trying to send the single value they have calculated // where-as the goroutine version reads/buffers a single // extra value from each worker. i, v, ok := reflect.Select(cases) if !ok { // Channel cases[i] has been closed, remove it // from our slice of cases and update our ids // mapping as well. cases = append(cases[:i], cases[i+1:]...) ids = append(ids[:i], ids[i+1:]...) continue } // Process each value fn(ids[i], v.String()) } } 

[Código completo en el patio de Go .]

¿Por qué este enfoque no funcionaría suponiendo que alguien está enviando eventos?

 func main() { numChans := 2 var chans = []chan string{} for i := 0; i < numChans; i++ { tmp := make(chan string) chans = append(chans, tmp) } for true { for i, c := range chans { select { case x = <-c: fmt.Printf("received %d \n", i) go DoShit(x, i) default: continue } } } }