diff --git a/src/main/java/com/sensorsdata/analytics/extractor/processor/SamplePreProcessor.java b/src/main/java/com/sensorsdata/analytics/extractor/processor/SamplePreProcessor.java index db17166..3bf2f52 100644 --- a/src/main/java/com/sensorsdata/analytics/extractor/processor/SamplePreProcessor.java +++ b/src/main/java/com/sensorsdata/analytics/extractor/processor/SamplePreProcessor.java @@ -26,22 +26,6 @@ public void process(List recordHandlerList) { 数据格式定义 https://www.sensorsdata.cn/manual/data_schema.html */ String originalData = recordHandler.getOriginalData(); - - // 例如传入的一条需要处理的数据是: - // - // { - // "distinct_id":"2b0a6f51a3cd6775", - // "time":1434556935000, - // "type":"track", - // "event":"ViewProduct", - // "properties":{ - // "product_name":"苹果" - // } - // } - // - // 如果是“苹果”或“梨”, 那么添加一个字段标记产品为“水果”; - // 如果是“苹果汁”或“梨汁”, 那么标记为“饮料”; - ObjectMapper objectMapper = new ObjectMapper(); JsonNode recordNode = null; try { @@ -50,23 +34,56 @@ public void process(List recordHandlerList) { /* 这里异常需要进行处理,否则在抛出异常后,会导致之后的数据失效 */ - logger.warn("Parse origin data failed. OriginalData: {}", e); + logger.warn("Parse origin data failed. OriginalData: {}", originalData, e); /* 出错后,不处理数据,将数据原封不动的传回 */ recordHandler.send(); continue; } - ObjectNode propertiesNode = (ObjectNode) recordNode.get("properties"); - if (propertiesNode != null && propertiesNode.has("product_name")) { - String productName = propertiesNode.get("product_name").asText(); - if ("苹果".equals(productName) || "梨".equals(productName)) { - propertiesNode.put("product_classify", "水果"); - // 输出日志到 /data/sa_cluster/logs/extractor 下的 extractor.log 中 - logger.info("Find a fruit: {}", productName); - } else if ("苹果汁".equals(productName) || "梨汁".equals(productName)) { - propertiesNode.put("product_classify", "饮料"); + + /** + * { + * "lib": { "$lib": "Java", "$lib_method": "code", "$lib_version": "3.1.13" }, + * "distinct_id": "108828724", + * "time": 1715847003996, + * "type": "profile_set", + * "properties": { "$is_login_id": true }, + * "recv_time": 1715847004000, + * "project_id": 1, + * "project": "default", + * "ver": 2 + * } + */ + JsonNode projectNode = recordNode.get("project"); + JsonNode typeNode = recordNode.get("type"); + + ObjectNode libNode = (ObjectNode) recordNode.get("lib"); + ObjectNode propertiesNode = (ObjectNode) recordNode.get("properties"); + if (libNode != null && propertiesNode != null && projectNode != null && typeNode != null) { + // { "$lib": "Java", "$lib_method": "code", "$lib_version": "3.1.13" }, + JsonNode libNodeInner = libNode.get("$lib"); + JsonNode libMethodNode = libNode.get("$lib_method"); + JsonNode libVersionNode = libNode.get("$lib_version"); + JsonNode isLoginIdNode = propertiesNode.get("$is_login_id"); + if (libNodeInner != null && libMethodNode != null && libVersionNode != null && isLoginIdNode != null) { + String project = projectNode.asText(); + String type = typeNode.asText(); + String lib = libNodeInner.asText(); + String libMethod = libMethodNode.asText(); + String libVersion = libVersionNode.asText(); + String isLoginId = isLoginIdNode.asText(); + if (project.equals("default") && + type.equals("profile_set") && + lib.equals("Java") && + libMethod.equals("code") && + libVersion.equals("3.1.13") && + isLoginId.equals("true") + ) { + logger.warn("filter sps invalid data. [data={}]", originalData); + continue; + } } } /* diff --git a/src/test/java/com/sensorsdata/analytics/extractor/processor/SamplePreProcessorTest.java b/src/test/java/com/sensorsdata/analytics/extractor/processor/SamplePreProcessorTest.java index 3addade..f50fe83 100644 --- a/src/test/java/com/sensorsdata/analytics/extractor/processor/SamplePreProcessorTest.java +++ b/src/test/java/com/sensorsdata/analytics/extractor/processor/SamplePreProcessorTest.java @@ -8,6 +8,7 @@ import org.junit.Test; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -16,70 +17,77 @@ * @since 2018/11/20 */ public class SamplePreProcessorTest { + private static List sendData = new ArrayList<>(); - private List getInputHandler() { - RecordHandler recordHandler = new RecordHandler() { - String record = - "{\"distinct_id\":\"2b0a6f51a3cd6775\",\"time\":1434556935000,\"type\":\"track\",\"event\":\"ViewProduct\",\"properties\":{\"product_name\":\"苹果\"}}"; - @Override public String getOriginalData() { - return record; - } + static class RecordHandlerTest implements RecordHandler{ + private String record; + public RecordHandlerTest(String record) { + this.record = record; + } - @Override public void send() { + @Override public String getOriginalData() { + return record; + } - } + @Override public void send() { + sendData.add(record); + } - @Override public void send(String data) { - record = data; - } + @Override public void send(String data) { + record = data; + send(); + } - @Override public String getNginxLogProject() { - return null; - } + @Override public String getNginxLogProject() { + return null; + } - @Override public String getNginxUserAgent() { - return null; - } + @Override public String getNginxUserAgent() { + return null; + } - @Override public String getNginxLogIp() { - return null; - } + @Override public String getNginxLogIp() { + return null; + } - @Override public long getNginxLogTime() { - return 0; - } + @Override public long getNginxLogTime() { + return 0; + } - @Override public String getNginxLogCookie() { - return null; - } + @Override public String getNginxLogCookie() { + return null; + } - @Override public String getImportToken() { - return null; - } - }; - return Collections.singletonList(recordHandler); + @Override public String getImportToken() { + return null; + } } + @Test + public void test() { + String record1 = + "{\"distinct_id\":\"2b0a6f51a3cd6775\",\"time\":1434556935000,\"type\":\"track\",\"event\":\"ViewProduct\",\"properties\":{\"product_name\":\"苹果\"}}"; + // 过滤 + String record2 = + "{\"lib\":{\"$lib\":\"Java\",\"$lib_method\":\"code\",\"$lib_version\":\"3.1.13\"},\"distinct_id\":\"108828724\",\"time\":1715847003996,\"type\":\"profile_set\",\"properties\":{\"$is_login_id\":true},\"recv_time\":1715847004000,\"project_id\":1,\"project\":\"default\",\"ver\":2}"; + String record3 = + "{\"lib\":{\"$lib\":\"Java\",\"$lib_method\":\"code\",\"$lib_version\":\"3.1.12\"},\"distinct_id\":\"108828724\",\"time\":1715847003996,\"type\":\"profile_set\",\"properties\":{\"$is_login_id\":true},\"recv_time\":1715847004000,\"project_id\":1,\"project\":\"default\",\"ver\":2}"; + String record4 = + "{\"lib\":{\"$lib\":\"Java\",\"$lib_method\":\"code\",\"$lib_version\":\"3.1.12\"},\"distinct_id\":\"108828724\",\"time\":1715847003996,\"type\":\"profile_set\",\"properties\":{},\"recv_time\":1715847004000,\"project_id\":1,\"project\":\"default\",\"ver\":2}"; + String record5 = + "{\"lib\":{\"$lib\":\"Java\",\"$lib_method\":\"code\",\"$lib_version\":\"3.1.12\"},\"distinct_id\":\"108828724\",\"time\":1715847003996,\"type\":\"profile_set\",\"recv_time\":1715847004000,\"project_id\":1,\"project\":\"default\",\"ver\":2}"; - @Test public void testForFirstProcessor() throws IOException { - List recordHandler = getInputHandler(); - SamplePreProcessor samplePreProcessor = new SamplePreProcessor(); - samplePreProcessor.process(recordHandler); - String result = recordHandler.get(0).getOriginalData(); - ObjectMapper objectMapper = new ObjectMapper(); - JsonNode jsonNode = objectMapper.readTree(result); - Assert.assertEquals("水果", jsonNode.get("properties").get("product_classify").asText()); - } - @Test public void testForSecondProcessor() throws Exception { - List recordHandler = getInputHandler(); SamplePreProcessor samplePreProcessor = new SamplePreProcessor(); - SamplePreProcessor2 samplePreprocessor2 = new SamplePreProcessor2(); - samplePreProcessor.process(recordHandler); - samplePreprocessor2.process(recordHandler); - String result = recordHandler.get(0).getOriginalData(); - ObjectMapper objectMapper = new ObjectMapper(); - JsonNode jsonNode = objectMapper.readTree(result); - Assert.assertEquals("吃的", jsonNode.get("properties").get("product_type").asText()); + List recordHandlerList = new ArrayList<>(); + recordHandlerList.add(new RecordHandlerTest(record1)); + recordHandlerList.add(new RecordHandlerTest(record2)); + recordHandlerList.add(new RecordHandlerTest(record3)); + recordHandlerList.add(new RecordHandlerTest(record4)); + recordHandlerList.add(new RecordHandlerTest(record5)); + + samplePreProcessor.process(recordHandlerList); + System.out.println(sendData); + assert sendData.size() == 4; } }