本文共 5053 字,大约阅读时间需要 16 分钟。
本文隶属于专栏《1000个问题搞定大数据技术体系》,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢!
本专栏目录结构和参考文献请见
官方提供的sink类型已经很多,但有时仍无法满足实际开发中的需求,此时就需要根据实际需求自定义sink。例如:需要把接收到的数据按照规则过滤之后写入某张MySQL表中,这时就需要我们自己实现一个MySQLSink。
1、根据官方说明,自定义 MysqlSink 需要继承 AbstractSink 类并实现 Configurable 接口
2、实现对应的方法:configure(读取配置参数)、start(建立数据库连接)、process(从channel取出事件并写入MySQL)以及 stop(关闭数据库连接)
-- Create the database
CREATE DATABASE IF NOT EXISTS mysqlsource DEFAULT CHARACTER SET utf8mb4;

-- Create the table that MysqlSink inserts one row per event into
CREATE TABLE IF NOT EXISTS mysqlsource.flume2mysql (
    id         INT         NOT NULL AUTO_INCREMENT,
    createTime VARCHAR(64) NOT NULL,  -- "yyyy-MM-dd HH:mm:ss" string written by the sink
    -- Event body. TEXT rather than VARCHAR(255) so long log lines are neither rejected nor
    -- truncated; a rejected insert makes the sink roll back and retry the same event.
    content    TEXT        NOT NULL,
    PRIMARY KEY (id)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;
<properties>
    <flume.version>1.9.0</flume.version>
    <mysql.version>8.0.24</mysql.version>
</properties>

<dependencies>
    <!-- Flume core ships with the agent (flume/lib), so it is compile-time only for the sink jar. -->
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>${flume.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- JDBC driver used through DriverManager at runtime; its jar must also be copied into flume/lib. -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>${mysql.version}</version>
    </dependency>
    <!-- NOTE(review): the MysqlSink in this article does not use commons-lang3; drop it if nothing else does. -->
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.12.0</version>
    </dependency>
</dependencies>
package com.shockang.study.bigdata.flume;import org.apache.flume.Channel;import org.apache.flume.Context;import org.apache.flume.Event;import org.apache.flume.Transaction;import org.apache.flume.conf.Configurable;import org.apache.flume.sink.AbstractSink;import java.sql.Connection;import java.sql.DriverManager;import java.sql.PreparedStatement;import java.sql.SQLException;import java.text.SimpleDateFormat;import java.util.Date;/** * 自定义MysqlSink */public class MysqlSink extends AbstractSink implements Configurable { private String mysqlurl = ""; private String username = ""; private String password = ""; private String tableName = ""; Connection con = null; @Override public Status process() { Status status = null; // Start transaction Channel ch = getChannel(); Transaction txn = ch.getTransaction(); txn.begin(); try { Event event = ch.take(); if (event != null) { //获取body中的数据 String body = new String(event.getBody(), "UTF-8"); //如果日志中有以下关键字的不需要保存,过滤掉 if (body.contains("delete") || body.contains("drop") || body.contains("alert")) { status = Status.BACKOFF; } else { //存入Mysql SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String createtime = df.format(new Date()); PreparedStatement stmt = con.prepareStatement("insert into " + tableName + " (createtime, content) values (?, ?)"); stmt.setString(1, createtime); stmt.setString(2, body); stmt.execute(); stmt.close(); status = Status.READY; } } else { status = Status.BACKOFF; } txn.commit(); } catch (Throwable t) { txn.rollback(); t.getCause().printStackTrace(); status = Status.BACKOFF; } finally { txn.close(); } return status; } /** * 获取配置文件中指定的参数 * * @param context */ @Override public void configure(Context context) { mysqlurl = context.getString("mysqlurl"); username = context.getString("username"); password = context.getString("password"); tableName = context.getString("tablename"); } @Override public synchronized void start() { try { //初始化数据库连接 con = DriverManager.getConnection(mysqlurl, username, 
password); super.start(); System.out.println("finish start"); } catch (Exception ex) { ex.printStackTrace(); } } @Override public synchronized void stop() { try { con.close(); } catch (SQLException e) { e.printStackTrace(); } super.stop(); }}
① 把程序打成jar包,连同 MySQL 驱动 mysql-connector-java 的jar包一起上传到flume的lib目录下(sink 运行时通过 DriverManager 加载该驱动)
② 配置文件准备
# Run from /opt/bigdata/flume/myconf: the start command below loads mysqlsink.conf from that directory
vim mysqlsink.conf
# Agent a1: one source, one sink, one channel
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Source: follow the log file
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/bigdata/flumeData/data.log
a1.sources.r1.channels = c1

# Channel: in-memory buffer
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Sink: the custom MysqlSink
a1.sinks.k1.channel = c1
a1.sinks.k1.type = com.shockang.study.bigdata.flume.MysqlSink
a1.sinks.k1.mysqlurl=jdbc:mysql://node1:3306/mysqlsource?useSSL=false
a1.sinks.k1.username=root
a1.sinks.k1.password=123456
a1.sinks.k1.tablename=flume2mysql
③ 启动flume配置
# Start agent a1 with the MySQL sink configuration, logging INFO to the console
flume-ng agent \
  --name a1 \
  --conf /opt/bigdata/flume/myconf \
  --conf-file /opt/bigdata/flume/myconf/mysqlsink.conf \
  -Dflume.root.logger=info,console
④ 最后向 /opt/bigdata/flumeData/data.log 文件中追加数据,观察mysql表中的数据(包含 delete、drop、alert 关键字的行会被过滤掉,不会写入表中)
转载地址:http://skgji.baihongyu.com/