I have the following dask dataframe created from Castra:
import dask.dataframe as dd
df = dd.from_castra('data.castra', columns=['user_id','ts','text'])
Yielding:
                      user_id / ts                  / text
ts
2015-08-08 01:10:00   9235      2015-08-08 01:10:00   a
2015-08-08 02:20:00   2353      2015-08-08 02:20:00   b
2015-08-08 02:20:00   9235      2015-08-08 02:20:00   c
2015-08-08 04:10:00   9235      2015-08-08 04:10:00   d
2015-08-08 08:10:00   2353      2015-08-08 08:10:00   e
What I'm trying to do is:
- Group by 
user_idandts - Resample it over a 3-hour period
 - In the resampling step, any merged rows should concatenate the texts
 
Example output:
                                text
user_id   ts
9235      2015-08-08 00:00:00   ac
          2015-08-08 03:00:00   d
2353      2015-08-08 00:00:00   b
          2015-08-08 06:00:00   e
I tried the following:
df.groupby(['user_id','ts'])['text'].sum().resample('3H', how='sum').compute()
And got the following error:
TypeError: Only valid with DatetimeIndex, TimedeltaIndex or PeriodIndex
I tried passing set_index('ts') in the pipe but it doesn't seem to be an attribute of Series.
Any ideas on how to achieve this?
TL;DR
If it makes the problem easier, I'm also able to change the format of the Castra DB I created too. The implementation I have currently was largely taken from this great post.
I set the index (in the to_df() function) as follows: 
df.set_index('ts',drop=False,inplace=True)
And have:
  with BZ2File(os.path.join(S.DATA_DIR,filename)) as f:
     batches = partition_all(batch_size, f)
     df, frames = peek(map(self.to_df, batches))
     castra = Castra(S.CASTRA, template=df, categories=categories)
     castra.extend_sequence(frames, freq='3h')
Here are the resulting dtypes:
ts                datetime64[ns]
text                      object
user_id                  float64