java分组同时执行程序_java——如何用我的模型在Flink内部进行分组

java分组同时执行程序_java——如何用我的模型在Flink内部进行分组

package org.myorg.quickstart;

import org.apache.flink.api.common.functions.CrossFunction;

import org.apache.flink.api.common.functions.FlatMapFunction;

import org.apache.flink.api.common.functions.GroupReduceFunction;

import org.apache.flink.api.java.DataSet;

import org.apache.flink.api.java.ExecutionEnvironment;

import org.apache.flink.api.java.functions.KeySelector;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.api.java.tuple.Tuple3;

import org.apache.flink.util.Collector;

import java.io.Serializable;

import java.util.ArrayList;

public class UserRecommendation {

public static void main(String[] args) throws Exception {

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// le o arquivo cm o dataset

DataSet<String> text = env.readTextFile("/Users/paulo/Downloads/dataset.csv");

// cria tuple com: customer | item | count

DataSet<Tuple3<Long, Long, Integer>> csv = text.flatMap(new LineFieldSplitter()).groupBy(0, 1).reduceGroup(new GroupReduceFunction<Tuple2<Long, Long>, Tuple3<Long, Long, Integer>>() {

@Override

public void reduce(Iterable<Tuple2<Long, Long>> iterable, Collector<Tuple3<Long, Long, Integer>> collector) throws Exception {

Long customerId = 0L;

Long itemId = 0L;

Integer count = 0;

for (Tuple2<Long, Long> item : iterable) {

customerId = item.f0;

itemId = item.f1;

count = count + 1;

}

collector.collect(new Tuple3<>(customerId, itemId, count));

}

});

// agrupa os items do customer dentro do customer

final DataSet<CustomerItems> customerItems = csv.groupBy(0).reduceGroup(new GroupReduceFunction<Tuple3<Long, Long, Integer>, CustomerItems>() {

@Override

public void reduce(Iterable<Tuple3<Long, Long, Integer>> iterable, Collector<CustomerItems> collector) throws Exception {

ArrayList<Long> newItems = new ArrayList<>();

Long customerId = 0L;

for (Tuple3<Long, Long, Integer> item : iterable) {

customerId = item.f0;

newItems.add(item.f1);

}

collector.collect(new CustomerItems(customerId, newItems));

}

});

// obtém todos os itens do customer que pertence a um usuário parecido

DataSet<CustomerItems> ci = customerItems.cross(customerItems).with(new CrossFunction<CustomerItems, CustomerItems, CustomerItems>() {

@Override

public CustomerItems cross(CustomerItems customerItems, CustomerItems customerItems2) throws Exception {

if (!customerItems.customerId.equals(customerItems2.customerId)) {

boolean has = false;

for (Long item : customerItems2.items) {

if (customerItems.items.contains(item)) {

has = true;

break;

}

}

if (has) {

for (Long item : customerItems2.items) {

if (!customerItems.items.contains(item)) {

customerItems.ritems.add(item);

}

}

}

}

return customerItems;

}

}).groupBy(new KeySelector<CustomerItems, Long>() {

@Override

public Long getKey(CustomerItems customerItems) throws Exception {

return customerItems.customerId;

}

}).reduceGroup(new GroupReduceFunction<CustomerItems, CustomerItems>() {

@Override

public void reduce(Iterable<CustomerItems> iterable, Collector<CustomerItems> collector) throws Exception {

CustomerItems c = new CustomerItems();

for (CustomerItems current : iterable) {

c.customerId = current.customerId;

for (Long item : current.ritems) {

if (!c.ritems.contains(item)) {

c.ritems.add(item);

}

}

}

collector.collect(c);

}

});

ci.first(100).print();

System.out.println(ci.count());

}

public static class CustomerItems implements Serializable {

public Long customerId;

public ArrayList<Long> items = new ArrayList<>();

public ArrayList<Long> ritems = new ArrayList<>();

public CustomerItems() {

}

public CustomerItems(Long customerId, ArrayList<Long> items) {

this.customerId = customerId;

this.items = items;

}

@Override

public String toString() {

StringBuilder itemsData = new StringBuilder();

if (items != null) {

for (Long item : items) {

if (itemsData.length() == 0) {

itemsData.append(item);

} else {

itemsData.append(", ").append(item);

}

}

}

StringBuilder ritemsData = new StringBuilder();

if (ritems != null) {

for (Long item : ritems) {

if (ritemsData.length() == 0) {

ritemsData.append(item);

} else {

ritemsData.append(", ").append(item);

}

}

}

return String.format("[ID: %d, Items: %s, RItems: %s]", customerId, itemsData, ritemsData);

}

}

public static final class LineFieldSplitter implements FlatMapFunction<String, Tuple2<Long, Long>> {

@Override

public void flatMap(String value, Collector<Tuple2<Long, Long>> out) {

// normalize and split the line

String[] tokens = value.split(" ");

if (tokens.length > 1) {

out.collect(new Tuple2<>(Long.valueOf(tokens[0]), Long.valueOf(tokens[1])));

}

}

}

}

Read more

前端防范 XSS(跨站脚本攻击)

目录 一、防范措施 1.layui util  核心转义的特殊字符 示例 2.js-xss.js库 安装 1. Node.js 环境(npm/yarn) 2. 浏览器环境 核心 API 基础使用 1. 基础过滤(默认规则) 2. 自定义过滤规则 (1)允许特定标签 (2)允许特定属性 (3)自定义标签处理 (4)自定义属性处理 (5)转义特定字符 常见场景示例 1. 过滤用户输入的评论内容 2. 允许特定富文本标签(如富文本编辑器内容) 注意事项 更多配置 XSS(跨站脚本攻击)是一种常见的网络攻击手段,它允许攻击者将恶意脚本注入到其他用户的浏览器中。

详细教程:如何从前端查看调用接口、传参及返回结果(附带图片案例)

详细教程:如何从前端查看调用接口、传参及返回结果(附带图片案例)

目录 1. 打开浏览器开发者工具 2. 使用 Network 面板 3. 查看具体的API请求 a. Headers b. Payload c. Response d. Preview e. Timing 4. 实际操作步骤 5. 常见问题及解决方法 a. 无法看到API请求 b. 请求失败 c. 跨域问题(CORS) 作为一名后端工程师,理解前端如何调用接口、传递参数以及接收返回值是非常重要的。下面将详细介绍如何通过浏览器开发者工具(F12)查看和分析这些信息,并附带图片案例帮助你更好地理解。 1. 打开浏览器开发者工具 按下 F12 或右键点击页面选择“检查”可以打开浏览器的开发者工具。常用的浏览器如Chrome、Firefox等都内置了开发者工具。下面是我选择我的一篇文章,打开开发者工具进行演示。 2. 使用

Cursor+Codex隐藏技巧:用截图秒修前端Bug的保姆级教程(React/Chakra UI案例)

Cursor+Codex隐藏技巧:用截图秒修前端Bug的保姆级教程(React/Chakra UI案例) 前端开发中最令人头疼的莫过于那些难以定位的UI问题——元素错位、样式冲突、响应式失效...传统调试方式往往需要反复修改代码、刷新页面、检查元素。现在,通过Cursor编辑器集成的Codex功能,你可以直接用截图交互快速定位和修复这些问题。本文将带你从零开始,掌握这套革命性的调试工作流。 1. 环境准备与基础配置 在开始之前,确保你已经具备以下环境: * Cursor编辑器最新版(v2.5+) * Node.js 18.x及以上版本 * React 18项目(本文以Chakra UI 2.x为例) 首先在Cursor中安装Codex插件: 1. 点击左侧扩展图标 2. 搜索"Codex"并安装 3. 登录你的OpenAI账户(需要ChatGPT Plus订阅) 关键配置项: // 在项目根目录创建.