11import logging
22import time
33
4+ import ccxt
45import pendulum
6+
7+ from minos .aggregate import (
8+ Event ,
9+ )
10+ from minos .common import (
11+ ModelType ,
12+ )
513from minos .cqrs import (
614 CommandService ,
715)
8- from minos .common import ModelType
9-
1016from minos .networks import (
17+ BrokerMessageV1 ,
18+ BrokerMessageV1Payload ,
1119 Request ,
1220 enroute ,
13- BrokerMessageV1 ,
14- BrokerMessageV1Payload
15- )
16- from minos .aggregate import (
17- Event
1821)
22+
1923from ..aggregates import (
2024 CryptoAggregate ,
2125)
22- import ccxt
26+
2327logger = logging .getLogger (__name__ )
2428QuoteContent = ModelType .build ("QuoteContent" , {"ticker" : str , "close" : float , "volume" : float , "when" : str })
2529
@@ -32,17 +36,18 @@ async def set_crypto_coin(self, request: Request):
3236 event : Event = await request .content ()
3337 for ticker in event ["tickers" ]:
3438 logger .warning (ticker )
35- if ticker [' flag' ] == "crypto" :
36- now = pendulum .parse (' 1975-08-27T05:00:00' )
39+ if ticker [" flag" ] == "crypto" :
40+ now = pendulum .parse (" 1975-08-27T05:00:00" )
3741 logger .warning ("Added crypto to stock" )
3842 await CryptoAggregate .add_crypto_to_stock (ticker ["ticker" ], now .to_datetime_string ())
3943
4044 def call_remote (self , ticker , _from : float ):
41- kraken = ccxt .kraken ({
42- 'enableRateLimit' : True ,
43- 'apiKey' : 'MilT+OBywOBlgMPZnsCNSSlwxOGEtGfhnNXJl0pi87MCltuKRA+IoiVc' ,
44- 'secret' : 'sGQpkVLH6sIPy7sjuro1sFiMYHC+hhN7j/bWrNr+AHdTIZNR8jb+13+ZhVevo5hyCxRijUqhUWG47Ox0SokOig==' ,
45- }
45+ kraken = ccxt .kraken (
46+ {
47+ "enableRateLimit" : True ,
48+ "apiKey" : "MilT+OBywOBlgMPZnsCNSSlwxOGEtGfhnNXJl0pi87MCltuKRA+IoiVc" ,
49+ "secret" : "sGQpkVLH6sIPy7sjuro1sFiMYHC+hhN7j/bWrNr+AHdTIZNR8jb+13+ZhVevo5hyCxRijUqhUWG47Ox0SokOig==" ,
50+ }
4651 )
4752 now = kraken .milliseconds ()
4853 timeframe = "1h"
@@ -62,11 +67,19 @@ def call_remote(self, ticker, _from: float):
6267 fetch_since = (values [- 1 ][0 ] + 3600000 ) if len (values ) else (fetch_since + timedelta )
6368 data = data + values
6469 if len (values ):
65- logger .info ("{} candles in total from {} to {}" .format (len (values ), kraken .iso8601 (values [0 ][0 ]),
66- kraken .iso8601 (values [- 1 ][0 ])))
70+ logger .info (
71+ "{} candles in total from {} to {}" .format (
72+ len (values ), kraken .iso8601 (values [0 ][0 ]), kraken .iso8601 (values [- 1 ][0 ])
73+ )
74+ )
6775 else :
6876 logger .info ("{} candles in total from {}" .format (len (values ), kraken .iso8601 (fetch_since )))
69- except (ccxt .ExchangeError , ccxt .AuthenticationError , ccxt .ExchangeNotAvailable , ccxt .RequestTimeout ) as error :
77+ except (
78+ ccxt .ExchangeError ,
79+ ccxt .AuthenticationError ,
80+ ccxt .ExchangeNotAvailable ,
81+ ccxt .RequestTimeout ,
82+ ) as error :
7083 logger .error (error )
7184 time .sleep (30 )
7285 return kraken .filter_by_since_limit (data , since , None , key = 0 )
@@ -88,7 +101,7 @@ async def get_crypto_values(self, request: Request):
88101 await CryptoAggregate .update_time_ticker (ticker ["uuid" ], result_date .to_datetime_string ())
89102 when = result_date .to_datetime_string ()
90103 message = BrokerMessageV1 (
91- "QuotesChannel" , BrokerMessageV1Payload ( QuoteContent ( ticker [ "ticker" ], result [ 4 ],
92- result [5 ], when ))
104+ "QuotesChannel" ,
105+ BrokerMessageV1Payload ( QuoteContent ( ticker [ "ticker" ], result [ 4 ], result [5 ], when )),
93106 )
94107 await self .broker_publisher .send (message )
0 commit comments