聚合搜索平台项目笔记

项目介绍

一个聚合搜索平台,可以让用户在同一个入口(同一个页面)集中搜索出不同来源、不同类型的内容用户:提升用户的检索效率、提升用户体验。企业:无需针对每一个项目都去开发一个搜索功能,当你有新的内容、新的网站时,可以复用同一套搜索系统,提升开发效率

技术栈

前端

  • Vue
  • Ant Design Vue
  • Lodash

后端

  • Spring Boot

  • MySQL

  • ElasticSearch(Elastic Stack)搜索引擎

  • 数据抓取

  • 数据同步

  • 4种

  • logstash

  • Canal

  • Guava Retrying

  • 怎么保证API稳定性

业务流程

  1. 先得到各种不同分类的数据
  2. 提供一个搜索页面(单一搜索 + 聚合搜索),支持搜索
  3. (可以做一些优化,比如关键词高亮,防抖节流)

直播安排

第一期

  1. 前端、后端项目初始化
  2. 前端搜索页面开发完成
  3. 后端基本搜索接口开发完成

第二期

  1. 数据抓取
  2. 聚合搜索接口的开发
  3. Elasticsearch 搭建及入门

第三期

  1. Elasticsearch的使用(建表、读写数据、调 API、和 Java 整合)
  2. 数据同步(四种同步方式)

第四期

  1. 接口稳定性保证
  2. 项目优化(关键词高亮、搜索建议、防抖节流)

第一期

  1. 前端、后端项目初始化
  2. 前端搜索页面开发完成
  3. 后端基本搜索接口开发完成

Git推送Github命令

1
2
3
4
5
6
git init
git add .
git commit -m "init"
git branch -M main
git remote add origin
git push -u origin main

问题:

17653@DESKTOP-TV26QNV MINGW64 /e/Java星球项目/lingxi-search/lingxi-search (main)$ git push -u orign mainfatal: ‘orign’ does not appear to be a git repositoryfatal: Could not read from remote repository.

Please make sure you have the correct access rightsand the repository exists.

git pull origin main –allow-unrelated-histories

前端项目初始化

  1. 安装脚手架工具npm install -g @vue/cli
  2. 创建项目vue create antd-demo
  3. 用webstorm打开项目,搜索prettier,进行手动配置
  4. 安装Ant Design vue组件库npm i –save ant-design-vue@4.x
  5. 删除AboutPage和其路由
  6. 更改main.ts文件
1
2
3
4
5
6
7
8
import {createApp} from 'vue'
import App from './App.vue';
import Antd from 'ant-design-vue';
import 'ant-design-vue/dist/reset.css';
import router from "@/router";


createApp(App).use(Antd).use(router).mount('#app');
  1. 安装axiosnpm install axios

后端项目初始化

  1. 更改pom文件的项目名name,包名com.hjj、工件名lingxibi和Spring Boot版本(包名要全局更改)
  2. 检查和更改项目的JDK版本
  3. 更改swagger接口文档的包扫描范围
  4. 更改yml文件的端口,项目名spring.application.name和数据库的信息,如url、username、password
  5. 更改Spring Boot启动类@MapperScan的包扫描路径
  6. 修改banner.txt文件

前端开发

用 url 记录页面搜索状态x当用户刷新页面时,能够从 url 还原之前的搜索状态

url <=> 页面状态

核心小技巧:把同步状态改为单向,即只允许 url 来改变页面状态,不允许反向改变

  1. 让用户在操作的时候,改变 url 地址(点击搜索框,搜索内容填充到 url 上?切换 tab 时,也是填充)
  2. 当 url 改变的时候,去改变页面状态(监听 url 的改变)

联调后端

第二期

  1. 获取多种不同类型的数据源
  • 文章(内部)
  • 用户(内部)
  • 图片(外部,不是我们自自己的)
  1. 前后端单独搜索接口联调,跑通整个页面
  2. 分析现状的问题 => 优化,聚合接口的开发

获取不同类型的数据源

1. 获取文章

内部没有,就只能从互联网上获取基础数据 => 爬虫

获取到文章后要入库(定时获取或者只获取一次)=> 离线抓取

2. 用户获取

每个网站的用户都是自己的

3. 获取图片

实时抓取:我们自己的网站不存这些数据,用户要搜的时候,直接从别人的接口(网站)去搜

防盗链

数据抓取的几种方式

  1. 直接请求数据接口(最方便)httpclient、OkHttp、Restemplate、Hutool
  2. 登网页渲染出明文内容后,从前端的页面内容抓取
  3. 有一些网站可能是动态请求的,他不会一次性加载所有的数据,而要是你点某个按钮,输入某个验证码才会显示出数据。 => 无头浏览器:selenium、node.js puppeteer

数据抓取流程

  1. 分析数据源(怎么获取)

  2. 拿到数据后,怎么处理

  3. 写入数据库等存储

程序单次运行

1
2
3
4
5
public class FetchInitPostList implements CommandLineRunner {
@Override
public void run(String... args) throws Exception {
}
}

前后端联调,跑通整个页面

目前是在页面加载时,调用三个接口分别获取文章、图片、用户数据。

几种不同的业务场景:

  1. 其实可以用户点击某个 tab 的时候,只调用这个 tab 的查询接口
  2. 如果是针对聚合内容的网页,其实可以一个请求搞定
  3. 有可能还要查询其他信息,比如其他数据的总数,同时给用户反馈

根据实际情况去选择方式

聚合接口

  1. 请求数量比较多,可能会收到浏览器的限制 => 用一个接口请求完所有数据{user userService.querypost postService.querypicture pictureService.queryreturn user + post + picture}
  2. 请求不同接口的参数可能不一致,增加前后端沟通成本=>用一个接口把请求参数统一,前端每次传固定的参数,后端去对参数进行转换{统一传searchText后端把searchText转换为userName=>queryUser}统一返回结果:比如Page页面封装
  3. 前端要写调用多个接口的代码,重复代码 => 用一个接口,通过不同的参数区分查询的数据源{前端传type调用后端同一个接口,后端根据type调用不同的service查询type = user,userService.query}532ms 1.04s 1.74s => 1.88s 2.12s

并发不一定更快!短板效应、要以实际测试为准

第三期

  1. 聚合接口优化(支持前端更灵活的拿数据)
  2. 从0开始学习 Elastic Stack(Elasticsearch)
  3. 学习数据同步,怎么把一个数据库内的数据同步到其他数据库

聚合接口优化

怎么样能让前端又能一次搜出所有数据、又能够分别获取某一类数据(比如分页场景)

新增:前端传type调用后端同一个接口,后端根据type调用不同的service查询比如:type=user,userService.query

  1. 如果type为空,那么搜索出所有的数据
  2. 如果type不为空a. 如果type合法,那么查出对应数据b. 否则报错

问题:type增多后,要把查询逻辑堆积在controller代码里

本质:怎么能让搜索系统接入更多的数据呢?

怎么能让搜索系统更轻松地接入更多的数据呢?

门面模式

帮助我们用户(客户端)去更轻松地实现功能,不需要关心门面背后的细节。聚合搜索类业务基本都是门面模式:即前端不用关心后端从哪里、怎么去取不同来源、怎么去聚合不同来源的数据,更方便地获取到内容。

补充:当调用你系统(接口)的客户端觉得麻烦的时候,你就应该思考,是不是可以抽象一个门面了。

适配器模式

  1. 定制统一的数据源接入规范(标准):什么数据源允许接入?你的数据源接入时要满足什么要求?要做什么事情?任何介入我们系统的数据必须要能够根据关键词搜索,并且支持分页搜索声明接口来定义规范
  2. 假如说我们的数据源已经支持了搜索,但是原有的方法参数和我们的规范不一致,怎么办?适配器模式的作用:通过转换,让两个系统能够完成对接

注册器模式(本质也是单例)

提前通过一个map或者其他类型存储好后面需要调用的对象。效果:代码量大幅度减少,可维护可扩展。

搜索优化

问题:搜索不够灵活!!!

Java 和 Python都能搜到,但是Java Python并不能搜索出来

Elastic Stack

包含Elastic search的一套数据

官网:https://www.elastic.co/cn/elasticsearch

beats:从各种不同类型的文件/应用来采集数据a,b,c,d,e,aa,bb,ccLogstash:从多个采集器或数据源来抽取/转换数据,向es输送aa,bb,ccelasticsearch:存储、查询数据kibana:可视化es的数据

安装ES

elasticsearch:https://www.elastic.co/guide/en/elasticsearch/reference/7.17/setup.html

kibana:https://www.elastic.co/guide/en/kibana/7.17/introduction.html

Elasticsearch 概念

你就把MySQL一样的数据库

Index 索引 => MySQL 里的表(table)

建表、增删改查(查询需要花费的学习时间最多)用客户端去调用 ElasticSearch(3种)

语法:SQL、代码的方法(4种语法)

ES 相比于MySQL,能够自动帮我们做分词,能够非常高效、灵活地查询内容

索引(文档)

正向索引:理解为书籍的目录,可以快速帮你找到对应的内容(根据目录找文章)

倒排索引:根据文章内容找文章。会建立一个倒排索引表,其中key为搜索关键词(文章内容),value为搜索内容(文章)

ES 的几种调用方式

  1. restful api(http调用)localhost:9200(ES的启动端口)localhost:5601(kibana 启动端口)ES 启动端口:9200:供用户(客户端)使用的端口9300:给ES集群内部通信的外部调用不了
  2. kibana devtools自由地对 ES 进行操作(本质也是restful api)devtools 不建议生产环境使用
  3. 客户端调用方式(Java、Go)

ES 的查询语法

DSL

json格式好理解,elasticsearch的原生查询语言和 http 请求最兼容(比较推荐)

建表、插入数据
1
2
3
4
5
POST post/_doc
{
"title":"鱼皮2",
"desc":"鱼皮的描述2"
}
查询数据(https://www.elastic.co/guide/en/elasticsearch/reference/7.17/query-dsl.html)
1
2
3
4
5
6
GET post/_search
{
"query": {
"match_all": {}
}
}
根据 id 查询

GET post/_doc/foxiA44By3NFHN2iLu4F

删除

DELETE _data_stream/post

EQL

专门查询 ECS 文档(标准指标文档)

更加规范,适合特定场景

https://www.elastic.co/guide/en/elasticsearch/reference/7.17/eql.html

https://bcdh.yuque.com/staff-wpxfif/resource/ic46ng54qy6bh52n#WNSjf

SQL

学习成本低(可能需要配合其他插件且性能低)

1
2
3
4
POST /_sql?format=txt
{
"query": "SELECT * FROM post"
}

Painless Script

编程式取值,更灵活,但是学习成本呢高

Mapping

理解为数据库的表结构,有哪些字段、字段类型ES支持动态mapping,表结构可以动态改变,而不像MySQL一样必须手动建表,没有的字段就不能插入

显示创建mapping:

1
2
3
4
5
6
7
8
9
10
PUT /my-index-000001
{
"mappings": {
"properties": {
"age": { "type": "integer" },
"email": { "type": "keyword" },
"name": { "type": "text" }
}
}
}

第四期

  1. 继续讲 ElasticStack 的概念
  2. 学习用 Java 来调用 ElasticSearch
  3. 使用 ES 来优化聚合搜索接口
  4. 已有的DB的数据和ES数据同步(增量、全量、实时、非实时)
  5. jmeter 压力测试
  6. 保证接口稳定性
  7. 其他的拓展思路

ElasticStack 概念

ES索引(Index)=>表ES field(字段)=>列倒排索引(根据内容查找索引)调用方式(DSL、EQL、SQL等)Mapping 表结构

  • 自动生成 mapping
  • 手动指定 mapping

分词器

文档链接:https://www.elastic.co/guide/en/elasticsearch/reference/7.17/test-analyzer.html

分词的一种规则

空格分词器:

1
2
3
4
5
POST _analyze
{
"analyzer": "whitespace",
"text": "The quick brown fox."
}

标准分词器:

1
2
3
4
5
6
POST _analyze
{
"tokenizer": "standard",
"filter": [ "lowercase", "asciifolding" ],
"text": "Is this déja vu?"
}

关键词分词器:就是不分词,整句话当做专业术语

1
2
3
4
5
POST _analyze
{
"analyzer": "keyword",
"text": "The 2 QUICK Brown-Foxes jumped over the lazy dog's bone."
}

IK 分词器(国内用的比较多)

中文友好:https://github.com/infinilabs/analysis-ik

插件下载地址:https://github.com/infinilabs/analysis-ik

ik_smart 和 ik_max_word的区别:

ik_smart:只能分词,尽量选择最像一个词的拆分方式,比如“小”、“小黑子”

1
2
3
4
5
POST _analyze
{
"analyzer": "ik_smart",
"text": "我是汉堡,我不是小黑子,我是坤坤真爱粉"
}

ik_max_word:以最细粒度尽可能分词,可以包括组合词,比如“小黑”,“小黑子”

1
2
3
4
5
POST _analyze
{
"analyzer": "ik_max_word",
"text": "我是汉堡,我不是小黑子,我是坤坤真爱粉"
}

思考:

ES 打分机制

有 3 条内容:

  1. 鱼皮是狗
  2. 鱼皮是小黑子
  3. 我是小黑子

用户搜索:

  1. 鱼皮,第一条分数最高,因为第一条匹配了关键词,而且更短(匹配度更高)
  2. 鱼皮小黑子 => 鱼皮,小,黑子 => 2 > 3 >1

参考文章:https://liyupi.blog.csdn.net/article/details/119176943

官方参考文章:https://www.elastic.co/guide/en/elasticsearch/guide/master/controlling-relevance.html

Java 操作 ES

3种:

  1. 官方 提供给Java 的 API:https://www.elastic.co/guide/en/elasticsearch/client/java-api-client/7.17/connecting.html
  2. ES 以前官方的 Java API,HighLevelRestClient(已废弃,不建议)
  3. Spring Data ElasticSearch

spring-data 系列:spring 提供的操作数据的全家桶

spring-data-redis:操作 Redis

spring-data-mongodb:操作 MongoDB

spring-data-elasticsearch:操作elasticsearch

官方文档:https://docs.spring.io/spring-data/elasticsearch/docs/4.4.10/reference/html/

自定义方法:

用户指定接口的方法名称,框架自动帮你

用 ES 实现搜索接口

1、建表(简历索引)

post 表:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
-- 帖子表
create table if not exists post
(
id bigint auto_increment comment 'id' primary key,
title varchar(512) null comment '标题',
content text null comment '内容',
tags varchar(1024) null comment '标签列表(json 数组)',
thumbNum int default 0 not null comment '点赞数',
favourNum int default 0 not null comment '收藏数',
userId bigint not null comment '创建用户 id',
createTime datetime default CURRENT_TIMESTAMP not null comment '创建时间',
updateTime datetime default CURRENT_TIMESTAMP not null on update CURRENT_TIMESTAMP comment '更新时间',
isDelete tinyint default 0 not null comment '是否删除',
index idx_userId (userId)
) comment '帖子' collate = utf8mb4_unicode_ci;

ES Mapping:

id(可以不放到字段设置里面)

ES 中,尽量存放用户筛选(搜索)的数据,频繁变化的数据就没必要存放 ES 中

aliases:别名(为了后续方便数据迁移)

字段类型是 text,这个字段是可以分词的、可模糊查询的;而如果是 keyword,只能完全匹配,精确查询

存储的时候尽量选择 ik_max_word 分词器,索引更多、更可能被搜出来

搜索的时候尽量选择 ik_smart 分词器,更偏向于用户想搜的分词

如果想要让 text 类型的分词字段也支持精确查询,可以创建 keyword 类型的子字段:

1
2
3
4
5
6
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256 // 超过 256 长度则忽略查询
}
}

建表结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
POST post_v1
{
"aliases": {
"post": {}
},
"mappings": {
"properties": {
"title": {
"type": "text",
"analyzer": "ik_max_word",
"search_analyzer": "ik_smart",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"content": {
"type": "text",
"analyzer": "ik_max_word",
"search_analyzer": "ik_smart",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"tags": {
"type": "keyword"
},
"thumbNum": {
"type": "long"
},
"favourNum": {
"type": "long"
},
"userId": {
"type": "keyword"
},
"createTime": {
"type": "date"
},
"updateTime": {
"type": "date"
},
"isDelete": {
"type": "keyword"
}
}
}
}

PostEsDTO

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
@Data
public class PostEsDTO implements Serializable {

private static final String DATE_TIME_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";

/**
* id
*/
@Id
private Long id;

/**
* 标题
*/
private String title;

/**
* 内容
*/
private String content;

/**
* 标签列表
*/
private List<String> tags;

/**
* 创建用户 id
*/
private Long userId;

/**
* 创建时间
*/
@Field(index = false, store = true, type = FieldType.Date, format = {}, pattern = DATE_TIME_PATTERN) // 指定日期格式
private Date createTime;

/**
* 更新时间
*/
@Field(index = false, store = true, type = FieldType.Date, format = {}, pattern = DATE_TIME_PATTERN)
private Date updateTime;

/**
* 是否删除
*/
private Integer isDelete;

private static final long serialVersionUID = 1L;

/**
* 对象转包装类
*
* @param post
* @return
*/
public static PostEsDTO objToDto(Post post) {
if (post == null) {
return null;
}
PostEsDTO postEsDTO = new PostEsDTO();
BeanUtils.copyProperties(post, postEsDTO);
String tagsStr = post.getTags();
if (StringUtils.isNotBlank(tagsStr)) {
postEsDTO.setTags(JSONUtil.toList(tagsStr, String.class));
}
return postEsDTO;
}

/**
* 包装类转对象
*
* @param postEsDTO
* @return
*/
public static Post dtoToObj(PostEsDTO postEsDTO) {
if (postEsDTO == null) {
return null;
}
Post post = new Post();
BeanUtils.copyProperties(postEsDTO, post);
List<String> tagList = postEsDTO.getTags();
if (CollUtil.isNotEmpty(tagList)) {
post.setTags(JSONUtil.toJsonStr(tagList));
}
return post;
}
}

增删改查

ES 中,_开头的字段表示系统默认字段,比如 _id,如果系统不指定会自动生成,但是不会在 _source 字段中补充 id 的值,所以建议大家手动指定。

  1. ElasticsearchRepository<PostEsDTO,Long>,默认提供了简单的增删改查,多用于自定义查询
1
2
3
4
@NoRepositoryBean
public interface ElasticsearchRepository<T, ID> extends PagingAndSortingRepository<T, ID> {
Page<T> searchSimilar(T entity, @Nullable String[] fields, Pageable pageable);
}
  1. Spring 默认给我们提供操作 es 的客户端对象 ElasticSearchRestTemplate,也提供了增删改查,它的增删改查更加复杂,适合更复杂的场景

对于复杂的场景建议使用第 二种。

三个步骤:

  1. 取参数
  2. 把参数组合为 ES 支持的搜索条件
  3. 从返回值中区结果

查询 DSL:https://www.elastic.co/guide/en/elasticsearch/reference/7.17/query-filter-context.html

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
GET /_search
{
"query": {
"bool": {
"must": [ // must 必须包含的查询字段
{ "match": { "title": "鱼皮" }}, // match 模糊查询,term 精确查询,range 范围查询,wildcard 模糊查询,regexp 正则查询
{ "match": { "content": "知识星球" }}
],
"filter": [
{ "term": { "status": "published" }},
{ "range": { "publish_date": { "gte": "2015-01-01" }}}
]
}
}
}
GET /_search
{
"query": {
"bool": {
"must": [
{ "term": { "tags": "java" }},
{ "match": { "content": "知识星球" }}
],
"filter": [
{ "term": { "status": "published" }},
{ "range": { "publish_date": { "gte": "2015-01-01" }}}
]
}
}
}

查询结果中,score 代表匹配分数

minimum_should_match:最小满足的字段

searchHit:

totalHits 搜索条数,

maxScore:最大分数

searchHits:搜索出来的数据

建议先测试 DSL、再翻译成 Java

查出数据后,再根据查到内容的 id 去数据库查找到动态数据。

数据同步

一般情况下,如果做查询搜索功能,使用 ES 来模糊搜索,但是数据存放在数据库 MySQL 里的,所以说我们需要把 MySQL 中的数据和 ES 进行同步,保证数据一致性

MySQL => ES(单向)

首次安装完 ES,把MySQL 数据全量同步到 ES 里,写一次单次脚本

4 种方式,全量同步(首次)+ 增量同步(新数据):

  1. 定时任务,比如1分钟1次,找到MySQL中过去几分钟内(至少是定时周期的 2 到 3 倍)发生改变的数据,然后更新到 ES。优点:简单易懂,占用资源少,不用引入第三方中间件缺少:有时间差应用场景:数据短时间内不同步影响不大、或者数据几乎不发生修改
  2. 双写:写数据库的时候必须也去写 ES;更新删除数据库同理。(事务:建议先保证写入 MySQL 成功,如果 ES 写失败了,可以通过定时任务 + 日志 + 告警进行监测和修复(补偿))
  3. 用 Logstash 数据同步管道(一般要配合 kafka 消息队列 + beates 采集器)

Logstash

传输和处理数据的管道

官方文档:https://www.elastic.co/guide/en/logstash/7.17/first-event.html

快速开启文档:https://www.elastic.co/guide/en/logstash/7.17/running-logstash-windows.html#running-logstash-windows

好处:用起来方便,插件多

缺点:成本更大、一般要配合其他组件使用比如kafka

mytask.conf

1
2
3
4
5
6
7
8
9
10
11
input {
udp {
port => 514
type => "syslog"
}
}

output {
elasticsearch { hosts => ["localhost:9200"] }
stdout { codec => rubydebug }
}

logstash.bat -f ..\config\syslog.conf // bin 目录下启动命令

修改 logstash config目录下的配置文件的 jdbc 驱动文件位置和绝对路径

增量配置:是不是只查询最新更新的?可以记录上次更新的时间,只查询 > 更新时间的数据

小知识:预编译 SQL 的优点?

  1. 灵活
  2. 模板好懂
  3. 快(有缓存)
  4. 部分防注入
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.

input {
jdbc {
jdbc_driver_library => "E:\study\ELK\logstash-7.17.18\config\mysql-connector-j-8.0.31.jar"
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/lingxi_search"
jdbc_user => "root"
jdbc_password => "123456"
parameters => { "favorite_artist" => "Beethoven" }
use_column_value => true
tracking_column => "updatetime"
tracking_column_type => "timestamp"
schedule => "*/5 * * * *"
statement => "SELECT * from post where updateTime > :sql_last_value"
jdbc_default_timezone => "Asia/Shanghai"
}
}


output {
stdout { codec => rubydebug }
}

sql_last_value 是取 logstash_jdbc_last_run 文件中的值

全量更新写入 ES(注意删除 logstash_jdbc_last_run 文件中的数据):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.

input {
jdbc {
jdbc_driver_library => "E:\study\ELK\logstash-7.17.18\config\mysql-connector-j-8.0.31.jar"
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/lingxi_search"
jdbc_user => "root"
jdbc_password => "123456"
parameters => { "favorite_artist" => "Beethoven" }
use_column_value => true
tracking_column => "updatetime"
tracking_column_type => "timestamp"
schedule => "*/5 * * * *"
statement => "SELECT * from post where updateTime > :sql_last_value"
jdbc_default_timezone => "Asia/Shanghai"
}
}


output {
stdout { codec => rubydebug }
elasticsearch {
hosts => "http://localhost:9200"
index => "post_v1"
document_id => "%{id}"
data_stream => "true"
data_stream_type => "metrics"
data_stream_dataset => "foo"
data_stream_namespace => "bar"
}
}

注意查询语句要按 updateTime排序,保证最后一条是最大的

statement => “SELECT * from post where updateTime > :sql_last_value and updateTime < now() order by updateTime desc”

同步之后的两个问题:

  1. 字段全部变成小写了
  2. 多了一些我们不想要的字段

可以编写过滤:

1
2
3
4
5
6
7
8
9
10
11
filter {
mutate {
rename => {
"updatetime" => "updateTime"
"userid" => "userId"
"createtime" => "createTime"
"isdelete" => "isDelete"
}
remove_field => ["thumbnum", "favournum"]
}
}

最终版:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.

input {
jdbc {
jdbc_driver_library => "E:\study\ELK\logstash-7.17.18\config\mysql-connector-j-8.0.31.jar"
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/lingxi_search"
jdbc_user => "root"
jdbc_password => "123456"
parameters => { "favorite_artist" => "Beethoven" }
use_column_value => true
tracking_column => "updatetime"
tracking_column_type => "timestamp"
schedule => "*/5 * * * *"
statement => "SELECT * from post where updateTime > :sql_last_value and updateTime < now() order by updateTime desc"
jdbc_default_timezone => "Asia/Shanghai"
}
}

filter {
mutate {
rename => {
"updatetime" => "updateTime"
"userid" => "userId"
"createtime" => "createTime"
"isdelete" => "isDelete"
}
remove_field => ["thumbnum", "favournum"]
}
}

output {
stdout { codec => rubydebug }
elasticsearch {
hosts => "127.0.0.1:9200"
index => "post_v1"
document_id => "%{id}"
}
}

配置 kinbana 可视化看板

  1. 创建索引
  2. 导入数据
  3. 创建索引模式
  4. 选择图标拖拉拽
  5. 保存

订阅数据库流水的同步方式 Canal

https://github.com/alibaba/canal?tab=readme-ov-file

优点:实时同步,同步性非常强

原理:数据库每次修改时,会修改 binlog 文件,只要监听该文件的修改,就能第一时间得到消息并处理

cannal:帮你监听 binlog,并解析 binlog 为你可以理解的内容

windows系统,找到你本地的mysql安装目录,在根目录下新建my.ini文件:

1
2
3
4
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

为 canal 添加 MySQL 账户:

1
2
3
4
CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

如果 Java 找不到,修改 startup.bat 文件

压力测试

https://jmeter.apache.org/找到jar包:apache-jmeter--5.5 \apache-jmeter–5.5\bin\ApacheJMeterjar启动配置线程组=>请求头=>默认请求=>单个请求=>响应断言=>聚合报告/结果树

90%分位:99%的用户都在这个响应时间内

吞吐量:每秒处理的请求数 qps

搜索建议

搜索高亮

https://www.elastic.co/guide/en/elasticsearch/reference/7.17/highlighting.html

前端防抖节流

loadash 工具:

节流:每段时间最多执行x次,https://www.lodashjs.com/docs/lodash.throttle

防抖:等待一段时间内没有其他操作了,才执行操作:https://www.lodashjs.com/docs/lodash.debounce