项目一:ELK用于日志分析
需求:集中收集分布式服务的日志
1逻辑模块程序随时输出日志
package com.itheima.es; import org.junit.Test; import org.junit.runner.RunWith; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import java.util.Random; /** * creste by itheima.itcast */ @SpringBootTest @RunWith(SpringRunner.class) public class TestLog { private static final Logger LOGGER= LoggerFactory.getLogger(TestLog.class); @Test public void testLog(){ Random random =new Random(); while (true){ int userid=random.nextInt(10); LOGGER.info("userId:{},send:{}",userid,"hello world.I am "+userid); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } } }
日志配置文件
<?xml version="1.0" encoding="UTF-8"?> <configuration> <!--定义日志文件的存储地址,使用绝对路径--> <property name="LOG_HOME" value="d:/logs"/> <!-- Console 输出设置 --> <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> <encoder> <!--格式化输出:%d表示日期,%thread表示线程名,%-5level:级别从左显示5个字符宽度%msg:日志消息,%n是换行符--> <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern> <charset>utf8</charset> </encoder> </appender> <!-- 按照每天生成日志文件 --> <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender"> <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> <!--日志文件输出的文件名--> <fileNamePattern>${LOG_HOME}/log-%d{yyyy-MM-dd}.log</fileNamePattern> </rollingPolicy> <encoder> <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern> </encoder> </appender> <!-- 异步输出 --> <appender name="ASYNC" class="ch.qos.logback.classic.AsyncAppender"> <!-- 不丢失日志.默认的,如果队列的80%已满,则会丢弃TRACT、DEBUG、INFO级别的日志 --> <discardingThreshold>0</discardingThreshold> <!-- 更改默认的队列的深度,该值会影响性能.默认值为256 --> <queueSize>512</queueSize> <!-- 添加附加的appender,最多只能添加一个 --> <appender-ref ref="FILE"/> </appender> <logger name="org.apache.ibatis.cache.decorators.LoggingCache" level="DEBUG" additivity="false"> <appender-ref ref="CONSOLE"/> </logger> <logger name="org.springframework.boot" level="DEBUG"/> <root level="info"> <!--<appender-ref ref="ASYNC"/>--> <appender-ref ref="FILE"/> <appender-ref ref="CONSOLE"/> </root> </configuration>
2logstash收集日志到es
input { file { path => ["D:/logs/log-*.log"] start_position => "beginning" } } filter { grok { match => { "message" => "%{DATA:datetime}\ \[%{DATA:thread}\]\ %{DATA:level}\ \ %{DATA:class} - %{GREEDYDATA:logger}" } remove_field => [ "message" ] } date { match => ["datetime", "yyyy-MM-dd HH:mm:ss.SSS"] } if "_grokparsefailure" in [tags] { drop { } } } output { elasticsearch { hosts => ["127.0.0.1:9200"] index => "logger-%{+YYYY.MM.dd}" } }
grok 内置类型
USERNAME [a-zA-Z0-9._-]+ USER %{USERNAME} INT (?:[+-]?(?:[0-9]+)) BASE10NUM (?<![0-9.+-])(?>[+-]?(?:(?:[0-9]+(?:\.[0-9]+)?)|(?:\.[0-9]+))) NUMBER (?:%{BASE10NUM}) BASE16NUM (?<![0-9A-Fa-f])(?:[+-]?(?:0x)?(?:[0-9A-Fa-f]+)) BASE16FLOAT \b(?<![0-9A-Fa-f.])(?:[+-]?(?:0x)?(?:(?:[0-9A-Fa-f]+(?:\.[0-9A-Fa-f]*)?)|(?:\.[0-9A-Fa-f]+)))\b POSINT \b(?:[1-9][0-9]*)\b NONNEGINT \b(?:[0-9]+)\b WORD \b\w+\b NOTSPACE \S+ SPACE \s* DATA .*? GREEDYDATA .* QUOTEDSTRING (?>(?<!\\)(?>"(?>\\.|[^\\"]+)+"|""|(?>'(?>\\.|[^\\']+)+')|''|(?>`(?>\\.|[^\\`]+)+`)|``)) UUID [A-Fa-f0-9]{8}-(?:[A-Fa-f0-9]{4}-){3}[A-Fa-f0-9]{12} ## Networking MAC (?:%{CISCOMAC}|%{WINDOWSMAC}|%{COMMONMAC}) CISCOMAC (?:(?:[A-Fa-f0-9]{4}\.){2}[A-Fa-f0-9]{4}) WINDOWSMAC (?:(?:[A-Fa-f0-9]{2}-){5}[A-Fa-f0-9]{2}) COMMONMAC (?:(?:[A-Fa-f0-9]{2}:){5}[A-Fa-f0-9]{2}) IPV6 ((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:)))(%.+)? IPV4 (?<![0-9])(?:(?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2}))(?![0-9]) IP (?:%{IPV6}|%{IPV4}) HOSTNAME \b(?:[0-9A-Za-z][0-9A-Za-z-]{0,62})(?:\.(?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*(\.?|\b) HOST %{HOSTNAME} IPORHOST (?:%{HOSTNAME}|%{IP}) HOSTPORT %{IPORHOST}:%{POSINT} ## paths PATH (?:%{UNIXPATH}|%{WINPATH}) UNIXPATH (?>/(?>[\w_%!$@:.,-]+|\\.)*)+ TTY (?:/dev/(pts|tty([pq])?)(\w+)?/?(?:[0-9]+)) WINPATH (?>[A-Za-z]+:|\\)(?:\\[^\\?*]*)+ URIPROTO [A-Za-z]+(\+[A-Za-z+]+)? URIHOST %{IPORHOST}(?::%{POSINT:port})? ## uripath comes loosely from RFC1738, but mostly from what Firefox ## doesn't turn into %XX URIPATH (?:/[A-Za-z0-9$.+!*'(){},~:;=@#%_\-]*)+ #URIPARAM \?(?:[A-Za-z0-9]+(?:=(?:[^&]*))?(?:&(?:[A-Za-z0-9]+(?:=(?:[^&]*))?)?)*)? URIPARAM \?[A-Za-z0-9$.+!*'|(){},~@#%&/=:;_?\-\[\]]* URIPATHPARAM %{URIPATH}(?:%{URIPARAM})? URI %{URIPROTO}://(?:%{USER}(?::[^@]*)?@)?(?:%{URIHOST})?(?:%{URIPATHPARAM})? ## Months: January, Feb, 3, 03, 12, December MONTH \b(?:Jan(?:uary)?|Feb(?:ruary)?|Mar(?:ch)?|Apr(?:il)?|May|Jun(?:e)?|Jul(?:y)?|Aug(?:ust)?|Sep(?:tember)?|Oct(?:ober)?|Nov(?:ember)?|Dec(?:ember)?)\b MONTHNUM (?:0?[1-9]|1[0-2]) MONTHNUM2 (?:0[1-9]|1[0-2]) MONTHDAY (?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9]) ## Days: Monday, Tue, Thu, etc... DAY (?:Mon(?:day)?|Tue(?:sday)?|Wed(?:nesday)?|Thu(?:rsday)?|Fri(?:day)?|Sat(?:urday)?|Sun(?:day)?) ## Years? YEAR (?>\d\d){1,2} HOUR (?:2[0123]|[01]?[0-9]) MINUTE (?:[0-5][0-9]) ## '60' is a leap second in most time standards and thus is valid. SECOND (?:(?:[0-5]?[0-9]|60)(?:[:.,][0-9]+)?) TIME (?!<[0-9])%{HOUR}:%{MINUTE}(?::%{SECOND})(?![0-9]) ## datestamp is YYYY/MM/DD-HH:MM:SS.UUUU (or something like it) DATE_US %{MONTHNUM}[/-]%{MONTHDAY}[/-]%{YEAR} DATE_EU %{MONTHDAY}[./-]%{MONTHNUM}[./-]%{YEAR} ISO8601_TIMEZONE (?:Z|[+-]%{HOUR}(?::?%{MINUTE})) ISO8601_SECOND (?:%{SECOND}|60) TIMESTAMP_ISO8601 %{YEAR}-%{MONTHNUM}-%{MONTHDAY}[T ]%{HOUR}:?%{MINUTE}(?::?%{SECOND})?%{ISO8601_TIMEZONE}? DATE %{DATE_US}|%{DATE_EU} DATESTAMP %{DATE}[- ]%{TIME} TZ (?:[PMCE][SD]T|UTC) DATESTAMP_RFC822 %{DAY} %{MONTH} %{MONTHDAY} %{YEAR} %{TIME} %{TZ} DATESTAMP_RFC2822 %{DAY}, %{MONTHDAY} %{MONTH} %{YEAR} %{TIME} %{ISO8601_TIMEZONE} DATESTAMP_OTHER %{DAY} %{MONTH} %{MONTHDAY} %{TIME} %{TZ} %{YEAR} DATESTAMP_EVENTLOG %{YEAR}%{MONTHNUM2}%{MONTHDAY}%{HOUR}%{MINUTE}%{SECOND} ## Syslog Dates: Month Day HH:MM:SS SYSLOGTIMESTAMP %{MONTH} +%{MONTHDAY} %{TIME} PROG (?:[\w._/%-]+) SYSLOGPROG %{PROG:program}(?:\[%{POSINT:pid}\])? SYSLOGHOST %{IPORHOST} SYSLOGFACILITY <%{NONNEGINT:facility}.%{NONNEGINT:priority}> HTTPDATE %{MONTHDAY}/%{MONTH}/%{YEAR}:%{TIME} %{INT} ## Shortcuts QS %{QUOTEDSTRING} ## Log formats SYSLOGBASE %{SYSLOGTIMESTAMP:timestamp} (?:%{SYSLOGFACILITY} )?%{SYSLOGHOST:logsource} %{SYSLOGPROG}: COMMONAPACHELOG %{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})" %{NUMBER:response} (?:%{NUMBER:bytes}|-) COMBINEDAPACHELOG %{COMMONAPACHELOG} %{QS:referrer} %{QS:agent} ## Log Levels LOGLEVEL ([Aa]lert|ALERT|[Tt]race|TRACE|[Dd]ebug|DEBUG|[Nn]otice|NOTICE|[Ii]nfo|INFO|[Ww]arn?(?:ing)?|WARN?(?:ING)?|[Ee]rr?(?:or)?|ERR?(?:OR)?|[Cc]rit?(?:ical)?|CRIT?(?:ICAL)?|[Ff]atal|FATAL|[Ss]evere|SEVERE|EMERG(?:ENCY)?|[Ee]merg(?:ency)?)
写logstash配置文件。
启动 logstash:logstash.bat -f …/config/test6.conf
3kibana展现数据
项目二:学成在线站内搜索模块
mysql导入course_pub表
创建索引xc_course
创建映射
PUT /xc_course { "settings": { "number_of_shards": 1, "number_of_replicas": 0 }, "mappings": { "properties": { "description" : { "analyzer" : "ik_max_word", "search_analyzer": "ik_smart", "type" : "text" }, "grade" : { "type" : "keyword" }, "id" : { "type" : "keyword" }, "mt" : { "type" : "keyword" }, "name" : { "analyzer" : "ik_max_word", "search_analyzer": "ik_smart", "type" : "text" }, "users" : { "index" : false, "type" : "text" }, "charge" : { "type" : "keyword" }, "valid" : { "type" : "keyword" }, "pic" : { "index" : false, "type" : "keyword" }, "qq" : { "index" : false, "type" : "keyword" }, "price" : { "type" : "float" }, "price_old" : { "type" : "float" }, "st" : { "type" : "keyword" }, "status" : { "type" : "keyword" }, "studymodel" : { "type" : "keyword" }, "teachmode" : { "type" : "keyword" }, "teachplan" : { "analyzer" : "ik_max_word", "search_analyzer": "ik_smart", "type" : "text" }, "expires" : { "type" : "date", "format": "yyyy-MM-dd HH:mm:ss" }, "pub_time" : { "type" : "date", "format": "yyyy-MM-dd HH:mm:ss" }, "start_time" : { "type" : "date", "format": "yyyy-MM-dd HH:mm:ss" }, "end_time" : { "type" : "date", "format": "yyyy-MM-dd HH:mm:ss" } } } }
logstash创建模板文件
Logstash的工作是从MySQL中读取数据,向ES中创建索引,这里需要提前创建mapping的模板文件以便logstash使用。
在logstach的config目录创建xc_course_template.json,内容如下:
{ "mappings" : { "doc" : { "properties" : { "charge" : { "type" : "keyword" }, "description" : { "analyzer" : "ik_max_word", "search_analyzer" : "ik_smart", "type" : "text" }, "end_time" : { "format" : "yyyy-MM-dd HH:mm:ss", "type" : "date" }, "expires" : { "format" : "yyyy-MM-dd HH:mm:ss", "type" : "date" }, "grade" : { "type" : "keyword" }, "id" : { "type" : "keyword" }, "mt" : { "type" : "keyword" }, "name" : { "analyzer" : "ik_max_word", "search_analyzer" : "ik_smart", "type" : "text" }, "pic" : { "index" : false, "type" : "keyword" }, "price" : { "type" : "float" }, "price_old" : { "type" : "float" }, "pub_time" : { "format" : "yyyy-MM-dd HH:mm:ss", "type" : "date" }, "qq" : { "index" : false, "type" : "keyword" }, "st" : { "type" : "keyword" }, "start_time" : { "format" : "yyyy-MM-dd HH:mm:ss", "type" : "date" }, "status" : { "type" : "keyword" }, "studymodel" : { "type" : "keyword" }, "teachmode" : { "type" : "keyword" }, "teachplan" : { "analyzer" : "ik_max_word", "search_analyzer" : "ik_smart", "type" : "text" }, "users" : { "index" : false, "type" : "text" }, "valid" : { "type" : "keyword" } } } }, "template" : "xc_course" }
logstash配置mysql.conf
1、ES采用UTC时区问题
ES采用UTC 时区,比北京时间早8小时,所以ES读取数据时让最后更新时间加8小时
where timestamp > date_add(:sql_last_value,INTERVAL 8 HOUR)
2、logstash每个执行完成会在/config/logstash_metadata记录执行时间下次以此时间为基准进行增量同步数据到索引库。
启动
.\logstash.bat -f ..\config\mysql.conf
后端代码
Controller
@RestController @RequestMapping("/search/course") public class EsCourseController { @Autowired EsCourseService esCourseService; @GetMapping(value="/list/{page}/{size}") public QueryResponseResult<CoursePub> list(@PathVariable("page") int page, @PathVariable("size") int size, CourseSearchParam courseSearchParam) { return esCourseService.list(page,size,courseSearchParam); } }
Service
@Service public class EsCourseService { @Value("${heima.course.source_field}") private String source_field; @Autowired RestHighLevelClient restHighLevelClient; //课程搜索 public QueryResponseResult<CoursePub> list(int page, int size, CourseSearchParam courseSearchParam) { if (courseSearchParam == null) { courseSearchParam = new CourseSearchParam(); } //1创建搜索请求对象 SearchRequest searchRequest = new SearchRequest("xc_course"); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); //过虑源字段 String[] source_field_array = source_field.split(","); searchSourceBuilder.fetchSource(source_field_array, new String[]{}); //创建布尔查询对象 BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); //搜索条件 //根据关键字搜索 if (StringUtils.isNotEmpty(courseSearchParam.getKeyword())) { MultiMatchQueryBuilder multiMatchQueryBuilder = QueryBuilders.multiMatchQuery(courseSearchParam.getKeyword(), "name", "description", "teachplan") .minimumShouldMatch("70%") .field("name", 10); boolQueryBuilder.must(multiMatchQueryBuilder); } if (StringUtils.isNotEmpty(courseSearchParam.getMt())) { //根据一级分类 boolQueryBuilder.filter(QueryBuilders.termQuery("mt", courseSearchParam.getMt())); } if (StringUtils.isNotEmpty(courseSearchParam.getSt())) { //根据二级分类 boolQueryBuilder.filter(QueryBuilders.termQuery("st", courseSearchParam.getSt())); } if (StringUtils.isNotEmpty(courseSearchParam.getGrade())) { //根据难度等级 boolQueryBuilder.filter(QueryBuilders.termQuery("grade", courseSearchParam.getGrade())); } //设置boolQueryBuilder到searchSourceBuilder searchSourceBuilder.query(boolQueryBuilder); //设置分页参数 if (page <= 0) { page = 1; } if (size <= 0) { size = 12; } //起始记录下标 int from = (page - 1) * size; searchSourceBuilder.from(from); searchSourceBuilder.size(size); //设置高亮 HighlightBuilder highlightBuilder = new HighlightBuilder(); highlightBuilder.preTags("<font class='eslight'>"); highlightBuilder.postTags("</font>"); //设置高亮字段 // <font class='eslight'>node</font>学习 highlightBuilder.fields().add(new HighlightBuilder.Field("name")); searchSourceBuilder.highlighter(highlightBuilder); searchRequest.source(searchSourceBuilder); QueryResult<CoursePub> queryResult = new QueryResult(); List<CoursePub> list = new ArrayList<CoursePub>(); try { //2执行搜索 SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT); //3获取响应结果 SearchHits hits = searchResponse.getHits(); long totalHits=hits.getTotalHits().value; //匹配的总记录数 // long totalHits = hits.totalHits; queryResult.setTotal(totalHits); SearchHit[] searchHits = hits.getHits(); for (SearchHit hit : searchHits) { CoursePub coursePub = new CoursePub(); //源文档 Map<String, Object> sourceAsMap = hit.getSourceAsMap(); //取出id String id = (String) sourceAsMap.get("id"); coursePub.setId(id); //取出name String name = (String) sourceAsMap.get("name"); //取出高亮字段name Map<String, HighlightField> highlightFields = hit.getHighlightFields(); if (highlightFields != null) { HighlightField highlightFieldName = highlightFields.get("name"); if (highlightFieldName != null) { Text[] fragments = highlightFieldName.fragments(); StringBuffer stringBuffer = new StringBuffer(); for (Text text : fragments) { stringBuffer.append(text); } name = stringBuffer.toString(); } } coursePub.setName(name); //图片 String pic = (String) sourceAsMap.get("pic"); coursePub.setPic(pic); //价格 Double price = null; try { if (sourceAsMap.get("price") != null) { price = (Double) sourceAsMap.get("price"); } } catch (Exception e) { e.printStackTrace(); } coursePub.setPrice(price); //旧价格 Double price_old = null; try { if (sourceAsMap.get("price_old") != null) { price_old = (Double) sourceAsMap.get("price_old"); } } catch (Exception e) { e.printStackTrace(); } coursePub.setPrice_old(price_old); //将coursePub对象放入list list.add(coursePub); } } catch (IOException e) { e.printStackTrace(); } queryResult.setList(list); QueryResponseResult<CoursePub> queryResponseResult = new QueryResponseResult<CoursePub>(CommonCode.SUCCESS, queryResult); return queryResponseResult; } }