-
Notifications
You must be signed in to change notification settings - Fork 220
Expand file tree
/
Copy pathDownloadCount.hs
More file actions
267 lines (229 loc) · 10.3 KB
/
DownloadCount.hs
File metadata and controls
267 lines (229 loc) · 10.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
{-# LANGUAGE RankNTypes, NamedFieldPuns, RecordWildCards #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# OPTIONS_GHC -Wno-orphans #-}
-- | Download counts
--
-- We maintain
--
-- 1. In-memory (ACID): today's download counts per package version
--
-- 2. In-memory (cache): total download count over the last 30 days per package
-- (across all versions). This is computed once per day from the on-disk
-- statistics (3).
--
-- 3. On-disk: total download per package per version per day. These are stored
-- in safe-copy format, one file per package; this allows to quickly load
-- the statistics for a given package to compute custom reports.
--
-- 4. On-disk: total download per package per version per day, stored as a single
-- CSV file that we append (1) to once per day. Strictly speaking this is
-- redundant, as this information is also stored in (3).
module Distribution.Server.Features.DownloadCount (
DownloadFeature(..)
, DownloadResource(..)
, initDownloadFeature
, RecentDownloads
, TotalDownloads
) where
import Distribution.Server.Framework
import Distribution.Server.Framework.BackupRestore
import Distribution.Server.Features.DownloadCount.State
import Distribution.Server.Features.DownloadCount.Backup
import Distribution.Server.Features.Core
import Distribution.Server.Features.Users
import Distribution.Package
import Distribution.Server.Util.CountingMap (cmFromCSV, cmToList)
import Data.Time.Calendar (Day, addDays)
import Data.Time.Clock (getCurrentTime, utctDay)
import Control.Concurrent.Chan
import Control.Concurrent (forkIO)
import GHC.Generics (Generic)
import Data.Aeson (ToJSON)
import qualified Data.Aeson as Aeson
import Data.List (sortBy)
import Data.Function (on)
-- | Handle to the download-count feature: the framework descriptor plus
-- accessors for the cached download statistics.
data DownloadFeature = DownloadFeature {
    -- | Feature descriptor (resources, state components, caches).
    downloadFeatureInterface :: HackageFeature
    -- | HTTP resources served by this feature.
  , downloadResource :: DownloadResource
    -- | All-time per-package download totals, read from an in-memory cache.
  , totalPackageDownloads :: forall m. MonadIO m => m TotalDownloads
    -- | Per-package download totals over the last 30 days, read from an
    -- in-memory cache that is recomputed once per day (see module header).
  , recentPackageDownloads :: forall m. MonadIO m => m RecentDownloads
  }
-- | Expose the feature to the framework via its descriptor record field.
instance IsHackageFeature DownloadFeature where
  getFeatureInterface = downloadFeatureInterface
-- | HTTP resources of the download feature.
data DownloadResource = DownloadResource {
    -- | @/packages/top.:format@ — top downloaded packages, last 30 days.
    topDownloads :: Resource
  }
-- | JSON payload element for the top-downloads endpoint: a package name
-- paired with its download count.
data PackageDownloads = PackageDownloads {
    packageName :: !String  -- ^ Rendered package name.
  , downloads :: !Int       -- ^ Download count for that package.
  }
  deriving stock (Eq, Ord, Generic)
  deriving anyclass (ToJSON)
-- | Initialise the download-count feature.
--
-- Opens the in-memory (ACID) state for today's counts, seeds the
-- recent/total download caches from the on-disk statistics, and returns a
-- continuation that, given the core and user features, constructs the
-- feature and subscribes to package-download events.
initDownloadFeature :: ServerEnv
                    -> IO (CoreFeature -> UserFeature -> IO DownloadFeature)
initDownloadFeature serverEnv@ServerEnv{serverStateDir} = do
  inMemState <- inMemStateComponent serverStateDir
  let onDiskState = onDiskStateComponent serverStateDir
  -- Seed both caches once at startup from the full on-disk statistics.
  (recentDownloads,
   totalDownloads) <- computeRecentAndTotalDownloads =<< getState onDiskState
  recentCache <- newMemStateWHNF recentDownloads
  totalsCache <- newMemStateWHNF totalDownloads
  downChan <- newChan
  return $ \core users -> do
    let feature = downloadFeature core users serverEnv inMemState
                    onDiskState totalsCache recentCache downChan
    -- Each package download is pushed onto the channel; the feature's
    -- background thread (started in featurePostInit) drains it.
    registerHook (packageDownloadHook core) (writeChan downChan)
    return feature
-- | ACID state component for today's download counts, persisted under
-- @\<stateDir\>/db/DownloadCount/inmem@. Initialised to empty stats for
-- the current day.
inMemStateComponent :: FilePath -> IO (StateComponent AcidState InMemStats)
inMemStateComponent stateDir = do
  initSt <- initInMemStats <$> getToday
  st <- openLocalStateFrom (dcPath stateDir </> "inmem") initSt
  return StateComponent {
      stateDesc = "Today's download counts"
    , stateHandle = st
    , getState = query st GetInMemStats
    , putState = update st . ReplaceInMemStats
    , backupState = \_ -> inMemBackup
    , restoreState = inMemRestore
    , resetState = inMemStateComponent
    }
-- | Directory holding the on-disk (all-time) download statistics.
mkOnDiskStatePath :: FilePath -> FilePath
mkOnDiskStatePath = (</> "ondisk") . dcPath
-- | State component for the all-time, per-package, per-version, per-day
-- download counts kept on disk (one file per package; see module header).
onDiskStateComponent :: FilePath -> StateComponent OnDiskState OnDiskStats
onDiskStateComponent stateDir = StateComponent {
    stateDesc = "All time download counts"
  , stateHandle = OnDiskState
    -- Stats are read lazily so a single package's file can be loaded
    -- without reading everything.
  , getState = readOnDiskStatsLazily (mkOnDiskStatePath stateDir)
  , putState = \onDiskStats -> do
      --TODO: we should extend the backup system so we can
      -- write these files out incrementally
      writeOnDiskStats (mkOnDiskStatePath stateDir) onDiskStats
      -- Keep the append-only CSV log consistent with the new stats.
      reconstructLog (dcPath stateDir) onDiskStats
  , backupState = \_ -> onDiskBackup
  , restoreState = onDiskRestore
  , resetState = return . onDiskStateComponent
  }
-- | Assemble the download feature: HTTP resources, the background
-- download-registration thread, and the state/cache components.
downloadFeature :: CoreFeature
                -> UserFeature
                -> ServerEnv
                -> StateComponent AcidState InMemStats
                -> StateComponent OnDiskState OnDiskStats
                -> MemState TotalDownloads
                -> MemState RecentDownloads
                -> Chan PackageId
                -> DownloadFeature
downloadFeature CoreFeature{}
                UserFeature{..}
                ServerEnv{serverStateDir}
                inMemState
                onDiskState
                totalDownloadsCache
                recentDownloadsCache
                downloadStream
  = DownloadFeature{..}
  where
    downloadFeatureInterface = (emptyHackageFeature "download") {
        featureResources = [ topDownloads downloadResource
                           , downloadCSV
                           ]
        -- Start the registration loop once the server is up.
      , featurePostInit = void $ forkIO registerDownloads
      , featureState = [ abstractAcidStateComponent inMemState
                       , abstractOnDiskStateComponent onDiskState
                       ]
      , featureCaches = [
          CacheComponent {
            cacheDesc = "recent package downloads cache",
            getCacheMemSize = memSize <$> readMemState recentDownloadsCache
          },
          CacheComponent {
            cacheDesc = "total package downloads cache",
            getCacheMemSize = memSize <$> readMemState totalDownloadsCache
          }
        ]
      }

    -- Cached per-package totals over the last 30 days.
    recentPackageDownloads :: MonadIO m => m RecentDownloads
    recentPackageDownloads = readMemState recentDownloadsCache

    -- Cached all-time per-package totals.
    totalPackageDownloads :: MonadIO m => m TotalDownloads
    totalPackageDownloads = readMemState totalDownloadsCache

    -- Background loop: record each download from the channel into today's
    -- in-memory stats. On the first download of a new day, flush
    -- yesterday's stats to the log and on-disk store, then recompute the
    -- recent/total caches.
    registerDownloads = forever $ do
      pkg <- readChan downloadStream
      today <- getToday
      today' <- query (stateHandle inMemState) RecordedToday

      --TODO: do this asyncronously rather than blocking this request
      when (today /= today') $ do
        -- For the first download each day we reset the in-memory stats and..
        inMemStats <- getState inMemState
        putState inMemState $ initInMemStats today
        -- we can discard the large eventlog by writing a small checkpoint
        createCheckpoint (stateHandle inMemState)

        -- Write yesterday's downloads to the log
        appendToLog (dcPath serverStateDir) inMemStats

        let onDiskStateFile = mkOnDiskStatePath serverStateDir

        -- Update the on-disk statistics and recompute recent downloads
        onDiskStats' <- updateHistory inMemStats <$> readOnDiskStatsLazily onDiskStateFile
        writeOnDiskStats onDiskStateFile onDiskStats'

        --TODO: this is still stupid, writing it out only to read it back
        -- we should be able to update the in memory ones incrementally
        (recentDownloads,
         totalDownloads) <- computeRecentAndTotalDownloads =<< readOnDiskStatsLazily onDiskStateFile
        writeMemState recentDownloadsCache recentDownloads
        writeMemState totalDownloadsCache totalDownloads

      updateState inMemState $ RegisterDownload pkg

    downloadResource = DownloadResource {
        topDownloads = (resourceAt "/packages/top.:format")
          { resourceDesc = [ (GET, "Get top downloaded packages for the last 30 days")]
          , resourceGet = [ ("json", serveDownloadTopJSON) ]
          }
      }

    -- Serve the recent-downloads table as JSON, highest count first.
    serveDownloadTopJSON :: DynamicPath -> ServerPartE Response
    serveDownloadTopJSON _ = do
      pkgList <- sortedPackages <$> recentPackageDownloads
      pure $ toResponse $ Aeson.toJSON pkgList

    -- Flatten the counting map to a list sorted by count, descending.
    sortedPackages :: RecentDownloads -> [PackageDownloads]
    sortedPackages = fmap (\(p, c) -> PackageDownloads (unPackageName p) c) . sortBy (flip compare `on` snd) . cmToList

    downloadCSV = (resourceAt "/packages/downloads.:format") {
        resourceDesc = [ (GET, "Get download counts")
                       , (PUT, "Upload download counts (for import)")
                       ]
      , resourceGet = [ ("csv", getDownloadCounts) ]
      , resourcePut = [ ("csv", putDownloadCounts) ]
      }

    -- Admin-only: dump the full on-disk statistics as CSV.
    getDownloadCounts :: DynamicPath -> ServerPartE Response
    getDownloadCounts _path = do
      guardAuthorised_ [InGroup adminGroup]
      onDiskStats <- liftIO $ getState onDiskState
      -- NOTE(review): partial pattern — assumes onDiskBackup yields exactly
      -- one BackupByteString entry; confirm that invariant in the Backup module.
      let [BackupByteString _ bs] = onDiskBackup onDiskStats
      return $ toResponse bs

    -- Admin-only: replace the on-disk statistics from an uploaded CSV,
    -- then refresh both caches and rebuild the append-only log.
    putDownloadCounts :: DynamicPath -> ServerPartE Response
    putDownloadCounts _path = do
      guardAuthorised_ [InGroup adminGroup]
      fileContents <- expectCSV
      csv <- importCSV "PUT input" fileContents
      onDiskStats <- cmFromCSV csv
      liftIO $ do
        --TODO: if the onDiskStats are large, can we stream it?
        writeOnDiskStats (mkOnDiskStatePath serverStateDir) onDiskStats
        (recentDownloads,
         totalDownloads) <- computeRecentAndTotalDownloads onDiskStats
        writeMemState recentDownloadsCache recentDownloads
        writeMemState totalDownloadsCache totalDownloads
        reconstructLog (dcPath serverStateDir) onDiskStats
      ok $ toResponse $ "Imported " ++ show (length csv) ++ " records\n"
{------------------------------------------------------------------------------
Auxiliary
------------------------------------------------------------------------------}
-- | The current UTC calendar day.
getToday :: IO Day
getToday = do
  now <- getCurrentTime
  return (utctDay now)
-- | Day range covering the most recent @numDays@ days, ending today
-- (first day of the range is @numDays@ days before today).
getRecentDayRange :: Integer -> IO (Day, Day)
getRecentDayRange numDays = do
  today <- getToday
  return (addDays (negate numDays) today, today)
-- | Derive the recent (last 30 days) and all-time per-package download
-- totals from the full on-disk statistics.
computeRecentAndTotalDownloads :: OnDiskStats -> IO (RecentDownloads, TotalDownloads)
computeRecentAndTotalDownloads onDiskStats =
  fmap (\range -> initRecentAndTotalDownloads range onDiskStats)
       (getRecentDayRange 30)
-- | Root directory for all of this feature's persisted state.
dcPath :: FilePath -> FilePath
dcPath stateDir = stateDir </> subdir
  where
    -- Everything lives under db/DownloadCount within the state dir.
    subdir = "db" </> "DownloadCount"