flume同时使用KafkaSource、KafkaSink导致的问题

KafkaSource 配置topic:topic1
KafkaSink 配置topic:topic2
从topic1拉取数据,经过简单处理后发到topic2
但是你会发现flume一直在循环读写topic1
原因就是KafkaSink中的这段代码:

1
2
3
if (eventTopic == null) {
eventTopic = topic;
}

首先从headers中获取TOPIC_HEADER(topic),然后优先使用。然而在KafkaSource中则会将topic1 PUT到Header中,所以导致循环读写topic1。

如何解决呢?

你可以重新KafkaSink,改掉上面那段代码。

或者配置一个拦截器,将KafkaSource 写到Header中的topic给替换掉:

1
2
3
4
agent.sources.so1.interceptors.i1.type = static
agent.sources.so1.interceptors.i1.key = topic
agent.sources.so1.interceptors.i1.preserveExisting = false
agent.sources.so1.interceptors.i1.value = topic2

《Java性能-权威指南》 笔记

在做性能测试时,需要确保输入参数是确定的,否则处理参数还会带来一定的性能损耗。

JVM主要接受两类标志(少数例外):布尔标志和附带参数标志。

  • 布尔标志语法:-XX:+FlagName表示开启,-XX:-FlagName表示关闭。
  • 附带参数标志语法:-XX:FlagName=something。例如-XX:NewRadio=N
阅读更多

HashMap和ConcurrentHashMap中的initialCapacity

HashMap 默认有一个初始大小(initialCapacity),这个初始大小是16。
在各种开发规范手册中都可以看到会建议设置这个大小。
例如:new HashMap(3);
那么我们设置了初始容量为3,HashMap的容量真的会初始化为3了吗?
答案是否定的。
为了提高Hash效率,Java中会重新计算这个值,获得一个>3的最小2的N次方,大于3的最小2的N次方就是4(关于为什么请参照
https://www.zhihu.com/question/28562088/answer/111668116)。
这个4是怎么来的呢?

阅读更多

TypeSafe Config

Typesafe的Config库,纯Java写成、零外部依赖、代码精简、功能灵活、API友好。支持Java properties、JSON、JSON超集格式HOCON以及环境变量。它也是Akka的配置管理库.

阅读更多

kafka那些坑

千万别用高版本的kafka client 去连低版本的kafka server。
否则回报一系列的(n)io异常,比如下面这个异常

  java.nio.BufferUnderflowException

不丢消息:producer有个ack参数,有三个值,分别代表:不在乎是否写入成功、写入leader成功、写入leader和所有reclpica成功;要求非常可靠的话可以牺牲性能设置成最后一种。
不重复发送:正常发都不会重复,只可能丢,看你这边怎么容错重发了,参考上一条。
消息只读一次:同样,正常读不会重复,如果在上一次读的过程中发生了异常,消息可能被消费,但是offset没有及时commit;这本身是两步,存在中间crash的风险