root@m1:/home/hadoop# cp /home/hadoop/kafka_2.9.2-0.8.1.1/libs/kafka_2.9.2-0.8.1.1.jar /home/hadoop/flume-1.5.0-bin/lib
root@m1:/home/hadoop# cp /home/hadoop/kafka_2.9.2-0.8.1.1/libs/scala-library-2.9.2.jar /home/hadoop/flume-1.5.0-bin/lib
root@m1:/home/hadoop# cp /home/hadoop/kafka_2.9.2-0.8.1.1/libs/metrics-core-2.2.0.jar /home/hadoop/flume-1.5.0-bin/lib
package idoall.cloud.flume.sink;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

public class KafkaSink extends AbstractSink implements Configurable {

    private static final Log logger = LogFactory.getLog(KafkaSink.class);

    private String topic;
    private Producer<String, String> producer;

    public void configure(Context context) {
        // Topic name is hardcoded; the producer is built from the Kafka 0.8.1 producer properties.
        topic = "idoall_testTopic";
        Properties props = new Properties();
        props.setProperty("metadata.broker.list", "m1:9092,m2:9092,s1:9092,s2:9092");
        props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
        props.put("partitioner.class", "idoall.cloud.kafka.Partitionertest");
        props.put("zookeeper.connect", "m1:2181,m2:2181,s1:2181,s2:2181/kafka");
        props.setProperty("num.partitions", "4");
        props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);
        producer = new Producer<String, String>(config);
        logger.info("KafkaSink initialization complete.");
    }

    public Status process() throws EventDeliveryException {
        Channel channel = getChannel();
        Transaction tx = channel.getTransaction();
        try {
            tx.begin();
            Event e = channel.take();
            if (e == null) {
                // Nothing in the channel: roll back and tell the runner to back off.
                tx.rollback();
                return Status.BACKOFF;
            }
            String body = new String(e.getBody());
            KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, body);
            producer.send(data);
            logger.info("Flume sent message to Kafka: " + body);
            tx.commit();
            return Status.READY;
        } catch (Exception ex) {
            logger.error("Flume KafkaSinkException:", ex);
            tx.rollback();
            return Status.BACKOFF;
        } finally {
            tx.close();
        }
    }
}
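The configure() method above points partitioner.class at idoall.cloud.kafka.Partitionertest, which is not shown in this section. As a rough idea of what such a class could look like against the Kafka 0.8.1 producer API, here is a minimal sketch; the class body and hashing strategy are assumptions, not the original implementation.

package idoall.cloud.kafka;

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

// Hypothetical sketch only -- the original Partitionertest is not shown in this section.
public class Partitionertest implements Partitioner {

    // Kafka 0.8.1 instantiates the partitioner reflectively and passes the producer properties.
    public Partitionertest(VerifiableProperties props) {
    }

    // Pick a partition for a message; KafkaSink sends messages without a key, so fall back to 0.
    public int partition(Object key, int numPartitions) {
        if (key == null) {
            return 0;
        }
        return Math.abs(key.hashCode()) % numPartitions;
    }
}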
root@m1:/home/hadoop/flume-1.5.0-bin# vi /home/hadoop/flume-1.5.0-bin/conf/kafka.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1

# Describe the sink
a1.sinks.k1.type = idoall.cloud.flume.sink.KafkaSink

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
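Note that the topic name is hardcoded in configure() rather than set in kafka.conf. If you would rather configure it from the agent file (for example a1.sinks.k1.topic = idoall_testTopic, a property name assumed here, not part of the original), the Flume Context passed to configure() can supply it. A minimal variant of the method might look like this:

// Hypothetical variant of KafkaSink.configure(): read the topic from the agent
// configuration instead of hardcoding it; the default keeps the original behaviour.
public void configure(Context context) {
    topic = context.getString("topic", "idoall_testTopic");

    Properties props = new Properties();
    props.setProperty("metadata.broker.list", "m1:9092,m2:9092,s1:9092,s2:9092");
    props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
    props.put("request.required.acks", "1");
    producer = new Producer<String, String>(new ProducerConfig(props));
}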
root@m1:/home/hadoop# /home/hadoop/kafka_2.9.2-0.8.1.1/bin/kafka-server-start.sh /home/hadoop/kafka_2.9.2-0.8.1.1/config/server.properties &
root@m1:/home/hadoop# /home/hadoop/flume-1.5.0-bin/bin/flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/kafka.conf -n a1 -Dflume.root.logger=INFO,console
# Only part of the log output is shown below
14/08/19 11:36:34 INFO sink.KafkaSink: KafkaSink initialization complete.
14/08/19 11:36:34 INFO node.AbstractConfigurationProvider: Channel c1 connected to [r1, k1]
14/08/19 11:36:34 INFO node.Application: Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner: { source:org.apache.flume.source.SyslogTcpSource{name:r1,state:IDLE} }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@2a9e3ba7 counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
14/08/19 11:36:34 INFO node.Application: Starting Channel c1
14/08/19 11:36:34 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
14/08/19 11:36:34 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
14/08/19 11:36:34 INFO node.Application: Starting Sink k1
14/08/19 11:36:34 INFO node.Application: Starting Source r1
14/08/19 11:36:34 INFO source.SyslogTcpSource: Syslog TCP Source starting...
root@m1:/home/hadoop# echo "hello idoall.org syslog" | nc localhost 5140
14/08/19 11:36:34 INFO sink.KafkaSink: KafkaSink initialization complete.
14/08/19 11:36:34 INFO node.AbstractConfigurationProvider: Channel c1 connected to [r1, k1]
14/08/19 11:36:34 INFO node.Application: Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner: { source:org.apache.flume.source.SyslogTcpSource{name:r1,state:IDLE} }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@2a9e3ba7 counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
14/08/19 11:36:34 INFO node.Application: Starting Channel c1
14/08/19 11:36:34 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
14/08/19 11:36:34 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
14/08/19 11:36:34 INFO node.Application: Starting Sink k1
14/08/19 11:36:34 INFO node.Application: Starting Source r1
14/08/19 11:36:34 INFO source.SyslogTcpSource: Syslog TCP Source starting...
14/08/19 11:38:05 WARN source.SyslogUtils: Event created from Invalid Syslog data.
14/08/19 11:38:05 INFO client.ClientUtils$: Fetching metadata from broker id:3,host:s2,port:9092 with correlation id 0 for 1 topic(s) Set(idoall_testTopic)
14/08/19 11:38:05 INFO producer.SyncProducer: Connected to s2:9092 for producing
14/08/19 11:38:05 INFO producer.SyncProducer: Disconnecting from s2:9092
14/08/19 11:38:05 INFO producer.SyncProducer: Connected to m1:9092 for producing
14/08/19 11:38:05 INFO sink.KafkaSink: Flume sent message to Kafka: hello idoall.org syslog
root@s1:/home/hadoop# /home/hadoop/kafka_2.9.2-0.8.1.1/bin/kafka-console-consumer.sh --zookeeper m1:2181 --topic flume-kafka-storm-001 --from-beginning
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
[2014-08-11 14:22:12,165] INFO [ReplicaFetcherManager on broker 3] Removed fetcher for partitions [flume-kafka-storm-001,1] (kafka.server.ReplicaFetcherManager)
[2014-08-11 14:22:12,218] WARN [KafkaApi-3] Produce request with correlation id 2 from client on partition [flume-kafka-storm-001,1] failed due to Topic flume-kafka-storm-001 either doesn't exist or is in the process of being deleted (kafka.server.KafkaApis)
[2014-08-11 14:22:12,223] INFO Completed load of log flume-kafka-storm-001-1 with log end offset 0 (kafka.log.Log)
[2014-08-11 14:22:12,250] INFO Created log for partition [flume-kafka-storm-001,1] in /home/hadoop/kafka_2.9.2-0.8.1.1/kafka-logs with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 536870912, flush.ms -> 9223372036854775807, delete.retention.ms -> 86400000, index.interval.bytes -> 4096, retention.bytes -> -1, cleanup.policy -> delete, segment.ms -> 604800000, max.message.bytes -> 1000012, flush.messages -> 9223372036854775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 604800000}. (kafka.log.LogManager)
[2014-08-11 14:22:12,267] WARN Partition [flume-kafka-storm-001,1] on broker 3: No checkpointed highwatermark is found for partition [flume-kafka-storm-001,1] (kafka.cluster.Partition)
[2014-08-11 14:22:12,375] INFO Closing socket connection to /192.168.1.50. (kafka.network.Processor)
hello idoall.org syslog
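The console consumer above is enough to confirm that the message arrived. If you prefer to verify from Java instead, a minimal sketch against the Kafka 0.8 high-level consumer API could look like the following; the class name and group id are made up for this example, while the topic and ZooKeeper connect string are taken from the KafkaSink code above.

package idoall.cloud.kafka;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

// Hypothetical verification consumer -- not part of the original tutorial code.
public class KafkaSinkVerifier {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "m1:2181,m2:2181,s1:2181,s2:2181/kafka");
        props.put("group.id", "idoall-verify");       // arbitrary consumer group
        props.put("auto.offset.reset", "smallest");   // read the topic from the beginning

        ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                connector.createMessageStreams(Collections.singletonMap("idoall_testTopic", 1));

        // Block and print every message the Flume sink publishes to the topic.
        for (MessageAndMetadata<byte[], byte[]> mm : streams.get("idoall_testTopic").get(0)) {
            System.out.println(new String(mm.message()));
        }
    }
}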
<?xml version="1.0" encoding="utf-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>idoall.cloud</groupId>
  <artifactId>idoall.cloud</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>idoall.cloud</name>
  <url>http://maven.apache.org</url>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
  <repositories>
    <repository>
      <id>github-releases</id>
      <url>http://oss.sonatype.org/content/repositories/github-releases/</url>
    </repository>
    <repository>
      <id>clojars.org</id>
      <url>http://clojars.org/repo</url>
    </repository>
  </repositories>
  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>com.sksamuel.kafka</groupId>
      <artifactId>kafka_2.10</artifactId>
      <version>0.8.0-beta1</version>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.14</version>
    </dependency>
    <dependency>
      <groupId>storm</groupId>
      <artifactId>storm</artifactId>
      <version>0.9.0.1</version>
      <!-- keep storm out of the jar-with-dependencies -->
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>commons-collections</groupId>