股票学习网

股票入门基础知识和炒股入门知识 - - 股票学习网!

土地指标交易(td指标源码大全)

2023-04-15 07:58分类:公司分析 阅读:

各地方要在区域市场一体化进程中积极探索,总结经验,结合本地的体制改革和统一市场建设推进情况,精选改革举措,形成典型案例,打造统一市场建设的工作亮点

各地方要着力破除要素市场分割和多轨运行,促进创新要素有序流动和合理配置,支撑地方产业发展

文 | 刘志成 欧阳慧

建设全国统一大市场是构建新发展格局的基础支撑和内在要求,有利于增强国内大循环内生动力和可靠性,有利于提升国际循环质量和水平。深入贯彻中央决策部署,主动服务和融入全国统一大市场是各级地方政府的职责使命。各地必须深刻认识到,不融入全国统一大市场就难以融入新发展格局,不融入新发展格局就无法实现高质量发展。当前和今后一个时期,各地要进一步凝聚共识,担当作为,在建设全国统一大市场中探索有效路径,构建地区经济发展新优势。

地方构建经济发展新优势的必由之路

建设全国统一大市场关键在于推动各地方市场、各专业市场形成一个相互依存、相互协调、相互促进的有机整体。全国统一大市场是以深化分工和充分竞争为基础,在全国范围内实现规则统一、区域协同,充分发挥我国超大规模市场优势的市场。建设全国统一大市场要求各地方摒弃各自为政、画地为牢的做法,破除地方保护和行政性垄断。只有加快融入全国统一大市场,才能使各地区市场以及不同地方各具优势和特色的专业市场形成一个有机衔接、相互协同的整体,不断拓展我国市场的广度、宽度和深度,不断巩固和扩展我国超大规模市场优势,进一步增强我国经济发展的韧性。加快融入全国统一大市场是地方构建经济发展新优势的必由之路。

加快建设全国统一大市场要尊重市场经济发展规律,充分发挥地方的主动性和积极性。建设全国统一大市场要求各地切实把思想和行动统一到党中央决策部署上来。一方面,坚持全国一盘棋,完善工作机制。建立健全部门协调机制,加大统筹协调力度,及时督促检查。尽快研究制定全国统一大市场建设标准指南,及时发布不当干预全国统一大市场建设问题清单。另一方面,充分发挥地方政府在统一大市场建设中的主动性和积极性。全国统一大市场是各地方市场构成的有机体,解决好妨碍统一市场建设的不当市场干预和不当竞争行为问题,需要地方政府的积极参与和大力支持。提升地方政府的积极性,既要加强对地方的正面引导,积极总结经验,形成正面案例,在更大范围内复制推广,又要完善激励约束机制,通过健全市场机制,服务地方发展,帮助地方在统一大市场建设中收获改革红利。

加快融入全国统一大市场是各地重塑区域发展新优势,打造区域发展新动能的历史性机遇。立足新发展阶段,各地方要充分认识到建设全国统一大市场不仅是塑造我国国际合作和竞争新优势的必然要求,也是地方重塑区域发展优势、增强区域发展动力的重大机遇。在全球化时代,各地只有在更大范围整合资源,才能在国内的区域竞争和国际竞争中形成优势占据先机。各地要强化战略思维、系统思维和法治思维,坚持区域协作互利共赢,实现区域间产业协同发展、优势资源共享,切实防止以“内循环”名义搞地区封锁的现象出现。只有在全国统一大市场建设的大局中加强区域协作、优化营商环境,才能够在更大范围内畅通要素流动,打造竞争优势,不断增强本地区竞争力和吸引力,更好发挥比较优势,激发发展动能,彰显地方特色。

把握中央精神,加强系统谋划

地方融入全国统一大市场,应把发展的重点从比拼优惠政策补贴力度、搞“政策洼地”向比拼公共服务供给质量、创“改革高地”转变,这是发展思路和工作方法的重大变革。

一是准确把握中央文件意图,深刻领会中央政策精神。中共中央、国务院印发的《关于加快建设全国统一大市场的意见》(以下简称《意见》)是建设全国统一大市场的纲领性文件,要完整准确全面理解《意见》的理论逻辑、工作方法和工作思路。建设全国统一大市场既是发展问题,又是改革问题,必须以攻坚克难的精神推动深层次改革。建设全国统一大市场应以市场经济而不是计划经济的思路推进,坚持市场化、法治化原则,充分发挥市场在资源配置中的决定性作用,更好发挥政府作用。建设全国统一大市场要坚持推动制度型开放,不是搞自封闭。

二是坚持高位推动,确保各项具体工作向纵深推进。建设全国统一大市场是中央的一项重大决策部署,社会各界对这项工作高度关注,也充满期待。无论是从贯彻落实中央决策部署的工作要求看,还是从主动回应人民群众关切的工作需要看,各地都应高度重视全国统一大市场建设。要坚持高位推动,把服务和融入全国统一大市场建设作为党委和政府的一项重要工作来抓;加快建立工作机制,完善工作分工,明确重点任务。重点是加快转变政府职能,把市场基础制度落实、基础设施互联互通、要素顺畅流通高效配置、市场监管协同配合统一执法等工作落到实处,为地方经济发展增添新动能。

三是坚持立破结合,在改革创新和完善机制上见真章。当前,阻碍全国统一大市场建设步伐的因素中,既有认识上的不足,也有体制上的障碍,须坚持用改革创新的办法解决市场建设中的难题。在制度上、政策上、思想观念上加强正面引导,加强市场领域的建章立制工作。深入推进省以下财政体制改革,完善政绩评价考核体系;深化要素市场化改革,推动要素市场的前沿探索和突破;持续推进“放管服”改革,进一步放宽市场准入,不断加强事中事后监管。冲破思想观念束缚,突破利益固化藩篱,通过改革逐步铲除市场分割、地区封锁和行业垄断滋生的土壤,在统一市场建设中深化分工,找准定位,提升资源配置效率。

四是优先开展区域协作,积极推进区域市场一体化。就地方而言,推进区域一体化也是在为建设统一大市场作贡献。优先推进区域合作,一方面可以提升区域市场一体化水平,区域互联互通水平的提升、区域市场监管的协同、区域要素市场的联接,本身就是对全国统一大市场建设的重大贡献;另一方面可以通过建立健全区域合作机制,形成典型示范,及时总结经验并向其他地区复制推广,引领全国统一大市场建设。各地方要在区域市场一体化进程中积极探索,总结经验,结合本地的体制改革和统一市场建设推进情况,精选改革举措,形成典型案例,打造统一市场建设的工作亮点。

观众在国家会议中心的中国服务贸易成就展专区内参观(2022年9月1日摄) 李鑫摄/本刊

着重把握六大工作重点

服务和融入全国统一大市场是一项涉及面广、关联度高、创新性强的工作,应加强系统谋划,把握工作重点,在建章立制和破除壁垒等各个方面取得实效,推动地方加快融入新发展格局,服务地方高质量发展。

一是围绕破除统一市场建设的制度障碍,在制度政策对接上取得新突破。落实好产权保护、市场准入、公平竞争和信用监管方面的政策。这既是完善市场基础制度的要求,也是地方各职能部门的具体工作。产权保护涉及到多部门行政行为,也涉及到具体的产权案件;市场准入和信用监管涉及到具体的行政审批行为和对市场主体的监管;公平竞争是在政府采购、招投标等各个领域需要保障的一项原则。各地方要按照《意见》的要求加强与中央确定的各项制度政策的对接,并及时开展自查清理,尽快废止妨碍全国统一大市场建设的规定和做法。

二是围绕构建高效联通的市场基础设施,在现代流通体系建设上取得新突破。建设现代流通体系是构建新发展格局的一项重要战略任务,也是建设全国统一大市场的一项基础性工作。当前,我国正在加快建设内畅外联的现代流通网络,《意见》对加强交通、能源、信息等基础设施和市场交易平台建设作出了全面部署。各地方要结合自身发展实际,把握统一大市场建设下基础设施软硬件建设的重大机遇,把握国务院一揽子稳增长稳投资政策带来的新机遇。同时,根据本地工作推进情况和国家的政策导向、投资重点提前谋划加快推进一批投资项目,加强本地区基础设施建设,推动本地区融入全国基础设施网络。

三是围绕推动产业分工优化,在地区间产业转移协调合作上取得新突破。在全国统一大市场之下,我国的产业分工将进一步深化,产业布局将持续调整优化,产业升级和产业转移也会持续推进。在此过程中,各地方要结合本地区产业发展现状和资源禀赋特征,加强地区间产业转移项目协调合作,推动本地区比较优势转化为竞争优势。在中央的统一领导下,积极参与产业转移重大问题协调解决机制。发达地区要推动劳动、资源密集型产业向中西部转移,加快发展高端化、智能化、绿色化产业集群。后发地区积极对接发达地区,承接产业转移。地区间产业有序转移,将为全国产业合理布局、分工深化提供有力支撑。

四是围绕在更大范围优化配置资源要素,推动要素市场化改革取得新突破。建设全国统一大市场对深化要素市场化配置改革、打造统一的要素和资源市场、实现要素统一高效配置作出了部署。各地方要着力破除要素市场分割和多轨运行,促进创新要素有序流动和合理配置,支撑地方产业发展。具备条件的地方可以积极参与要素市场化配置综合改革试点,在增量和存量建设用地统筹、特定土地指标交易、人口迁移和落户财政配套、地方性技术交易市场互联互通、数据要素市场探索等方面加快改革步伐,带动本地区经济高质量发展。

五是围绕提升市场监管效能,在市场监管协同联动和创新上取得新突破。统一市场监管是建设统一大市场的重点领域,也是基层工作的重要方面。各地方政府尤其是地方市场监管部门应结合地方工作实际,在监管的部门协同、区域联动等方面有所行动,在统一监管规则、统一监管执法等方面有所作为,通过创新市场监管的方式方法,丰富市场监管工具,提升市场监管效能。

六是围绕以优质制度供给创新招商引资,在优化营商环境上取得新突破。招商引资是地方做大经济规模、提升发展质量的重要工作抓手,良好的营商环境是招商引资取得成效的关键。建设全国统一大市场对各地招商引资政策规范性、竞争公平性提出了更高要求,明确提出要依法开展招商引资活动,防止招商引资恶性竞争行为。各地要持续优化招商引资规则与政策,从拼优惠政策转向拼营商环境,不断改进本地区政务服务和行政审批流程,提高服务和审批效率,降低本地区制度性交易成本,不断提高本地区先进要素吸引力,提升要素综合配置能力,以优质的制度供给和制度创新吸引更多优质企业投资。 (作者为中国宏观经济研究院市场与价格研究所研究员)■

深圳的一个城中村。 (视觉中国/图)

张建军买的回迁房正在变成一个烫手山芋。

回迁房,是指旧房拆迁后赔给拆迁户的房子。一些买家会从持有待拆物业的业主手中购买一定面积,相当于购买回迁指标,并代替原业主与开发商签订拆赔合同,等待拆迁时获赔相应面积的商品房。

2021年行情上行的时候,张建军花了两百多万元,在深圳龙岗区石灰围村买下一个95平方米的回迁房指标,期待能获赔同等面积的回迁房,用来自住。当时指标每平方米售价两万多元,比周边新房价格低一半。

“我们没钱买其它的房子。”张建军向南方周末记者坦言,他从外地来深圳工作十多年,在一所学校做老师,攒下的积蓄在深圳市区根本买不起商品房。

在百度贴吧“旧改吧”“城市更新吧”等论坛里,充斥着各种推介回迁房指标的广告语:“1:1赔红本商品房”“预计5年交楼享300%回报”“做拆一代,享城市红利”。

回迁房指标交易在深圳长期存在,2020年开始愈加火爆。这一年深圳房价快速上涨,政府调控收紧,购房门槛提高。回迁房指标的价格通常只有周边新房价格的一半,吸引了大量刚需购房者和投资客。

“2020年、2021年这段时间指标(交易)是最火的时候,很多外地人、深圳本地人看中了项目基本上都直接打钱过来说你帮我去找(指标)。”专门从事深圳回迁房指标交易的房地产中介蔡和畅对南方周末记者说。

然而,张建军买下石灰围的回迁房指标后,开发商爆雷,项目搁浅。2021年开始,深圳也多次发文打击回迁房指标交易。

这门由城市更新催生的灰色生意,风险陡增。

回迁房来源

张建军购买的95平方米是石灰围村一栋农民自建房的一部分,由当地村民在2015年与人合建——村民提供100平方米宅基地,合伙人出资建房。建成的房子共7层,村民分得3层,合伙人分得4层。

2020年,石灰围村被列入深圳城市更新计划,这栋7层小楼也在拆迁范围内。合伙人拿出近五百平方米建筑面积对外出售,除了张建军,还有另外4人分别买下不等面积。

这样的农民自建房在深圳大量存在,正是深圳回迁房指标的来源。

中国实行城乡二元制土地结构,城市土地属于国有性质,农村土地多为集体所有。国有土地需要经过招拍挂出让,合法建成的住宅商品房拥有70年产权,可以流通交易。集体土地上建的房子只能在集体内部进行转让。

广东合一城市更新研究院执行院长胡益红告诉南方周末记者,深圳地少人多,为了发展建设曾在1992年、2004年先后将农村集体用地全部转为国有用地。当时深圳的房地产市场已经发展起来,和日渐高涨的房价相比,失地农民获得的安置补偿标准较低,不少人采用“种房子”的方式占地。

按照1992年的《深圳经济特区征地拆迁补偿办法》,一亩(约667平方米)产值较高的一类水田补偿4000元。当年深圳房价最高的罗湖区均价已达4000元/平方米,相当于征地补偿的667倍。

2004年,一亩水田的补偿标准提高到24000元,相当于每平方米36元。此时深圳平均房价逼近6000元/平方米,是征地补偿的167倍。

结果是,深圳虽然在法律意义上已经没有集体用地,但村集体等组织依然占有全市近三分之一的城市规划建设用地。炒地皮、合作建房风行一时,大量未经合法审批的自建房在已经转为国有的土地上拔地而起。

深圳将这类物业称为“农村城市化历史遗留违法建筑”。但是客观来看,处于高速发展阶段的深圳,住房需求旺盛,这些历史违建同样发挥了作用。

深圳2015年公布的《市规划国土委关于查违和历史遗留违建处理工作的汇报》显示,截至2014年底,全市违法建筑37.30万栋,面积约为4.28亿平方米,占全市总建筑面积半壁江山。在所有违建物业中,住宅类违建达1.72亿平方米,占总体量的40.08%。

随着城市发展,深圳建设用地日渐不足。为了增加建设用地,深圳积极推动城市更新,通过拆旧建新腾挪土地。大量历史违建也在这一过程中被拆除,置换成回迁房。

形成产业链

在深圳的高房价下,历史违建物业变商品房意味着巨大的利润,由此催生出一条回迁房指标交易产业链。

这条产业链的顶端是持有大量历史违建的拆迁户。其中不少人并非本地村民,而是“种房子”的投资客。

早年间,他们四处寻找标的,与村民合作建房,或直接从本地村民手中收购楼房,手上持有大量建筑面积。平时他们将这些房子出租赚租金,一旦押中某地拆迁,又能获赔价值不菲的回迁房。

由于拆迁周期较长,他们也会选择在拆迁前出售一定面积。虽然售价比将来能拿到的回迁房市价低不少,但买家必须一次性全额付款,能够令卖家在短期内兑现现金流。

回迁房指标买卖通常由中介牵线达成。蔡和畅就曾遇到过手上持有上万平方米物业的投资客,拿着十几本测绘报告找他联系回迁房指标买家。“(投资客)中途可能用到钱,就提前套现一点。”

因为产权有问题,正规中介机构很少涉足,但深圳有专门从事回迁房指标买卖的中介群体。他们以外地人为主,熟悉深圳各片区情况,促成一单交易收1%左右的中介费。

湖南人唐韵升也是这个群体的一员,他告诉南方周末记者,自己1998年初中毕业就来了深圳,经同乡介绍开始做回迁房中介,已经干了十多年。“不是我吹牛,整个深圳片区我基本都是很熟的。”

唐韵升透露,中介往往与当地村委会熟识,获知哪里有房子可能要卖,就去和业主谈。房源信息只在网络论坛、朋友圈等隐蔽渠道发布。有些购房者直到付了钱也不知道中介公司具体叫什么名字。

回迁房指标的买家主要是两类人。一类是刚需购房者,因为资金不足冒险选择便宜的回迁房。另一类是专门的炒房客,除了通过“搏拆迁”换取高额回报,也会将回迁房指标层层转卖赚取差价。

刚需购房者更倾向于即将拆迁的项目,可以直接顶替原业主和开发商签订拆赔合同。炒房客则会考虑进度没那么快的项目,这类交易风险更高,但可以转卖指标获取短期收益,价格也更便宜。

政府确权代替民间确权

历史违建能买卖,一个重要原因在于,深圳之前在城市更新过程中对确权没有严格规定。

也就是说,一栋将被拆迁的房子即便被多次转卖,开发商只要和最后一任业主签订拆赔协议,回迁房就会照常赔付。

胡益红介绍,在长期的回迁房指标交易中,深圳形成了民间确权方式——在历史违建进行转让时,由原业主提出申请,管理集体资产的村股份公司核实认定,再由街道办进行公示。为了增强权威性,一些交易还会请律师或公证处进行见证。

但即便有律师或公证处,这种民间确权方式也没有法律效力。一旦发生纠纷,交易不受法律保护。原业主一房多卖、临时加价或反悔、售卖虚假房源等情况层出不穷。

负责深圳一处城市更新项目拆迁谈判的工作人员告诉南方周末记者,他曾遇到买家购买指标后迟迟未能拿到回迁房,闹到街道办要去上访,结果被“教育”了一番:“国家明文规定违法建筑不能买卖,你违法了还要去上访?”

胡益红表示,这类历史违建的确权类诉讼法院也不会受理。据2012年9月深圳市中级法院发布的《房地产审判工作白皮书》,深圳对此采取的是“先行政再司法”原则,如果对历史违建物业有权属争议,需要先向区规划土地监察局等行政主管部门提出确权申请。

2021年,随着回迁房指标被热炒,深圳接连出台政策打击此类交易。核心举措是对历史违建的确权问题进行明确规定。

新政要求政府部门介入确权工作,由开发商申请历史违建物业权利人核实,辖区街道办、区规划土地监察局等部门审核,报区城市更新局备案并公示。

经核实的历史违建物业权利人原则上不得变更,开发商和未经确认的物业权利人签订的搬迁补偿协议不得备案,未作确权的项目要补充开展这一工作。

对于新政颁布之前立项的城市更新项目,如果尚未确权且未确定开发商等实施主体,深圳市要求自2021年8月12日《深圳市住房和建设局、深圳市规划和自然资源局关于加强对住宅类历史遗留违法建筑交易查处的通知》发布之日起6个月内补充开展确权,防止回迁房炒作。

胡益红表示,新政主要是将确权工作前置于立项。尽管确权之前回迁房指标还可以倒卖,但买家要在立项之前就付款,等待十年左右才可能拿到回迁房,风险增加的同时投资回报降低,能够抑制炒作行为。

在实际操作中,对于上述通知发布前立项的城市更新项目,如果回迁房指标买家和开发商签订的拆赔合同已经在城市更新局备案,即便之前经过倒卖,也能获赔回迁房。

双刃剑

张建军买回迁房指标时同时签了两份合同,一份是和原业主的转让协议,另一份是和开发商的拆赔合同。

开发商是恒大集团(03333.HK)下属公司,合同就在该公司项目办公室签订。对方也知道他并不是真正的被拆迁人,但他和另外4位回迁房指标买家依然作为赔偿对象共同出现在同一份拆迁合同上。

对开发商而言,签约对象是不是真正的被拆迁人并不重要,尽快签约才要紧。《深圳经济特区城市更新条例》规定,开发商和拆迁户的签约率至少要达到95%才能启动拆迁。签约慢就意味着开发成本更高,能快速签约的回迁房指标购买者反倒更受开发商欢迎。

周兵在一家房企负责深圳城市更新项目,他告诉南方周末记者,城市更新项目的各项成本基本是固定的,竞争相对透明,拼的就是时间管理。“人家用一年就搞定了,你用三年才搞定,你这个项目的收益肯定是低于别人的。”

以过渡期安置费为例,开发商需要在征收拆迁户房屋后向对方支付一定金额作为过渡费,直至回迁房交付。张建军签的合同中,开发商每月需支付的过渡费共计一万多元。

尽管能节省时间成本,但回迁房指标交易对开发商而言是把双刃剑。周兵表示,回迁房指标交易的隐形市场也推高了拆迁户的补偿预期,导致一些拆迁户待价而沽,拖慢开发进度,城市更新的成本也随之提高。

他介绍,深圳大大小小的城市更新项目有近千个,但实际落地实施的并没有这么多。“除了拆赔标准谈不拢,权属关系复杂也是阻碍项目落地的重要因素。”

据胡益红所在的广东合一城市更新研究院统计,截至2022年12月5日,深圳已经完成立项、需要拆除重建的城市更新项目有996个,目前只有不到200个完成了确权。“这和回迁房指标交易有比较大关联,多数开发商都不知道确权给哪一手才公平合理。”

按照新政,确权先要由开发商提出申请,政府负责审核。在权属关系比较复杂的情况下,开发商首先就搞不清楚应该确权给谁。

从深圳市城市更新和土地整备局公布的数据来看,2021年以来城市更新进度明显放缓。2020年审批城市更新计划99项,涉及拆除用地面积超过一千公顷。2021年项目数量锐减至66项,涉及拆除用地面积八百余公顷。

周兵解释,这主要是因为2021年住建部发文要求城市更新行动中防止大拆大建,深圳对拆除重建类项目的审批有所收紧,确权要求也提高了立项门槛。

对于一些买了深圳回迁房指标的人来说,目前面临的问题是,不少项目由于开发商资金紧张陷入停滞。

张建军买下石灰围的回迁房指标后,开发商资金链出现问题,承诺的过渡安置费也未支付,而石灰围的城市更新项目也没有资金继续开发。

负责该项目拆迁谈判的一位工作人员告诉南方周末记者,目前该项目由央企深圳市招商平安资产管理有限责任公司接盘,但项目什么时候能有实质性进展尚不得而知。目前石灰围的签约率只有40%。

(应采访对象要求,唐韵升、周兵、张建军、蔡和畅为化名)

南方周末记者 卫琳聪

小 T 导读:想用 Flink 对接 TDengine?保姆级教程来了。

前言

TDengine 是由涛思数据开发并开源的一款高性能、分布式、支持 SQL 的时序数据库(Time-Series Database)。

除了核心的时序数据库功能外,TDengine 还提供缓存、数据订阅、流式计算等大数据平台所需要的系列功能。但是很多小伙伴出于架构的考虑,还是需要将数据导出到 Apache Flink、Apache Spark 等平台进行计算分析。

为了帮助大家对接,我们特别推出了保姆级课程,包学包会。

技术实现

2、代码实现

完整源码:
https://github.com/liuyq-617/TD-Flink

代码逻辑:

1) 自定义类 SourceFromTDengine

用途:数据源连接,数据读取

package com.taosdata.flink; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import com.taosdata.model.Sensor; import java.sql.*; import java.util.Properties; public class SourceFromTDengine extends RichSourceFunction<Sensor> { Statement statement; private Connection connection; private String property; public SourceFromTDengine(){ super(); } @Override public void open(Configuration parameters) throws Exception { super.open(parameters); String driver = "com.taosdata.jdbc.rs.RestfulDriver"; String host = "u05"; String username = "root"; String password = "taosdata"; String prop = System.getProperty("java.library.path"); Logger LOG = LoggerFactory.getLogger(SourceFromTDengine.class); LOG.info("java.library.path:{}", prop); System.out.println(prop); Class.forName( driver ); Properties properties = new Properties(); connection = DriverManager.getConnection("jdbc:TAOS-RS://" + host + ":6041/tt" + "?user=root&password=taosdata" , properties); statement = connection.createStatement(); } @Override public void close() throws Exception { super.close(); if (connection != null) { connection.close(); } if (statement != null) { statement.close(); } } @Override public void run(SourceContext<Sensor> sourceContext) throws Exception { try { String sql = "select * from tt.meters"; ResultSet resultSet = statement.executeQuery(sql); while (resultSet.next()) { Sensor sensor = new Sensor( resultSet.getLong(1), resultSet.getInt( "vol" ), resultSet.getFloat( "current" ), resultSet.getString( "location" ).trim()); sourceContext.collect( sensor ); } } catch (Exception e) { e.printStackTrace(); } } @Override public void cancel() { } }

2) 自定义类 SinkToTDengine

用途:数据源连接,数据写入

SinkToTDengine

package com.taosdata.flink; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import com.taosdata.model.Sensor; import java.sql.*; import java.util.Properties; public class SinkToTDengine extends RichSinkFunction<Sensor> { Statement statement; private Connection connection; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); String driver = "com.taosdata.jdbc.rs.RestfulDriver"; String host = "TAOS-FQDN"; String username = "root"; String password = "taosdata"; String prop = System.getProperty("java.library.path"); System.out.println(prop); Class.forName( driver ); Properties properties = new Properties(); connection = DriverManager.getConnection("jdbc:TAOS-RS://" + host + ":6041/tt" + "?user=root&password=taosdata" , properties); statement = connection.createStatement(); } @Override public void close() throws Exception { super.close(); if (connection != null) { connection.close(); } if (statement != null) { statement.close(); } } @Override public void invoke(Sensor sensor, Context context) throws Exception { try { String sql = String.format("insert into sinktest.%s using sinktest.meters tags('%s') values(%d,%d,%f)", sensor.getLocation(), sensor.getLocation(), sensor.getTs(), sensor.getVal(), sensor.getCurrent() ); statement.executeUpdate(sql); } catch (Exception e) { e.printStackTrace(); } } }

3) 自定义类 Sensor

用途:定义数据结构,用来接受数据

package com.taosdata.model; public class Sensor { public long ts; public int val; public float current; public String location; public Sensor() { } public Sensor(long ts, int val, float current, String location) { this.ts = ts; this.val = val; this.current = current; this.location = location; } public long getTs() { return ts; } public void setTs(long ts) { this.ts = ts; } public int getVal() { return val; } public void setVal(int val) { this.val = val; } public float getCurrent() { return current; } public void setCurrent(float current) { this.current = current; } public String getLocation() { return location; } public void setLocation(String location) { this.location = location; } @Override public String toString() { return "Sensor{" + "ts=" + ts + ", val=" + val + ", current=" + current + ", location='" + location + '\'' + '}'; } }

4) 主程序类 ReadFromTDengine

用途:调用 Flink 进行读取和写入数据

package com.taosdata; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.streaming.api.datastream.DataStream; import com.taosdata.model.Sensor; import org.slf4j.LoggerFactory; import org.slf4j.Logger; public class ReadFromTDengine { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<Sensor> SensorList = env.addSource( new com.taosdata.flink.SourceFromTDengine() ); SensorList.print(); SensorList.addSink( new com.taosdata.flink.SinkToTDengine() ); env.execute(); } }

3、简单测试 RESTful 接口

1) 环境准备:

a) Flink 安装&启动:

  • wget https://dlcdn.apache.org/flink/flink-1.14.3/flink-1.14.3-bin-scala_2.12.tgz
  • tar zxf flink-1.14.3-bin-scala_2.12.tgz -C /usr/local
  • /usr/local/flink-1.14.3/bin/start-cluster.sh

b) TDengine Database 环境准备:

  • 创建原始数据: create database tt;create table `meters` (`ts` TIMESTAMP,`vol` INT,`current` FLOAT) TAGS (`location` BINARY(20));insert into beijing using meters tags(‘beijing’) values(now,220,30.2);
  • 创建目标数据库表: create database sinktest;create table `meters` (`ts` TIMESTAMP,`vol` INT,`current` FLOAT) TAGS (`location` BINARY(20));

2) 打包编译:

源码位置:
https://github.com/liuyq-617/TD-Flink

mvn clean package

3) 程序启动:

flink run target/test-flink-1.0-SNAPSHOT-dist.jar

  • 读取数据 vi log/flink-root-taskexecutor-0-xxxxx.out 查看到数据打印:Sensor{ts=1645166073101, val=220, current=5.7, location=’beijing’}
  • 写入数据 show sinktest.tables; 已经创建了beijing 子表select * from sinktest.beijing; 可以查询到刚插入的数据

4、使用 JNI 方式

举一反三的小伙伴此时已经猜到,只要把 JDBC URL 修改一下就可以了。

但是 Flink 每次分派作业时都在使用一个新的 ClassLoader,而我们在计算节点上就会得到“Native library already loaded in another classloader”错误。

为了避免此问题,可以将 JDBC 的 jar 包放到 Flink 的 lib 目录下,不去调用 dist 包就可以了。

  • cp taos-jdbcdriver-2.0.37-dist.jar /usr/local/flink-1.14.3/lib
  • flink run target/test-flink-1.0-SNAPSHOT.jar

5、小结

通过在项目中引入 SourceFromTDengine 和 SinkToTDengine 两个类,即可完成在 Flink 中对 TDengine 的读写操作。后面我们会有文章介绍 Spark 和 TDengine 的对接。

注:文中使用的是 JDBC 的 RESTful 接口,这样就不用在 Flink 的节点安装 TDengine,JNI 方式需要在 Flink 节点安装 TDengine Database 的客户端。


点击了解更多 TDengine Database 的具体细节。

小 T 导读:在对多款时序数据库进行了选型测试后,同程旅行自研的“夜鹰监控”搭载 TDengine 代替了现有存储设备,减少运维成本。本文分享了他们对建表模型的方案选择思路,接入 TDengine 后所遇到问题的解决经验以及落地效果展示。

 

同程旅行有一套自研的基础监控系统“夜鹰监控”。目前夜鹰监控使用情况为百万级别 endpoint、亿级 metric、每秒 200 万并发写入以及 2 万并发查询。其存储组件基于 RRD 存储,RRD 存储虽然拥有很好的性能,却也存在着一些问题——基于内存缓存定期写入 RRD,在机器重启后会丢失部分数据。

出现这一问题的原因是 RRD 写入为单点写入,当机器故障后无法实现自动切换,这一存储特性也导致无法展示更长时间的原始数据。 针对此问题,夜鹰监控做了很多高可用设计,但还是很难满足业务的需求,之后又进行了如下改造:

  • 引入了 ES 存储,为夜鹰监控提供 7 天内原始数据的查询,目前部署的 2 套存储。
  • RRD 提供给 API 调用,调用量在几万级 TPS。
  • ES 提供给夜鹰面板使用,保存 7 天原始数据,调用量在几百 QPS。

但随着基础监控系统接入指标的增长,目前 2 套存储系统在资源消耗方面一直在增长,同时业务对监控也提出了更多的聚合计算功能要求。基于此,我们需要寻找一个新的时序数据库来代替现有的存储系统,以减少运维成本。

在进行时序数据库选型时,实际需求主要有以下三点:

  • 性能强,可以支持千万级别并发写入、10 万级的并发读取
  • 高可用,可以横向扩展,不存在单点故障
  • 功能强,提供四则运算、最大、最小、平均、最新等聚合计算功能

通过对比 InfluxDB、TDengine、Prometheus、Druid、ClickHouse 等多款市面流行的时序数据库产品后,最终 TDengine 从中脱颖而出,能满足我们所有的选型要求。

一、基于 TDengine 的建表模型

夜鹰监控系统不仅存在系统指标数据,同时也会存在业务指标数据。前者诸如 CPU、内存、磁盘、网络一类,这类信息是可以预测的指标,其指标名是固定的,总数约为 2000 万个。后者则会通过夜鹰监控的 API 上传业务自身定义的指标,指标名是无法预测的,其特点是并发量不大却存在长尾效应,随着时间累积,一年可以达到一亿级。

而 TDengine 在创建表之前需要先规划表的结构,从上面的数据存储背景来看,如果要将海量的指标数量直接一次性扁平化全部创建,则会造成性能的下降。通过和 TDengine 技术支持人员沟通,他们给出了两个建表方案:

其一,将系统类基础指标聚合到一个超级表,一张表存放一个节点,多个指标一次性写入。这个方式的好处是表的数量可以降低到千万级别,但因为夜鹰监控的数据是单条上传的,很难做到一个表里面全部指标集齐再写入。并且不同的指标上传频率不同,如果再根据频率建不同的超级表,运维管理成本会非常高。

其二,将不同的指标建成一个一个的子表,5 千万个左右的指标汇聚成一个集群,分多个集群接入。这种方式的好处是表结构简单,但运维管理多个集群会很麻烦。不过我们也了解到涛思数据明年会发布 TDengine 3.0 版本,能支持超过上亿张表,那么这一方案就可以很好的进行数据迁移了。

最终,我们选择了第二个方案。同时为了减少搭建集群的数量,准备写个程序定期清理掉过期的子表。目前夜鹰监控的超级表结构如下。

夜鹰监控接入 TDengine 后,架构图如下。

二、接入 TDengine 之后的效果展示

在进行数据迁移时,我们先是将夜鹰监控数据转移到 Kafka 中,之后通过消费转换程序将 Kafka 的数据格式转为了 TDengine SQL 格式。 这个过程还遇到了如下三个小问题,解决思路放在这里给大家参考:

  • 连接方式问题。刚开始我们使用的是 go-connector sdk 的方式接入 TDengine,跑起来后发现,go-connector 依赖于 taos client 包以及 taos.cfg 配置文件里面的链接配置,同时因为 FQDN 的设置难以使用 VIP 负载均衡的配置方式。我们考虑到后面消费程序会部署到容器中,不宜产生过多的依赖,因此还是放弃了 go-connector 的连接方式,改为了 HTTP RESTful 的连接方式。
  • Kafka 消费数量问题。由于夜鹰监控上传到 Kafka 里面的数据是一条一个指标,再加上 200 万左右的并发,连接数很快就耗光了。通过和 TDengine 技术支持人员沟通,了解到可以改造成批量 SQL 的方式写入,最佳的实践效果是 400-600K 单个 SQL 的长度。经过计算,我们上传的指标条数大概为 5000 条左右,大小为 500K 。
  • 读取速度问题。因为每次要等到消费 5000 条数据,才会触发一次写入,这种情况也导致读取速度较慢。TDengine 技术支持人员再次给出了解决方案——使用 Taos 自己实现的 Queue,代码地址为:github.com/taosdata/go-demo-kafka/pkg/queue ,有需要的同学可以自行获取。

聚焦到实际效果上,TDengine 数据写入性能很强。原本我们的单套存储系统需要 10 多台高配机器,IO 平均 30% 最高 100% 的情况下才能写完数据;现在只需要 7 台机器,并且 CPU 消耗在 10% 左右、磁盘 IO 消耗在 1% 左右,这点非常的棒!

同时,其数据读取接入过程也很顺利。使用 RESTful 接口后,结合 TDengine 自带的强大聚合函数功能,很容易就能计算出想要的结果。

三、写在最后

在我们的项目中,TDengine 展现出了超强的性能和多元化的功能,不仅具备高效的写入性能、压缩率,其聚合函数功能也非常齐全,支持
Sum/Max/Min/Avg/Top/Last/First/Diff/Percentile 等多种函数,在架构上也设计的很合理,可以实现很好地横向扩展。同时,其自身监控也做的很不错,打造了基于 Grafana 的 TDengine 零依赖监控解决方案 TDinsight,在监控系统自身状态上展现出了很好的效果。

未来,我们也希望与 TDengine 展开更深层次的合作,在此也为其提出一些小小的建议,助力 TDengine 往更好的方向发展:

  • 官方文档还不够完善,新版的功能在文档中没有体现,很多用法缺少代码示例,个人理解起来比较晦涩难懂
  • 社区用户经验传递不是很多, 遇到一些问题时,Google 比较难以找到社区的解决案例

在接入的过程中,非常感谢 TDengine 的技术支持人员的全力支持。虽然目前 TDengine 还处于发展初期,也存在一些问题需要优化,不过其优异的性能还是给了我们一个大大的惊喜!总而言之,TDengine 是个非常不错的存储系统,相信在陶老师的带领下会发展的越来越好!

 

想了解更多 TDengine 的具体细节,欢迎大家在GitHub上查看相关源代码。

https://www.haobaihe.com

上一篇:铁矿石期货概念股(铁矿石期货技术分析)

下一篇:债转股需要交税吗(债转股如何操作转股)

相关推荐

返回顶部