package com.unimas.test;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.I0Itec.zkclient.ZkClient;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.utils.ZkUtils;

/**
 * Entry point that consumes Avro-encoded log messages from a Kafka topic
 * (old 0.8.x high-level consumer API) and hands the streams to
 * {@link EsProducer} for bulk indexing into Elasticsearch.
 *
 * <p>NOTE(review): the original source had its generic type parameters
 * stripped (e.g. {@code Map topicCountMap = new HashMap ()}); they are
 * reconstructed here per the Kafka 0.8 high-level consumer API.
 */
public class KafkaWarningAction {

    /** Kafka topic carrying the Avro-encoded proxy/websense log records. */
    private static final String topic = "avro-log-bj-jingnei1-dev-proxy-websense";
    /** Consumer group id registered in ZooKeeper. */
    private static final String group_id = "kafkawarning01";
    /** Number of consumer streams/threads requested for the topic. */
    private static final Integer threads = 5;

    private static ConsumerConnector consumer;
    private static ExecutorService executor;

    public static void main(String[] args) {
        // Build the consumer that reads the data. "smallest" starts from the
        // earliest available offset when the group has no committed offset.
        Properties props = new Properties();
        props.put("zookeeper.connect", "host1:2181,host2:2181,host3:2181");
        props.put("group.id", group_id);
        props.put("auto.offset.reset", "smallest");
        ConsumerConfig config = new ConsumerConfig(props);
        consumer = Consumer.createJavaConsumerConnector(config);

        // Ask Kafka for `threads` streams on the topic, one per worker thread.
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, threads);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        // Thread pool sized to the number of streams.
        executor = Executors.newFixedThreadPool(threads);

        // Release the consumer and the pool on JVM shutdown.
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                shutdown();
            }
        });

        // Write the consumed records into Elasticsearch.
        EsProducer esProducer = new EsProducer();
        esProducer.WriteToEs(streams, executor);
    }

    /**
     * Stops the Kafka consumer and shuts down the worker pool. Idempotent:
     * both references are nulled after the first successful call.
     */
    public static void shutdown() {
        if (consumer != null) {
            consumer.shutdown();
            consumer = null;
        }
        if (executor != null) {
            executor.shutdown();
            executor = null;
        }
    }
}
// NOTE(review): EsProducer — submits one Runnable per Kafka stream to the
// supplied executor; each worker decodes Avro messages against the
// "websense.avsc" schema (re-read from disk on EVERY message — presumably an
// oversight; hoisting it out of the loop should be confirmed against the full
// source) and bulk-indexes them into Elasticsearch via Utils.getClient().
// The source for this class is TRUNCATED: generic type parameters were
// stripped and the body cuts off mid-statement at "GenericDatumWriter" with
// no closing braces. Recover the original file before editing; code is left
// byte-identical below.
package com.unimas.test;import java.io.ByteArrayOutputStream;import java.io.IOException;import java.text.SimpleDateFormat;import java.util.Date;import java.util.List;import java.util.concurrent.ExecutorService;import org.apache.avro.Schema;import org.apache.avro.generic.GenericDatumReader;import org.apache.avro.generic.GenericDatumWriter;import org.apache.avro.generic.GenericRecord;import org.apache.avro.io.DatumReader;import org.apache.avro.io.Decoder;import org.apache.avro.io.DecoderFactory;import org.apache.avro.io.EncoderFactory;import org.apache.avro.io.JsonEncoder;import org.elasticsearch.action.bulk.BulkItemResponse;import org.elasticsearch.action.bulk.BulkRequestBuilder;import org.elasticsearch.action.bulk.BulkResponse;import org.elasticsearch.action.index.IndexRequest;import org.elasticsearch.client.Client;import kafka.consumer.ConsumerIterator;import kafka.consumer.KafkaStream;public class EsProducer { SimpleDateFormat _format = new SimpleDateFormat("yyyyMMdd"); long starttime = System.currentTimeMillis(); public void WriteToEs(List> streams, ExecutorService executor) { for (final KafkaStream kafkaStream : streams) { executor.submit(new Runnable() { @Override public void run() { Client client = Utils.getClient(); BulkRequestBuilder bulk = client.prepareBulk(); String typeName = "type-warning"; Schema _schema = null; String schemaName = "websense.avsc"; ConsumerIterator it = kafkaStream.iterator(); while(it.hasNext()){ byte[] message = it.next().message(); String indexName = "warning-message-"; try { String schemaContesnt =Utils.readFileContent(schemaName); _schema = new Schema.Parser().parse(schemaContesnt); DatumReader reader1 = new GenericDatumReader (_schema); Decoder decoder1 = DecoderFactory.get().binaryDecoder(message, null); GenericRecord result = (GenericRecord) reader1.read(null, decoder1); GenericDatumWriter
package com.unimas.test;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;

/**
 * Shared helpers: a process-wide Elasticsearch 2.x {@link TransportClient}
 * and a small file reader used to load the Avro schema from disk.
 */
public class Utils {

    /** Singleton ES transport client, built once when the class loads. */
    static TransportClient client = null;

    /* elasticSearch 2.0.0 */
    static {
        // NOTE(review): cluster name / IPs are hard-coded; the commented-out
        // Config lookup suggests they should come from configuration.
        String clusterName = "udb_soc";
        String ip = "11.11.11.11";
        Settings settings = Settings.settingsBuilder()
                .put("client.transport.sniff", false)
                .put("client.transport.nodes_sampler_interval", "60s")
                .put("cluster.name", clusterName).build();
        client = TransportClient.builder().settings(settings).build();
        // `ip` may be a comma-separated list of hosts; best-effort: a host
        // that fails to resolve is logged and skipped, not fatal.
        for (String host : ip.split(",")) {
            try {
                client.addTransportAddress(
                        new InetSocketTransportAddress(InetAddress.getByName(host), 9300));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        client.connectedNodes();
    }

    public final static Client getClient() {
        return client;
    }

    /**
     * Reads {@code fileName} from the hard-coded workspace directory and
     * returns its content with every line trimmed and concatenated (no line
     * separators) — the original whitespace-collapsing behavior is preserved.
     *
     * <p>Fixes vs. the original: the reader is now closed via
     * try-with-resources (it leaked on a read error), and the file is read as
     * UTF-8 instead of the platform default charset ({@code .avsc} schemas
     * are UTF-8 JSON).
     *
     * @param fileName file name relative to the workspace target directory
     * @return trimmed, concatenated file content
     * @throws IOException if the file cannot be opened or read
     */
    public static String readFileContent(String fileName) throws IOException {
        // NOTE(review): hard-coded developer path; should be configurable.
        String path = "E:\\workspace\\DataImportV2.1\\target";
        StringBuilder sb = new StringBuilder();
        try (BufferedReader bf = Files.newBufferedReader(
                Paths.get(path, fileName), StandardCharsets.UTF_8)) {
            String line;
            while ((line = bf.readLine()) != null) {
                sb.append(line.trim());
            }
        }
        return sb.toString();
    }
}