Migrate Java Code from Spring Data MongoDB to MongoDB Reactive Streams
@GetMapping("/mostClickedElements") public Object mostClickedElements( @RequestParam(name = "projectId", required = false) String projectId, @RequestParam(name = "submoduleId", required = false) Integer submoduleId, @RequestParam(value = "pageNum", defaultValue = "1") long pageNum, @RequestParam(value = "pageSize", defaultValue = "10") long pageSize ) { return Optional.ofNullable(Flux.from(dbClient.getCollection(kapokMongoProvider.getMongoBackendCollectionName(projectId)).aggregate(Arrays.asList( KapokLogPresenterApiController.optionalMatchSubmoduleId(submoduleId),
Aggregates.match(
Filters.and(
Filters.eq("category", "ui.click"),
Filters.exists("timestamp"),
Filters.gt("timestamp", (double)(OffsetDateTime.now().minusDays(1).toInstant().toEpochMilli()) / 1000)
)
),
Aggregates.lookup(kapokMongoProvider.getMongoTransactionsCollectionName(projectId), "event_id", "event_id", "transactionObject"),
Aggregates.lookup(kapokMongoProvider.getMongoEventsCollectionName(projectId), "event_id", "event_id", "eventObject"),
Aggregates.project(
Projections.fields(
Projections.include("timestamp", "message"),
Projections.computed("transactionObject", Document.parse("{$arrayElemAt: ['$transactionObject', 0]}")),
Projections.computed("eventObject", Document.parse("{$arrayElemAt: ['$eventObject', 0]}"))
)
),
Aggregates.group(
new Document("timestamp", "$timestamp").append("message", "$message"),
Accumulators.first("txUserId", "$transactionObject.user.id"),
Accumulators.first("evUserId", "$eventObject.user.id")
),
Aggregates.project(
Projections.fields(
Projections.excludeId(),
Projections.computed("timestamp", "$_id.timestamp"),
Projections.computed("elemSelector", "$_id.message"),
Projections.computed("userId", Document.parse("{ $ifNull: ['$txUserId', '\$evUserId'] }"))
)
),
Aggregates.match(Filters.exists("userId")),
Aggregates.group(
new Document("elemSelector", "$elemSelector").append("userId", "$userId"),
Accumulators.sum("opNum", 1)
),
Aggregates.project(
Projections.fields(
Projections.excludeId(),
Projections.computed("opNum", "$opNum"),
Projections.computed("elemSelector", "$_id.elemSelector"),
Projections.computed("userId", "$_id.userId")
)
),
Aggregates.group(
"$elemSelector",
Accumulators.sum("totalOpNum", "$opNum"),
Accumulators.sum("totalUserNum", 1)
),
Aggregates.project(
Projections.fields(
Projections.excludeId(),
Projections.computed("totalOpNum", "$totalOpNum"),
Projections.computed("totalUserNum", "$totalUserNum"),
Projections.computed("elemSelector", "$_id")
)
),
Aggregates.sort(Sorts.descending("totalOpNum")),
Aggregates.facet(
Aggregates.skip((pageNum - 1) * pageSize),
Aggregates.limit(pageSize)
).as("list"),
Aggregates.count().as("count")
)).allowDiskUse(true).first()).blockLast())
.orElse(new Document(){{ put("list", Collections.emptyList()); put("count", Collections.emptyList()); }});
}
原文地址: https://www.cveoy.top/t/topic/mKfm 著作权归作者所有。请勿转载和采集!