Skip to content
Open

add #13

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,6 @@ public void process(List<RecordHandler> recordHandlerList) {
数据格式定义 https://www.sensorsdata.cn/manual/data_schema.html
*/
String originalData = recordHandler.getOriginalData();

// 例如传入的一条需要处理的数据是:
//
// {
// "distinct_id":"2b0a6f51a3cd6775",
// "time":1434556935000,
// "type":"track",
// "event":"ViewProduct",
// "properties":{
// "product_name":"苹果"
// }
// }
//
// 如果是“苹果”或“梨”, 那么添加一个字段标记产品为“水果”;
// 如果是“苹果汁”或“梨汁”, 那么标记为“饮料”;

ObjectMapper objectMapper = new ObjectMapper();
JsonNode recordNode = null;
try {
Expand All @@ -50,23 +34,56 @@ public void process(List<RecordHandler> recordHandlerList) {
/*
这里异常需要进行处理,否则在抛出异常后,会导致之后的数据失效
*/
logger.warn("Parse origin data failed. OriginalData: {}", e);
logger.warn("Parse origin data failed. OriginalData: {}", originalData, e);
/*
出错后,不处理数据,将数据原封不动的传回
*/
recordHandler.send();
continue;
}
ObjectNode propertiesNode = (ObjectNode) recordNode.get("properties");

if (propertiesNode != null && propertiesNode.has("product_name")) {
String productName = propertiesNode.get("product_name").asText();
if ("苹果".equals(productName) || "梨".equals(productName)) {
propertiesNode.put("product_classify", "水果");
// 输出日志到 /data/sa_cluster/logs/extractor 下的 extractor.log 中
logger.info("Find a fruit: {}", productName);
} else if ("苹果汁".equals(productName) || "梨汁".equals(productName)) {
propertiesNode.put("product_classify", "饮料");

/**
* {
* "lib": { "$lib": "Java", "$lib_method": "code", "$lib_version": "3.1.13" },
* "distinct_id": "108828724",
* "time": 1715847003996,
* "type": "profile_set",
* "properties": { "$is_login_id": true },
* "recv_time": 1715847004000,
* "project_id": 1,
* "project": "default",
* "ver": 2
* }
*/
JsonNode projectNode = recordNode.get("project");
JsonNode typeNode = recordNode.get("type");

ObjectNode libNode = (ObjectNode) recordNode.get("lib");
ObjectNode propertiesNode = (ObjectNode) recordNode.get("properties");
if (libNode != null && propertiesNode != null && projectNode != null && typeNode != null) {
// { "$lib": "Java", "$lib_method": "code", "$lib_version": "3.1.13" },
JsonNode libNodeInner = libNode.get("$lib");
JsonNode libMethodNode = libNode.get("$lib_method");
JsonNode libVersionNode = libNode.get("$lib_version");
JsonNode isLoginIdNode = propertiesNode.get("$is_login_id");
if (libNodeInner != null && libMethodNode != null && libVersionNode != null && isLoginIdNode != null) {
String project = projectNode.asText();
String type = typeNode.asText();
String lib = libNodeInner.asText();
String libMethod = libMethodNode.asText();
String libVersion = libVersionNode.asText();
String isLoginId = isLoginIdNode.asText();
if (project.equals("default") &&
type.equals("profile_set") &&
lib.equals("Java") &&
libMethod.equals("code") &&
libVersion.equals("3.1.13") &&
isLoginId.equals("true")
) {
logger.warn("filter sps invalid data. [data={}]", originalData);
continue;
}
}
}
/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.junit.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

Expand All @@ -16,70 +17,77 @@
* @since 2018/11/20
*/
public class SamplePreProcessorTest {
private static List<String> sendData = new ArrayList<>();

private List<RecordHandler> getInputHandler() {
RecordHandler recordHandler = new RecordHandler() {
String record =
"{\"distinct_id\":\"2b0a6f51a3cd6775\",\"time\":1434556935000,\"type\":\"track\",\"event\":\"ViewProduct\",\"properties\":{\"product_name\":\"苹果\"}}";

@Override public String getOriginalData() {
return record;
}
static class RecordHandlerTest implements RecordHandler{
private String record;
public RecordHandlerTest(String record) {
this.record = record;
}

@Override public void send() {
@Override public String getOriginalData() {
return record;
}

}
@Override public void send() {
sendData.add(record);
}

@Override public void send(String data) {
record = data;
}
@Override public void send(String data) {
record = data;
send();
}

@Override public String getNginxLogProject() {
return null;
}
@Override public String getNginxLogProject() {
return null;
}

@Override public String getNginxUserAgent() {
return null;
}
@Override public String getNginxUserAgent() {
return null;
}

@Override public String getNginxLogIp() {
return null;
}
@Override public String getNginxLogIp() {
return null;
}

@Override public long getNginxLogTime() {
return 0;
}
@Override public long getNginxLogTime() {
return 0;
}

@Override public String getNginxLogCookie() {
return null;
}
@Override public String getNginxLogCookie() {
return null;
}

@Override public String getImportToken() {
return null;
}
};
return Collections.singletonList(recordHandler);
@Override public String getImportToken() {
return null;
}
}
@Test
public void test() {
String record1 =
"{\"distinct_id\":\"2b0a6f51a3cd6775\",\"time\":1434556935000,\"type\":\"track\",\"event\":\"ViewProduct\",\"properties\":{\"product_name\":\"苹果\"}}";
// 过滤
String record2 =
"{\"lib\":{\"$lib\":\"Java\",\"$lib_method\":\"code\",\"$lib_version\":\"3.1.13\"},\"distinct_id\":\"108828724\",\"time\":1715847003996,\"type\":\"profile_set\",\"properties\":{\"$is_login_id\":true},\"recv_time\":1715847004000,\"project_id\":1,\"project\":\"default\",\"ver\":2}";
String record3 =
"{\"lib\":{\"$lib\":\"Java\",\"$lib_method\":\"code\",\"$lib_version\":\"3.1.12\"},\"distinct_id\":\"108828724\",\"time\":1715847003996,\"type\":\"profile_set\",\"properties\":{\"$is_login_id\":true},\"recv_time\":1715847004000,\"project_id\":1,\"project\":\"default\",\"ver\":2}";
String record4 =
"{\"lib\":{\"$lib\":\"Java\",\"$lib_method\":\"code\",\"$lib_version\":\"3.1.12\"},\"distinct_id\":\"108828724\",\"time\":1715847003996,\"type\":\"profile_set\",\"properties\":{},\"recv_time\":1715847004000,\"project_id\":1,\"project\":\"default\",\"ver\":2}";
String record5 =
"{\"lib\":{\"$lib\":\"Java\",\"$lib_method\":\"code\",\"$lib_version\":\"3.1.12\"},\"distinct_id\":\"108828724\",\"time\":1715847003996,\"type\":\"profile_set\",\"recv_time\":1715847004000,\"project_id\":1,\"project\":\"default\",\"ver\":2}";

@Test public void testForFirstProcessor() throws IOException {
List<RecordHandler> recordHandler = getInputHandler();
SamplePreProcessor samplePreProcessor = new SamplePreProcessor();
samplePreProcessor.process(recordHandler);
String result = recordHandler.get(0).getOriginalData();
ObjectMapper objectMapper = new ObjectMapper();
JsonNode jsonNode = objectMapper.readTree(result);
Assert.assertEquals("水果", jsonNode.get("properties").get("product_classify").asText());
}

@Test public void testForSecondProcessor() throws Exception {
List<RecordHandler> recordHandler = getInputHandler();
SamplePreProcessor samplePreProcessor = new SamplePreProcessor();
SamplePreProcessor2 samplePreprocessor2 = new SamplePreProcessor2();
samplePreProcessor.process(recordHandler);
samplePreprocessor2.process(recordHandler);
String result = recordHandler.get(0).getOriginalData();
ObjectMapper objectMapper = new ObjectMapper();
JsonNode jsonNode = objectMapper.readTree(result);
Assert.assertEquals("吃的", jsonNode.get("properties").get("product_type").asText());
List<RecordHandler> recordHandlerList = new ArrayList<>();
recordHandlerList.add(new RecordHandlerTest(record1));
recordHandlerList.add(new RecordHandlerTest(record2));
recordHandlerList.add(new RecordHandlerTest(record3));
recordHandlerList.add(new RecordHandlerTest(record4));
recordHandlerList.add(new RecordHandlerTest(record5));

samplePreProcessor.process(recordHandlerList);
System.out.println(sendData);
assert sendData.size() == 4;
}
}