博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Flume 如何自定义 Mysql Sink?
阅读量:4073 次
发布时间:2019-05-25

本文共 5053 字,大约阅读时间需要 16 分钟。

前言

本文隶属于专栏《1000个问题搞定大数据技术体系》,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢!

本专栏目录结构和参考文献请见

正文

场景描述

官方提供的sink类型已经很多,但是有时候并不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义某些sink。如:需要把接受到的数据按照规则进行过滤之后写入到某张mysql表中,所以此时需要我们自己实现MySQLSink。

自定义 Mysql Sink 步骤

  • 1、根据官方说明自定义 MysqlSink 需要继承 AbstractSink 类并实现 Configurable

  • 2、实现对应的方法

    • configure(Context context)
      • 初始化context
    • start()
      • 启动准备操作
    • process()
      • 从channel获取数据,然后解析之后,保存在mysql表中
    • stop()
      • 关闭相关资源

实践

  1. 创建 mysql 数据库以及 mysql 数据库表
--创建一个数据库CREATEDATABASE IF NOT EXISTS mysqlsource DEFAULT CHARACTER SET utf8 ;--创建一个表,用户保存拉取目标表位置的信息CREATE TABLE mysqlsource.flume2mysql(    id         int(11) NOT NULL AUTO_INCREMENT,    createTime varchar(64)  NOT NULL,    content    varchar(255) NOT NULL,    PRIMARY KEY (id)) ENGINE=InnoDB DEFAULT CHARSET=utf8;
  1. 构建maven工程,添加依赖
1.9.0
8.0.24
org.apache.flume
flume-ng-core
${flume.version}
mysql
mysql-connector-java
${mysql.version}
org.apache.commons
commons-lang3
3.12.0
  1. 定义 MysqlSink 类
package com.shockang.study.bigdata.flume;import org.apache.flume.Channel;import org.apache.flume.Context;import org.apache.flume.Event;import org.apache.flume.Transaction;import org.apache.flume.conf.Configurable;import org.apache.flume.sink.AbstractSink;import java.sql.Connection;import java.sql.DriverManager;import java.sql.PreparedStatement;import java.sql.SQLException;import java.text.SimpleDateFormat;import java.util.Date;/** * 自定义MysqlSink */public class MysqlSink extends AbstractSink implements Configurable {
private String mysqlurl = ""; private String username = ""; private String password = ""; private String tableName = ""; Connection con = null; @Override public Status process() {
Status status = null; // Start transaction Channel ch = getChannel(); Transaction txn = ch.getTransaction(); txn.begin(); try {
Event event = ch.take(); if (event != null) {
//获取body中的数据 String body = new String(event.getBody(), "UTF-8"); //如果日志中有以下关键字的不需要保存,过滤掉 if (body.contains("delete") || body.contains("drop") || body.contains("alert")) {
status = Status.BACKOFF; } else {
//存入Mysql SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String createtime = df.format(new Date()); PreparedStatement stmt = con.prepareStatement("insert into " + tableName + " (createtime, content) values (?, ?)"); stmt.setString(1, createtime); stmt.setString(2, body); stmt.execute(); stmt.close(); status = Status.READY; } } else {
status = Status.BACKOFF; } txn.commit(); } catch (Throwable t) {
txn.rollback(); t.getCause().printStackTrace(); status = Status.BACKOFF; } finally {
txn.close(); } return status; } /** * 获取配置文件中指定的参数 * * @param context */ @Override public void configure(Context context) {
mysqlurl = context.getString("mysqlurl"); username = context.getString("username"); password = context.getString("password"); tableName = context.getString("tablename"); } @Override public synchronized void start() {
try {
//初始化数据库连接 con = DriverManager.getConnection(mysqlurl, username, password); super.start(); System.out.println("finish start"); } catch (Exception ex) {
ex.printStackTrace(); } } @Override public synchronized void stop() {
try {
con.close(); } catch (SQLException e) {
e.printStackTrace(); } super.stop(); }}
  1. 测试

① 程序打成jar包,上传jar包到flume的lib目录下

② 配置文件准备

vim mysqlsink.conf
a1.sources = r1a1.sinks = k1a1.channels = c1#配置sourcea1.sources.r1.type = execa1.sources.r1.command = tail -F /opt/bigdata/flumeData/data.loga1.sources.r1.channels = c1#配置channela1.channels.c1.type = memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100#配置sinka1.sinks.k1.channel = c1a1.sinks.k1.type = com.shockang.study.bigdata.flume.MysqlSinka1.sinks.k1.mysqlurl=jdbc:mysql://node1:3306/mysqlsource?useSSL=falsea1.sinks.k1.username=roota1.sinks.k1.password=123456a1.sinks.k1.tablename=flume2mysql

③ 启动flume配置

flume-ng agent -n a1 -c /opt/bigdata/flume/myconf -f /opt/bigdata/flume/myconf/mysqlsink.conf -Dflume.root.logger=info,console

④ 最后向文件中添加数据,观察mysql表中的数据

转载地址:http://skgji.baihongyu.com/

你可能感兴趣的文章
Hibernate中复合主键配置
查看>>
在Navicat for MySQL中修改表的编码格式
查看>>
Django+layui 实现多文件上传,文件下载
查看>>
dubbo 入坑笔记之命名空间错误
查看>>
JSP中实现关键字高亮显示
查看>>
form表单嵌套提交
查看>>
Error:(3, 32) java: 程序包org.springframework.boot不存在
查看>>
用python画一只可爱的布朗熊
查看>>
【spring】spring boot多数据源配置(方式二)
查看>>
【RPC】一步一步实现基于netty+zookeeper的RPC框架(一)
查看>>
【RPC】一步一步实现基于netty+zookeeper的RPC框架(二)
查看>>
【RPC】一步一步实现基于netty+zookeeper的RPC框架(三)
查看>>
【RPC】一步一步实现基于netty+zookeeper的RPC框架(四)
查看>>
【RPC】一步一步实现基于netty+zookeeper的RPC框架(五)
查看>>
【RPC】一步一步实现基于netty+zookeeper的RPC框架(六)
查看>>
生成支持分布式部署的唯一id代码实现
查看>>
支持分表的ORM框架实现
查看>>
jquery easyui datagrid subgrid edit
查看>>
java集合(ArrayList、vector、HashMap、HashTable)源码剖析
查看>>
补充另一版ArrayList的初始化过程
查看>>