Mongodb aggregation $ group, restringe la longitud de la matriz

Quiero agrupar todos los documentos según un campo pero restringir la cantidad de documentos agrupados para cada valor.

Cada mensaje tiene una conversation_ID. Necesito obtener 10 o menos mensajes para cada conversation_ID.

Puedo agrupar de acuerdo con el siguiente comando, pero no puedo encontrar la manera de restringir el número de documentos agrupados, aparte de Message.aggregate({'$group':{_id:'$conversation_ID',msgs:{'$push':{msgid:'$_id'}}}}) los resultados Message.aggregate({'$group':{_id:'$conversation_ID',msgs:{'$push':{msgid:'$_id'}}}})

¿Cómo limitar la longitud de la matriz de msgs para cada conversation_ID a 10?

Moderno

Desde MongoDB 3.6 hay un enfoque “novedoso” al usar $lookup para realizar una “autocombinación” de la misma manera que el procesamiento del cursor original que se muestra a continuación.

Dado que en este lanzamiento puede especificar un argumento "pipeline" para $lookup como fuente para el “join”, esto esencialmente significa que puede usar $match y $limit para reunir y “limitar” las entradas para el conjunto:

 db.messages.aggregate([ { "$group": { "_id": "$conversation_ID" } }, { "$lookup": { "from": "messages", "let": { "conversation": "$_id" }, "pipeline": [ { "$match": { "$expr": { "$eq": [ "$conversation_ID", "$$conversation" ] } }}, { "$limit": 10 }, { "$project": { "_id": 1 } } ], "as": "msgs" }} ]) 

Opcionalmente, puede agregar una proyección adicional después de $lookup para hacer que los elementos de la matriz sean simplemente los valores en lugar de los documentos con la tecla _id , pero el resultado básico está allí simplemente haciendo lo anterior.

Todavía existe el extraordinario SERVER-9277 que realmente solicita un “límite para enviar” directamente, pero usar $lookup de esta manera es una alternativa viable en el ínterin.

NOTA : También hay $slice que se introdujo después de escribir la respuesta original y se menciona por “problema JIRA sobresaliente” en el contenido original. Si bien puede obtener el mismo resultado con conjuntos de resultados pequeños, implica todavía “presionar todo” en la matriz y luego limitar la salida final de la matriz a la longitud deseada.

Esa es la principal distinción y por qué generalmente no es práctico $slice para obtener resultados grandes. Pero, por supuesto, se puede usar alternativamente en los casos en que lo es.

Hay más detalles sobre los valores del grupo mongodb por múltiples campos sobre el uso alternativo.


Original

Como se dijo anteriormente, esto no es imposible, pero sin duda es un problema horrible.

En realidad, si su principal preocupación es que las matrices resultantes serán excepcionalmente grandes, lo mejor que puede hacer es enviarlas para cada “conversation_ID” distinto como una consulta individual y luego combinar sus resultados. En la syntax de MongoDB 2.6 que podría necesitar algunos ajustes dependiendo de la implementación de su lenguaje en realidad:

 var results = []; db.messages.aggregate([ { "$group": { "_id": "$conversation_ID" }} ]).forEach(function(doc) { db.messages.aggregate([ { "$match": { "conversation_ID": doc._id } }, { "$limit": 10 }, { "$group": { "_id": "$conversation_ID", "msgs": { "$push": "$_id" } }} ]).forEach(function(res) { results.push( res ); }); }); 

Pero todo depende de si eso es lo que estás tratando de evitar. Entonces a la respuesta real:


El primer problema aquí es que no hay una función para “limitar” la cantidad de elementos que se “presionan” en una matriz. Sin duda, es algo que nos gustaría, pero la funcionalidad no existe actualmente.

El segundo problema es que incluso cuando se insertan todos los elementos en una matriz, no se puede usar $slice ni ningún operador similar en la canalización de agregación. Por lo tanto, no existe una forma actual de obtener solo los resultados “top 10” de una matriz producida con una operación simple.

Pero en realidad puede producir un conjunto de operaciones para “cortar” de manera efectiva los límites de su agrupación. Es bastante complicado, y por ejemplo, aquí reduciré los elementos de la matriz “en rodajas” a “seis” solamente. La razón principal aquí es demostrar el proceso y mostrar cómo hacerlo sin ser destructivo con matrices que no contienen el total al que desea “cortar”.

Dada una muestra de documentos:

 { "_id" : 1, "conversation_ID" : 123 } { "_id" : 2, "conversation_ID" : 123 } { "_id" : 3, "conversation_ID" : 123 } { "_id" : 4, "conversation_ID" : 123 } { "_id" : 5, "conversation_ID" : 123 } { "_id" : 6, "conversation_ID" : 123 } { "_id" : 7, "conversation_ID" : 123 } { "_id" : 8, "conversation_ID" : 123 } { "_id" : 9, "conversation_ID" : 123 } { "_id" : 10, "conversation_ID" : 123 } { "_id" : 11, "conversation_ID" : 123 } { "_id" : 12, "conversation_ID" : 456 } { "_id" : 13, "conversation_ID" : 456 } { "_id" : 14, "conversation_ID" : 456 } { "_id" : 15, "conversation_ID" : 456 } { "_id" : 16, "conversation_ID" : 456 } 

Puedes ver que al agrupar según tus condiciones obtendrás una matriz con diez elementos y otra con “cinco”. Lo que quiere hacer aquí reduce los dos a la parte superior “seis” sin “destruir” la matriz que solo coincidirá con los “cinco” elementos.

Y la siguiente consulta:

 db.messages.aggregate([ { "$group": { "_id": "$conversation_ID", "first": { "$first": "$_id" }, "msgs": { "$push": "$_id" }, }}, { "$unwind": "$msgs" }, { "$project": { "msgs": 1, "first": 1, "seen": { "$eq": [ "$first", "$msgs" ] } }}, { "$sort": { "seen": 1 }}, { "$group": { "_id": "$_id", "msgs": { "$push": { "$cond": [ { "$not": "$seen" }, "$msgs", false ] } }, "first": { "$first": "$first" }, "second": { "$first": "$msgs" } }}, { "$unwind": "$msgs" }, { "$project": { "msgs": 1, "first": 1, "second": 1, "seen": { "$eq": [ "$second", "$msgs" ] } }}, { "$sort": { "seen": 1 }}, { "$group": { "_id": "$_id", "msgs": { "$push": { "$cond": [ { "$not": "$seen" }, "$msgs", false ] } }, "first": { "$first": "$first" }, "second": { "$first": "$second" }, "third": { "$first": "$msgs" } }}, { "$unwind": "$msgs" }, { "$project": { "msgs": 1, "first": 1, "second": 1, "third": 1, "seen": { "$eq": [ "$third", "$msgs" ] }, }}, { "$sort": { "seen": 1 }}, { "$group": { "_id": "$_id", "msgs": { "$push": { "$cond": [ { "$not": "$seen" }, "$msgs", false ] } }, "first": { "$first": "$first" }, "second": { "$first": "$second" }, "third": { "$first": "$third" }, "forth": { "$first": "$msgs" } }}, { "$unwind": "$msgs" }, { "$project": { "msgs": 1, "first": 1, "second": 1, "third": 1, "forth": 1, "seen": { "$eq": [ "$forth", "$msgs" ] } }}, { "$sort": { "seen": 1 }}, { "$group": { "_id": "$_id", "msgs": { "$push": { "$cond": [ { "$not": "$seen" }, "$msgs", false ] } }, "first": { "$first": "$first" }, "second": { "$first": "$second" }, "third": { "$first": "$third" }, "forth": { "$first": "$forth" }, "fifth": { "$first": "$msgs" } }}, { "$unwind": "$msgs" }, { "$project": { "msgs": 1, "first": 1, "second": 1, "third": 1, "forth": 1, "fifth": 1, "seen": { "$eq": [ "$fifth", "$msgs" ] } }}, { "$sort": { "seen": 1 }}, { "$group": { "_id": "$_id", "msgs": { "$push": { "$cond": [ { "$not": "$seen" }, "$msgs", false ] } }, "first": { "$first": "$first" }, "second": { "$first": "$second" }, "third": { "$first": "$third" }, "forth": { "$first": "$forth" }, "fifth": { "$first": "$fifth" }, "sixth": { "$first": "$msgs" }, }}, { "$project": { "first": 1, "second": 1, "third": 1, "forth": 1, "fifth": 1, "sixth": 1, "pos": { "$const": [ 1,2,3,4,5,6 ] } }}, { "$unwind": "$pos" }, { "$group": { "_id": "$_id", "msgs": { "$push": { "$cond": [ { "$eq": [ "$pos", 1 ] }, "$first", { "$cond": [ { "$eq": [ "$pos", 2 ] }, "$second", { "$cond": [ { "$eq": [ "$pos", 3 ] }, "$third", { "$cond": [ { "$eq": [ "$pos", 4 ] }, "$forth", { "$cond": [ { "$eq": [ "$pos", 5 ] }, "$fifth", { "$cond": [ { "$eq": [ "$pos", 6 ] }, "$sixth", false ]} ]} ]} ]} ]} ] } } }}, { "$unwind": "$msgs" }, { "$match": { "msgs": { "$ne": false } }}, { "$group": { "_id": "$_id", "msgs": { "$push": "$msgs" } }} ]) 

Obtiene los mejores resultados en la matriz, hasta seis entradas:

 { "_id" : 123, "msgs" : [ 1, 2, 3, 4, 5, 6 ] } { "_id" : 456, "msgs" : [ 12, 13, 14, 15 ] } 

Como puedes ver aquí, mucha diversión.

Después de haber agrupado inicialmente, básicamente quiere “mostrar” el $first valor $first de la stack para los resultados de la matriz. Para simplificar un poco este proceso, lo hacemos en la operación inicial. Entonces el proceso se convierte en:

  • $unwind la matriz
  • Compare con los valores ya vistos con una coincidencia de igualdad $eq
  • $sort los resultados para “flotar” valores false no vistos en la parte superior (esto aún conserva el orden)
  • $group back again y “pop” el $first valor no visto como el siguiente miembro en la stack. También esto usa el operador $cond para reemplazar los valores “vistos” en la stack de la matriz con false para ayudar en la evaluación.

La acción final con $cond está allí para garantizar que las iteraciones futuras no solo agreguen el último valor de la matriz una y otra vez donde el recuento “slice” es mayor que los miembros de la matriz.

Todo ese proceso debe repetirse para tantos elementos como desee “cortar”. Como ya encontramos el “primer” elemento en la agrupación inicial, eso significa n-1 iteraciones para el resultado del corte deseado.

Los pasos finales son en realidad solo una ilustración opcional de convertir todo nuevamente en matrices para el resultado como finalmente se muestra. Así que solo presionar condicionalmente ítems o retroceder false por su posición coincidente y finalmente “filtrar” todos los valores false para que los arreglos finales tengan “seis” y “cinco” miembros respectivamente.

Por lo tanto, no hay un operador estándar para acomodar esto, y no se puede simplemente “limitar” la inserción a 5 o 10 o los elementos de la matriz. Pero si realmente tienes que hacerlo, entonces este es tu mejor enfoque.


Posiblemente podría abordar esto con mapReduce y abandonar el marco de agregación todos juntos. El enfoque que tomaría (dentro de unos límites razonables) sería tener efectivamente un hash-map en memoria en el servidor y acumular matrices para eso, al usar el segmento de JavaScript para “limitar” los resultados:

 db.messages.mapReduce( function () { if ( !stash.hasOwnProperty(this.conversation_ID) ) { stash[this.conversation_ID] = []; } if ( stash[this.conversation_ID.length < maxLen ) { stash[this.conversation_ID].push( this._id ); emit( this.conversation_ID, 1 ); } }, function(key,values) { return 1; // really just want to keep the keys }, { "scope": { "stash": {}, "maxLen": 10 }, "finalize": function(key,value) { return { "msgs": stash[key] }; }, "out": { "inline": 1 } } ) 

Así que, básicamente, construye el objeto "en memoria" que coincide con las "claves" emitidas con una matriz que nunca excede el tamaño máximo que desea obtener de los resultados. Además, esto ni siquiera se molesta en "emitir" el elemento cuando se alcanza la stack máxima.

La parte de reducción en realidad no hace nada más que simplemente reducir a "clave" y un solo valor. Entonces, en caso de que no se llamara a nuestro reductor, como sería cierto si solo existiera un valor para una clave, la función de finalización se encarga de asignar las claves "escondidas" a la salida final.

La efectividad de esto varía en el tamaño de la salida, y la evaluación de JavaScript ciertamente no es rápida, pero posiblemente más rápida que el procesamiento de grandes matrices en una tubería.


Vota los problemas de JIRA para tener realmente un operador de "división" o incluso un "límite" en "$ push" y "$ addToSet", que serían prácticos. Esperando personalmente que al menos se pueda realizar alguna modificación en el operador de $map para exponer el valor del "índice actual" durante el procesamiento. Eso permitiría efectivamente "rebanar" y otras operaciones.

Realmente querrías codificar esto para "generar" todas las iteraciones requeridas. Si la respuesta aquí es suficiente amor y / u otro tiempo pendiente de que tenga tuits, entonces podría agregar algún código para demostrar cómo hacerlo. Ya es una respuesta razonablemente larga.


Código para generar canalización:

 var key = "$conversation_ID"; var val = "$_id"; var maxLen = 10; var stack = []; var pipe = []; var fproj = { "$project": { "pos": { "$const": [] } } }; for ( var x = 1; x <= maxLen; x++ ) { fproj["$project"][""+x] = 1; fproj["$project"]["pos"]["$const"].push( x ); var rec = { "$cond": [ { "$eq": [ "$pos", x ] }, "$"+x ] }; if ( stack.length == 0 ) { rec["$cond"].push( false ); } else { lval = stack.pop(); rec["$cond"].push( lval ); } stack.push( rec ); if ( x == 1) { pipe.push({ "$group": { "_id": key, "1": { "$first": val }, "msgs": { "$push": val } }}); } else { pipe.push({ "$unwind": "$msgs" }); var proj = { "$project": { "msgs": 1 } }; proj["$project"]["seen"] = { "$eq": [ "$"+(x-1), "$msgs" ] }; var grp = { "$group": { "_id": "$_id", "msgs": { "$push": { "$cond": [ { "$not": "$seen" }, "$msgs", false ] } } } }; for ( n=x; n >= 1; n-- ) { if ( n != x ) proj["$project"][""+n] = 1; grp["$group"][""+n] = ( n == x ) ? { "$first": "$msgs" } : { "$first": "$"+n }; } pipe.push( proj ); pipe.push({ "$sort": { "seen": 1 } }); pipe.push(grp); } } pipe.push(fproj); pipe.push({ "$unwind": "$pos" }); pipe.push({ "$group": { "_id": "$_id", "msgs": { "$push": stack[0] } } }); pipe.push({ "$unwind": "$msgs" }); pipe.push({ "$match": { "msgs": { "$ne": false } }}); pipe.push({ "$group": { "_id": "$_id", "msgs": { "$push": "$msgs" } } }); 

Eso construye el enfoque iterativo básico hasta maxLen con los pasos de $unwind a $group . También se incluyen detalles de las proyecciones finales requeridas y la statement condicional "anidada". El último es básicamente el enfoque adoptado sobre esta pregunta:

¿La cláusula $ in de MongoDB garantiza el orden?

El operador $ slice no es un operador de agregación, por lo que no puede hacer esto (como sugerí en esta respuesta, antes de la edición):

 db.messages.aggregate([ { $group : {_id:'$conversation_ID',msgs: { $push: { msgid:'$_id' }}}}, { $project : { _id : 1, msgs : { $slice : 10 }}}]); 

La respuesta de Neil es muy detallada, pero puede usar un enfoque ligeramente diferente (si se ajusta a su caso de uso). Puede agregar sus resultados y enviarlos a una nueva colección:

 db.messages.aggregate([ { $group : {_id:'$conversation_ID',msgs: { $push: { msgid:'$_id' }}}}, { $out : "msgs_agg" } ]); 

El operador $ out escribirá los resultados de la agregación en una nueva colección. A continuación, puede usar una consulta de búsqueda regular para proyectar sus resultados con el operador $ slice:

 db.msgs_agg.find({}, { msgs : { $slice : 10 }}); 

Para este documento de prueba:

 > db.messages.find().pretty(); { "_id" : 1, "conversation_ID" : 123 } { "_id" : 2, "conversation_ID" : 123 } { "_id" : 3, "conversation_ID" : 123 } { "_id" : 4, "conversation_ID" : 123 } { "_id" : 5, "conversation_ID" : 123 } { "_id" : 7, "conversation_ID" : 1234 } { "_id" : 8, "conversation_ID" : 1234 } { "_id" : 9, "conversation_ID" : 1234 } 

El resultado será:

 > db.msgs_agg.find({}, { msgs : { $slice : 10 }}); { "_id" : 1234, "msgs" : [ { "msgid" : 7 }, { "msgid" : 8 }, { "msgid" : 9 } ] } { "_id" : 123, "msgs" : [ { "msgid" : 1 }, { "msgid" : 2 }, { "msgid" : 3 }, { "msgid" : 4 }, { "msgid" : 5 } ] } 

Editar

Supongo que esto significaría duplicar toda la colección de mensajes. ¿No es eso exagerado?

Bueno, obviamente este enfoque no se escalará con grandes colecciones. Sin embargo, dado que está considerando utilizar grandes tuberías de agregación o grandes trabajos de reducción de mapas, probablemente no use esto para solicitudes en “tiempo real”.

Hay muchos inconvenientes de este enfoque: 16 MB de límite BSON si está creando enormes documentos con agregación, desperdiciando espacio en disco / memoria con duplicación, aumento de disco I / O …

Las ventajas de este enfoque: es simple de implementar y, por lo tanto, es fácil de cambiar. Si su colección rara vez se actualiza, puede usar esta colección de “salida” como un caché. De esta forma, no tendría que realizar la operación de agregación varias veces e incluso podría soportar solicitudes de clientes “en tiempo real” en la colección “out”. Para actualizar sus datos, puede hacer agregación periódicamente (por ejemplo, en un trabajo de fondo que se ejecuta todas las noches).

Como se dijo en los comentarios, este no es un problema fácil y no hay una solución perfecta para esto (todavía!). Le mostré otro enfoque que puede usar, depende de usted comparar y decidir qué es más apropiado para su caso de uso.