Example code: consuming data from Kafka and writing it into Elasticsearch
Views: 4310
Published: 2019-06-06


The consumer entry point, KafkaWarningAction, creates a high-level consumer with the old kafka.javaapi.consumer API, splits the topic into five streams, and hands them off to a fixed thread pool:

package com.unimas.test;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class KafkaWarningAction {

    private static final String topic = "avro-log-bj-jingnei1-dev-proxy-websense";
    private static final String group_id = "kafkawarning01";
    private static final Integer threads = 5;
    private static ConsumerConnector consumer;
    private static ExecutorService executor;

    public static void main(String[] args) {
        // Create the consumer that reads the data.
        Properties props = new Properties();
        props.put("zookeeper.connect", "host1:2181,host2:2181,host3:2181");
        props.put("group.id", group_id);
        props.put("auto.offset.reset", "smallest");
        ConsumerConfig config = new ConsumerConfig(props);
        consumer = Consumer.createJavaConsumerConnector(config);

        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, threads);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        executor = Executors.newFixedThreadPool(threads); // create the thread pool
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                shutdown();
            }
        });

        // Write to Elasticsearch.
        EsProducer esProducer = new EsProducer();
        esProducer.WriteToEs(streams, executor);
    }

    public static void shutdown() {
        if (consumer != null) {
            consumer.shutdown();
            consumer = null;
        }
        if (executor != null) {
            executor.shutdown();
            executor = null;
        }
    }
}
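The kafka.javaapi.consumer high-level consumer used above was later removed from Kafka. For comparison only, here is a minimal sketch of the same subscription with the newer org.apache.kafka.clients.consumer API; this is not part of the original post, and the broker addresses are placeholders:

package com.unimas.test;

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Hypothetical equivalent of the subscription above, using the new consumer API.
public class NewApiConsumerSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "host1:9092,host2:9092"); // placeholder brokers
        props.put("group.id", "kafkawarning01");
        props.put("auto.offset.reset", "earliest"); // replaces "smallest"
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(props);
        consumer.subscribe(Collections.singletonList("avro-log-bj-jingnei1-dev-proxy-websense"));
        try {
            while (true) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    // record.value() holds the Avro-encoded message bytes,
                    // which EsProducer would then decode and index.
                }
            }
        } finally {
            consumer.close();
        }
    }
}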
EsProducer decodes each Avro message against the websense.avsc schema, re-encodes it as JSON, and bulk-indexes it into a daily warning-message-yyyyMMdd index:

package com.unimas.test;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ExecutorService;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.io.JsonEncoder;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;

public class EsProducer {

    SimpleDateFormat _format = new SimpleDateFormat("yyyyMMdd");
    long starttime = System.currentTimeMillis();

    public void WriteToEs(List<KafkaStream<byte[], byte[]>> streams, ExecutorService executor) {
        for (final KafkaStream<byte[], byte[]> kafkaStream : streams) {
            executor.submit(new Runnable() {
                @Override
                public void run() {
                    Client client = Utils.getClient();
                    BulkRequestBuilder bulk = client.prepareBulk();
                    String typeName = "type-warning";
                    String schemaName = "websense.avsc";
                    Schema _schema = null;
                    ConsumerIterator<byte[], byte[]> it = kafkaStream.iterator();
                    while (it.hasNext()) {
                        byte[] message = it.next().message();
                        String indexName = "warning-message-";
                        try {
                            if (_schema == null) { // parse the schema once, not per message
                                String schemaContent = Utils.readFileContent(schemaName);
                                _schema = new Schema.Parser().parse(schemaContent);
                            }
                            DatumReader<GenericRecord> reader1 = new GenericDatumReader<GenericRecord>(_schema);
                            Decoder decoder1 = DecoderFactory.get().binaryDecoder(message, null);
                            GenericRecord result = (GenericRecord) reader1.read(null, decoder1);

                            // Re-encode the record as JSON for indexing.
                            GenericDatumWriter<GenericRecord> w = new GenericDatumWriter<GenericRecord>(_schema);
                            ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
                            JsonEncoder jsonEncoder = EncoderFactory.get().jsonEncoder(_schema, outputStream);
                            w.write(result, jsonEncoder);
                            jsonEncoder.flush();

                            // Route to a daily index based on the record's time_received field.
                            String timestamp = _format.format(new Date(Long.parseLong(
                                    result.get("time_received").toString())));
                            indexName = indexName + timestamp;

                            IndexRequest index = new IndexRequest();
                            index.index(indexName);
                            index.type(typeName);
                            index.source(new String(outputStream.toByteArray()));
                            bulk.add(index);

                            if (bulk.numberOfActions() >= 20000) {
                                BulkResponse response = bulk.execute().actionGet();
                                long l1 = 0L;
                                //long currentTime = System.currentTimeMillis();
                                //if (currentTime >= starttime + 120 * 1000) {      // code to stop the task on a timer
                                //    System.out.println("stop........");
                                //    KafkaWarningAction.shutdown();
                                //    break;
                                //}
                                if (response.hasFailures()) {
                                    for (BulkItemResponse item : response.getItems()) {
                                        if (item.isFailed()) {
                                            l1 += 1L;
                                            System.out.println("bulk failure message " + item.getFailureMessage());
                                        }
                                    }
                                }
                                System.out.println("Inner bulk failure count: " + l1);
                                bulk = client.prepareBulk();
                            }
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                    // Flush any remaining documents.
                    if (bulk.numberOfActions() > 0) {
                        BulkResponse response = bulk.execute().actionGet();
                        long l1 = 0L;
                        if (response.hasFailures()) {
                            for (BulkItemResponse item : response.getItems()) {
                                if (item.isFailed()) {
                                    l1 += 1L;
                                    System.out.println("bulk failure message " + item.getFailureMessage());
                                }
                            }
                        }
                        System.out.println("Bulk failure count: " + l1);
                    }
                }
            });
        }
    }
}
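The post does not include websense.avsc itself. The only field the indexing code relies on is time_received, read as epoch milliseconds, so a minimal, hypothetical schema compatible with that usage might look like the following (the real schema certainly has more fields):

{
  "type": "record",
  "name": "Websense",
  "namespace": "com.unimas.test",
  "fields": [
    {"name": "time_received", "type": "long"}
  ]
}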
Finally, Utils holds a shared Elasticsearch TransportClient (the commented-out static block is the 1.5.0 variant; the active one targets 2.0.0) plus a helper that reads the schema file from disk:

package com.unimas.test;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.InetAddress;

import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;

public class Utils {

    static TransportClient client = null;

    /* elasticSearch 1.5.0 */
//    static {
//        String clusterName = "SOCBigData";
//        String ip = "11.11.184.180";
//        Settings settings = ImmutableSettings.settingsBuilder().put("client.transport.sniff", true)
//                .put("client.transport.nodes_sampler_interval", 60)
//                .put("cluster.name", clusterName).build();
//        client = new TransportClient(settings);
//        client.addTransportAddress(new InetSocketTransportAddress(ip, 9300));
//        client.connectedNodes();
//    }

    /* elasticSearch 2.0.0 */
    static {
        //Config config = Config.getInstance();
        //String clusterName = config.getESClusterName();
        //String ip = config.getESIp();
        String clusterName = "udb_soc";
        String ip = "11.11.11.11";
        Settings settings = Settings.settingsBuilder().put("client.transport.sniff", false)
                .put("client.transport.nodes_sampler_interval", "60s")
                .put("cluster.name", clusterName).build();
        client = TransportClient.builder().settings(settings).build();
        for (String host : ip.split(",")) {
            try {
                client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), 9300));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        client.connectedNodes();
    }

    public final static Client getClient() {
        return client;
    }

    // fileName is the name of the schema file to read
    public static String readFileContent(String fileName) throws IOException {
        String path = "E:\\workspace\\DataImportV2.1\\target";
        File file = new File(path + "\\" + fileName);
        BufferedReader bf = new BufferedReader(new FileReader(file));
        StringBuilder sb = new StringBuilder();
        String line;
        while ((line = bf.readLine()) != null) {
            sb.append(line.trim());
        }
        bf.close();
        return sb.toString();
    }
}
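readFileContent depends on a hard-coded Windows workspace path. A more portable variant, which is a sketch and not part of the original code, would load the schema from the classpath (e.g. from src/main/resources) instead:

// Hypothetical alternative: load the schema file from the classpath rather than
// a fixed path, so the packaged jar also works outside the development machine.
public static String readResourceContent(String fileName) throws IOException {
    java.io.InputStream in = Utils.class.getClassLoader().getResourceAsStream(fileName);
    if (in == null) {
        throw new IOException("resource not found on classpath: " + fileName);
    }
    java.io.BufferedReader reader =
            new java.io.BufferedReader(new java.io.InputStreamReader(in, "UTF-8"));
    StringBuilder sb = new StringBuilder();
    String line;
    while ((line = reader.readLine()) != null) {
        sb.append(line.trim());
    }
    reader.close();
    return sb.toString();
}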

 

Reposted from: https://www.cnblogs.com/ygwx/p/5337835.html
