Writing to Phoenix from Spark

The Spark Streaming code I use to write to Phoenix, the problems I ran into during deployment, and how I solved them.

Write code

import org.apache.phoenix.spark._ // brings saveToPhoenix into scope for RDDs of case classes

rdd.flatMap(x => x).map(json => {
  // Read one field through the JsonNull.handleNull helper and convert it to a String
  def f(key: String): String = JsonNull.handleNull(key, json).toString
  // Argument order must match the column list passed to saveToPhoenix below
  meetingSignalSchema(
    f("rowkey"), f("meeting_id"), f("event"), f("enterprise_id"), f("department_id"),
    f("department_name"), f("enterprise_name"), f("service_type"), f("scheduled_id"),
    f("scheduled_title"), f("meeting_type"), f("profile_type"), f("charge_uri"),
    f("virtual_number"), f("create_time"), f("charge_entity"), f("first_caller"),
    f("first_callee"), f("time_begin"), f("time_end"), f("duration"), f("display_name"),
    f("call_number"), f("charge_user_count"), f("user_count_peak"), f("session_free"),
    f("session_free_count"), f("session_max_count"), f("sum_user_count"),
    "1", // suport_call_quality_statis is always written as "1"
    f("meet_level"), f("group_id"), f("domain_id"), f("visible_department"),
    f("mp_cloud_conf_no"), f("zone_num"))
})
.saveToPhoenix((environment + confTable).toUpperCase,
  Seq("id", "meeting_id", "event", "enterprise_id", "department_id", "department_name",
    "enterprise_name", "service_type", "scheduled_id", "scheduled_title", "meeting_type",
    "profile_type", "charge_uri", "virtual_number", "create_time", "charge_entity",
    "first_caller", "first_callee", "time_begin", "time_end", "duration", "display_name",
    "call_number", "charge_user_count", "user_count_peak", "session_free",
    "session_free_count", "session_max_count", "sum_user_count",
    "suport_call_quality_statis", "meet_level", "group_id", "domain_id",
    "visible_department", "mp_cloud_conf_no", "zone_num"),
  zkUrl = Some(hbaseAddress + ":2181"))

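It is also possible to write to Phoenix directly from a DataFrame. When I tried it, however, the Phoenix column names were lowercase while saveToPhoenix mapped the DataFrame column names to uppercase, so the write failed with a column-name mismatch exception.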
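The mismatch is easier to see with a small model of the rule involved. As far as I understand it, Phoenix upper-cases unquoted identifiers and keeps the case of double-quoted ones. The sketch below is a simplified stand-in for that rule, not Phoenix's own code; IdentifierCase, normalize and quoted are hypothetical names used only for illustration.

```scala
object IdentifierCase {
  // Simplified model of the identifier rule (an assumption, not Phoenix code):
  // a name wrapped in double quotes keeps its case, anything else is upper-cased.
  def normalize(name: String): String =
    if (name.length >= 2 && name.head == '"' && name.last == '"')
      name.substring(1, name.length - 1)
    else
      name.toUpperCase

  // Wrap a column name in double quotes so that normalize leaves its case alone.
  def quoted(name: String): String = "\"" + name + "\""
}
```

Under this model an unquoted meeting_id becomes MEETING_ID, which no longer matches a column that was created in lowercase, while the quoted form keeps its original case.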


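Problems

Both problems below appeared on a newer CDH cluster where Phoenix had been set up separately. They did not appear on older versions where the integration is officially supported.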

Problem 1
java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.phoenix.mapreduce.PhoenixOutputFormat not found


Solution: add the Phoenix library jars to Spark's classpath, that is, to /etc/spark/conf/classpath.txt:

/opt/apache-phoenix-4.14.0-cdh5.14.2-bin/phoenix-4.14.0-cdh5.14.2-client.jar
/opt/apache-phoenix-4.14.0-cdh5.14.2-bin/phoenix-4.14.0-cdh5.14.2-queryserver.jar
/opt/apache-phoenix-4.14.0-cdh5.14.2-bin/phoenix-4.14.0-cdh5.14.2-server.jar
/opt/apache-phoenix-4.14.0-cdh5.14.2-bin/phoenix-4.14.0-cdh5.14.2-thin-client.jar
/opt/apache-phoenix-4.14.0-cdh5.14.2-bin/phoenix-core-4.14.0-cdh5.14.2-sources.jar
/opt/apache-phoenix-4.14.0-cdh5.14.2-bin/phoenix-core-4.14.0-cdh5.14.2.jar
/opt/apache-phoenix-4.14.0-cdh5.14.2-bin/phoenix-spark-4.14.0-cdh5.14.2-javadoc.jar
/opt/apache-phoenix-4.14.0-cdh5.14.2-bin/phoenix-spark-4.14.0-cdh5.14.2-sources.jar
/opt/apache-phoenix-4.14.0-cdh5.14.2-bin/phoenix-spark-4.14.0-cdh5.14.2.jar
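To confirm that the classpath change took effect, one option is to check from inside the job whether the class named in the exception can be loaded. This is a minimal sketch that uses only the JDK and the Scala standard library; ClasspathCheck and isLoadable are hypothetical names, not part of Spark or Phoenix.

```scala
import scala.util.Try

object ClasspathCheck {
  // Returns true if the named class is visible to this object's class loader.
  // initialize = false avoids running the class's static initializers.
  def isLoadable(className: String): Boolean =
    Try(Class.forName(className, false, getClass.getClassLoader)).isSuccess
}
```

Calling ClasspathCheck.isLoadable("org.apache.phoenix.mapreduce.PhoenixOutputFormat") on the driver before the first write should return true once the jars are picked up.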

Problem 2
java.sql.SQLException: ERROR 726 (43M10):  Inconsistent namespace mapping properties. Cannot initiate connection as SYSTEM:CATALOG is found but client does not have phoenix.schema.isNamespaceMappingEnabled enabled


Solution: add --files /etc/hbase/conf/hbase-site.xml to the spark-submit command.
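The error says the client side does not have phoenix.schema.isNamespaceMappingEnabled enabled, so it is worth checking that the hbase-site.xml being shipped really sets it. The sketch below reads one property from the text of an hbase-site.xml file using only the JDK's XML parser. HBaseSiteCheck and property are hypothetical names, and the code assumes the usual configuration/property/name/value layout.

```scala
import java.io.ByteArrayInputStream
import javax.xml.parsers.DocumentBuilderFactory
import org.w3c.dom.Element

object HBaseSiteCheck {
  // Text of the first child element with the given tag, if there is one.
  private def childText(e: Element, tag: String): Option[String] = {
    val nodes = e.getElementsByTagName(tag)
    if (nodes.getLength > 0) Some(nodes.item(0).getTextContent.trim) else None
  }

  // Looks up the value of `key` in the XML text of an hbase-site.xml file.
  def property(xml: String, key: String): Option[String] = {
    val doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
      .parse(new ByteArrayInputStream(xml.getBytes("UTF-8")))
    val props = doc.getElementsByTagName("property")
    (0 until props.getLength).iterator
      .map(i => props.item(i).asInstanceOf[Element])
      .find(p => childText(p, "name").contains(key))
      .flatMap(p => childText(p, "value"))
  }
}
```

If property(xml, "phoenix.schema.isNamespaceMappingEnabled") does not return Some("true") for the file passed with --files, the same error is likely to come back.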
