@@ -31,7 +31,6 @@ def __init__(self, cmd):
3131 self .logger = logging .getLogger (__name__ )
3232
3333 self .logger = logging .getLogger (__name__ )
34- self .ncds_client = None
3534
3635 def main (self ):
3736 self .security_cfg = load_auth_properties (self .auth_props_file )
@@ -41,42 +40,47 @@ def main(self):
4140 cmd_to_validate = ValidateInput (self .cmd )
4241 cmd_to_validate .validate_user_input ()
4342
43+ ncds_client = None
44+
4445 try :
45- self .ncds_client = NCDSClient (self .security_cfg , self .kafka_cfg )
4646 if self .test_option == "TOP" :
4747 self .top_cmd ()
4848
4949 elif self .test_option == "SCHEMA" :
50+ ncds_client = NCDSClient (self .security_cfg , self .kafka_cfg )
5051 # Dump the Schema for the self.topic
51- schema = self . ncds_client .get_schema_for_topic (self .topic )
52+ schema = ncds_client .get_schema_for_topic (self .topic )
5253 print ("Schema for the Topic:" + self .topic )
5354 if schema :
5455 print (schema )
5556 else :
5657 print (" Access to topic is not granted " )
5758
5859 elif self .test_option == "GETMSG" :
60+ ncds_client = NCDSClient (self .security_cfg , self .kafka_cfg )
5961 print ("Finding the message" )
6062 if "auto.offset.reset" in self .kafka_cfg and self .kafka_cfg ["auto.offset.reset" ] == "latest" :
6163 print ("Need to get run GETMSG with 'earliest' offset" )
6264 sys .exit (0 )
63- msg = self . ncds_client .get_sample_messages (
65+ msg = ncds_client .get_sample_messages (
6466 self .topic , self .message_name , False )
6567 if msg is not None :
6668 print (msg )
6769 else :
6870 print ("Message Not Found ..." )
6971
7072 elif self .test_option == "GETALLMSGS" :
73+ ncds_client = NCDSClient (self .security_cfg , self .kafka_cfg )
7174 print ("Finding the messages" )
7275 if "auto.offset.reset" in self .kafka_cfg and self .kafka_cfg ["auto.offset.reset" ] == "latest" :
7376 print ("Need to run GETMSG with 'earliest' offset" )
7477 sys .exit (0 )
75- self . ncds_client .get_sample_messages (
78+ ncds_client .get_sample_messages (
7679 self .topic , self .message_name , True )
7780
7881 elif self .test_option == "TOPICS" :
79- self .topics = self .ncds_client .list_topics_for_client ()
82+ ncds_client = NCDSClient (self .security_cfg , self .kafka_cfg )
83+ self .topics = ncds_client .list_topics_for_client ()
8084 print ("List of streams available on Nasdaq Cloud Data Service:" )
8185 for self .topic in self .topics :
8286 print (self .topic )
@@ -90,9 +94,10 @@ def main(self):
9094 logging .exception (e )
9195
9296 def top_cmd (self ):
97+ ncds_client = NCDSClient (self .security_cfg , self .kafka_cfg )
9398 numOfRecords = max (10 , min (int (self .num_top_messages ), 999 ))
94- records = self . ncds_client .top_messages (
95- self .topic ) if not self .timestamp else self . ncds_client .top_messages (self .topic , self .timestamp )
99+ records = ncds_client .top_messages (
100+ self .topic ) if not self .timestamp else ncds_client .top_messages (self .topic , self .timestamp )
96101 print ("Top " + str (numOfRecords ) +
97102 " Records for the Topic: " + self .topic )
98103 if records :
@@ -108,12 +113,13 @@ def top_cmd(self):
108113 print ("Access to topic is not granted" )
109114
110115 def cont_stream_cmd (self ):
111- consumer = self .ncds_client .ncds_kafka_consumer (
112- self .topic ) if not self .timestamp else self .ncds_client .ncds_kafka_consumer (self .topic , self .timestamp )
116+ ncds_client = NCDSClient (self .security_cfg , self .kafka_cfg )
117+ consumer = ncds_client .ncds_kafka_consumer (
118+ self .topic ) if not self .timestamp else ncds_client .ncds_kafka_consumer (self .topic , self .timestamp )
113119
114120 try :
115121 while True :
116- message = consumer .poll (self .kafka_cfg [self .kafka_config_loader .TIMEOUT ])
122+ message = consumer .poll (self .kafka_cfg [self .kafka_config_loader .NUM_MESSAGES ])
117123 if message is None :
118124 print (f"No Records Found for the Topic: { self .topic } " )
119125 else :
@@ -134,8 +140,9 @@ def filter_stream_cmd(self):
134140 if self .msgnames is not None :
135141 msgname_set = set (self .msgnames .split ("," ))
136142
137- consumer = self .ncds_client .ncds_kafka_consumer (
138- self .topic ) if not self .timestamp else self .ncds_client .ncds_kafka_consumer (self .topic , self .timestamp )
143+ ncds_client = NCDSClient (self .security_cfg , self .kafka_cfg )
144+ consumer = ncds_client .ncds_kafka_consumer (
145+ self .topic ) if not self .timestamp else ncds_client .ncds_kafka_consumer (self .topic , self .timestamp )
139146
140147 try :
141148 while True :
0 commit comments